The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use lib '../lib';

use DBI;
use Test::More qw{ no_plan };
use Getopt::Long;
use IO::Socket::INET;
use MySQL::Replication::Command;
use MySQL::Replication::Client;

# File-level state shared by every sub below: the effective settings and the
# database handle, both populated by InitEnvironment().
my %Options;
my $Dbh;

# Prepare the database, then run the failure cases followed by the single
# success case through the same harness.
InitEnvironment();
Test( ErrorConditions(), Successful() );

# Prepares the shared state used by the tests.
#
# Fills %Options with defaults (overridable with --dbuser, --dbpass and
# --schema), connects $Dbh to the chosen schema, and resets the
# SourcePosition row for localhost:test to Log 33 / Pos 4.
#
# Dies if the command line cannot be parsed, if the connection fails, or if
# either SQL statement fails.
sub InitEnvironment {
  %Options = (
    DBUser => 'test',
    DBPass => 'testpass',
    Schema => 'Replication',
  );

  # A mistyped option must stop the run: carrying on with the defaults would
  # execute the DELETE/INSERT below against a schema the caller did not ask
  # for. GetOptions itself reports the offending option on STDERR.
  GetOptions(
    'dbuser=s' => \$Options{DBUser},
    'dbpass=s' => \$Options{DBPass},
    'schema=s' => \$Options{Schema},
  ) or die "Error parsing command line options";

  $Dbh = DBI->connect( "dbi:mysql:$Options{Schema}", $Options{DBUser}, $Options{DBPass} )
    or die "Error connecting to database ($DBI::errstr)";

  # Remove any row left behind by an earlier run so the INSERT starts clean.
  $Dbh->do( sprintf( q{
			DELETE FROM %s.SourcePosition
			WHERE  Host   = 'localhost'
			AND    Binlog = 'test'
		}, $Options{Schema} ) ) or die "Error deleting source position for localhost:test ($DBI::errstr)";

  # Seed the position that the success case expects to see in the request.
  $Dbh->do( sprintf( q{
    INSERT INTO %s.SourcePosition ( Host, Binlog, Log, Pos )
    VALUES ( 'localhost', 'test', 33, 4 )
  }, $Options{Schema} ) ) or die "Error initialising source position ($DBI::errstr)";
}

# Builds the failure scenarios that are handed to Test().
#
# Every scenario is a hash reference with three keys:
#   Error - pattern source that Test() matches against the exception text
#   Pre   - code reference Test() runs before calling RequestQueries()
#   Post  - code reference Test() runs afterwards to undo what Pre did
#
# Returns the scenarios as a list.
sub ErrorConditions {
  my @Scenarios;

  # Reading the source position fails: DBI's selectrow_array is replaced by
  # a stub that sets 'mock error' on the handle and returns nothing. The
  # real sub is parked in a package variable so Post can put it back.
  push @Scenarios, {
    Pre => sub {
      no warnings;
      $DBI::db::OldSelectrow_array = *DBI::db::selectrow_array{CODE};
      *DBI::db::selectrow_array    = sub {
        my ( $Handle ) = @_;
        $Handle->DBI::set_err( $DBI::stderr, 'mock error' );
        return;
      };
    },
    Post => sub {
      no warnings;
      *DBI::db::selectrow_array = $DBI::db::OldSelectrow_array;
    },
    Error => 'Error reading source position \(mock error\)',
  };

  # No stored position: the localhost:test row is deleted beforehand and
  # re-created afterwards.
  push @Scenarios, {
    Pre => sub {
      $Dbh->do(
        qq{
          DELETE FROM $Options{Schema}.SourcePosition
          WHERE  Host   = 'localhost'
          AND    Binlog = 'test'
        }
      );
    },
    Post => sub {
      $Dbh->do(
        qq{
          INSERT INTO $Options{Schema}.SourcePosition ( Host, Binlog, Log, Pos )
          VALUES ( 'localhost', 'test', 33, 4 )
        }
      );
    },
    Error => 'No source position for source \(localhost:test\)',
  };

  # Building the request fails: Command->new is replaced by a stub that
  # sets the class error string and returns nothing.
  push @Scenarios, {
    Pre => sub {
      no warnings;
      $MySQL::Replication::Command::Oldnew = *MySQL::Replication::Command::new{CODE};
      *MySQL::Replication::Command::new = sub {
        $MySQL::Replication::Command::Errstr = 'mock error';
        return;
      };
    },
    Post => sub {
      no warnings;
      *MySQL::Replication::Command::new = $MySQL::Replication::Command::Oldnew;
    },
    Error => 'Error creating request \(mock error\)',
  };

  # Sending the request fails: Command->SendToSocket gets the same stub
  # treatment as new() above.
  push @Scenarios, {
    Pre => sub {
      no warnings;
      $MySQL::Replication::Command::OldSendToSocket = *MySQL::Replication::Command::SendToSocket{CODE};
      *MySQL::Replication::Command::SendToSocket = sub {
        $MySQL::Replication::Command::Errstr = 'mock error';
        return;
      };
    },
    Post => sub {
      no warnings;
      *MySQL::Replication::Command::SendToSocket = $MySQL::Replication::Command::OldSendToSocket;
    },
    Error => 'Error sending request \(mock error\)',
  };

  return @Scenarios;
}

# Describes the single success-path case for Test().
#
# Returns a hash reference whose only key, Name, is the label Test() uses
# when naming its assertions.
sub Successful {
  my %Case = ( Name => 'ok' );

  return \%Case;
}

# Runs each test case against MySQL::Replication::Client->RequestQueries().
#
# Arguments: a list of hash references. Keys read here:
#   Error - when set, the call is expected to fail: the result must be undef
#           and the exception text must match this pattern
#   Name  - label used for the assertions of a case without Error
#   Pre   - optional code reference run just before RequestQueries()
#   Post  - optional code reference run just after it
#
# For every case a child process is forked to stand in for the relay end of
# a socket pair. For error cases the child only signals 'Done' over a pipe;
# otherwise it first reads the request from the socket and compares it with
# the expected GET command. The parent waits for 'Done', then kills and
# reaps the child.
#
# Dies if the pipe, the socket pair or the child process cannot be created.
sub Test {
  my ( @Tests ) = @_;

  foreach my $Test ( @Tests ) {
    pipe( my $Parent, my $Child )
      or die "Error creating pipe ($!)";
    socketpair( my $RelayServer, my $RelayClient, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
      or die "Error creating socket pair ($!)";

    # fork() returns undef on failure, which must not be mistaken for the
    # 0 seen by the child: otherwise the parent would run the child branch
    # below and sleep forever.
    my $Pid = fork();
    die "Error forking relay process ($!)" if not defined $Pid;

    if ( not $Pid ) {
      # Child. Error cases never expect a request, so just report in and
      # wait to be killed by the parent.
      if ( $Test->{Error} ) {
        syswrite $Child, 'Done';
        sleep;
      }

      my $RelayRequest = MySQL::Replication::Command->NewFromSocket(
        Socket => $RelayClient,
        Buffer => \my $Buffer,
      );

      # NOTE(review): this assertion runs in the forked child, so it is
      # presumably not counted by the parent's Test::More instance - confirm.
      is_deeply(
        $RelayRequest,
        MySQL::Replication::Command->new(
          Command => 'GET',
          Headers => {
            Host     => 'localhost',
            Binlog   => 'test',
            StartLog => 33,
            StartPos => 4,
          },
        ),
        "$Test->{Name} (request)",
      );

      syswrite $Child, 'Done';
      sleep;
    }

    my $Consumer = MySQL::Replication::Client->new(
      Dbh         => $Dbh,
      DBName      => $Options{Schema},
      RelaySocket => $RelayServer,
      Host        => 'localhost',
      Binlog      => 'test',
    );

    if ( $Test->{Pre} ) {
      $Test->{Pre}->();
    }

    my $Result = eval { $Consumer->RequestQueries() };

    # Capture the exception immediately: $@ is a global and the Post hook
    # below may reset it before the assertions get to look at it.
    my $Error = $@;

    if ( $Test->{Post} ) {
      $Test->{Post}->();
    }

    if ( $Test->{Error} ) {
      is( $Result, undef, "$Test->{Error} (result)" );
      like( $Error, qr/$Test->{Error}/, "$Test->{Error} (error message)" );
    }
    else {
      ok( $Result, "$Test->{Name} (result)" );
      is( $Error, '', "$Test->{Name} (error message)" );
    }

    # Wait for the child to report in, then stop it and reap it so that no
    # zombie process is left behind for each case.
    sysread $Parent, my $Buffer, 100;
    kill 9, $Pid;
    waitpid $Pid, 0;
  }
}