package Class::DBI::Replicated;
use warnings;
use strict;
use Class::Trigger;
use Sub::Install qw(install_sub);
use base qw(Class::Accessor::Class Class::Data::Inheritable);
use Params::Validate qw(:all);
use Devel::Peek qw(CvGV);
use Carp qw(croak);
=head1 NAME
Class::DBI::Replicated - Replication from single master to multiple slaves
=head1 VERSION
Version 0.040
=cut
our $VERSION = '0.040';
=head1 SYNOPSIS
package My::DBI;
use base qw(Class::DBI::Replicated::mysql);
# use base qw(Class::DBI::Replicated::Pg::Slony1);
My::DBI->replication(\%arg);
=head1 DESCRIPTION
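Class::DBI::Replicated sends reads to a slave database and writes to the
master. Its standard triggers switch to the master before a create, update,
or delete, record the master's replication position afterwards, and on
select check whether the slave has caught up to that position before
switching back to it.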
=head1 METHODS
=head2 C<< replication >>
$class->replication(\%arg);
Analogous to C<< connection >>. Takes a single hashref.
=over 4
=item B<master>
a single arrayref (as passed to C<< connection >>)
=item B<slaves>
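an arrayref of C<< name => dsn >> pairs, one pair per slave. Each dsn is
an arrayref (as passed to C<< connection >>), or a hashref whose C<dsn>
key holds such an arrayref. NOTE: currently,
using more than one slave does nothing.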
=item B<user>
=item B<password>
If present, these specify the user and password to use for
replication-specific queries (such as MySQL's C<SHOW MASTER
STATUS>).
=back
=head2 C<< replication_db >>
Returns the name of the current database in use (minus the
leading C<< db_ >>).
=cut
# we also track the most recent slave db name
# Accessor/mutator for the name of the database currently in use.
#
# Called without a name, returns the stored value from __replication_db.
# Called with a name, stores it through __replication_db and returns that
# call's result; a name beginning with "Slave_" is first recorded in
# __slave_db as well.
sub replication_db {
    my ($class, @args) = @_;
    return $class->__replication_db if !@args;

    my $db_name = $args[0];
    $class->__slave_db($db_name) if $db_name =~ /^Slave_/;
    return $class->__replication_db($db_name);
}
# Class data slots created with mk_classdata:
#   __replication_std_triggers - set once the standard triggers are added
#   __force_master             - when true, db_Main uses db_Master
__PACKAGE__->mk_classdata($_)
    for qw(__replication_std_triggers __force_master);
# Configure the master and slave connections for $class.
#
# Takes one hashref with the keys master, slaves and, optionally,
# user/password (each of which requires the other); subclasses may add
# further validated keys through replication_args. Croaks when slaves is
# empty or is not an even-length list of name => dsn pairs. Returns the
# result of storing the slave names in __slave_names.
sub replication {
    my $class = shift;
    my $opt = {
        validate_with(
            params => [ %{ +shift } ],
            spec   => {
                master => { type => ARRAYREF },
                slaves => { type => ARRAYREF },
                user   => {
                    type     => SCALAR,
                    optional => 1,
                    depends  => ['password'],
                },
                password => {
                    type     => SCALAR,
                    optional => 1,
                    depends  => ['user'],
                },
                $class->replication_args,
            },
        )
    };

    # The accessors used by the rest of the module are created here, so
    # they exist only after replication() has been called.
    $class->mk_class_accessors(qw(
        __slave_names
        __slave_db
        __replication_db
        __repl_user
        __repl_pass
        __replication_setup
        repl_pos
    ));

    if ($opt->{user}) {
        $class->__repl_user($opt->{user});
        $class->__repl_pass($opt->{password});
    }

    $class->__add_std_triggers;

    # Subclass hook; it runs before any connection is registered.
    $class->replication_setup($opt);

    my @pairs = @{ $opt->{slaves} };
    croak "list of slaves must be name => dsn pairs\n"
        if !@pairs or @pairs % 2;

    $class->Ima::DBI::set_db('Master' => @{ $opt->{master} });
    if ($opt->{user}) {
        # Separate master handle using the replication credentials.
        $class->Ima::DBI::set_db(
            'Master_Repl',
            $opt->{master}->[0],
            $opt->{user},
            $opt->{password},
        );
    }

    my @names;
    while (@pairs) {
        my ($name, $dsn) = splice @pairs, 0, 2;
        push @names, $name;

        # A slave may be given as a bare dsn arrayref or as { dsn => [...] }.
        my $slave_opt;
        if (ref $dsn eq 'HASH') {
            $slave_opt = $dsn;
        }
        else {
            $slave_opt = { dsn => $dsn };
        }

        $class->Ima::DBI::set_db("Slave_$name" => @{ $slave_opt->{dsn} });
        if ($opt->{user}) {
            $class->Ima::DBI::set_db(
                "Slave_${name}_Repl",
                $slave_opt->{dsn}->[0],
                $opt->{user},
                $opt->{password},
            );
        }

        # The first slave registered becomes the current database when
        # none has been chosen yet.
        if (!$class->replication_db) {
            $class->replication_db("Slave_$name");
        }
    }

    return $class->__slave_names([@names]);
}
=head2 C<< db_Main >>
Return a master or slave DBH, as dictated by the current
replication state.
=cut
# Pick the handle accessor for the current replication state, fire the
# 'repl_db' trigger with that accessor's name, and return the handle.
# When __force_master is true the accessor is always db_Master; otherwise
# it is 'db_' followed by the current replication_db name.
sub db_Main {
    my $class = shift;

    my $db_name;
    if ($class->__force_master) {
        $db_name = 'db_Master';
    }
    else {
        $db_name = 'db_' . $class->replication_db;
    }

    $class->call_trigger('repl_db', $db_name);
    return $class->$db_name;
}
=head2 C<< db_Slave >>
Always returns a DBH for the most recently-used slave.
=cut
# Return the handle for the slave named in __slave_db by calling the
# accessor 'db_' . __slave_db on the class.
sub db_Slave {
    my $class    = shift;
    my $accessor = 'db_' . $class->__slave_db;
    return $class->$accessor;
}
=head2 C<< db_Master >>
Generated by Class::DBI.
=head2 C<< db_Slave_Repl >>
Most recently-used slave's connection for replication.
Falls back to db_Slave if no user/password given
=head2 C<< db_Master_Repl >>
Master's connection for replication.
Falls back to db_Master if no user/password given
=cut
# Return the replication handle for the slave named in __slave_db
# ('db_' . __slave_db . '_Repl'). When no replication user is set, return
# db_Slave instead.
sub db_Slave_Repl {
    my $class = shift;

    if ($class->__repl_user) {
        my $accessor = 'db_' . $class->__slave_db . '_Repl';
        return $class->$accessor;
    }

    return $class->db_Slave;
}
# any auto-generated db_Master_Repl will override this
# Fallback replication handle for the master: simply returns db_Master.
sub db_Master_Repl {
    my $class = shift;
    return $class->db_Master;
}
=head2 C<< switch_to_master >>
=cut
# Make the master the current database and fire the 'switch_to_master'
# trigger. Does nothing (bare return) when the master is already current.
sub switch_to_master {
    my $class   = shift;
    my $current = $class->replication_db;
    return if $current eq 'Master';

    $class->replication_db('Master');
    return $class->call_trigger('switch_to_master');
}
=head2 C<< switch_to_slave >>
=cut
# The default slave is the first name stored in __slave_names.
sub __default_slave {
    my $class = shift;
    return $class->__slave_names->[0];
}
# Make the named slave the current database and fire the
# 'switch_to_slave' trigger with that name. A false or missing $name
# selects the default slave.
sub switch_to_slave {
    my $class = shift;
    my $name  = shift || $class->__default_slave;

    $class->replication_db("Slave_$name");
    return $class->call_trigger('switch_to_slave', $name);
}
=head2 C<< wait_for_slave >>
=cut
# Call repl_wait for the given slave name and return its result.
# A true-valued exception raised by repl_wait is re-thrown unchanged.
sub wait_for_slave {
    my ($class, $name) = @_;

    my $ok = eval { $class->repl_wait({ slave => $name }) };
    if (my $err = $@) {
        die $err;
    }

    return $ok;
}
=head1 REPLICATION METHODS
That is, methods dealing specifically with replication
positions.
=head2 C<< repl_mark >>
Get current master position and save it
=head2 C<< repl_pos >>
Class data accessor/mutator for current marked master position
=head2 C<< repl_get_master >>
virtual (scalar)
=head2 C<< repl_get_slave >>
virtual (scalar)
=head2 C<< repl_check >>
if ($class->repl_check) { ...
=head2 C<< repl_wait >>
unless ($class->repl_wait(\%arg)) {
# not up to date
}
Possible arguments:
=over 4
=item B<timeout>
defaults to 30
=item B<slave>
slave name, defaults to the first one
=item B<fatal>
die instead of returning 0
=back
return 0 for failure
=head2 C<< repl_compare >>
my $later = $class->repl_compare($my_pos, $master_pos);
virtual (boolean)
return 1 if $my_pos is at least as new as $master_pos
return 0 otherwise
=cut
# Install a placeholder method named $meth into package $class that
# croaks when called. The message names the class the method was
# actually invoked on (the invocant's class), so a subclass that forgot
# to override a virtual method is identified by its own name; it falls
# back to $class when there is no usable invocant.
sub _mk_unimplemented {
    my ($class, $meth) = @_;

    my $stub = sub {
        my ($invocant) = @_;
        my $who = ref($invocant) || $invocant;
        $who = $class unless defined $who && length $who;
        croak "$who does not implement $meth";
    };

    # Symbolic glob assignment needs strict refs off for this scope only.
    no strict 'refs';
    *{ $class . '::' . $meth } = $stub;
}
# Fire the 'repl_mark' trigger, then read the master's current position
# with repl_get_master and store it in repl_pos.
sub repl_mark {
    my $class = shift;
    $class->call_trigger('repl_mark');
    return $class->repl_pos(scalar $class->repl_get_master);
}
# Wait for a slave to catch up to the marked master position, then make
# it the current database.
#
# $arg is a hashref (modified in place with the defaults applied):
#   timeout - number of one-second retries after the first check
#             (default 30); 0 means check once and do not wait
#   slave   - slave name (default: the default slave)
#   fatal   - die instead of returning 0 when the slave is still behind
#
# Returns 1 once repl_check succeeds, 0 when the retries run out.
sub repl_wait {
    my ($class, $arg) = @_;
    $arg ||= {};
    $arg->{timeout} = 30 unless defined $arg->{timeout};
    $arg->{slave} ||= $class->__default_slave;

    my $done = $class->repl_check($arg);

    # The retry counter must advance on every pass; without the increment
    # a lagging slave made this loop spin forever and the timeout branch
    # below could never be reached.
    for (my $tries = 0; !$done && $tries < $arg->{timeout}; $tries++) {
        sleep 1;
        $done = $class->repl_check($arg);
    }

    if (!$done) {
        die "$arg->{slave} is not up to date after $arg->{timeout} seconds"
            if $arg->{fatal};
        return 0;
    }

    # Only switch (and fire the switch trigger) when the slave is not
    # already the current database.
    unless ($class->replication_db eq "Slave_$arg->{slave}") {
        $class->switch_to_slave($arg->{slave});
    }
    return 1;
}
# Check once whether a slave has reached the marked master position.
#
# $arg is a hashref; its 'slave' key (default: the default slave) names
# the slave to check and is filled in on the caller's hash.
#
# Returns 1 when no position is marked, or when the slave's position
# compares as caught up (the mark is then cleared). Returns 0 otherwise.
sub repl_check {
    my ($class, $arg) = @_;
    $arg ||= {};
    $arg->{slave} ||= $class->__default_slave;

    # Nothing has been written since the last successful check.
    return 1 unless defined $class->repl_pos;

    $class->call_trigger('repl_check');

    # __slave_db holds the prefixed name ("Slave_<name>"), so compare
    # against the prefixed form rather than the bare slave name.
    my $wanted  = "Slave_$arg->{slave}";
    my $current = $class->__slave_db;

    my $slave_pos;
    if (defined $current && $current eq $wanted) {
        $slave_pos = $class->repl_get_slave;
    }
    else {
        # Temporarily point __slave_db at the requested slave, and always
        # put the previous value back, even when repl_get_slave dies.
        $class->__slave_db($wanted);
        my $ok  = eval { $slave_pos = $class->repl_get_slave; 1 };
        my $err = $@;
        $class->__slave_db($current);
        die $err unless $ok;
    }

    if ($slave_pos && $class->repl_compare($slave_pos, $class->repl_pos)) {
        $class->repl_pos(undef);
        return 1;
    }
    return 0;
}
# Install croaking placeholders for the methods a subclass must provide.
__PACKAGE__->_mk_unimplemented($_)
    for qw(repl_get_master repl_get_slave repl_compare);
=head1 TRIGGERS
=head2 C<< before_create >>
=head2 C<< before_update >>
=head2 C<< before_delete >>
switch to using master
=head2 C<< after_create >>
=head2 C<< after_update >>
=head2 C<< after_delete >>
mark master position
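=head2 C<< select >>
check whether the slave has caught up to the marked position
(C<< repl_wait >> with a timeout of 0)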
=cut
# Trigger handler (after create/update/delete): calls repl_mark on the
# invocant, ignoring any further arguments.
sub _mark {
    my ($invocant) = @_;
    return $invocant->repl_mark;
}
# Trigger handler (select): calls repl_wait with a timeout of 0 on the
# invocant and returns its result.
sub _check {
    my ($invocant) = @_;
    my %wait_opt = (timeout => 0);
    return $invocant->repl_wait(\%wait_opt);
}
# this exists only because you can't take a reference to a
# method and still let inheritance have a chance, unlike the
# two subs above which actually add functionality
# Trigger handler (before create/update/delete): calls switch_to_master
# as a method so that subclass overrides are honoured.
sub __master {
    my ($invocant) = @_;
    return $invocant->switch_to_master;
}
# Install the standard triggers at most once: the
# __replication_std_triggers flag is set after the first installation
# and short-circuits every later call.
sub __add_std_triggers {
    my $class = shift;
    return if $class->__replication_std_triggers;

    $class->__real_add_std_triggers;
    return $class->__replication_std_triggers(1);
}
# Register the standard triggers in a single add_trigger call:
#   before_{create,update,delete} -> __master (switch to master)
#   after_{create,update,delete}  -> _mark    (mark master position)
#   select                        -> _check   (check slave position)
sub __real_add_std_triggers {
    my $class = shift;

    my @write_ops = qw(create update delete);
    my @triggers  = (
        (map { ("before_$_" => \&__master) } @write_ops),
        (map { ("after_$_"  => \&_mark) } @write_ops),
        select => \&_check,
    );

    return $class->add_trigger(@triggers);
}
=head1 SUBCLASSING
=head2 C<< mk_force_masters >>
=cut
# Wrap each named method of $class so that it runs with __force_master
# set, which makes db_Main return the master handle for the duration of
# the call. The previous __force_master value is restored afterwards,
# including when the wrapped method dies, so a failed call cannot leave
# the class pinned to the master. The wrapped method is called in scalar
# context and its result is returned.
sub mk_force_masters {
    my $class = shift;
    for my $meth (@_) {
        # XXX this is very disrespectful.
        my $oldcode = $class->can($meth);
        install_sub({
            code => sub {
                my $class = shift;
                my $old   = $class->__force_master;
                $class->__force_master(1);

                # Capture any exception so the flag can be reset before
                # it is re-thrown.
                my $r;
                my $ok  = eval { $r = $class->$oldcode(@_); 1 };
                my $err = $@;
                $class->__force_master($old);
                die $err unless $ok;

                return $r;
            },
            into => $class,
            as   => $meth,
        });
    }
}
# XXX fix duplication here
=head2 C<< mk_markers >>
=cut
# Wrap each named method of $class so that a call to it switches to the
# master, runs the original method (scalar context), marks the master
# position, and then checks the slave once with a timeout of 0. The
# original method's result is returned.
sub mk_markers {
    my ($class, @methods) = @_;

    foreach my $meth (@methods) {
        # XXX this is very disrespectful.
        my $original = $class->can($meth);

        my $wrapper = sub {
            my $invocant = shift;
            $invocant->switch_to_master;
            my $result = $invocant->$original(@_);
            $invocant->repl_mark;
            $invocant->repl_wait({ timeout => 0 });
            return $result;
        };

        install_sub({
            code => $wrapper,
            into => $class,
            as   => $meth,
        });
    }
}
=head2 C<< replication_args >>
Extra Params::Validate specifications for C<< replication >>.
=cut
# Default hook: contributes no extra validation specs (empty list).
sub replication_args {
    return;
}
=head2 C<< replication_setup >>
Called automatically inside C<< replication >>.
Gets a hashref of the arguments to C<< replication >>.
=cut
# Default hook: does nothing and returns an empty list.
sub replication_setup {
    return;
}
=head1 AUTHOR
Hans Dieter Pearcey, C<< <hdp@cpan.org> >>
=head1 BUGS
Please report any bugs or feature requests to
C<bug-class-dbi-replicated@rt.cpan.org>, or through the web interface at
L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Class-DBI-Replicated>.
I will be notified, and then you'll automatically be notified of progress on
your bug as I make changes.
=head1 ACKNOWLEDGEMENTS
=head1 COPYRIGHT & LICENSE
Copyright 2005 Hans Dieter Pearcey, all rights reserved.
This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.
=cut
1; # End of Class::DBI::Replicated