The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package POE::Component::Server::MySQL;
use Moose;
use MooseX::MethodAttributes;

our $VERSION = "0.02";

use POE;
use POE::Kernel;
use POE qw(Component::Server::TCP);
use POE::Filter::Block;


use Socket qw(INADDR_ANY inet_ntoa inet_aton AF_INET AF_UNIX PF_UNIX);
use Errno qw(ECONNABORTED ECONNRESET);

use Carp qw( croak );
use Class::Inspector;

use Module::Find;
use Data::Dumper;

use Time::HiRes qw(gettimeofday tv_interval);

use POE::Component::Server::MySQL::Client;

has 'address'  => (is => 'rw', isa => 'Str');
has 'port'     => (is => 'rw', isa => 'Int');

has 'local_dsn' => (is => 'rw', isa => 'Str');
has 'local_user' => (is => 'rw', isa => 'Str');
has 'local_password' => (is => 'rw', isa => 'Str');

has 'listener' => (is => 'rw', isa => 'Any');
has 'session' => (is => 'rw', isa => 'Any');

sub DEBUG {1}

sub BUILD {
	my ($self, $opt) = @_;
   
   POE::Session->create(
     object_states => [
         $self =>  { 
            _start         => '_server_start',
            _stop          => '_server_stop',
            _socket_birth  => '_socket_birth',
            _socket_death  => '_socket_death',    
         }
     ],  
      inline_states => {
         _do_fork        => \&_do_fork,
      },
   );

   return $self;
}

sub _server_start {
   my ( $self, $kernel, $session, $heap ) = @_[ OBJECT, KERNEL, SESSION, HEAP];
   
   $self->port(23306) unless $self->port;
   $self->address('127.0.0.1') unless $self->address;
   
   print "Ready to accept clients in namespace ".ref($self)." on ".$self->address.":".$self->port." \n";
   
   $heap->{children}       = {};
   $heap->{is_a_child}     = 0;
   $heap->{max_processes}  = 5;
   
   $self->listener(POE::Wheel::SocketFactory->new(
      BindAddress  => $self->address,
      BindPort     => $self->port,
      Reuse        => 'yes',
      SuccessEvent => '_socket_birth',
      FailureEvent => '_socket_death',
   ));

   $kernel->yield('_do_fork');
}

sub _server_stop {
   my ( $self, $kernel, $session, $heap ) = @_[ OBJECT, KERNEL, SESSION, HEAP];
   print "Stop server \n";
   $self->listener(undef);
   $self->session(undef);

}

sub _do_fork {
  my ($kernel, $heap) = @_[KERNEL, HEAP];

  return if $heap->{is_a_child};

  while (scalar(keys %{$heap->{children}}) < $heap->{max_processes}) {
    my $pid = fork();

    unless (defined($pid)) {
      $kernel->delay(do_fork => 1);
      return;
    }

    if ($pid) {
      print "Server $$ forked child $pid\n";
      $heap->{children}->{$pid} = 1;
      $kernel->sig_child($pid, "got_sig_child");
      next;
    }

    $kernel->has_forked();
    $heap->{is_a_child} = 1;
    $heap->{children}   = {};
    return;
  }
}


sub _socket_birth {
   my ( $self, $kernel, $session, $heap ) = @_[ OBJECT, KERNEL, SESSION, HEAP];
   my ($socket, $address, $port) = @_[ARG0, ARG1, ARG2];
   $address = inet_ntoa($address);

   my $client = POE::Component::Server::MySQL::Client->new({
      server_class   => ref($self),
   });
   
   print '$self->local_dsn = '.$self->local_dsn."\n";
   
   $client->local_dsn($self->local_dsn);
   $client->local_user($self->local_user);
   $client->local_password($self->local_password);
   
   POE::Session->create(
      object_states => [
         $client =>  { 
            client_input         => 'client_input',
            client_error         => 'client_error',
            shutdown             => 'shutdown',
            client_connect       => 'client_connect',
         }
      ],
      inline_states => {
         _start => sub {
            my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP];
   
            $heap->{client} = POE::Wheel::ReadWrite->new(
               Handle       => $socket,
               Driver       => POE::Driver::SysRW->new(),
               Filter       => POE::Filter::Block->new(
                  LengthCodec => [ \&_length_encoder, \&_length_decoder ]
               ),
               InputEvent   => 'client_input',
               ErrorEvent   => 'client_error',
            );
            
            $client->wheel($heap->{client});
            $client->session_id($session->ID);
            $client->session($session);

             # register system states
             my @methods = Class::Inspector->methods(
                 'POE::Component::Server::MySQL::Client',
                 'expanded',
                 'public'
             );

             foreach my $method (@{ $methods[0] }) {
                 my ($full, $class, $method, undef ) = @{ $method };
                 next if $method =~ /^send_/
                         or $method eq 'write'
                         or $method eq 'new_definition';
                 if ($class eq 'POE::Component::Server::MySQL::Client') {
                     $session->_register_state($method, $client);
                 }
             }       
         
            my @dispatchers;
         
            my @modules = findsubmod(ref($self));
            
            foreach my $module (@modules) {
               next unless $module;
               
               unless (Class::Inspector->loaded($module)) {
                  require Class::Inspector->filename($module);
               }
               
               my @methods = Class::Inspector->methods($module, 'expanded', 'public');
               
               foreach my $method (@{ $methods[0] }) {
                  my ($full, $class, $method, undef ) = @{ $method };
                  next unless $module->meta->get_method($method);
                   
                  my $attrs;
                  
                  eval {
                     $attrs = $module->meta->get_method($method)->attributes;
                  };
                  
                  next if $@;
                           
                  foreach my $attr (@{$attrs}) {
                     
                     if ($attr =~ /Regexp\(('|")(.*)('|")\)/io) { #"
                        my $eval_str = 'push @dispatchers, {
                           regexp   => '.$2.',
                           method   => $method,
                        };';                        
                        eval($eval_str);
                     }
                     elsif ($attr =~ /Match\(('|")(.*)('|")\)/io) { #"
                        push @dispatchers, {
                           match   => $2,
                           method   => $method,
                        };
                     }
                     elsif ($attr =~ /Default/io) {
                        $client->default({
                           default  => 1,
                           method   => $method,
                        });
                     }
                     
                     Moose::Util::apply_all_roles(ref($client), ($module));
                     $client->session->_register_state($method, $client);
                     
                  }
                  
               }
            
            }

            $client->dispatchers(\@dispatchers);   
             
            $kernel->yield('client_connect');
         },
         _child  => sub { },
      },
   );   

}

sub _socket_death {
   my ( $self, $kernel, $session, $heap ) = @_[ OBJECT, KERNEL, SESSION, HEAP];
   print "_socket_death \n";
}

sub run {
   POE::Kernel->run();
}

sub _length_encoder { 
   return; 

}

sub _length_decoder {
   my $stuff = shift;

   if (length($$stuff) > 1) {
      return length($$stuff);
   }
   else {
      return 1;
   }
}

=head1 NAME

POE::Component::Server::MySQL - A MySQL POE Server

=head1 DESCRIPTION

This modules helps building a MySQL proxy in which you can write
handler to deal with specific queries.

You can modifiy the query, write a specific response, relay a query
or do wahtever you want within each handler.

This is the evolution of POE::Component::DBIx::MyServer, it
uses Moose and POE.

=head1 SYNOPSYS

First you create a server class that extends POE::Component::Server::MySQL.

   package MyMySQL;
   use Moose;
   
   extends 'POE::Component::Server::MySQL';
   with 'MooseX::Getopt';

Then in a perl script you can instantiate your new server

   use MyMySQL;
   my $server = MyMySQL->new_with_options();
   $server->run;

In the MyMySQL namespace you can add roles which will act as handlers
for your trapped queries:

   package MyMySQL::OnSteroids;
   use MooseX::MethodAttributes::Role;
   
   sub fortune : Regexp('qr{fortune}io') {
      my ($self) = @_;
      
   	my $fortune = `fortune`;
   	chomp($fortune);
   
      $self->send_results(['fortune'],[[$fortune]]);
   
   }

=head1 AUTHORS

Eriam Schaffter, C<eriam@cpan.org> with original work 
done by Philip Stoev in the DBIx::MyServer module.

=head1 BUGS

At least one, in specific cases the servers sends several 
packets instead of a single one. It works fine with most clients
but it crashes Toad for MySQL for example.

=head1 COPYRIGHT

This program is free software, you can redistribute it and/or modify it
under the same terms as Perl itself.

=cut



1;