The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package AnyEvent::RabbitMQ;

use strict;
use warnings;
use Carp qw(confess croak);
use List::MoreUtils qw(none);
use Devel::GlobalDestruction;
use File::ShareDir;
use Readonly;
use Scalar::Util qw/ weaken /;
use namespace::clean;

require Data::Dumper;
sub Dumper {
    local $Data::Dumper::Terse = 1;
    local $Data::Dumper::Indent = 1;
    local $Data::Dumper::Useqq = 1;
    local $Data::Dumper::Deparse = 1;
    local $Data::Dumper::Quotekeys = 0;
    local $Data::Dumper::Sortkeys = 1;
    &Data::Dumper::Dumper
}

use AnyEvent::Handle;
use AnyEvent::Socket;

use Net::AMQP;
use Net::AMQP::Common qw(:all);

use AnyEvent::RabbitMQ::Channel;
use AnyEvent::RabbitMQ::LocalQueue;

our $VERSION = '1.11';

Readonly my $DEFAULT_AMQP_SPEC
    => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-9-1.xml';

sub new {
    my $class = shift;
    return bless {
        verbose            => 0,
        @_,
        _is_open           => 0,
        _queue             => AnyEvent::RabbitMQ::LocalQueue->new,
        _channels          => {},
        _login_user        => '',
        _server_properties => {},
    }, $class;
}

sub verbose {
    my $self = shift;
    @_ ? ($self->{verbose} = shift) : $self->{verbose}
}

sub is_open {
    my $self = shift;
    $self->{_is_open}
}

sub channels {
    my $self = shift;
    return $self->{_channels};
}

sub delete_channel {
    my $self = shift;
    my ($id) = @_;
    return defined delete $self->{_channels}->{$id};
}

sub login_user {
    my $self = shift;
    return $self->{_login_user};
}

my $_loaded_spec;
sub load_xml_spec {
    my $self = shift;
    my ($spec) = @_;
    $spec ||= $DEFAULT_AMQP_SPEC;
    if ($_loaded_spec && $_loaded_spec ne $spec) {
        croak("Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible");
    }
    elsif (!$_loaded_spec) {
        Net::AMQP::Protocol->load_xml_spec($_loaded_spec = $spec);
    }
    return $self;
}

sub connect {
    my $self = shift;
    my %args = $self->_set_cbs(@_);

    if ($self->{_is_open}) {
        $args{on_failure}->('Connection has already been opened');
        return $self;
    }

    $args{on_close}        ||= sub {};
    $args{on_read_failure} ||= sub {warn @_, "\n"};
    $args{timeout}         ||= 0;

    for (qw/ host port /) {
        confess("No $_ passed to connect to") unless $args{$_};
    }

    if ($self->{verbose}) {
        warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
    }

    $self->{_connect_guard} = AnyEvent::Socket::tcp_connect(
        $args{host},
        $args{port},
        sub {
            my $fh = shift or return $args{on_failure}->(
                sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!)
            );

            my %handle_args = (
                fh       => $fh,
                on_error => sub {
                    my ($handle, $fatal, $message) = @_;

                    $self->{_channels} = {};
                    if (!$self->{_is_open}) {
                        $args{on_failure}->(@_);
                    }
                    $self->{_is_open} = 0;
                    $self->_disconnect();
                    $args{on_close}->($message);
                },
                on_drain => sub {
                    my ($handle) = @_;
                    $self->{drain_condvar}->send
                        if exists $self->{drain_condvar};
                },
            );
            if ($args{tls}) {
                $handle_args{tls} = 'connect';
            }
            $self->{_handle} = AnyEvent::Handle->new(%handle_args);
            $self->_read_loop($args{on_close}, $args{on_read_failure});
            $self->_start(%args,);
        },
        sub {
            return $args{timeout};
        },
    );

    return $self;
}

sub server_properties {
    return shift->{_server_properties};
}

sub _read_loop {
    my ($self, $close_cb, $failure_cb,) = @_;

    return if !defined $self->{_handle}; # called on_error

    $self->{_handle}->push_read(chunk => 8, sub {
        my $data = $_[1];
        my $stack = $_[1];

        if (length($data) <= 7) {
            $failure_cb->('Broken data was received');
            @_ = ($self, $close_cb, $failure_cb,);
            goto &_read_loop;
        }

        my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
        if (!defined $type_id || !defined $channel || !defined $length) {
            $failure_cb->('Broken data was received');
            @_ = ($self, $close_cb, $failure_cb,);
            goto &_read_loop;
        }

        $self->{_handle}->push_read(chunk => $length, sub {
            $stack .= $_[1];
            my ($frame) = Net::AMQP->parse_raw_frames(\$stack);

            if ($self->{verbose}) {
                warn '[C] <-- [S] ', Dumper($frame),
                     '-----------', "\n";
            }

            my $id = $frame->channel;
            if (0 == $id) {
                if ($frame->type_id == 8) {
                    $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
                    return;
                }
                return if !$self->_check_close_and_clean($frame, $close_cb,);
                $self->{_queue}->push($frame);
            } else {
                my $channel = $self->{_channels}->{$id};
                if (defined $channel) {
                    $channel->push_queue_or_consume($frame, $failure_cb);
                } else {
                    $failure_cb->('Unknown channel id: ' . $frame->channel);
                }
            }

            @_ = ($self, $close_cb, $failure_cb,);
            goto &_read_loop;
        });
    });

    return $self;
}

sub _check_close_and_clean {
    my $self = shift;
    my ($frame, $close_cb,) = @_;

    return 1 if !$frame->isa('Net::AMQP::Frame::Method');

    my $method_frame = $frame->method_frame;
    return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');

    $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
    $self->{_channels} = {};
    $self->{_is_open} = 0;
    $self->_disconnect();
    $close_cb->($frame);
    return;
}

sub _start {
    my $self = shift;
    my %args = @_;

    if ($self->{verbose}) {
        warn 'post header', "\n";
    }

    $self->{_handle}->push_write(Net::AMQP::Protocol->header);

    $self->_push_read_and_valid(
        'Connection::Start',
        sub {
            my $frame = shift;

            my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
            return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
                if none {$_ eq 'AMQPLAIN'} @mechanisms;

            my @locales = split /\s/, $frame->method_frame->locales;
            return $args{on_failure}->('en_US is not found in locales')
                if none {$_ eq 'en_US'} @locales;

            $self->{_server_properties} = $frame->method_frame->server_properties;

            $self->_push_write(
                Net::AMQP::Protocol::Connection::StartOk->new(
                    client_properties => {
                        platform    => 'Perl',
                        product     => __PACKAGE__,
                        information => 'http://d.hatena.ne.jp/cooldaemon/',
                        version     => __PACKAGE__->VERSION,
                    },
                    mechanism => 'AMQPLAIN',
                    response => {
                        LOGIN    => $args{user},
                        PASSWORD => $args{pass},
                    },
                    locale => 'en_US',
                ),
            );

            $self->_tune(%args,);
        },
        $args{on_failure},
    );

    return $self;
}

sub _tune {
    my $self = shift;
    my %args = @_;

    $self->_push_read_and_valid(
        'Connection::Tune',
        sub {
            my $frame = shift;

            $self->_push_write(
                Net::AMQP::Protocol::Connection::TuneOk->new(
                    channel_max => $frame->method_frame->channel_max,
                    frame_max   => $frame->method_frame->frame_max,
                    heartbeat   => $frame->method_frame->heartbeat,
                ),
            );

            $self->_open(%args,);
        },
        $args{on_failure},
    );

    return $self;
}

sub _open {
    my $self = shift;
    my %args = @_;

    $self->_push_write_and_read(
        'Connection::Open',
        {
            virtual_host => $args{vhost},
            insist       => 1,
        },
        'Connection::OpenOk',
        sub {
            $self->{_is_open}   = 1;
            $self->{_login_user} = $args{user};
            $args{on_success}->($self);
        },
        $args{on_failure},
    );

    return $self;
}

sub close {
    my $self = shift;
    my $weak_self = $self;
    weaken($weak_self);
    my %args = $self->_set_cbs(@_);

    if (!$self->{_is_open}) {
        $args{on_success}->(@_);
        return $self;
    }

    my $channels_to_close = 0;
    my $all_closed_cb = sub {
        return unless 0 == $channels_to_close;
        $weak_self->_close(
            sub {
                $weak_self->_disconnect();
                $args{on_success}->(@_);
            },
            sub {
                $weak_self->_disconnect();
                $args{on_failure}->(@_);
            }
        );
    };

    if (scalar(keys %{ $self->{_channels} })==0) {
        $args{on_success}->(@_);
    }

    for my $id (keys %{$self->{_channels}}) {
         my $channel = $self->{_channels}->{$id}
            or next; # Could have already gone away on global destruction..

        $channels_to_close++;

         $channel->close(
            on_success => sub {
                $channels_to_close--;
                $all_closed_cb->();
            },
            on_failure => sub {
                $channels_to_close--;
                $all_closed_cb->();
                $args{on_failure}->(@_);
            },
        );
    }

    return $self;
}

sub _close {
    my $self = shift;
    my ($cb, $failure_cb,) = @_;

    if (!$self->{_is_open}) {
        $cb->("Already closed");
        return $self;
    }

    if (my @ch = keys %{$self->{_channels}}) {
        $failure_cb->("Can't disconnect with channel(s) open: @ch");
        return $self;
    }

    $self->_push_write_and_read(
        'Connection::Close', {}, 'Connection::CloseOk',
        $cb, $failure_cb,
    );
    $self->{_is_open} = 0;

    return $self;
}

sub _disconnect {
    my $self = shift;
    $self->{_handle}->push_shutdown;
    return $self;
}

sub open_channel {
    my $self = shift;
    my %args = $self->_set_cbs(@_);

    return $self if !$self->_check_open($args{on_failure});

    $args{on_close} ||= sub {};

    my $id = $args{id};
    if ($id && $self->{_channels}->{$id}) {
        $args{on_failure}->("Channel id $id is already in use");
        return $self;
    }

    if (!$id) {
        for my $candidate_id (1 .. (2**16 - 1)) {
            next if defined $self->{_channels}->{$candidate_id};
            $id = $candidate_id;
            last;
        }
        if (!$id) {
            $args{on_failure}->('Ran out of channel ids');
            return $self;
        }
    }

    my $channel = AnyEvent::RabbitMQ::Channel->new(
        id         => $id,
        connection => $self,
        on_close   => $args{on_close},
    );

    $self->{_channels}->{$id} = $channel;

    $channel->open(
        on_success => sub {
            $args{on_success}->($channel);
        },
        on_failure => sub {
            $self->delete_channel($id);
            $args{on_failure}->(@_);
        },
    );

    return $self;
}

sub _push_write_and_read {
    my $self = shift;
    my ($method, $args, $exp, $cb, $failure_cb, $id,) = @_;

    $method = 'Net::AMQP::Protocol::' . $method;
    $self->_push_write(
        Net::AMQP::Frame::Method->new(
            method_frame => $method->new(%$args)
        ),
        $id,
    );

    return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id,);
}

sub _push_read_and_valid {
    my $self = shift;
    my ($exp, $cb, $failure_cb, $id,) = @_;
    $exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];

    my $queue;
    if (!$id) {
        $queue = $self->{_queue};
    } elsif (defined $self->{_channels}->{$id}) {
        $queue = $self->{_channels}->{$id}->queue;
    } else {
        $failure_cb->('Unknown channel id: ' . $id);
    }

    return unless $queue; # Can go away in global destruction..
    $queue->get(sub {
        my $frame = shift;

        return $failure_cb->('Received data is not method frame')
            if !$frame->isa('Net::AMQP::Frame::Method');

        my $method_frame = $frame->method_frame;
        for my $exp_elem (@$exp) {
            return $cb->($frame)
                if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
        }

        $failure_cb->(
              'Method is not ' . join(',', @$exp) . "\n"
            . 'Method was ' . ref $method_frame
        );
    });
}

sub _push_write {
    my $self = shift;
    my ($output, $id,) = @_;

    if ($output->isa('Net::AMQP::Protocol::Base')) {
        $output = $output->frame_wrap;
    }
    $output->channel($id || 0);

    if ($self->{verbose}) {
        warn '[C] --> [S] ', Dumper($output);
    }

    $self->{_handle}->push_write($output->to_raw_frame())
        if $self->{_handle}; # Careful - could have gone (global destruction)
    return;
}

sub _set_cbs {
    my $self = shift;
    my %args = @_;

    $args{on_success} ||= sub {};
    $args{on_failure} ||= sub { return if in_global_destruction; die @_};

    return %args;
}

sub _check_open {
    my $self = shift;
    my ($failure_cb) = @_;

    return 1 if $self->{_is_open};

    $failure_cb->('Connection has already been closed');
    return 0;
}

sub drain_writes {
    my ($self, $timeout) = shift;
    $self->{drain_condvar} = AnyEvent->condvar;
    if ($timeout) {
        $self->{drain_timer} = AnyEvent->timer( after => $timeout, sub {
            $self->{drain_condvar}->croak("Timed out after $timeout");
        });
    }
    $self->{drain_condvar}->recv;
    delete $self->{drain_timer};
}

my $is_gd;

END { $is_gd++ };

sub DESTROY {
    my $self = shift;
    return if $is_gd;
    $self->close() if defined $self;
    return;
}

1;
__END__

=head1 NAME

AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.

=head1 SYNOPSIS

  use AnyEvent::RabbitMQ;

  my $cv = AnyEvent->condvar;

  my $ar = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
      host       => 'localhost',
      port       => 5672,
      user       => 'guest',
      pass       => 'guest',
      vhost      => '/',
      timeout    => 1,
      tls        => 0, # Or 1 if you'd like SSL
      on_success => sub {
          $ar->open_channel(
              on_success => sub {
                  my $channel = shift;
                  $channel->declare_exchange(
                      exchange   => 'test_exchange',
                      on_success => sub {
                          $cv->send('Declared exchange');
                      },
                      on_failure => $cv,
                  );
              },
              on_failure => $cv,
              on_close   => sub {
                  my $method_frame = shift->method_frame;
                  die $method_frame->reply_code, $method_frame->reply_text;
              }
          );
      },
      on_failure => $cv,
      on_read_failure => sub {die @_},
      on_return  => sub {
          my $frame = shift;
          die "Unable to deliver ", Dumper($frame);
      }
      on_close   => sub {
          my $method_frame = shift->method_frame;
          die $method_frame->reply_code, $method_frame->reply_text;
      },
  );

  print $cv->recv, "\n";

=head1 DESCRIPTION

AnyEvent::RabbitMQ is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.

You can use AnyEvent::RabbitMQ to -

  * Declare and delete exchanges
  * Declare, delete, bind and unbind queues
  * Set QoS and confirm mode
  * Publish, consume, get, ack, recover and reject messages
  * Select, commit and rollback transactions

AnyEvent::RabbitMQ is known to work with RabbitMQ versions 2.5.1 and versions 0-8 and 0-9-1 of the AMQP specification.

This client is the non-blocking version, for a blocking version with a similar API, see L<Net::RabbitFoot>.

=head1 AUTHOR

Masahito Ikuta E<lt>cooldaemon@gmail.comE<gt>

=head1 MAINTAINER

Currently maintained by C<< <bobtfish@bobtfish.net> >> due to the original
author being missing in action.

=head1 COPYRIGHT

Copyright (c) 2010, the above named author(s).

=head1 SEE ALSO

=head1 LICENSE

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut