The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
package AnyEvent::RabbitMQ::Channel;

use strict;
use warnings;

use AnyEvent::RabbitMQ::LocalQueue;
use Scalar::Util qw(weaken);
use Carp qw(croak);
BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }

our $VERSION = '1.11';

sub new {
    my $class = shift;

    my $self = bless {
        @_,    # id, connection, on_return, on_close, on_inactive, on_active
        _queue         => AnyEvent::RabbitMQ::LocalQueue->new,
        _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
    }, $class;
    return $self->_reset;

sub _reset {
    my $self = shift;

    my %a = (
        _is_open       => 0,
        _is_active     => 0,
        _is_confirm    => 0,
        _publish_tag   => 0,
        _publish_cbs   => {},  # values: [on_ack, on_nack, on_return]
        _consumer_cbs  => {},
        _consumer_cans => {},
    @$self{keys %a} = values %a;

    return $self;

sub is_open {
    my $self = shift;
    return $self->{_is_open};

sub is_active {
    my $self = shift;
    return $self->{_is_active};

sub is_confirm {
    my $self = shift;
    return $self->{_is_confirm};

sub queue {
    my $self = shift;
    return $self->{_queue};

sub open {
    my $self = shift;
    my %args = @_;

    if ($self->{_is_open}) {
        $args{on_failure}->('Channel has already been opened');
        return $self;

        'Channel::Open', {}, 'Channel::OpenOk',
        sub {
            $self->{_is_open}   = 1;
            $self->{_is_active} = 1;
        sub {

    return $self;

sub close {
    my $self = shift;
    my $connection = $self->{connection}
        or return;
    my %args = $connection->_set_cbs(@_);

    # Ensure to remove this channel from the connection even if we're not
    # fully open to ensure $rf->close works always.
    # FIXME - We can end up racing here so the server thinks the channel is
    # open, but we've closed it - a more elegant fix would be to mark that
    # the channel is opening, and wait for it to open before closing it
    if (!$self->{_is_open}) {
        return $self;

    # spell it out, so the callbacks always can call ->method_frame
    my $close_frame = Net::AMQP::Frame::Method->new(
        method_frame => Net::AMQP::Protocol::Channel::Close->new,


    weaken(my $wself = $self);

        sub {
            my $me = $wself or return;
            $me->_close($close_frame, 0);
        sub {
            my $me = $wself or return;
            $me->_close($close_frame, 0);

    return $self;

sub _close {
    my $self = shift;
    my ($frame, $forced) = @_;

    my $connection = $self->{connection};
    my $on_close = $self->{on_close};

    $self->{_is_open} = 0;
    if ($frame) {

    $connection->delete_channel($self->{id}) if $connection;

    if (defined $on_close) {
        local $@;
        warn "Error in channel on_close callback, ignored:\n  $@  " if $@;

    return $self;

sub declare_exchange {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

            type        => 'direct',
            passive     => 0,
            durable     => 0,
            auto_delete => 0,
            internal    => 0,
            %args, # exchange
            ticket      => 0,
            nowait      => 0, # FIXME

    return $self;

sub delete_exchange {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

            if_unused => 0,
            %args, # exchange
            ticket    => 0,
            nowait    => 0, # FIXME

    return $self;

sub declare_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

            queue       => '',
            passive     => 0,
            durable     => 0,
            exclusive   => 0,
            auto_delete => 0,
            no_ack      => 1,
            ticket      => 0,
            nowait      => 0, # FIXME

sub bind_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

            %args, # queue, exchange, routing_key
            ticket => 0,
            nowait => 0, # FIXME

    return $self;

sub unbind_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

            %args, # queue, exchange, routing_key
            ticket => 0,

    return $self;

sub purge_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

            %args, # queue
            ticket => 0,
            nowait => 0, # FIXME

    return $self;

sub delete_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

            if_unused => 0,
            if_empty  => 0,
            %args, # queue
            ticket    => 0,
            nowait    => 0, # FIXME

    return $self;

sub publish {
    my $self = shift;
    my %args = @_;

    # Docs should advise channel-level callback over this, but still, better to give user an out
    unless ($self->{_is_active}) {
        if (defined $args{on_inactive}) {
            return $self;
        croak "Can't publish on inactive channel (server flow control); provide on_inactive callback";

    my $header_args = delete $args{header};
    my $body        = delete $args{body};
    my $ack_cb      = delete $args{on_ack};
    my $nack_cb     = delete $args{on_nack};
    my $return_cb   = delete $args{on_return};

    defined($header_args) or $header_args = {};
    defined($body) or $body = '';
    defined($ack_cb) or defined($nack_cb) or defined($return_cb)
       and !$self->{_is_confirm}
       and croak "Can't set on_ack/on_nack/on_return callback when not in confirm mode";

    my $tag;
    if ($self->{_is_confirm}) {
        # yeah, delivery tags in acks are sequential.  see Java client
        $tag = ++$self->{_publish_tag};
        if ($return_cb) {
            $header_args = { %$header_args };
            $header_args->{headers}{_ar_return} = $tag;  # just reuse the same value, why not
        $self->{_publish_cbs}{$tag} = [$ack_cb, $nack_cb, $return_cb];

        $header_args, $body,

    return $self;

sub _publish {
    my $self = shift;
    my %args = @_;

            exchange  => '',
            mandatory => 0,
            immediate => 0,
            %args, # routing_key
            ticket    => 0,

    return $self;

sub _header {
    my ($self, $args, $body) = @_;

    my $weight = delete $args->{weight} || 0;

            weight       => $weight,
            body_size    => length($body),
            header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
                content_type     => 'application/octet-stream',
                content_encoding => undef,
                headers          => {},
                delivery_mode    => 1,
                priority         => 1,
                correlation_id   => undef,
                expiration       => undef,
                message_id       => undef,
                timestamp        => time,
                type             => undef,
                user_id          => $self->{connection}->login_user,
                app_id           => undef,
                cluster_id       => undef,

    return $self;

sub _body {
    my ($self, $body,) = @_;

        Net::AMQP::Frame::Body->new(payload => $body),

    return $self;

sub consume {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    my $consumer_cb = delete $args{on_consume} || sub {};
            consumer_tag => '',
            no_local     => 0,
            no_ack       => 1,
            exclusive    => 0,
            %args, # queue
            ticket       => 0,
            nowait       => 0, # FIXME
        sub {
            my $frame = shift;
            } = $consumer_cb;

    return $self;

sub cancel {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    if (!defined $args{consumer_tag}) {
        $failure_cb->('consumer_tag is not set');
        return $self;

    if (!$self->{_consumer_cbs}->{$args{consumer_tag}}) {
        $failure_cb->('Unknown consumer_tag');
        return $self;

    $self->{_consumer_cans}{$args{consumer_tag}} = $cb;

            %args, # consumer_tag
            nowait => 0,

    return $self;

sub get {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

            no_ack => 1,
            %args, # queue
            ticket => 0,
        [qw(Basic::GetOk Basic::GetEmpty)], 
        sub {
            my $frame = shift;
            return $cb->({empty => $frame})
                if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
            $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);

    return $self;

sub ack {
    my $self = shift;
    my %args = @_;

    return $self if !$self->_check_open(sub {});

            delivery_tag => 0,
            multiple     => (
                defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1

    return $self;

sub qos {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

            prefetch_count => 1,
            prefetch_size  => 0,
            global         => 0,

    return $self;

sub confirm {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);
    return $self if !$self->_check_version(0, 9, $failure_cb);

    weaken(my $wself = $self);

            nowait       => 0, # FIXME
        sub {
            my $me = $wself or return;
            $me->{_is_confirm} = 1;

    return $self;

sub recover {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open(sub {});

            requeue => 1,

     if (!$args{nowait} && $self->_check_version(0, 9)) {
    else {

    return $self;

sub reject {
    my $self = shift;
    my %args = @_;

    return $self if !$self->_check_open( sub { } );

            delivery_tag => 0,
            requeue      => 0,

    return $self;

sub select_tx {
    my $self = shift;
    my ($cb, $failure_cb,) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

        'Tx::Select', {}, 'Tx::SelectOk',

    return $self;

sub commit_tx {
    my $self = shift;
    my ($cb, $failure_cb,) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

        'Tx::Commit', {}, 'Tx::CommitOk',

    return $self;

sub rollback_tx {
    my $self = shift;
    my ($cb, $failure_cb,) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

        'Tx::Rollback', {}, 'Tx::RollbackOk',

    return $self;

sub push_queue_or_consume {
    my $self = shift;
    my ($frame, $failure_cb,) = @_;

    if ($frame->isa('Net::AMQP::Frame::Method')) {
        my $method_frame = $frame->method_frame;
        if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
            $self->_close($frame, 0);
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
            my $cb = $self->{_consumer_cbs}->{
            } || sub {};
            $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
            my $can_cb = delete $self->{_consumer_cans}{$method_frame->consumer_tag};
            if ($can_cb) {
            else {
                $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
            weaken(my $wself = $self);
            my $cb = sub {
                my $ret = shift;
                my $me = $wself or return;
                my $headers = $ret->{header}->headers || {};
                my $onret_cb;
                if (defined(my $tag = $headers->{_ar_return})) {
                    my $cbs = delete $me->{_publish_cbs}{$tag};
                    $onret_cb = $cbs->[2] if $cbs;
                $onret_cb ||= $me->{on_return} || $me->{connection}{on_return} || sub {};  # oh well
            $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
                 $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
            (my $resp = ref($method_frame)) =~ s/.*:://;
            my $cbs;
            if (!$self->{_is_confirm}) {
                $failure_cb->("Received $resp when not in confirm mode");
            else {
                my @tags;
                if ($method_frame->{multiple}) {
                    @tags = sort { $a <=> $b }
                              grep { $_ <= $method_frame->{delivery_tag} }
                                keys %{$self->{_publish_cbs}};
                else {
                    @tags = ($method_frame->{delivery_tag});
                my $cbi = ($resp eq 'Ack') ? 0 : 1;
                for my $tag (@tags) {
                    my $cbs;
                    if (not $cbs = delete $self->{_publish_cbs}{$tag}) {
                        $failure_cb->("Received $resp of unknown delivery tag $tag");
                    elsif ($cbs->[$cbi]) {
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
            $self->{_is_active} = $method_frame->active;
                    active => $method_frame->active,
            my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
            my $cb = $self->{$cbname} || $self->{connection}{$cbname} || sub {};
            return $self;
    } else {

    return $self;

sub _push_read_header_and_body {
    my $self = shift;
    my ($type, $frame, $cb, $failure_cb,) = @_;
    my $response = {$type => $frame};
    my $body_size = 0;

        my $frame = shift;

        return $failure_cb->('Received data is not header frame')
            if !$frame->isa('Net::AMQP::Frame::Header');

        my $header_frame = $frame->header_frame;
        return $failure_cb->(
              'Header is not Protocol::Basic::ContentHeader'
            . 'Header was ' . ref $header_frame
        ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');

        $response->{header} = $header_frame;
        $body_size = $frame->body_size;

    weaken(my $wcontq = $self->{_content_queue});
    my $body_payload = "";
    my $w_next_frame;
    my $next_frame = sub {
        my $frame = shift;

        my $contq = $wcontq or return;

        return $failure_cb->('Received data is not body frame')
            if !$frame->isa('Net::AMQP::Frame::Body');

        $body_payload .= $frame->payload;

        if (length($body_payload) < $body_size) {
            # More to come
        else {
            $response->{body} = $frame;
    $w_next_frame = $next_frame;


    return $self;

sub _delete_cbs {
    my $self = shift;
    my %args = @_;

    my $cb         = delete $args{on_success} || sub {};
    my $failure_cb = delete $args{on_failure} || sub {die @_};

    return $cb, $failure_cb, %args;

sub _check_open {
    my $self = shift;
    my ($failure_cb) = @_;

    return 1 if $self->{_is_open};

    $failure_cb->('Channel has already been closed');
    return 0;

sub _check_version {
    my $self = shift;
    my ($major, $minor, $failure_cb) = @_;

    my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
    my $amin = $Net::AMQP::Protocol::VERSION_MINOR;

    return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;

    $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
    return 0;

    my $self = shift;
    $self->close() if $self->{_is_open};



=head1 NAME

AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel.


    my $ch = $rf->open_channel();
    $ch->declare_exchange(exchange => 'test_exchange');


=head1 ARGUMENTS FOR C<open_channel>


=item on_close

Callback invoked when the channel closes.  Callback will be passed the
incoming message that caused the close, if any.

=item on_return

Callback invoked when a mandatory or immediate message publish fails.
Callback will be passed the incoming message, with accessors
C<method_frame>, C<header_frame>, and C<body_frame>.


=head1 METHODS

=head2 declare_exchange (%args)

Declare an exchange (to publish messages to) on the server.



=item on_success

=item on_failure

=item type

Default 'direct'

=item passive

Default 0

=item durable

Default 0

=item auto_delete

Default 0

=item internal

Default 0

=item exchange

The name of the exchange


=head2 delete_exchange

=head2 declare_queue

=head2 bind_queue

Binds a queue to an exchange, with a routing key.



=item queue

The name of the queue to bind

=item exchange

The name of the exchange to bind

=item routing_key

The routing key to bind with


=head2 unbind_queue

=head2 purge_queue

Flushes the contents of a queue.

=head2 delete_queue

Deletes a queue. The queue may not have any active consumers.

=head2 publish

Publish a message to an exchange



=item body

The text body of the message to send.

=item header

Customer headers for the message (if any).

=item exchange

The name of the exchange to send the message to.

=item routing_key

The routing key with which to publish the message.

=item on_ack

Callback (if any) for confirming acknowledgment when in confirm mode.


=head2 consume

Subscribe to consume messages from a queue.



=item on_consume

Callback called with an argument of the message which has been consumed.

=item consumer_tag

Identifies this consumer, will be auto-generated if you do not provide it, but you must
supply a value if you want to be able to later cancel the subscription.

=item on_success

Callback called if the subscription was successful (before the first message is consumed).

=item on_failure

Callback called if the subscription fails for any reason.


=head2 cancel

Cancel a queue subscription.

Note that the cancellation B<will not> take place at once, and further messages may be
consumed before the subscription is cancelled. No further messages will be
consumed after the on_success callback has been called.



=item consumer_tag

Identifies this consumer, needs to be the value supplied when the queue is initially
consumed from.

=item on_success

Callback called if the subscription was successfully cancelled.

=item on_failure

Callback called if the subscription could not be cancelled for any reason.


=head2 get

Try to get a single message from a queue.



=item queue

Mandatory. Name of the queue to try to receive a message from.

=item on_success

Will be called either with either a message, or, if the queue is empty,
a notification that there was nothing to collect from the queue.

=item on_failure

This callback will be called if an error is signalled on this channel.


=head2 ack

=head2 qos

=head2 confirm

Put channel into confirm mode.  In confirm mode, publishes are confirmed by
the server, so the on_ack callback of publish works.

=head2 recover

=head2 select_tx

=head2 commit_tx

=head2 rollback_tx


See L<AnyEvent::RabbitMQ> for author(s), copyright and license.
