The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
// Copyright 2010 Google Inc.
// Modified by Yuji Kaneda
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//      http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

// Implements szl table structure for storing approximate quantiles
// in a table.  The implementation is based on the following paper:
//
// [MP80]  Munro & Paterson, "Selection and Sorting with Limited Storage",
//         Theoretical Computer Science, Vol 12, p 315-323, 1980.
//    More info available at
//    http://scholar.google.com/scholar?q=munro+paterson
//
// The above paper is not available online. You could read a detailed
// description of the same algorithm here:
//
// [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other
//         Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM
//         SIGMOD, Vol 27, No 2, p 426-435, June 1998.
//
// Also see  the following paper by Greenwald and Khanna, which contains
// another implementation that is thought to be slower:
// M. Greenwald and S. Khanna. Space-efficient online computation of
// quantile summerles. SIGMOD'01, pp. 58-66, Santa Barbara, CA, May
// 2001.
//
// Brief description of Munro-Paterson algorithm
// =============================================
// Imagine a binary tree of buffers. Every buffer has size "k". Now imagine
// populating the leaves of the tree (from left to right) with the input
// stream.  Munro-Paterson is very simple: As soon as both children of a
// buffer are full, we invoke a Collapse() operation.  What is a Collapse()?
// Basically, we take two buffers of size "k" each, sort them together and pick
// every other element in the sorted sequence. That's it!
//
// When the input stream runs dry, we would have populated some "b" buffers at
// various levels by following the Munro-Paterson algorithm.  How do we compute
// 100 quantiles from these "b" buffers? Assign a "weight" of 2^i to every
// element of a buffer at level i (leaves are at level 0).  Now sort all the
// elements (in various buffers) together. Then compute the "weighted 100
// splitters" of this sequence. Please see the code below or the paper above for
// furthe details.

#include <math.h>
#include <algorithm>

#include "public/porting.h"
// #include "public/logging.h"

namespace SZaru {
// We compute the "smallest possible k" satisfying two inequalities:
//    1)   (b - 2) * (2 ^ (b - 2)) + 0.5 <= epsilon * MAX_TOT_ELEMS
//    2)   k * (2 ^ (b - 1)) \geq MAX_TOT_ELEMS
//
// For an explanation of these inequalities, please read the Munro-Paterson or
// the Manku-Rajagopalan-Linday papers.
template<typename Key>
int64 QuantileEstimatorImpl<Key>::ComputeK() {
  const double epsilon = 1.0 / (num_quantiles_ - 1);
  int b = 2;
  while ((b - 2) * (0x1LL << (b - 2)) + 0.5 <= epsilon * MAX_TOT_ELEMS) {
    ++b;
  }
  const int64 k = MAX_TOT_ELEMS / (0x1LL << (b - 1));
  // VLOG(2) << StringPrintf(
  // "ComputeK(): returning k = %lld for num_quantiles_ = %d (epsilon = %f)",
  // k, num_quantiles_, epsilon);
  return k;
}


// If buffer_[level] already exists, do nothing.
// Else create a new buffer_[level] that is empty.
template<typename Key>
void QuantileEstimatorImpl<Key>::EnsureBuffer(const int level) {
  // int extra_memory = 0;
  if (buffer_.size() < static_cast<size_t>(level) + 1) {
    // size_t old_capacity = buffer_.capacity();
    buffer_.resize(level + 1, NULL);
    // extra_memory +=
    // (buffer_.capacity() - old_capacity) * sizeof(vector<string>*);
  }
  if (buffer_[level] == NULL) {
    // VLOG(2) << StringPrintf("Creating buffer_[%d] ...", level);
    buffer_[level] = new vector<Key>();
    // extra_memory += sizeof(*(buffer_[level]));
  }
  // return extra_memory;
}

// For Collapse(), both "a" and "b" should be vectors of length "k_".
// Conceptually, Collapse() combines "a" and "b" into a single vector, sorts
// this vector and then chooses every other member of this vector.
// The result is stored in "output".
//
// The return value is the change is memory requirements. What causes
// increase/decrease in memory?  (i) "output" is populated and (ii) just before
// returning, both "a" and "b" are cleared.
template<typename Key>
void QuantileEstimatorImpl<Key>::Collapse(vector<Key> *const a,
                                            vector<Key> *const b,
                                            vector<Key> *const output) {
  // CHECK_EQ(a->size(), k_);
  // CHECK_EQ(b->size(), k_);
  // CHECK_EQ(output->size(), 0);

  // int memory_delta = 0;
  int index_a = 0;
  int index_b = 0;
  int count = 0;
  const Key* smaller;

  while (index_a < k_ || index_b < k_) {
    if (index_a >= k_ || (index_b < k_ && a->at(index_a) >= b->at(index_b))) {
      smaller = &(b->at(index_b++));
    } else {
      smaller = &(a->at(index_a++));
    }

    if ((count++ % 2) == 0) {  // remember "smallest"
      output->push_back(*smaller);
    } else {  // forget "smallest"
      // memory_delta -= smaller->size();
    }
  }

  // Account for the memory taken by output and a & b.
  // memory_delta += (output->capacity() - a->capacity() - b->capacity())
  // * sizeof(string);

  // Make sure we completely deallocate the memory taken by a & b.
  {
    vector<Key> tmp;
    a->swap(tmp);
  }
  {
    vector<Key> tmp;
    b->swap(tmp);
  }

  // return memory_delta;
}

// Algorithm for RecursiveCollapse():
//
// 1. Let "merged" denote the output of Collapse("buf", "buffer_[level]")
//
// 2. If "buffer_[level + 1]" is full (i.e., already exists)
//      Collapse("merged", "buffer_[level + 1]")
//    else
//      "buffer_[level + 1]"  <-- "merged"
//
// The return value is the difference in memory usage.
template<typename Key>
void QuantileEstimatorImpl<Key>::RecursiveCollapse(vector<Key> *buf,
                                                     const int level) {
  // VLOG(2) << StringPrintf("RecursiveCollapse() invoked with level = %d", level);

  // CHECK_EQ(buf->size(), k_);
  // CHECK_GE(level, 1);
  // CHECK_GE(buffer_.size(), level + 1);
  // CHECK(buffer_[level] != NULL);
  // CHECK_EQ(buffer_[level]->size(), k_);

  // int memory_delta = EnsureBuffer(level + 1);
  EnsureBuffer(level + 1);

  vector<Key> *merged;
  if (buffer_[level + 1]->size() == 0) {  // buffer_[level + 1] is empty
    merged = buffer_[level + 1];
  } else {                                // buffer_[level + 1] is full
    merged = new vector<Key>;
    // merged is going to be filled with k_ elements we might as well
    // reserve the space now.
    merged->reserve(k_);
  }
  // We account for the memory taken by merged even if it's a
  // temporary vector, since in this case it will be passed to
  // RecursiveCollapse, which will substract the memory it takes.
  // memory_delta += Collapse(buffer_[level], buf, merged);
  Collapse(buffer_[level], buf, merged);
  if (buffer_[level + 1] == merged) {
    // return memory_delta;
    return;
  }
  //memory_delta += RecursiveCollapse(merged, level + 1);
  RecursiveCollapse(merged, level + 1);
  delete merged;
  // return memory_delta;
}

// Goal: Add a new element ("elem" is a SzlEncoded value).
// Return value: "diff in memory usage".
//
// Algorithm:
//   if (buffer_[0] is not full (i.e., has less than k_ elements)) {
//       insert "elem" into buffer_[0]
//   } else if buffer_[1] is not full (i.e., has less than k_ elements) {
//       insert "elem" into "buffer_[1]"
//   } else {
//       Sort buffer_[0] and buffer_[1]
//       RecursiveCollapse(buffer_[0], buffer_[1])
//       Insert into buffer_[0]
//   }
template<typename Key>
void QuantileEstimatorImpl<Key>::AddElem(const Key& elem) {
  // int memory_delta = 0;

  // Update min_ and max_.
  if ((tot_elems_ == 0) || (elem < min_)) {
    // memory_delta += elem.size() - min_.size();
    min_ = elem;
    // VLOG(3) << "AddElem(" << elem << "): min_ updated to " << min_;
  }
  if ((tot_elems_ == 0) || (max_ < elem)) {
    // memory_delta += elem.size() - max_.size();
    max_ = elem;
    // VLOG(3) << "AddElem(" << elem << "): max_ updated to " << max_;
  }

  // First, test if both buffer_[0] and buffer_[1] are full.
  // This is equivalent to testing
  //    (tot_elems_ > 0)    &&   (tot_elems_ % (2 * k_) == 0).
  // If so, sort buffer_[0] and buffer_[1] and invoke RecursiveCollapse().
  // When RecursiveCollapse() returns, both buffer_[0] and buffer_[1] would be
  // "empty" (i.e., with zero elements in them).
  if ((tot_elems_ > 0) && (tot_elems_ % (2 * k_) == 0)) {
    // CHECK(buffer_[0] != NULL);
    // CHECK(buffer_[1] != NULL);
    // CHECK_EQ(buffer_[0]->size(), k_);
    // CHECK_EQ(buffer_[1]->size(), k_);
    // VLOG(2) << "AddElem(" << elem << "): Sorting buffer_[0] ...";
    sort(buffer_[0]->begin(), buffer_[0]->end());
    // VLOG(2) << "AddElem(" << elem << "): Sorting buffer_[1] ...";
    sort(buffer_[1]->begin(), buffer_[1]->end());
    const int level = 1;
    // RecursiveCollapse will start with Collapse(buffer_[0], buffer_[level]).
    // memory_delta += RecursiveCollapse(buffer_[0], level);
    RecursiveCollapse(buffer_[0], level);
  }

  // At this point, we are sure that either buffer_[0] or buffer_[1] can
  // accommodate "elem".
  //memory_delta += EnsureBuffer(0);
  //memory_delta += EnsureBuffer(1);
  EnsureBuffer(0);
  EnsureBuffer(1);
  // CHECK((buffer_[0]->size(), k_) || (buffer_[1]->size(), k_));
  int index = (buffer_[0]->size() < k_) ? 0 : 1;
  // VLOG(3) << "AddElem(" << elem << "): Inserting into buffer_[" << index << "]";
  // int old_capacity = buffer_[index]->capacity();
  buffer_[index]->push_back(elem);
  // memory_delta += elem.size()
  // + (buffer_[index]->capacity() - old_capacity) * sizeof(string);
  ++tot_elems_;
  // VLOG(3) << StringPrintf("AddElem(%s): returning with tot_elems_ = %lld",
  // elem.c_str(), tot_elems_);
  // return memory_delta;
}

// Imported from src/emitters/szlcomputequantiles.cc by Yuji Kaneda.
// 
// Please read the short description of the Munro-Paterson algorithm at the
// beginning of sawquantile.cc
//
// Basically, our goal is to compute quantiles from a bunch of buffers.
// We assign a "weight" of 2^i to every element of a buffer at
// level i in the binary tree (leaves are at level 0).  Now sort all the
// elements (in various buffers) together. Then compute the "weighted 100
// splitters" of this sequence.
template<typename Key>
void ComputeQuantiles(const vector<vector<Key>* >& buffer,
                      const Key& min_Key, const Key& max_Key,
                      size_t num_quantiles, int64 tot_elems,
                      vector<Key>* quantiles) {
  // CHECK(max_Key >= min_Key);
  // CHECK_GE(buffer.size(), 1);

  quantiles->clear();

  // VLOG(2) << "ComputeQuantiles(): min=" << min_Key;
  quantiles->push_back(min_Key);

  // buffer[0] and buffer[1] may be unsorted; all others are already sorted.
  if (buffer[0] == NULL) {
    // VLOG(2) << "ComputeQuantiles(): Not sorting buffer[0] (it is NULL).";
  } else {
    // VLOG(2) << "ComputeQuantiles(): Sorting buffer[0] ...";
    sort(buffer[0]->begin(), buffer[0]->end());
  }
  if ((buffer.size() < 2) || (buffer[1] == NULL)) {
    // VLOG(2) << "ComputeQuantiles(): Not sorting buffer[1] (it doesn't exist).";
  } else {
    // VLOG(2) << "ComputeQuantiles(): Sorting buffer[1] ...";
    sort(buffer[1]->begin(), buffer[1]->end());
  }

  // Simple sanity check: the weighted sum of all buffers should equal
  // "tot_elems".  The weight of buffer[i] is 2^(i-1) for i >= 2. Otherwise,
  // the weight is 1.
  // int64 t = 0;
  // for (int j = 0; j < buffer.size(); ++j) {
  // const int64 weight = (j <= 1) ? 1LL : (0x1LL << (j - 1));
  // t += (buffer[j] == NULL) ? 0 : (buffer[j]->size() * weight);
  // }
  // CHECK_EQ(t, tot_elems);

  vector<size_t> index(buffer.size(), 0);

  // Our goal is to identify the weighted "num_quantiles - 2" splitters in the
  // sorted sequence of all buffers taken together.
  // "S" will store the cumulative weighted sum so far.
  int64 S = 0;
  for (size_t i = 1; i <= num_quantiles - 2; ++i) {
    // Target "S" for the next splitter (next quantile).
    const int64 target_S
      = static_cast<int64>(ceil(i * (tot_elems / (num_quantiles - 1.0))));
    // CHECK_LE(target_S, tot_elems);

    while (true) {
      // Identify the smallest element among buffer_[0][index[0]],
      // buffer_[1][index[1]],  buffer_[2][index[2]], ...
      Key smallest = max_Key;
      int min_buffer_id = -1;
      for (size_t j = 0; j < buffer.size(); ++j) {
        if ((buffer[j] != NULL) && (index[j] < buffer[j]->size())) {
          if (!(smallest < buffer[j]->at(index[j]))) {
            smallest = buffer[j]->at(index[j]);
            min_buffer_id = j;
          }
        }
      }
      // CHECK_GE(min_buffer_id, 0);

      // Now increment "S" by the weight associated with "min_buffer_id".
      //
      // Note: The "weight" of elements in buffer[0] and buffer[1] is 1 (these
      //       are leaf nodes in the Munro-Paterson "tree of buffers".
      //       The weight of elements in buffer[i] is 2^(i-1) for i >= 2.
      int64 S_incr = (min_buffer_id <= 1) ? 1 : (0x1LL << (min_buffer_id - 1));

      // If we have met/exceeded "target_S", we have found the next quantile.
      // Then break the loop. Otherwise, just update index[min_buffer_id] and S
      // appropriately.
      if (S + S_incr >= target_S) {
        // CHECK(buffer[min_buffer_id]->at(index[min_buffer_id]) == smallest);
        quantiles->push_back(smallest);
        break;
      } else {
        ++index[min_buffer_id];
        S += S_incr;
      }
    }
  }

  // VLOG(2) << KeyPrintf("ComputeQuantiles(): max=%s", max_Key.c_str());
  quantiles->push_back(max_Key);
}

template<typename Key>
void QuantileEstimatorImpl<Key>::Estimate(vector<Key>& output) {
  output.clear();
  if (tot_elems_ == 0) {
    output.push_back(Key());
    return;
  }
  // We display the quantiles, not the raw output.
  ComputeQuantiles<Key>(buffer_, min_, max_, num_quantiles_, tot_elems_, &output);
}

}