# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package App::ProxyHunter;

use strict;
use Coro::Select;
use Coro::LWP;
use Coro::Timer;
use Coro::Util;
use Coro::PatchSet;
use Coro;
use LWP::UserAgent;
use LWP::Protocol::https;
use LWP::Protocol::connect;
use LWP::Protocol::socks;
use Net::Proxy::Type ':types';
use Carp;
use Time::HiRes;
use DateTime;
use List::Util 'shuffle';
use Getopt::Long;
use App::ProxyHunter::Constants;
use App::ProxyHunter::Config;
use App::ProxyHunter::Model;

# Seconds a worker coroutine sleeps when its queue query returned no rows.
use constant CORO_DELAY       => 5;
# Maximum number of proxy rows fetched per queue query (passed as the search limit).
use constant SELECT_LIMIT     => 100;
# Interval-based queues skip rows that would still have to wait longer than
# this many seconds before they become due.
use constant MAX_SEC_IN_QUEUE => 60;

# Module version.
our $VERSION = '0.02';

# Entry point: parses command-line style options from @_ and then either
# writes a sample config, creates the database schema, or runs the enabled
# worker coroutines until they finish.
#
# Arguments: the class name followed by an option list (normally @ARGV):
#   --create-config PATH   copy the config template from the DATA section to PATH
#   --config PATH          config file to load (required for everything else)
#   --create-schema        run the schema creation queries and return
#   --daemon [PIDFILE]     daemonize via Proc::Daemon, optionally with a pid file
#
# Dies with the usage text on bad options or when --config is missing. Croaks
# when the config template cannot be opened or closed, when a search engine
# module fails to load, or when Proc::Daemon is required but not installed.
sub start {
	my $class = shift;
	my $opts_ok = Getopt::Long::Parser->new->getoptionsfromarray(
		\@_,
		'create-config=s' => \my $create_config,
		'create-schema'   => \my $create_schema,
		'config=s'        => \my $config,
		'daemon:s'        => \my $daemon
	);
	
	my $usage = "usage:
	$0 --create-config /path/to/config
	$0 --config /path/to/config --create-schema
	$0 --config /path/to/config [--daemon [/path/to/pidfile]]\n";
	
	unless ($opts_ok) {
		die $usage;
	}
	
	if (defined $create_config) {
		open my $fh, '>', $create_config
			or croak "can't open `$create_config' for write: $!";
		
		while (my $str = <DATA>) {
			print $fh $str;
		}
		
		# buffered write errors (e.g. disk full) only surface at close time
		close $fh
			or croak "can't close `$create_config' after write: $!";
		return;
	}
	
	unless (defined $config) {
		die $usage;
	}
	
	# from here on $config is the parsed config object, not the path
	$config = App::ProxyHunter::Config->new(path => $config);
	
	my $model = App::ProxyHunter::Model->new(
		connect_info => [
			sprintf('dbi:%s:database=%s', $config->db->driver, $config->db->name) .
				($config->db->host ? ';host='.$config->db->host : ''),
			$config->db->login,
			$config->db->password,
			{
				%{$config->db->driver_cfg},
				AutoInactiveDestroy => 1
			}
		],
		schema_class => 'App::ProxyHunter::Model::Schema::' . $config->db->driver
	);
	
	if (defined $create_schema) {
		# get_create_query() is called until it returns a false value
		while (my $query = $model->schema_class->get_create_query()) {
			$model->do($query);
		}
		return;
	}
	
	# $engine aliases the config's array element, so the short engine names are
	# replaced in place by full class names (later used as $engine->new)
	for my $engine (@{$config->searcher->engines}) {
		$engine = 'App::ProxyHunter::SearchEngine::'.$engine;
		eval "require $engine"
			or croak "Can't load search engine module $engine: $@";
	}
	
	if (defined $daemon) {
		# --daemon takes an optional value; an empty string means "no pid file"
		$SIG{INT} = $SIG{TERM} = sub {
			unlink $daemon if length($daemon) > 0;
			exit;
		};
		
		eval {
			require Proc::Daemon;
		};
		if ($@) {
			croak 'You need to install Proc::Daemon to perform daemonization: ', $@;
		}
		
		Proc::Daemon::Init({
			work_dir => '.',
			length($daemon)>0 ? (pid_file => $daemon) : ()
		});
	}
	
	$model->update('proxy', {in_progress => 0}); # clean
	
	my @coros;
	if ($config->checker->enabled) {
		push @coros, $class->_start_checkers($config, $model);
	}
	if ($config->rechecker->enabled) {
		push @coros, $class->_start_recheckers($config, $model);
	}
	if ($config->speed_checker->enabled) {
		push @coros, $class->_start_speed_checkers($config, $model);
	}
	if ($config->searcher->enabled) {
		push @coros, $class->_start_searcher($config, $model);
	}
	
	# block until every started coroutine has finished
	$_->join() for @coros;
}

# Creates the first-check worker coroutines (one per configured checker
# worker). The workers share one queue of proxies selected with checked => 0.
# A proxy that fails the type check, or the optional immediate speed check,
# is deleted; otherwise its type, connection time and (optionally) speed are
# stored and it is marked as checked.
#
# Returns the list of created coroutines.
sub _start_checkers {
	my ($class, $config, $model) = @_;
	
	my @workers;
	my @pending;   # queue shared by all workers
	my $pause;     # non-zero while some worker waits for new rows
	
	foreach my $n (1..$config->checker->workers) {
		push @workers, async {
			my $type_checker = Net::Proxy::Type->new(http_strict => 1, noauth => 1);
			my $speed_checker;
			if ($config->checker->speed_check) {
				$speed_checker = LWP::UserAgent->new(agent => 'Mozilla 5.0', timeout => 10, parse_head => 0);
			}
			
			while (1) {
				if (!@pending) {
					# somebody else is already waiting for rows: wait as well
					if ($pause) {
						Coro::Timer::sleep $pause;
						next;
					}
					
					@pending = $class->_get_queue($model, {checked => 0}, {order_by => 'checkdate'});
					if (!@pending) {
						$pause = CORO_DELAY;
						Coro::Timer::sleep $pause;
						$pause = 0;
						next;
					}
				}
				
				my $proxy = shift @pending;
				
				# an empty list means the proxy is dead or of unknown type
				my @verdict = $class->_check($type_checker, $proxy);
				if (!@verdict) {
					$proxy->delete();
					next;
				}
				
				my ($type, $conn_time) = @verdict;
				$proxy->set('type', $type);
				$proxy->set('conn_time', $conn_time);
				
				if ($speed_checker) {
					my $speed = $class->_check_speed($speed_checker, $config->speed_checker, $proxy);
					if (!$speed) {
						$proxy->delete();
						next;
					}
					
					$proxy->set('speed', $speed);
					$proxy->set('speed_checkdate', DateTime->now(time_zone => TZ));
				}
				
				$proxy->set('checked', 1);
				$proxy->set('checkdate', DateTime->now(time_zone => TZ));
				$proxy->set('in_progress', \0); # force update
				$proxy->set('success_total', 1);
				$proxy->update();
			}
		}
	}
	
	return @workers;
}

# Creates the recheck worker coroutines (one per configured rechecker
# worker). The workers share one queue of already checked proxies ordered by
# checkdate and wait until the first queued proxy is at least
# rechecker.interval seconds past its last check. Failures are counted and a
# proxy is deleted once its consecutive failures exceed
# rechecker.fails_before_delete.
#
# Returns the list of created coroutines.
sub _start_recheckers {
	my ($class, $config, $model) = @_;
	
	my @workers;
	my @pending;   # queue shared by all workers
	my $pause;     # non-zero while some worker is waiting
	
	foreach my $n (1..$config->rechecker->workers) {
		push @workers, async {
			my $type_checker = Net::Proxy::Type->new(http_strict => 1, noauth => 1);
			my $speed_checker;
			if ($config->rechecker->speed_check) {
				$speed_checker = LWP::UserAgent->new(agent => 'Mozilla 5.0', timeout => 10, parse_head => 0);
			}
			
			while (1) {
				if ($pause) {
					Coro::Timer::sleep $pause;
					next;
				}
				
				if (!@pending) {
					@pending = $class->_get_queue($model, {checked => 1}, {order_by => 'checkdate'});
					if (!@pending) {
						$pause = CORO_DELAY;
						Coro::Timer::sleep $pause;
						$pause = 0;
						next;
					}
					
					# seconds elapsed since the first queued proxy was checked
					my $elapsed = DateTime->now(time_zone => TZ)
						->subtract_datetime_absolute($pending[0]->checkdate)
						->seconds;
					
					if ($elapsed < $config->rechecker->interval) {
						$pause = $config->rechecker->interval - $elapsed;
						Coro::Timer::sleep $pause;
						$pause = 0;
						next;
					}
				}
				
				my $proxy = shift @pending;
				my $failed = 1;
				
				# an empty list means the proxy is dead or of unknown type
				my @verdict = $class->_check($type_checker, $proxy);
				if (@verdict) {
					my ($type, $conn_time) = @verdict;
					$failed = 0;
					$proxy->set('type', $type);
					$proxy->set('conn_time', $conn_time);
					
					if ($speed_checker) {
						my $speed = $class->_check_speed($speed_checker, $config->speed_checker, $proxy);
						unless ($speed) {
							$failed = 1;
							$speed = 0;
						}
						
						$proxy->set('speed', $speed);
						$proxy->set('speed_checkdate', DateTime->now(time_zone => TZ));
					}
				}
				
				unless ($failed) {
					$proxy->set('fails', 0);
					$proxy->set('success_total', $proxy->success_total+1);
				}
				else {
					$proxy->set('fails_total', $proxy->fails_total+1);
					$proxy->set('fails', $proxy->fails+1);
					if ($proxy->fails > $config->rechecker->fails_before_delete) {
						$proxy->delete();
						next;
					}
				}
				
				$proxy->set('checkdate', DateTime->now(time_zone => TZ));
				$proxy->set('in_progress', \0); # force update
				$proxy->update();
			}
		}
	}
	
	return @workers;
}

# Creates the periodic speed-check worker coroutines (one per configured
# speed_checker worker). The workers share one queue of checked proxies
# ordered by speed_checkdate and wait until the first queued proxy is at
# least speed_checker.interval seconds past its last speed check.
#
# Returns the list of created coroutines, like the other _start_* methods, so
# that the caller can join them.
sub _start_speed_checkers {
	my ($class, $config, $model) = @_;
	
	my @coros;
	my @queue; # shared by all workers
	my $delay; # non-zero while some worker is waiting
	
	for (1..$config->speed_checker->workers) {
		push @coros, async {
			my $speed_checker = LWP::UserAgent->new(agent => 'Mozilla 5.0', timeout => 10, parse_head => 0);
			
			while (1) {
				if ($delay) {
					Coro::Timer::sleep $delay;
					next;
				}
				
				unless (@queue) {
					unless (@queue = $class->_get_queue(
								$model,
								{checked => 1},
								{order_by => 'speed_checkdate'},
								$config->speed_checker->interval)) {
						
						$delay = CORO_DELAY;
						Coro::Timer::sleep $delay;
						$delay = 0;
						next;
					}
					
					# seconds elapsed since the first queued proxy was speed checked
					my $sec_after_last_check = DateTime->now(time_zone => TZ)->
							subtract_datetime_absolute($queue[0]->speed_checkdate)->seconds;
					
					if ($sec_after_last_check < $config->speed_checker->interval) {
						$delay = $config->speed_checker->interval - $sec_after_last_check;
						Coro::Timer::sleep $delay;
						$delay = 0;
						next;
					}
				}
				
				my $proxy = shift @queue;
				if (my $speed = $class->_check_speed($speed_checker, $config->speed_checker, $proxy)) {
					$proxy->set('speed', $speed);
					$proxy->set('success_total', $proxy->success_total+1);
				}
				else {
					$proxy->set('fails', $proxy->fails+1);
					# the deletion threshold is read from the rechecker settings
					if ($proxy->fails > $config->rechecker->fails_before_delete) {
						$proxy->delete();
						next;
					}
					$proxy->set('speed', 0);
					$proxy->set('fails_total', $proxy->fails_total+1);
				}
				
				my $now = DateTime->now(time_zone => TZ);
				$proxy->set('checkdate', $now);
				$proxy->set('speed_checkdate', $now);
				$proxy->set('in_progress', \0); # force update
				$proxy->update();
			}
		}
	}
	
	# without an explicit return the sub's value would be that of the for
	# loop, and the caller would never get the coroutines to join
	return @coros;
}

# Creates the single searcher coroutine. In an endless loop it builds one
# object per configured search engine (each with a query picked from the
# shuffled query list), polls the engines until each of them returns a false
# value, and inserts every resolvable "host:port" result into the proxy
# table. Insert errors are ignored.
#
# Returns the created coroutine.
sub _start_searcher {
	my ($class, $config, $model) = @_;
	
	async {
		while (1) {
			my @queries = shuffle @{$config->searcher->querylist};
			
			# hand out the shuffled queries to the engines round-robin
			my $query_no = 0;
			my @active = map {
				$_->new(query => $queries[$query_no++ % @queries])
			} @{$config->searcher->engines};
			
			while (@active) {
				# walk backwards so that splice does not shift unvisited engines
				for (my $idx = $#active; $idx >= 0; $idx--) {
					my $found = $active[$idx]->next;
					unless ($found) {
						# this engine has nothing more to offer
						splice @active, $idx, 1;
						next;
					}
					
					next unless @$found;
					
					my $now = DateTime->now(time_zone => TZ);
					
					for my $address (@$found) {
						my ($host, $port) = split /:/, $address;
						my $packed = Coro::Util::inet_aton($host) or next;
						my $ip = join('.', unpack('C4', $packed));
						eval {
							# ignore duplicates
							$model->fast_insert('proxy', {
								host       => $ip,
								port       => $port,
								insertdate => $now
							});
						}
					}
				}
			}
		}
	}
}

# Selects up to SELECT_LIMIT proxies that are not in progress and match
# $conditions, marks the selected rows as in progress and returns them.
#
#   $conditions - hashref of search conditions (in_progress => 0 is added to it)
#   $rules      - hashref of search options; order_by names the date column
#                 (limit is added to it)
#   $interval   - optional; when defined, rows that would still have to wait
#                 more than MAX_SEC_IN_QUEUE seconds to reach this interval
#                 since their date column are left out
#
# Returns the list of selected row objects (possibly empty).
sub _get_queue {
	my ($class, $model, $conditions, $rules, $interval) = @_;
	
	$conditions->{in_progress} = 0;
	$rules->{limit} = SELECT_LIMIT;
	
	my $iter = $model->search('proxy', $conditions, $rules);
	my $now = DateTime->now(time_zone => TZ);
	my $date_column = $rules->{order_by};
	
	my @selected;
	while (my $proxy = $iter->next) {
		if (defined $interval) {
			my $elapsed = $now->subtract_datetime_absolute($proxy->$date_column)->seconds;
			next if $interval - $elapsed > MAX_SEC_IN_QUEUE;
		}
		
		push @selected, $proxy;
	}
	
	if (@selected) {
		my @ids = map { $_->id } @selected;
		$model->update('proxy', {in_progress => 1}, {id => \@ids});
	}
	
	return @selected;
}

# Determines the type of a proxy with the given Net::Proxy::Type checker.
#
# If the proxy already has a type, that type is tried first and all the
# remaining known types are tried only when it fails; otherwise every known
# type is tried at once.
#
# Returns ($type, $conn_time) for the first check whose result is neither
# DEAD_PROXY nor UNKNOWN_PROXY, or an empty list when none succeeds.
sub _check {
	my ($class, $checker, $proxy) = @_;
	
	# bit mask of every known proxy type except the "unknown" and "dead" markers
	my $full_mask = 0;
	$full_mask |= $_ for grep { $_ != UNKNOWN_PROXY && $_ != DEAD_PROXY } keys %Net::Proxy::Type::NAME;
	my @check_mask;
	
	if (my $known_type = $proxy->type) {
		# first check for previous type of the proxy to speed up
		push @check_mask, $known_type;
		# Numify before complementing: applied to a string, ~ flips the bits
		# of its characters instead of the integer, which would produce a
		# wrong mask if the stored type is returned as a string.
		push @check_mask, $full_mask & ~(0 + $known_type);
	}
	else {
		push @check_mask, $full_mask;
	}
	
	for my $mask (@check_mask) {
		my ($type, $conn_time) = $checker->get($proxy->host, $proxy->port, $mask);
		
		unless ($type == DEAD_PROXY || $type == UNKNOWN_PROXY) {
			return ($type, $conn_time);
		}
	}
	
	return;
}

# URI scheme used to build the proxy URL for each proxy type constant.
# The constants are called with () so that they are evaluated instead of
# being taken as literal hash keys.
my %uri_scheme;
$uri_scheme{ HTTP_PROXY() }    = 'http';
$uri_scheme{ CONNECT_PROXY() } = 'connect';
$uri_scheme{ HTTPS_PROXY() }   = 'connect';
$uri_scheme{ SOCKS4_PROXY() }  = 'socks4';
$uri_scheme{ SOCKS5_PROXY() }  = 'socks';

# Measures the download speed through a proxy.
#
#   $checker - user agent object; its proxy is set to the given proxy and it
#              is used to fetch the test URL
#   $config  - object providing http_url and https_url
#   $proxy   - object providing type, host and port
#
# The download is aborted from the content callback once more than 1 MB has
# been received, or once the last ten speed samples all lie within 5 KB/s of
# the current speed.
#
# Returns the integer speed in bytes per second, or nothing when the response
# code is above 299.
sub _check_speed {
	my ($class, $checker, $config, $proxy) = @_;
	
	my $proxy_url = sprintf('%s://%s:%s', $uri_scheme{$proxy->type}, $proxy->host, $proxy->port);
	$checker->proxy(['http', 'https'] => $proxy_url);
	
	my $byte_limit = 1024*1024;
	my $bytes_seen = 0;
	my @recent_speeds;
	my $speed_now;
	my $started_at = Time::HiRes::time();
	
	# called for every received chunk; dying here aborts the download
	my $on_chunk = sub {
		$bytes_seen += length($_[0]);
		$speed_now = $bytes_seen / (Time::HiRes::time() - $started_at);
		die if $bytes_seen > $byte_limit;
		
		if (@recent_speeds == 10) {
			# number of recent samples that differ noticeably from the current speed
			my $outliers = grep { abs($_ - $speed_now) > 5 * 1024 } @recent_speeds;
			die unless $outliers;
			shift @recent_speeds;
		}
		
		push @recent_speeds, $speed_now;
	};
	
	my $resp = $checker->get(
		$proxy->type == HTTPS_PROXY ? $config->https_url : $config->http_url,
		':content_cb' => $on_chunk
	);
	
	return if $resp->code > 299;
	return int($speed_now);
}

1;

=pod

=head1 NAME

App::ProxyHunter - main class of the proxyhunter application

=head1 METHODS

=head2 App::ProxyHunter->start(@ARGV)

Class method that starts C<proxyhunter>. C<@ARGV> is the list of command-line options that C<proxyhunter> understands.

=head1 SEE ALSO

L<proxyhunter>

=head1 AUTHOR

Oleg G, E<lt>oleg@cpan.orgE<gt>

=head1 COPYRIGHT AND LICENSE

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself

=cut

__DATA__
db = {
	driver: "SQLite",
	driver_cfg: {},
	host: "localhost",
	name: "proxyhunter.db",
	login: "",
	password: ""
}

# first check
check = {
	enabled: true,
	workers: 30,
	speed_check: true # do immediate speed check
	                  # if true, the speed check is performed
	                  # even if speed_check.enabled is false
}

# recheck for alive proxies
recheck = {
	enabled: true,
	workers: 10,
	interval: 300,
	speed_check: false,
	fails_before_delete: 3 # if proxy was alive once
                           # how many times (in a row) its check may fail
                           # before it will be deleted from db
}

speed_check = {
	enabled: true,
	workers: 10,
	interval: 1800,
	http_url: "http://mirror.yandex.ru/debian/ls-lR.gz", # should be > 1 mb
	https_url: "https://mail.ru"
}


search = {
	enabled: true,
	# which queries to use when searching for proxylist
	# via google and other search engines
	querylist: [
		"proxy list",
		"socks proxy list",
		"socks5 proxy",
		"socks4 proxy",
		"free proxy list"
	],
	# which search engines to use
	# should be in App::ProxyHunter::SearchEngine:: namespace
	engines: [
		"Google", "Bing", "Yandex"
	]
}