The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package PocketIO::Connection;

use strict;
use warnings;

use AnyEvent;
use Scalar::Util qw(blessed);

use PocketIO::Message;
use PocketIO::Socket;
use PocketIO::Sockets;
use PocketIO::Broadcast;

use constant DEBUG => $ENV{POCKETIO_CONNECTION_DEBUG};

sub new {
    my $class = shift;

    my $self = {@_};
    bless $self, $class;

    $self->{connect_timeout}   ||= 30;
    $self->{reconnect_timeout} ||= 15;
    $self->{close_timeout}     ||= 15;

    $self->{on_connect_timeout}   = sub { $_[0]->emit('connect_failed') };
    $self->{on_reconnect_timeout} = sub { $_[0]->emit('reconnect_failed') };
    $self->{on_close_timeout}     = sub { $_[0]->close };

    $self->{max_messages_to_stage} ||= 32;
    $self->{messages} = [];

    $self->{on_connect_failed}   ||= sub { };
    $self->{on_reconnect}        ||= sub { };
    $self->{on_reconnect_failed} ||= sub { };
    $self->{on_message}          ||= sub { };
    $self->{on_disconnect}       ||= sub { };
    $self->{on_error}            ||= sub { };
    $self->{on_close}            ||= sub { };

    $self->{socket} ||= $self->_build_socket;
    my $on_connect = delete $self->{on_connect} || sub { };
    $self->{on_connect} = sub {
        my $self = shift;

        eval {
            $on_connect->($self->{socket}, @{$self->{on_connect_args} || []});
            1;
        } || do {
            warn "Connection error: $_";

            $self->close;
        };
    };

    DEBUG && $self->_debug('Connection created');

    $self->connecting;

    return $self;
}

sub new_passive {
    my $class = shift;
    my $self  = {@_};
    bless $self, $class;
    return $self;
}

sub socket { $_[0]->{socket} }

sub pool { $_[0]->{pool} }

sub type { @_ > 1 ? $_[0]->{type} = $_[1] : $_[0]->{type} }

sub is_connected { $_[0]->{is_connected} }

sub connecting {
    my $self = shift;

    DEBUG && $self->_debug("State 'connecting'");

    $self->_start_timer('connect');
}

sub reconnecting {
    my $self = shift;

    DEBUG && $self->_debug("State 'reconnecting'");

    $self->_stop_timer('close');

    $self->_start_timer('reconnect');
}

sub connected {
    my $self = shift;

    DEBUG && $self->_debug("State 'connected'");

    $self->_stop_timer('connect');

    $self->{is_connected} = 1;

    my $message = PocketIO::Message->new(type => 'connect');
    $self->write($message);

    $self->_start_timer('close');

    $self->emit('connect');

    return $self;
}

sub reconnected {
    my $self = shift;

    DEBUG && $self->_debug("State 'reconnected'");

    $self->_stop_timer('reconnect');

    $self->emit('reconnect');

    $self->_start_timer('close');

    return $self;
}

sub disconnected {
    my $self = shift;

    DEBUG && $self->_debug("State 'disconnected'");

    $self->_stop_timer('connect');
    $self->_stop_timer('reconnect');
    $self->_stop_timer('close');

    $self->{data}     = '';
    $self->{messages} = [];

    $self->{is_connected} = 0;

    $self->{disconnect_timer} = AnyEvent->timer(
        after => 0,
        cb    => sub {
            return unless $self;

            if ($self->{socket}) {
                if (my $cb = $self->{socket}->on('disconnect')) {
                    $cb->($self->{socket});
                }
                undef $self->{socket};
            }

            undef $self;
        }
    );

    return $self;
}

sub close {
    my $self = shift;

    my $message = PocketIO::Message->new(type => 'disconnect');
    $self->write($message);

    $self->emit('close');

    #$self->disconnected;

    return $self;
}

sub id {
    my $self = shift;

    $self->{id} ||= $self->_generate_id;

    return $self->{id};
}

sub on {
    my $self  = shift;
    my $event = shift;

    my $name = "on_$event";

    unless (@_) {
        DEBUG && $self->_debug("Event 'on_$event'");

        return $self->{$name};
    }

    $self->{$name} = $_[0];

    return $self;
}

sub emit {
    my $self  = shift;
    my $event = shift;

    $event = "on_$event";

    return unless exists $self->{$event};

    DEBUG && $self->_debug("Emitting '$event'");

    $self->{$event}->($self, @_);

    return $self;
}

sub stage_message {
    my $self = shift;
    my ($message) = @_;

    return if @{$self->{messages}} >= $self->{max_messages_to_stage};

    push @{$self->{messages}}, $message;

    return $self;
}

sub has_staged_messages {
    my $self = shift;

    return @{$self->{messages}} > 0;
}

sub staged_message {
    my $self = shift;

    return shift @{$self->{messages}};
}

sub send_heartbeat {
    my $self = shift;

    $self->{heartbeat}++;

    DEBUG && $self->_debug('Send heartbeat');

    my $message = PocketIO::Message->new(type => 'heartbeat');

    return $self->write($message);
}

sub send {
    my $self = shift;
    my ($message) = @_;

    $message = $self->_build_message($message);

    $self->write($message);

    return $self;
}

sub broadcast {
    my $self = shift;

    return PocketIO::Broadcast->new(conn => $self, pool => $self->pool);
}

sub sockets {
    my $self = shift;

    return PocketIO::Sockets->new(pool => $self->pool);
}

sub parse_message {
    my $self = shift;
    my ($message) = @_;

    DEBUG && $self->_debug("Received '" . substr($message, 0, 80) . "'");

    $message = PocketIO::Message->new->parse($message);
    return unless $message;

    $self->_stop_timer('close');

    if ($message->is_message) {
        $self->{socket}->on('message')->($self->{socket}, $message->data);
    }
    elsif ($message->type eq 'event') {
        my $name = $message->data->{name};
        my $args = $message->data->{args};

        my $id = $message->id;

        $self->{socket}->on($name)->(
            $self->{socket},
            @$args => sub {
                my $message = PocketIO::Message->new(
                    type       => 'ack',
                    message_id => $id,
                    args       => [@_]
                );

                $self->write($message);
            }
        ) if defined $self->{socket}->on($name);
    }
    elsif ($message->type eq 'heartbeat') {

        # TODO
    }
    else {

        # TODO
    }

    $self->_start_timer('close');

    return $self;
}

sub write {
    my $self = shift;
    my ($bytes) = @_;

    $self->_restart_timer('close');

    $bytes = $bytes->to_bytes if blessed $bytes;

    if ($self->{on_write}) {
        DEBUG && $self->_debug("Writing '" . substr($bytes, 0, 50) . "'");
        $self->emit('write', $bytes);
    }
    else {
        DEBUG && $self->_debug("Staging '" . substr($bytes, 0, 50) . "'");
        $self->stage_message($bytes);
    }
}

sub _start_timer {
    my $self = shift;
    my ($timer) = @_;

    my $timeout = $self->{"${timer}_timeout"};
    return if (!defined $timeout);

    DEBUG && $self->_debug("Start '${timer}_timer' ($timeout)");

    $self->{"${timer}_timer"} = AnyEvent->timer(
        after => $timeout,
        cb    => sub {
            DEBUG && $self->_debug("Timeout '${timer}_timeout'");

            $self->{"on_${timer}_timeout"}->($self);
        }
    );
}

sub _stop_timer {
    my $self = shift;
    my ($timer) = @_;

    DEBUG && $self->_debug("Stop '${timer}_timer'");

    delete $self->{"${timer}_timer"};
}

sub _restart_timer {
    my $self = shift;
    my ($timer) = @_;

    $self->_stop_timer($timer);
    $self->_start_timer($timer);
}

sub _build_message {
    my $self = shift;
    my ($message) = @_;

    return $message if blessed $message;

    return PocketIO::Message->new(data => $message);
}

sub _generate_id {
    my $self = shift;

    my $string = '';

    for (1 .. 16) {
        $string .= int(rand(10));
    }

    return $string;
}

sub _debug {
    my $self = shift;
    my ($message) = @_;

    warn time . ' (' . $self->id . '): ' . $message . "\n";
}

sub _build_socket {
    my $self = shift;

    return PocketIO::Socket->new(conn => $self);
}

1;
__END__

=head1 NAME

PocketIO::Connection - Connection class

=head1 DESCRIPTION

L<PocketIO::Connection> is a connection class that
encapsulates all the logic for bulding and parsing Socket.IO messages. Used
internally.

=head1 METHODS

=head2 C<broadcast>

=head2 C<close>

=head2 C<connected>

=head2 C<connecting>

=head2 C<disconnected>

=head2 C<emit>

=head2 C<has_staged_messages>

=head2 C<id>

=head2 C<is_connected>

=head2 C<new>

=head2 C<on>

=head2 C<parse_message>

=head2 C<pool>

=head2 C<reconnected>

=head2 C<reconnecting>

=head2 C<send>

=head2 C<send_heartbeat>

=head2 C<socket>

=head2 C<sockets>

=head2 C<stage_message>

=head2 C<staged_message>

=head2 C<type>

=head2 C<write>

=cut