# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Class::DBI::Replicated;

use warnings;
use strict;
use Class::Trigger;
use Sub::Install qw(install_sub);
use base qw(Class::Accessor::Class Class::Data::Inheritable);
use Params::Validate qw(:all);
use Devel::Peek qw(CvGV);
use Carp qw(croak);

=head1 NAME

Class::DBI::Replicated - Replication from single master to multiple slaves 

=head1 VERSION

Version 0.040

=cut

our $VERSION = '0.040';

=head1 SYNOPSIS

  package My::DBI;
  use base qw(Class::DBI::Replicated::mysql);
  # use base qw(Class::DBI::Replicated::Pg::Slony1);

  My::DBI->replication(\%arg);

=head1 DESCRIPTION

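Class::DBI::Replicated adds master/slave replication awareness to a
Class::DBI application.  Its standard triggers switch to the master before
a create, update or delete, mark the master's replication position
afterwards, and on select check whether the slave has caught up to the
marked position before switching back to it.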

=head1 METHODS

=head2 C<< replication >>

  $class->replication(\%arg);

Analogous to C<< connection >>.  Takes a single hashref.

=over 4

=item B<master>

a single arrayref (as passed to C<< connection >>)

=item B<slaves> 

an arrayref of arrayrefs, one per slave.  NOTE: currently,
using more than one slave does nothing.

=item B<user>

=item B<password>

If present, these specify the user and password to use for
replication-specific queries (such as MySQL's C<SHOW MASTER
STATUS>).

=back

=head2 C<< replication_db >>

Returns the name of the current database in use (minus the
leading C<< db_ >>).

=cut

# we also track the most recent slave db name
sub replication_db {
  my ($class, @args) = @_;

  # Getter form: with no argument, just report the current database name.
  return $class->__replication_db if !@args;

  my $db_name = $args[0];

  # Remember the latest slave name separately, so it is still known after
  # the current database has been switched to something else.
  $class->__slave_db($db_name) if $db_name =~ /^Slave_/;

  $class->__replication_db($db_name);
}

# Flag checked and set by __add_std_triggers so that the standard triggers
# are only installed once.
__PACKAGE__->mk_classdata('__replication_std_triggers');
# When true, db_Main returns db_Master regardless of the current
# replication_db; toggled by the wrappers built in mk_force_masters.
__PACKAGE__->mk_classdata('__force_master');

# Configure master and slave connections for $class.
#
# Takes a single hashref with keys:
#   master   - arrayref of connection arguments for the master
#   slaves   - arrayref holding a flat list of name => dsn pairs, where each
#              dsn is either an arrayref of connection arguments or a hashref
#              with a 'dsn' key holding such an arrayref
#   user     - optional; must be given together with password
#   password - optional; must be given together with user
# plus whatever extra keys the subclass declares via replication_args.
#
# Croaks if the slave list is empty or has an odd number of elements.
sub replication {
  my $class = shift;
  # Flatten the hashref into a list for Params::Validate; the validated
  # key/value list is collected back into a fresh hashref.
  my $arg = { validate_with(
    params => [ %{ +shift } ],
    spec   => {
      master   => { type => ARRAYREF },
      slaves   => { type => ARRAYREF },
      # user and password depend on each other, so neither may be given alone
      user     => { type => SCALAR, optional => 1,
                    depends => ['password'],
                  },
      password => { type => SCALAR, optional => 1,
                    depends => ['user'],
                  },
      # subclass-supplied additions to the spec
      $class->replication_args,
    }
  ) };

  # Class-level accessors used throughout this module to hold replication
  # state for $class.
  $class->mk_class_accessors(
    '__slave_names',
    '__slave_db',
    '__replication_db',
    '__repl_user',
    '__repl_pass',
    '__replication_setup',
    'repl_pos',
  );
  
  if ($arg->{user}) {
    $class->__repl_user($arg->{user});
    $class->__repl_pass($arg->{password});
  }

  $class->__add_std_triggers;
  # Subclass hook; receives the validated argument hashref.
  $class->replication_setup($arg);

  my @slaves = @{$arg->{slaves}};

  # NOTE: this check runs after the triggers and the subclass hook above.
  if (!@slaves or @slaves % 2) {
    croak "list of slaves must be name => dsn pairs\n";
  }
  # The rest of this module reaches handles registered through set_db as
  # db_<name> methods (db_Master, db_Slave_<name>, ...).
  $class->Ima::DBI::set_db('Master' => @{$arg->{master}});
  if ($arg->{user}) {
    # Same first connection argument as the master, but with the
    # replication-specific user and password.
    $class->Ima::DBI::set_db(
      'Master_Repl',
      $arg->{master}->[0],
      $arg->{user},
      $arg->{password},
    );
  }
  my @names;
  # Consume the slave list two elements (name, dsn) at a time.
  while (my ($name, $dsn) = splice @slaves, 0, 2) {
    push @names, $name;
    # Normalise to the hashref form so both input shapes are handled alike.
    my $slave_arg = ref $dsn eq 'HASH' ? $dsn : { dsn => $dsn };
    $class->Ima::DBI::set_db("Slave_$name" => @{ $slave_arg->{dsn} });
    if ($arg->{user}) {
      $class->Ima::DBI::set_db(
        "Slave_$name\_Repl",
        $slave_arg->{dsn}->[0],
        $arg->{user},
        $arg->{password},
      );
    }
    # The first slave becomes the current database when none is set yet.
    unless ($class->replication_db) {
      $class->replication_db("Slave_$name");
    }
  }
  # Slave names in the order given; __default_slave returns the first one.
  $class->__slave_names([ @names ]);
}

=head2 C<< db_Main >>

Return a master or slave DBH, as dictated by the current
replication state.

=cut

sub db_Main {
  my $class = shift;

  # A forced master wins over whatever replication_db currently says.
  my $db_name;
  if ($class->__force_master) {
    $db_name = 'db_Master';
  } else {
    $db_name = 'db_' . $class->replication_db;
  }

  # Let observers know which handle is about to be handed out.
  $class->call_trigger(repl_db => $db_name);

  return $class->$db_name;
}

=head2 C<< db_Slave >>

Always returns a DBH for the most recently-used slave.

=cut

sub db_Slave {
  my $class = shift;

  # __slave_db holds the full handle name (e.g. "Slave_foo"); the handle
  # itself is reached through the matching db_* method.
  my $slave  = $class->__slave_db;
  my $method = "db_$slave";

  return $class->$method;
}

=head2 C<< db_Master >>

Generated by Class::DBI.

=head2 C<< db_Slave_Repl >>

Most recently-used slave's connection for replication.

Falls back to db_Slave if no user/password given

=head2 C<< db_Master_Repl >>

Master's connection for replication.

Falls back to db_Master if no user/password given

=cut

sub db_Slave_Repl {
  my $class = shift;

  # With a replication user configured, use the dedicated *_Repl handle of
  # the most recently used slave.
  if ($class->__repl_user) {
    my $slave  = $class->__slave_db;
    my $method = "db_${slave}_Repl";
    return $class->$method;
  }

  # Otherwise the ordinary slave handle doubles as the replication handle.
  return $class->db_Slave;
}

# any auto-generated db_Master_Repl will override this

sub db_Master_Repl {
  my $class = shift;

  # Fallback used when no replication-specific master handle is available:
  # hand out the ordinary master handle.
  return $class->db_Master;
}

=head2 C<< switch_to_master >>

=cut

sub switch_to_master {
  my $class = shift;

  # Already on the master: nothing to do, and no trigger is fired.
  my $current = $class->replication_db;
  return if $current eq 'Master';

  $class->replication_db('Master');
  $class->call_trigger('switch_to_master');
}

=head2 C<< switch_to_slave >>

=cut

# Name of the slave used when the caller does not pick one: the first entry
# of the __slave_names list.
sub __default_slave {
  my $class = shift;
  return $class->__slave_names->[0];
}

sub switch_to_slave {
  my $class = shift;

  # Fall back to the default slave when no (true) name was supplied.
  my $name = shift || $class->__default_slave;

  my $db_name = 'Slave_' . $name;
  $class->replication_db($db_name);
  $class->call_trigger(switch_to_slave => $name);
}

=head2 C<< wait_for_slave >>

=cut

sub wait_for_slave {
  my $class = shift;
  my $name  = shift;

  # Delegate to repl_wait with its default timeout, for the named slave.
  my %wait_arg = (slave => $name);
  my $ok = eval { $class->repl_wait(\%wait_arg) };

  # Re-raise whatever repl_wait threw.
  if (my $err = $@) {
    die $err;
  }

  return $ok;
}

=head1 REPLICATION METHODS

That is, methods dealing specifically with replication
positions.

=head2 C<< repl_mark >>

Get current master position and save it

=head2 C<< repl_pos >>

Class data accessor/mutator for current marked master position

=head2 C<< repl_get_master >>

virtual (scalar)

=head2 C<< repl_get_slave >>

virtual (scalar)

=head2 C<< repl_check >>

  if ($class->repl_check) { ...

=head2 C<< repl_wait >>

  unless ($class->repl_wait(\%arg)) {
    # not up to date
  }

Possible arguments:

=over 4

=item B<timeout>

defaults to 30

=item B<slave>

slave name, defaults to the first one

=item B<fatal>

die instead of returning 0

=back

return 0 for failure

=head2 C<< repl_compare >>

  my $later = $class->repl_compare($my_pos, $master_pos);

virtual (boolean)

return 1 if $my_pos is at least as new as $master_pos
return 0 otherwise

=cut

# Install a placeholder method named $meth into package $class.  Calling the
# placeholder croaks, naming the package it was generated for.
sub _mk_unimplemented {
  my ($class, $meth) = @_;

  my $placeholder = sub {
    croak "$class does not implement $meth"
  };

  # Symbolic glob assignment needs strict refs relaxed.
  no strict 'refs';
  *{"${class}::${meth}"} = $placeholder;
}

sub repl_mark {
  my $class = shift;

  $class->call_trigger('repl_mark');

  # Record the master's current position (fetched in scalar context) as the
  # point slaves have to reach.
  $class->repl_pos(scalar $class->repl_get_master);
}

# Wait for a slave to reach the marked master position, then switch to it.
#
# $arg is a hashref (defaults are written back into it):
#   timeout - maximum number of one-second retries; defaults to 30
#   slave   - slave name; defaults to the first configured slave
#   fatal   - if true, die instead of returning 0 on timeout
#
# Returns 1 once the slave is up to date (after making it the current
# database if it was not already), 0 if it is still behind after the
# timeout.
sub repl_wait {
  my ($class, $arg) = @_;
  $arg->{timeout} = 30 unless defined $arg->{timeout};
  $arg->{slave}   ||= $class->__default_slave;

  my $tries = 0;
  my $done  = $class->repl_check($arg);
  while (not $done and $tries < $arg->{timeout}) {
    sleep 1;
    # Count the attempt; without this the loop never terminates when the
    # slave stays behind, and the timeout below can never trigger.
    $tries++;
    $done = $class->repl_check($arg);
  }

  # The loop only ends unfinished when the retries are exhausted.
  unless ($done) {
    die "$arg->{slave} is not up to date after $arg->{timeout} seconds"
      if $arg->{fatal};
    return 0;
  }

  unless ($class->replication_db eq "Slave_$arg->{slave}") {
    $class->switch_to_slave($arg->{slave});
  }
  return 1;
}

# Check once whether a slave has reached the marked master position.
#
# $arg is a hashref; $arg->{slave} names the slave and defaults to the first
# configured one (the default is written back into $arg).
#
# Returns 1 if no position is marked, or if the slave's position compares as
# at least as new as the mark (the mark is then cleared).  Returns 0
# otherwise.  Exceptions from repl_get_slave propagate to the caller.
sub repl_check {
  my ($class, $arg) = @_;
  $arg->{slave} ||= $class->__default_slave;
  return 1 unless defined $class->repl_pos;

  $class->call_trigger(
    'repl_check',
  );

  # __slave_db stores the prefixed handle name ("Slave_<name>"), so the
  # prefix has to be part of the comparison.
  my $wanted  = "Slave_" . $arg->{slave};
  my $current = $class->__slave_db;

  my $slave_pos;
  if (defined $current and $current eq $wanted) {
    $slave_pos = $class->repl_get_slave;
  } else {
    # Temporarily point __slave_db at the requested slave, and always put
    # the previous value back, even when the position query dies.
    $class->__slave_db($wanted);
    my $ok  = eval { $slave_pos = $class->repl_get_slave; 1 };
    my $err = $@;
    $class->__slave_db($current);
    die $err unless $ok;
  }

  if ($slave_pos && $class->repl_compare($slave_pos, $class->repl_pos)) {
    # Caught up: forget the mark so later checks succeed immediately.
    $class->repl_pos(undef);
    return 1;
  }

  return 0;
}

# Methods a concrete replication backend has to supply; until it does,
# calling any of them on this package croaks.
__PACKAGE__->_mk_unimplemented($_)
  for qw(repl_get_master repl_get_slave repl_compare);


=head1 TRIGGERS

=head2 C<< before_create >>

=head2 C<< before_update >>

=head2 C<< before_delete >>

switch to using master

=head2 C<< after_create >>

=head2 C<< after_update >>

=head2 C<< after_delete >>

mark master position

=head2 C<< select >>

=cut

# Trigger callback: mark the master position, dropping any extra arguments.
sub _mark {
  my $self = shift;
  return $self->repl_mark;
}

# Trigger callback: a single non-blocking check (timeout 0) of whether the
# slave can be used.
sub _check {
  my $self = shift;
  my %no_wait = (timeout => 0);
  return $self->repl_wait(\%no_wait);
}

# this exists only because you can't take a reference to a
# method and still let inheritance have a chance, unlike the
# two subs above which actually add functionality
sub __master {
  my $self = shift;
  return $self->switch_to_master;
}

# Install the standard triggers once; later calls are no-ops.
sub __add_std_triggers {
  my $class = shift;

  my $already_installed = $class->__replication_std_triggers;
  return if $already_installed;

  $class->__real_add_std_triggers;
  $class->__replication_std_triggers(1);
}

# Register the standard triggers: go to the master before each write, mark
# the master position after each write, and check the slave on select.
sub __real_add_std_triggers {
  my $class = shift;

  my @writes   = qw(create update delete);
  my @triggers = (
    (map { ("before_$_" => \&__master) } @writes),
    (map { ("after_$_"  => \&_mark)    } @writes),
    select => \&_check,
  );

  $class->add_trigger(@triggers);
}

=head1 SUBCLASSING

=head2 C<< mk_force_masters >>

=cut

# Wrap each named method of $class so that, while it runs, db_Main is forced
# to return the master handle.
#
# The wrapper calls the original method in scalar context and returns its
# result.  The previous __force_master value is restored afterwards, also
# when the original method dies; the exception is then re-raised.
sub mk_force_masters {
  my $class = shift;
  for my $meth (@_) {
    # XXX this is very disrespectful.
    my $oldcode = $class->can($meth);
    install_sub({
      code => sub {
        my $class = shift;
        my $old = $class->__force_master;
        $class->__force_master(1);
        # Trap exceptions so the flag cannot stay stuck at 1, which would
        # pin every later db_Main call to the master.
        my $r;
        my $ok  = eval { $r = $class->$oldcode(@_); 1 };
        my $err = $@;
        $class->__force_master($old);
        die $err unless $ok;
        return $r;
      },
      into => $class,
      as   => $meth,
    });
  }
}

# XXX fix duplication here

=head2 C<< mk_markers >>

=cut

# Wrap each named method of $class so that it runs against the master, marks
# the master position afterwards, and then does one non-blocking check of
# whether the slave can be used again.  The wrapper calls the original in
# scalar context and returns its result.
sub mk_markers {
  my ($class, @methods) = @_;
  for my $meth (@methods) {
    # XXX this is very disrespectful.
    my $oldcode = $class->can($meth);

    my $wrapper = sub {
      my $self = shift;
      $self->switch_to_master;
      my $result = $self->$oldcode(@_);
      $self->repl_mark;
      $self->repl_wait({ timeout => 0 });
      return $result;
    };

    install_sub({
      code => $wrapper,
      into => $class,
      as   => $meth,
    });
  }
}

=head2 C<< replication_args >>

Extra Params::Validate specifications for C<< replication >>.

=cut

# Default: no extra validation spec entries.  Subclasses override this.
sub replication_args { return; }

=head2 C<< replication_setup >>

Called automatically inside C<< replication >>.

Gets a hashref of the arguments to C<< replication >>.

=cut

# Default: nothing to set up.  Subclasses override this hook.
sub replication_setup { return; }

=head1 AUTHOR

Hans Dieter Pearcey, C<< <hdp@cpan.org> >>

=head1 BUGS

Please report any bugs or feature requests to
C<bug-class-dbi-replicated@rt.cpan.org>, or through the web interface at
L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Class-DBI-Replicated>.
I will be notified, and then you'll automatically be notified of progress on
your bug as I make changes.

=head1 ACKNOWLEDGEMENTS

=head1 COPYRIGHT & LICENSE

Copyright 2005 Hans Dieter Pearcey, all rights reserved.

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

=cut

1; # End of Class::DBI::Replicated