The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id: PK.pm 22875 2008-11-06 16:18:06Z kazuho $

package DBIx::Replicate::Strategy::PK;
use strict;
use warnings;
use Carp::Clan;
use List::Util qw/max/;
use Time::HiRes qw/time sleep/;

sub new { bless {}, shift }

sub replicate
{
    my $self = shift;
    my $c    = shift;
    my $args = shift || {};

    foreach my $p qw(primary_key) {
        croak(ref($self) . ": required parameter $p is missing\n")
            unless $args->{$p};
    }

    # XXX Refactor later;
    my @columns = @{ $c->columns };
    my $columns_str = join ',', @columns;
    my $extra_cond = $c->extra_cond ?  sprintf("and (%s)", $c->extra_cond) : '';
    my $limit_cond = $c->limit_cond ? sprintf("and (%s)", $c->limit_cond) : '';
    my $sql;
    
    my $block      = $c->block;
    my $src_table  = $c->src->table;
    my $dest_table = $c->dest->table;
    my $src_conn   = $c->src->conn;
    my $dest_conn  = $c->dest->conn;

    my $pkey = ref $args->{primary_key}
        ? $args->{primary_key} : [ $args->{primary_key} ];

    my $order_by = $args->{order_by} || join(',', @$pkey);
    
    # copy by 'limit $block'
    my ($start_srcconn, $start_destconn) = qw/1 1/;
    while (1) {
        my $start = time;
        $sql = "select $columns_str from $src_table where $start_srcconn $extra_cond $limit_cond order by $order_by limit $block";
        my $rows = $src_conn->selectall_arrayref(
            $sql,
            { Slice => {} },
        ) or die $src_conn->errstr;
        last unless @$rows;
        $dest_conn->begin_work
            or die $dest_conn->errstr;
        my $next_srcconn =
            '(' . join(',', @$pkey) . ')>(' . join(
                ',',
                map {
                    $src_conn->quote($rows->[-1]->{$_})
                } @$pkey,
            ) . ')';
        my $next_destconn =
            '(' . join(',', @$pkey) . ')>(' . join(
                ',',
                map {
                    $dest_conn->quote($rows->[-1]->{$_})
                } @$pkey,
            ) . ')';
        $sql = "delete from $dest_table where $start_destconn $limit_cond and not $next_destconn";
        $dest_conn->do($sql)
            or die $dest_conn->errstr;
        $sql = "insert into $dest_table ($columns_str) values "
            . join(
                ',',
                map {
                    my $row = $_;
                    '(' . join(
                        ',',
                        map {
                            $dest_conn->quote($row->{$_})
                        } @columns
                    ) . ')'
                } @$rows,
            );

        $dest_conn->do($sql)
            or die $dest_conn->errstr;
        $dest_conn->commit
            or die $dest_conn->errstr;
        sleep(max(time - $start, 0) * (1 - $args->{load}) / $args->{load})
            if $args->{load};
        ($start_srcconn, $start_destconn)
            = ($next_srcconn, $next_destconn);
    }
    $sql = "delete from $dest_table where $start_destconn $limit_cond";
    $dest_conn->do($sql)
        or die $dest_conn->errstr;
}

1;