The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
Changes 010
FTP/ControlFilter.pm 1112
FTP/ControlSession.pm 205329
FTP/DataSession.pm 66179
FTP.pm 48180
Makefile.PL 14
README 11
examples/server.pl 1017
8 files changed (This is a version diff) 342732
@@ -1,5 +1,15 @@
 Revision history for Perl extension POE::Component::Server::FTP.
 
+0.04 Mon Feb 16 16:19:48 2004
+	- Fixed a nasty bug that involved the data session disconnecting early
+	- Bandwidth limiting added (upload and download), you can choose by ip or by connection
+	- Options added: LimitSceme (ip/per), DownloadLimit (bytes), UploadLimit (bytes)
+	- Forking isn't working yet (Wheel::Run)
+	- Logging has improved. Added an option (LogLevel) (1-4)
+	- Added SIZE command
+	- Added SITE CHMOD (some clients add SITE for some reason
+	- Added a timeout (option TimeOut (seconds))
+
 0.03 Thu Feb 5 23:32:45 2004
 	- oops, fixed hardcoded ip for pasv
 
@@ -1,9 +1,9 @@
 package POE::Component::Server::FTP::ControlFilter;
 
-######################################################################
+###########################################################################
 ### POE::Component::Server::FTP::ControlFilter
 ### L.M.Orchard (deus_x@pobox.com)
-### Modified by David Davis ( xantus@cpan.org )
+### David Davis (xantus@cpan.org)
 ###
 ### TODO:
 ### --
@@ -12,8 +12,8 @@ package POE::Component::Server::FTP::ControlFilter;
 ### This module is free software; you can redistribute it and/or
 ### modify it under the same terms as Perl itself.#
 ###
-### Changes Copyright (c) 2003 David Davis and Teknikill Software
-######################################################################
+### Changes Copyright (c) 2003-2004 David Davis and Teknikill Software
+###########################################################################
 
 use strict;
 
@@ -23,20 +23,20 @@ sub new {
 	my $class = shift;
 	my %args = @_;
 
-	bless {}, $class;
+	bless({}, $class);
 }
 
 sub get {
 	my ($self, $raw) = @_;
 	my @events = ();
-		
+	
 	foreach my $input (@$raw) {
 		$input =~ s/\n//g;
 		$input =~ s/\r//g;
 		DEBUG && print STDERR "<<< $input\n";
 		my ($cmd, @args) = split(/ /, $input);
 
-		push @events,	{ cmd => uc $cmd, args =>\@args };
+		push(@events, { cmd => uc $cmd, args =>\@args });
 	}
 	
 	return \@events;
@@ -44,17 +44,18 @@ sub get {
 
 sub put {
 	my ($self, $in) = @_;
-	my $out;
+	my @out = ();
 
 	foreach (@$in) {
-		push @$out, "$_\n";
+		DEBUG && print STDERR ">>> $_\n";
+		push(@out, "$_\n");
 	}
 		
-	return $out;
+	return \@out;
 }
 
 sub get_pending {
-	my($self)=@_;
+	my ($self) = @_;
 	warn ref($self)." does not support the get_pending() method\n";
 	return;
 }
@@ -1,31 +1,30 @@
 package POE::Component::Server::FTP::ControlSession;
 
-######################################################################
+###########################################################################
 ### POE::Component::Server::FTP::ControlSession
-### L.M.Orchard ( deus_x@pobox.com )
-### Modified by David Davis ( xantus@cpan.org )
+### L.M.Orchard (deus_x@pobox.com)
+### David Davis (xantus@cpan.org)
 ###
 ### TODO:
 ### -- Better PASV port picking
 ### -- Support both ASCII and BINARY transfer types
 ### -- More logging!!
+### -- MOTD after login
+### -- MOTD before login (seperate)
 ###
 ### Copyright (c) 2001 Leslie Michael Orchard.  All Rights Reserved.
 ### This module is free software; you can redistribute it and/or
 ### modify it under the same terms as Perl itself.
 ###
-### Changes Copyright (c) 2003 David Davis and Teknikill Software
-######################################################################
+### Changes Copyright (c) 2003-2004 David Davis and Teknikill Software
+###########################################################################
 
 use strict;
 
-use IO::Socket::INET;
 use POE qw(Session Wheel::ReadWrite Driver::SysRW Wheel::SocketFactory);
 use POE::Component::Server::FTP::DataSession;
 use POE::Component::Server::FTP::ControlFilter;
 
-sub DEBUG { 0 }
-
 sub new {
 	my $type = shift;
 	my $opt = shift;
@@ -41,6 +40,9 @@ sub new {
 				_stop		=> '_stop',
 				_default	=> '_default',
 				_child		=> '_child',
+				_reset_timeout => '_reset_timeout',
+				_write_log	=> '_write_log',
+				time_out	=> 'time_out',
 				receive		=> 'receive',
 				flushed		=> 'flushed',
 				error		=> 'error',
@@ -69,7 +71,10 @@ sub new {
 				REST		=> 'REST',
 				ABOR		=> 'ABOR',
 				APPE		=> 'APPE',
+				SIZE		=> 'SIZE',
 				
+				SITE		=> 'SITE',
+
 				# unimplemented
 #				RNFR		=> 'RNFR',
 
@@ -79,10 +84,9 @@ sub new {
 				XPWD		=> 'PWD',
 				XCUP		=> 'CDUP',
 				XCWD		=> 'CWD',
-				
+
 				# rfc 737
 				XSEN		=> 'XSEN',
-				
 			}
 		],
 	);
@@ -91,7 +95,7 @@ sub new {
 }
 
 sub _start {
-	my ($kernel, $heap, $opt) = @_[KERNEL, HEAP, ARG0];
+	my ($kernel, $heap, $session, $opt) = @_[KERNEL, HEAP, SESSION, ARG0];
 
 	eval("use $opt->{FilesystemClass}");
 	if ($@) {
@@ -104,109 +108,164 @@ sub _start {
 	$kernel->sig('INT', 'signals');
 
 	# start reading and writing
-	$heap->{control} = new POE::Wheel::ReadWrite(
-		Handle			=> $opt->{Handle},					# on this handle
-		Driver			=> new POE::Driver::SysRW,	# using sysread and syswrite
-		Filter			=> new POE::Component::Server::FTP::ControlFilter,
-		InputEvent		=> 'receive',				# generating this event for requests
-		ErrorEvent		=> 'error',					# generating this event for errors
-		FlushedEvent	=> 'flushed',				# generating this event for all-sent
+	$heap->{control} = POE::Wheel::ReadWrite->new(
+		# on this handle
+		Handle			=> $opt->{Handle}, 
+		# using sysread and syswrite
+		Driver			=> POE::Driver::SysRW->new(), 
+		Filter			=> POE::Component::Server::FTP::ControlFilter->new(),
+		# generating this event for requests
+		InputEvent		=> 'receive',
+		# generating this event for errors
+		ErrorEvent		=> 'error',
+		# generating this event for all-sent
+		FlushedEvent	=> 'flushed',
 	);
 
-	# maybe do this?
-#	$heap->{fs_session} = POE::Wheel::Run->new(
-#		Program => sub {
-#			my $fs = ("$fs_class")->new($fs_args);
-#		},
-#		StdioFilter  => POE::Filter::Line->new(),
-#		StderrFilter => POE::Filter::Line->new(),
-#		StdoutEvent  => "_job_stdout",
-#		StderrEvent  => "_job_stderr",
-#		CloseEvent   => "_job_close",
-#	);
-#	$heap->{control}->put( "Job " . $heap->{job}->PID . " started." );
-
 	$heap->{pasv} = 0;
 	$heap->{auth} = 0;
+	$heap->{rest} = 0;
 	$heap->{host} = $opt->{PeerAddr};
 	$heap->{port} = $opt->{PeerPort};
 	$heap->{filesystem} = $fs;
 	%{$heap->{params}} = %{ $opt };
-
-	DEBUG && print "Control session started for $heap->{host} : $heap->{port}\n";
+	
+	if ($heap->{params}{'TimeOut'} > 0) {
+		$heap->{time_out} = $kernel->delay_set(time_out => $heap->{params}{'TimeOut'});
+		$kernel->call($session->ID => _write_log => 4 => "Timeout set: id ".$heap->{time_out});
+	}
+	
+	$kernel->call($session->ID => _write_log => 4 => "Control session started for $heap->{host} : $heap->{port}");
 
 	$heap->{control}->put("220 $opt->{Domain} FTP server ($opt->{Version} ".localtime()." ready.)");
 }
 
 sub _stop {
-	my $heap = $_[HEAP];
-	DEBUG && print "Client session ended with $heap->{host} : $heap->{port}\n";
+	my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
+	$kernel->call($session->ID => _write_log => 4 => "Client session ended with $heap->{host} : $heap->{port}");
 }
 
 sub _child {
-	my ($kernel, $heap, $action, $child) = @_[KERNEL, HEAP, ARG0, ARG1];
+	my ($kernel, $heap, $session, $action, $child) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1];
 
 	if ($action eq 'create') {
+		$kernel->call($session->ID => _write_log => 4 => "child session created ".$child->ID);
 		$heap->{pending_session} = $child;
 	} elsif ($action eq 'lose') {
-		$heap->{control}->put("226 Transfer complete.");
+		$kernel->call($session->ID => _write_log => 3 => sprintf("Transfer complete %d kB/s of %d bytes",($child->get_heap->{bps}/1023),$child->get_heap->{total_bytes}));
+		$kernel->call($session->ID => _write_log => 4 => "child session lost ".$child->ID);
+		$kernel->call($session->ID => "_reset_timeout");
+		if ($heap->{params}{'LimitSceme'} eq 'ip') {
+			my $cheap = $child->get_heap;
+			$kernel->call($heap->{params}{'Alias'} => _dcon_cleanup => $cheap->{type}, $cheap->{remote_ip} => $child->ID);
+		}
+		if (defined $heap->{abor}) {
+			delete $heap->{abor};
+		} else {
+			$heap->{control}->put("226 Transfer complete.");
+		}
 		delete $heap->{pending_session};
 	}
+	
+	return 0;
+}
+
+sub time_out {
+	my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
+
+	# if we have a child session, then there must be a transfer
+	# going on, reset the timer
+	if (defined $heap->{pending_session} &&
+			$heap->{params}{'TimeOut'} > 0) {
+		$heap->{time_out} = $kernel->delay_set(time_out => $heap->{params}{'TimeOut'});
+		$kernel->call($session->ID => _write_log => 4 => "Timeout re-set: id ".$heap->{time_out});
+		return;
+	}
+	
+	unless ($heap->{control}) {
+		$kernel->alarm_remove_all( );
+		delete $heap->{control};
+	}
+	
+	if ($heap->{auth} == 0) {
+		$kernel->call($session->ID => _write_log => 2 => "Session ".$session->ID." timed out before login (".$heap->{params}{'TimeOut'}.")");
+		$heap->{control}->put("421 Disconnecting you because you did't login before ".$heap->{params}{'TimeOut'}." seconds, Goodbye.");
+	} else {
+		$kernel->call($session->ID => _write_log => 2 => "Session ".$session->ID." timed out (".$heap->{params}{'TimeOut'}.")");
+		$heap->{control}->put("421 Disconnecting you because you were inactive for ".$heap->{params}{'TimeOut'}." seconds, Goodbye.");
+	}
+	
+	$kernel->alarm_remove_all( );
+	delete $heap->{control};
 }
 
 sub receive {
 	my ($kernel, $session, $heap, $cmd) = @_[KERNEL, SESSION, HEAP, ARG0];
 
-	DEBUG && print "Received input from $heap->{host} : $heap->{port}\n";
-	DEBUG && print "Args: ".join(',',@{$cmd->{args}})."\n";
+	$kernel->call($session->ID => _write_log => 4 => "Received input from $heap->{host} : $heap->{port} -> $cmd->{cmd} (".join(',',@{$cmd->{args}}).")");
 
+	if ($heap->{auth} == 1) {
+		$kernel->call($session->ID => '_reset_timeout');
+	}
 	$kernel->post($session, $cmd->{cmd}, \@{$cmd->{args}});
 }
 
 sub error {
-	my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2];
+	my ($kernel, $heap, $session, $operation, $errnum, $errstr) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1, ARG2];
 
 	if ($errnum) {
-		DEBUG && print( "Session with $heap->{host} : $heap->{port} ",
-						"encountered $operation error $errnum: $errstr\n"
-					  );
+		$kernel->call($session->ID => _write_log => 4 => "Session with $heap->{host} : $heap->{port} encountered $operation error $errnum: $errstr");
 	} else {
-		DEBUG && print( "Client at $heap->{host} : $heap->{port} disconnected\n" );
+		$kernel->call($session->ID => _write_log => 4 => "Client at $heap->{host} : $heap->{port} disconnected");
 	}
 
 	# either way, stop this session
+	$kernel->alarm_remove_all( );
 	delete $heap->{control};
 }
 
 sub flushed {
 	my ($kernel, $heap) = @_[KERNEL, HEAP];
-
-	DEBUG && print "Response has been flushed to $heap->{host} : $heap->{port}\n";
-
-	$kernel->post($heap->{pending_session}, 'execute')
-	  if (defined $heap->{pending_session});
+	
+	if (defined $heap->{pending_session} && $heap->{listening} == 0) {
+# this broke stuff, now execute is yielded another way
+#		$kernel->post($heap->{pending_session}->ID, 'execute');
+	}
 }
 
 
 sub signals {
-	my ($heap, $signal_name) = @_[HEAP, ARG0];
-
-	DEBUG && print( "Session with $heap->{host} : $heap->{port} caught SIG",
-					$signal_name, "\n"
-				  );
+	my ($kernel, $heap, $session, $signal_name) = @_[KERNEL, HEAP, SESSION, ARG0];
+	
+	$kernel->call($session->ID => _write_log => 4 => "Session with $heap->{host} : $heap->{port} caught SIG $signal_name");
 	# do not handle the signal
 	return 0;
 }
 
+sub SITE {
+	my ($kernel, $heap, $session, $args) = @_[KERNEL, HEAP, SESSION, ARG0];
+	
+	if ($heap->{auth} == 0) {
+		$heap->{control}->put("530 Not logged in");
+	} else {
+		my $cmd = shift(@$args);
+		$kernel->call($session->ID,$cmd,$args);
+	}
+}
+
 sub NOOP {
-	my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
+	my ($kernel, $heap, $session, $args) = @_[KERNEL, HEAP, SESSION, ARG0];
 	
-	# reset a timeout timer?
-	$heap->{control}->put("200 No-op okay.");
+	if ($heap->{auth} == 0) {
+		$heap->{control}->put("530 Not logged in");
+	} else {
+		# resetting the timeout is done in receive()
+		$heap->{control}->put("200 No-op okay.");
+	}
 }
 
 sub XSEN {
-	my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
+	my ($kernel, $heap, $session, $args) = @_[KERNEL, HEAP, SESSION, ARG0];
 	
 	# TODO send a message to the terminal
 	#$args = join(' ',@$args);
@@ -215,8 +274,9 @@ sub XSEN {
 }
 
 sub QUIT {
-	my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
+	my ($kernel, $heap, $session, $args) = @_[KERNEL, HEAP, SESSION, ARG0];
 
+	$kernel->alarm_remove_all( );
 	$heap->{control}->put("221 Goodbye.");
 	delete $heap->{control};
 }
@@ -244,17 +304,19 @@ sub PASS {
 
 	if (exists($heap->{username})) {
 		if ($heap->{params}{AnonymousLogin} eq 'deny' && $heap->{username} eq 'anonymous') {
-			_write_log("Anonymous login denied.");
+			$kernel->call($session->ID => _write_log => 1 => "Anonymous login denied.");
 			$heap->{control}->put("530 Login incorrect.");
 			$heap->{auth} = 0;
 			return;
 		}
 		if ($fs->login($heap->{username}, $password)) {
-			_write_log("User $heap->{username} logged in.");
-			$heap->{control}->put("230 User $heap->{username} logged in.");
+			$kernel->call($session->ID => _write_log => 1 => "User $heap->{username} logged in.");
+			# MOTD?
+			$heap->{control}->put("230 Logged in.");
 			$heap->{auth} = 1;
+			$kernel->call($session->ID => "_reset_timeout");
 		} else {
-			_write_log("Incorrect login");
+			$kernel->call($session->ID => _write_log => 1 => "Incorrect login");
 			$heap->{control}->put("530 Login incorrect.");
 			$heap->{auth} = 0;
 		}
@@ -267,31 +329,57 @@ sub PASS {
 sub REST {
 	my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
 	
-	$heap->{rest} = $args->[0];
-	$heap->{control}->put("350 Will attempt to restart at postion $args->[0].");
+	if ($heap->{auth} == 0) {
+		$heap->{control}->put("530 Not logged in");
+		return;
+	}
+	
+	if ($args->[0] =~ m/^\d+$/) {
+		$heap->{rest} = $args->[0];
+		$heap->{control}->put("350 Will attempt to restart at postion $args->[0].");
+	} else {
+		
+	}
 }
 
 # Not implemented.
 sub TYPE {
 	my ($kernel, $session, $heap, $type) = @_[KERNEL, SESSION, HEAP, ARG0];
 	
-	$type = $type->[0];
+	if ($heap->{auth} == 0) {
+		$heap->{control}->put("530 Not logged in");
+		return;
+	}
 	
+	$type = $type->[0];
+		
 	$heap->{control}->put("200 Type set to I.");
 }
 
 # Not implemented.
 sub SYST {
 	my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
+	
+	if ($heap->{auth} == 0) {
+		$heap->{control}->put("530 Not logged in");
+		return;
+	}
+	
 	$heap->{control}->put("215 UNIX Type: L8");
 }
 
 sub ABOR {
 	my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
 	
+	if ($heap->{auth} == 0) {
+		$heap->{control}->put("530 Not logged in");
+		return;
+	}
+	
 	if (defined $heap->{pending_session}) {
-		$kernel->post($heap->{pending_session} => 'data_throttle');
-		$kernel->post($heap->{pending_session} => 'shutdown');
+		$kernel->post($heap->{pending_session}->ID => 'data_throttle');
+		$kernel->post($heap->{pending_session}->ID => '_drop');
+		$heap->{abor} = 1;
 	}
 	
 	$heap->{control}->put("200 ABOR successfull");
@@ -300,150 +388,187 @@ sub ABOR {
 
 sub MDTM {
 	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
-	$fn = join(' ',@$fn);
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
+		return;
+	}
+	
+	$fn = join(' ',@$fn);
+	my $fs = $heap->{filesystem};
+	my @modtime = $fs->modtime($fs);
+	if ($modtime[0] == 0) {
+		$heap->{control}->put("550 MDTM $fn: Permission denied.");
 	} else {
-		my $fs = $heap->{filesystem};
-		my @modtime = $fs->modtime($fs);
-		if ($modtime[0] == 0) {
-			$heap->{control}->put("550 MDTM $fn: Permission denied.\n");
-		} else {
-			$heap->{control}->put("213 ".$modtime[1]);
-		}
+		$heap->{control}->put("213 ".$modtime[1]);
 	}
 }
 
-sub CHMOD {
+sub SIZE {
 	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
-	my $mode = shift(@$fn);
+	
+	if ($heap->{auth} == 0) {
+		$heap->{control}->put("530 Not logged in");
+		return;
+	}
+	
 	$fn = join(' ',@$fn);
+	my $fs = $heap->{filesystem};
+	my $size = $fs->size($fs);
+	$heap->{control}->put("213 ".$size);
+	
+#	my @modtime = $fs->modtime($fs);
+#	if ($modtime[0] == 0) {
+#		$heap->{control}->put("550 SIZE $fn: Permission denied.");
+#	} else {
+#		$heap->{control}->put("213 ".$modtime[1]);
+#	}
+}
 
+sub CHMOD {
+	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
-	} else {
-		my $fs = $heap->{filesystem};
+		return;
+	}
+	
+	my $mode = shift(@$fn);
+	$fn = join(' ',@$fn);
+	my $fs = $heap->{filesystem};
 
-		if ($fs->chmod($mode, $fn)) {
-			$heap->{control}->put("200 CHMOD command successful.");
-		} else {
-			$heap->{control}->put("550 CHMOD command unsuccessful");
-		}
+	if ($fs->chmod($mode, $fn)) {
+		$heap->{control}->put("200 CHMOD command successful.");
+	} else {
+		$heap->{control}->put("550 CHMOD command unsuccessful");
 	}
 }
 
 sub DELE {
 	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
-	$fn = join(' ',@$fn);
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
-	} else {
-		my $fs = $heap->{filesystem};
+		return;
+	}
+	
+	$fn = join(' ',@$fn);
+	my $fs = $heap->{filesystem};
 
-		if ($fs->delete($fn)) {
-			$heap->{control}->put("250 DELE command successful");
-		} else {
-			$heap->{control}->put("550 DELE command unsuccessful");
-		}
+	if ($fs->delete($fn)) {
+		$heap->{control}->put("250 DELE command successful");
+	} else {
+		$heap->{control}->put("550 DELE command unsuccessful");
 	}
 }
 
 sub MKD	{
 	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
-	$fn = join(' ',@$fn);
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
+		return;
+	}
+	
+	$fn = join(' ',@$fn);
+	my $fs = $heap->{filesystem};
+	
+	my $ret = $fs->mkdir($fn);
+	if ($ret == 1) {
+		$fn =~ s/"/""/g; # doublequoting
+		$heap->{control}->put("257 \"$fn\" directory created");
+	} elsif ($ret == 2) {
+		$fn =~ s/"/""/g; # doublequoting
+		$heap->{control}->put("521 \"$fn\" directory already exists");
 	} else {
-		my $fs = $heap->{filesystem};
-		
-		my $ret = $fs->mkdir($fn);
-		if ($ret == 1) {
-			$fn =~ s/"/""/g; # doublequoting
-			$heap->{control}->put("257 \"$fn\" directory created");
-		} elsif ($ret == 2) {
-			$fn =~ s/"/""/g; # doublequoting
-			$heap->{control}->put("521 \"$fn\" directory already exists");
-		} else {
-			$heap->{control}->put("550 MKDIR $fn: Permission denied.\n");
-		}
+		$heap->{control}->put("550 MKDIR $fn: Permission denied.");
 	}
 }
 
 sub XMKD {
 	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
-	$fn = join(' ',@$fn);
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
+		return;
+	}
+	
+	$fn = join(' ',@$fn);
+	my $fs = $heap->{filesystem};
+	
+	my $ret = $fs->mkdir($fn);
+	if ($ret == 1) {
+		$fn =~ s/"/""/g; # doublequoting
+		$heap->{control}->put("257 \"$fn\" directory created");
+	} elsif ($ret == 2) {
+		$fn =~ s/"/""/g; # doublequoting
+		$heap->{control}->put("521 \"$fn\" directory already exists");
 	} else {
-		my $fs = $heap->{filesystem};
-		
-		my $ret = $fs->mkdir($fn);
-		if ($ret == 1) {
-			$fn =~ s/"/""/g; # doublequoting
-			$heap->{control}->put("257 \"$fn\" directory created");
-		} elsif ($ret == 2) {
-			$fn =~ s/"/""/g; # doublequoting
-			$heap->{control}->put("521 \"$fn\" directory already exists");
-		} else {
-			$heap->{control}->put("550 MKDIR $fn: Permission denied.\n");
-		}
+		$heap->{control}->put("550 MKDIR $fn: Permission denied.");
 	}
 }
 
 sub RMD {
 	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
-	$fn = join(' ',@$fn);
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
-	} else {
-		my $fs = $heap->{filesystem};
+		return;
+	}
+	
+	$fn = join(' ',@$fn);
+	my $fs = $heap->{filesystem};
 
-		if ($fs->rmdir($fn)) {
-			$heap->{control}->put("250 RMD command successful");
-		} else {
-			$heap->{control}->put("550 RMD $fn: Permission denied");
-		}
+	if ($fs->rmdir($fn)) {
+		$heap->{control}->put("250 RMD command successful");
+	} else {
+		$heap->{control}->put("550 RMD $fn: Permission denied");
 	}
 }
 
 sub XRMD {
 	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
-	$fn = join(' ',@$fn);
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
-	} else {
-		my $fs = $heap->{filesystem};
+		return;
+	}
+	
+	$fn = join(' ',@$fn);
+	my $fs = $heap->{filesystem};
 
-		if ($fs->rmdir($fs->cwd().$fn)) {
-			$heap->{control}->put("250 RMD command successful");
-		} else {
-			$heap->{control}->put("550 RMD $fn: Permission denied");
-		}
+	if ($fs->rmdir($fs->cwd().$fn)) {
+		$heap->{control}->put("250 RMD command successful");
+	} else {
+		$heap->{control}->put("550 RMD $fn: Permission denied");
 	}
 }
 
 sub CDUP {
 	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
-	$fn = join(' ',@$fn);
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
-	} else {
-		my $fs = $heap->{filesystem};
+		return;
+	}
+	
+	$fn = join(' ',@$fn);
+	my $fs = $heap->{filesystem};
 
-		if ($fs->chdir('..')) {
-			$heap->{control}->put('257 "'.$fs->cwd().'" is current directory.');
-		} else {
-			$heap->{control}->put("550 ..: No such file or directory.");
-		}
+	if ($fs->chdir('..')) {
+		$heap->{control}->put('257 "'.$fs->cwd().'" is current directory.');
+	} else {
+		$heap->{control}->put("550 ..: No such file or directory.");
 	}
 }
 
 sub CWD {
 	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
-	$fn = join(' ',@$fn);
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
 	} else {
+		$fn = join(' ',@$fn);
 		my $fs = $heap->{filesystem};
 
 		if ($fs->chdir($fn)) {
@@ -456,10 +581,11 @@ sub CWD {
 
 sub PWD {
 	my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
-	$fn = join(' ',@$fn);
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
 	} else {
+		$fn = join(' ',@$fn);
 		my $fs = $heap->{filesystem};
 
 		$heap->{control}->put('257 "'.$fs->cwd().'" is current directory.');
@@ -469,15 +595,16 @@ sub PWD {
 sub PORT {
 	my ($kernel, $session, $heap, $data_port) = @_[KERNEL, SESSION, HEAP, ARG0];
 
-	$data_port = join(' ',@$data_port);
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
 		return;
 	}
+	
+	$data_port = join(' ',@$data_port);
+	
 	### Planning to use a Wheel here...
 
-	#$heap->{control} = new POE::Wheel::ReadWrite
-#			(
+#	$heap->{control} = POE::Wheel::ReadWrite->new(
 #			 Handle       => $handle,                # on this handle
 #			 Driver       => new POE::Driver::SysRW, # using sysread and syswrite
 #			 Filter       => new POE::Filter::FTPd::Control,
@@ -494,6 +621,7 @@ sub PORT {
 
 sub PASV {
 	my ($kernel, $session, $heap, $data_port) = @_[KERNEL, SESSION, HEAP, ARG0];
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
 		return;
@@ -503,7 +631,7 @@ sub PASV {
 	my $p2 = (int rand(100))+1;
 	$p1 -= $p2;
 
-	POE::Component::Server::FTP::DataSession->new({
+	POE::Component::Server::FTP::DataSession->new($heap->{params},{
 		fs => $heap->{filesystem},
 		port1 => $p1,
 		port2 => $p2,
@@ -513,6 +641,7 @@ sub PASV {
 	$heap->{pasv} = 1;
 	my $ip = $heap->{params}{ListenIP};
 	$ip =~ s/\./,/g;
+	print STDERR "ip is $ip\n";
 	$heap->{control}->put("227 Entering Passive Mode. ($ip,$p1,$p2)");
 }
 
@@ -529,9 +658,9 @@ sub LIST {
 	$heap->{control}->put("150 Opening ASCII mode data connection for /bin/ls.");
 
 	if (defined $heap->{pending_session} && $heap->{pasv} == 1) {
-		$kernel->post($heap->{pending_session} => start_LIST => $dirfile);
+		$kernel->post($heap->{pending_session}->ID => start_LIST => $dirfile);
 	} else {
-		POE::Component::Server::FTP::DataSession->new({
+		POE::Component::Server::FTP::DataSession->new($heap->{params},{
 			fs => $heap->{filesystem},
 			data_port => $heap->{last_port_cmd},
 			cmd => 'LIST',
@@ -542,7 +671,7 @@ sub LIST {
 }
 
 sub NLST {
-	my ($kernel, $session, $heap, $dirfile) = @_[KERNEL, SESSION, HEAP, ARG0];
+my ($kernel, $session, $heap, $dirfile) = @_[KERNEL, SESSION, HEAP, ARG0];
 
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
@@ -554,23 +683,25 @@ sub NLST {
 	$heap->{control}->put("150 Opening ASCII mode data connection for /bin/ls.");
 
 	if (defined $heap->{pending_session} && $heap->{pasv} == 1) {
-		$kernel->post($heap->{pending_session} => start_NLST => $dirfile);
+		$kernel->post($heap->{pending_session}->ID => start_NLST => $dirfile);
 	} else {
-		POE::Component::Server::FTP::DataSession->new({
+		POE::Component::Server::FTP::DataSession->new($heap->{params},{
 			fs => $heap->{filesystem},
 			data_port => $heap->{last_port_cmd},
 			cmd => 'NLST',
-			opt => $dirfile
+			opt => $dirfile,
 		});
 	}
 }
 
 sub STOR {
 	my ($kernel, $session, $heap, $filename) = @_[KERNEL, SESSION, HEAP, ARG0];
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
 		return;
 	}
+	
 	my $fs = $heap->{filesystem};
 	$filename = join(' ',@$filename);
 	my $fh;
@@ -579,9 +710,12 @@ sub STOR {
 		$heap->{control}->put("150 Opening BINARY mode data connection for $filename.");
 
 		if (defined $heap->{pending_session} && $heap->{pasv} == 1) {
-			$kernel->post($heap->{pending_session} => start_STOR => $fh, $heap->{rest});
+			$kernel->post($heap->{pending_session}->ID => start_STOR => $fh,
+			{
+				rest => $heap->{rest},
+			});
 		} else {
-			POE::Component::Server::FTP::DataSession->new({
+			POE::Component::Server::FTP::DataSession->new($heap->{params},{
 				fs => $fs,
 				data_port => $heap->{last_port_cmd},
 				cmd => 'STOR',
@@ -597,10 +731,12 @@ sub STOR {
 
 sub APPE {
 	my ($kernel, $session, $heap, $filename) = @_[KERNEL, SESSION, HEAP, ARG0];
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
 		return;
 	}
+	
 	my $fs = $heap->{filesystem};
 	$filename = join(' ',@$filename);
 	my $fh;
@@ -610,12 +746,12 @@ sub APPE {
 		$heap->{control}->put("150 Opening BINARY mode data connection for $filename.");
 
 		if (defined $heap->{pending_session} && $heap->{pasv} == 1) {
-			$kernel->post($heap->{pending_session} => start_STOR => $fh);
+			$kernel->post($heap->{pending_session}->ID => start_STOR => $fh);
 		} else {
-			POE::Component::Server::FTP::DataSession->new({
+			POE::Component::Server::FTP::DataSession->new($heap->{params},{
 				fs => $fs,
 				data_port => $heap->{last_port_cmd},
-				cmd => 'APPE',
+				cmd => 'STOR',
 				opt => $fh,
 			});
 		}
@@ -627,84 +763,72 @@ sub APPE {
 
 sub RETR {
 	my ($kernel, $session, $heap, $filename) = @_[KERNEL, SESSION, HEAP, ARG0];
-	$filename = join(' ',@$filename);
+	
 	if ($heap->{auth} == 0) {
 		$heap->{control}->put("530 Not logged in");
 		return;
 	}
+	
+	$filename = join(' ',@$filename);
 	my $fs = $heap->{filesystem};
 	my $fh;
 
-	_write_log("RETR $filename");
-
 	if ($fh = $fs->open_read($filename)) {
 		$heap->{control}->put("150 Opening BINARY mode data connection for $filename.");
 		if (defined $heap->{pending_session} && $heap->{pasv} == 1) {
-			$kernel->post($heap->{pending_session} => start_RETR => $fh, $heap->{rest});
+			$kernel->post($heap->{pending_session}->ID => start_RETR => $fh,
+			{
+				rest => $heap->{rest},
+			});
 		} else {
-			POE::Component::Server::FTP::DataSession->new({
+			POE::Component::Server::FTP::DataSession->new($heap->{params},{
 				fs => $fs,
 				data_port => $heap->{last_port_cmd},
 				cmd => 'RETR',
 				opt => $fh,
-				rest => $heap->{rest}
+				rest => $heap->{rest},
 			});
 		}
 	} else {
-		$heap->{control}->put
-		  ("550 No such file or directory: $filename.");
+		$heap->{control}->put("550 No such file or directory: $filename.");
 	}
 }
 
 sub _default {
-	my ($kernel, $heap, $cmd, $args) = @_[KERNEL, HEAP, ARG0, ARG1];
+	my ($kernel, $heap, $session, $cmd, $args) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1];
 
 	if ($cmd =~ m/^_/) {
-		DEBUG && print "NonHandled Event: $cmd(".join(", ", @$args).")\n";
+		$kernel->call($session->ID => _write_log => 4 => "NonHandled Event: $cmd(".join(", ", @$args).")");
 	} else {
-		DEBUG && print "UNSUPPORTED COMMAND: $cmd(".join(", ", @$args).")\n";
+		$kernel->call($session->ID => _write_log => 4 => "UNSUPPORTED COMMAND: $cmd(".join(", ", @$args).")");
 
 		$heap->{control}->put("500 '$cmd': command not understood");
 	}
+	
+	return 0;
 }
 
-sub _write_log {
-	my $datetime = localtime();
-
-	print STDERR "[$datetime] ";
-	print STDERR shift;
-	print STDERR "\n";
+sub _reset_timeout {
+	my ($kernel,$heap) = @_[KERNEL, HEAP];
+	
+	if (defined $heap->{time_out}) {
+		$kernel->delay_adjust( $heap->{time_out}, $heap->{params}{'TimeOut'} );
+	}
 }
 
-sub _copy_fh {
-	my ($fh_in, $fh_out) = @_;
-
-	eval {
-		my ($len, $buf, $offset, $written);
-		my $blksize = (stat $fh_in)[11] || 16384;
-
-		while($len = $fh_in->sysread($buf, $blksize)) {
-			if (!defined $len) {
-				next if $! =~ /^Interrupted/;
-#				carp "System read error: $!\n";
-			}
-			$offset = 0;
-			$written = 0;
-			while ($len) {
-				$written = $fh_out->syswrite($buf, $len, $offset) || 0;
-				if ($!) {
-#					print "WRITE ERROR: $!\n";
-					sleep(1);
-				}
-
-#				or die "System write error: $!\n";
-				$len    -= $written;
-				$offset += $written;
-			}
+sub _write_log {
+	my ($kernel, $session, $heap, $sender, $verbose, $msg) = @_[KERNEL, SESSION, HEAP, SENDER, ARG0, ARG1];
+	if ($verbose <= $heap->{params}{'LogLevel'}) {
+		# if we're not forking, then pass the logging off to the
+		# main session
+		if ($heap->{params}{_main_pid} == $$) {
+			$kernel->call($heap->{params}{'Alias'} => _write_log => { type => (($sender->ID == $session->ID) ? 'C' : 'D'), msg => $msg, v => $verbose });
+		} else {
+			my $datetime = localtime();
+			my $type = ($sender->ID == $session->ID) ? 'C' : 'D';
+			print STDERR "[$datetime][$type".$sender->ID."] $msg\n";
 		}
-	};
-
-	_write_log("ERROR: $@\n") if ($@);
+	}
 }
 
 1;
@@ -1,45 +1,45 @@
 package POE::Component::Server::FTP::DataSession;
 
-######################################################################
+###########################################################################
 ### POE::Component::Server::FTP::DataSession
-### L.M.Orchard ( deus_x@pobox.com )
-### Modified by David Davis ( xantus@cpan.org )
+### L.M.Orchard (deus_x@pobox.com)
+### David Davis (xantus@cpan.org)
 ###
 ### TODO:
 ### -- POEify the data channel
 ### -- Move file seeking to Filesys::Virtual
+### -- get rid of *_limit and use params instead
 ###
 ### Copyright (c) 2001 Leslie Michael Orchard.  All Rights Reserved.
 ### This module is free software; you can redistribute it and/or
 ### modify it under the same terms as Perl itself.
 ###
-### Changes Copyright (c) 2003 David Davis and Teknikill Software
-######################################################################
+### Changes Copyright (c) 2003-2004 David Davis and Teknikill Software
+###########################################################################
 
 use strict;
-
-use IO::Scalar;
 use IO::Socket::INET;
+use IO::Scalar;
 use POE qw(Session Wheel::ReadWrite Filter::Stream Driver::SysRW Wheel::SocketFactory);
+use Time::HiRes qw(time);
 
 use Data::Dumper;
 
-sub DEBUG { 0 }
-
 # Create a new DataSession
 
 sub new {
-	my ($type, $opt) = @_;
+	my ($type, $para, $opt) = @_;
 	my $self = bless { }, $type;
 
-	POE::Session->create(
+	my $ses = POE::Session->create(
 		 #options =>{ trace=>1 },
-		 args => [ $opt ],
+		 args => [ $para, $opt ],
 		 object_states => [
 			$self => {
 				_start			=> '_start',
 				_stop			=> '_stop',
 
+				_drop			=> '_drop',
 				start_LIST		=> 'start_LIST',
 				start_NLST		=> 'start_NLST',
 				start_STOR		=> 'start_STOR',
@@ -53,6 +53,8 @@ sub new {
 				data_error		=> 'data_error',
 				data_throttle	=> 'data_throttle',
 				data_resume		=> 'data_resume',
+				
+				stop_socket		=> 'stop_socket',
 
 				_sock_up		=> '_sock_up',
 				_sock_down		=> '_sock_down',
@@ -60,20 +62,28 @@ sub new {
 		],
 	);
 
-	undef;
+	return $ses->ID;
 }
 
 sub _start {
-	my ($kernel, $heap, $opt) = @_[KERNEL, HEAP, ARG0];
+	my ($kernel, $heap, $para, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];
 
 # generating a port num
 #	my $x = pack('n',$port);
 #	my $p1 = ord(substr($x,0,1));
 #	my $p2 = ord(substr($x,1,1));
 
+	$heap->{send_rec_okay} = 0;
 	$heap->{listening} = 0;
+	$heap->{rest} = 0;
+	$heap->{total_bytes} = 0;
+	$heap->{bps} = 0;
+	$heap->{type} = 'dl'; # default to download
+	$heap->{c_session} = $_[SENDER]->ID;
+	%{$heap->{params}} = %{$para};
 
 	if ($opt->{data_port}) {
+		$kernel->call($heap->{c_session} => _write_log => 4 => "starting a PORT data session");
 		# PORT command
 		my ($h1, $h2, $h3, $h4, $p1, $p2) = split(',', $opt->{data_port});
 
@@ -92,8 +102,9 @@ sub _start {
 		);
 
 		$heap->{cmd} = $opt->{cmd};
-		$heap->{rest} = $opt->{rest};
+		$heap->{rest} = $opt->{rest} if ($opt->{rest});
 	} else {
+		$kernel->call($heap->{c_session} => _write_log => 4 => "starting a PASV data session");
 		# PASV command
 		$heap->{port} = ($opt->{port1}<<8)+$opt->{port2};
 		
@@ -105,7 +116,7 @@ sub _start {
 			SocketDomain   => AF_INET, # Sets the socket() domain
 			SocketType     => SOCK_STREAM, # Sets the socket() type
 			SocketProtocol => 'tcp', # Sets the socket() protocol
-			Reuse          => 'on', # Lets the port be reused
+			Reuse          => 'off', # Lets the port be reused
 		);
 
 		$heap->{listening} = 1;
@@ -121,10 +132,10 @@ sub _start {
 sub _sock_up {
 	my ($kernel, $heap, $socket) = @_[KERNEL, HEAP, ARG0];
 
-	my $buffer_max = 8 * 1024;
+	my $buffer_max = 4 * 1024;
 	my $buffer_min = 128;
 
-	$heap->{data} = new POE::Wheel::ReadWrite(
+	$heap->{data} = POE::Wheel::ReadWrite->new(
 		Handle			=> $socket,
 		Driver			=> POE::Driver::SysRW->new(),
 		Filter			=> POE::Filter::Stream->new(),
@@ -136,25 +147,20 @@ sub _sock_up {
 		HighEvent		=> 'data_throttle',
 		LowEvent		=> 'data_resume',
 	);
-	# remote_ip??
 
 	if ($heap->{listening} == 0) {
-		DEBUG && print "Data session started for $heap->{cmd}($heap->{opt})\n";
+		$kernel->call($heap->{c_session} => _write_log => 4 => "data session started for $heap->{cmd} ($heap->{opt})");
 		$kernel->yield('start_'.(uc $heap->{cmd}), $heap->{opt});
 	} else {
-		DEBUG && print "Received connection from $heap->{remote_ip}\n";
+		# TODO check if correct IP connected if that option is on
+		$kernel->call($heap->{c_session} => _write_log => 4 => "received connection from $heap->{remote_ip}");
 	}
 }
 
 sub _sock_down {
-	DEBUG && print "socket down\n";
-	delete $_[HEAP]->{data};
-}
-
-sub start_PASV {
-	my ($kernel, $heap, $dirfile) = @_[KERNEL, HEAP, ARG0];
-	my $fs = $heap->{filesystem};
-
+	my ($kernel, $heap) = @_[KERNEL, HEAP];
+	$kernel->call($heap->{c_session} => _write_log => 4 => "socket down");
+	delete $heap->{data};
 }
 
 sub start_LIST {
@@ -168,7 +174,8 @@ sub start_LIST {
 
 	$heap->{input_fh} = IO::Scalar->new(\$out);
 	$heap->{send_done} = 0;
-	$heap->{send_okay} = 1;
+	$heap->{send_rec_okay} = 1;
+	$kernel->yield('execute');
 }
 
 sub start_NLST {
@@ -182,35 +189,39 @@ sub start_NLST {
 
 	$heap->{input_fh} = IO::Scalar->new(\$out);
 	$heap->{send_done} = 0;
-	$heap->{send_okay} = 1;
+	$heap->{send_rec_okay} = 1;
+	$kernel->yield('execute');
 }
 
 sub start_RETR {
-	my ($kernel, $heap, $fh, $rest) = @_[KERNEL, HEAP, ARG0, ARG1];
+	my ($kernel, $heap, $fh, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];
 
-	if (defined $rest) {
-		$heap->{rest} = $rest;
+	if (defined $opt->{rest}) {
+		$heap->{rest} = $opt->{rest};
 	}
 	
 	$heap->{input_fh} = $fh;
-	DEBUG && print "Seeking to $heap->{rest}\n";
 	seek($fh,$heap->{rest},0);
 	
 	$heap->{send_done} = 0;
-	$heap->{send_okay} = 1;
+	$heap->{send_rec_okay} = 1;
+	$kernel->yield('execute');
 }
 
 sub start_STOR {
-	my ($heap, $fh, $rest) = @_[HEAP, ARG0, ARG1];
-	
-	if (defined $rest) {
-		$heap->{rest} = $rest;
+	my ($kernel, $heap, $fh, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];
+
+	if (defined $opt->{rest}) {
+		$heap->{rest} = $opt->{rest};
 	}
 	
 	$heap->{output_fh} = $fh;
 	
-	DEBUG && print "Seeking to $heap->{rest}\n";
 	seek($fh,$heap->{rest},0);
+	$heap->{type} = 'ul';
+	$heap->{send_rec_okay} = 1;
+	$heap->{xfer_time} = time();
+	$kernel->yield('execute');
 }
 
 sub _stop {
@@ -221,30 +232,77 @@ sub _stop {
 # Execute the session's pending upload
 
 sub execute {
-	if (defined $_[HEAP]->{input_fh}) {
-		$_[KERNEL]->yield('data_send');
-	} elsif (!defined $_[HEAP]->{output_fh}) {
-		if ($_[HEAP]->{listening} == 0) {
-			delete $_[HEAP]->{data};
+	my ($kernel, $heap, $session) =	@_[KERNEL, HEAP, SESSION];
+	
+	if (defined $heap->{input_fh}) {
+		$heap->{xfer_time} = time();
+		$kernel->yield('data_send');
+	} elsif (!defined $heap->{output_fh}) {
+		if ($heap->{listening} == 0) {
+			$kernel->call($session->ID => '_drop');
 		}
 	}
 }
 
+sub stop_socket {
+	my ($kernel, $session, $heap) =	@_[KERNEL, SESSION, HEAP];
+	
+	delete $heap->{time_out};
+	
+	if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
+		# still a factory?! Time to drop connection
+		delete $heap->{data};
+	}
+}
+
 # Send a block to the remote client
 
 sub data_send {
 	my ($kernel, $session, $heap) =	@_[KERNEL, SESSION, HEAP];
 
 	if ( (!defined $heap->{input_fh}) || (! ref $heap->{input_fh} ) ) {
-		delete $heap->{data};
-	} elsif ($heap->{send_okay} && (defined $heap->{data})) {
+		$kernel->call($session->ID => '_drop');
+	} elsif ($heap->{send_rec_okay} && (defined $heap->{data})) {
+		
+		# if we haven't connected yet, then data will still be a factory
+		if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
+			$kernel->call($heap->{c_session} => _write_log => 4 => "data is still a SocketFactory (not connected yet?)");
+			if (defined $heap->{time_out}) {
+				$heap->{time_out} = $kernel->delay_set(stop_socket => 30);
+			}
+			$kernel->delay('data_send' => 1);
+			return;
+		}
+		
+		if (defined $heap->{time_out}) {
+			$kernel->alarm_remove($heap->{time_out});
+			delete $heap->{time_out};
+		}
+		
+		$heap->{bps} = ($heap->{total_bytes} / (time() - $heap->{xfer_time}));
+		
+		if ($heap->{params}{'DownloadLimit'} > 0) {
+			if ($heap->{params}{'LimitSceme'} eq 'ip') {	
+				if ($kernel->call($heap->{params}{'Alias'} => _bw_limit => 'dl' => $heap->{remote_ip} => $heap->{bps})) {
+					$kernel->yield('data_send');
+					return;
+				}
+			} else {
+				if ($heap->{bps} > $heap->{params}{'DownloadLimit'}) {
+					$kernel->yield('data_send');
+					return;
+				}
+			}		
+		}
+		
 		### Read in a block from the file.
 		my $buf;
 		my $len = $heap->{input_fh}->read($buf, $heap->{block_size});
 
 		### If something was read, queue it to be sent, and yield
-		### back for another send_block.
+		### back for another data_send.
 		if ($len > 0) {
+			$heap->{total_bytes} += $len;
 			$heap->{data}->put($buf);
 			$kernel->yield('data_send');
 		} else {
@@ -253,12 +311,7 @@ sub data_send {
 			$fs->close_read($heap->{input_fh});
 			delete $heap->{input_fh};
 
-			### Thanks, poe.perl.org!
-			if ($heap->{data}->get_driver_out_octets() == 0) {
-				delete $heap->{data};
-			} else {
-				$heap->{send_done} = 1;
-			}
+			$kernel->call($session->ID => '_drop');
 		}
 	}
 }
@@ -266,22 +319,63 @@ sub data_send {
 # Recieve a block from the remote client
 
 sub data_receive {
-	if ($_[HEAP]->{output_fh}) {
-		$_[HEAP]->{output_fh}->print($_[ARG0]);
-	} else {
-		delete $_[HEAP]->{data};
+	my ($kernel, $heap, $session, $data) = @_[KERNEL, HEAP, SESSION, ARG0];
+
+	if ( (!defined $heap->{output_fh}) || (! ref $heap->{output_fh} ) ) {
+		$kernel->call($session->ID => '_drop');
+	} elsif ($heap->{send_rec_okay} && (defined $heap->{data})) {
+		
+		# if we haven't connected yet, then data will still be a factory
+		if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
+			$kernel->call($heap->{c_session} => _write_log => 4 => "data is still a SocketFactory (not connected yet?)");
+			if (defined $heap->{time_out}) {
+				$heap->{time_out} = $kernel->delay_set(stop_socket => 30);
+			}
+			$kernel->delay('data_receive' => 1, $data);
+			return;
+		}
+		
+		if (defined $heap->{time_out}) {
+			$kernel->alarm_remove($heap->{time_out});
+			delete $heap->{time_out};
+		}
+		
+		$heap->{bps} = ($heap->{total_bytes} / (time() - $heap->{xfer_time}));
+		
+		if ($heap->{params}{'UploadLimit'} > 0) {
+			if ($heap->{params}{'LimitSceme'} eq 'ip') {
+				if ($kernel->call($heap->{params}{'Alias'} => _bw_limit => 'ul' => $heap->{remote_ip} => $heap->{bps})) {
+					$kernel->yield('data_receive');
+					$heap->{data}->pause_input();
+				} else {
+					$heap->{data}->resume_input();
+				}
+			} else {
+				if ($heap->{bps} > $heap->{params}{'UploadLimit'}) {
+					$kernel->yield('data_receive');
+					$heap->{data}->pause_input();
+				} else {
+					$heap->{data}->resume_input();
+				}
+			}
+		}
+		
+		if (defined $data) {			
+			$heap->{total_bytes} += length($data);
+			
+			$heap->{output_fh}->print($data);
+		}
 	}
 }
 
 sub data_error {
-	my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2];
+	my ($kernel, $heap, $operation, $errnum, $errstr) = @_[KERNEL, HEAP, ARG0, ARG1, ARG2];
 	my $fs = $heap->{filesystem};
 
 	if ($errnum) {
-		DEBUG && print "Session with $heap->{remote_ip} : $heap->{port} ".
-		"encountered $operation error $errnum: $errstr\n";
+		$kernel->call($heap->{c_session} => _write_log => 4 => "session with $heap->{remote_ip} : $heap->{port} encountered $operation error $errnum: $errstr");
 	} else {
-		DEBUG && print "Client at $heap->{remote_ip} : $heap->{port} disconnected\n";
+		$kernel->call($heap->{c_session} => _write_log => 4 => "client at $heap->{remote_ip} : $heap->{port} disconnected");
 	}
 
 	# either way, stop this session
@@ -299,16 +393,35 @@ sub data_error {
 }
 
 sub data_flushed {
-	delete $_[HEAP]->{data} if ($_[HEAP]->{send_done});
+	my ($kernel, $heap) = @_[KERNEL, HEAP];
+	if ($heap->{send_done}) {
+		$kernel->call($heap->{c_session} => _write_log => 4 => "data flushed, dropping connection");
+		delete $heap->{data};
+	}
 }
 
 sub data_throttle {
-	$_[HEAP]->{okay_to_send} = 0;
+	$_[HEAP]->{send_rec_okay} = 0;
 }
 
 sub data_resume {
-	$_[HEAP]->{okay_to_send} = 1;
+	$_[HEAP]->{send_rec_okay} = 1;
 	$_[KERNEL]->yield('data_send');
 }
 
+sub _drop {
+	my ($kernel, $heap) = @_[KERNEL, HEAP];
+	
+	return unless ($heap->{data});
+	
+	# if we are fully flushed, go ahead and disconnect
+	if ($heap->{data}->get_driver_out_octets() == 0) {
+		$kernel->call($heap->{c_session} => _write_log => 4 => "data finished, dropping connection");
+		delete $heap->{data};
+	} else {
+		# if not, then we set a flag and the flushed event
+		# drops the connection
+		$heap->{send_done} = 1;
+	}
+}
 1;
@@ -1,9 +1,9 @@
 package POE::Component::Server::FTP;
 
-######################################################################
+###########################################################################
 ### POE::Component::Server::FTP
-### L.M.Orchard ( deus_x@pobox.com )
-### Modified by David Davis ( xantus@cpan.org )
+### L.M.Orchard (deus_x@pobox.com)
+### David Davis (xantus@cpan.org)
 ###
 ### TODO:
 ###
@@ -11,43 +11,52 @@ package POE::Component::Server::FTP;
 ### This module is free software; you can redistribute it and/or
 ### modify it under the same terms as Perl itself.
 ###
-### Changes Copyright (c) 2003 David Davis and Teknikill Software
-######################################################################
+### Changes Copyright (c) 2003-2004 David Davis and Teknikill Software
+###########################################################################
 
 use strict;
 use warnings;
 
 our @ISA = qw(Exporter);
-our $VERSION = '0.03';
+our $VERSION = '0.04';
 
 use Socket;
 use Carp;
 use POE qw(Session Wheel::ReadWrite Filter::Line
-		   Driver::SysRW Wheel::SocketFactory);
+		   Driver::SysRW Wheel::SocketFactory
+		   Wheel::Run Filter::Reference);
 use POE::Component::Server::FTP::ControlSession;
 use POE::Component::Server::FTP::ControlFilter;
 
-sub DEBUG { 0 }
-
 sub spawn {
 	my $package = shift;
 	croak "$package requires an even number of parameters" if @_ % 2;
 	my %params = @_;
 	my $alias = $params{'Alias'};
 	$alias = 'ftpd' unless defined($alias) and length($alias);
-
-	my $listen_port = $params{listen_port} || 21;
+	$params{'Alias'} = $alias;
+	$params{'ListenPort'} = $params{'ListenPort'} || 21;
+	$params{'TimeOut'} = $params{'TimeOut'} || 0;
+	$params{'DownloadLimit'} = $params{'DownloadLimit'} || 0;
+	$params{'UploadLimit'} = $params{'UploadLimit'} || 0;
+	$params{'LimitSceme'} = $params{'LimitSceme'} || 'none';
 
 	POE::Session->create(
 		#options => {trace=>1},
-		args => [ %params ],
+		args => [ \%params ],
 		package_states => [
 			'POE::Component::Server::FTP' => {
-				_start       => '_start',
-				_stop        => '_stop',
-				accept       => 'accept',
-				accept_error => 'accept_error',
-				signals      => 'signals'
+				_start			=> '_start',
+				_stop			=> '_stop',
+				_write_log		=> '_write_log',
+				accept			=> 'accept',
+				accept_error	=> 'accept_error',
+				signals			=> 'signals',
+				_bw_limit		=> '_bw_limit',
+				_dcon_cleanup	=> '_dcon_cleanup',
+				cmd_stdout		=> 'cmd_stdout',
+				cmd_stderr		=> 'cmd_stderr',
+				cmd_error		=> 'cmd_error',
 			}
 		],
 	);
@@ -57,62 +66,176 @@ sub spawn {
 
 sub _start {
 	my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
-	%{$heap->{params}} = splice @_,ARG0;
-
-	$session->option( @{$heap->{params}{SessionOptions}} ) if $heap->{params}{SessionOptions};
-	$kernel->alias_set($heap->{params}{alias});
+	%{$heap->{params}} = %{ $_[ARG0] };
+	
+	$heap->{_main_pid} = $$;
+	
+	$session->option( @{$heap->{params}{'SessionOptions'}} ) if $heap->{params}{'SessionOptions'};
+	$kernel->alias_set($heap->{params}{'Alias'});
 
 	# watch for SIGINT
 	$kernel->sig('INT', 'signals');
 
 	# create a socket factory
-	$heap->{wheel} = new POE::Wheel::SocketFactory(
+	$heap->{wheel} = POE::Wheel::SocketFactory->new(
 		BindPort       => $heap->{params}{ListenPort},          # on this port
 		Reuse          => 'yes',          # and allow immediate port reuse
 		SuccessEvent   => 'accept',       # generating this event on connection
 		FailureEvent   => 'accept_error'  # generating this event on error
 	);
-	DEBUG && print "Listening to port $heap->{params}{ListenPort} on all interfaces.\n";
+	
+	$kernel->call($session->ID => _write_log => { v => 2, msg => "Listening to port $heap->{params}{ListenPort} on all interfaces." });
 }
 
 sub _stop {
-	DEBUG && print "Server stopped.\n";
+	my ($kernel, $session) = @_[KERNEL, SESSION];
+	$kernel->call($session->ID => _write_log => { v => 2, msg => "Server stopped." });
 }
 
 # Accept a new connection
 
 sub accept {
-	my ($heap, $accepted_handle, $peer_addr, $peer_port) = @_[HEAP, ARG0, ARG1, ARG2];
+	my ($kernel, $heap, $session, $accepted_handle, $peer_addr, $peer_port) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1, ARG2];
 
 	$peer_addr = inet_ntoa($peer_addr);
-	my $ip = getsockname($accepted_handle);
-	DEBUG && print "Server received connection on $ip from $peer_addr : $peer_port\n";
+	my $ip = inet_ntoa((sockaddr_in(getsockname($accepted_handle)))[1]);
+	my $listen_ip = (defined $heap->{params}{FirewallIP}) ? $heap->{params}{FirewallIP} : $ip;
+	 
+	$kernel->call($session->ID => _write_log => { v => 2, msg => "Server received connection on $listen_ip from $peer_addr : $peer_port" });
 
 	my $opt = { %{$heap->{params}} };
 	$opt->{Handle} = $accepted_handle;
 	$opt->{ListenIP} = $ip;
 	$opt->{PeerAddr} = $peer_addr;
 	$opt->{PeerPort} = $peer_port;
+
 	POE::Component::Server::FTP::ControlSession->new($opt);
+	
+#	$heap->{control_session} = POE::Wheel::Run->new(
+#		Program     => sub {
+#			my $raw;
+#			my $size   = 4096;
+#			#my $filter = POE::Filter::Reference->new();
+#			my $filter = POE::Filter::Line->new();
+#			
+#			POE::Component::Server::FTP::ControlSession->new($opt);
+#			
+#			#
+#			# POE::Filter::Reference does buffering so that you don't have to.
+#			#
+#			READ: while (sysread( STDIN, $raw, $size )) {
+#				my $s = $filter->get( [$raw] );
+#				
+#				#
+#				# It is possible that $filter->get() has returned more than one
+#				# structure from the parent process.  Each $t represents whatever
+#				# was pushed from the parent.
+#				#
+#				foreach my $t (@$s) {
+#					print "-$t\n";
+#					#
+#					# Here is a stand-in for something that might be doing
+#					# real work.
+#					#
+#					#$t->{fubar} = 'mycmd';
+#					
+#					#
+#					# this part re-freezes the data structure and writes
+#					# it back to the parent process.
+#					#
+#					#my $u = $filter->put( [$t] );
+#					#print STDOUT @$u;
+#					
+#					#
+#					# this is the exit condition.
+#					#
+#					last READ if ( $t->{'cmd'} eq 'shutdown' );
+#				}	
+#			}
+#		},
+#		ErrorEvent  => 'cmd_error',
+#		StdoutEvent => 'cmd_stdout',
+#		StderrEvent => 'cmd_stderr',
+#		StdinFilter => POE::Filter::Line->new(),
+#		#StdioFilter => POE::Filter::Reference->new(),
+#		#StdinFilter => POE::Filter::Reference->new(),
+#	) or die "$0: can't POE::Wheel::Run->new";
+}
+
+sub _bw_limit {
+    my ($kernel, $heap, $session, $sender, $type, $ip, $bps) = @_[KERNEL, HEAP, SESSION, SENDER, ARG0, ARG1, ARG2];
+	$heap->{$type}{$ip}{$sender->ID} = $bps;
+	my $num = scalar(keys %{$heap->{$type}{$ip}});
+	my $newlimit = ((($type eq 'dl') ? $heap->{params}{'DownloadLimit'} : $heap->{params}{'UploadLimit'}) / $num);
+	return ($bps > $newlimit) ? 1 : 0;
+}
+
+sub _dcon_cleanup {
+    my ($kernel, $heap, $session, $type, $ip, $sid) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1, ARG2];
+	$kernel->call($session->ID => _write_log => { v => 4, msg => "cleaing up $type limiter for $ip ($sid)" });
+	delete $heap->{$type}{$ip}{$sid};
+}
+
+sub cmd_error {
+    my ( $heap, $op, $code, $handle ) = @_[ HEAP, ARG0, ARG1, ARG4 ];
+
+    if ( $op eq 'read' and $code == 0 and $handle eq 'STDOUT' ) {
+        warn "child has closed output";
+        delete $heap->{control_session};
+    }
+}
+
+#
+# demonstrate that something is happening.
+#
+sub cmd_stdout {
+    my ( $heap, $txt ) = @_[ HEAP, ARG0 ];
+
+    print STDERR join ":", 'cmd_stdout ', $txt, "\n";
+
+}
+
+#
+# Just so that we can see what the child writes on errors.
+#
+sub cmd_stderr {
+    my ( $heap, $txt ) = @_[ HEAP, ARG0 ];
+    print STDERR "cmd_stderr: $txt\n";
 }
 
 # Handle an error in connection acceptance
 
 sub accept_error {
-	my ($operation, $errnum, $errstr) = @_[ARG0, ARG1, ARG2];
-	DEBUG && print "Server encountered $operation error $errnum: $errstr\n";
+	my ($kernel, $session, $operation, $errnum, $errstr) = @_[KERNEL, SESSION, ARG0, ARG1, ARG2];
+	$kernel->call($session->ID => write_log => { v => 1, msg => "Server encountered $operation error $errnum: $errstr" });
 }
 
 # Handle incoming signals (INT)
 
 sub signals {
-	my $signal_name = $_[ARG0];
+	my ($kernel, $session, $signal_name) = @_[KERNEL, SESSION, ARG0];
+
+	$kernel->call($session->ID => _write_log => { v => 1, msg => "Server caught SIG$signal_name" });
 
-	DEBUG && print "Server caught SIG$signal_name\n";
-	# do not handle the signal
+	# to stop ctrl-c / INT
+	if ($signal_name eq 'INT') {
+		#$_[KERNEL]->sig_handled();
+	}
+	
 	return 0;
 }
 
+sub _write_log {
+	my ($kernel, $session, $heap, $sender, $o) = @_[KERNEL, SESSION, HEAP, SENDER, ARG0];
+	if ($o->{v} <= $heap->{params}{'LogLevel'}) {
+		my $datetime = localtime();
+		my $sender = (defined $o->{sid}) ? $o->{sid} : $sender->ID;
+		my $type = (defined $o->{type}) ? $o->{type} : 'M';
+		print STDERR "[$datetime][$type$sender] $o->{msg}\n";
+	}
+}
+
+
 1;
 __END__
 
@@ -122,23 +245,31 @@ POE::Component::Server::FTP - Event-based FTP server on a virtual filesystem
 
 =head1 SYNOPSIS
 
-  use POE qw(Wheel::ReadWrite Driver::SysRW
-	    	   Wheel::SocketFactory Component::Server::FTP);
-
-  POE::Component::Server::FTP->spawn
-    (
-     Alias           => 'ftpd',
-     ListenPort      => 2112,
-     FilesystemClass => 'Filesys::Virtual::Plain',
-     FilesystemArgs  =>
-     {
-	  'root_path' => '/',      # This is actual root for all paths
-	  'cwd'       => '/',      # Initial current working dir
-	  'home_path' => '/Users', # Home directory for '~'
-     }
-    );
+	use POE qw(Component::Server::FTP);
+	use Filesys::Virtual;
+
+	POE::Component::Server::FTP->spawn(
+		Alias           => 'ftpd',				# ftpd is default
+		ListenPort      => 2112,				# port to listen on
+		Domain			=> 'blah.net',			# domain shown on connection
+		Version			=> 'ftpd v1.0',			# shown on connection, you can mimic...
+		AnonymousLogin	=> 'allow',				# deny, allow
+		FilesystemClass => 'Filesys::Virtual::Plain', # Currently the only one available
+		FilesystemArgs  => {
+			'root_path' => '/',					# This is actual root for all paths
+			'cwd'       => '/',					# Initial current working dir
+			'home_path' => '/home',				# Home directory for '~'
+		},
+		# use 0 to disable these Limits
+		DownloadLimit	=> (50 * 1024),			# 50 kb/s per ip/connection (use LimitSceme to configure)
+		UploadLimit		=> (100 * 1024),		# 100 kb/s per ip/connection (use LimitSceme to configure)
+		LimitSceme		=> 'ip',				# ip or per (connection)
+		
+		LogLevel		=> 4,					# 4=debug, 3=less info, 2=quiet, 1=really quiet
+		TimeOut			=> 120,					# Connection Timeout
+	);
 
-  $poe_kernel->run();
+	$poe_kernel->run();
 
 =head1 DESCRIPTION
 
@@ -148,6 +279,7 @@ virtual filesystem interface as implemented by Filesys::Virtual.
 =head1 AUTHORS
 
 L.M.Orchard, deus_x@pobox.com
+
 David Davis, xantus@cpan.org
 
 =head1 SEE ALSO
@@ -5,6 +5,9 @@ WriteMakefile(
     'NAME'		=> 'POE::Component::Server::FTP',
     'VERSION_FROM'	=> 'FTP.pm', # finds $VERSION
     'PREREQ_PM'		=> {
-			   Filesys::Virtual => 0.02
+			   Filesys::Virtual => 0.03,
+			   IO::Scalar => undef,
+			   IO::Socket::INET => undef,
+			   Socket => undef,
 	},
 );
@@ -6,5 +6,5 @@ with other mainstream ftp servers out there.
 To get started quickly, install this module and Filesys::Virtual
 and use server.pl from the examples dir.
 
-
+*Use this ftp server at your own risk*
 
@@ -4,17 +4,24 @@ use Filesys::Virtual;
 use POE qw(Component::Server::FTP);
 
 POE::Component::Server::FTP->spawn(
-	Alias           => 'ftpd',
-	ListenPort      => 2112,
-	Domain			=> 'teknikill.net',
-	Version			=> 'ftpd v1.0',
-	AnonymousLogin	=> 'deny', # deny, allow
-	FilesystemClass => 'Filesys::Virtual::Plain',
+	Alias           => 'ftpd',				# ftpd is default
+	ListenPort      => 2112,				# port to listen on
+	Domain			=> 'blah.net',			# domain shown on connection
+	Version			=> 'ftpd v1.0',			# shown on connection, you can mimic...
+	AnonymousLogin	=> 'allow',				# deny, allow
+	FilesystemClass => 'Filesys::Virtual::Plain',	# Currently the only one available
 	FilesystemArgs  => {
-		'root_path' => '/',      # This is actual root for all paths
-		'cwd'       => '/',      # Initial current working dir
-		'home_path' => '/home', # Home directory for '~'
-	}
+		'root_path' => '/',					# This is actual root for all paths
+		'cwd'       => '/',					# Initial current working dir
+		'home_path' => '/home',				# Home directory for '~'
+	},
+	# use 0 to disable these Limits
+	DownloadLimit	=> (50 * 1024),			# 50 kb/s per ip/connection (use LimitSceme to configure)
+	UploadLimit		=> (100 * 1024),		# 100 kb/s per ip/connection (use LimitSceme to configure)
+	LimitSceme		=> 'ip',				# ip or per (connection)
+	
+	LogLevel		=> 4,					# 4=debug, 3=less info, 2=quiet, 1=really quiet
+	TimeOut			=> 120,					# Connection Timeout
 );
 
 $poe_kernel->run();