package Protocol::Memcached;
# ABSTRACT: Support for the memcached binary protocol
use strict;
use warnings FATAL => 'all';

our $VERSION = '0.004';

=head1 NAME

Protocol::Memcached - memcached binary protocol implementation

=head1 VERSION

version 0.004


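=head1 SYNOPSIS

 package Subclass::Of::Protocol::Memcached;
 use parent qw(Protocol::Memcached);

 sub write {
 	my ($self, $data, %args) = @_;
 	$self->{socket}->write($data);
 	# No flush notification here, so call on_flush before returning
 	$args{on_flush}->() if $args{on_flush};
 }

 package main;
 my $mc = Subclass::Of::Protocol::Memcached->new;
 my ($k, $v) = ('hello' => 'world');
 $mc->set(
 	$k => $v,
 	on_complete	=> sub {
 		$mc->get(
 			'hello',
 			on_complete	=> sub { my %item = @_; print "Had $item{value}\n" },
 			on_error	=> sub { die "Failed because of @_\n" }
 		);
 	}
 );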


=head1 DESCRIPTION

Bare minimum protocol support for memcached. This class is transport-agnostic and as
such is not a working implementation - you need to subclass and provide your own
L</write> method.

If you're using this class, you're most likely doing it wrong - head over to the
L</SEE ALSO> section to rectify this.

L<Protocol::Memcached::Client> is probably the module you want if you are going to subclass
this.

When subclassing, provide the following method:

=head2 write

This will be called with the data to be written, and zero or more named parameters:

=over 4

=item * on_flush - coderef to execute once the data has left the building. If the
transport layer cannot report this, the subclass should call the coderef before
returning.

=back

When data arrives from the transport, pass a reference to the buffer holding it to L</on_read>.
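
A minimal sketch of that contract follows. It assumes a transport that has no way of reporting when data has actually been sent. C<Local::BufferTransport> is a hypothetical stand-in, not part of this distribution. It records what it is given and calls C<on_flush> before returning, as described above.

 use strict;
 use warnings;

 # Hypothetical in-memory transport, for illustration only
 package Local::BufferTransport;

 sub new { my ($class) = @_; return bless { sent => '' }, $class }

 # Same calling convention as the write method a subclass provides:
 # the data to send, then named parameters such as on_flush
 sub write {
 	my ($self, $data, %args) = @_;
 	$self->{sent} .= $data;
 	# Nothing can report a real flush here, so fire on_flush before returning
 	$args{on_flush}->() if $args{on_flush};
 	return $self;
 }

 package main;

 my $t = Local::BufferTransport->new;
 my $flushed = 0;
 $t->write("\x80\x00", on_flush => sub { $flushed++ });

This matters because L</get> and L</set> only queue a request inside C<on_flush>, and L</on_read> later matches each response against that queue. If C<write> never calls C<on_flush>, L</on_read> dies with "Had response with no queued item".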


=cut

# Modules

use Scalar::Util ();
use Digest::MD5 ();
use List::Util qw(sum);
use List::UtilsBy qw(nsort_by);
use POSIX qw(floor);

# Constants

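use constant {
	MAGIC_REQUEST	=> 0x80,	# Magic byte for request packets
	MAGIC_RESPONSE	=> 0x81,	# Magic byte for response packets
};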

# Mapping from numeric opcode value in packet header to method
my %OPCODE_BY_ID = (
	0x00 => 'Get',
	0x01 => 'Set',
	0x02 => 'Add',
	0x03 => 'Replace',
	0x04 => 'Delete',
	0x05 => 'Increment',
	0x06 => 'Decrement',
	0x07 => 'Quit',
	0x08 => 'Flush',
	0x09 => 'GetQ',
	0x0A => 'No-op',
	0x0B => 'Version',
	0x0C => 'GetK',
	0x0D => 'GetKQ',
	0x0E => 'Append',
	0x0F => 'Prepend',
	0x10 => 'Stat',
	0x11 => 'SetQ',
	0x12 => 'AddQ',
	0x13 => 'ReplaceQ',
	0x14 => 'DeleteQ',
	0x15 => 'IncrementQ',
	0x16 => 'DecrementQ',
	0x17 => 'QuitQ',
	0x18 => 'FlushQ',
	0x19 => 'AppendQ',
	0x1A => 'PrependQ',
);

# Map from method name to opcode byte
my %OPCODE_BY_NAME = reverse %OPCODE_BY_ID;

# Status values from response
my %RESPONSE_STATUS = (
	0x0000 => 'No error',
	0x0001 => 'Key not found',
	0x0002 => 'Key exists',
	0x0003 => 'Value too large',
	0x0004 => 'Invalid arguments',
	0x0005 => 'Item not stored',
	0x0006 => 'Incr/Decr on non-numeric value',
	0x0081 => 'Unknown command',
	0x0082 => 'Out of memory',
);

=head1 METHODS


=head2 new

Bare minimum constructor - subclass may need to inherit from something with a
non-trivial constructor, so we put all our init code in L</init>.


=cut

sub new {
	my $class = shift;
	my $self = bless { }, $class;
	return $self;
}

=head2 sap

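Helper method for weak callbacks. It takes a coderef and returns a closure that calls it as a
method on a weakened copy of C<$self>, so the closure does not keep the object alive.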
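
The pattern can be tried in isolation. C<Local::Counter> below is a hypothetical class that copies the body of C<sap>; it is not this module. The callback works while the object is alive, yet does not stop the object from being freed.

 use strict;
 use warnings;
 use Scalar::Util ();

 # Hypothetical class mirroring the sap pattern, for illustration only
 package Local::Counter;

 sub new { return bless { calls => 0 }, shift }

 sub sap {
 	my ($self, $sub) = @_;
 	Scalar::Util::weaken($self); # the closure below holds only a weak reference
 	return sub { $self->$sub(@_) };
 }

 package main;

 my $obj = Local::Counter->new;
 my $cb  = $obj->sap(sub { my $self = shift; $self->{calls}++ });
 $cb->();
 my $calls = $obj->{calls};

 # Keep a weak reference so we can check whether the object survives
 my $weak = $obj;
 Scalar::Util::weaken($weak);
 undef $obj; # drop the only strong reference while $cb still exists

Without the C<weaken>, an object that stores such a callback on itself would form a reference cycle and never be freed.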


=cut

sub sap {
	my ($self, $sub) = @_;
	Scalar::Util::weaken($self);
	return sub { $self->$sub(@_); };
}

=head2 get

Retrieves a value from memcached.

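Takes a key and zero or more optional named parameters:

=over 4

=item * on_write - called when we've sent the request to the server

=item * on_complete - called when the response arrives, with the queued item as a list
of key/value pairs: C<type>, C<key>, C<value> (if the response had a body) and any
other named parameters passed to get

=item * on_error - called instead of on_complete if the server reports a non-zero status,
with the same pairs plus C<status>. Without this handler, L</on_read> dies on errors.

=back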
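
As an illustration of what get sends, the sketch below rebuilds the request with the same pack template. It uses a hypothetical standalone helper, C<build_get_request>. The request magic (0x80) and Get opcode (0x00) are written out as literals from the memcached binary protocol, so the block runs without this module.

 use strict;
 use warnings;

 # Hypothetical helper: 24-byte request header followed by the key,
 # using the same template as get
 sub build_get_request {
 	my ($key) = @_;
 	my $len = length $key;
 	return pack(
 		'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1 a*',
 		0x80,       # magic: request
 		0x00,       # opcode: Get
 		$len,       # key length
 		0x00,       # extras length
 		0x00,       # data type
 		0x0000,     # reserved
 		$len,       # total body length: just the key
 		0x00000000, # opaque
 		0x00,       # CAS, high 32 bits
 		0x00,       # CAS, low 32 bits
 		$key,
 	);
 }

 my $pkt = build_get_request('hello');
 my ($magic, $opcode, $key_len, undef, undef, undef, $body_len) =
 	unpack 'C1 C1 n1 C1 C1 n1 N1', $pkt;

The CAS field is 8 bytes on the wire. Like get, the sketch packs it as two 32-bit zeros rather than using pack 'Q', which is only available on perls with 64-bit integer support.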



=cut

sub get {
	my $self = shift;
	my $k = shift; # FIXME should we do anything about encoding or length checks here?
	my %args = @_;

# Pull out any callbacks that we handle directly
	my $on_write = delete $args{on_write};

	my $len = length $k; # TODO benchmark - 2xlength calls or lexical var?
	$self->write(
		pack(
			'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1 a*',
			MAGIC_REQUEST,		# What type this packet is
			$OPCODE_BY_NAME{'Get'},	# Opcode
			$len,			# Key length
			0x00,			# Extras length
			0x00,			# Data type binary
			0x0000,			# Reserved
			$len,			# Total body
			0x00000000,		# Opaque
			0x00,			# CAS
			0x00,			# more CAS - 8byte value but don't want to rely on pack 'Q'
			$k			# Key
		),
		on_flush => $self->sap(sub {
			my $self = shift;
			# Queue the request so on_read can match it to the response;
			# remaining named parameters (on_complete, on_error) travel with it
			push @{ $self->{pending} }, {
				type	=> 'Get',
				key	=> $k,
				%args
			};
			$on_write->($self, key => $k) if $on_write;
		})
	);
}

=head2 set

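Stores a value in memcached.

Takes a key, a value, and zero or more optional named parameters:

=over 4

=item * flags - 32-bit flags stored alongside the value (defaults to 0)

=item * ttl - expiry time in seconds (defaults to 60)

=item * on_write - called when we've sent the request to the server

=item * on_complete - called when the server acknowledges the store, with the queued
item as a list of key/value pairs

=item * on_error - called instead of on_complete if the server reports a non-zero status,
with the same pairs plus C<status>

=back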
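
A Set request differs from Get because it carries 8 bytes of extras (flags, then expiry) ahead of the key and value. C<build_set_request> below is a hypothetical standalone helper, not part of this module. It mirrors the template and defaults that set uses, with the Set opcode (0x01) written out.

 use strict;
 use warnings;

 # Hypothetical helper: 24-byte header, 8 bytes of extras (flags, expiry),
 # then the key and the value
 sub build_set_request {
 	my ($key, $value, %args) = @_;
 	return pack(
 		'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1 N1 N1 a* a*',
 		0x80,                              # magic: request
 		0x01,                              # opcode: Set
 		length($key),                      # key length
 		0x08,                              # extras length
 		0x00,                              # data type
 		0x0000,                            # reserved
 		8 + length($key) + length($value), # total body: extras + key + value
 		0x00000000,                        # opaque
 		0x00, 0x00,                        # CAS as two 32-bit halves
 		$args{flags} || 0,                 # extras: flags
 		$args{ttl} || 60,                  # extras: expiry in seconds
 		$key, $value,
 	);
 }

 my $pkt = build_set_request('hello', 'world', ttl => 300);
 my ($body_len) = unpack 'x8 N1', $pkt;
 my ($flags, $ttl) = unpack 'x24 N1 N1', $pkt;

Because the defaults use C<||>, a ttl of 0 (which memcached treats as "never expire") is also sent as 60.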



=cut

sub set {
	my $self = shift;
	my $k = shift; # FIXME should we do anything about encoding or length checks here?
	my $v = shift;
	my %args = @_;

# Pull out any callbacks that we handle directly
	my $on_write = delete $args{on_write};

	$self->write(
		pack(
			'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1 N1 N1 a* a*',
			MAGIC_REQUEST,		# What type this packet is
			$OPCODE_BY_NAME{'Set'},	# Opcode
			length($k),		# Key length
			0x08,			# Extras length
			0x00,			# Data type binary
			0x0000,			# Reserved
			8 + length($k) + length($v),	# Total body
			0x00000000,		# Opaque
			0x00,			# CAS
			0x00,			# more CAS - 8byte value but don't want to rely on pack 'Q'
			$args{flags} || 0,	# Extras: flags
			$args{ttl} || 60,	# Extras: expiry in seconds
			$k,			# Key
			$v			# Value
		),
		on_flush => $self->sap(sub {
			my $self = shift;
			# Queue the request so on_read can match it to the response
			push @{ $self->{pending} }, {
				type	=> 'Set',
				key	=> $k,
				value	=> $v,
				%args
			};
			$on_write->($self, key => $k, value => $v) if $on_write;
		})
	);
}

=head2 init

Sets things up.

Currently this just resets the queue of pending requests. It takes no parameters and
returns C<$self>.


=cut

sub init {
	my $self = shift;
	$self->{pending} = [];
	return $self;
}

=head2 on_read

This should be called when there is data to be processed. It takes a single parameter:
a reference to a buffer containing the incoming data. If a packet is processed
successfully then it will be removed from this buffer (via C< substr > or C< s// >).

Returns true if a packet was found, false if not. It is recommended (but not required)
that this method be called repeatedly until it returns false.
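
The buffer handling can be sketched on its own. C<next_packet> below is a hypothetical helper, not part of this module. It applies the same length checks as on_read, but returns the decoded header fields instead of matching the response against pending requests. The calling loop follows the recommendation above and runs until no complete packet is left.

 use strict;
 use warnings;

 # Hypothetical framing helper: wait for a full 24-byte header, read the total
 # body length at offset 8, wait for the whole body, then strip the packet
 # from the front of the buffer
 sub next_packet {
 	my ($buffref) = @_;
 	return unless length $$buffref >= 24;
 	my $body_len = unpack 'x8 N1', $$buffref;
 	return unless length $$buffref >= 24 + $body_len;
 	my $header = substr $$buffref, 0, 24, '';
 	my $body   = substr $$buffref, 0, $body_len, '';
 	my ($magic, $opcode, $status) = unpack 'C1 C1 x4 n1', $header;
 	return { magic => $magic, opcode => $opcode, status => $status, body => $body };
 }

 # A Set response (no body) and a Get response carrying 4 bytes of flags
 # extras plus the value 'world'
 my $set_resp = pack 'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1',
 	0x81, 0x01, 0, 0, 0, 0x0000, 0, 0, 0, 0;
 my $get_resp = pack 'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1 N1 a*',
 	0x81, 0x00, 0, 4, 0, 0x0000, 4 + 5, 0, 0, 0, 0, 'world';

 # Two complete packets followed by the first 10 bytes of a third
 my $buffer = $set_resp . $get_resp . substr($get_resp, 0, 10);

 # Call repeatedly until no complete packet remains
 my @packets;
 while (my $pkt = next_packet(\$buffer)) {
 	push @packets, $pkt;
 }

The trailing partial packet stays in the buffer, so the next call made after more data arrives can complete it.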


=cut

sub on_read {
	my ($self, $buffref) = @_;

	# Bail out if we don't have a full header
	return 0 unless length $$buffref >= 24;

	# Extract the basic header data first - specifically we want the length
	# Not using most of these. At least, not yet
	# my ($magic, $opcode, $kl, $el, $dt, $status, $blen, $opaque, $cas1, $cas2) = unpack('C1 C1 n1 C1 C1 n1 N1 N1 N1 N1', $$buffref);
	my ($magic, $opcode, undef, $el, undef, $status, $blen) = unpack('C1 C1 n1 C1 C1 n1 N1 N1 N1 N1', $$buffref);
	die "Not a response" unless $magic == MAGIC_RESPONSE;

# If we don't have the full body as well, bail out here
	return 0 unless length $$buffref >= ($blen + 24);

# Strip the header
	substr $$buffref, 0, 24, '';

	my $body = substr $$buffref, 0, $blen, '';
	if($opcode == 0x00) {
		# Drop the flags extras (unused for now). Going by the header's extras
		# length means an error response, which has no extras, keeps its message
		# my $flags = substr $body, 0, 4, '';
		substr $body, 0, $el, '';
	}
	# printf "=> %-9.9s %-40.40s %08x%08x %s\n", $OPCODE_BY_ID{$opcode}, $body, $cas1, $cas2, $RESPONSE_STATUS{$status} // 'unknown status';
	my $item = shift @{$self->{pending}} or die "Had response with no queued item\n";
	$item->{value} = $body if length $body;
	if($status) {
		return $item->{on_error}->(%$item, status => $status) if exists $item->{on_error};
		die "Failed with " . ($RESPONSE_STATUS{$status} // 'unknown status') . " on item " . join(',', %$item) . "\n";
	} else {
		$item->{on_complete}->(%$item) if exists $item->{on_complete};
	}
	return 1;
}

=head2 status_text

Returns the status message corresponding to the given code.


=cut

sub status_text {
	my $self = shift;
	my $code = shift;
	return $RESPONSE_STATUS{$code};
}

=head2 build_packet

Generic packet construction.


=cut

sub build_packet {
	my $self = shift;
	my %args = @_;
	my $pkt = pack(
		'C1 C1 S1 C1 C1 S1 N1 N1 N1',
		$args{request} ? MAGIC_REQUEST : MAGIC_RESPONSE,
		defined($args{key}) ? length($args{key}) : 0,
		defined($args{extras}) ? length($args{extras}) : 0,
		defined($args{body}) ? length($args{body}) : 0,
	);
	return $pkt;
}

=head2 hash_key

Returns the raw (binary, 16-byte) MD5 digest of the given key.


=cut

sub hash_key {
	my $self = shift;
	return Digest::MD5::md5(shift);
}

=head2 ketama

Provided for backward compatibility only. See L</hash_key>.


=cut

sub ketama { shift->hash_key(@_) }

=head2 build_ketama_map

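Generates a Ketama hash map from the given list of C<< server => weight >> pairs.

Returns an arrayref of points sorted by point value. Each point is a hashref with C<point>
and C<ip> keys. The map is also stored on the object for use by L</ketama_find_point>.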
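
The following shows how weights translate into ring positions. Each server gets C<floor(weight / total_weight * 40 * server_count)> MD5 digests, and each digest yields four points. C<ketama_point_counts> is a hypothetical helper that repeats only this arithmetic, using the same core modules as build_ketama_map.

 use strict;
 use warnings;
 use List::Util qw(sum);
 use POSIX qw(floor);

 # Hypothetical helper: the number of points build_ketama_map would place for
 # each server, given server => weight pairs (four points per MD5 digest)
 sub ketama_point_counts {
 	my @servers = @_;
 	my %weight = @servers;
 	my $total = sum values %weight;
 	my $server_count = @servers / 2;
 	my %count;
 	while (my ($srv, $w) = splice @servers, 0, 2) {
 		$count{$srv} = 4 * floor($w / $total * 40.0 * $server_count);
 	}
 	return \%count;
 }

 my $counts = ketama_point_counts('a:11211' => 1, 'b:11211' => 1, 'c:11211' => 2);

A server's share of the ring is therefore proportional to its weight. However, floor() can round a very small weight down to no points at all.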


=cut

sub build_ketama_map {
	my $self = shift;
	my @servers = @_;
	my $total = 0 + sum values %{ +{ @servers } };
	my @points;
	my $server_count = @servers / 2;
	while(@servers) {
		my ($srv, $weight) = splice @servers, 0, 2;
		my $pct = $weight / $total;
		my $ks = floor($pct * 40.0 * $server_count);
		foreach my $k (0..$ks-1) {
			my $hash = sprintf '%s-%d', $srv, $k;
			my @digest = map ord, split //, $self->hash_key($hash);
			# Each 16-byte digest yields four little-endian 32-bit points
			foreach my $h (0..3) {
				push @points, {
					point => ( $digest[3+$h*4] << 24 )
					| ( $digest[2+$h*4] << 16 )
					| ( $digest[1+$h*4] <<  8 )
					|   $digest[$h*4],
					ip => $srv
				};
			}
		}
	}
	@points = nsort_by { $_->{point} } @points;
	$self->{points} = \@points;
	return \@points;
}

=head2 ketama_hashi

Calculates a 32-bit integer hash of the given key from the first four bytes of its
MD5 digest.
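
The shift-and-or in ketama_hashi (and in build_ketama_map) reads digest bytes as a little-endian unsigned 32-bit integer. The sketch below uses two hypothetical helper subs to check that reading against C<unpack 'V'> on the first four bytes of the MD5 digest.

 use strict;
 use warnings;
 use Digest::MD5 ();

 # Same byte arithmetic as ketama_hashi
 sub hashi_shifts {
 	my @digest = map ord, split //, Digest::MD5::md5(shift);
 	return ( $digest[3] << 24 ) | ( $digest[2] << 16 ) | ( $digest[1] << 8 ) | $digest[0];
 }

 # First four digest bytes as a little-endian unsigned 32-bit integer
 sub hashi_unpack {
 	return unpack 'V', Digest::MD5::md5(shift);
 }

 my @keys = ('hello', 'foo', '10.0.0.1:11211-0');
 my @pairs = map { [ hashi_shifts($_), hashi_unpack($_) ] } @keys;

C<unpack 'V'> could therefore stand in for the shifts. The explicit form keeps the byte order visible.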


=cut

sub ketama_hashi {
	my $self = shift;
	my $key = shift;
	my @digest = map ord, split //, $self->hash_key($key);
	return ( $digest[3] << 24 )
		| ( $digest[2] << 16 )
		| ( $digest[1] <<  8 )
		|   $digest[0];
}

=head2 ketama_find_point

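Given a key, returns the first point on the Ketama map (built by L</build_ketama_map>) at
or after the key's hash, wrapping round to the first point. The C<ip> entry of the returned
hashref names the server.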
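
The lookup is a binary search for the first point at or above the key's hash. If the hash is beyond the last point, it wraps round to the first one. C<find_point> below is a hypothetical, simplified lower-bound search, written for clarity rather than copied from this module. It expects the same sorted arrayref of C<point>/C<ip> hashrefs that build_ketama_map returns.

 use strict;
 use warnings;

 # Simplified sketch: index of the first point >= $h in a list sorted by
 # point, or the first point when $h is past the end of the ring
 sub find_point {
 	my ($points, $h) = @_;
 	my ($lo, $hi) = (0, scalar @$points);
 	while ($lo < $hi) {
 		my $mid = int(($lo + $hi) / 2);
 		if ($points->[$mid]{point} < $h) {
 			$lo = $mid + 1;
 		} else {
 			$hi = $mid;
 		}
 	}
 	return $points->[ $lo == @$points ? 0 : $lo ];
 }

 my @ring = map { +{ point => $_->[0], ip => $_->[1] } }
 	[100, 'a'], [200, 'b'], [300, 'c'];

Combined with ketama_hashi, C<< find_point($points, $mc->ketama_hashi($key))->{ip} >> would name the server for C<$key>.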


=cut

sub ketama_find_point {
	my ($self, $key) = @_;

	# Convert this key into a suitably-hashed integer
	my $h = $self->ketama_hashi($key);

	# Find the array bounds...
	my $highp = my $maxp = scalar @{$self->{points}};
	my $lowp = 0;

	# then kick off our divide and conquer array search,
	# which will end when we've found the server with next
	# biggest point after what this key hashes to
	while(1) {
		my $midp = floor(($lowp + $highp ) / 2);
		if ( $midp == $maxp ) {
			# if at the end, roll back to zeroth
			# off-by-one? you'd think, but note the oh-so-helpful $midp-1 later on.
			$midp = 1 if $midp == @{$self->{points}};
			return $self->{points}->[$midp - 1];
		}
		my $midval = $self->{points}->[$midp]->{point};
		my $midval1 = $midp == 0 ? 0 : $self->{points}->[$midp-1]->{point};

		return $self->{points}->[$midp] if $h <= $midval && $h > $midval1;

		if ($midval < $h) {
			$lowp = $midp + 1;
		} else {
			$highp = $midp - 1;
		}

		return $self->{points}->[0] if $lowp > $highp;
	}
}

1;

__END__



=head1 WHY

Three main reasons:

=over 4

=item * B<Transport-agnostic> - purposefully does B<not> get involved in the details of
sending or receiving data. When it wants to write something it calls L</write>, and
when you have data to process you call L</on_read>.

=item * B<Nonblocking> - since this just operates on data and callbacks, rather than getting
involved in transporting data, all operations should return quickly (in Perl terms)

=item * B<Debugging support> - strap this over a memcached transport layer and see
human-readable versions of the binary packets

=back


If you're looking for good performance, stability, an extensive set of tests, support,
and a pony, then you're reading the wrong module:

=head1 SEE ALSO

=over 4

=item * L<Cache::Memcached> - official implementation

=item * L<AnyEvent::Memcached> - text protocol support for L<AnyEvent>

=item * L<Cache::Memcached::AnyEvent> - provides binary protocol support for L<AnyEvent>

=item * L<Memcached::Client> - another L<AnyEvent> implementation, again with the
transport layer too highly coupled for my purposes

=item * L<Cache::Memcached::GetParserXS> - XS implementation for parsing memcached
binary data, apparently "possibly twice as fast as the original perl version".

=back


=head1 AUTHOR

Tom Molesworth <>

=head1 LICENSE

Copyright Tom Molesworth 2011-2012. Licensed under the same terms as Perl itself.