# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package MR::IProto::Cluster::Server;

=head1 NAME

MR::IProto::Cluster::Server - a single server of an MR::IProto cluster

=head1 DESCRIPTION

This class is used to implement all communication with one server.

=cut

use Mouse;
use Mouse::Util::TypeConstraints;
use Scalar::Util;
use MR::IProto::Connection::Async;
use MR::IProto::Connection::Sync;
use MR::IProto::Message;

# Consume the debugging role. This class supplies _build_debug_cb and calls
# debug_cb from _debug; presumably the role itself declares the debug_cb
# attribute -- confirm in MR::IProto::Role::Debuggable.
with 'MR::IProto::Role::Debuggable';

# Allow a server to be written as a plain "host:port" or "host:port:weight"
# string wherever this type is expected with coercion enabled. The weight is
# passed to the constructor only when the string contains a third field, so
# the attribute default applies otherwise.
coerce 'MR::IProto::Cluster::Server'
    => from 'Str'
    => via {
        my ($host, $port, $weight) = split /:/, $_;
        my %args = (
            host => $host,
            port => $port,
        );
        $args{weight} = $weight if defined $weight;
        return __PACKAGE__->new(%args);
    };

# Read-only string (default 'MR::IProto') that the default debug callback
# built by _build_debug_cb puts in front of every message it warns.
has prefix => (
    is  => 'ro',
    isa => 'Str',
    default => 'MR::IProto',
);

=head1 ATTRIBUTES

=over

=item host

Host name or IP address.

=cut

# Read-only, required string; must be supplied to the constructor.
# Also printed as part of every message produced by _debug.
has host => (
    is  => 'ro',
    isa => 'Str',
    required => 1,
);

=item port

Port number.

=cut

# Read-only, required integer; must be supplied to the constructor.
# Also printed as part of every message produced by _debug.
has port => (
    is  => 'ro',
    isa => 'Int',
    required => 1,
);

=item weight

Server weight.

=cut

# Read-only integer, 1 unless given. The string coercion of this class fills
# it from the optional third field of "host:port:weight".
has weight => (
    is  => 'ro',
    isa => 'Int',
    default => 1,
);

=item connect_timeout

Timeout of connect operation.

=cut

# Read-write number, 2 unless given.
# NOTE(review): the unit is not visible in this file -- presumably seconds;
# confirm in the connection classes that read it.
has connect_timeout => (
    is  => 'rw',
    isa => 'Num',
    default => 2,
);

=item timeout

Timeout of read and write operations.

=cut

# Read-write number, 2 unless given. Every time a value is set, the trigger
# forwards it via set_timeout() to the async and sync connection objects
# that have already been built; a connection that has not been built yet is
# not built just to receive the value.
has timeout => (
    is      => 'rw',
    isa     => 'Num',
    default => 2,
    trigger => sub {
        my ($self, $value) = @_;
        if ( $self->has_async() ) {
            $self->async->set_timeout($value);
        }
        if ( $self->has_sync() ) {
            $self->sync->set_timeout($value);
        }
        return;
    },
);

=item tcp_nodelay

Enable TCP_NODELAY.

=cut

# Read-only flag stored as an integer (not Bool); 1 unless given.
# Not read anywhere in this file -- presumably consumed by the connection
# classes when the socket is set up; confirm there.
has tcp_nodelay => (
    is  => 'ro',
    isa => 'Int',
    default => 1,
);

=item tcp_keepalive

Enable SO_KEEPALIVE.

=cut

# Read-only flag stored as an integer (not Bool); 0 unless given.
# Not read anywhere in this file -- presumably consumed by the connection
# classes when the socket is set up; confirm there.
has tcp_keepalive => (
    is  => 'ro',
    isa => 'Int',
    default => 0,
);

=item max_parallel

Maximum number of simultaneous requests.

=cut

# Read-only integer, 10 unless given. Not enforced in this file --
# presumably the limit is applied by the code that dispatches requests;
# confirm at the point of use.
has max_parallel => (
    is  => 'ro',
    isa => 'Int',
    default => 10,
);

=item active

Whether the server is used in balancing.

=cut

# Read-write boolean, true unless given, so it can be toggled after
# construction. Nothing in this file changes it.
has active => (
    is  => 'rw',
    isa => 'Bool',
    default => 1,
);

# Optional read-write code reference with no default.
# NOTE(review): never invoked in this file -- presumably called by the
# connection classes when a connection is closed; confirm the arguments there.
has on_close => (
    is  => 'rw',
    isa => 'CodeRef',
);

# Asynchronous connection object for this server. lazy_build means it is
# created by _build_async on first access, and it also provides the
# has_async predicate (used by the timeout trigger) and the clear_async
# clearer (used by disconnect_all).
has async => (
    is  => 'ro',
    isa => 'MR::IProto::Connection::Async',
    lazy_build => 1,
);

# Synchronous connection object for this server. lazy_build means it is
# created by _build_sync on first access, and it also provides the
# has_sync predicate (used by the timeout trigger) and the clear_sync
# clearer (used by disconnect_all).
has sync => (
    is  => 'ro',
    isa => 'MR::IProto::Connection::Sync',
    lazy_build => 1,
);

# Registry of all instances of this class, keyed by refaddr. BUILD adds a
# weakened reference for each new object, DEMOLISH removes it, and
# disconnect_all walks it.
my %servers;

=back

=head1 PUBLIC METHODS

=over

=item disconnect_all

Class method used to disconnect all iproto-connections. Very useful in case of fork().

=cut

# Class method: drop the async and sync connection objects of every
# registered server. Takes only the class name (unused) and returns nothing.
sub disconnect_all {
    my ($class) = @_;

    # The registry stores weak references. Copying them into an array gives
    # strong references, so no server can be destroyed (and removed from
    # %servers by DEMOLISH) while the loop is running. Entries whose referent
    # is already gone are undef and are skipped instead of dying on a method
    # call against an undefined value.
    my @alive = grep { defined } values %servers;

    foreach my $server (@alive) {
        $server->clear_async();
        $server->clear_sync();
    }
    return;
}

=back

=head1 PROTECTED METHODS

=over

=cut

# Constructor hook: record the new object in the %servers registry under its
# refaddr. The stored reference is weakened so the registry does not keep
# the object alive.
sub BUILD {
    my $self = shift;
    my $addr = Scalar::Util::refaddr($self);
    Scalar::Util::weaken( $servers{$addr} = $self );
    return;
}

# Destructor hook: remove this object's entry from the %servers registry.
sub DEMOLISH {
    my $self = shift;
    my $addr = Scalar::Util::refaddr($self);
    delete $servers{$addr};
    return;
}

# Builder for the lazy "async" attribute: a new asynchronous connection
# that is handed this server object.
sub _build_async {
    my $self = shift;
    return MR::IProto::Connection::Async->new(
        server => $self,
    );
}

# Builder for the lazy "sync" attribute: a new synchronous connection
# that is handed this server object.
sub _build_sync {
    my $self = shift;
    return MR::IProto::Connection::Sync->new(
        server => $self,
    );
}

# Builder for the debug callback: returns a closure that warns its single
# message argument, prefixed with the value of "prefix" and terminated by
# exactly one newline. The prefix is read once, when the closure is built.
sub _build_debug_cb {
    my $self  = shift;
    my $label = $self->prefix;
    return sub {
        my $text = shift;
        chomp $text;    # the newline is added back below, so never doubled
        warn "$label: $text\n";
        return;
    };
}

=item _send_started( $sync, $message, $data )

This method is called when sending of a message starts.

=cut

sub _send_started {
    # Deliberately a no-op in this class: arguments are ignored and nothing
    # is returned. Presumably a hook for subclasses to override -- confirm
    # against the callers in the connection classes.
    return;
}

=item _recv_finished( $sync, $message, $data, $error )

This method is called when a message is received.

=cut

sub _recv_finished {
    # Deliberately a no-op in this class: arguments are ignored and nothing
    # is returned. Presumably a hook for subclasses to override -- confirm
    # against the callers in the connection classes.
    return;
}

# Pass $msg to the debug callback, prefixed with this server's "host:port: ".
# Returns nothing.
sub _debug {
    my ($self, $msg) = @_;
    my $line = sprintf "%s:%d: %s", $self->host, $self->port, $msg;
    my $callback = $self->debug_cb;
    $callback->($line);
    return;
}

=back

=head1 SEE ALSO

L<MR::IProto>, L<MR::IProto::Cluster>.

=cut

# Remove the keywords imported by Mouse (has, with, ...) from this
# namespace, then make the class immutable.
no Mouse;
__PACKAGE__->meta->make_immutable();

# A module file must end with a true value.
1;