The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Protocol::Memcached;
# ABSTRACT: Support for the memcached binary protocol
use strict;
use warnings FATAL => 'all';

our $VERSION = '0.004';

=head1 NAME

Protocol::Memcached - memcached binary protocol implementation

=head1 VERSION

version 0.004

=head1 SYNOPSIS

 package Subclass::Of::Protocol::Memcached;
 use parent qw(Protocol::Memcached);

 sub write { $_[0]->{socket}->write($_[1]) }

 package main;
 my $mc = Subclass::Of::Protocol::Memcached->new;
 my ($k, $v) = ('hello' => 'world');
 $mc->set(
 	$k => $v,
	on_complete	=> sub {
 		$mc->get(
 			$k,
			on_complete	=> sub { my %item = @_; print "Had $item{value}\n" },
			on_error	=> sub { die "Failed because of @_\n" }
 		);
	}
 );

=head1 DESCRIPTION

Bare minimum protocol support for memcached. This class is transport-agnostic and as
such is not a working implementation - you need to subclass and provide your own ->write
method.

If you're using this class, you're most likely doing it wrong - head over to the
L</SEE ALSO> section to rectify this.

L<Protocol::Memcached::Client> is probably the module you want if you are going to subclass
this.

=head1 SUBCLASSING

Provide the following method:

=head2 write

This will be called with the data to be written, and zero or more named parameters:

=over 4

=item * on_flush - coderef to execute when the data has left the building, if this is
not supported by the transport layer then the subclass should call the coderef
before returning

=back

and when you have data, call L</on_read>.

=cut

# Modules

use Scalar::Util ();
use Digest::MD5 ();
use List::Util qw(sum);
use List::UtilsBy qw(nsort_by);
use POSIX qw(floor);

# Constants

# Magic values for the first byte of a packet header. Requests built by get
# and set start with MAGIC_REQUEST; on_read refuses any packet that does not
# start with MAGIC_RESPONSE.
use constant {
	MAGIC_REQUEST => 0x80,
	MAGIC_RESPONSE => 0x81,
};

# Mapping from the numeric opcode byte in the packet header to the command name.
# The keys are numeric literals, so each one is stored under its decimal string
# form (0x0A is the key "10").
my %OPCODE_BY_ID = (
	0x00 => 'Get',
	0x01 => 'Set',
	0x02 => 'Add',
	0x03 => 'Replace',
	0x04 => 'Delete',
	0x05 => 'Increment',
	0x06 => 'Decrement',
	0x07 => 'Quit',
	0x08 => 'Flush',
	0x09 => 'GetQ',
	0x0A => 'No-op',
	0x0B => 'Version',
	0x0C => 'GetK',
	0x0D => 'GetKQ',
	0x0E => 'Append',
	0x0F => 'Prepend',
	0x10 => 'Stat',
	0x11 => 'SetQ',
	0x12 => 'AddQ',
	0x13 => 'ReplaceQ',
	0x14 => 'DeleteQ',
	0x15 => 'IncrementQ',
	0x16 => 'DecrementQ',
	0x17 => 'QuitQ',
	0x18 => 'FlushQ',
	0x19 => 'AppendQ',
	0x1A => 'PrependQ',
);
# Reverse lookup from command name to opcode byte, used when building requests.
# Inverting the hash is only safe because every name above is unique.
my %OPCODE_BY_NAME = reverse %OPCODE_BY_ID;

# Human-readable text for the status field of a response header, keyed by the
# numeric status code (on_read unpacks that field as a 16-bit value). Zero means
# success; on_read treats any non-zero status as a failure. Codes missing from
# this table have no message.
my %RESPONSE_STATUS = (
	0x0000 => 'No error',
	0x0001 => 'Key not found',
	0x0002 => 'Key exists',
	0x0003 => 'Value too large',
	0x0004 => 'Invalid arguments',
	0x0005 => 'Item not stored',
	0x0006 => 'Incr/Decr on non-numeric value',
	0x0081 => 'Unknown command',
	0x0082 => 'Out of memory',
);

=head1 METHODS

=cut

=head2 new

Bare minimum constructor - subclass may need to inherit from something with a
non-trivial constructor, so we put all our init code in L</init>.

=cut

# Construct an empty instance blessed into the invoking class.
# No state is set up here and init is not called; any extra arguments
# are ignored.
sub new {
	my ($class) = @_;
	return bless {}, $class;
}

=head2 sap

Helper method for weak callbacks.

=cut

# Wrap $code in a closure that invokes it as a method on this object.
# The closure holds only a weak reference to the object, so handing the
# closure to something the object itself owns does not create a cycle.
# Arguments given to the closure are passed through after the invocant,
# and the wrapped code's return value is passed back.
sub sap {
	my $self = shift;
	my $code = shift;
	Scalar::Util::weaken($self);
	return sub {
		return $self->$code(@_);
	};
}

=head2 get

Retrieves a value from memcached.

Takes a key and zero or more optional named parameters:

=over 4

=item * on_write - called when we've sent the request to the server

=back

=cut

# Send a Get request for $k through the subclass-provided write method.
#
# Named parameters:
#   on_write - called as ($self, key => $k) once the transport reports the
#              request as flushed
# Every other named parameter is kept on the queued request entry; on_read
# looks there for on_complete and on_error.
#
# Returns $self.
sub get {
	my ($self, $k, %args) = @_;

	# on_write is fired from here; whatever is left in %args travels
	# with the queued entry.
	my $on_write = delete $args{on_write};

	my $key_length = length $k;

	# 24-byte request header followed by the key. A Get carries no
	# extras and no value, so the total body is just the key.
	my $packet = pack(
		'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1 a*',
		MAGIC_REQUEST,			# magic
		$OPCODE_BY_NAME{'Get'},		# opcode
		$key_length,			# key length
		0x00,				# extras length
		0x00,				# data type
		0x0000,				# reserved
		$key_length,			# total body length
		0x00000000,			# opaque
		0x00,				# CAS, upper word
		0x00,				# CAS, lower word
		$k,
	);

	# The request only joins the pending queue once the transport says
	# it has been flushed.
	my $queue_request = $self->sap(sub {
		my $proto = shift;
		push @{ $proto->{pending} }, {
			%args,
			type	=> 'Get',
			key	=> $k,
		};
		$on_write->($proto, key => $k) if $on_write;
	});

	$self->write($packet, on_flush => $queue_request);
	return $self;
}

=head2 set

Stores a value in memcached.

Takes a key, a value, and zero or more optional named parameters:

=over 4

=item * on_write - called when we've sent the request to the server

=back

=cut

# Send a Set request storing $v under $k through the subclass-provided
# write method.
#
# Named parameters:
#   flags    - 32-bit flags stored with the item (default 0)
#   ttl      - expiry value sent to the server (default 60). An explicit 0
#              is sent as 0, which memcached treats as "never expire".
#   on_write - called as ($self, key => $k, value => $v) once the
#              transport reports the request as flushed
# Every other named parameter is kept on the queued request entry; on_read
# looks there for on_complete and on_error.
#
# Returns $self.
sub set {
	my $self = shift;
	my $k = shift; # FIXME should we do anything about encoding or length checks here?
	my $v = shift;
	my %args = @_;

# Pull out any callbacks that we handle directly
	my $on_write = delete $args{on_write};

	my $flags = $args{flags} || 0;

	# Only fall back to the default when no ttl was supplied. The old
	# "|| 60" also replaced an explicit 0, making it impossible to store
	# an item that never expires.
	my $ttl = (defined $args{ttl} && length $args{ttl}) ? $args{ttl} : 60;

	$self->write(
		pack(
			'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1 N1 N1 a* a*',
			MAGIC_REQUEST,		# What type this packet is
			$OPCODE_BY_NAME{'Set'},	# Opcode
			length($k),		# Key length
			0x08,			# Extras length (flags + expiry)
			0x00,			# Data type binary
			0x0000,			# Reserved
			8 + length($k) + length($v),	# Total body: extras + key + value
			0x00000000,		# Opaque
			0x00,			# CAS
			0x00,			# more CAS - 8byte value but don't want to rely on pack 'Q'
			$flags,			# Extras: flags
			$ttl,			# Extras: expiry
			$k,
			$v,
		),
		# The request only joins the pending queue once the transport
		# says it has been flushed.
		on_flush => $self->sap(sub {
			my $self = shift;
			push @{ $self->{pending} }, {
				%args,
				type	=> 'Set',
				key	=> $k,
				value	=> $v,
			};
			$on_write->($self, key => $k, value => $v) if $on_write;
		})
	);
	return $self;
}

=head2 init

Sets things up.

Currently just does some internal housekeeping, takes no parameters, and returns $self.

=cut

# Reset internal state: start with an empty queue of requests that are
# awaiting a response. Takes no parameters and returns $self.
sub init {
	my ($self) = @_;
	$self->{pending} = [];
	return $self;
}

=head2 on_read

This should be called when there is data to be processed. It takes a single parameter:
a reference to a buffer containing the incoming data. If a packet is processed
successfully then it will be removed from this buffer (via C< substr > or C< s// >).

Returns true if a packet was found, false if not. It is recommended (but not required)
that this method be called repeatedly until it returns false.

=cut

# Try to consume one response packet from the front of the buffer.
#
# Takes a reference to the buffer of received bytes. When a complete packet
# is present it is removed from the buffer, matched against the oldest
# queued request, and reported through that request's on_complete or
# on_error callback. The callback receives the queued entry as key/value
# pairs, plus status => $code for errors.
#
# Returns 1 when a packet was consumed and 0 when more data is needed.
# Dies when the packet is not a response, when nothing is queued, or when
# the server reports an error and the request has no on_error handler.
sub on_read {
	my ($self, $buffref) = @_;

	# Bail out if we don't have a full header
	return 0 unless length $$buffref >= 24;

	# Only the leading header fields are needed; opaque and CAS are ignored.
	my ($magic, undef, $key_len, $extras_len, undef, $status, $body_len)
		= unpack('C1 C1 n1 C1 C1 n1 N1', $$buffref);
	die "Not a response" unless $magic == MAGIC_RESPONSE;

	# If we don't have the full body as well, bail out here
	return 0 unless length $$buffref >= ($body_len + 24);

	# Strip the header, then take the body
	substr $$buffref, 0, 24, '';
	my $body = substr $$buffref, 0, $body_len, '';

	# The body is extras + key + value; drop the first two using the
	# lengths from the header. Trusting the header rather than the opcode
	# keeps the text of an error response intact: the previous code removed
	# four bytes from every Get response, including failures that carry no
	# extras.
	my $prefix_len = $extras_len + $key_len;
	substr $body, 0, $prefix_len, '' if $prefix_len;

	my $item = shift @{$self->{pending}} or die "Had response with no queued item\n";
	$item->{value} = $body if length $body;

	if($status) {
		if(exists $item->{on_error}) {
			# Report the packet as handled whatever the callback returns,
			# so a caller looping on our result keeps draining the buffer.
			$item->{on_error}->(%$item, status => $status);
			return 1;
		}
		my $reason = exists $RESPONSE_STATUS{$status}
			? $RESPONSE_STATUS{$status}
			: sprintf('unknown status 0x%04x', $status);
		die "Failed with " . $reason . " on item "
			. join(',', map { defined($_) ? $_ : 'undef' } %$item) . "\n";
	}

	$item->{on_complete}->(%$item) if exists $item->{on_complete};
	return 1;
}

=head2 status_text

Returns the status message corresponding to the given code.

=cut

# Look up the message for a numeric response status code.
# Returns undef for a code that has no entry in the status table.
sub status_text {
	my ($self, $code) = @_;
	return $RESPONSE_STATUS{$code};
}

=head2 build_packet

Generic packet construction.

=cut

# Build the 24-byte header for a packet.
#
# Named parameters:
#   request - true for a request header, false for a response header
#   opcode  - numeric opcode byte
#   key     - key string (optional); only its length is used
#   extras  - packed extras (optional); only its length is used
#   body    - value string (optional); only its length is used
#
# Returns the packed header only. The payload itself is not appended.
#
# Fixes relative to the previous layout: 16-bit fields are packed in network
# byte order ('n', as get and set do) rather than native order ('S'); the
# reserved field is now present, so the body length lands in the 32-bit
# total-body slot; and both CAS words are emitted, giving a full 24-byte
# header.
sub build_packet {
	my ($self, %args) = @_;

	my ($key_len, $extras_len, $value_len) = map {
		defined($args{$_}) ? length($args{$_}) : 0
	} qw(key extras body);

	return pack(
		'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1',
		$args{request} ? MAGIC_REQUEST : MAGIC_RESPONSE,
		$args{opcode},
		$key_len,
		$extras_len,
		0x00,		# Data type binary
		0x0000,		# Reserved
		# Total body is extras + key + value, matching how set computes
		# it. NOTE(review): this assumes `body` holds the value only -
		# confirm no caller passes a pre-assembled body.
		$extras_len + $key_len + $value_len,
		0x00000000,	# Opaque
		0x00,		# CAS
		0x00,		# more CAS - 8byte value but don't want to rely on pack 'Q'
	);
}

=head2 hash_key

Returns a hashed version of the given key using md5.

=cut

# Hash a key with MD5.
# Returns the raw 16-byte binary digest, not a hex string.
sub hash_key {
	my ($self, $key) = @_;
	return Digest::MD5::md5($key);
}

=head2 ketama

Provided for backward compatibility only. See L</hash_key>.

=cut

# Backward-compatible alias: passes all arguments straight to hash_key on
# the same object and returns whatever it returns.
sub ketama {
	my ($self, @args) = @_;
	return $self->hash_key(@args);
}

=head2 build_ketama_map

Generates a Ketama hash map from the given list of servers.

Returns an arrayref of points.

=cut

# Build the Ketama continuum for a weighted list of servers.
#
# Takes a flat list of (server => weight) pairs. Each server gets
# floor(share * 40 * number_of_servers) hash rounds, where share is its
# fraction of the total weight. Every round hashes "server-N" and yields
# four 32-bit points, one per 4-byte group of the digest (little-endian).
#
# The points, sorted by ascending value, are stored in $self->{points} and
# also returned as an arrayref of { point => ..., ip => ... } hashes.
#
# An empty server list gives an empty map. Previously that case died on an
# undefined total under fatal warnings. A non-empty list whose weights sum
# to zero still dies with a division-by-zero error.
sub build_ketama_map {
	my ($self, @servers) = @_;

	# Seed the sum with 0 so that no servers means a total of 0, not undef.
	my $total = sum(0, values %{ +{ @servers } });
	my $server_count = @servers / 2;

	my @points;
	while(@servers) {
		my ($srv, $weight) = splice @servers, 0, 2;
		my $pct = $weight / $total;
		my $ks = floor($pct * 40.0 * $server_count);
		foreach my $k (0..$ks-1) {
			my $hash = sprintf '%s-%d', $srv, $k;
			my @digest = map ord, split //, $self->hash_key($hash);
			foreach my $h (0..3) {
				# Bytes 4h .. 4h+3 of the digest, lowest byte first
				push @points, {
					point => ( $digest[3+$h*4] << 24 )
					| ( $digest[2+$h*4] << 16 )
					| ( $digest[1+$h*4] <<  8 )
					|   $digest[$h*4],
					ip => $srv
				};
			}
		}
	}

	# Numeric sort on the point value, using the core sort so this block
	# needs nothing beyond what ships with Perl.
	@points = sort { $a->{point} <=> $b->{point} } @points;
	$self->{points} = \@points;
	return \@points;
}

=head2 ketama_hashi

Calculates an integer hash value from the given key.

=cut

# Turn a key into an integer position on the continuum: hash the key with
# hash_key and read the first four bytes of the result as a little-endian
# unsigned value (byte 0 is the least significant).
sub ketama_hashi {
	my ($self, $key) = @_;
	my @bytes = map { ord } split //, $self->hash_key($key);

	my $value = 0;
	for my $idx (0 .. 3) {
		$value |= $bytes[$idx] << (8 * $idx);
	}
	return $value;
}

=head2 ketama_find_point

Given a key value, calculates the closest point on the Ketama map.

=cut

# Find the continuum entry responsible for a key.
#
# Hashes the key with ketama_hashi, then binary-searches the sorted
# $self->{points} for the entry whose point is the first one at or above
# the hash. Falls back to the first entry when the search runs off either
# end of the continuum. Returns the matching { point => ..., ip => ... }
# entry (undef when the continuum is empty).
sub ketama_find_point {
	my ($self, $key) = @_;

	# Where this key sits on the continuum
	my $target = $self->ketama_hashi($key);

	my $ring = $self->{points};
	my $count = scalar @$ring;
	my ($low, $high) = (0, $count);

	for (;;) {
		my $mid = floor(($low + $high) / 2);

		# One past the last entry: wrap around to the start
		return $ring->[0] if $mid == $count;

		my $upper = $ring->[$mid]->{point};
		my $lower = $mid == 0 ? 0 : $ring->[$mid - 1]->{point};

		# Hit when the target lies in the half-open interval (lower, upper]
		return $ring->[$mid] if $target > $lower && $target <= $upper;

		if ($upper < $target) {
			$low = $mid + 1;
		}
		else {
			$high = $mid - 1;
		}

		# Search window exhausted: default to the first entry
		return $ring->[0] if $low > $high;
	}
}

1;

__END__

=head1 WHY

Three main reasons:

=over 4

=item * B<Transport-agnostic> - purposefully does B< not > get involved in the details of
sending or receiving data: when it wants to write something it'll call L</write>, and
when you have data to process you call L</on_read>

=item * B<Nonblocking> - since this just operates on data and callbacks, rather than getting
involved in transporting data, all operations should return quickly (in Perl terms)

=item * B<Debugging support> - strap this over a memcached transport layer and see
human-readable versions of the binary packets

=back

If you're looking for good performance, stability, an extensive set of tests, support,
and a pony, then you're reading the wrong module:

=head1 SEE ALSO

=over 4

=item * L<Cache::Memcached> - official implementation

=item * L<AnyEvent::Memcached> - text protocol support for L<AnyEvent>

=item * L<Cache::Memcached::AnyEvent> - provides binary protocol support for L<AnyEvent>

=item * L<Memcached::Client> - another L<AnyEvent> implementation, again with the
transport layer too highly coupled for my purposes

=item * L<Cache::Memcached::GetParserXS> - XS implementation for parsing memcached
binary data, apparently "possibly twice as fast as the original perl version".

=back

=head1 AUTHOR

Tom Molesworth <cpan@entitymodel.com>

=head1 LICENSE

Copyright Tom Molesworth 2011-2012. Licensed under the same terms as Perl itself.