The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id: Client.pm 9 2007-12-19 22:40:09Z miyagawa $

package Data::YUID::Client;
use strict;

use fields qw( servers select_timeout hosts );
use Carp;
use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
use IO::Socket::INET;
use Socket qw( MSG_NOSIGNAL );
use URI::Escape ();

use constant DEFAULT_PORT => 9001;

our $FLAG_NOSIGNAL = 0;
eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL };

my $Active_Sock;
my $Socket_Used_Count = 0; #yuids served per socket before we refresh it, to force rebalancing
my $Max_Socket_Reused = 1000 + rand(1000);

sub new {
    my Data::YUID::Client $client = shift;
    my %args = @_;
    $client = fields::new($client) unless ref $client;

    croak "servers must be an arrayref if specified"
        unless !exists $args{servers} || ref $args{servers} eq 'ARRAY';
    $client->{servers} = $args{servers} || [];
    $client->{select_timeout} = 1.0;
    $client->{hosts} = $args{servers};

    $client;
}

sub _oneline {
    my Data::YUID::Client $client = shift;
    my ($sock, $line) = @_;
    my $res;
    my ($ret, $offset) = (undef, 0);

    # state: 0 - writing, 1 - reading, 2 - done
    my $state = defined $line ? 0 : 1;

    # the bitsets for select
    my ($rin, $rout, $win, $wout);
    my $nfound;

    my $copy_state = -1;
    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    # the select loop
    while(1) {
        if ($copy_state!=$state) {
            last if $state==2;
            ($rin, $win) = ('', '');
            vec($rin, fileno($sock), 1) = 1 if $state==1;
            vec($win, fileno($sock), 1) = 1 if $state==0;
            $copy_state = $state;
        }
        $nfound = select($rout=$rin, $wout=$win, undef,
                         $client->{select_timeout});
        last unless $nfound;

        if (vec($wout, fileno($sock), 1)) {
            $res = send($sock, $line, $FLAG_NOSIGNAL);
            next
                if not defined $res and $!==EWOULDBLOCK;
            unless (($res || 0) > 0) {
                _close_sock($sock);
                return undef;
            }
            if ($res == length($line)) { # all sent
                $state = 1;
            } else { # we only succeeded in sending some of it
                substr($line, 0, $res, ''); # delete the part we sent
            }
        }

        if (vec($rout, fileno($sock), 1)) {
            $res = sysread($sock, $ret, 255, $offset);
            next
                if !defined($res) and $!==EWOULDBLOCK;
            if ($res == 0) { # catches 0=conn closed or undef=error
                _close_sock($sock);
                return undef;
            }
            $offset += $res;
            if (rindex($ret, "\r\n") + 2 == length($ret)) {
                $state = 2;
            }
        }
    }

    unless ($state == 2) {
        _close_sock($sock);
        return undef;
    }

    return $ret;
}

sub get_id {
    my Data::YUID::Client $client = shift;
    my($ns) = @_;
    my $id;
    while (!$id && (my $sock = $client->get_sock)) {
        my $cmd = sprintf "getid ns=%s\r\n", URI::Escape::uri_escape($ns || '');
        my $res = $client->_oneline($sock, $cmd) or next;
        ($id) = $res =~ /^ok\s+id=(\d+)/i;
    }
    $id;
}

sub _close_sock {
    my($sock) = @_;
    undef $Active_Sock 
        if ($Active_Sock && fileno($sock) == fileno($Active_Sock));
    close $sock;
}

#class method, for request cleanup
sub close_active_sock {
    close $Active_Sock;
}

sub connect_to_server {
    my Data::YUID::Client $client = shift;
    my($host) = @_;
    my($ip, $port) = split /:/, $host;
    $port ||= DEFAULT_PORT;
    my $sock = IO::Socket::INET->new(
            PeerAddr        => $ip,
            PeerPort        => $port,
            Proto           => 'tcp',
            Type            => SOCK_STREAM,
            ReuseAddr       => 1,
            Blocking        => 0,
        ) or return;
    $sock;
}

sub get_sock {
    my Data::YUID::Client $client = shift;

    ++$Socket_Used_Count;

    if ($Active_Sock && $Active_Sock->connected && ($Socket_Used_Count % $Max_Socket_Reused != 0)) {
        return $Active_Sock;
    }else{
        my $hosts = $client->{hosts};
        my @try_hosts = @$hosts;
        my $sock;
        while (@try_hosts){
            my $host = splice(@try_hosts, int(rand(@try_hosts)), 1);
            $sock = $client->connect_to_server($host);
            last if $sock;
        }
        return $Active_Sock = $sock;
    }
}

1;
__END__

=head1 NAME

Data::YUID::Client - Client for distributed YUID generation

=head1 SYNOPSIS

    use Data::YUID::Client;
    my $client = Data::YUID::Client->new(
            servers => [
                '192.168.100.4:11001',
                '192.168.100.5:11001',
            ],
        );
    my $id = $client->get_id;

=head1 DESCRIPTION

I<Data::YUID::Client> is a client for the client/server protocol used to
generate distributed unique IDs. F<bin/yuidd> implements the server portion
of the protocol.

=head1 USAGE

=head2 Data::YUID::Client->new(%param)

Creates a new client object, initialized with I<%param>, and returns the
new object.

I<%param> can contain:

=over 4

=item * servers

A reference to a list of server addresses, in I<host:port> notation. These
should point to the locations of servers running the F<yuidd> server using
the client/server protocol for ID generation.

New Behavior: the original version of this would create a connection
to every server at C<new()> and cache them all.  This version
creates only one connection, when the first YUID is requested, and
caches that connection.  If the connection goes bad, it'll try other servers
it its list until one works or until it has worked through the list.

=back

=head2 $client->get_id([ $namespace ])

Obtains a unique ID from one of the servers, in the optional namespace
I<$namespace>.  

Returns undef if it can't get an ID from any server.

=head1 AUTHOR & COPYRIGHT

Please see the I<Data::YUID> manpage for author, copyright, and license
information.

=cut