The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
#include <stdlib.h>
#include <inttypes.h>
#include <time.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <embedded_innodb-1.0/innodb.h>

#include "storage.h"

const char *tablename= "memcached/items";

#define key_col_idx 0
#define data_col_idx 1
#define flags_col_idx 2
#define cas_col_idx 3
#define exp_col_idx 4

static uint64_t cas;

/**
 * To avoid cluttering down all the code with error checking I use the
 * following macro. It will execute the statement and verify that the
 * result of the operation is DB_SUCCESS. If any other error code is
 * returned it will print an "assert-like" output and jump to the
 * label error_exit. There I release resources before returning out of
 * the function.
 *
 * @param a the expression to execute
 *
 */
#define checked(expression)                                    \
do {                                                           \
  ib_err_t checked_err= expression;                            \
  if (checked_err != DB_SUCCESS)                               \
  {                                                            \
    fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n",   \
            __FILE__, __LINE__, #expression,                   \
            ib_strerror(checked_err));                         \
    goto error_exit;                                           \
  }                                                            \
} while (0);

/**
 * Create the database schema.
 * @return true if the database schema was created without any problems
 *         false otherwise.
 */
static bool create_schema(void) 
{
  ib_tbl_sch_t schema= NULL;
  ib_idx_sch_t dbindex= NULL;

  if (ib_database_create("memcached") != IB_TRUE)
  {
    fprintf(stderr, "Failed to create database\n");
    return false;
  }

  ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
  ib_id_t table_id;

  checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0));
  checked(ib_table_schema_add_col(schema, "key", IB_BLOB,
                                  IB_COL_NOT_NULL, 0, 32767));
  checked(ib_table_schema_add_col(schema, "data", IB_BLOB,
                                  IB_COL_NONE, 0, 1024*1024));
  checked(ib_table_schema_add_col(schema, "flags", IB_INT,
                                  IB_COL_UNSIGNED, 0, 4));
  checked(ib_table_schema_add_col(schema, "cas", IB_INT,
                                  IB_COL_UNSIGNED, 0, 8));
  checked(ib_table_schema_add_col(schema, "exp", IB_INT,
                                  IB_COL_UNSIGNED, 0, 4));
  checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &dbindex));
  checked(ib_index_schema_add_col(dbindex, "key", 0));
  checked(ib_index_schema_set_clustered(dbindex));
  checked(ib_schema_lock_exclusive(transaction));
  checked(ib_table_create(transaction, schema, &table_id));
  checked(ib_trx_commit(transaction));
  ib_table_schema_delete(schema);

  return true;

 error_exit:
  /* @todo release resources! */
  {
    ib_err_t error= ib_trx_rollback(transaction);
    if (error != DB_SUCCESS)
      fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
              ib_strerror(error));
  }
  return false;
}

/**
 * Store an item into the database. Update the CAS id on the item before
 * storing it in the database.
 *
 * @param trx the transaction to use
 * @param item the item to store
 * @return true if we can go ahead and commit the transaction, false otherwise
 */
static bool do_put_item(ib_trx_t trx, struct item* item) 
{
  update_cas(item);

  ib_crsr_t cursor= NULL;
  ib_tpl_t tuple= NULL;
  bool retval= false;

  checked(ib_cursor_open_table(tablename, trx, &cursor));
  checked(ib_cursor_lock(cursor, IB_LOCK_X));
  tuple= ib_clust_read_tuple_create(cursor);

  checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey));
  checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size));
  checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags));
  checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas));
  checked(ib_tuple_write_u32(tuple, exp_col_idx, (ib_u32_t)item->exp));
  checked(ib_cursor_insert_row(cursor, tuple));

  retval= true;
  /* Release resources: */
  /* FALLTHROUGH */

 error_exit:
  if (tuple != NULL)
    ib_tuple_delete(tuple);

  ib_err_t currsor_error;
  if (cursor != NULL)
    currsor_error= ib_cursor_close(cursor);

  return retval;
}

/**
 * Try to locate an item in the database. Return a cursor and the tuple to
 * the item if I found it in the database.
 *
 * @param trx the transaction to use
 * @param key the key of the item to look up
 * @param nkey the size of the key
 * @param cursor where to store the cursor (OUT)
 * @param tuple where to store the tuple (OUT)
 * @return true if I found the object, false otherwise
 */
static bool do_locate_item(ib_trx_t trx,
                           const void* key,
                           size_t nkey,
                           ib_crsr_t *cursor)
{
  int res;
  ib_tpl_t tuple= NULL;

  *cursor= NULL;

  checked(ib_cursor_open_table(tablename, trx, cursor));
  tuple= ib_clust_search_tuple_create(*cursor);
  if (tuple == NULL)
  {
    fprintf(stderr, "Failed to allocate tuple object\n");
    goto error_exit;
  }

  checked(ib_col_set_value(tuple, key_col_idx, key, nkey));
  ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res);

  if (err == DB_SUCCESS && res == 0)
  {
    ib_tuple_delete(tuple);
    return true;
  }
  else if (err != DB_SUCCESS &&
           err != DB_RECORD_NOT_FOUND &&
           err != DB_END_OF_INDEX)
  {
    fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err));
  }
  /* FALLTHROUGH */
 error_exit:
  if (tuple != NULL)
    ib_tuple_delete(tuple);

  ib_err_t cursor_error;
  if (*cursor != NULL)
    cursor_error= ib_cursor_close(*cursor);

  *cursor= NULL;

  return false;
}

/**
 * Try to get an item from the database
 *
 * @param trx the transaction to use
 * @param key the key to get
 * @param nkey the lenght of the key
 * @return a pointer to the item if I found it in the database
 */
static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) 
{
  ib_crsr_t cursor= NULL;
  ib_tpl_t tuple= NULL;
  struct item* retval= NULL;

  if (do_locate_item(trx, key, nkey, &cursor)) 
  {
    tuple= ib_clust_read_tuple_create(cursor);
    if (tuple == NULL)
    {
      fprintf(stderr, "Failed to create read tuple\n");
      goto error_exit;
    }
    checked(ib_cursor_read_row(cursor, tuple));
    ib_col_meta_t meta;
    ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta);
    ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta);
    ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta);
    ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta);
    const void *dataptr= ib_col_get_value(tuple, data_col_idx);

    retval= create_item(key, nkey, dataptr, datalen, 0, 0);
    if (retval == NULL) 
    {
      fprintf(stderr, "Failed to allocate memory\n");
      goto error_exit;
    }

    if (flaglen != 0) 
    {
      ib_u32_t val;
      checked(ib_tuple_read_u32(tuple, flags_col_idx, &val));
      retval->flags= (uint32_t)val;
    }
    if (caslen != 0) 
    {
      ib_u64_t val;
      checked(ib_tuple_read_u64(tuple, cas_col_idx, &val));
      retval->cas= (uint64_t)val;
    }
    if (explen != 0) 
    {
      ib_u32_t val;
      checked(ib_tuple_read_u32(tuple, exp_col_idx, &val));
      retval->exp= (time_t)val;
    }
  }

  /* Release resources */
  /* FALLTHROUGH */

 error_exit:
  if (tuple != NULL)
    ib_tuple_delete(tuple);

  ib_err_t cursor_error;
  if (cursor != NULL)
    cursor_error= ib_cursor_close(cursor);

  return retval;
}

/**
 * Delete an item from the cache
 * @param trx the transaction to use
 * @param key the key of the item to delete
 * @param nkey the length of the key
 * @return true if we should go ahead and commit the transaction
 *         or false if we should roll back (if the key didn't exists)
 */
static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) {
  ib_crsr_t cursor= NULL;
  bool retval= false;

  if (do_locate_item(trx, key, nkey, &cursor))
  {
    checked(ib_cursor_lock(cursor, IB_LOCK_X));
    checked(ib_cursor_delete_row(cursor));
    retval= true;
  }
  /* Release resources */
  /* FALLTHROUGH */

 error_exit:
  if (cursor != NULL)
  {
    ib_err_t cursor_error;
    cursor_error= ib_cursor_close(cursor);
  }

  return retval;
}


/****************************************************************************
 * External interface
 ***************************************************************************/

/**
 * Initialize the database storage
 * @return true if the database was initialized successfully, false otherwise
 */
bool initialize_storage(void) 
{
  ib_err_t error;
  ib_id_t tid;

  checked(ib_init());
  checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
  checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
  checked(ib_cfg_set_bool_on("file_per_table"));
  checked(ib_startup("barracuda"));

  /* check to see if the table exists or if we should create the schema */
  error= ib_table_get_id(tablename, &tid);
  if (error == DB_TABLE_NOT_FOUND) 
  {
    if (!create_schema()) 
    {
      return false;
    }
  } 
  else if (error != DB_SUCCESS) 
  {
    fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error));
    return false;
  }

  return true;

 error_exit:

  return false;
}

/**
 * Shut down this storage engine
 */
void shutdown_storage(void) 
{
  checked(ib_shutdown(IB_SHUTDOWN_NORMAL));
 error_exit:
  ;
}

/**
 * Store an item in the databse
 *
 * @param item the item to store
 */
void put_item(struct item* item) 
{
  ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
  if (do_put_item(transaction, item)) 
  {
    ib_err_t error= ib_trx_commit(transaction);
    if (error != DB_SUCCESS) 
    {
      fprintf(stderr, "Failed to store key:\n\t%s\n",
              ib_strerror(error));
    }
  } 
  else 
  {
    ib_err_t error= ib_trx_rollback(transaction);
    if (error != DB_SUCCESS)
      fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
              ib_strerror(error));
  }
}

/**
 * Get an item from the engine
 * @param key the key to grab
 * @param nkey number of bytes in the key
 * @return pointer to the item if found
 */
struct item* get_item(const void* key, size_t nkey) 
{
  ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
  struct item* ret= do_get_item(transaction, key, nkey);
  ib_err_t error= ib_trx_rollback(transaction);

  if (error != DB_SUCCESS)
    fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
            ib_strerror(error));

  return ret;
}

/**
 * Create an item structure and initialize it with the content
 *
 * @param key the key for the item
 * @param nkey the number of bytes in the key
 * @param data pointer to the value for the item (may be NULL)
 * @param size the size of the data
 * @param flags the flags to store with the data
 * @param exp the expiry time for the item
 * @return pointer to an initialized item object or NULL if allocation failed
 */
struct item* create_item(const void* key, size_t nkey, const void* data,
                         size_t size, uint32_t flags, time_t exp)
{
  struct item* ret= calloc(1, sizeof(*ret));
  if (ret != NULL)
  {
    ret->key= malloc(nkey);
    if (size > 0)
    {
      ret->data= malloc(size);
    }

    if (ret->key == NULL || (size > 0 && ret->data == NULL))
    {
      free(ret->key);
      free(ret->data);
      free(ret);
      return NULL;
    }

    memcpy(ret->key, key, nkey);
    if (data != NULL)
    {
      memcpy(ret->data, data, size);
    }

    ret->nkey= nkey;
    ret->size= size;
    ret->flags= flags;
    ret->exp= exp;
  }

  return ret;
}

/**
 * Delete an item from the cache
 * @param key the key of the item to delete
 * @param nkey the length of the key
 * @return true if the item was deleted from the cache
 */
bool delete_item(const void* key, size_t nkey) {
  ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);

  bool ret= do_delete_item(transaction, key, nkey);

  if (ret)
  {
    /* object found. commit transaction */
    ib_err_t error= ib_trx_commit(transaction);
    if (error != DB_SUCCESS)
    {
      fprintf(stderr, "Failed to delete key:\n\t%s\n",
              ib_strerror(error));
      ret= false;
    }
  }
  else
  {
    ib_err_t error= ib_trx_rollback(transaction);
    if (error != DB_SUCCESS)
      fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
              ib_strerror(error));
  }

  return ret;
}

/**
 * Flush the entire cache
 * @param when when the cache should be flushed (0 == immediately)
 */
void flush(uint32_t when __attribute__((unused))) 
{
  /* @TODO implement support for when != 0 */
  ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
  ib_crsr_t cursor= NULL;
	ib_err_t err= DB_SUCCESS;

  checked(ib_cursor_open_table(tablename, transaction, &cursor));
  checked(ib_cursor_first(cursor));
  checked(ib_cursor_lock(cursor, IB_LOCK_X));

  do 
  {
    checked(ib_cursor_delete_row(cursor));
  } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS);

  if (err != DB_END_OF_INDEX)
  {
    fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err));
    goto error_exit;
  }
  ib_err_t cursor_error;
  cursor_error= ib_cursor_close(cursor);
  cursor= NULL;
  checked(ib_trx_commit(transaction));
  return;

 error_exit:
  if (cursor != NULL)
  {
    cursor_error= ib_cursor_close(cursor);
  }

  ib_err_t error= ib_trx_rollback(transaction);
  if (error != DB_SUCCESS)
    fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
            ib_strerror(error));
}

/**
 * Update the cas ID in the item structure
 * @param item the item to update
 */
void update_cas(struct item* item) 
{
  item->cas= ++cas;
}

/**
 * Release all the resources allocated by the item
 * @param item the item to release
 */
void release_item(struct item* item) 
{
  free(item->key);
  free(item->data);
  free(item);
}