NAME

POE::Component::MessageQueue::Manual::Clustering - Instructions for setting up clustered MQs with fail-over

DESCRIPTION

PoCo::MQ now supports running multiple MQs (possibly on different machines) which share the same DBI back-store. If you put these MQs behind a reverse-proxy/load-balancer and write your clients to reconnect when a connection is broken, you will have automatic fail-over if one of the MQs goes down!

EXAMPLE

If you are using mq.pl, this is one possible way to start your MQs.

  # Start the first MQ (ex. on machine mq1.example.com)
  mq.pl --storage dbi \
        --front-store none \
        --dbi-dsn 'DBI:mysql:database=pocomq;hostname=db.example.com' \
        --dbi-username pocomq \
        --dbi-password pocomq \
        --pump-freq 1 \
        --mq-id mq1 \
        --data-dir /tmp/perl_mq

  # Start the second MQ (ex. on machine mq2.example.com)
  mq.pl --storage dbi \
        --front-store none \
        --dbi-dsn 'DBI:mysql:database=pocomq;hostname=db.example.com' \
        --dbi-username pocomq \
        --dbi-password pocomq \
        --pump-freq 1 \
        --mq-id mq2 \
        --data-dir /tmp/perl_mq

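There are a couple of important things to note above: both MQs point at the same DBI database (identical --dbi-dsn, --dbi-username and --dbi-password), which is the shared back-store, and each MQ is given its own --mq-id (mq1 and mq2).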
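
Since the invocations differ only in --mq-id, they can be generated from one template instead of being typed twice. The following is a minimal Perl sketch of that idea, not part of the distribution: mq_command is a hypothetical helper name, and the option values are simply copied from the EXAMPLE above.

```perl
use strict;
use warnings;

# Hypothetical helper: build the mq.pl argument list for one cluster node.
# Every node gets the same storage settings; only --mq-id changes.
sub mq_command {
    my ($mq_id) = @_;
    return (
        'mq.pl',
        '--storage'      => 'dbi',
        '--front-store'  => 'none',
        '--dbi-dsn'      => 'DBI:mysql:database=pocomq;hostname=db.example.com',
        '--dbi-username' => 'pocomq',
        '--dbi-password' => 'pocomq',
        '--pump-freq'    => 1,
        '--mq-id'        => $mq_id,
        '--data-dir'     => '/tmp/perl_mq',
    );
}

# Print the command each machine would run.
print join(' ', mq_command($_)), "\n" for qw(mq1 mq2);
```

To actually start a node from Perl, the list could be handed to exec(), which bypasses the shell when given a list, so the ';' inside the DSN needs no quoting there.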

REVERSE PROXY / LOAD BALANCER

balance

balance is a very simple reverse-proxy and load-balancer that has packages for many distributions.

http://www.inlab.de/balance.html

Run it on the load-balancing machine (per our EXAMPLE above):

   balance -f 61613 mq1.example.com:61613 mq2.example.com:61613

And that's it!

LVS (Linux Virtual Server)

LVS is a load-balancing solution built into the Linux Kernel. I recommend the HOWTO for learning more about LVS:

http://www.austintek.com/LVS/LVS-HOWTO/

You will have to do a number of things (described in the documentation above) to get your system set up for LVS, but once it's ready, run the following commands as root to configure the load-balancer (per our EXAMPLE above):

  ipvsadm -A -t external.example.com:61613 -s wlc
  ipvsadm -a -t external.example.com:61613 -r mq1.example.com:61613 -m
  ipvsadm -a -t external.example.com:61613 -r mq2.example.com:61613 -m

Here is a link to the LVS project homepage:

http://www.linuxvirtualserver.org/
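
Whichever balancer is used, it can only fail over to MQs that are actually listening, so it may be worth confirming that each backend accepts TCP connections before pointing clients at the balancer. A rough sketch using the core IO::Socket::INET module follows; port_open is a hypothetical helper name, and the check only proves that the port is open, not that a healthy MQ is behind it.

```perl
use strict;
use warnings;
use IO::Socket::INET;

# Hypothetical helper: true if a TCP connection to $host:$port succeeds
# within $timeout seconds (default 2).
sub port_open {
    my ($host, $port, $timeout) = @_;
    my $sock = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port,
        Proto    => 'tcp',
        Timeout  => $timeout || 2,
    );
    return 0 unless $sock;
    close $sock;
    return 1;
}

# Check every host:port pair given on the command line.
for my $backend (@ARGV) {
    my ($host, $port) = split /:/, $backend;
    printf "%-28s %s\n", $backend, port_open($host, $port) ? 'up' : 'DOWN';
}
```

Saved under a name of your choosing (check_mq.pl, say), it could be run as: perl check_mq.pl mq1.example.com:61613 mq2.example.com:61613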

CLIENTS

Writing clients that are robust to MQ failure, and that simply reconnect and retry, is a little more challenging than it seems at first.

The main problem is detecting failure correctly. Different systems (with different OSs and Perl versions) and different network configurations (through firewalls, routers, etc.) will act differently when the connection to the MQ is broken.
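
One concrete instance of this: on POSIX systems, writing to a connection whose peer has gone away delivers SIGPIPE by default, and a process without a handler for it is killed. If the signal is ignored, the write fails with the EPIPE error instead. The sketch below shows that behaviour using an ordinary pipe as a stand-in for the socket to the MQ. That stand-in is an assumption made so the example runs without a network; a TCP socket may only report the failure on a later write. write_to_dead_peer is a hypothetical name.

```perl
use strict;
use warnings;
use Errno qw(EPIPE);

# Try to write to a pipe whose reading end has already been closed.
# Returns a short description of what happened.
sub write_to_dead_peer {
    pipe(my $reader, my $writer) or die "pipe failed: $!";
    close $reader;    # simulate the peer going away

    # With SIGPIPE ignored, the failed write returns undef and sets $!
    # instead of delivering a signal to the process.
    local $SIG{PIPE} = 'IGNORE';

    my $written = syswrite($writer, "hello\n");
    return 'written' if defined $written;
    return $! == EPIPE ? 'EPIPE' : "other error: $!";
}

print write_to_dead_peer(), "\n";
```

Whether a given client library turns such a failed write into a Perl exception depends on the library and its version, so it still needs testing on your own setup.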

Here are some example snippets for scripts that correctly handle failure when all programs involved (MQs, load-balancer, clients) are running on the same machine. These were tested on Debian Lenny with Linux 2.6.26 and Perl 5.10.

Your mileage may vary when running on your systems on your network. Please test appropriately!

CONSUMER

  use Net::Stomp;
  use strict;

  my $QUEUE_NAME = "/queue/example";
  my $HOSTNAME   = "localhost";
  my $PORT       = 61613;
  my $USERNAME   = 'producer';
  my $PASSWORD   = 'test';

  sub main
  {
    my $stomp;
 
    my $connect = sub {
      while (1) {
        eval {
          $stomp = Net::Stomp->new({
            hostname => $HOSTNAME,
            port     => $PORT,
          });
          $stomp->connect({
            login    => $USERNAME,
            passcode => $PASSWORD,
          });
          $stomp->subscribe({
            destination => $QUEUE_NAME,
            ack         => 'client',
          });
        };
        if (!$@) {
          # If no Perl exception was thrown, return!  We are connected. 
          return;
        }

        # Otherwise, wait a second and try again.
        print STDERR "Unable to connect to MQ!  Sleeping 1 second then trying again...\n";
        sleep 1;
      }
    };
    $connect->();

    my $receive = sub {
      my $frame;
      while (1) {
        eval {
          $frame = $stomp->receive_frame;
        };
        if (!$@) {
          # If no Perl exception was thrown, return our $frame!
          return $frame;
        }

        # Otherwise, wait a second, reconnect and then try again.
        print STDERR "Connection to MQ broken!  Sleeping 1 second then attempting to reconnect...\n";
        sleep 1;
        $connect->();
      }
    };
 
    while (1) {
      my $frame = $receive->();

      # TODO: Do something useful with this frame!!
      print "received:". $frame->body ."\n";

      $stomp->ack({ frame => $frame });
    }
 
    $stomp->disconnect();
  }
  main;
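
The $connect and $receive closures above share one pattern: run an operation inside eval, and on failure sleep and try again. A possible generalisation is sketched below, assuming you would rather back off with a growing delay than retry every second. retry_until_success and its named arguments are hypothetical and not part of Net::Stomp or PoCo::MQ.

```perl
use strict;
use warnings;

# Hypothetical helper: call $args{action} until it stops throwing, sleeping
# between attempts. The delay doubles after each failure, up to max_delay.
# The sleep callback is injectable so the helper can be tried without waiting.
sub retry_until_success {
    my (%args) = @_;
    my $action    = $args{action} or die "action is required\n";
    my $delay     = $args{initial_delay} || 1;
    my $max_delay = $args{max_delay}     || 30;
    my $sleep     = $args{sleep}         || sub { sleep $_[0] };

    while (1) {
        my @result;
        my $ok = eval { @result = $action->(); 1 };
        return wantarray ? @result : $result[0] if $ok;

        my $error = $@ || 'unknown error';
        chomp $error;
        print STDERR "Attempt failed ($error); retrying in $delay second(s)...\n";
        $sleep->($delay);
        $delay *= 2;
        $delay = $max_delay if $delay > $max_delay;
    }
}

# Demonstration with a fake operation that fails twice before succeeding.
my $attempts = 0;
my @delays;
my $value = retry_until_success(
    action => sub { die "MQ unavailable\n" if ++$attempts < 3; return 'connected' },
    sleep  => sub { push @delays, $_[0] },    # record the delay instead of sleeping
);
print "$value after $attempts attempts (delays: @delays)\n";
```

In a real client, the action would be the body of the eval block in $connect (creating the Net::Stomp object, connecting and subscribing), and the sleep argument would be left out so the default sleep is used.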

PRODUCER

  use Net::Stomp;
  use Data::Random qw/rand_chars/;
  use strict;
 
  my $QUEUE_NAME = "/queue/example";
  my $HOSTNAME   = "localhost";
  my $PORT       = 61613;
  my $USERNAME   = 'producer';
  my $PASSWORD   = 'test';
 
  # If we try to send a message on a connection after the MQ on the other side has died,
  # we receive SIGPIPE.  It would be awesome if instead of receiving a signal, we got
  # an exception from $stomp->send(), but this is unfortunately how it is...
  my $sigpipe = 0;
  $SIG{PIPE} = sub {
    print STDERR "Caught signal SIGPIPE\n";
    $sigpipe = 1;
  };
  
  sub main
  {
    my $stomp;
  
    my $connect = sub {
      while (1) {
        eval {
          $stomp = Net::Stomp->new({
            hostname => $HOSTNAME,
            port     => $PORT,
          });
          $stomp->connect({
            login    => $USERNAME,
            passcode => $PASSWORD,
          });
        };
        if (!$@) {
          # If no Perl exception was thrown, return!  We are connected. 
          return;
        }

        # Otherwise, wait a second and try again.
        print STDERR "Unable to connect to MQ!  Sleeping 1 second then trying again...\n";
        sleep 1;
      }
    };
    $connect->();
 
    my $send = sub {
      my ($message) = @_;

      # This requests a receipt from the server.  This is very important!  Without this, the
      # server won't let us know if the message was received or not.
      $message->{receipt} = join('', rand_chars(set => 'alpha', size => 10));

      while (1) {
        $sigpipe = 0;
        eval {
          $stomp->send({ %$message });
        };
        if (!$@ && !$sigpipe) {
          # No Perl exceptions and no SIGPIPE, but we can't advance until we get a receipt!
          # Wait 5 seconds for one, if it doesn't come, then we need to reconnect and try again.

          my $receipt;
          eval {
            $receipt = $stomp->receive_frame(5);
          };
          if (!$@ && $receipt && $receipt->headers->{'receipt-id'} eq $message->{'receipt'}) {
            # No Perl exception, we got a receipt and it matches the message sent!
            # "return" triumphantly!
            return;
          }
        }

        # Otherwise, wait a second, reconnect and then try again.
        print STDERR "Connection to MQ broken!  Sleeping 1 second then attempting to reconnect...\n";
        sleep 1;
        $connect->();
      }
    };
 
    # Send messages forever...
    while (1) {
      # TODO: construct a real message!
      my $data = "test";

      $send->({
        destination => $QUEUE_NAME,
        body        => $data,
        persistent  => 'true',
      });
    }
 
    $stomp->disconnect();
  }
  main;
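
The producer relies on two details: each message carries a receipt id that will not be confused with an earlier one, and the frame read back really is the receipt for that message. Both can be isolated into small helpers, as in the sketch below. new_receipt_id and is_receipt_for are hypothetical names; the id scheme (process id, time, counter) is an assumption swapped in for the random characters used above; and FakeFrame is only a stand-in with the same command and headers accessors that Net::Stomp::Frame provides.

```perl
use strict;
use warnings;

# Hypothetical helper: build a receipt id that is unique within this process
# by combining the process id, the current time and a counter.
my $receipt_counter = 0;
sub new_receipt_id {
    return join '-', 'receipt', $$, time, ++$receipt_counter;
}

# Hypothetical helper: true only for a RECEIPT frame that acknowledges
# $receipt_id. $frame is any object with command() and headers() accessors.
sub is_receipt_for {
    my ($frame, $receipt_id) = @_;
    return 0 unless $frame;
    return 0 unless ($frame->command || '') eq 'RECEIPT';
    my $got = $frame->headers->{'receipt-id'};
    return (defined($got) && $got eq $receipt_id) ? 1 : 0;
}

# Stand-in for Net::Stomp::Frame, used only to exercise the helpers here.
package FakeFrame;
sub new     { my ($class, %fields) = @_; return bless {%fields}, $class }
sub command { return $_[0]{command} }
sub headers { return $_[0]{headers} }

package main;

my $id    = new_receipt_id();
my $frame = FakeFrame->new(command => 'RECEIPT', headers => { 'receipt-id' => $id });
print is_receipt_for($frame, $id) ? "receipt matches\n" : "receipt does not match\n";
```

Checking the frame's command as well as the receipt-id header guards against treating some other frame that happens to arrive first as the acknowledgement.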

AUTHORS

Copyright 2010 David Snopek (http://www.hackyourlife.org)

LICENSE

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.
