The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package File::Rsync::Mirror::Recentfile;

# use warnings;
use strict;

=encoding utf-8

=head1 NAME

File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient

=cut

my $HAVE = {};
for my $package (
                 "Data::Serializer",
                 "File::Rsync"
                ) {
    $HAVE->{$package} = eval qq{ require $package; };
}
use Config;
use File::Basename qw(basename dirname fileparse);
use File::Copy qw(cp);
use File::Path qw(mkpath);
use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
use File::Temp;
use List::Util qw(first max min);
use Scalar::Util qw(reftype);
use Storable;
use Time::HiRes qw();
use YAML::Syck;

use version; our $VERSION = qv('0.0.8');

use constant MAX_INT => ~0>>1; # anything better?
use constant DEFAULT_PROTOCOL => 1;

# cf. interval_secs
my %seconds;

# maybe subclass if this mapping is bad?
my %serializers;

=head1 SYNOPSIS

Writer (of a single file):

    use File::Rsync::Mirror::Recentfile;
    my $fr = File::Rsync::Mirror::Recentfile->new
      (
       interval => q(6h),
       filenameroot => "RECENT",
       comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
       localroot => "/home/ftp/pub/PAUSE/authors/",
       aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
      );
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");

Reader/mirrorer:

    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       filenameroot => "RECENT",
       interval => q(6h),
       localroot => "/home/ftp/pub/PAUSE/authors",
       remote_dir => "",
       remote_host => "pause.perl.org",
       remote_module => "authors",
       rsync_options => {
                         compress => 1,
                         'rsync-path' => '/usr/bin/rsync',
                         links => 1,
                         times => 1,
                         'omit-dir-times' => 1,
                         checksum => 1,
                        },
       verbose => 1,
      );
    $rf->mirror;

Aggregator (usually the writer):

    my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
    $rf->aggregate;

=head1 DESCRIPTION

Lower level than F:R:M:Recent, handles one recentfile. Whereas a tree
is always composed of several recentfiles, controlled by the
F:R:M:Recent object. The Recentfile object has to do the bookkeeping
for a single timeslice.

=head1 EXPORT

No exports.

=head1 CONSTRUCTORS / DESTRUCTOR

=head2 my $obj = CLASS->new(%hash)

Constructor. On every argument pair the key is a method name and the
value is an argument to that method name.

If a recentfile for this resource already exists, metadata that are
not defined by the constructor will be fetched from there as soon as
it is being read by recent_events().

=cut

sub new {
    my($class, @args) = @_;
    my $self = bless {}, $class;
    while (@args) {
        my($method,$arg) = splice @args, 0, 2;
        $self->$method($arg);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    unless (defined $self->filenameroot) {
        $self->filenameroot("RECENT");
    }
    unless (defined $self->serializer_suffix) {
        $self->serializer_suffix(".yaml");
    }
    return $self;
}

=head2 my $obj = CLASS->new_from_file($file)

Constructor. $file is a I<recentfile>.

=cut

sub new_from_file {
    my($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
                           local $/;
                           <$fh>;
                       };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    while (-l $file) {
        my($name,$path) = fileparse $file;
        my $symlink = readlink $file;
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $path, $symlink );
    }
    my($name,$path,$suffix) = fileparse $file, keys %serializers;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    die "Could not determine file format from suffix" unless $suffix;
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}

=head2 DESTROY

A simple unlock.

=cut
sub DESTROY {
    my $self = shift;
    $self->unlock;
    unless ($self->_current_tempfile_fh) {
        if (my $tempfile = $self->_current_tempfile) {
            if (-e $tempfile) {
                # unlink $tempfile; # may fail in global destruction
            }
        }
    }
}

=head1 ACCESSORS

=cut

my @accessors;

BEGIN {
    @accessors = (
                  "_current_tempfile",
                  "_current_tempfile_fh",
                  "_delayed_operations",
                  "_done",
                  "_interval",
                  "_is_locked",
                  "_localroot",
                  "_merged",
                  "_pathdb",
                  "_remember_last_uptodate_call",
                  "_remote_dir",
                  "_remoteroot",
                  "_requires_fsck",
                  "_rfile",
                  "_rsync",
                  "__verified_tempdir",
                  "_seeded",
                  "_uptodateness_ever_reached",
                  "_use_tempfile",
                 );

    my @pod_lines =
        split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }

=over 4

=item aggregator

A list of interval specs that tell the aggregator which I<recentfile>s
are to be produced.

=item canonize

The name of a method to canonize the path before rsyncing. Only
supported value is C<naive_path_normalize>. Defaults to that.

=item comment

A comment about this tree and setup.

=item dirtymark

A timestamp. The dirtymark is updated whenever an out of band change
on the origin server is performed that violates the protocol. Say,
they add or remove files in the middle somewhere. Slaves must react
with a devaluation of their C<done> structure which then leads to a
full re-sync of all files. Implementation note: dirtymark may increase
or decrease.

=item filenameroot

The (prefix of the) filename we use for this I<recentfile>. Defaults to
C<RECENT>. The string must not contain a directory separator.

=item have_mirrored

Timestamp remembering when we mirrored this recentfile the last time.
Only relevant for slaves.

=item ignore_link_stat_errors

If set to true, rsync errors are ignored that complain about link stat
errors. These seem to happen only when there are files missing at the
origin. In race conditions this can always happen, so it defaults to
true.

=item is_slave

If set to true, this object will fetch a new recentfile from remote
when the timespan between the last mirror (see have_mirrored) and now
is too large (see C<ttl>).

=item keep_delete_objects_forever

The default for delete events is that they are passed through the
collection of recentfile objects until they reach the Z file. There
they get dropped so that the associated file object ceases to exist at
all. By setting C<keep_delete_objects_forever> the delete objects are
kept forever. This makes the Z file larger but has the advantage that
slaves that have interrupted mirroring for a long time still can clean
up their copy.

=item locktimeout

After how many seconds shall we die if we cannot lock a I<recentfile>?
Defaults to 600 seconds.

=item loopinterval

When mirror_loop is called, this accessor can specify how much time
every loop shall at least take. If the work of a loop is done before
that time has gone, sleeps for the rest of the time. Defaults to
arbitrary 42 seconds.

=item max_files_per_connection

Maximum number of files that are transferred on a single rsync call.
Setting it higher means higher performance at the price of holding
connections longer and potentially disturbing other users in the pool.
Defaults to the arbitrary value 42.

=item max_rsync_errors

When rsync operations encounter that many errors without any resetting
success in between, then we die. Defaults to unlimited. A value of
-1 means we run forever ignoring all rsync errors.

=item minmax

Hashref remembering when we read the recent_events from this file the
last time and what the timespan was.

=item protocol

When the RECENT file format changes, we increment the protocol. We try
to support older protocols in later releases.

=item remote_host

The host we are mirroring from. Leave empty for the local filesystem.

=item remote_module

Rsync servers have so called modules to separate directory trees from
each other. Put here the name of the module under which we are
mirroring. Leave empty for local filesystem.

=item rsync_options

Things like compress, links, times or checksums. Passed in to the
File::Rsync object used to run the mirror.

=item serializer_suffix

Mostly untested accessor. The only well tested format for
I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
Data::Serializer. But in principle other formats are supported as
well. See section SERIALIZERS below.

=item sleep_per_connection

Sleep that many seconds (floating point OK) after every chunk of rsyncing
has finished. Defaults to arbitrary 0.42.

=item tempdir

Directory to write temporary files to. Must allow rename operations
into the tree which usually means it must live on the same partition
as the target directory. Defaults to C<< $self->localroot >>.

=item ttl

Time to live. Number of seconds after which this recentfile must be
fetched again from the origin server. Only relevant for slaves.
Defaults to arbitrary 24.2 seconds.

=item verbose

Boolean to turn on a bit verbosity.

=item verboselog

Path to the logfile to write verbose progress information to. This is
a primitive stop gap solution to get simple verbose logging working.
Switching to Log4perl or similar is probably the way to go.

=back

=cut

use accessors @accessors;

=head1 METHODS

=head2 (void) $obj->aggregate( %options )

Takes all intervals that are collected in the accessor called
aggregator. Sorts them by actual length of the interval.
Removes those that are shorter than our own interval. Then merges this
object into the next larger object. The merging continues upwards
as long as the next I<recentfile> is old enough to warrant a merge.

If a merge is warranted is decided according to the interval of the
previous interval so that larger files are not so often updated as
smaller ones. If $options{force} is true, all files get updated.

Here is an example to illustrate the behaviour. Given aggregators

  1h 1d 1W 1M 1Q 1Y Z

then

  1h updates 1d on every call to aggregate()
  1d updates 1W earliest after 1h
  1W updates 1M earliest after 1d
  1M updates 1Q earliest after 1W
  1Q updates 1Y earliest after 1M
  1Y updates  Z earliest after 1Q

Note that all but the smallest recentfile get updated at an arbitrary
rate and as such are quite useless on their own.

=cut

sub aggregate {
    my($self, %option) = @_;
    my %seen_interval;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        grep { !$seen_interval{$_->{interval}}++ && $_->{secs} >= $self->interval_secs }
            map { { interval => $_, secs => $self->interval_secs($_)} }
                $self->interval, @{$self->aggregator || []};
    $self->update;
    $aggs[0]{object} = $self;
  AGGREGATOR: for my $i (0..$#aggs-1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i+1]{interval});
        my $want_merge = 0;
        if ($option{force} || $i == 0) {
            $want_merge = 1;
        } else {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                my $prev = $aggs[$i-1]{object};
                local $^T = time;
                my $next_age = 86400 * -M $next_rfile;
                if ($next_age > $prev->interval_secs) {
                    $want_merge = 1;
                }
            } else {
                $want_merge = 1;
            }
        }
        if ($want_merge) {
            $next->merge($this);
            $aggs[$i+1]{object} = $next;
        } else {
            last AGGREGATOR;
        }
    }
}

# collect file size and mtime for all files of this aggregate
sub _debug_aggregate {
    my($self) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        map { { interval => $_, secs => $self->interval_secs($_)} }
            $self->interval, @{$self->aggregator || []};
    my $report = [];
    for my $i (0..$#aggs) {
        my $this = Storable::dclone $self;
        $this->interval($aggs[$i]{interval});
        my $rfile = $this->rfile;
        my @stat = stat $rfile;
        push @$report, {rfile => $rfile, size => $stat[7], mtime => $stat[9]};
    }
    $report;
}

# (void) $self->_assert_symlink()
sub _assert_symlink {
    my($self) = @_;
    my $recentrecentfile = File::Spec->catfile
        (
         $self->localroot,
         sprintf
         (
          "%s.recent",
          $self->filenameroot
         )
        );
    if ($Config{d_symlink} eq "define") {
        my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
        if (-l $recentrecentfile) {
            my $found_symlink = readlink $recentrecentfile;
            if ($found_symlink eq $self->rfilename) {
                return;
            } else {
                $howto_create_symlink = 2;
            }
        } else {
            $howto_create_symlink = 1;
        }
        if (1 == $howto_create_symlink) {
            symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
        } else {
            unlink "$recentrecentfile.$$"; # may fail
            symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
            rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink "$recentrecentfile.$$"; # may fail
        cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
        rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
    }
}

=head2 $hashref = $obj->delayed_operations

A hash of hashes containing unlink and rmdir operations which had to
wait until the recentfile got unhidden in order to not confuse
downstream mirrors (in case we have some).

=cut

sub delayed_operations {
    my($self) = @_;
    my $x = $self->_delayed_operations;
    unless (defined $x) {
        $x = {
              unlink => {},
              rmdir => {},
             };
        $self->_delayed_operations ($x);
    }
    return $x;
}

=head2 $done = $obj->done

C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
object that keeps track of rsync activities. Only needed and used when
we are a mirroring slave.

=cut

sub done {
    my($self) = @_;
    my $done = $self->_done;
    if (!$done) {
        require File::Rsync::Mirror::Recentfile::Done;
        $done = File::Rsync::Mirror::Recentfile::Done->new();
        $done->_rfinterval ($self->interval);
        $self->_done ( $done );
    }
    return $done;
}

=head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()

Stores the remote I<recentfile> locally as a tempfile. The caller is
responsible to remove the file after use.

Note: if you're intending to act as an rsync server for other slaves,
then you must prefer this method to fetch that file with
get_remotefile(). Otherwise downstream mirrors would expect you to
already have mirrored all the files that are in the I<recentfile>
before you have them mirrored.

=cut

sub get_remote_recentfile_as_tempfile {
    my($self) = @_;
    mkpath $self->localroot;
    my $fh;
    my $trfilename;
    if ( $self->_use_tempfile() ) {
        if ($self->ttl_reached) {
            $fh = $self->_current_tempfile_fh;
            $trfilename = $self->rfilename;
        } else {
            return $self->_current_tempfile;
        }
    } else {
        $trfilename = $self->rfilename;
    }

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile ($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
        }
    }
    my $src = join ("/",
                    $self->remoteroot,
                    $trfilename,
                   );
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
        my $LFH = $self->_logfilehandle;
        printf $LFH
            (
             "%-4s %d (1/1/%s) temp %s ... ",
             $doing,
             time,
             $self->interval,
             $display_dst,
            );
    }
    my $gaveup = 0;
    my $retried = 0;
    local($ENV{LANG}) = "C";
    while (!$self->rsync->exec(
                               src => $src,
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    if ($gaveup) {
        my $LFH = $self->_logfilehandle;
        printf $LFH "Warning: gave up mirroring %s, will try again later", $self->interval;
    } else {
        $self->_refresh_internals ($dst);
        $self->have_mirrored (Time::HiRes::time);
        $self->un_register_rsync_error ();
    }
    $self->unseed;
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print $LFH "DONE\n";
    }
    my $mode = 0644;
    chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
    return $dst;
}

sub _verified_tempdir {
    my($self) = @_;
    my $tempdir = $self->__verified_tempdir();
    return $tempdir if defined $tempdir;
    unless ($tempdir = $self->tempdir) {
        $tempdir = $self->localroot;
    }
    unless (-d $tempdir) {
        mkpath $tempdir;
    }
    $self->__verified_tempdir($tempdir);
    return $tempdir;
}

sub _get_remote_rat_provide_tempfile_object {
    my($self, $trfilename) = @_;
    my $_verified_tempdir = $self->_verified_tempdir;
    my $fh = File::Temp->new
        (TEMPLATE => sprintf(".FRMRecent-%s-XXXX",
                             $trfilename,
                            ),
         DIR => $_verified_tempdir,
         SUFFIX => $self->serializer_suffix,
         UNLINK => $self->_use_tempfile,
        );
    my $mode = 0644;
    my $dst = $fh->filename;
    chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
    if ($self->_use_tempfile) {
        $self->_current_tempfile_fh ($fh); # delay self destruction
    }
    return $fh;
}

sub _logfilehandle {
    my($self) = @_;
    my $fh;
    if (my $vl = $self->verboselog) {
        open $fh, ">>", $vl or die "Could not open >> '$vl': $!";
    } else {
        $fh = \*STDERR;
    }
    return $fh;
}

=head2 $localpath = $obj->get_remotefile ( $relative_path )

Rsyncs one single remote file to local filesystem.

Note: no locking is done on this file. Any number of processes may
mirror this object.

Note II: do not use for recentfiles. If you are a cascading
slave/server combination, it would confuse other slaves. They would
expect the contents of these recentfiles to be available. Use
get_remote_recentfile_as_tempfile() instead.

=cut

sub get_remotefile {
    my($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $LFH = $self->_logfilehandle;
        printf $LFH
            (
             "%-4s %d (1/1/%s) %s ... ",
             $doing,
             time,
             $self->interval,
             $path,
            );
    }
    local($ENV{LANG}) = "C";
    my $remoteroot = $self->remoteroot or die "Alert: missing remoteroot. Cannot continue";
    while (!$self->rsync->exec(
                               src => join("/",
                                           $remoteroot,
                                           $path),
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print $LFH "DONE\n";
    }
    return $dst;
}

=head2 $obj->interval ( $interval_spec )

Get/set accessor. $interval_spec is a string and described below in
the section INTERVAL SPEC.

=cut

sub interval {
    my ($self, $interval) = @_;
    if (@_ >= 2) {
        $self->_interval($interval);
        $self->_rfile(undef);
    }
    $interval = $self->_interval;
    unless (defined $interval) {
        # do not ask the $self too much, it recurses!
        require Carp;
        Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
    }
    return $interval;
}

=head2 $secs = $obj->interval_secs ( $interval_spec )

$interval_spec is described below in the section INTERVAL SPEC. If
empty defaults to the inherent interval for this object.

=cut

sub interval_secs {
    my ($self, $interval) = @_;
    $interval ||= $self->interval;
    unless (defined $interval) {
        die "interval_secs() called without argument on an object without a declared one";
    }
    my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
        die "Could not determine seconds from interval[$interval]";
    if ($interval eq "Z") {
        return MAX_INT;
    } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
        return $seconds{$t}*$n;
    } else {
        die "Invalid interval specification: n[$n]t[$t]";
    }
}

=head2 $obj->localroot ( $localroot )

Get/set accessor. The local root of the tree. Guaranteed without
trailing slash.

=cut

sub localroot {
    my ($self, $localroot) = @_;
    if (@_ >= 2) {
        $localroot =~ s|/$||;
        $self->_localroot($localroot);
        $self->_rfile(undef);
    }
    $localroot = $self->_localroot;
}

=head2 $ret = $obj->local_path($path_found_in_recentfile)

Combines the path to our local mirror and the path of an object found
in this I<recentfile>. In other words: the target of a mirror operation.

Implementation note: We split on slashes and then use
File::Spec::catfile to adjust to the local operating system.

=cut

sub local_path {
    my($self,$path) = @_;
    unless (defined $path) {
        # seems like a degenerated case
        return $self->localroot;
    }
    my @p = split m|/|, $path;
    File::Spec->catfile($self->localroot,@p);
}

=head2 (void) $obj->lock

Locking is implemented with an C<mkdir> on a locking directory
(C<.lock> appended to $rfile).

=cut

sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school ressources.
    my $locked = $self->_is_locked and return;
    my $rfile = $self->rfile;
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    my %have_warned;
    my $lockdir = "$rfile.lock";
    my $procfile = "$lockdir/process";
 GETLOCK: while (not mkdir $lockdir) {
        if (open my $fh, "<", $procfile) {
            chomp(my $process = <$fh>);
            if (0) {
            } elsif ($process !~ /^\d+$/) {
                warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown}++;
            } elsif ($$ == $process) {
                last GETLOCK;
            } elsif (kill 0, $process) {
                warn "Warning: process $process holds a lock in '$lockdir', waiting..." unless $have_warned{$process}++;
            } else {
                warn "Warning: breaking lock held by process $process";
                sleep 1;
                last GETLOCK;
            }
        } else {
            warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown}++;
        }
        Time::HiRes::sleep 0.01;
        if (time - $start > $locktimeout) {
            die "Could not acquire lockdirectory '$rfile.lock': $!";
        }
    } # GETLOCK
    open my $fh, ">", $procfile or die "Could not open >$procfile\: $!";
    print $fh $$, "\n";
    close $fh or die "Could not close: $!";
    $self->_is_locked (1);
}

=head2 (void) $obj->merge ($other)

Bulk update of this object with another one. It's used to merge a
smaller and younger $other object into the current one. If this file
is a C<Z> file, then we normally do not merge in objects of type
C<delete>; this can be overridden by setting
keep_delete_objects_forever. But if we encounter an object of type
delete we delete the corresponding C<new> object if we have it.

If there is nothing to be merged, nothing is done.

=cut

sub merge {
    my($self, $other) = @_;
    $self->_merge_sanitycheck ( $other );
    $other->lock;
    my $other_recent = $other->recent_events || [];
    $self->lock;
    $self->_merge_locked ( $other, $other_recent );
    $self->unlock;
    $other->unlock;
}

sub _merge_locked {
    my($self, $other, $other_recent) = @_;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    my $something_done;
    unless ($my_recent->[0]) {
        # obstetrics
        $something_done = 1;
    }
    if ($epoch) {
        if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
            $oldest_allowed = 0;
            $something_done = 1;
        } elsif (my $merged = $self->merged) {
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
            if (@$other_recent and
                _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)
               ) {
                $oldest_allowed = $other_recent->[-1]{epoch};
            }
        }
        while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$my_recent;
            $something_done = 1;
        }
    }

    my %have_path;
    my $other_recent_filtered = [];
    for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next if _bigfloatlt($oevepoch, $oldest_allowed);
        my $path = $oev->{path};
        next if $have_path{$path}++;
        if (    $self->interval eq "Z"
            and $oev->{type}    eq "delete"
            and ! $self->keep_delete_objects_forever
           ) {
            # do nothing
        } else {
            if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
                $something_done = 1;
            }
            push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
        }
    }
    if ($something_done) {
        $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
    }
}

sub _merge_something_done {
    my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
    my $recent = [];
    my $epoch_conflict = 0;
    my $last_epoch;
 ZIP: while (@$other_recent_filtered || @$my_recent) {
        my $event;
        if (!@$my_recent ||
            @$other_recent_filtered && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch})) {
            $event = shift @$other_recent_filtered;
        } else {
            $event = shift @$my_recent;
            next ZIP if $have_path->{$event->{path}}++;
        }
        $epoch_conflict=1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
        $last_epoch = $event->{epoch};
        push @$recent, $event;
    }
    if ($epoch_conflict) {
        my %have_epoch;
        for (my $i = $#$recent;$i>=0;$i--) {
            my $epoch = $recent->[$i]{epoch};
            if ($have_epoch{$epoch}++) {
                while ($have_epoch{$epoch}) {
                    $epoch = _increase_a_bit($epoch);
                }
                $recent->[$i]{epoch} = $epoch;
                $have_epoch{$epoch}++;
            }
        }
    }
    if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
        $self->dirtymark ( $other->dirtymark );
    }
    $self->write_recent($recent);
    $other->merged({
                    time => Time::HiRes::time, # not used anywhere
                    epoch => $recent->[0]{epoch},
                    into_interval => $self->interval, # not used anywhere
                   });
    $other->write_recent($other_recent);
}

sub _merge_sanitycheck {
    my($self, $other) = @_;
    if ($self->interval_secs <= $other->interval_secs) {
        require Carp;
        Carp::confess
                (sprintf
                 (
                  "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
                  $self->interval_secs,
                  $other->interval_secs,
                 ));
    }
}

=head2 merged

Hashref denoting when this recentfile has been merged into some other
at which epoch.

=cut

sub merged {
    my($self, $set) = @_;
    if (defined $set) {
        $self->_merged ($set);
    }
    my $merged = $self->_merged;
    my $into;
    if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
        # sanity checks
        if ($into eq $self->interval) {
            require Carp;
            Carp::cluck(sprintf
                (
                 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
                 $into,
                 $self->interval,
                ));
        } elsif ($self->interval_secs($into) < $self->interval_secs) {
            require Carp;
            Carp::cluck(sprintf
                (
                 "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
                 $self->interval_secs($into),
                 $self->interval_secs,
                 $self->interval,
                ));
        }
    }
    $merged;
}

=head2 $hashref = $obj->meta_data

Returns the hashref of metadata that the server has to add to the
I<recentfile>.

=cut

sub meta_data {
    my($self) = @_;
    my $ret = $self->{meta};
    for my $m (
               "aggregator",
               "canonize",
               "comment",
               "dirtymark",
               "filenameroot",
               "interval",
               "merged",
               "minmax",
               "protocol",
               "serializer_suffix",
              ) {
        my $v = $self->$m;
        if (defined $v) {
            $ret->{$m} = $v;
        }
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $ret->{Producers} ||= {
                           __PACKAGE__, "$VERSION", # stringified it looks better
                           '$0', $0,
                           'time', Time::HiRes::time,
                          };
    $ret->{dirtymark} ||= Time::HiRes::time;
    return $ret;
}

=head2 $success = $obj->mirror ( %options )

Mirrors the files in this I<recentfile> as reported by
C<recent_events>. Options named C<after>, C<before>, C<max> are passed
through to the C<recent_events> call. The boolean option C<piecemeal>,
if true, causes C<mirror> to only rsync C<max_files_per_connection>
and keep track of the rsynced files so that future calls will rsync
different files until all files are brought to sync.

=cut

sub mirror {
    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile (1);
    # skip-deletes is inadequat for passthrough within mirror. We
    # would never reach uptodateness when a delete were on a
    # borderline
    my %passthrough = map { ($_ => $options{$_}) } qw(before after max);
    my ($recent_events) = $self->recent_events(%passthrough);
    my(@error, @dlcollector); # download-collector: array containing paths we need
    my $first_item = 0;
    my $last_item = $#$recent_events;
    my $done = $self->done;
    my $pathdb = $self->_pathdb;
  ITEM: for my $i ($first_item..$last_item) {
        my $status = +{};
        $self->_mirror_item
            (
             $i,
             $recent_events,
             $last_item,
             $done,
             $pathdb,
             \@dlcollector,
             \%options,
             $status,
             \@error,
            );
        last if $i == $last_item;
        if ($status->{mustreturn}){
            if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
                # looks like a bug somewhere else
                my $t = $self->_current_tempfile;
                unlink $t or die "Could not unlink '$t': $!";
                $self->_current_tempfile(undef);
                $self->_use_tempfile(0);
            }
            return;
        }
    }
    if (@dlcollector) {
        my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
    }
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print $LFH "DONE\n";
    }
    # once we've gone to the end we consider ourselves free of obligations
    $self->unseed;
    $self->_mirror_unhide_tempfile ($trecentfile);
    $self->_mirror_perform_delayed_ops(\%options);
    return !@error;
}

sub _mirror_item {
    my($self,
       $i,
       $recent_events,
       $last_item,
       $done,
       $pathdb,
       $dlcollector,
       $options,
       $status,
       $error,
      ) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        my $rec = $pathdb->{$recent_event->{path}};
        if ($rec && $rec->{recentepoch}) {
            if (_bigfloatgt
                ( $rec->{recentepoch}, $recent_event->{epoch} )){
                $done->register ($recent_events, [$i]);
                return;
            }
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    if ($recent_event->{type} eq "new"){
        $self->_mirror_item_new
            (
             $dst,
             $i,
             $last_item,
             $recent_events,
             $recent_event,
             $dlcollector,
             $pathdb,
             $status,
             $error,
             $options,
            );
    } elsif ($recent_event->{type} eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipped";
        } else {
            my @lstat = lstat $dst;
            if (! -e _) {
                $activity = "not_found";
            } elsif (-l _ or not -d _) {
                $self->delayed_operations->{unlink}{$dst}++;
                $activity = "deleted";
            } else {
                $self->delayed_operations->{rmdir}{$dst}++;
                $activity = "deleted";
            }
        }
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_mirror_register_path($pathdb,[$recent_event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}

sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $dlcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $LFH = $self->_logfilehandle;
        printf $LFH
            (
             "%-4s %d (%d/%d/%s) %s ... ",
             $doing,
             time,
             1+$i,
             1+$last_item,
             $self->interval,
             $recent_event->{path},
            );
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    my $success;
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print $LFH "\n";
    }
    push @$dlcollector, { rev => $recent_event, i => $i };
    if (@$dlcollector >= $max_files_per_connection) {
        $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
        my $sleep = $self->sleep_per_connection;
        $sleep = 0.42 unless defined $sleep;
        Time::HiRes::sleep $sleep;
        if ($options->{piecemeal}) {
            $status->{mustreturn} = 1;
            return;
        }
    } else {
        return;
    }
    if (!$success || $@) {
        warn "Warning: Error while mirroring: $@";
        push @$error, $@;
        sleep 1;
    }
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print $LFH "DONE\n";
    }
}

sub _mirror_dlcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my $success = $self->mirror_path([map {$_->{rev}{path}} @$xcoll]);
    if ($pathdb) {
        $self->_mirror_register_path($pathdb,[map {$_->{rev}} @$xcoll],"rsync");
    }
    $self->done->register($recent_events, [map {$_->{i}} @$xcoll]);
    @$xcoll = ();
    return $success;
}

sub _mirror_register_path {
    my($self,$pathdb,$coll,$activity) = @_;
    my $time = time;
    for my $item (@$coll) {
        $pathdb->{$item->{path}} =
            {
             recentepoch => $item->{epoch},
             ($activity."_on") => $time,
            };
    }
}

sub _mirror_unhide_tempfile {
    my($self, $trecentfile) = @_;
    my $rfile = $self->rfile;
    if (rename $trecentfile, $rfile) {
        # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
    } else {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    $self->_use_tempfile (0);
    if (my $ctfh = $self->_current_tempfile_fh) {
        $ctfh->unlink_on_destroy (0);
        $self->_current_tempfile_fh (undef);
    }
}

sub _mirror_perform_delayed_ops {
    my($self,$options) = @_;
    my $delayed = $self->delayed_operations;
    for my $dst (keys %{$delayed->{unlink}}) {
        unless (unlink $dst) {
            require Carp;
            Carp::cluck ( "Warning: Error while unlinking '$dst': $!" ) if $options->{verbose};
        }
        if ($self->verbose) {
            my $doing = "Del";
            my $LFH = $self->_logfilehandle;
            printf $LFH
                (
                 "%-4s %d (%s) %s DONE\n",
                 $doing,
                 time,
                 $self->interval,
                 $dst,
                );
            delete $delayed->{unlink}{$dst};
        }
    }
    for my $dst (sort {length($b) <=> length($a)} keys %{$delayed->{rmdir}}) {
        unless (rmdir $dst) {
            require Carp;
            Carp::cluck ( "Warning: Error on rmdir '$dst': $!" ) if $options->{verbose};
        }
        if ($self->verbose) {
            my $doing = "Del";
            my $LFH = $self->_logfilehandle;
            printf $LFH
                (
                 "%-4s %d (%s) %s DONE\n",
                 $doing,
                 time,
                 $self->interval,
                 $dst,
                );
            delete $delayed->{rmdir}{$dst};
        }
    }
}

=head2 $success = $obj->mirror_path ( $arrref | $path )

If the argument is a scalar it is treated as a path. The remote path
is mirrored into the local copy. $path is the path found in the
I<recentfile>, i.e. it is relative to the root directory of the
mirror.

If the argument is an array reference then all elements are treated as
a path below the current tree and all are rsynced with a single
command (and a single connection).

=cut

sub mirror_path {
    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)
    if (ref $path and ref $path eq "ARRAY") {
        my $dst = $self->localroot;
        mkpath dirname $dst;
        my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
                                                      lc $self->filenameroot,
                                                     ),
                                  TMPDIR => 1,
                                  UNLINK => 0,
                                 );
        for my $p (@$path) {
            print $fh $p, "\n";
        }
        $fh->flush;
        $fh->unlink_on_destroy(1);
        my $gaveup = 0;
        my $retried = 0;
        local($ENV{LANG}) = "C";
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                           ),
                dst => $dst,
                'files-from' => $fh->filename,
               )) {
            my(@err) = $self->rsync->err;
            if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    my $LFH = $self->_logfilehandle;
                    print $LFH "Info: ignoring link_stat error '@err'";
                }
                return 1;
            }
            $self->register_rsync_error (@err);
            if (++$retried >= 3) {
                my $batchsize = @$path;
                warn "The number of rsync retries now reached 3 within a batch of size $batchsize. Error was '@err'. Giving up now, will retry later, ";
                $gaveup = 1;
                last;
            }
            sleep 1;
        }
        unless ($gaveup) {
            $self->un_register_rsync_error ();
        }
    } else {
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        local($ENV{LANG}) = "C";
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                            $path
                           ),
                dst => $dst,
                )) {
            my(@err) = $self->rsync->err;
            if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    my $LFH = $self->_logfilehandle;
                    print $LFH "Info: ignoring link_stat error '@err'";
                }
                return 1;
            }
            $self->register_rsync_error (@err);
        }
        $self->un_register_rsync_error ();
    }
    return 1;
}

sub _my_ignore_link_stat_errors {
    my($self) = @_;
    my $x = $self->ignore_link_stat_errors;
    $x = 1 unless defined $x;
    return $x;
}

sub _my_current_rfile {
    my($self) = @_;
    my $rfile;
    if ($self->_use_tempfile) {
        $rfile = $self->_current_tempfile;
    }
    unless ($rfile && -s $rfile) {
        $rfile = $self->rfile;
    }
    return $rfile;
}

=head2 $path = $obj->naive_path_normalize ($path)

Takes an absolute unix style path as argument and canonicalizes it to
a shorter path if possible, removing things like double slashes or
C</./> and removes references to C<../> directories to get a shorter
unambiguos path. This is used to make the code easier that determines
if a file passed to C<upgrade()> is indeed below our C<localroot>.

=cut

sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path =~ s|/$||;
    $path;
}

=head2 $ret = $obj->read_recent_1 ( $data )

Delegate of C<recent_events()> on protocol 1

=cut

sub read_recent_1 {
    my($self, $data) = @_;
    return $data->{recent};
}

=head2 $array_ref = $obj->recent_events ( %options )

Note: the code relies on the resource being written atomically. We
cannot lock because we may have no write access. If the caller has
write access (eg. aggregate() or update()), it has to care for any
necessary locking and it MUST write atomically.

If C<$options{after}> is specified, only file events after this
timestamp are returned.

If C<$options{before}> is specified, only file events before this
timestamp are returned.

If C<$options{max}> is specified only a maximum of this many most
recent events is returned.

If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
will be returned.

If C<$options{contains}> is specified the value must be a hash
reference containing a query. The query may contain the keys C<epoch>,
C<path>, and C<type>. Each represents a condition that must be met. If
there is more than one such key, the conditions are ANDed.

If C<$options{info}> is specified, it must be a hashref. This hashref
will be filled with metadata about the unfiltered recent_events of
this object, in key C<first> there is the first item, in key C<last>
is the last.

=cut

sub recent_events {
    my ($self, %options) = @_;
    my $info = $options{info};
    if ($self->is_slave) {
        # XXX seems dubious, might produce tempfiles without removing them?
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    my $re;
    if (reftype $data eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    return $re unless grep {defined $options{$_}} qw(after before contains max skip-deletes);
    $self->_recent_events_handle_options ($re, \%options);
}

# File::Rsync::Mirror::Recentfile::_recent_events_handle_options
sub _recent_events_handle_options {
    my($self, $re, $options) = @_;
    my $last_item = $#$re;
    my $info = $options->{info};
    if ($info) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    if (defined $options->{after}) {
        if ($re->[0]{epoch} > $options->{after}) {
            if (
                my $f = first
                        {$re->[$_]{epoch} <= $options->{after}}
                        0..$#$re
               ) {
                $last_item = $f-1;
            }
        } else {
            $last_item = -1;
        }
    }
    my $first_item = 0;
    if (defined $options->{before}) {
        if ($re->[0]{epoch} > $options->{before}) {
            if (
                my $f = first
                        {$re->[$_]{epoch} < $options->{before}}
                        0..$last_item
               ) {
                $first_item = $f;
            }
        } else {
            $first_item = 0;
        }
    }
    if (0 != $first_item || -1 != $last_item) {
        @$re = splice @$re, $first_item, 1+$last_item-$first_item;
    }
    if ($options->{'skip-deletes'}) {
        @$re = grep { $_->{type} ne "delete" } @$re;
    }
    if (my $contopt = $options->{contains}) {
        my $seen_allowed = 0;
        for my $allow (qw(epoch path type)) {
            if (exists $contopt->{$allow}) {
                $seen_allowed++;
                my $v = $contopt->{$allow};
                @$re = grep { $_->{$allow} eq $v } @$re;
            }
        }
        if (keys %$contopt > $seen_allowed) {
            require Carp;
            Carp::confess
                    (sprintf "unknown query: %s", join ", ", %$contopt);
        }
    }
    if ($options->{max} && @$re > $options->{max}) {
        @$re = splice @$re, 0, $options->{max};
    }
    $re;
}

sub _recent_events_protocol_x {
    my($self,
       $data,
       $rfile_or_tempfile,
      ) = @_;
    my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
    # we may be reading meta for the first time
    while (my($k,$v) = each %{$data->{meta}}) {
        if ($k ne lc $k){ # "Producers"
            $self->{ORIG}{$k} = $v;
            next;
        }
        next if defined $self->$k;
        $self->$k($v);
    }
    my $re = $self->$meth ($data);
    my $minmax;
    if (my @stat = stat $rfile_or_tempfile) {
        $minmax = { mtime => $stat[9] };
    } else {
        # defensive because ABH encountered:

#### Sync 1239828608 (1/1/Z) temp .../authors/.FRMRecent-RECENT-Z.yaml-
#### Ydr_.yaml ... DONE
#### Cannot stat '/mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-
#### Ydr_.yaml': No such file or directory at /usr/lib/perl5/site_perl/
#### 5.8.8/File/Rsync/Mirror/Recentfile.pm line 1558.
#### unlink0: /mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-Ydr_.yaml is
#### gone already at cpan-pause.pl line 0
        
        my $LFH = $self->_logfilehandle;
        print $LFH "Warning (maybe harmless): Cannot stat '$rfile_or_tempfile': $!"
    }
    if (@$re) {
        $minmax->{min} = $re->[-1]{epoch};
        $minmax->{max} = $re->[0]{epoch};
    }
    $self->minmax ( $minmax );
    return $re;
}

sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                open my $fh, $rfile_or_tempfile or die "Could not open: $!";
                local $/;
                <$fh>;
            };
        $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}

sub _refresh_internals {
    my($self, $dst) = @_;
    my $class = ref $self;
    my $rfpeek = $class->new_from_file ($dst);
    for my $acc (qw(
                    _merged
                    minmax
                   )) {
        $self->$acc ( $rfpeek->$acc );
    }
    my $old_dirtymark = $self->dirtymark;
    my $new_dirtymark = $rfpeek->dirtymark;
    if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
        $self->done->reset;
        $self->dirtymark ( $new_dirtymark );
        $self->_uptodateness_ever_reached(0);
        $self->seed;
    }
}

=head2 $ret = $obj->rfilename

Just the basename of our I<recentfile>, composed from C<filenameroot>,
a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>

=cut

sub rfilename {
    my($self) = @_;
    my $file = sprintf("%s-%s%s",
                       $self->filenameroot,
                       $self->interval,
                       $self->serializer_suffix,
                      );
    return $file;
}

=head2 $str = $self->remote_dir

The directory we are mirroring from.

=cut

sub remote_dir {
    my($self, $set) = @_;
    if (defined $set) {
        $self->_remote_dir ($set);
    }
    my $x = $self->_remote_dir;
    $self->is_slave (1);
    return $x;
}

=head2 $str = $obj->remoteroot

=head2 (void) $obj->remoteroot ( $set )

Get/Set the composed prefix needed when rsyncing from a remote module.
If remote_host, remote_module, and remote_dir are set, it is composed
from these.

=cut

sub remoteroot {
    my($self, $set) = @_;
    if (defined $set) {
        $self->_remoteroot($set);
    }
    my $remoteroot = $self->_remoteroot;
    unless (defined $remoteroot) {
        $remoteroot = sprintf
            (
             "%s%s%s",
             defined $self->remote_host   ? ($self->remote_host."::")  : "",
             defined $self->remote_module ? ($self->remote_module."/") : "",
             defined $self->remote_dir    ? $self->remote_dir          : "",
            );
        $self->_remoteroot($remoteroot);
    }
    return $remoteroot;
}

=head2 (void) $obj->split_rfilename ( $recentfilename )

Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
of the pattern

    $filenameroot-$interval$serializer_suffix

e.g.

    RECENT-1M.yaml

This filename is split into its parts and the parts are fed to the
object itself.

=cut

sub split_rfilename {
    my($self, $rfname) = @_;
    my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    if (my($f,$i,$s) = $rfname =~ $splitter) {
        $self->filenameroot      ($f);
        $self->interval          ($i);
        $self->serializer_suffix ($s);
    } else {
        die "Alert: cannot split '$rfname', doesn't match '$splitter'";
    }
    return;
}

=head2 my $rfile = $obj->rfile

Returns the full path of the I<recentfile>

=cut

sub rfile {
    my($self) = @_;
    my $rfile = $self->_rfile;
    return $rfile if defined $rfile;
    $rfile = File::Spec->catfile
        ($self->localroot,
         $self->rfilename,
        );
    $self->_rfile ($rfile);
    return $rfile;
}

=head2 $rsync_obj = $obj->rsync

The File::Rsync object that this object uses for communicating with an
upstream server.

=cut

sub rsync {
    my($self) = @_;
    my $rsync = $self->_rsync;
    unless (defined $rsync) {
        my $rsync_options = $self->rsync_options || {};
        if ($HAVE->{"File::Rsync"}) {
            $rsync = File::Rsync->new($rsync_options);
            $self->_rsync($rsync);
        } else {
            die "File::Rsync required for rsync operations. Cannot continue";
        }
    }
    return $rsync;
}

=head2 (void) $obj->register_rsync_error(@err)

=head2 (void) $obj->un_register_rsync_error()

Register_rsync_error is called whenever the File::Rsync object fails
on an exec (say, connection doesn't succeed). It issues a warning and
sleeps for an increasing amount of time. Un_register_rsync_error
resets the error count. See also accessor C<max_rsync_errors>.

=cut

{
    my $no_success_count = 0;
    my $no_success_time = 0;
    sub register_rsync_error {
        my($self, @err) = @_;
        chomp @err;
        $no_success_time = time;
        $no_success_count++;
        my $max_rsync_errors = $self->max_rsync_errors;
        $max_rsync_errors = MAX_INT unless defined $max_rsync_errors;
        if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
            require Carp;
            Carp::confess
                  (
                   sprintf
                   (
                    "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                    $self->interval,
                    join(" ",@err),
                    $no_success_count,
                   ));
        }
        my $sleep = 12 * $no_success_count;
        $sleep = 300 if $sleep > 300;
        require Carp;
        Carp::cluck
              (sprintf
               (
                "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
                scalar(localtime($no_success_time)),
                $self->interval,
                join(" ",@err),
                $sleep,
               ));
        sleep $sleep
    }
    sub un_register_rsync_error {
        my($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}

=head2 $clone = $obj->_sparse_clone

Clones just as much from itself that it does not hurt. Experimental
method.

Note: what fits better: sparse or shallow? Other suggestions?

=cut

sub _sparse_clone {
    my($self) = @_;
    my $new = bless {}, ref $self;
    for my $m (qw(
                  _interval
                  _localroot
                  _remoteroot
                  _rfile
                  _use_tempfile
                  aggregator
                  filenameroot
                  ignore_link_stat_errors
                  is_slave
                  max_files_per_connection
                  protocol
                  rsync_options
                  serializer_suffix
                  sleep_per_connection
                  tempdir
                  verbose
                 )) {
        my $o = $self->$m;
        $o = Storable::dclone $o if ref $o;
        $new->$m($o);
    }
    $new;
}

=head2 $boolean = OBJ->ttl_reached ()

=cut

sub ttl_reached {
    my($self) = @_;
    my $have_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time;
    my $ttl = $self->ttl;
    $ttl = 24.2 unless defined $ttl;
    if ($now > $have_mirrored + $ttl) {
        return 1;
    }
    return 0;
}

=head2 (void) $obj->unlock()

Unlocking is implemented with an C<rmdir> on a locking directory
(C<.lock> appended to $rfile).

=cut

sub unlock {
    my($self) = @_;
    return unless $self->_is_locked;
    my $rfile = $self->rfile;
    unlink "$rfile.lock/process" or warn "Could not unlink lockfile '$rfile.lock/process': $!";
    rmdir "$rfile.lock" or warn "Could not rmdir lockdir '$rfile.lock': $!";;
    $self->_is_locked (0);
}

=head2 unseed

Sets this recentfile in the state of not 'seeded'.

=cut
sub unseed {
    my($self) = @_;
    $self->seeded(0);
}

=head2 $ret = $obj->update ($path, $type)

=head2 $ret = $obj->update ($path, "new", $dirty_epoch)

=head2 $ret = $obj->update ()

Enter one file into the local I<recentfile>. $path is the (usually
absolute) path. If the path is outside I<our> tree, then it is
ignored.

C<$type> is one of C<new> or C<delete>.

Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
not used and the epoch is calculated by the update() routine itself
based on current time. But if there is the demand to insert a
not-so-current file into the dataset, then the caller sets
$dirty_epoch. This causes the epoch of the registered event to become
$dirty_epoch or -- if the exact value given is already taken -- a tiny
bit more. As compensation the dirtymark of the whole dataset is set to
now or the current epoch, whichever is higher. Note: setting the
dirty_epoch to the future is prohibited as it's very unlikely to be
intended: it definitely might wreak havoc with the index files.

The new file event is unshifted (or, if dirty_epoch is set, inserted
at the place it belongs to, according to the rule to have a sequence
of strictly decreasing timestamps) to the array of recent_events and
the array is shortened to the length of the timespan allowed. This is
usually the timespan specified by the interval of this recentfile but
as long as this recentfile has not been merged to another one, the
timespan may grow without bounds.

The third form runs an update without inserting a new file. This may
be desired to truncate a recentfile.

=cut
sub _epoch_monotonically_increasing {
    my($self,$epoch,$recent) = @_;
    return $epoch unless @$recent; # the first one goes unoffended
    if (_bigfloatgt("".$epoch,$recent->[0]{epoch})) {
        return $epoch;
    } else {
        return _increase_a_bit($recent->[0]{epoch});
    }
}
sub update {
    my($self,$path,$type,$dirty_epoch) = @_;
    if (defined $path or defined $type or defined $dirty_epoch) {
        die "update called without path argument" unless defined $path;
        die "update called without type argument" unless defined $type;
        die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
    }
    $self->lock;
    my $ctx = $self->_locked_batch_update([{path=>$path,type=>$type,epoch=>$dirty_epoch}]);
    $self->write_recent($ctx->{recent}) if $ctx->{something_done};
    $self->_assert_symlink;
    $self->unlock;
}

=head2 $obj->batch_update($batch)

Like update but for many files. $batch is an arrayref containing
hashrefs with the structure

  {
    path => $path,
    type => $type,
    epoch => $epoch,
  }



=cut
sub batch_update {
    my($self,$batch) = @_;
    $self->lock;
    my $ctx = $self->_locked_batch_update($batch);
    $self->write_recent($ctx->{recent}) if $ctx->{something_done};
    $self->_assert_symlink;
    $self->unlock;
}
sub _locked_batch_update {
    my($self,$batch) = @_;
    my $something_done = 0;
    my $recent = $self->recent_events;
    unless ($recent->[0]) {
        # obstetrics
        $something_done = 1;
    }
    my %paths_in_recent = map { $_->{path} => undef } @$recent;
    my $interval = $self->interval;
    my $canonmeth = $self->canonize;
    unless ($canonmeth) {
        $canonmeth = "naive_path_normalize";
    }
    my $oldest_allowed = 0;
    my $setting_new_dirty_mark = 0;
    my $console;
    if ($self->verbose && @$batch > 1) {
        eval {require Time::Progress};
        warn "dollarat[$@]" if $@;
        $| = 1;
        $console = new Time::Progress;
        $console->attr( min => 1, max => scalar @$batch );
        print "\n";
    }
    my $i = 0;
    my $memo_splicepos;
 ITEM: for my $item (sort {($b->{epoch}||0) <=> ($a->{epoch}||0)} @$batch) {
        $i++;
        print $console->report( "\rdone %p elapsed: %L (%l sec), ETA %E (%e sec)", $i ) if $console and not $i % 50;
        my $ctx = $self->_update_batch_item($item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,\%paths_in_recent,$memo_splicepos);
        $something_done = $ctx->{something_done};
        $oldest_allowed = $ctx->{oldest_allowed};
        $setting_new_dirty_mark = $ctx->{setting_new_dirty_mark};
        $recent = $ctx->{recent};
        $memo_splicepos = $ctx->{memo_splicepos};
    }
    print "\n" if $console;
    if ($setting_new_dirty_mark) {
        $oldest_allowed = 0;
    }
TRUNCATE: while (@$recent) {
        if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$recent;
            $something_done = 1;
        } else {
            last TRUNCATE;
        }
    }
    return {something_done=>$something_done,recent=>$recent};
}
sub _update_batch_item {
    my($self,$item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,$paths_in_recent,$memo_splicepos) = @_;
    my($path,$type,$dirty_epoch) = @{$item}{qw(path type epoch)};
    if (defined $path or defined $type or defined $dirty_epoch) {
        $path = $self->$canonmeth($path);
    }
    # you must calculate the time after having locked, of course
    my $now = Time::HiRes::time;

    my $epoch;
    if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
        $epoch = $dirty_epoch;
    } else {
        $epoch = $self->_epoch_monotonically_increasing($now,$recent);
    }
    $recent ||= [];
    my $merged = $self->merged;
    if ($merged->{epoch} && !$setting_new_dirty_mark) {
        my $virtualnow = _bigfloatmax($now,$epoch);
        # for the lower bound I think we need no big math, we calc already
        my $secs = $self->interval_secs();
        $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
        } else {
            # as long as we are not merged at all, no limits!
        }
    my $lrd = $self->localroot;
    if (defined $path && $path =~ s|^\Q$lrd\E||) {
        $path =~ s|^/||;
        my $splicepos;
        # remove the older duplicates of this $path, irrespective of $type:
        if (defined $dirty_epoch) {
            my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch,$paths_in_recent,$memo_splicepos);
            $recent    = $ctx->{recent};
            $splicepos = $ctx->{splicepos};
            $epoch     = $ctx->{epoch};
            my $dirtymark = $self->dirtymark;
            my $new_dm = $now;
            if (_bigfloatgt($epoch, $now)) { # just in case we had to increase it
                $new_dm = $epoch;
            }
            $self->dirtymark($new_dm);
            $setting_new_dirty_mark = 1;
            if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
                $self->merged(+{});
            }
        } else {
            $recent = [ grep { $_->{path} ne $path } @$recent ];
            $splicepos = 0;
        }
        if (defined $splicepos) {
            splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
            $paths_in_recent->{$path} = undef;
        }
        $memo_splicepos = $splicepos;
        $something_done = 1;
    }
    return
        {
         something_done => $something_done,
         oldest_allowed => $oldest_allowed,
         setting_new_dirty_mark => $setting_new_dirty_mark,
         recent => $recent,
         memo_splicepos => $memo_splicepos,
        }
}
sub _update_with_dirty_epoch {
    my($self,$path,$recent,$epoch,$paths_in_recent,$memo_splicepos) = @_;
    my $splicepos;
    my $new_recent = [];
    if (exists $paths_in_recent->{$path}) {
        my $cancel = 0;
    KNOWN_EVENT: for my $i (0..$#$recent) {
            if ($recent->[$i]{path} eq $path) {
                if ($recent->[$i]{epoch} eq $epoch) {
                    # nothing to do
                    $cancel = 1;
                    last KNOWN_EVENT;
                }
            } else {
                push @$new_recent, $recent->[$i];
            }
        }
        @$recent = @$new_recent unless $cancel;
    }
    if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
        $splicepos = 0;
    } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
        $splicepos = @$recent;
    } else {
        my $startingpoint;
        if (_bigfloatgt($memo_splicepos<=$#$recent && $epoch, $recent->[$memo_splicepos]{epoch})) {
            $startingpoint = 0;
        } else {
            $startingpoint = $memo_splicepos;
        }
    RECENT: for my $i ($startingpoint..$#$recent) {
            my $ev = $recent->[$i];
            if ($epoch eq $recent->[$i]{epoch}) {
                $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
            }
            if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
                $splicepos = $i;
                last RECENT;
            }
        }
    }
    return {
            recent => $recent,
            splicepos => $splicepos,
            epoch => $epoch,
    }
}

=head2 seed

Sets this recentfile in the state of 'seeded' which means it has to
re-evaluate its uptodateness.

=cut
sub seed {
    my($self) = @_;
    $self->seeded(1);
}

=head2 seeded

Tells if the recentfile is in the state 'seeded'.

=cut
sub seeded {
    my($self, $set) = @_;
    if (defined $set) {
        $self->_seeded ($set);
    }
    my $x = $self->_seeded;
    unless (defined $x) {
        $x = 0;
        $self->_seeded ($x);
    }
    return $x;
}

=head2 uptodate

True if this object has mirrored the complete interval covered by the
current recentfile.

=cut
sub uptodate {
    my($self) = @_;
    my $uptodate;
    my $why;
    if ($self->_uptodateness_ever_reached and not $self->seeded) {
        $why = "saturated";
        $uptodate = 1;
    }
    # it's too easy to misconfigure ttl and related timings and then
    # never reach uptodateness, so disabled 2009-03-22
    if (0 and not defined $uptodate) {
        if ($self->ttl_reached){
            $why = "ttl_reached returned true, so we are not uptodate";
            $uptodate = 0 ;
        }
    }
    unless (defined $uptodate) {
        # look if recentfile has unchanged timestamp
        my $minmax = $self->minmax;
        if (exists $minmax->{mtime}) {
            my $rfile = $self->_my_current_rfile;
            my @stat = stat $rfile;
            if (@stat) {
                my $mtime = $stat[9];
                if (defined $mtime && defined $minmax->{mtime} && $mtime > $minmax->{mtime}) {
                    $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
                    $uptodate = 0;
                } else {
                    my $covered = $self->done->covered(@$minmax{qw(max min)});
                    $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
                    $uptodate = $covered;
                }
            } else {
                require Carp;
                $why = "Could not stat '$rfile': $!";
                Carp::cluck($why);
                $uptodate = 0;
            }
        }
    }
    unless (defined $uptodate) {
        $why = "fallthrough, so not uptodate";
        $uptodate = 0;
    }
    if ($uptodate) {
        $self->_uptodateness_ever_reached(1);
    }
    my $remember =
        {
         uptodate => $uptodate,
         why => $why,
        };
    $self->_remember_last_uptodate_call($remember);
    return $uptodate;
}

=head2 $obj->write_recent ($recent_files_arrayref)

Writes a I<recentfile> based on the current reflection of the current
state of the tree limited by the current interval.

=cut
sub _resort {
    my($self) = @_;
    @{$_[1]} = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @{$_[1]};
    return;
}
sub write_recent {
    my ($self,$recent) = @_;
    die "write_recent called without argument" unless defined $recent;
    my $Last_epoch;
 SANITYCHECK: for my $i (0..$#$recent) {
        if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
            require Carp;
            Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
                          $recent->[$i]{epoch}, $Last_epoch, $self->interval);
            # you may want to:
            # $self->_resort($recent);
            # last SANITYCHECK;
        }
        $Last_epoch = $recent->[$i]{epoch};
    }
    my $minmax = $self->minmax;
    if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
        $minmax->{max} = @$recent && exists $recent->[0]{epoch} ? $recent->[0]{epoch} : undef;
    }
    if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
        $minmax->{min} = @$recent && exists $recent->[-1]{epoch} ? $recent->[-1]{epoch} : undef;
    }
    $self->minmax($minmax);
    my $meth = sprintf "write_%d", $self->protocol;
    $self->$meth($recent);
}

=head2 $obj->write_0 ($recent_files_arrayref)

Delegate of C<write_recent()> on protocol 0

=cut

sub write_0 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    YAML::Syck::DumpFile("$rfile.new",$recent);
    rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
}

=head2 $obj->write_1 ($recent_files_arrayref)

Delegate of C<write_recent()> on protocol 1

=cut

sub write_1 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $suffix = $self->serializer_suffix;
    my $data = {
                meta => $self->meta_data,
                recent => $recent,
               };
    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($data);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $serialized = $serializer->raw_serialize($data);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
    print $fh $serialized;
    close $fh or die "Could not close '$rfile.new': $!";
    rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
}

BEGIN {
    my $nq = qr/[^"]+/; # non-quotes
    my @pod_lines = 
        split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }

=head1 SERIALIZERS

The following suffixes are supported and trigger the use of these
serializers:

=over 4

=item C<< ".yaml" => "YAML::Syck" >>

=item C<< ".json" => "JSON" >>

=item C<< ".sto"  => "Storable" >>

=item C<< ".dd"   => "Data::Dumper" >>

=back

=cut

BEGIN {
    my @pod_lines = 
        split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }

=head1 INTERVAL SPEC

An interval spec is a primitive way to express time spans. Normally it
is composed from an integer and a letter.

As a special case, a string that consists only of the single letter
C<Z>, stands for MAX_INT seconds.

The following letters express the specified number of seconds:

=over 4

=item C<< s => 1 >>

=item C<< m => 60 >>

=item C<< h => 60*60 >>

=item C<< d => 60*60*24 >>

=item C<< W => 60*60*24*7 >>

=item C<< M => 60*60*24*30 >>

=item C<< Q => 60*60*24*90 >>

=item C<< Y => 60*60*24*365.25 >>

=back

=cut

=head1 SEE ALSO

L<File::Rsync::Mirror::Recent>,
L<File::Rsync::Mirror::Recentfile::Done>,
L<File::Rsync::Mirror::Recentfile::FakeBigFloat>

=head1 BUGS

Please report any bugs or feature requests through the web interface
at
L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
I will be notified, and then you'll automatically be notified of
progress on your bug as I make changes.

=head1 KNOWN BUGS

Memory hungry: it seems all memory is allocated during the initial
rsync where a list of all files is maintained in memory.

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc File::Rsync::Mirror::Recentfile

You can also look for information at:

=over 4

=item * RT: CPAN's request tracker

L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>

=item * AnnoCPAN: Annotated CPAN documentation

L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>

=item * CPAN Ratings

L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>

=item * Search CPAN

L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>

=back


=head1 ACKNOWLEDGEMENTS

Thanks to RJBS for module-starter.

=head1 AUTHOR

Andreas König

=head1 COPYRIGHT & LICENSE

Copyright 2008,2009 Andreas König.

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.


=cut

1; # End of File::Rsync::Mirror::Recentfile

# Local Variables:
# mode: cperl
# cperl-indent-level: 4
# End: