The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Plack::Handler::Stomp;
$Plack::Handler::Stomp::VERSION = '1.13';
{
  $Plack::Handler::Stomp::DIST = 'Plack-Handler-Stomp';
}
use Moose;
use MooseX::Types::Moose qw(Bool);
use Plack::Handler::Stomp::Types qw(Logger PathMap);
use Net::Stomp::MooseHelpers::Types qw(NetStompish);
use Plack::Handler::Stomp::PathInfoMunger 'munge_path_info';
use Plack::Handler::Stomp::Exceptions;
use Net::Stomp::MooseHelpers::Exceptions;
use namespace::autoclean;
use Try::Tiny;
use Plack::Util;

with 'Net::Stomp::MooseHelpers::CanConnect' => { -version => '2.6' };
with 'Net::Stomp::MooseHelpers::CanSubscribe' => { -version => '2.6' };
with 'Net::Stomp::MooseHelpers::ReconnectOnFailure';

# ABSTRACT: adapt STOMP to (almost) HTTP, via Plack


has logger => (
    is => 'rw',
    isa => Logger,
    lazy_build => 1,
);
sub _build_logger {
    require Net::Stomp::StupidLogger;
    Net::Stomp::StupidLogger->new();
}

sub _build_connection {
    my ($self) = @_;

    return $self->connection_builder->({
        %{$self->extra_connection_builder_args},
        logger => $self->logger,
        hosts => $self->servers,
    });
}


has destination_path_map => (
    is => 'ro',
    isa => PathMap,
    default => sub { { } },
);


has one_shot => (
    is => 'rw',
    isa => Bool,
    default => 0,
);


sub run {
    my ($self, $app) = @_;

    my $exception;
    $self->reconnect_on_failure(
        sub {
        try {
            $self->subscribe();

            $self->frame_loop($app);
        } catch {
            $exception = $_;
        };
        if ($exception) {
            if (!blessed $exception) {
                $exception = "unhandled exception $exception";
                return;
            }
            if ($exception->isa('Plack::Handler::Stomp::Exceptions::AppError')) {
                return;
            }
            if ($exception->isa('Plack::Handler::Stomp::Exceptions::UnknownFrame')) {
                return;
            }
            if ($exception->isa('Plack::Handler::Stomp::Exceptions::OneShot')) {
                $exception=undef;
                return;
            }
            die $exception;
        }
    });
    die $exception if defined $exception;
    return;
}


sub frame_loop {
    my ($self,$app) = @_;

    while (1) {
        my $frame = $self->connection->receive_frame();
        if(!$frame || !ref($frame)) {
            Net::Stomp::MooseHelpers::Exceptions::Stomp->throw({
                stomp_error => 'empty frame received',
            });
        }
        $self->handle_stomp_frame($app, $frame);

        Plack::Handler::Stomp::Exceptions::OneShot->throw()
              if $self->one_shot;
    }
}


sub handle_stomp_frame {
    my ($self, $app, $frame) = @_;

    my $command = $frame->command();
    my $method = $self->can("handle_stomp_\L$command");
    if ($method) {
        $self->$method($app, $frame);
    }
    else {
        Plack::Handler::Stomp::Exceptions::UnknownFrame->throw(
            {frame=>$frame}
        );
    }
}


sub handle_stomp_error {
    my ($self, $app, $frame) = @_;

    my $error = $frame->headers->{message};
    $self->logger->warn($error);
}


sub handle_stomp_message {
    my ($self, $app, $frame) = @_;

    my $env = $self->build_psgi_env($frame);
    try {
        $self->process_the_message($app,$env);

        $self->connection->ack({ frame => $frame });
    } catch {
        Plack::Handler::Stomp::Exceptions::AppError->throw({
            app_error => $_
        });
    };
}


sub process_the_message {
    my ($self,$app,$env) = @_;

    my $res = $app->($env);

    my $flattened_response=[];
    my $cb = sub { $flattened_response->[2].=$_[0] };

    my $response_handler = sub {
        my ($response) = @_;

        $flattened_response->[0]=$response->[0];
        $flattened_response->[1]=$response->[1];

        my $body=$response->[2];
        if (defined $body) {
            Plack::Util::foreach($body, $cb);
        }
        else {
            return Plack::Util::inline_object(
                write => $cb,
                close => sub { },
            );
        }
    };

    if (ref $res eq 'ARRAY') {
        $response_handler->($res);
    }
    elsif (ref $res eq 'CODE') {
        $res->($response_handler);
    }
    else {
        Plack::Handler::Stomp::Exceptions::AppError->throw({
            app_error => "Bad response $res"
        });
    }

    $self->maybe_send_reply($flattened_response);

    return;
}


sub handle_stomp_receipt {
    my ($self, $app, $frame) = @_;

    $self->logger->debug('ignored RECEIPT frame for '
                             .$frame->headers->{'receipt-id'});
}


sub maybe_send_reply {
    my ($self, $response) = @_;

    my $reply_to = $self->where_should_send_reply($response);
    if ($reply_to) {
        $self->send_reply($response,$reply_to);
    }

    return;
}


sub where_should_send_reply {
    my ($self, $response) = @_;

    return Plack::Util::header_get($response->[1],
                                   'X-Reply-Address')
        || Plack::Util::header_get($response->[1],
                                   'X-STOMP-Reply-Address')
}


sub send_reply {
    my ($self, $response, $reply_address) = @_;

    my $reply_queue = '/remote-temp-queue/' . $reply_address;

    my $content = '';
    unless (Plack::Util::status_with_no_entity_body($response->[0])) {
        # pre-flattened, see L</process_the_message>
        $content = $response->[2];
    }

    my %reply_hh = ();
    while (my ($k,$v) = splice @{$response->[1]},0,2) {
        $k=lc($k);
        next if $k eq 'x-stomp-reply-address';
        next if $k eq 'x-reply-address';
        next unless $k =~ s{^x-stomp-}{};

        $reply_hh{lc($k)} = $v;
    }

    $self->connection->send({
        %reply_hh,
        destination => $reply_queue,
        body => $content
    });

    return;
}


after subscribe_single => sub {
    my ($self,$sub,$headers) = @_;

    my $destination = $headers->{destination};
    my $sub_id = $headers->{id};

    $self->destination_path_map->{$destination} =
        $self->destination_path_map->{"/subscription/$sub_id"} =
            $sub->{path_info} || $destination;

    return;
};


sub build_psgi_env {
    my ($self, $frame) = @_;

    my $destination = $frame->headers->{destination};
    my $sub_id = $frame->headers->{subscription};

    my $path_info;
    if (defined $sub_id) {
        $path_info = $self->destination_path_map->{"/subscription/$sub_id"}
    };
    $path_info ||= $self->destination_path_map->{$destination};
    if ($path_info) {
        $path_info = munge_path_info(
            $path_info,
            $self->current_server,
            $frame,
        );
    }
    $path_info ||= $destination; # should not really be needed

    use bytes;

    my $env = {
        # server
        SERVER_NAME => 'localhost',
        SERVER_PORT => 0,
        SERVER_PROTOCOL => 'STOMP',

        # client
        REQUEST_METHOD => 'POST',
        REQUEST_URI => $path_info,
        SCRIPT_NAME => '',
        PATH_INFO => $path_info,
        QUERY_STRING => '',

        # broker
        REMOTE_ADDR => $self->current_server->{hostname},

        # http
        HTTP_USER_AGENT => 'Net::Stomp',
        CONTENT_LENGTH => length($frame->body),
        CONTENT_TYPE => ( $frame->headers->{'content-type'} || 'application/octet-stream' ),

        # psgi
        'psgi.version' => [1,0],
        'psgi.url_scheme' => 'http',
        'psgi.multithread' => 0,
        'psgi.multiprocess' => 0,
        'psgi.run_once' => 0,
        'psgi.nonblocking' => 0,
        'psgi.streaming' => 1,
        'psgi.input' => do {
            open my $input, '<', \($frame->body);
            $input;
        },
        'psgi.errors' => Plack::Util::inline_object(
            print => sub { $self->logger->error(@_) },
        ),
    };

    if ($frame->headers) {
        for my $header (keys %{$frame->headers}) {
            $env->{"jms.$header"} = $frame->headers->{$header};
        }
    }

    return $env;
}

__PACKAGE__->meta->make_immutable;


1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Plack::Handler::Stomp - adapt STOMP to (almost) HTTP, via Plack

=head1 VERSION

version 1.13

=head1 SYNOPSIS

  my $runner = Plack::Handler::Stomp->new({
    servers => [ { hostname => 'localhost', port => 61613 } ],
    subscriptions => [
      { destination => '/queue/plack-handler-stomp-test' },
      { destination => '/topic/plack-handler-stomp-test',
        headers => {
            selector => q{custom_header = '1' or JMSType = 'test_foo'},
        },
        path_info => '/topic/ch1', },
      { destination => '/topic/plack-handler-stomp-test',
        headers => {
            selector => q{custom_header = '2' or JMSType = 'test_bar'},
        },
        path_info => '/topic/ch2', },
    ],
  });
  $runner->run(MyApp->get_app());

=head1 DESCRIPTION

Sometimes you want to use your very nice web-application-framework
dispatcher, module loading mechanisms, etc, but you're not really
writing a web application, you're writing a ActiveMQ consumer. In
those cases, this module is for you.

This module is inspired by L<Catalyst::Engine::Stomp>, but aims to be
usable by any PSGI application.

=head2 Roles Consumed

We consume L<Net::Stomp::MooseHelpers::CanConnect> and
L<Net::Stomp::MooseHelpers::CanSubscribe>. Read those modules'
documentation to see how to configure servers and subscriptions.

=head1 ATTRIBUTES

=head2 C<logger>

A logger object used by thes handler. Not to be confused by the logger
used by the application (either internally, or via a Middleware). Can
be any object that can C<debug>, C<info>, C<warn>, C<error>. Defaults
to an instance of L<Net::Stomp::StupidLogger>. This logger is passed
on to the L<Net::Stomp> object held in C<connection> (see
L<Net::Stomp::MooseHelpers::CanConnect>).

=head2 C<destination_path_map>

A hashref mapping destinations (queues, topics, subscription ids) to
URI paths to send to the application. You should not modify this.

=head2 C<one_shot>

If true, exit after the first message is consumed. Useful for testing,
defaults to false.

=head1 METHODS

=head2 C<run>

Given a PSGI application, loops forever:

=over 4

=item *

connect to a STOMP server (see
L<connect|Net::Stomp::MooseHelpers::CanConnect/connect> and
L<servers|Net::Stomp::MooseHelpers::CanConnect/servers> in
L<Net::Stomp::MooseHelpers::CanConnect>)

=item *

subscribe to whatever needed (see
L<subscribe|Net::Stomp::MooseHelpers::CanSubscribe/subscribe> and
L<subscriptions|Net::Stomp::MooseHelpers::CanSubscribe/subscriptions>
in L<Net::Stomp::MooseHelpers::CanSubscribe>)

=item *

consume STOMP frames in an inner loop (see L</frame_loop>)

=back

If the application throws an exception, the loop exits re-throwing the
exception. If the STOMP connection has problems, the loop is repeated
with a different server (see
L<next_server|Net::Stomp::MooseHelpers::CanConnect/next_server> in
L<Net::Stomp::MooseHelpers::CanConnect>).

If L</one_shot> is set, this function exits after having consumed
exactly 1 frame.

=head2 C<frame_loop>

Loop forever receiving frames from the STOMP connection. Call
L</handle_stomp_frame> for each frame.

If L</one_shot> is set, this function exits after having consumed
exactly 1 frame.

=head2 C<handle_stomp_frame>

Delegates the handling to L</handle_stomp_message>,
L</handle_stomp_error>, L</handle_stomp_receipt>, or throws
L<Plack::Handler::Stomp::Exceptions::UnknownFrame> if the frame is of
some other kind. If you want to handle different kind of frames (maybe
because you have some non-standard STOMP server), you can just
subclass and add methods; for example, to handle C<STRANGE> frames,
add a C<handle_stomp_strange> method.

=head2 C<handle_stomp_error>

Logs the error via the L</logger>, level C<warn>.

=head2 C<handle_stomp_message>

Calls L</build_psgi_env> to convert the STOMP message into a PSGI
environment.

The environment is then passed to L</process_the_message>, and the
frame is acknowledged.

=head2 C<process_the_message>

Runs a PSGI environment through the application, then flattens the
response body into a simple string.

The response so flattened is sent back via L</maybe_send_reply>.

=head2 C<handle_stomp_receipt>

Logs (level C<debug>) the receipt id. Nothing else is done with
receipts.

=head2 C<maybe_send_reply>

Calls L</where_should_send_reply> to determine if to send a reply, and
where. If it returns a true value, L</send_reply> is called to
actually send the reply.

=head2 C<where_should_send_reply>

Returns the header C<X-Reply-Address> or C<X-STOMP-Reply-Address> from
the response.

=head2 C<send_reply>

Converts the PSGI response into a STOMP frame, by removing the prefix
C<x-stomp-> from the key of header fields that have it, removing
entirely header fields that don't, and stringifying the body.

Then sends the frame.

=head2 C<subscribe_single>

C<after> modifier on the method provided by
L<Net::Stomp::MooseHelpers::CanSubscribe>.

It sets the L</destination_path_map> to map the destination and the
subscription id to the C<path_info> slot of the L</subscriptions>
element, or to the destination itself if C<path_info> is not defined.

=head2 C<build_psgi_env>

Builds a PSGI environment from the message, like:

  # server
  SERVER_NAME => 'localhost',
  SERVER_PORT => 0,
  SERVER_PROTOCOL => 'STOMP',

  # client
  REQUEST_METHOD => 'POST',
  REQUEST_URI => $path_info,
  SCRIPT_NAME => '',
  PATH_INFO => $path_info,
  QUERY_STRING => '',

  # broker
  REMOTE_ADDR => $server_hostname,

  # http
  HTTP_USER_AGENT => 'Net::Stomp',
  CONTENT_LENGTH => length($body),
  CONTENT_TYPE => $content-type,

  # psgi
  'psgi.version' => [1,0],
  'psgi.url_scheme' => 'http',
  'psgi.multithread' => 0,
  'psgi.multiprocess' => 0,
  'psgi.run_once' => 0,
  'psgi.nonblocking' => 0,
  'psgi.streaming' => 1,

In addition, reading from C<psgi.input> will return the message body,
and writing to C<psgi.errors> will log via the L</logger> at level
C<error>.

Finally, every header in the STOMP message will be available in the
"namespace" C<jms.>, so for example the message type is in
C<jms.type>.

The C<$path_info> is obtained from the L</destination_path_map>
(i.e. from the C<path_info> subscription options) passed through
L<munge_path_info|Plack::Handler::Stomp::PathInfoMunger/munge_path_info>.

=head1 EXAMPLES

You can find examples of use in the tests, or at
https://github.com/dakkar/CatalystX-StompSampleApps

=head1 AUTHOR

Gianni Ceccarelli <gianni.ceccarelli@net-a-porter.com>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2012 by Net-a-porter.com.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut