The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/*
 * File:   ms_task.c
 * Author: Mingqiang Zhuang
 *
 * Created on February 10, 2009
 *
 * (c) Copyright 2009, Schooner Information Technology, Inc.
 * http://www.schoonerinfotech.com/
 *
 */

#include "mem_config.h"

#if defined(HAVE_SYS_TIME_H)
# include <sys/time.h>
#endif

#if defined(HAVE_TIME_H)
# include <time.h>
#endif

#include "ms_thread.h"
#include "ms_setting.h"
#include "ms_atomic.h"

/* command distribution adjustment cycle */
#define CMD_DISTR_ADJUST_CYCLE    1000
#define DISADJUST_FACTOR          0.03 /**
                                 * In one adjustment cycle, if undo set or get
                                 * operations proportion is more than 3% , means
                                 * there are too many new item or need more new
                                 * item in the window. This factor shows it.
                                 */

/* get item from task window */
static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c);


/* select next operation to do */
static void ms_select_opt(ms_conn_t *c, ms_task_t *task);


/* set and get speed estimate for controlling and adjustment */
static bool ms_is_set_too_fast(ms_task_t *task);
static bool ms_is_get_too_fast(ms_task_t *task);
static void ms_kick_out_item(ms_task_item_t *item);


/* miss rate adjustment */
static bool ms_need_overwrite_item(ms_task_t *task);
static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);


/* deal with data verification initialization */
static void ms_task_data_verify_init(ms_task_t *task);
static void ms_task_expire_verify_init(ms_task_t *task);


/* select a new task to do */
static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);


/* run the selected task */
static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
static void ms_update_stat_result(ms_conn_t *c);
static void ms_update_multi_get_result(ms_conn_t *c);
static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
static void ms_update_task_result(ms_conn_t *c);
static void ms_single_getset_task_sch(ms_conn_t *c);
static void ms_multi_getset_task_sch(ms_conn_t *c);
static void ms_send_signal(ms_sync_lock_t *sync_lock);
static void ms_warmup_server(ms_conn_t *c);
static int ms_run_getset_task(ms_conn_t *c);


/**
 * used to get the current operation item(object)
 *
 * @param c, pointer of the concurrency
 *
 * @return ms_task_item_t*, current operating item
 */
static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c)
{
  return c->curr_task.item;
}


/**
 * used to get the next item to do get operation
 *
 * @param c, pointer of the concurrency
 *
 * @return ms_task_item_t*, the pointer of the next item to do
 *         get operation
 */
static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c)
{
  ms_task_item_t *item= NULL;

  if (c->set_cursor <= 0)
  {
    /* the first item in the window */
    item= &c->item_win[0];
  }
  else if (c->set_cursor > 0 && c->set_cursor < (uint32_t)c->win_size)
  {
    /* random get one item set before */
    item= &c->item_win[random() % (int64_t)c->set_cursor];
  }
  else
  {
    /* random get one item from the window */
    item= &c->item_win[random() % c->win_size];
  }

  return item;
} /* ms_get_next_get_item */


/**
 * used to get the next item to do set operation
 *
 * @param c, pointer of the concurrency
 *
 * @return ms_task_item_t*, the pointer of the next item to do
 *         set operation
 */
static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
{
  /**
   *  when a set command successes, the cursor will plus 1. If set
   *  fails, the cursor doesn't change. it isn't necessary to
   *  increase the cursor here.
   */
  return &c->item_win[(int64_t)c->set_cursor % c->win_size];
}


/**
 * If we need do overwrite, we could select a item set before.
 * This function is used to get a item set before to do
 * overwrite.
 *
 * @param c, pointer of the concurrency
 *
 * @return ms_task_item_t*, the pointer of the previous item of
 *         set operation
 */
static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c)
{
    return ms_get_next_get_item(c);
} /* ms_get_random_overwrite_item */

/**
 * According to the proportion of operations(get or set), select
 * an operation to do.
 *
 * @param c, pointer of the concurrency
 * @param task, pointer of current task in the concurrency
 */
static void ms_select_opt(ms_conn_t *c, ms_task_t *task)
{
  double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
  double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;

  /* update cycle operation number if necessary */
  if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0))
  {
    task->cycle_undo_get+= (int)(CMD_DISTR_ADJUST_CYCLE * get_prop);
    task->cycle_undo_set+= (int)(CMD_DISTR_ADJUST_CYCLE * set_prop);
  }

  /**
   *  According to operation distribution to choose doing which
   *  operation. If it can't set new object to sever, just change
   *  to do get operation.
   */
  if ((set_prop > PROP_ERROR)
      && ((double)task->get_opt * set_prop >= (double)task->set_opt
          * get_prop))
  {
    task->cmd= CMD_SET;
    task->item= ms_get_next_set_item(c);
  }
  else
  {
    task->cmd= CMD_GET;
    task->item= ms_get_next_get_item(c);
  }
} /* ms_select_opt */


/**
 * used to judge whether the number of get operations done is
 * more than expected number of get operations to do right now.
 *
 * @param task, pointer of current task in the concurrency
 *
 * @return bool, if get too fast, return true, else return false
 */
static bool ms_is_get_too_fast(ms_task_t *task)
{
  double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
  double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;

  /* no get operation */
  if (get_prop < PROP_ERROR)
  {
    return false;
  }

  int max_undo_set= (int)(set_prop / get_prop * (1.0 + DISADJUST_FACTOR))
                    * task->cycle_undo_get;

  if (((double)task->get_opt * set_prop > (double)task->set_opt * get_prop)
      && (task->cycle_undo_set > max_undo_set))
  {
    return true;
  }

  return false;
} /* ms_is_get_too_fast */


/**
 * used to judge whether the number of set operations done is
 * more than expected number of set operations to do right now.
 *
 * @param task, pointer of current task in the concurrency
 *
 * @return bool, if set too fast, return true, else return false
 */
static bool ms_is_set_too_fast(ms_task_t *task)
{
  double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
  double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;

  /* no set operation */
  if (set_prop < PROP_ERROR)
  {
    return false;
  }

  /* If it does set operation too fast, skip some */
  int max_undo_get= (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR))
                          * (double)task->cycle_undo_set);

  if (((double)task->get_opt * set_prop < (double)task->set_opt * get_prop)
      && (task->cycle_undo_get > max_undo_get))
  {
    return true;
  }

  return false;
} /* ms_is_set_too_fast */


/**
 * kick out the old item in the window, and add a new item to
 * overwrite the old item. When we don't want to do overwrite
 * object, and the current item to do set operation is an old
 * item, we could kick out the old item and add a new item. Then
 * we can ensure we set new object every time.
 *
 * @param item, pointer of task item which includes the object
 *            information
 */
static void ms_kick_out_item(ms_task_item_t *item)
{
  /* allocate a new item */
  item->key_prefix= ms_get_key_prefix();

  item->key_suffix_offset++;
  item->value_offset= INVALID_OFFSET;       /* new item use invalid value offset */
  item->client_time= 0;
} /* ms_kick_out_item */


/**
 *  used to judge whether we need overwrite object based on the
 *  options user specified
 *
 * @param task, pointer of current task in the concurrency
 *
 * @return bool, if need overwrite, return true, else return
 *         false
 */
static bool ms_need_overwrite_item(ms_task_t *task)
{
  ms_task_item_t *item= task->item;

  assert(item != NULL);
  assert(task->cmd == CMD_SET);

  /**
   *  according to data overwrite percent to determine if do data
   *  overwrite.
   */
  if (task->overwrite_set < (double)task->set_opt
      * ms_setting.overwrite_percent)
  {
    return true;
  }

  return false;
} /* ms_need_overwirte_item */


/**
 * used to adjust operation. the function must be called after
 * select operation. the function change get operation to set
 * operation, or set operation to get operation based on the
 * current case.
 *
 * @param c, pointer of the concurrency
 * @param task, pointer of current task in the concurrency
 *
 * @return bool, if success, return true, else return false
 */
static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
{
  ms_task_item_t *item= task->item;

  assert(item != NULL);

  if (task->cmd == CMD_SET)
  {
    /* If did set operation too fast, skip some */
    if (ms_is_set_too_fast(task))
    {
      /* get the item instead */
      if (item->value_offset != INVALID_OFFSET)
      {
        task->cmd= CMD_GET;
        return true;
      }
    }

    /* If the current item is not a new item, kick it out */
    if (item->value_offset != INVALID_OFFSET)
    {
      if (ms_need_overwrite_item(task))
      {
        /* overwrite */
        task->overwrite_set++;
      }
      else
      {
        /* kick out the current item to do set operation */
        ms_kick_out_item(item);
      }
    }
    else            /* it's a new item */
    {
      /* need overwrite */
      if (ms_need_overwrite_item(task))
      {
        /**
         *  overwrite not use the item with current set cursor, revert
         *  set cursor.
         */
        c->set_cursor--;

        item= ms_get_random_overwrite_item(c);
        if (item->value_offset != INVALID_OFFSET)
        {
          task->item= item;
          task->overwrite_set++;
        }
        else                /* item is a new item */
        {
          /* select the item to run, and cancel overwrite */
          task->item= item;
        }
      }
    }
    task->cmd= CMD_SET;
    return true;
  }
  else
  {
    if (item->value_offset == INVALID_OFFSET)
    {
      task->cmd= CMD_SET;
      return true;
    }

    /**
     *  If It does get operation too fast, it will change the
     *  operation to set.
     */
    if (ms_is_get_too_fast(task))
    {
      /* don't kick out the first item in the window */
      if (! ms_is_set_too_fast(task))
      {
        ms_kick_out_item(item);
        task->cmd= CMD_SET;
        return true;
      }
      else
      {
        return false;
      }
    }

    assert(item->value_offset != INVALID_OFFSET);

    task->cmd= CMD_GET;
    return true;
  }
} /* ms_adjust_opt */


/**
 * used to initialize the task which need verify data.
 *
 * @param task, pointer of current task in the concurrency
 */
static void ms_task_data_verify_init(ms_task_t *task)
{
  ms_task_item_t *item= task->item;

  assert(item != NULL);
  assert(task->cmd == CMD_GET);

  /**
   *  according to data verification percent to determine if do
   *  data verification.
   */
  if (task->verified_get < (double)task->get_opt
      * ms_setting.verify_percent)
  {
    /**
     *  currently it doesn't do verify, just increase the counter,
     *  and do verification next proper get command
     */
    if ((task->item->value_offset != INVALID_OFFSET)
        && (item->exp_time == 0))
    {
      task->verify= true;
      task->finish_verify= false;
      task->verified_get++;
    }
  }
} /* ms_task_data_verify_init */


/**
 * used to initialize the task which need verify expire time.
 *
 * @param task, pointer of current task in the concurrency
 */
static void ms_task_expire_verify_init(ms_task_t *task)
{
  ms_task_item_t *item= task->item;

  assert(item != NULL);
  assert(task->cmd == CMD_GET);
  assert(item->exp_time > 0);

  task->verify= true;
  task->finish_verify= false;
} /* ms_task_expire_verify_init */


/**
 * used to get one task, the function initializes the task
 * structure.
 *
 * @param c, pointer of the concurrency
 * @param warmup, whether it need warmup
 *
 * @return ms_task_t*, pointer of current task in the
 *         concurrency
 */
static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup)
{
  ms_task_t *task= &c->curr_task;

  while (1)
  {
    task->verify= false;
    task->finish_verify= true;
    task->get_miss= true;

    if (warmup)
    {
      task->cmd= CMD_SET;
      task->item= ms_get_next_set_item(c);

      return task;
    }

    /* according to operation distribution to choose doing which operation */
    ms_select_opt(c, task);

    if (! ms_adjust_opt(c, task))
    {
      continue;
    }

    if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET))
    {
      ms_task_data_verify_init(task);
    }

    if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET)
        && (task->item->exp_time > 0))
    {
      ms_task_expire_verify_init(task);
    }

    break;
  }

  /**
   *  Only update get and delete counter, set counter will be
   *  updated after set operation successes.
   */
  if (task->cmd == CMD_GET)
  {
    task->get_opt++;
    task->cycle_undo_get--;
  }

  return task;
} /* ms_get_task */


/**
 * send a signal to the main monitor thread
 *
 * @param sync_lock, pointer of the lock
 */
static void ms_send_signal(ms_sync_lock_t *sync_lock)
{
  pthread_mutex_lock(&sync_lock->lock);
  sync_lock->count++;
  pthread_cond_signal(&sync_lock->cond);
  pthread_mutex_unlock(&sync_lock->lock);
} /* ms_send_signal */


/**
 * If user only want to do get operation, but there is no object
 * in server , so we use this function to warmup the server, and
 * set some objects to server. It runs at the beginning of task.
 *
 * @param c, pointer of the concurrency
 */
static void ms_warmup_server(ms_conn_t *c)
{
  ms_task_t *task;
  ms_task_item_t *item;

  /**
   * Extra one loop to get the last command returned state.
   * Normally it gets the previous command returned state.
   */
  if ((c->remain_warmup_num >= 0)
      && (c->remain_warmup_num != c->warmup_num))
  {
    item= ms_get_cur_opt_item(c);
    /* only update the set command result state for data verification */
    if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED))
    {
      item->value_offset= item->key_suffix_offset;
      /* set success, update counter */
      c->set_cursor++;
    }
    else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED)
    {
      printf("key: %" PRIx64 " didn't set success\n", item->key_prefix);
    }
  }

  /* the last time don't run a task */
  if (c->remain_warmup_num-- > 0)
  {
    /* operate next task item */
    task= ms_get_task(c, true);
    item= task->item;
    ms_mcd_set(c, item);
  }

  /**
   *  finish warming up server, wait all connects initialize
   *  complete. Then all connects can start do task at the same
   *  time.
   */
  if (c->remain_warmup_num == -1)
  {
    ms_send_signal(&ms_global.warmup_lock);
    c->remain_warmup_num--;       /* never run the if branch */
  }
} /* ms_warmup_server */


/**
 * dispatch single get and set task
 *
 * @param c, pointer of the concurrency
 */
static void ms_single_getset_task_sch(ms_conn_t *c)
{
  ms_task_t *task;
  ms_task_item_t *item;

  /* the last time don't run a task */
  if (c->remain_exec_num-- > 0)
  {
    task= ms_get_task(c, false);
    item= task->item;
    if (task->cmd == CMD_SET)
    {
      ms_mcd_set(c, item);
    }
    else if (task->cmd == CMD_GET)
    {
      assert(task->cmd == CMD_GET);
      ms_mcd_get(c, item);
    }
  }
} /* ms_single_getset_task_sch */


/**
 * dispatch multi-get and set task
 *
 * @param c, pointer of the concurrency
 */
static void ms_multi_getset_task_sch(ms_conn_t *c)
{
  ms_task_t *task;
  ms_mlget_task_item_t *mlget_item;

  while (1)
  {
    if (c->remain_exec_num-- > 0)
    {
      task= ms_get_task(c, false);
      if (task->cmd == CMD_SET)             /* just do it */
      {
        ms_mcd_set(c, task->item);
        break;
      }
      else
      {
        assert(task->cmd == CMD_GET);
        mlget_item= &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
        mlget_item->item= task->item;
        mlget_item->verify= task->verify;
        mlget_item->finish_verify= task->finish_verify;
        mlget_item->get_miss= task->get_miss;
        c->mlget_task.mlget_num++;

        /* enough multi-get task items can be done */
        if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num)
            || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
        {
          ms_mcd_mlget(c);
          break;
        }
      }
    }
    else
    {
      if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0))
      {
        ms_mcd_mlget(c);
      }
      break;
    }
  }
} /* ms_multi_getset_task_sch */


/**
 * calculate the difference value of two time points
 *
 * @param start_time, the start time
 * @param end_time, the end time
 *
 * @return uint64_t, the difference value between start_time and end_time in us
 */
int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
{
  int64_t endtime= end_time->tv_sec * 1000000 + end_time->tv_usec;
  int64_t starttime= start_time->tv_sec * 1000000 + start_time->tv_usec;

  assert(endtime >= starttime);

  return endtime - starttime;
} /* ms_time_diff */


/**
 * after get the response from server for multi-get, the
 * function update the state of the task and do data verify if
 * necessary.
 *
 * @param c, pointer of the concurrency
 */
static void ms_update_multi_get_result(ms_conn_t *c)
{
  ms_mlget_task_item_t *mlget_item;
  ms_task_item_t *item;
  char *orignval= NULL;
  char *orignkey= NULL;

  if (c == NULL)
  {
    return;
  }
  assert(c != NULL);

  for (int i= 0; i < c->mlget_task.mlget_num; i++)
  {
    mlget_item= &c->mlget_task.mlget_item[i];
    item= mlget_item->item;
    orignval= &ms_setting.char_block[item->value_offset];
    orignkey= &ms_setting.char_block[item->key_suffix_offset];

    /* update get miss counter */
    if (mlget_item->get_miss)
    {
      atomic_add_size(&ms_stats.get_misses, 1);
    }

    /* get nothing from server for this task item */
    if (mlget_item->verify && ! mlget_item->finish_verify)
    {
      /* verify expire time if necessary */
      if (item->exp_time > 0)
      {
        struct timeval curr_time;
        gettimeofday(&curr_time, NULL);

        /* object doesn't expire but can't get it now */
        if (curr_time.tv_sec - item->client_time
            < item->exp_time - EXPIRE_TIME_ERROR)
        {
          atomic_add_size(&ms_stats.unexp_unget, 1);

          if (ms_setting.verbose)
          {
            char set_time[64];
            char cur_time[64];
            strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
                     localtime(&item->client_time));
            strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
                     localtime(&curr_time.tv_sec));
            fprintf(stderr,
                    "\n\t<%d expire time verification failed, object "
                    "doesn't expire but can't get it now\n"
                    "\tkey len: %d\n"
                    "\tkey: %" PRIx64 " %.*s\n"
                    "\tset time: %s current time: %s "
                    "diff time: %d expire time: %d\n"
                    "\texpected data len: %d\n"
                    "\texpected data: %.*s\n"
                    "\treceived data: \n",
                    c->sfd,
                    item->key_size,
                    item->key_prefix,
                    item->key_size - (int)KEY_PREFIX_SIZE,
                    orignkey,
                    set_time,
                    cur_time,
                    (int)(curr_time.tv_sec - item->client_time),
                    item->exp_time,
                    item->value_size,
                    item->value_size,
                    orignval);
            fflush(stderr);
          }
        }
      }
      else
      {
        atomic_add_size(&ms_stats.vef_miss, 1);

        if (ms_setting.verbose)
        {
          fprintf(stderr, "\n<%d data verification failed\n"
                          "\tkey len: %d\n"
                          "\tkey: %" PRIx64 " %.*s\n"
                          "\texpected data len: %d\n"
                          "\texpected data: %.*s\n"
                          "\treceived data: \n",
                  c->sfd, item->key_size, item->key_prefix,
                  item->key_size - (int)KEY_PREFIX_SIZE,
                  orignkey, item->value_size, item->value_size, orignval);
          fflush(stderr);
        }
      }
    }
  }
  c->mlget_task.mlget_num= 0;
  c->mlget_task.value_index= INVALID_OFFSET;
} /* ms_update_multi_get_result */


/**
 * after get the response from server for single get, the
 * function update the state of the task and do data verify if
 * necessary.
 *
 * @param c, pointer of the concurrency
 * @param item, pointer of task item which includes the object
 *            information
 */
static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item)
{
  char *orignval= NULL;
  char *orignkey= NULL;

  if ((c == NULL) || (item == NULL))
  {
    return;
  }
  assert(c != NULL);
  assert(item != NULL);

  orignval= &ms_setting.char_block[item->value_offset];
  orignkey= &ms_setting.char_block[item->key_suffix_offset];

  /* update get miss counter */
  if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss)
  {
    atomic_add_size(&ms_stats.get_misses, 1);
  }

  /* get nothing from server for this task item */
  if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify
      && ! c->curr_task.finish_verify)
  {
    /* verify expire time if necessary */
    if (item->exp_time > 0)
    {
      struct timeval curr_time;
      gettimeofday(&curr_time, NULL);

      /* object doesn't expire but can't get it now */
      if (curr_time.tv_sec - item->client_time
          < item->exp_time - EXPIRE_TIME_ERROR)
      {
        atomic_add_size(&ms_stats.unexp_unget, 1);

        if (ms_setting.verbose)
        {
          char set_time[64];
          char cur_time[64];
          strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
                   localtime(&item->client_time));
          strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
                   localtime(&curr_time.tv_sec));
          fprintf(stderr,
                  "\n\t<%d expire time verification failed, object "
                  "doesn't expire but can't get it now\n"
                  "\tkey len: %d\n"
                  "\tkey: %" PRIx64 " %.*s\n"
                  "\tset time: %s current time: %s "
                  "diff time: %d expire time: %d\n"
                  "\texpected data len: %d\n"
                  "\texpected data: %.*s\n"
                  "\treceived data: \n",
                  c->sfd,
                  item->key_size,
                  item->key_prefix,
                  item->key_size - (int)KEY_PREFIX_SIZE,
                  orignkey,
                  set_time,
                  cur_time,
                  (int)(curr_time.tv_sec - item->client_time),
                  item->exp_time,
                  item->value_size,
                  item->value_size,
                  orignval);
          fflush(stderr);
        }
      }
    }
    else
    {
      atomic_add_size(&ms_stats.vef_miss, 1);

      if (ms_setting.verbose)
      {
        fprintf(stderr, "\n<%d data verification failed\n"
                        "\tkey len: %d\n"
                        "\tkey: %" PRIx64 " %.*s\n"
                        "\texpected data len: %d\n"
                        "\texpected data: %.*s\n"
                        "\treceived data: \n",
                c->sfd, item->key_size, item->key_prefix,
                item->key_size - (int)KEY_PREFIX_SIZE,
                orignkey, item->value_size, item->value_size, orignval);
        fflush(stderr);
      }
    }
  }
} /* ms_update_single_get_result */


/**
 * after get the response from server for set the function
 * update the state of the task and do data verify if necessary.
 *
 * @param c, pointer of the concurrency
 * @param item, pointer of task item which includes the object
 *            information
 */
static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item)
{
  if ((c == NULL) || (item == NULL))
  {
    return;
  }
  assert(c != NULL);
  assert(item != NULL);

  if (c->precmd.cmd == CMD_SET)
  {
    switch (c->precmd.retstat)
    {
    case MCD_STORED:
      if (item->value_offset == INVALID_OFFSET)
      {
        /* first set with the same offset of key suffix */
        item->value_offset= item->key_suffix_offset;
      }
      else
      {
        /* not first set, just increase the value offset */
        item->value_offset+= 1;
      }

      /* set successes, update counter */
      c->set_cursor++;
      c->curr_task.set_opt++;
      c->curr_task.cycle_undo_set--;
      break;

    case MCD_SERVER_ERROR:
    default:
      break;
    } /* switch */
  }
} /* ms_update_set_result */


/**
 * update the response time result
 *
 * @param c, pointer of the concurrency
 */
static void ms_update_stat_result(ms_conn_t *c)
{
  bool get_miss= false;

  if (c == NULL)
  {
    return;
  }
  assert(c != NULL);

  gettimeofday(&c->end_time, NULL);
  uint64_t time_diff= (uint64_t)ms_time_diff(&c->start_time, &c->end_time);

  pthread_mutex_lock(&ms_statistic.stat_mutex);

  switch (c->precmd.cmd)
  {
  case CMD_SET:
    ms_record_event(&ms_statistic.set_stat, time_diff, false);
    break;

  case CMD_GET:
    if (c->curr_task.get_miss)
    {
      get_miss= true;
    }
    ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
    break;

  default:
    break;
  } /* switch */

  ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
  pthread_mutex_unlock(&ms_statistic.stat_mutex);
} /* ms_update_stat_result */


/**
 * after get response from server for the current operation, and
 * before doing the next operation, update the state of the
 * current operation.
 *
 * @param c, pointer of the concurrency
 */
static void ms_update_task_result(ms_conn_t *c)
{
  ms_task_item_t *item;

  if (c == NULL)
  {
    return;
  }
  assert(c != NULL);

  item= ms_get_cur_opt_item(c);
  if (item == NULL)
  {
    return;
  }
  assert(item != NULL);

  ms_update_set_result(c, item);

  if ((ms_setting.stat_freq > 0)
      && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET)))
  {
    ms_update_stat_result(c);
  }

  /* update multi-get task item */
  if (((ms_setting.mult_key_num > 1)
       && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
      || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
  {
    ms_update_multi_get_result(c);
  }
  else
  {
    ms_update_single_get_result(c, item);
  }
} /* ms_update_task_result */


/**
 * run get and set operation
 *
 * @param c, pointer of the concurrency
 *
 * @return int, if success, return EXIT_SUCCESS, else return -1
 */
static int ms_run_getset_task(ms_conn_t *c)
{
  /**
   * extra one loop to get the last command return state. get the
   * last command return state.
   */
  if ((c->remain_exec_num >= 0)
      && (c->remain_exec_num != c->exec_num))
  {
    ms_update_task_result(c);
  }

  /* multi-get */
  if (ms_setting.mult_key_num > 1)
  {
    /* operate next task item */
    ms_multi_getset_task_sch(c);
  }
  else
  {
    /* operate next task item */
    ms_single_getset_task_sch(c);
  }

  /* no task to do, exit */
  if ((c->remain_exec_num == -1) || ms_global.time_out)
  {
    return -1;
  }

  return EXIT_SUCCESS;
} /* ms_run_getset_task */


/**
 * the state machine call the function to execute task.
 *
 * @param c, pointer of the concurrency
 *
 * @return int, if success, return EXIT_SUCCESS, else return -1
 */
int ms_exec_task(struct conn *c)
{
  if (! ms_global.finish_warmup)
  {
    ms_warmup_server(c);
  }
  else
  {
    if (ms_run_getset_task(c) != 0)
    {
      return -1;
    }
  }

  return EXIT_SUCCESS;
} /* ms_exec_task */