The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package KiokuDB::Test::Fixture::Concurrency;
BEGIN {
  $KiokuDB::Test::Fixture::Concurrency::AUTHORITY = 'cpan:NUFFIN';
}
{
  $KiokuDB::Test::Fixture::Concurrency::VERSION = '0.56';
}
use Moose;

use Test::More;
use Test::Exception;

use Try::Tiny;
use List::Util qw(sum);
use Scope::Guard;
use POSIX qw(_exit :sys_wait_h);

use namespace::clean -except => 'meta';

with qw(KiokuDB::Test::Fixture) => { -excludes => [qw/run required_backend_roles/] };

use constant required_backend_roles => qw(Clear TXN Concurrency::POSIX);

use constant FORKS => 20;
use constant COUNTERS => 250;
use constant ACCOUNTS => 250;
use constant ITER => 10;

my @ids = qw(foo bar gorch baz);

{
    package # hide from PAUSE
        Foo;
    use Moose;

    has bar => ( is => 'rw' );
}

has exit => (
    isa => "Int",
    is  => "rw",
    default => 0,
);

before precheck => sub {
    my $self = shift;

};

sub create {
    return (
        counter => { value => 0 },
        (map { ( "counter_$_"   => { value => 0 } ) } 1 .. COUNTERS ),
        (map { ( "${_}_account" => { value => 0 } ) } 1 .. ACCOUNTS),
    );
}

sub run {
    my $self = shift;

    SKIP: {
        local $Test::Builder::Level = $Test::Builder::Level + 1;

        skip "Set KIOKUDB_STRESS_TEST to run this fixture", 1 unless $ENV{KIOKUDB_STRESS_TEST};

        $self->precheck;

        lives_ok {
            local $Test::Builder::Level = $Test::Builder::Level - 1;
            $self->txn_do(sub {
                my $s = $self->new_scope;
                $self->backend->clear;
                $self->populate;
            });
        } "populated OK";

        $self->clear_directory;

        $self->verify;

        is_deeply( [ $self->live_objects ], [ ], "no live objects at end of " . $self->name . " fixture" );

        $self->clear_live_objects;
    }
}

sub verify {
    my $self = shift;

    ok( !$self->has_directory, "no directory object" );

    # force re-instantiation of directory
    $self->clear_directory;

    foreach my $num ( 1 .. FORKS ) {
        defined(my $pid = fork) or die $!;
        next if $pid;

        my $guard = Scope::Guard->new(sub {
            # avoid cleanups on errors
            use POSIX qw(_exit);
            _exit($self->exit);
        });

        # make sure each child gets a different random seed
        srand($$ ^ time);

        $self->run_child($num);
    }

    my $skip = 0;

    while ( wait > 0 ) {
        do {
            $skip++ if $?
        } while waitpid(-1, WNOHANG) > 0;

        $self->check_consistency;
    }

    $self->check_counters($skip);
}

sub check_consistency {
    my $self = shift;

    my $ok;

    my ( $counter, @accounts );

    attempt: foreach my $attempt ( 1 .. FORKS ) {
        last attempt if try {
            $self->txn_do(sub {
                my $s = $self->new_scope;

                $counter = $self->lookup("counter")->{value};

                @accounts = map { $_->{value} } $self->lookup(map { "${_}_account" } 1 .. ACCOUNTS);
            });

            ++$ok;
        };
    }


    SKIP: {
        skip "lock contention", 3 unless $ok;

        cmp_ok( $counter, '>=', 0, "counter not 0" );
        cmp_ok( $counter, '<=', FORKS, "counter <= counters" );

        is( sum(@accounts), 0, "account sum is 0 (state is consistent)" );
    };
}

sub check_counters {
    my ( $self, $skip ) = @_;

    $self->txn_do(sub {
        my $s = $self->new_scope;

        my $counter = $self->lookup_ok("counter");
        is( $counter->{value}, FORKS-$skip, "total counter value" );

        my @counters = $self->lookup_ok(map { "counter_$_" } 1 .. COUNTERS);

        is( sum(map { $_->{value} } @counters), FORKS-$skip, "counters sum" );
    });
}

sub run_child {
    my ( $self, $child ) = @_;

    for ( 1 .. ITER ) {
        try {
            $self->txn_do(sub {
                my $s = $self->new_scope;

                my $id = @ids[int rand @ids];

                if ( my $foo = $self->lookup($id) ) {
                    if ( rand > 0.5 ) {
                        $foo->bar("foo");
                        $self->update($foo);
                    } else {
                        $self->delete($foo);
                    }
                } else {
                    $self->insert( foo => Foo->new( bar => "bar" ) );
                }

                my ( $one, $two ) = $self->lookup( map { int(rand ACCOUNTS) . "_account" } 1 .. 2 );

                my $amount = int(rand 10000);
                $one->{value} += $amount;
                $two->{value} -= $amount;

                select(undef,undef,undef,0.01) if rand > 0.5;

                $self->update($one, $two);
            });
        };

        select(undef,undef,undef, 0.01);
    }

    my $ok;

    attempt: foreach my $attempt ( 1 .. FORKS*2 ) {
        last attempt if try {
            $self->txn_do(sub {
                my $s = $self->new_scope;

                my $counter = $self->lookup("counter");

                my $counter_two = $self->lookup("counter_" . int rand COUNTERS);

                select(undef,undef,undef,0.02 * rand) if rand > 0.5;

                $counter_two->{value}++;
                $self->update($counter_two);

                $counter->{value}++;
                $self->update($counter);
            });

            ++$ok;
        };

        select(undef,undef,undef, 0.05);
    }

    $self->exit(1) unless $ok;
}

__PACKAGE__->meta->make_immutable;

__PACKAGE__

__END__

=pod

=head1 NAME

KiokuDB::Test::Fixture::Concurrency

=head1 VERSION

version 0.56

=head1 AUTHOR

Yuval Kogman <nothingmuch@woobling.org>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2013 by Yuval Kogman, Infinity Interactive.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut