The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
/**
 * This file contains an implementation of the callback interface for level 0
 * in the protocol library. You might want to have your copy of the protocol
 * specification next to your coffee ;-)
 */

#include "mem_config.h"

#include <cassert>
#include <sys/types.h>
#include <cstdio>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>
#include <cstdlib>
#include <cstring>

#include <libmemcachedprotocol-0.0/handler.h>
#include <example/byteorder.h>
#include "example/memcached_light.h"
#include "example/storage.h"
#include "util/log.hpp"


using namespace datadifferential;

static util::log_info_st *log_file= NULL;

static protocol_binary_response_status noop_command_handler(const void *cookie,
                                                            protocol_binary_request_header *header,
                                                            memcached_binary_protocol_raw_response_handler response_handler)
{
  protocol_binary_response_no_extras response;
  memset(&response, 0, sizeof(protocol_binary_response_no_extras));

  response.message.header.response.magic= PROTOCOL_BINARY_RES;
  response.message.header.response.opcode= PROTOCOL_BINARY_CMD_NOOP;
  response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
  response.message.header.response.opaque= header->request.opaque;

  return response_handler(cookie, header, (protocol_binary_response_header*)&response);
}

static protocol_binary_response_status quit_command_handler(const void *cookie,
                                                            protocol_binary_request_header *header,
                                                            memcached_binary_protocol_raw_response_handler response_handler)
{
  protocol_binary_response_no_extras response;
  memset(&response, 0, sizeof(protocol_binary_response_no_extras));

  response.message.header.response.magic= PROTOCOL_BINARY_RES;
  response.message.header.response.opcode= PROTOCOL_BINARY_CMD_QUIT;
  response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
  response.message.header.response.opaque= header->request.opaque;

  if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT)
  {
    response_handler(cookie, header, (protocol_binary_response_header*)&response);
  }

  /* I need a better way to signal to close the connection */
  return PROTOCOL_BINARY_RESPONSE_EINTERNAL;
}

static protocol_binary_response_status get_command_handler(const void *cookie,
                                                           protocol_binary_request_header *header,
                                                           memcached_binary_protocol_raw_response_handler response_handler)
{
  uint8_t opcode= header->request.opcode;
  union protocol_binary_response_get_un {
    protocol_binary_response_get response;
    char buffer[4096];
  };
  
  protocol_binary_response_get_un msg;
  memset(&msg, 0, sizeof(protocol_binary_response_get_un));

  msg.response.message.header.response.magic= PROTOCOL_BINARY_RES;
  msg.response.message.header.response.opcode= opcode;
  msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
  msg.response.message.header.response.opaque= header->request.opaque;

  struct item *item= get_item(header + 1, ntohs(header->request.keylen));
  if (item)
  {
    msg.response.message.body.flags= htonl(item->flags);
    char *ptr= (char*)(msg.response.bytes + sizeof(*header) + 4);
    uint32_t bodysize= 4;
    msg.response.message.header.response.cas= example_htonll(item->cas);
    if (opcode == PROTOCOL_BINARY_CMD_GETK || opcode == PROTOCOL_BINARY_CMD_GETKQ)
    {
      memcpy(ptr, item->key, item->nkey);
      msg.response.message.header.response.keylen= htons((uint16_t)item->nkey);
      ptr += item->nkey;
      bodysize += (uint32_t)item->nkey;
    }
    memcpy(ptr, item->data, item->size);
    bodysize += (uint32_t)item->size;
    msg.response.message.header.response.bodylen= htonl(bodysize);
    msg.response.message.header.response.extlen= 4;

    release_item(item);
    return response_handler(cookie, header, (protocol_binary_response_header*)&msg);
  }
  else if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETK)
  {
    msg.response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
    return response_handler(cookie, header, (protocol_binary_response_header*)&msg);
  }

  /* Q shouldn't report a miss ;-) */
  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
}

static protocol_binary_response_status delete_command_handler(const void *cookie,
                                                              protocol_binary_request_header *header,
                                                              memcached_binary_protocol_raw_response_handler response_handler)
{
  size_t keylen= ntohs(header->request.keylen);

  char *key= ((char*)header) + sizeof(*header);
  protocol_binary_response_no_extras response;
  memset(&response, 0, sizeof(protocol_binary_response_no_extras));

  response.message.header.response.magic= PROTOCOL_BINARY_RES;
  response.message.header.response.opcode= header->request.opcode;
  response.message.header.response.opaque= header->request.opaque;

  if (delete_item(key, keylen) == false)
  {
    log_file->write(util::VERBOSE_NOTICE, "%s not found: %.*s", __func__, keylen, key);
    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
    return response_handler(cookie, header, (protocol_binary_response_header*)&response);
  }
  else if (header->request.opcode == PROTOCOL_BINARY_CMD_DELETE)
  {
    log_file->write(util::VERBOSE_NOTICE, "%s not found: %.*s", __func__, keylen, key);
    /* DELETEQ doesn't want success response */
    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
    return response_handler(cookie, header, (protocol_binary_response_header*)&response);
  }

  log_file->write(util::VERBOSE_NOTICE, "%s deleted: %.*s", __func__, keylen, key);

  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
}

static protocol_binary_response_status flush_command_handler(const void *cookie,
                                                             protocol_binary_request_header *header,
                                                             memcached_binary_protocol_raw_response_handler response_handler)
{
  uint8_t opcode= header->request.opcode;

  /* @fixme sett inn when! */
  flush(0);

  if (opcode == PROTOCOL_BINARY_CMD_FLUSH)
  {
    protocol_binary_response_no_extras response;
    memset(&response, 0, sizeof(protocol_binary_response_no_extras));

    response.message.header.response.magic= PROTOCOL_BINARY_RES;
    response.message.header.response.opcode= opcode;
    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
    response.message.header.response.opaque= header->request.opaque;

    return response_handler(cookie, header, (protocol_binary_response_header*)&response);
  }

  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
}

static protocol_binary_response_status arithmetic_command_handler(const void *cookie,
                                                                  protocol_binary_request_header *header,
                                                                  memcached_binary_protocol_raw_response_handler response_handler)
{
  protocol_binary_request_incr *req= (protocol_binary_request_incr*)header;
  protocol_binary_response_incr response;
  memset(&response, 0, sizeof(protocol_binary_response_incr));

  response.message.header.response.magic= PROTOCOL_BINARY_RES;
  response.message.header.response.opcode= header->request.opcode;
  response.message.header.response.opaque= header->request.opaque;

  uint16_t keylen= ntohs(header->request.keylen);
  uint64_t initial= example_ntohll(req->message.body.initial);
  uint64_t delta= example_ntohll(req->message.body.delta);
  uint32_t expiration= ntohl(req->message.body.expiration);
  uint32_t flags= 0;
  void *key= req->bytes + sizeof(req->bytes);
  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;

  uint64_t value= initial;

  struct item *item= get_item(key, keylen);
  if (item != NULL)
  {
    if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT ||
        header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ)
    {
      value= (*(uint64_t*)item->data) + delta;
    }
    else
    {
      if (delta > *(uint64_t*)item->data)
      {
        value= 0;
      }
      else
      {
        value= *(uint64_t*)item->data - delta;
      }
    }
    expiration= (uint32_t)item->exp;
    flags= item->flags;

    release_item(item);
    delete_item(key, keylen);
  }

  item= create_item(key, keylen, NULL, sizeof(value), flags, (time_t)expiration);
  if (item == NULL)
  {
    rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
  }
  else
  {
    memcpy(item->data, &value, sizeof(value));
    put_item(item);
  }

  response.message.header.response.status= htons(rval);
  if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS)
  {
    response.message.header.response.bodylen= ntohl(8);
    response.message.body.value= example_ntohll((*(uint64_t*)item->data));
    response.message.header.response.cas= example_ntohll(item->cas);

    release_item(item);
    if (header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENTQ ||
        header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENTQ)
    {
      return PROTOCOL_BINARY_RESPONSE_SUCCESS;
    }
  }

  return response_handler(cookie, header, (protocol_binary_response_header*)&response);
}

static protocol_binary_response_status version_command_handler(const void *cookie,
                                                               protocol_binary_request_header *header,
                                                               memcached_binary_protocol_raw_response_handler response_handler)
{
  const char *versionstring= "1.0.0";
  union protocol_binary_response_header_un
  {
    protocol_binary_response_header packet;
    char buffer[256];
  };
  
  protocol_binary_response_header_un response;
  memset(&response, 0, sizeof(protocol_binary_response_header_un));

  response.packet.response.magic= PROTOCOL_BINARY_RES;
  response.packet.response.opcode= PROTOCOL_BINARY_CMD_VERSION;
  response.packet.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
  response.packet.response.opaque= header->request.opaque;
  response.packet.response.cas= 0;
  response.packet.response.bodylen= htonl((uint32_t)strlen(versionstring));

  assert(sizeof(protocol_binary_response_header) +strlen(versionstring) <= 256);
  memcpy(response.buffer + sizeof(protocol_binary_response_header), versionstring, strlen(versionstring));

  return response_handler(cookie, header, (protocol_binary_response_header*)&response);
}

static protocol_binary_response_status concat_command_handler(const void *cookie,
                                                              protocol_binary_request_header *header,
                                                              memcached_binary_protocol_raw_response_handler response_handler)
{
  protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_SUCCESS;
  uint16_t keylen= ntohs(header->request.keylen);
  uint64_t cas= example_ntohll(header->request.cas);
  void *key= header + 1;
  uint32_t vallen= ntohl(header->request.bodylen) - keylen;
  void *val= (char*)key + keylen;

  struct item *item= get_item(key, keylen);
  struct item *nitem= NULL;

  if (item == NULL)
  {
    rval= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
  }
  else if (cas != 0 && cas != item->cas)
  {
    rval= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
  }
  else if ((nitem= create_item(key, keylen, NULL, item->size + vallen,
                               item->flags, item->exp)) == NULL)
  {
    release_item(item);
    rval= PROTOCOL_BINARY_RESPONSE_ENOMEM;
  }
  else
  {
    if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
        header->request.opcode == PROTOCOL_BINARY_CMD_APPENDQ)
    {
      memcpy(nitem->data, item->data, item->size);
      memcpy(((char*)(nitem->data)) + item->size, val, vallen);
    }
    else
    {
      memcpy(nitem->data, val, vallen);
      memcpy(((char*)(nitem->data)) + vallen, item->data, item->size);
    }
    release_item(item);
    delete_item(key, keylen);
    put_item(nitem);
    cas= nitem->cas;
    release_item(nitem);

    if (header->request.opcode == PROTOCOL_BINARY_CMD_APPEND ||
        header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND)
    {
      protocol_binary_response_no_extras response;
      memset(&response, 0, sizeof(protocol_binary_response_no_extras));

      response.message.header.response.magic= PROTOCOL_BINARY_RES;
      response.message.header.response.opcode= header->request.opcode;
      response.message.header.response.status= htons(rval);
      response.message.header.response.opaque= header->request.opaque;
      response.message.header.response.cas= example_htonll(cas);

      return response_handler(cookie, header, (protocol_binary_response_header*)&response);
    }
  }

  return rval;
}

static protocol_binary_response_status set_command_handler(const void *cookie,
                                                           protocol_binary_request_header *header,
                                                           memcached_binary_protocol_raw_response_handler response_handler)
{
  size_t keylen= ntohs(header->request.keylen);
  size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
  protocol_binary_request_replace *request= (protocol_binary_request_replace*)header;
  uint32_t flags= ntohl(request->message.body.flags);
  time_t timeout= (time_t)ntohl(request->message.body.expiration);
  char *key= ((char*)header) + sizeof(*header) + 8;
  char *data= key + keylen;

  protocol_binary_response_no_extras response;
  memset(&response, 0, sizeof(protocol_binary_response_no_extras));

  response.message.header.response.magic= PROTOCOL_BINARY_RES;
  response.message.header.response.opcode= header->request.opcode;
  response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
  response.message.header.response.opaque= header->request.opaque;

  if (header->request.cas != 0)
  {
    /* validate cas */
    struct item* item= get_item(key, keylen);
    if (item != NULL)
    {
      if (item->cas != example_ntohll(header->request.cas))
      {
        release_item(item);
        response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
        return response_handler(cookie, header, (protocol_binary_response_header*)&response);
      }
      release_item(item);
    }
  }

  delete_item(key, keylen);
  struct item* item= create_item(key, keylen, data, datalen, flags, timeout);
  if (item == NULL)
  {
    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
  }
  else
  {
    put_item(item);
    /* SETQ shouldn't return a message */
    if (header->request.opcode == PROTOCOL_BINARY_CMD_SET)
    {
      response.message.header.response.cas= example_htonll(item->cas);
      release_item(item);
      return response_handler(cookie, header, (protocol_binary_response_header*)&response);
    }
    release_item(item);

    return PROTOCOL_BINARY_RESPONSE_SUCCESS;
  }

  return response_handler(cookie, header, (protocol_binary_response_header*)&response);
}

static protocol_binary_response_status add_command_handler(const void *cookie,
                                                           protocol_binary_request_header *header,
                                                           memcached_binary_protocol_raw_response_handler response_handler)
{
  size_t keylen= ntohs(header->request.keylen);
  size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
  protocol_binary_request_add *request= (protocol_binary_request_add*)header;
  uint32_t flags= ntohl(request->message.body.flags);
  time_t timeout= (time_t)ntohl(request->message.body.expiration);
  char *key= ((char*)header) + sizeof(*header) + 8;
  char *data= key + keylen;

  protocol_binary_response_no_extras response;
  memset(&response, 0, sizeof(protocol_binary_response_no_extras));

  response.message.header.response.magic= PROTOCOL_BINARY_RES;
  response.message.header.response.opcode= header->request.opcode;
  response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
  response.message.header.response.opaque= header->request.opaque;

  struct item* item= get_item(key, keylen);
  if (item == NULL)
  {
    item= create_item(key, keylen, data, datalen, flags, timeout);
    if (item == NULL)
      response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
    else
    {
      put_item(item);
      /* ADDQ shouldn't return a message */
      if (header->request.opcode == PROTOCOL_BINARY_CMD_ADD)
      {
        response.message.header.response.cas= example_htonll(item->cas);
        release_item(item);
        return response_handler(cookie, header, (protocol_binary_response_header*)&response);
      }
      release_item(item);
      return PROTOCOL_BINARY_RESPONSE_SUCCESS;
    }
  }
  else
  {
    release_item(item);
    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
  }

  return response_handler(cookie, header, (protocol_binary_response_header*)&response);
}

static protocol_binary_response_status replace_command_handler(const void *cookie,
                                                               protocol_binary_request_header *header,
                                                               memcached_binary_protocol_raw_response_handler response_handler)
{
  size_t keylen= ntohs(header->request.keylen);
  size_t datalen= ntohl(header->request.bodylen) - keylen - 8;
  protocol_binary_request_replace *request= (protocol_binary_request_replace*)header;
  uint32_t flags= ntohl(request->message.body.flags);
  time_t timeout= (time_t)ntohl(request->message.body.expiration);
  char *key= ((char*)header) + sizeof(*header) + 8;
  char *data= key + keylen;

  protocol_binary_response_no_extras response;
  memset(&response, 0, sizeof(protocol_binary_response_no_extras));

  response.message.header.response.magic= PROTOCOL_BINARY_RES;
  response.message.header.response.opcode= header->request.opcode;
  response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
  response.message.header.response.opaque= header->request.opaque;

  struct item* item= get_item(key, keylen);
  if (item == NULL)
  {
    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
  }
  else if (header->request.cas == 0 || example_ntohll(header->request.cas) == item->cas)
  {
    release_item(item);
    delete_item(key, keylen);
    item= create_item(key, keylen, data, datalen, flags, timeout);

    if (item == NULL)
    {
      response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_ENOMEM);
    }
    else
    {
      put_item(item);
      /* REPLACEQ shouldn't return a message */
      if (header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE)
      {
        response.message.header.response.cas= example_htonll(item->cas);
        release_item(item);
        return response_handler(cookie, header, (protocol_binary_response_header*)&response);
      }
      release_item(item);
      return PROTOCOL_BINARY_RESPONSE_SUCCESS;
    }
  }
  else
  {
    response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
    release_item(item);
  }

  return response_handler(cookie, header, (protocol_binary_response_header*)&response);
}

static protocol_binary_response_status stat_command_handler(const void *cookie,
                                                            protocol_binary_request_header *header,
                                                            memcached_binary_protocol_raw_response_handler response_handler)
{
  /* Just send the terminating packet*/
  protocol_binary_response_no_extras response;
  memset(&response, 0, sizeof(protocol_binary_response_no_extras));

  response.message.header.response.magic= PROTOCOL_BINARY_RES;
  response.message.header.response.opcode= PROTOCOL_BINARY_CMD_STAT;
  response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS);
  response.message.header.response.opaque= header->request.opaque;

  return response_handler(cookie, header, (protocol_binary_response_header*)&response);
}

memcached_binary_protocol_callback_st interface_v0_impl;

void initialize_interface_v0_handler(util::log_info_st& arg)
{
  log_file= &arg;

  interface_v0_impl.interface_version= MEMCACHED_PROTOCOL_HANDLER_V0;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GET]= get_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_SET]= set_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_ADD]= add_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENT]= arithmetic_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENT]= arithmetic_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GETQ]= get_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_VERSION]= version_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GETK]= get_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_APPEND]= concat_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPEND]= concat_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_STAT]= stat_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_SETQ]= set_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_INCREMENTQ]= arithmetic_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_DECREMENTQ]= arithmetic_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_APPENDQ]= concat_command_handler;
  interface_v0_impl.interface.v0.comcode[PROTOCOL_BINARY_CMD_PREPENDQ]= concat_command_handler;
}