The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl
#$Id: dbi.pl,v 1.13 2012/01/14 10:33:30 dk Exp $
use strict;
use warnings;

use IO::Socket::INET;
use IO::Lambda qw(:all);
use IO::Lambda::DBI;
use IO::Lambda::Thread qw(new_thread);
use IO::Lambda::Fork qw(new_process);
use IO::Lambda::Socket;

my $port = 3333;

# Print the command-line help (the four supported modes and the TCP port
# used by the remote/listen modes) to the currently selected output handle,
# then terminate the script.
sub usage
{
	# $0 and $port are interpolated into the help text
	my $help = <<USAGE;

Test implementation of non-blocking DBI. This script can work in several modes,
run with one of the parameters to switch:

   $0 thread      - use DBI calls in a separate thread
   $0 fork        - use DBI calls in a separate process
   $0 remote HOST - connect to host to port $port and request DBI there
   $0 listen      - listen on port $port, execute incoming connections
	
USAGE

	print($help);
	exit;
}

# The first command-line argument selects the mode; a missing or unknown
# mode prints the usage text and exits.
my $mode = shift(@ARGV) || '';
# Anchor with \A...\z rather than ^...$: '$' also matches before a trailing
# newline, so "fork\n" used to pass this check, then match none of the
# 'eq' branches below, and the script exited silently without doing anything.
usage unless $mode =~ /\A(?:fork|thread|remote|listen)\z/;

# Build a lambda that exercises the database connection: it picks a random
# integer N in 0..99, asks the database for 'SELECT 1 + N', and prints
# "N -> (answer - 1)", so a healthy round trip prints the same number twice.
#
#   $dbi - client object whose selectrow_array() is awaited
#
# Returns the lambda object; the caller is responsible for running it.
sub check_dbi
{
	my ($dbi) = @_;
	my $tries = 3;

	return lambda {
		my $expect = int(rand(100));
		context $dbi-> selectrow_array('SELECT 1 + ?', {}, $expect);
	tail {
		# the first result is treated as a success flag; on failure
		# whatever is left in @_ is reported as the error text
		my $ok = shift;
		return warn("remote db error:@_\n") unless $ok;

		my $ret = shift(@_) - 1;
		print "$expect -> $ret\n";

		# post-decrement: the branch is taken while $tries is still
		# non-zero; this->start presumably re-runs this lambda --
		# TODO confirm against the IO::Lambda documentation
		if ( $tries--) {
			this-> start;
		}
	}};
}

# Drive one complete session over $dbi: connect, run three check_dbi()
# lambdas, disconnect. The lambda built here is run to completion with
# ->wait before the sub returns.
#
#   $dbi - client object providing connect(), disconnect() and the calls
#          used by check_dbi()
#
# A failed connect is reported with warn() and the checks are skipped.
sub execute
{
	my $dbi = shift;
	lambda {
		# hard-coded DSN with empty user name and password
		context $dbi-> connect('DBI:mysql:database=mysql', '', '');
		tail {
			# the first result is treated as a success flag; on failure
			# whatever is left in @_ is reported as the error text
			return warn("remote db connect error:@_\n") unless shift;
			# three check lambdas sharing the same $dbi are put into the
			# context; tails presumably waits for all of them -- TODO confirm
			context 
				check_dbi($dbi),
				check_dbi($dbi),
				check_dbi($dbi);
		tails {
			context $dbi-> disconnect;
		# NOTE(review): the &-form call is deliberate here: it bypasses the
		# prototype so tail can be called without a callback block;
		# presumably this just waits for the disconnect -- confirm against
		# the IO::Lambda documentation
		&tail();
	}}}-> wait;
}

# Options passed to every IO::Lambda::DBI client created below.
my %dbopt = ( timeout => 5 );

# run
#
# Dispatch on $mode. The thread, fork and remote branches create a client
# over some socket and hand it to execute(); the listen branch is the server
# side that the remote branch talks to.

if ( $mode eq 'thread') {
	# presumably $DISABLED holds the reason text when threads are unavailable
	die $IO::Lambda::Thread::DISABLED if $IO::Lambda::Thread::DISABLED;

	# the DBI message loop runs in the new thread on the handle it is given
	my ($thread, $socket) = new_thread( sub {
		IO::Lambda::Message::DBI-> new( shift )-> run;
	}, 1);
	
	my $dbi = IO::Lambda::DBI-> new( $socket, $socket, %dbopt);
	execute($dbi);
	undef $dbi;
	
	$thread-> join;

} elsif ( $mode eq 'fork') {
	# same as above, but the DBI message loop runs in a child process
	my ( $pid, $socket) = new_process {
		IO::Lambda::Message::DBI-> new( shift )-> run;
	};
	
	my $dbi = IO::Lambda::DBI-> new( $socket, $socket, %dbopt);
	execute($dbi);
	undef $dbi;

	# close our end before reaping the child
	close($socket);
	waitpid($pid, 0);
} elsif ( $mode eq 'remote') {
	my $host = shift @ARGV;
	usage unless defined $host;

	# IO::Socket::INET reports constructor failures in $@ (bad host name,
	# refused connection, ...); $! alone is not reliably set, so the old
	# 'die $!' could produce an empty or misleading message.
	my $s = IO::Socket::INET-> new("$host:$port")
		or die "cannot connect to $host:$port: $@";

	my $dbi = IO::Lambda::DBI-> new( $s, $s, %dbopt);
	execute($dbi);

	# drop the client before the socket, as the thread and fork branches do;
	# the client was constructed from $s and presumably still refers to it
	undef $dbi;
	undef $s;
} elsif ( $mode eq 'listen') {
	# NOTE(review): no LocalAddr is given, so this listens on every
	# interface, and each accepted connection is served without any
	# authentication -- acceptable for a test script only.
	my $s = IO::Socket::INET-> new(
		LocalPort => $port,
		Listen    => 5,
		ReuseAddr => 1,
	) or die "cannot listen on port $port: $@";

	# serve clients strictly one at a time, forever
	while ( 1) {
		my $c = IO::Handle-> new;
		die "accept failed: $!" unless accept( $c, $s);

		# a failure while serving one client must not take the server down
		eval {
			my $loop = IO::Lambda::Message::DBI-> new($c);
			$loop-> run;
		};
		my $err = $@;

		# close the client connection whether or not the loop died
		close($c);
		warn $err if $err;
	}
}