# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package AnyEvent::MtGox::Stream;

use strict;
use warnings;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::HTTP qw(http_post);
use AnyEvent::Util qw(guard);
use Carp qw(croak);
use Errno qw(EPIPE);
use JSON ();
use Protocol::WebSocket::Frame;
use Protocol::WebSocket::Handshake::Client;
use URI;

# Module version. The string eval turns the quoted version string into a
# plain number (the customary idiom, which also copes with developer
# versions that contain an underscore).
our $VERSION = '0.02';
$VERSION = eval $VERSION;

# Creates a new streaming client.
#
# Performs the Socket.IO HTTP handshake, then opens a WebSocket connection
# to the session it was given and dispatches every decoded '/mtgox' message
# to the on_message callback.
#
# Named parameters:
#   secure        - true to use https/wss with TLS (default: 0)
#   on_message    - called with the decoded JSON of each '/mtgox' message
#                   (default: no-op)
#   on_error      - called with an error message (default: croaks)
#   on_disconnect - called when the stream is closed (default: croaks)
#
# Returns a guard object in non-void context; destroying it closes the
# stream (or abandons the handshake if it has not completed yet). Returns
# nothing in void context.
sub new {
    my ($class, %params) = @_;

    my $secure        = $params{secure}        || 0;
    my $on_disconnect = $params{on_disconnect} || sub { croak 'Disconnected' };
    my $on_error      = $params{on_error}      || sub { croak @_ };
    my $on_message    = $params{on_message}    || sub { };

    my $server = 'socketio.mtgox.com';
    my $handle;

    # Set when the caller drops the guard, so that a handshake response
    # arriving afterwards does not open a connection nobody can close.
    my $cancelled;

    # Socket.IO handshake.
    my $uri = URI->new("http://$server/socket.io/1");
    $uri->scheme('https') if $secure;
    AE::log debug => "Making Socket.IO handshake to $uri";
    http_post $uri, undef, sub {
        my ($body, $headers) = @_;
        return if $cancelled;
        return $on_error->('Socket.IO handshake failed')
            unless '200' eq $headers->{Status} and defined $body;

        # The body is "sid:heartbeat-timeout:..."; only the first two
        # fields are used.
        my ($sid, $heartbeat) = split ':', $body, 3;
        return $on_error->('Socket.IO handshake failed')
            unless defined $sid and length $sid;
        AE::log debug => "Socket.IO handshake succeeded: $sid";

        my $timer;

        # Stops the heartbeat and closes the connection. Used whenever the
        # stream ends from inside a read callback.
        my $teardown = sub {
            undef $timer;
            $handle->destroy if $handle;
        };

        $handle = AnyEvent::Handle->new(
            connect  => [ $uri->host, $uri->port ],
            tls      => $secure ? 'connect' : undef,
            on_error => sub {
                my ($failed, $fatal, $msg) = @_;

                # Inspect errno before cleaning up: destroying the handle
                # closes the socket and may overwrite $!.
                my $disconnected = $! == EPIPE;
                $failed->destroy;
                undef $timer;
                $disconnected ? $on_disconnect->() : $on_error->($msg);
            },
        );

        # WebSocket handshake.
        $uri->path("/socket.io/1/websocket/$sid");
        $uri->scheme($secure ? 'wss' : 'ws');
        AE::log debug => "Making WebSocket handshake to $uri";
        my $wsh = Protocol::WebSocket::Handshake::Client->new(url => "$uri");
        $handle->push_write($wsh->to_string);

        $handle->push_read(sub {
            unless ($wsh->parse($handle->rbuf)) {
                my $error = $wsh->error;

                # Clean up before invoking the callback so the connection
                # is released even if the callback dies.
                $teardown->();
                $on_error->($error);

                # Return true explicitly: the callback's own return value
                # must not decide whether this reader stays queued.
                return 1;
            }
            if ($wsh->is_done) {
                AE::log debug => 'WebSocket handshake succeeded';
                return 1;
            }

            # Handshake incomplete; wait for more data.
            return;
        });

        my $frame = Protocol::WebSocket::Frame->new;
        my $send_message = sub {
            $handle->push_write($frame->new(@_)->to_bytes);
        };
        my $send_heartbeat = sub {
            AE::log debug => 'Sending heartbeat';
            $send_message->('2::');
        };

        # Send heartbeats slightly ahead of the server's timeout.
        $timer = AE::timer $heartbeat - 2, $heartbeat - 2, $send_heartbeat
            if $heartbeat;

        my $json = JSON->new;

        $handle->push_read(sub {
            $frame->append($handle->rbuf);
            while (defined(my $msg = $frame->next)) {
                # Socket.IO messages look like "type:id:endpoint:data".
                my ($type, $id, $endpoint, $data) = split ':', $msg, 4;
                if ('4' eq $type and '/mtgox' eq $endpoint) {
                    # Undecodable or false payloads are silently skipped.
                    if (my $scalar = eval { $json->decode($data) }) {
                        $on_message->($scalar);
                    }
                }
                # Respond to heartbeats only if a heartbeat timeout wasn't
                # given in the handshake.
                elsif ('2' eq $type and not $heartbeat) {
                    $send_heartbeat->();
                }
                # Connected: join the /mtgox endpoint.
                elsif ('1::' eq $msg) { $send_message->('1::/mtgox') }
                elsif ('0' eq $type) {
                    # Server-initiated disconnect. Tear down first so the
                    # subsequent socket close does not report it again.
                    $teardown->();
                    $on_disconnect->();
                    return 1;
                }
                elsif ('7' eq $type) {
                    $teardown->();
                    $on_error->($data);
                    return 1;
                }

                # Unhandled message types:
                #   3: message
                #   5: event
                #   6: ack
                #   8: noop
            }

            # Keep this reader queued for subsequent frames.
            return;
        });
    };

    return unless defined wantarray;
    return guard {
        $cancelled = 1;

        # The handle does not exist until the Socket.IO handshake is done.
        $handle->destroy if $handle;
    };
}

1;

__END__

=head1 NAME

AnyEvent::MtGox::Stream - AnyEvent client for the MtGox streaming API

=head1 SYNOPSIS

    use AnyEvent::MtGox::Stream;

    my $client = AnyEvent::MtGox::Stream->new(
        on_message    => sub { },
        on_error      => sub { },
        on_disconnect => sub { },
    );

=head1 DESCRIPTION

The C<AnyEvent::MtGox::Stream> module is an C<AnyEvent> client for the MtGox
streaming API.

=head1 METHODS

=head2 new

    $client = AnyEvent::MtGox::Stream->new(%args)

Creates a new client and returns a guard object if called in non-void context.
When it is destroyed, it closes the stream.

The following named arguments are accepted:

=over

=item B<secure>

Use HTTPS for the handshake and TLS-secured WebSockets (wss) for the stream.
(default: 0)

=item B<on_message>

Callback to execute for each message. It is passed the decoded JSON payload
of the message. Defaults to a no-op.

=item B<on_error>

Callback to execute on error. It is passed the error message. The default
callback croaks with that message.

=item B<on_disconnect>

Callback to execute on disconnect. The default callback croaks with
C<Disconnected>.

=back

=head1 SEE ALSO

L<AnyEvent>

L<https://en.bitcoin.it/wiki/MtGox/API#Streaming_API>

L<https://github.com/LearnBoost/socket.io-spec>

=head1 REQUESTS AND BUGS

Please report any bugs or feature requests to
L<http://rt.cpan.org/Public/Bug/Report.html?Queue=AnyEvent-MtGox-Stream>.
I will be notified, and then you'll automatically be notified of progress on
your bug as I make changes.

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc AnyEvent::MtGox::Stream

You can also look for information at:

=over

=item * GitHub Source Repository

L<http://github.com/gray/anyevent-mtgox-stream>

=item * AnnoCPAN: Annotated CPAN documentation

L<http://annocpan.org/dist/AnyEvent-MtGox-Stream>

=item * CPAN Ratings

L<http://cpanratings.perl.org/d/AnyEvent-MtGox-Stream>

=item * RT: CPAN's request tracker

L<http://rt.cpan.org/Public/Dist/Display.html?Name=AnyEvent-MtGox-Stream>

=item * Search CPAN

L<http://search.cpan.org/dist/AnyEvent-MtGox-Stream/>

=back

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2011 gray <gray at cpan.org>, all rights reserved.

This library is free software; you can redistribute it and/or modify it under
the same terms as Perl itself.

=head1 AUTHOR

gray, <gray at cpan.org>

=cut