The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define C_LUCY_FILEPURGER
#include <ctype.h>
#include "Lucy/Util/ToolSet.h"

#include "Lucy/Index/FilePurger.h"
#include "Lucy/Index/IndexManager.h"
#include "Lucy/Index/Segment.h"
#include "Lucy/Index/Snapshot.h"
#include "Lucy/Plan/Schema.h"
#include "Lucy/Store/DirHandle.h"
#include "Lucy/Store/Folder.h"
#include "Lucy/Store/Lock.h"

// Place unused files into purgables array and obsolete Snapshots into
// snapshots array.
static void
S_discover_unused(FilePurger *self, VArray **purgables, VArray **snapshots);

// Clean up after a failed background merge session, adding all dead files to
// the list of candidates to be zapped.
static void
S_zap_dead_merge(FilePurger *self, Hash *candidates);

// Return an array of recursively expanded filepath entries.
static VArray*
S_find_all_referenced(Folder *folder, VArray *entries);

FilePurger*
FilePurger_new(Folder *folder, Snapshot *snapshot, IndexManager *manager) {
    FilePurger *self = (FilePurger*)VTable_Make_Obj(FILEPURGER);
    return FilePurger_init(self, folder, snapshot, manager);
}

FilePurger*
FilePurger_init(FilePurger *self, Folder *folder, Snapshot *snapshot,
                IndexManager *manager) {
    self->folder       = (Folder*)INCREF(folder);
    self->snapshot     = (Snapshot*)INCREF(snapshot);
    self->manager      = manager
                         ? (IndexManager*)INCREF(manager)
                         : IxManager_new(NULL, NULL);
    IxManager_Set_Folder(self->manager, folder);

    // Don't allow the locks directory to be zapped.
    self->disallowed = Hash_new(0);
    Hash_Store_Str(self->disallowed, "locks", 5, INCREF(&EMPTY));

    return self;
}

void
FilePurger_destroy(FilePurger *self) {
    DECREF(self->folder);
    DECREF(self->snapshot);
    DECREF(self->manager);
    DECREF(self->disallowed);
    SUPER_DESTROY(self, FILEPURGER);
}

void
FilePurger_purge(FilePurger *self) {
    Lock *deletion_lock = IxManager_Make_Deletion_Lock(self->manager);

    // Obtain deletion lock, purge files, release deletion lock.
    Lock_Clear_Stale(deletion_lock);
    if (Lock_Obtain(deletion_lock)) {
        Folder *folder   = self->folder;
        Hash   *failures = Hash_new(0);
        VArray *purgables;
        VArray *snapshots;

        S_discover_unused(self, &purgables, &snapshots);

        // Attempt to delete entries -- if failure, no big deal, just try
        // again later.  Proceed in reverse lexical order so that directories
        // get deleted after they've been emptied.
        VA_Sort(purgables, NULL, NULL);
        for (uint32_t i = VA_Get_Size(purgables); i--;) {
            CharBuf *entry = (CharBuf*)VA_fetch(purgables, i);
            if (Hash_Fetch(self->disallowed, (Obj*)entry)) { continue; }
            if (!Folder_Delete(folder, entry)) {
                if (Folder_Exists(folder, entry)) {
                    Hash_Store(failures, (Obj*)entry, INCREF(&EMPTY));
                }
            }
        }

        for (uint32_t i = 0, max = VA_Get_Size(snapshots); i < max; i++) {
            Snapshot *snapshot = (Snapshot*)VA_Fetch(snapshots, i);
            bool_t snapshot_has_failures = false;
            if (Hash_Get_Size(failures)) {
                // Only delete snapshot files if all of their entries were
                // successfully deleted.
                VArray *entries = Snapshot_List(snapshot);
                for (uint32_t j = VA_Get_Size(entries); j--;) {
                    CharBuf *entry = (CharBuf*)VA_Fetch(entries, j);
                    if (Hash_Fetch(failures, (Obj*)entry)) {
                        snapshot_has_failures = true;
                        break;
                    }
                }
                DECREF(entries);
            }
            if (!snapshot_has_failures) {
                CharBuf *snapfile = Snapshot_Get_Path(snapshot);
                Folder_Delete(folder, snapfile);
            }
        }

        DECREF(failures);
        DECREF(purgables);
        DECREF(snapshots);
        Lock_Release(deletion_lock);
    }
    else {
        WARN("Can't obtain deletion lock, skipping deletion of "
             "obsolete files");
    }

    DECREF(deletion_lock);
}

static void
S_zap_dead_merge(FilePurger *self, Hash *candidates) {
    IndexManager *manager    = self->manager;
    Lock         *merge_lock = IxManager_Make_Merge_Lock(manager);

    Lock_Clear_Stale(merge_lock);
    if (!Lock_Is_Locked(merge_lock)) {
        Hash *merge_data = IxManager_Read_Merge_Data(manager);
        Obj  *cutoff = merge_data
                       ? Hash_Fetch_Str(merge_data, "cutoff", 6)
                       : NULL;

        if (cutoff) {
            CharBuf *cutoff_seg = Seg_num_to_name(Obj_To_I64(cutoff));
            if (Folder_Exists(self->folder, cutoff_seg)) {
                ZombieCharBuf *merge_json = ZCB_WRAP_STR("merge.json", 10);
                DirHandle *dh = Folder_Open_Dir(self->folder, cutoff_seg);
                CharBuf *entry = dh ? DH_Get_Entry(dh) : NULL;
                CharBuf *filepath = CB_new(32);

                if (!dh) {
                    THROW(ERR, "Can't open segment dir '%o'", filepath);
                }

                Hash_Store(candidates, (Obj*)cutoff_seg, INCREF(&EMPTY));
                Hash_Store(candidates, (Obj*)merge_json, INCREF(&EMPTY));
                while (DH_Next(dh)) {
                    // TODO: recursively delete subdirs within seg dir.
                    CB_setf(filepath, "%o/%o", cutoff_seg, entry);
                    Hash_Store(candidates, (Obj*)filepath, INCREF(&EMPTY));
                }
                DECREF(filepath);
                DECREF(dh);
            }
            DECREF(cutoff_seg);
        }

        DECREF(merge_data);
    }

    DECREF(merge_lock);
    return;
}

static void
S_discover_unused(FilePurger *self, VArray **purgables_ptr,
                  VArray **snapshots_ptr) {
    Folder      *folder       = self->folder;
    DirHandle   *dh           = Folder_Open_Dir(folder, NULL);
    if (!dh) { RETHROW(INCREF(Err_get_error())); }
    VArray      *spared       = VA_new(1);
    VArray      *snapshots    = VA_new(1);
    CharBuf     *snapfile     = NULL;

    // Start off with the list of files in the current snapshot.
    if (self->snapshot) {
        VArray *entries    = Snapshot_List(self->snapshot);
        VArray *referenced = S_find_all_referenced(folder, entries);
        VA_Push_VArray(spared, referenced);
        DECREF(entries);
        DECREF(referenced);
        snapfile = Snapshot_Get_Path(self->snapshot);
        if (snapfile) { VA_Push(spared, INCREF(snapfile)); }
    }

    CharBuf *entry      = DH_Get_Entry(dh);
    Hash    *candidates = Hash_new(64);
    while (DH_Next(dh)) {
        if (!CB_Starts_With_Str(entry, "snapshot_", 9))        { continue; }
        else if (!CB_Ends_With_Str(entry, ".json", 5))         { continue; }
        else if (snapfile && CB_Equals(entry, (Obj*)snapfile)) { continue; }
        else {
            Snapshot *snapshot
                = Snapshot_Read_File(Snapshot_new(), folder, entry);
            Lock *lock
                = IxManager_Make_Snapshot_Read_Lock(self->manager, entry);
            VArray *snap_list  = Snapshot_List(snapshot);
            VArray *referenced = S_find_all_referenced(folder, snap_list);

            // DON'T obtain the lock -- only see whether another
            // entity holds a lock on the snapshot file.
            if (lock) {
                Lock_Clear_Stale(lock);
            }
            if (lock && Lock_Is_Locked(lock)) {
                // The snapshot file is locked, which means someone's using
                // that version of the index -- protect all of its entries.
                uint32_t new_size = VA_Get_Size(spared)
                                    + VA_Get_Size(referenced)
                                    + 1;
                VA_Grow(spared, new_size);
                VA_Push(spared, (Obj*)CB_Clone(entry));
                VA_Push_VArray(spared, referenced);
            }
            else {
                // No one's using this snapshot, so all of its entries are
                // candidates for deletion.
                for (uint32_t i = 0, max = VA_Get_Size(referenced); i < max; i++) {
                    CharBuf *file = (CharBuf*)VA_Fetch(referenced, i);
                    Hash_Store(candidates, (Obj*)file, INCREF(&EMPTY));
                }
                VA_Push(snapshots, INCREF(snapshot));
            }

            DECREF(referenced);
            DECREF(snap_list);
            DECREF(snapshot);
            DECREF(lock);
        }
    }
    DECREF(dh);

    // Clean up after a dead segment consolidation.
    S_zap_dead_merge(self, candidates);

    // Eliminate any current files from the list of files to be purged.
    for (uint32_t i = 0, max = VA_Get_Size(spared); i < max; i++) {
        CharBuf *filename = (CharBuf*)VA_Fetch(spared, i);
        DECREF(Hash_Delete(candidates, (Obj*)filename));
    }

    // Pass back purgables and Snapshots.
    *purgables_ptr = Hash_Keys(candidates);
    *snapshots_ptr = snapshots;

    DECREF(candidates);
    DECREF(spared);
}

static VArray*
S_find_all_referenced(Folder *folder, VArray *entries) {
    Hash *uniqued = Hash_new(VA_Get_Size(entries));
    for (uint32_t i = 0, max = VA_Get_Size(entries); i < max; i++) {
        CharBuf *entry = (CharBuf*)VA_Fetch(entries, i);
        Hash_Store(uniqued, (Obj*)entry, INCREF(&EMPTY));
        if (Folder_Is_Directory(folder, entry)) {
            VArray *contents = Folder_List_R(folder, entry);
            for (uint32_t j = VA_Get_Size(contents); j--;) {
                CharBuf *sub_entry = (CharBuf*)VA_Fetch(contents, j);
                Hash_Store(uniqued, (Obj*)sub_entry, INCREF(&EMPTY));
            }
            DECREF(contents);
        }
    }
    VArray *referenced = Hash_Keys(uniqued);
    DECREF(uniqued);
    return referenced;
}