The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
/**
 * What is a library without an example to show you how to use the library?
 * This example use both interfaces to implement a small memcached server.
 * Please note that this is an exemple on how to use the library, not
 * an implementation of a scalable memcached server. If you look closely
 * at the example it isn't even multithreaded ;-)
 *
 * With that in mind, let me give you some pointers into the source:
 *   storage.c/h       - Implements the item store for this server and not really
 *                       interesting for this example.
 *   interface_v0.cc   - Shows an implementation of the memcached server by using
 *                       the "raw" access to the packets as they arrive
 *   interface_v1.cc   - Shows an implementation of the memcached server by using
 *                       the more "logical" interface.
 *   memcached_light.cc- This file sets up all of the sockets and run the main
 *                       message loop.
 *
 *
 * config.h is included so that I can use the ntohll/htonll on platforms that
 * doesn't have that (this is a private function inside libmemcached, so you
 * cannot use it directly from libmemcached without special modifications to
 * the library)
 */

#include <mem_config.h>

#include <libmemcachedprotocol-0.0/handler.h>
#include <libmemcached/socket.hpp>
#include <example/byteorder.h>
#include "example/storage.h"
#include "example/memcached_light.h"

#include "util/daemon.hpp"
#include "util/log.hpp"
#include "util/pidfile.hpp"

using namespace datadifferential;

#include <event.h>

#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fcntl.h>
#include <getopt.h>
#include <iostream>
#include <sys/types.h>
#include <unistd.h>

extern memcached_binary_protocol_callback_st interface_v0_impl;
extern memcached_binary_protocol_callback_st interface_v1_impl;

static memcached_socket_t server_sockets[1024];
static int num_server_sockets= 0;

struct connection
{
  void *userdata;
  struct event event;
};

/* The default maximum number of connections... (change with -c) */
static int maxconns= 1024;

static struct connection *socket_userdata_map;
static struct event_base *event_base= NULL;

struct options_st {
  std::string pid_file;
  std::string service;
  std::string log_file;
  bool is_verbose;
  bool opt_daemon;

  options_st() :
    service("9999"),
    is_verbose(false),
    opt_daemon(false)
  {
  }
};

static options_st global_options;

/**
 * Callback for driving a client connection
 * @param fd the socket for the client socket
 * @param which identifying the event that occurred (not used)
 * @param arg the connection structure for the client
 */
static void drive_client(memcached_socket_t fd, short, void *arg)
{
  struct connection *client= (struct connection*)arg;
  struct memcached_protocol_client_st* c= (struct memcached_protocol_client_st*)client->userdata;
  assert(c != NULL);

  memcached_protocol_event_t events= memcached_protocol_client_work(c);
  if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
  {
    if (global_options.is_verbose)
    {
      struct sockaddr_in sin;
      socklen_t addrlen= sizeof(sin);

      if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) != -1)
      {
        std::cout << __FILE__ << ":" << __LINE__
          << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)"
          << " " << inet_ntoa(sin.sin_addr) << ":" << sin.sin_port
          << " fd:" << fd
          << std::endl;
      }
      else
      {
        std::cout << __FILE__ << ":" << __LINE__ << "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl;
      }
    }

    memcached_protocol_client_destroy(c);
    closesocket(fd);
  }
  else
  {
    short flags = 0;
    if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
    {
      flags= EV_WRITE;
    }

    if (events & MEMCACHED_PROTOCOL_READ_EVENT)
    {
      flags|= EV_READ;
    }

    event_set(&client->event, int(fd), flags, drive_client, client);
    event_base_set(event_base, &client->event);

    if (event_add(&client->event, 0) == -1)
    {
      memcached_protocol_client_destroy(c);
      closesocket(fd);
    }
  }
}

/**
 * Callback for accepting new connections
 * @param fd the socket for the server socket
 * @param which identifying the event that occurred (not used)
 * @param arg the connection structure for the server
 */
static void accept_handler(memcached_socket_t fd, short, void *arg)
{
  struct connection *server= (struct connection *)arg;
  /* accept new client */
  struct sockaddr_storage addr;
  socklen_t addrlen= sizeof(addr);
  memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);

  if (sock == INVALID_SOCKET)
  {
    perror("Failed to accept client");
  }

#ifndef WIN32
  if (sock >= maxconns)
  {
    closesocket(sock);
    return ;
  }
#endif

  struct memcached_protocol_client_st* c= memcached_protocol_create_client((memcached_protocol_st*)server->userdata, sock);
  if (c == NULL)
  {
    closesocket(sock);
  }
  else
  {
    memcached_protocol_client_set_verbose(c, global_options.is_verbose);
    struct connection *client = &socket_userdata_map[sock];
    client->userdata= c;

    event_set(&client->event, int(sock), EV_READ, drive_client, client);
    event_base_set(event_base, &client->event);
    if (event_add(&client->event, 0) == -1)
    {
      std::cerr << "Failed to add event for " << sock << std::endl;
      memcached_protocol_client_destroy(c);
      closesocket(sock);
    }
  }
}

static bool server_socket(util::log_info_st& log_file, const std::string& service)
{
  struct addrinfo *ai;
  struct addrinfo hints;
  memset(&hints, 0, sizeof(struct addrinfo));

  hints.ai_flags= AI_PASSIVE;
  hints.ai_family= AF_UNSPEC;
  hints.ai_socktype= SOCK_STREAM;

  int error= getaddrinfo("127.0.0.1", service.c_str(), &hints, &ai);
  if (error != 0)
  {
    if (error != EAI_SYSTEM)
    {
      std::string buffer("getaddrinfo: ");
      buffer+= gai_strerror(error);
      log_file.write(util::VERBOSE_ERROR, buffer.c_str());
    }
    else
    {
      std::string buffer("getaddrinfo: ");
      buffer+= strerror(errno);
      log_file.write(util::VERBOSE_ERROR, buffer.c_str());
    }

    return false;
  }

  struct linger ling= {0, 0};

  for (struct addrinfo *next= ai; next; next= next->ai_next)
  {
    memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
    if (sock == INVALID_SOCKET)
    {
      std::string buffer("Failed to create socket: ");
      buffer+= strerror(errno);
      log_file.write(util::VERBOSE_ERROR, buffer.c_str());
      continue;
    }

    int flags;
#ifdef WIN32
    u_long arg = 1;
    if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
    {
      std::cerr << "Failed to set nonblocking io: " << strerror(errno) << std::endl;
      closesocket(sock);
      continue;
    }
#else
    flags= fcntl(sock, F_GETFL, 0);
    if (flags == -1)
    {
      std::string buffer("Failed to get socket flags: ");
      buffer+= strerror(errno);
      log_file.write(util::VERBOSE_ERROR, buffer.c_str());
      closesocket(sock);
      continue;
    }

    if ((flags & O_NONBLOCK) != O_NONBLOCK)
    {
      if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
      {
        std::string buffer("Failed to set socket to nonblocking mode: ");
        buffer+= strerror(errno);
        log_file.write(util::VERBOSE_ERROR, buffer.c_str());
        closesocket(sock);
        continue;
      }
    }
#endif

    flags= 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
    {
      std::cerr << "Failed to set SO_REUSEADDR: " << strerror(errno) << std::endl;
    }

    if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
    {
      std::cerr << "Failed to set SO_KEEPALIVE: " << strerror(errno) << std::endl;
    }

    if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
    {
      std::cerr << "Failed to set SO_LINGER: " << strerror(errno) << std::endl;
    }

    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
    {
      std::cerr << "Failed to set TCP_NODELAY: " << strerror(errno) << std::endl;
    }

    if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
    {
      if (get_socket_errno() != EADDRINUSE)
      {
        std::cerr << "bind(): " << strerror(errno) << std::endl;
        freeaddrinfo(ai);
      }
      closesocket(sock);
      continue;
    }

    if (listen(sock, 1024) == SOCKET_ERROR)
    {
      std::string buffer("listen(): ");
      buffer+= strerror(errno);
      log_file.write(util::VERBOSE_ERROR, buffer.c_str());
      closesocket(sock);
      continue;
    }

    if (global_options.is_verbose)
    {
      std::string buffer("Listening to: ");
      buffer+= global_options.service;
      log_file.write(util::VERBOSE_NOTICE, buffer.c_str());
    }

    server_sockets[num_server_sockets++]= sock;
  }

  freeaddrinfo(ai);

  return (num_server_sockets > 0) ? true : false;
}

/**
 * Convert a command code to a textual string
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };

  if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
  {
    return text[cmd];
  }

  return NULL;
}

/**
 * Print out the command we are about to execute
 */
static void pre_execute(const void *cookie,
                        protocol_binary_request_header *header)
{
  if (global_options.is_verbose)
  {
    if (header)
    {
      const char *cmd= comcode2str(header->request.opcode);
      if (cmd != NULL)
      {
        std::cout << "pre_execute from " << cookie << ": " << cmd << std::endl;
      }
      else
      {
        std::cout << "pre_execute from " << cookie << ": " << header->request.opcode << std::endl;
      }
    }
    else
    {
      std::cout << "pre_execute from " << cookie << std::endl;
    }
  }
}

/**
 * Print out the command we just executed
 */
static void post_execute(const void *cookie,
                         protocol_binary_request_header *header)
{
  if (global_options.is_verbose)
  {
    if (header)
    {
      const char *cmd= comcode2str(header->request.opcode);
      if (cmd != NULL)
      {
        std::cout << "post_execute from " << cookie << ": " << cmd << std::endl;
      }
      else
      {
        std::cout << "post_execute from " << cookie << ": " << header->request.opcode << std::endl;
      }
    }
    else
    {
      std::cout << "post_execute from " << cookie << std::endl;
    }
  }
}

/**
 * Callback handler for all unknown commands.
 * Send an unknown command back to the client
 */
static protocol_binary_response_status unknown(const void *cookie,
                                               protocol_binary_request_header *header,
                                               memcached_binary_protocol_raw_response_handler response_handler)
{
  protocol_binary_response_no_extras response;
  memset(&response, 0, sizeof(protocol_binary_response_no_extras));

  response.message.header.response.magic= PROTOCOL_BINARY_RES;
  response.message.header.response.opcode= header->request.opcode;
  response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND);
  response.message.header.response.opaque= header->request.opaque;

  return response_handler(cookie, header, (protocol_binary_response_header*)&response);
}

/**
 * Program entry point. Bind to the specified port(s) and serve clients
 *
 * @param argc number of items in the argument vector
 * @param argv argument vector
 * @return EXIT_SUCCESS on success, 1 otherwise
 */
int main(int argc, char **argv)
{
  memcached_binary_protocol_callback_st *interface= &interface_v0_impl;

  {
    enum long_option_t {
      OPT_HELP,
      OPT_VERBOSE,
      OPT_DAEMON,
      OPT_PROTOCOL_VERSION,
      OPT_VERSION,
      OPT_PORT,
      OPT_MAX_CONNECTIONS,
      OPT_LOGFILE,
      OPT_PIDFILE
    };

    static struct option long_options[]=
    {
      { "help", no_argument, NULL, OPT_HELP },
      { "port", required_argument, NULL, OPT_PORT },
      { "verbose", no_argument, NULL, OPT_VERBOSE },
      { "daemon", no_argument, NULL, OPT_DAEMON },
      { "protocol", no_argument, NULL, OPT_PROTOCOL_VERSION },
      { "version", no_argument, NULL, OPT_VERSION },
      { "max-connections", required_argument, NULL, OPT_MAX_CONNECTIONS },
      { "pid-file", required_argument, NULL, OPT_PIDFILE },
      { "log-file", required_argument, NULL, OPT_LOGFILE },
      {0, 0, 0, 0}
    };

    bool opt_help= false;
    int option_index;
    bool done= false;
    while (done == false)
    {
      switch (getopt_long(argc, argv, "", long_options, &option_index))
      {
      case -1:
        done= true;
        break;

      case OPT_PROTOCOL_VERSION:
        interface= &interface_v1_impl;
        break;

      case OPT_PIDFILE:
        global_options.pid_file= optarg;
        break;

      case OPT_LOGFILE:
        global_options.log_file= optarg;
        break;

      case OPT_VERBOSE:
        global_options.is_verbose= true;
        break;

      case OPT_VERSION:
        break;

      case OPT_DAEMON:
        global_options.opt_daemon= true;
        break;

      case OPT_PORT:
        global_options.service= optarg;
        break;

      case OPT_MAX_CONNECTIONS:
        maxconns= atoi(optarg);
        break;

      case OPT_HELP:  /* FALLTHROUGH */
        opt_help= true;
        break;

      default:
        {
          std::cerr << "Unknown option: " << optarg << std::endl;
          return EXIT_FAILURE;
        }
      }
    }

    if (opt_help)
    {
      std::cout << "Usage: " << argv[0] << std::endl;
      for (struct option *ptr_option= long_options; ptr_option->name; ptr_option++)
      {
        std::cout << "\t" << ptr_option->name << std::endl;
      }
      return EXIT_SUCCESS;
    }
  }

  if (global_options.opt_daemon)
  {
    util::daemonize(false, true);
  }

  if (initialize_storage() == false)
  {
    /* Error message already printed */
    return EXIT_FAILURE;
  }

  util::Pidfile _pid_file(global_options.pid_file);

  if (_pid_file.create() == false)
  {
    std::cerr << "Failed to create pid-file" <<  _pid_file.error_message() << std::endl;
    return EXIT_FAILURE;
  }

  util::log_info_st log_file(argv[0], global_options.log_file, false);
  log_file.write(util::VERBOSE_NOTICE, "starting log");

  /*
   * We need to initialize the handlers manually due to a bug in the
   * warnings generated by struct initialization in gcc (all the way up to 4.4)
   */
  initialize_interface_v0_handler(log_file);
  initialize_interface_v1_handler(log_file);


  if (server_socket(log_file, global_options.service) == false)
  {
    return EXIT_FAILURE;
  }

  if (num_server_sockets == 0)
  {
    log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
    return EXIT_FAILURE;
  }

  /*
   * Create and initialize the handles to the protocol handlers. I want
   * to be able to trace the traffic throught the pre/post handlers, and
   * set up a common handler for unknown messages
   */
  interface->pre_execute= pre_execute;
  interface->post_execute= post_execute;
  interface->unknown= unknown;

  struct memcached_protocol_st *protocol_handle;
  if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
  {
    log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
    return EXIT_FAILURE;
  }

  socket_userdata_map= (struct connection*)calloc((size_t)(maxconns), sizeof(struct connection));
  if (socket_userdata_map == NULL)
  {
    log_file.write(util::VERBOSE_ERROR, "Failed to allocate room for connections");
    return EXIT_FAILURE;
  }

  memcached_binary_protocol_set_callbacks(protocol_handle, interface);
  memcached_binary_protocol_set_pedantic(protocol_handle, true);

  event_base= event_init();
  if (event_base == NULL)
  {
    std::cerr << "Failed to create an instance of libevent" << std::endl;
    return EXIT_FAILURE;
  }

  for (int xx= 0; xx < num_server_sockets; ++xx)
  {
    struct connection *conn= &socket_userdata_map[server_sockets[xx]];
    conn->userdata= protocol_handle;

    event_set(&conn->event, int(server_sockets[xx]), EV_READ | EV_PERSIST, accept_handler, conn);

    event_base_set(event_base, &conn->event);
    if (event_add(&conn->event, 0) == -1)
    {
      log_file.write(util::VERBOSE_ERROR, "Failed to add event");
      closesocket(server_sockets[xx]);
    }
  }

  if (global_options.opt_daemon)
  {
    if (util::daemon_is_ready(true) == false)
    {
      log_file.write(util::VERBOSE_ERROR, "Failed for util::daemon_is_ready()");
      return EXIT_FAILURE;
    }
  }


  /* Serve all of the clients */
  switch (event_base_loop(event_base, 0))
  {
  case -1:
    log_file.write(util::VERBOSE_ERROR, "event_base_loop() failed");
    break;

  case 1:
    log_file.write(util::VERBOSE_ERROR, "event_base_loop(), no events were registered");
    break;

  default:
    break;
  }
  log_file.write(util::VERBOSE_NOTICE, "exiting");

  /* NOTREACHED */
  return EXIT_SUCCESS;
}