@@ -1,5 +1,15 @@
Revision history for Perl extension POE::Component::Server::FTP.
+0.04 Mon Feb 16 16:19:48 2004
+ - Fixed a nasty bug that involved the data session disconnecting early
+ - Bandwidth limiting added (upload and download), you can choose by ip or by connection
+ - Options added: LimitSceme (ip/per), DownloadLimit (bytes), UploadLimit (bytes)
+ - Forking isn't working yet (Wheel::Run)
+ - Logging has improved. Added an option (LogLevel) (1-4)
+ - Added SIZE command
+ - Added SITE CHMOD (some clients add SITE for some reason
+ - Added a timeout (option TimeOut (seconds))
+
0.03 Thu Feb 5 23:32:45 2004
- oops, fixed hardcoded ip for pasv
@@ -1,9 +1,9 @@
package POE::Component::Server::FTP::ControlFilter;
-######################################################################
+###########################################################################
### POE::Component::Server::FTP::ControlFilter
### L.M.Orchard (deus_x@pobox.com)
-### Modified by David Davis ( xantus@cpan.org )
+### David Davis (xantus@cpan.org)
###
### TODO:
### --
@@ -12,8 +12,8 @@ package POE::Component::Server::FTP::ControlFilter;
### This module is free software; you can redistribute it and/or
### modify it under the same terms as Perl itself.#
###
-### Changes Copyright (c) 2003 David Davis and Teknikill Software
-######################################################################
+### Changes Copyright (c) 2003-2004 David Davis and Teknikill Software
+###########################################################################
use strict;
@@ -23,20 +23,20 @@ sub new {
my $class = shift;
my %args = @_;
- bless {}, $class;
+ bless({}, $class);
}
sub get {
my ($self, $raw) = @_;
my @events = ();
-
+
foreach my $input (@$raw) {
$input =~ s/\n//g;
$input =~ s/\r//g;
DEBUG && print STDERR "<<< $input\n";
my ($cmd, @args) = split(/ /, $input);
- push @events, { cmd => uc $cmd, args =>\@args };
+ push(@events, { cmd => uc $cmd, args =>\@args });
}
return \@events;
@@ -44,17 +44,18 @@ sub get {
sub put {
my ($self, $in) = @_;
- my $out;
+ my @out = ();
foreach (@$in) {
- push @$out, "$_\n";
+ DEBUG && print STDERR ">>> $_\n";
+ push(@out, "$_\n");
}
- return $out;
+ return \@out;
}
sub get_pending {
- my($self)=@_;
+ my ($self) = @_;
warn ref($self)." does not support the get_pending() method\n";
return;
}
@@ -1,31 +1,30 @@
package POE::Component::Server::FTP::ControlSession;
-######################################################################
+###########################################################################
### POE::Component::Server::FTP::ControlSession
-### L.M.Orchard ( deus_x@pobox.com )
-### Modified by David Davis ( xantus@cpan.org )
+### L.M.Orchard (deus_x@pobox.com)
+### David Davis (xantus@cpan.org)
###
### TODO:
### -- Better PASV port picking
### -- Support both ASCII and BINARY transfer types
### -- More logging!!
+### -- MOTD after login
+### -- MOTD before login (seperate)
###
### Copyright (c) 2001 Leslie Michael Orchard. All Rights Reserved.
### This module is free software; you can redistribute it and/or
### modify it under the same terms as Perl itself.
###
-### Changes Copyright (c) 2003 David Davis and Teknikill Software
-######################################################################
+### Changes Copyright (c) 2003-2004 David Davis and Teknikill Software
+###########################################################################
use strict;
-use IO::Socket::INET;
use POE qw(Session Wheel::ReadWrite Driver::SysRW Wheel::SocketFactory);
use POE::Component::Server::FTP::DataSession;
use POE::Component::Server::FTP::ControlFilter;
-sub DEBUG { 0 }
-
sub new {
my $type = shift;
my $opt = shift;
@@ -41,6 +40,9 @@ sub new {
_stop => '_stop',
_default => '_default',
_child => '_child',
+ _reset_timeout => '_reset_timeout',
+ _write_log => '_write_log',
+ time_out => 'time_out',
receive => 'receive',
flushed => 'flushed',
error => 'error',
@@ -69,7 +71,10 @@ sub new {
REST => 'REST',
ABOR => 'ABOR',
APPE => 'APPE',
+ SIZE => 'SIZE',
+ SITE => 'SITE',
+
# unimplemented
# RNFR => 'RNFR',
@@ -79,10 +84,9 @@ sub new {
XPWD => 'PWD',
XCUP => 'CDUP',
XCWD => 'CWD',
-
+
# rfc 737
XSEN => 'XSEN',
-
}
],
);
@@ -91,7 +95,7 @@ sub new {
}
sub _start {
- my ($kernel, $heap, $opt) = @_[KERNEL, HEAP, ARG0];
+ my ($kernel, $heap, $session, $opt) = @_[KERNEL, HEAP, SESSION, ARG0];
eval("use $opt->{FilesystemClass}");
if ($@) {
@@ -104,109 +108,164 @@ sub _start {
$kernel->sig('INT', 'signals');
# start reading and writing
- $heap->{control} = new POE::Wheel::ReadWrite(
- Handle => $opt->{Handle}, # on this handle
- Driver => new POE::Driver::SysRW, # using sysread and syswrite
- Filter => new POE::Component::Server::FTP::ControlFilter,
- InputEvent => 'receive', # generating this event for requests
- ErrorEvent => 'error', # generating this event for errors
- FlushedEvent => 'flushed', # generating this event for all-sent
+ $heap->{control} = POE::Wheel::ReadWrite->new(
+ # on this handle
+ Handle => $opt->{Handle},
+ # using sysread and syswrite
+ Driver => POE::Driver::SysRW->new(),
+ Filter => POE::Component::Server::FTP::ControlFilter->new(),
+ # generating this event for requests
+ InputEvent => 'receive',
+ # generating this event for errors
+ ErrorEvent => 'error',
+ # generating this event for all-sent
+ FlushedEvent => 'flushed',
);
- # maybe do this?
-# $heap->{fs_session} = POE::Wheel::Run->new(
-# Program => sub {
-# my $fs = ("$fs_class")->new($fs_args);
-# },
-# StdioFilter => POE::Filter::Line->new(),
-# StderrFilter => POE::Filter::Line->new(),
-# StdoutEvent => "_job_stdout",
-# StderrEvent => "_job_stderr",
-# CloseEvent => "_job_close",
-# );
-# $heap->{control}->put( "Job " . $heap->{job}->PID . " started." );
-
$heap->{pasv} = 0;
$heap->{auth} = 0;
+ $heap->{rest} = 0;
$heap->{host} = $opt->{PeerAddr};
$heap->{port} = $opt->{PeerPort};
$heap->{filesystem} = $fs;
%{$heap->{params}} = %{ $opt };
-
- DEBUG && print "Control session started for $heap->{host} : $heap->{port}\n";
+
+ if ($heap->{params}{'TimeOut'} > 0) {
+ $heap->{time_out} = $kernel->delay_set(time_out => $heap->{params}{'TimeOut'});
+ $kernel->call($session->ID => _write_log => 4 => "Timeout set: id ".$heap->{time_out});
+ }
+
+ $kernel->call($session->ID => _write_log => 4 => "Control session started for $heap->{host} : $heap->{port}");
$heap->{control}->put("220 $opt->{Domain} FTP server ($opt->{Version} ".localtime()." ready.)");
}
sub _stop {
- my $heap = $_[HEAP];
- DEBUG && print "Client session ended with $heap->{host} : $heap->{port}\n";
+ my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
+ $kernel->call($session->ID => _write_log => 4 => "Client session ended with $heap->{host} : $heap->{port}");
}
sub _child {
- my ($kernel, $heap, $action, $child) = @_[KERNEL, HEAP, ARG0, ARG1];
+ my ($kernel, $heap, $session, $action, $child) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1];
if ($action eq 'create') {
+ $kernel->call($session->ID => _write_log => 4 => "child session created ".$child->ID);
$heap->{pending_session} = $child;
} elsif ($action eq 'lose') {
- $heap->{control}->put("226 Transfer complete.");
+ $kernel->call($session->ID => _write_log => 3 => sprintf("Transfer complete %d kB/s of %d bytes",($child->get_heap->{bps}/1023),$child->get_heap->{total_bytes}));
+ $kernel->call($session->ID => _write_log => 4 => "child session lost ".$child->ID);
+ $kernel->call($session->ID => "_reset_timeout");
+ if ($heap->{params}{'LimitSceme'} eq 'ip') {
+ my $cheap = $child->get_heap;
+ $kernel->call($heap->{params}{'Alias'} => _dcon_cleanup => $cheap->{type}, $cheap->{remote_ip} => $child->ID);
+ }
+ if (defined $heap->{abor}) {
+ delete $heap->{abor};
+ } else {
+ $heap->{control}->put("226 Transfer complete.");
+ }
delete $heap->{pending_session};
}
+
+ return 0;
+}
+
+sub time_out {
+ my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
+
+ # if we have a child session, then there must be a transfer
+ # going on, reset the timer
+ if (defined $heap->{pending_session} &&
+ $heap->{params}{'TimeOut'} > 0) {
+ $heap->{time_out} = $kernel->delay_set(time_out => $heap->{params}{'TimeOut'});
+ $kernel->call($session->ID => _write_log => 4 => "Timeout re-set: id ".$heap->{time_out});
+ return;
+ }
+
+ unless ($heap->{control}) {
+ $kernel->alarm_remove_all( );
+ delete $heap->{control};
+ }
+
+ if ($heap->{auth} == 0) {
+ $kernel->call($session->ID => _write_log => 2 => "Session ".$session->ID." timed out before login (".$heap->{params}{'TimeOut'}.")");
+ $heap->{control}->put("421 Disconnecting you because you did't login before ".$heap->{params}{'TimeOut'}." seconds, Goodbye.");
+ } else {
+ $kernel->call($session->ID => _write_log => 2 => "Session ".$session->ID." timed out (".$heap->{params}{'TimeOut'}.")");
+ $heap->{control}->put("421 Disconnecting you because you were inactive for ".$heap->{params}{'TimeOut'}." seconds, Goodbye.");
+ }
+
+ $kernel->alarm_remove_all( );
+ delete $heap->{control};
}
sub receive {
my ($kernel, $session, $heap, $cmd) = @_[KERNEL, SESSION, HEAP, ARG0];
- DEBUG && print "Received input from $heap->{host} : $heap->{port}\n";
- DEBUG && print "Args: ".join(',',@{$cmd->{args}})."\n";
+ $kernel->call($session->ID => _write_log => 4 => "Received input from $heap->{host} : $heap->{port} -> $cmd->{cmd} (".join(',',@{$cmd->{args}}).")");
+ if ($heap->{auth} == 1) {
+ $kernel->call($session->ID => '_reset_timeout');
+ }
$kernel->post($session, $cmd->{cmd}, \@{$cmd->{args}});
}
sub error {
- my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2];
+ my ($kernel, $heap, $session, $operation, $errnum, $errstr) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1, ARG2];
if ($errnum) {
- DEBUG && print( "Session with $heap->{host} : $heap->{port} ",
- "encountered $operation error $errnum: $errstr\n"
- );
+ $kernel->call($session->ID => _write_log => 4 => "Session with $heap->{host} : $heap->{port} encountered $operation error $errnum: $errstr");
} else {
- DEBUG && print( "Client at $heap->{host} : $heap->{port} disconnected\n" );
+ $kernel->call($session->ID => _write_log => 4 => "Client at $heap->{host} : $heap->{port} disconnected");
}
# either way, stop this session
+ $kernel->alarm_remove_all( );
delete $heap->{control};
}
sub flushed {
my ($kernel, $heap) = @_[KERNEL, HEAP];
-
- DEBUG && print "Response has been flushed to $heap->{host} : $heap->{port}\n";
-
- $kernel->post($heap->{pending_session}, 'execute')
- if (defined $heap->{pending_session});
+
+ if (defined $heap->{pending_session} && $heap->{listening} == 0) {
+# this broke stuff, now execute is yielded another way
+# $kernel->post($heap->{pending_session}->ID, 'execute');
+ }
}
sub signals {
- my ($heap, $signal_name) = @_[HEAP, ARG0];
-
- DEBUG && print( "Session with $heap->{host} : $heap->{port} caught SIG",
- $signal_name, "\n"
- );
+ my ($kernel, $heap, $session, $signal_name) = @_[KERNEL, HEAP, SESSION, ARG0];
+
+ $kernel->call($session->ID => _write_log => 4 => "Session with $heap->{host} : $heap->{port} caught SIG $signal_name");
# do not handle the signal
return 0;
}
+sub SITE {
+ my ($kernel, $heap, $session, $args) = @_[KERNEL, HEAP, SESSION, ARG0];
+
+ if ($heap->{auth} == 0) {
+ $heap->{control}->put("530 Not logged in");
+ } else {
+ my $cmd = shift(@$args);
+ $kernel->call($session->ID,$cmd,$args);
+ }
+}
+
sub NOOP {
- my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
+ my ($kernel, $heap, $session, $args) = @_[KERNEL, HEAP, SESSION, ARG0];
- # reset a timeout timer?
- $heap->{control}->put("200 No-op okay.");
+ if ($heap->{auth} == 0) {
+ $heap->{control}->put("530 Not logged in");
+ } else {
+ # resetting the timeout is done in receive()
+ $heap->{control}->put("200 No-op okay.");
+ }
}
sub XSEN {
- my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
+ my ($kernel, $heap, $session, $args) = @_[KERNEL, HEAP, SESSION, ARG0];
# TODO send a message to the terminal
#$args = join(' ',@$args);
@@ -215,8 +274,9 @@ sub XSEN {
}
sub QUIT {
- my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
+ my ($kernel, $heap, $session, $args) = @_[KERNEL, HEAP, SESSION, ARG0];
+ $kernel->alarm_remove_all( );
$heap->{control}->put("221 Goodbye.");
delete $heap->{control};
}
@@ -244,17 +304,19 @@ sub PASS {
if (exists($heap->{username})) {
if ($heap->{params}{AnonymousLogin} eq 'deny' && $heap->{username} eq 'anonymous') {
- _write_log("Anonymous login denied.");
+ $kernel->call($session->ID => _write_log => 1 => "Anonymous login denied.");
$heap->{control}->put("530 Login incorrect.");
$heap->{auth} = 0;
return;
}
if ($fs->login($heap->{username}, $password)) {
- _write_log("User $heap->{username} logged in.");
- $heap->{control}->put("230 User $heap->{username} logged in.");
+ $kernel->call($session->ID => _write_log => 1 => "User $heap->{username} logged in.");
+ # MOTD?
+ $heap->{control}->put("230 Logged in.");
$heap->{auth} = 1;
+ $kernel->call($session->ID => "_reset_timeout");
} else {
- _write_log("Incorrect login");
+ $kernel->call($session->ID => _write_log => 1 => "Incorrect login");
$heap->{control}->put("530 Login incorrect.");
$heap->{auth} = 0;
}
@@ -267,31 +329,57 @@ sub PASS {
sub REST {
my ($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
- $heap->{rest} = $args->[0];
- $heap->{control}->put("350 Will attempt to restart at postion $args->[0].");
+ if ($heap->{auth} == 0) {
+ $heap->{control}->put("530 Not logged in");
+ return;
+ }
+
+ if ($args->[0] =~ m/^\d+$/) {
+ $heap->{rest} = $args->[0];
+ $heap->{control}->put("350 Will attempt to restart at postion $args->[0].");
+ } else {
+
+ }
}
# Not implemented.
sub TYPE {
my ($kernel, $session, $heap, $type) = @_[KERNEL, SESSION, HEAP, ARG0];
- $type = $type->[0];
+ if ($heap->{auth} == 0) {
+ $heap->{control}->put("530 Not logged in");
+ return;
+ }
+ $type = $type->[0];
+
$heap->{control}->put("200 Type set to I.");
}
# Not implemented.
sub SYST {
my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
+
+ if ($heap->{auth} == 0) {
+ $heap->{control}->put("530 Not logged in");
+ return;
+ }
+
$heap->{control}->put("215 UNIX Type: L8");
}
sub ABOR {
my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
+ if ($heap->{auth} == 0) {
+ $heap->{control}->put("530 Not logged in");
+ return;
+ }
+
if (defined $heap->{pending_session}) {
- $kernel->post($heap->{pending_session} => 'data_throttle');
- $kernel->post($heap->{pending_session} => 'shutdown');
+ $kernel->post($heap->{pending_session}->ID => 'data_throttle');
+ $kernel->post($heap->{pending_session}->ID => '_drop');
+ $heap->{abor} = 1;
}
$heap->{control}->put("200 ABOR successfull");
@@ -300,150 +388,187 @@ sub ABOR {
sub MDTM {
my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
- $fn = join(' ',@$fn);
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
+ return;
+ }
+
+ $fn = join(' ',@$fn);
+ my $fs = $heap->{filesystem};
+ my @modtime = $fs->modtime($fs);
+ if ($modtime[0] == 0) {
+ $heap->{control}->put("550 MDTM $fn: Permission denied.");
} else {
- my $fs = $heap->{filesystem};
- my @modtime = $fs->modtime($fs);
- if ($modtime[0] == 0) {
- $heap->{control}->put("550 MDTM $fn: Permission denied.\n");
- } else {
- $heap->{control}->put("213 ".$modtime[1]);
- }
+ $heap->{control}->put("213 ".$modtime[1]);
}
}
-sub CHMOD {
+sub SIZE {
my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
- my $mode = shift(@$fn);
+
+ if ($heap->{auth} == 0) {
+ $heap->{control}->put("530 Not logged in");
+ return;
+ }
+
$fn = join(' ',@$fn);
+ my $fs = $heap->{filesystem};
+ my $size = $fs->size($fs);
+ $heap->{control}->put("213 ".$size);
+
+# my @modtime = $fs->modtime($fs);
+# if ($modtime[0] == 0) {
+# $heap->{control}->put("550 SIZE $fn: Permission denied.");
+# } else {
+# $heap->{control}->put("213 ".$modtime[1]);
+# }
+}
+sub CHMOD {
+ my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
- } else {
- my $fs = $heap->{filesystem};
+ return;
+ }
+
+ my $mode = shift(@$fn);
+ $fn = join(' ',@$fn);
+ my $fs = $heap->{filesystem};
- if ($fs->chmod($mode, $fn)) {
- $heap->{control}->put("200 CHMOD command successful.");
- } else {
- $heap->{control}->put("550 CHMOD command unsuccessful");
- }
+ if ($fs->chmod($mode, $fn)) {
+ $heap->{control}->put("200 CHMOD command successful.");
+ } else {
+ $heap->{control}->put("550 CHMOD command unsuccessful");
}
}
sub DELE {
my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
- $fn = join(' ',@$fn);
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
- } else {
- my $fs = $heap->{filesystem};
+ return;
+ }
+
+ $fn = join(' ',@$fn);
+ my $fs = $heap->{filesystem};
- if ($fs->delete($fn)) {
- $heap->{control}->put("250 DELE command successful");
- } else {
- $heap->{control}->put("550 DELE command unsuccessful");
- }
+ if ($fs->delete($fn)) {
+ $heap->{control}->put("250 DELE command successful");
+ } else {
+ $heap->{control}->put("550 DELE command unsuccessful");
}
}
sub MKD {
my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
- $fn = join(' ',@$fn);
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
+ return;
+ }
+
+ $fn = join(' ',@$fn);
+ my $fs = $heap->{filesystem};
+
+ my $ret = $fs->mkdir($fn);
+ if ($ret == 1) {
+ $fn =~ s/"/""/g; # doublequoting
+ $heap->{control}->put("257 \"$fn\" directory created");
+ } elsif ($ret == 2) {
+ $fn =~ s/"/""/g; # doublequoting
+ $heap->{control}->put("521 \"$fn\" directory already exists");
} else {
- my $fs = $heap->{filesystem};
-
- my $ret = $fs->mkdir($fn);
- if ($ret == 1) {
- $fn =~ s/"/""/g; # doublequoting
- $heap->{control}->put("257 \"$fn\" directory created");
- } elsif ($ret == 2) {
- $fn =~ s/"/""/g; # doublequoting
- $heap->{control}->put("521 \"$fn\" directory already exists");
- } else {
- $heap->{control}->put("550 MKDIR $fn: Permission denied.\n");
- }
+ $heap->{control}->put("550 MKDIR $fn: Permission denied.");
}
}
sub XMKD {
my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
- $fn = join(' ',@$fn);
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
+ return;
+ }
+
+ $fn = join(' ',@$fn);
+ my $fs = $heap->{filesystem};
+
+ my $ret = $fs->mkdir($fn);
+ if ($ret == 1) {
+ $fn =~ s/"/""/g; # doublequoting
+ $heap->{control}->put("257 \"$fn\" directory created");
+ } elsif ($ret == 2) {
+ $fn =~ s/"/""/g; # doublequoting
+ $heap->{control}->put("521 \"$fn\" directory already exists");
} else {
- my $fs = $heap->{filesystem};
-
- my $ret = $fs->mkdir($fn);
- if ($ret == 1) {
- $fn =~ s/"/""/g; # doublequoting
- $heap->{control}->put("257 \"$fn\" directory created");
- } elsif ($ret == 2) {
- $fn =~ s/"/""/g; # doublequoting
- $heap->{control}->put("521 \"$fn\" directory already exists");
- } else {
- $heap->{control}->put("550 MKDIR $fn: Permission denied.\n");
- }
+ $heap->{control}->put("550 MKDIR $fn: Permission denied.");
}
}
sub RMD {
my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
- $fn = join(' ',@$fn);
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
- } else {
- my $fs = $heap->{filesystem};
+ return;
+ }
+
+ $fn = join(' ',@$fn);
+ my $fs = $heap->{filesystem};
- if ($fs->rmdir($fn)) {
- $heap->{control}->put("250 RMD command successful");
- } else {
- $heap->{control}->put("550 RMD $fn: Permission denied");
- }
+ if ($fs->rmdir($fn)) {
+ $heap->{control}->put("250 RMD command successful");
+ } else {
+ $heap->{control}->put("550 RMD $fn: Permission denied");
}
}
sub XRMD {
my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
- $fn = join(' ',@$fn);
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
- } else {
- my $fs = $heap->{filesystem};
+ return;
+ }
+
+ $fn = join(' ',@$fn);
+ my $fs = $heap->{filesystem};
- if ($fs->rmdir($fs->cwd().$fn)) {
- $heap->{control}->put("250 RMD command successful");
- } else {
- $heap->{control}->put("550 RMD $fn: Permission denied");
- }
+ if ($fs->rmdir($fs->cwd().$fn)) {
+ $heap->{control}->put("250 RMD command successful");
+ } else {
+ $heap->{control}->put("550 RMD $fn: Permission denied");
}
}
sub CDUP {
my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
- $fn = join(' ',@$fn);
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
- } else {
- my $fs = $heap->{filesystem};
+ return;
+ }
+
+ $fn = join(' ',@$fn);
+ my $fs = $heap->{filesystem};
- if ($fs->chdir('..')) {
- $heap->{control}->put('257 "'.$fs->cwd().'" is current directory.');
- } else {
- $heap->{control}->put("550 ..: No such file or directory.");
- }
+ if ($fs->chdir('..')) {
+ $heap->{control}->put('257 "'.$fs->cwd().'" is current directory.');
+ } else {
+ $heap->{control}->put("550 ..: No such file or directory.");
}
}
sub CWD {
my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
- $fn = join(' ',@$fn);
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
} else {
+ $fn = join(' ',@$fn);
my $fs = $heap->{filesystem};
if ($fs->chdir($fn)) {
@@ -456,10 +581,11 @@ sub CWD {
sub PWD {
my ($kernel, $session, $heap, $fn) = @_[KERNEL, SESSION, HEAP, ARG0];
- $fn = join(' ',@$fn);
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
} else {
+ $fn = join(' ',@$fn);
my $fs = $heap->{filesystem};
$heap->{control}->put('257 "'.$fs->cwd().'" is current directory.');
@@ -469,15 +595,16 @@ sub PWD {
sub PORT {
my ($kernel, $session, $heap, $data_port) = @_[KERNEL, SESSION, HEAP, ARG0];
- $data_port = join(' ',@$data_port);
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
return;
}
+
+ $data_port = join(' ',@$data_port);
+
### Planning to use a Wheel here...
- #$heap->{control} = new POE::Wheel::ReadWrite
-# (
+# $heap->{control} = POE::Wheel::ReadWrite->new(
# Handle => $handle, # on this handle
# Driver => new POE::Driver::SysRW, # using sysread and syswrite
# Filter => new POE::Filter::FTPd::Control,
@@ -494,6 +621,7 @@ sub PORT {
sub PASV {
my ($kernel, $session, $heap, $data_port) = @_[KERNEL, SESSION, HEAP, ARG0];
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
return;
@@ -503,7 +631,7 @@ sub PASV {
my $p2 = (int rand(100))+1;
$p1 -= $p2;
- POE::Component::Server::FTP::DataSession->new({
+ POE::Component::Server::FTP::DataSession->new($heap->{params},{
fs => $heap->{filesystem},
port1 => $p1,
port2 => $p2,
@@ -513,6 +641,7 @@ sub PASV {
$heap->{pasv} = 1;
my $ip = $heap->{params}{ListenIP};
$ip =~ s/\./,/g;
+ print STDERR "ip is $ip\n";
$heap->{control}->put("227 Entering Passive Mode. ($ip,$p1,$p2)");
}
@@ -529,9 +658,9 @@ sub LIST {
$heap->{control}->put("150 Opening ASCII mode data connection for /bin/ls.");
if (defined $heap->{pending_session} && $heap->{pasv} == 1) {
- $kernel->post($heap->{pending_session} => start_LIST => $dirfile);
+ $kernel->post($heap->{pending_session}->ID => start_LIST => $dirfile);
} else {
- POE::Component::Server::FTP::DataSession->new({
+ POE::Component::Server::FTP::DataSession->new($heap->{params},{
fs => $heap->{filesystem},
data_port => $heap->{last_port_cmd},
cmd => 'LIST',
@@ -542,7 +671,7 @@ sub LIST {
}
sub NLST {
- my ($kernel, $session, $heap, $dirfile) = @_[KERNEL, SESSION, HEAP, ARG0];
+my ($kernel, $session, $heap, $dirfile) = @_[KERNEL, SESSION, HEAP, ARG0];
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
@@ -554,23 +683,25 @@ sub NLST {
$heap->{control}->put("150 Opening ASCII mode data connection for /bin/ls.");
if (defined $heap->{pending_session} && $heap->{pasv} == 1) {
- $kernel->post($heap->{pending_session} => start_NLST => $dirfile);
+ $kernel->post($heap->{pending_session}->ID => start_NLST => $dirfile);
} else {
- POE::Component::Server::FTP::DataSession->new({
+ POE::Component::Server::FTP::DataSession->new($heap->{params},{
fs => $heap->{filesystem},
data_port => $heap->{last_port_cmd},
cmd => 'NLST',
- opt => $dirfile
+ opt => $dirfile,
});
}
}
sub STOR {
my ($kernel, $session, $heap, $filename) = @_[KERNEL, SESSION, HEAP, ARG0];
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
return;
}
+
my $fs = $heap->{filesystem};
$filename = join(' ',@$filename);
my $fh;
@@ -579,9 +710,12 @@ sub STOR {
$heap->{control}->put("150 Opening BINARY mode data connection for $filename.");
if (defined $heap->{pending_session} && $heap->{pasv} == 1) {
- $kernel->post($heap->{pending_session} => start_STOR => $fh, $heap->{rest});
+ $kernel->post($heap->{pending_session}->ID => start_STOR => $fh,
+ {
+ rest => $heap->{rest},
+ });
} else {
- POE::Component::Server::FTP::DataSession->new({
+ POE::Component::Server::FTP::DataSession->new($heap->{params},{
fs => $fs,
data_port => $heap->{last_port_cmd},
cmd => 'STOR',
@@ -597,10 +731,12 @@ sub STOR {
sub APPE {
my ($kernel, $session, $heap, $filename) = @_[KERNEL, SESSION, HEAP, ARG0];
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
return;
}
+
my $fs = $heap->{filesystem};
$filename = join(' ',@$filename);
my $fh;
@@ -610,12 +746,12 @@ sub APPE {
$heap->{control}->put("150 Opening BINARY mode data connection for $filename.");
if (defined $heap->{pending_session} && $heap->{pasv} == 1) {
- $kernel->post($heap->{pending_session} => start_STOR => $fh);
+ $kernel->post($heap->{pending_session}->ID => start_STOR => $fh);
} else {
- POE::Component::Server::FTP::DataSession->new({
+ POE::Component::Server::FTP::DataSession->new($heap->{params},{
fs => $fs,
data_port => $heap->{last_port_cmd},
- cmd => 'APPE',
+ cmd => 'STOR',
opt => $fh,
});
}
@@ -627,84 +763,72 @@ sub APPE {
sub RETR {
my ($kernel, $session, $heap, $filename) = @_[KERNEL, SESSION, HEAP, ARG0];
- $filename = join(' ',@$filename);
+
if ($heap->{auth} == 0) {
$heap->{control}->put("530 Not logged in");
return;
}
+
+ $filename = join(' ',@$filename);
my $fs = $heap->{filesystem};
my $fh;
- _write_log("RETR $filename");
-
if ($fh = $fs->open_read($filename)) {
$heap->{control}->put("150 Opening BINARY mode data connection for $filename.");
if (defined $heap->{pending_session} && $heap->{pasv} == 1) {
- $kernel->post($heap->{pending_session} => start_RETR => $fh, $heap->{rest});
+ $kernel->post($heap->{pending_session}->ID => start_RETR => $fh,
+ {
+ rest => $heap->{rest},
+ });
} else {
- POE::Component::Server::FTP::DataSession->new({
+ POE::Component::Server::FTP::DataSession->new($heap->{params},{
fs => $fs,
data_port => $heap->{last_port_cmd},
cmd => 'RETR',
opt => $fh,
- rest => $heap->{rest}
+ rest => $heap->{rest},
});
}
} else {
- $heap->{control}->put
- ("550 No such file or directory: $filename.");
+ $heap->{control}->put("550 No such file or directory: $filename.");
}
}
sub _default {
- my ($kernel, $heap, $cmd, $args) = @_[KERNEL, HEAP, ARG0, ARG1];
+ my ($kernel, $heap, $session, $cmd, $args) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1];
if ($cmd =~ m/^_/) {
- DEBUG && print "NonHandled Event: $cmd(".join(", ", @$args).")\n";
+ $kernel->call($session->ID => _write_log => 4 => "NonHandled Event: $cmd(".join(", ", @$args).")");
} else {
- DEBUG && print "UNSUPPORTED COMMAND: $cmd(".join(", ", @$args).")\n";
+ $kernel->call($session->ID => _write_log => 4 => "UNSUPPORTED COMMAND: $cmd(".join(", ", @$args).")");
$heap->{control}->put("500 '$cmd': command not understood");
}
+
+ return 0;
}
-sub _write_log {
- my $datetime = localtime();
-
- print STDERR "[$datetime] ";
- print STDERR shift;
- print STDERR "\n";
+sub _reset_timeout {
+ my ($kernel,$heap) = @_[KERNEL, HEAP];
+
+ if (defined $heap->{time_out}) {
+ $kernel->delay_adjust( $heap->{time_out}, $heap->{params}{'TimeOut'} );
+ }
}
-sub _copy_fh {
- my ($fh_in, $fh_out) = @_;
-
- eval {
- my ($len, $buf, $offset, $written);
- my $blksize = (stat $fh_in)[11] || 16384;
-
- while($len = $fh_in->sysread($buf, $blksize)) {
- if (!defined $len) {
- next if $! =~ /^Interrupted/;
-# carp "System read error: $!\n";
- }
- $offset = 0;
- $written = 0;
- while ($len) {
- $written = $fh_out->syswrite($buf, $len, $offset) || 0;
- if ($!) {
-# print "WRITE ERROR: $!\n";
- sleep(1);
- }
-
-# or die "System write error: $!\n";
- $len -= $written;
- $offset += $written;
- }
+sub _write_log {
+ my ($kernel, $session, $heap, $sender, $verbose, $msg) = @_[KERNEL, SESSION, HEAP, SENDER, ARG0, ARG1];
+ if ($verbose <= $heap->{params}{'LogLevel'}) {
+ # if we're not forking, then pass the logging off to the
+ # main session
+ if ($heap->{params}{_main_pid} == $$) {
+ $kernel->call($heap->{params}{'Alias'} => _write_log => { type => (($sender->ID == $session->ID) ? 'C' : 'D'), msg => $msg, v => $verbose });
+ } else {
+ my $datetime = localtime();
+ my $type = ($sender->ID == $session->ID) ? 'C' : 'D';
+ print STDERR "[$datetime][$type".$sender->ID."] $msg\n";
}
- };
-
- _write_log("ERROR: $@\n") if ($@);
+ }
}
1;
@@ -1,45 +1,45 @@
package POE::Component::Server::FTP::DataSession;
-######################################################################
+###########################################################################
### POE::Component::Server::FTP::DataSession
-### L.M.Orchard ( deus_x@pobox.com )
-### Modified by David Davis ( xantus@cpan.org )
+### L.M.Orchard (deus_x@pobox.com)
+### David Davis (xantus@cpan.org)
###
### TODO:
### -- POEify the data channel
### -- Move file seeking to Filesys::Virtual
+### -- get rid of *_limit and use params instead
###
### Copyright (c) 2001 Leslie Michael Orchard. All Rights Reserved.
### This module is free software; you can redistribute it and/or
### modify it under the same terms as Perl itself.
###
-### Changes Copyright (c) 2003 David Davis and Teknikill Software
-######################################################################
+### Changes Copyright (c) 2003-2004 David Davis and Teknikill Software
+###########################################################################
use strict;
-
-use IO::Scalar;
use IO::Socket::INET;
+use IO::Scalar;
use POE qw(Session Wheel::ReadWrite Filter::Stream Driver::SysRW Wheel::SocketFactory);
+use Time::HiRes qw(time);
use Data::Dumper;
-sub DEBUG { 0 }
-
# Create a new DataSession
sub new {
- my ($type, $opt) = @_;
+ my ($type, $para, $opt) = @_;
my $self = bless { }, $type;
- POE::Session->create(
+ my $ses = POE::Session->create(
#options =>{ trace=>1 },
- args => [ $opt ],
+ args => [ $para, $opt ],
object_states => [
$self => {
_start => '_start',
_stop => '_stop',
+ _drop => '_drop',
start_LIST => 'start_LIST',
start_NLST => 'start_NLST',
start_STOR => 'start_STOR',
@@ -53,6 +53,8 @@ sub new {
data_error => 'data_error',
data_throttle => 'data_throttle',
data_resume => 'data_resume',
+
+ stop_socket => 'stop_socket',
_sock_up => '_sock_up',
_sock_down => '_sock_down',
@@ -60,20 +62,28 @@ sub new {
],
);
- undef;
+ return $ses->ID;
}
sub _start {
- my ($kernel, $heap, $opt) = @_[KERNEL, HEAP, ARG0];
+ my ($kernel, $heap, $para, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];
# generating a port num
# my $x = pack('n',$port);
# my $p1 = ord(substr($x,0,1));
# my $p2 = ord(substr($x,1,1));
+ $heap->{send_rec_okay} = 0;
$heap->{listening} = 0;
+ $heap->{rest} = 0;
+ $heap->{total_bytes} = 0;
+ $heap->{bps} = 0;
+ $heap->{type} = 'dl'; # default to download
+ $heap->{c_session} = $_[SENDER]->ID;
+ %{$heap->{params}} = %{$para};
if ($opt->{data_port}) {
+ $kernel->call($heap->{c_session} => _write_log => 4 => "starting a PORT data session");
# PORT command
my ($h1, $h2, $h3, $h4, $p1, $p2) = split(',', $opt->{data_port});
@@ -92,8 +102,9 @@ sub _start {
);
$heap->{cmd} = $opt->{cmd};
- $heap->{rest} = $opt->{rest};
+ $heap->{rest} = $opt->{rest} if ($opt->{rest});
} else {
+ $kernel->call($heap->{c_session} => _write_log => 4 => "starting a PASV data session");
# PASV command
$heap->{port} = ($opt->{port1}<<8)+$opt->{port2};
@@ -105,7 +116,7 @@ sub _start {
SocketDomain => AF_INET, # Sets the socket() domain
SocketType => SOCK_STREAM, # Sets the socket() type
SocketProtocol => 'tcp', # Sets the socket() protocol
- Reuse => 'on', # Lets the port be reused
+ Reuse => 'off', # Lets the port be reused
);
$heap->{listening} = 1;
@@ -121,10 +132,10 @@ sub _start {
sub _sock_up {
my ($kernel, $heap, $socket) = @_[KERNEL, HEAP, ARG0];
- my $buffer_max = 8 * 1024;
+ my $buffer_max = 4 * 1024;
my $buffer_min = 128;
- $heap->{data} = new POE::Wheel::ReadWrite(
+ $heap->{data} = POE::Wheel::ReadWrite->new(
Handle => $socket,
Driver => POE::Driver::SysRW->new(),
Filter => POE::Filter::Stream->new(),
@@ -136,25 +147,20 @@ sub _sock_up {
HighEvent => 'data_throttle',
LowEvent => 'data_resume',
);
- # remote_ip??
if ($heap->{listening} == 0) {
- DEBUG && print "Data session started for $heap->{cmd}($heap->{opt})\n";
+ $kernel->call($heap->{c_session} => _write_log => 4 => "data session started for $heap->{cmd} ($heap->{opt})");
$kernel->yield('start_'.(uc $heap->{cmd}), $heap->{opt});
} else {
- DEBUG && print "Received connection from $heap->{remote_ip}\n";
+ # TODO check if correct IP connected if that option is on
+ $kernel->call($heap->{c_session} => _write_log => 4 => "received connection from $heap->{remote_ip}");
}
}
sub _sock_down {
- DEBUG && print "socket down\n";
- delete $_[HEAP]->{data};
-}
-
-sub start_PASV {
- my ($kernel, $heap, $dirfile) = @_[KERNEL, HEAP, ARG0];
- my $fs = $heap->{filesystem};
-
+ my ($kernel, $heap) = @_[KERNEL, HEAP];
+ $kernel->call($heap->{c_session} => _write_log => 4 => "socket down");
+ delete $heap->{data};
}
sub start_LIST {
@@ -168,7 +174,8 @@ sub start_LIST {
$heap->{input_fh} = IO::Scalar->new(\$out);
$heap->{send_done} = 0;
- $heap->{send_okay} = 1;
+ $heap->{send_rec_okay} = 1;
+ $kernel->yield('execute');
}
sub start_NLST {
@@ -182,35 +189,39 @@ sub start_NLST {
$heap->{input_fh} = IO::Scalar->new(\$out);
$heap->{send_done} = 0;
- $heap->{send_okay} = 1;
+ $heap->{send_rec_okay} = 1;
+ $kernel->yield('execute');
}
sub start_RETR {
- my ($kernel, $heap, $fh, $rest) = @_[KERNEL, HEAP, ARG0, ARG1];
+ my ($kernel, $heap, $fh, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];
- if (defined $rest) {
- $heap->{rest} = $rest;
+ if (defined $opt->{rest}) {
+ $heap->{rest} = $opt->{rest};
}
$heap->{input_fh} = $fh;
- DEBUG && print "Seeking to $heap->{rest}\n";
seek($fh,$heap->{rest},0);
$heap->{send_done} = 0;
- $heap->{send_okay} = 1;
+ $heap->{send_rec_okay} = 1;
+ $kernel->yield('execute');
}
sub start_STOR {
- my ($heap, $fh, $rest) = @_[HEAP, ARG0, ARG1];
-
- if (defined $rest) {
- $heap->{rest} = $rest;
+ my ($kernel, $heap, $fh, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];
+
+ if (defined $opt->{rest}) {
+ $heap->{rest} = $opt->{rest};
}
$heap->{output_fh} = $fh;
- DEBUG && print "Seeking to $heap->{rest}\n";
seek($fh,$heap->{rest},0);
+ $heap->{type} = 'ul';
+ $heap->{send_rec_okay} = 1;
+ $heap->{xfer_time} = time();
+ $kernel->yield('execute');
}
sub _stop {
@@ -221,30 +232,77 @@ sub _stop {
# Execute the session's pending upload
sub execute {
- if (defined $_[HEAP]->{input_fh}) {
- $_[KERNEL]->yield('data_send');
- } elsif (!defined $_[HEAP]->{output_fh}) {
- if ($_[HEAP]->{listening} == 0) {
- delete $_[HEAP]->{data};
+ my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
+
+ if (defined $heap->{input_fh}) {
+ $heap->{xfer_time} = time();
+ $kernel->yield('data_send');
+ } elsif (!defined $heap->{output_fh}) {
+ if ($heap->{listening} == 0) {
+ $kernel->call($session->ID => '_drop');
}
}
}
+sub stop_socket {
+ my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
+
+ delete $heap->{time_out};
+
+ if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
+ # still a factory?! Time to drop connection
+ delete $heap->{data};
+ }
+}
+
# Send a block to the remote client
sub data_send {
my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
if ( (!defined $heap->{input_fh}) || (! ref $heap->{input_fh} ) ) {
- delete $heap->{data};
- } elsif ($heap->{send_okay} && (defined $heap->{data})) {
+ $kernel->call($session->ID => '_drop');
+ } elsif ($heap->{send_rec_okay} && (defined $heap->{data})) {
+
+ # if we haven't connected yet, then data will still be a factory
+ if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
+ $kernel->call($heap->{c_session} => _write_log => 4 => "data is still a SocketFactory (not connected yet?)");
+ if (defined $heap->{time_out}) {
+ $heap->{time_out} = $kernel->delay_set(stop_socket => 30);
+ }
+ $kernel->delay('data_send' => 1);
+ return;
+ }
+
+ if (defined $heap->{time_out}) {
+ $kernel->alarm_remove($heap->{time_out});
+ delete $heap->{time_out};
+ }
+
+ $heap->{bps} = ($heap->{total_bytes} / (time() - $heap->{xfer_time}));
+
+ if ($heap->{params}{'DownloadLimit'} > 0) {
+ if ($heap->{params}{'LimitSceme'} eq 'ip') {
+ if ($kernel->call($heap->{params}{'Alias'} => _bw_limit => 'dl' => $heap->{remote_ip} => $heap->{bps})) {
+ $kernel->yield('data_send');
+ return;
+ }
+ } else {
+ if ($heap->{bps} > $heap->{params}{'DownloadLimit'}) {
+ $kernel->yield('data_send');
+ return;
+ }
+ }
+ }
+
### Read in a block from the file.
my $buf;
my $len = $heap->{input_fh}->read($buf, $heap->{block_size});
### If something was read, queue it to be sent, and yield
- ### back for another send_block.
+ ### back for another data_send.
if ($len > 0) {
+ $heap->{total_bytes} += $len;
$heap->{data}->put($buf);
$kernel->yield('data_send');
} else {
@@ -253,12 +311,7 @@ sub data_send {
$fs->close_read($heap->{input_fh});
delete $heap->{input_fh};
- ### Thanks, poe.perl.org!
- if ($heap->{data}->get_driver_out_octets() == 0) {
- delete $heap->{data};
- } else {
- $heap->{send_done} = 1;
- }
+ $kernel->call($session->ID => '_drop');
}
}
}
@@ -266,22 +319,63 @@ sub data_send {
# Recieve a block from the remote client
sub data_receive {
- if ($_[HEAP]->{output_fh}) {
- $_[HEAP]->{output_fh}->print($_[ARG0]);
- } else {
- delete $_[HEAP]->{data};
+ my ($kernel, $heap, $session, $data) = @_[KERNEL, HEAP, SESSION, ARG0];
+
+ if ( (!defined $heap->{output_fh}) || (! ref $heap->{output_fh} ) ) {
+ $kernel->call($session->ID => '_drop');
+ } elsif ($heap->{send_rec_okay} && (defined $heap->{data})) {
+
+ # if we haven't connected yet, then data will still be a factory
+ if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
+ $kernel->call($heap->{c_session} => _write_log => 4 => "data is still a SocketFactory (not connected yet?)");
+ if (defined $heap->{time_out}) {
+ $heap->{time_out} = $kernel->delay_set(stop_socket => 30);
+ }
+ $kernel->delay('data_receive' => 1, $data);
+ return;
+ }
+
+ if (defined $heap->{time_out}) {
+ $kernel->alarm_remove($heap->{time_out});
+ delete $heap->{time_out};
+ }
+
+ $heap->{bps} = ($heap->{total_bytes} / (time() - $heap->{xfer_time}));
+
+ if ($heap->{params}{'UploadLimit'} > 0) {
+ if ($heap->{params}{'LimitSceme'} eq 'ip') {
+ if ($kernel->call($heap->{params}{'Alias'} => _bw_limit => 'ul' => $heap->{remote_ip} => $heap->{bps})) {
+ $kernel->yield('data_receive');
+ $heap->{data}->pause_input();
+ } else {
+ $heap->{data}->resume_input();
+ }
+ } else {
+ if ($heap->{bps} > $heap->{params}{'UploadLimit'}) {
+ $kernel->yield('data_receive');
+ $heap->{data}->pause_input();
+ } else {
+ $heap->{data}->resume_input();
+ }
+ }
+ }
+
+ if (defined $data) {
+ $heap->{total_bytes} += length($data);
+
+ $heap->{output_fh}->print($data);
+ }
}
}
sub data_error {
- my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2];
+ my ($kernel, $heap, $operation, $errnum, $errstr) = @_[KERNEL, HEAP, ARG0, ARG1, ARG2];
my $fs = $heap->{filesystem};
if ($errnum) {
- DEBUG && print "Session with $heap->{remote_ip} : $heap->{port} ".
- "encountered $operation error $errnum: $errstr\n";
+ $kernel->call($heap->{c_session} => _write_log => 4 => "session with $heap->{remote_ip} : $heap->{port} encountered $operation error $errnum: $errstr");
} else {
- DEBUG && print "Client at $heap->{remote_ip} : $heap->{port} disconnected\n";
+ $kernel->call($heap->{c_session} => _write_log => 4 => "client at $heap->{remote_ip} : $heap->{port} disconnected");
}
# either way, stop this session
@@ -299,16 +393,35 @@ sub data_error {
}
sub data_flushed {
- delete $_[HEAP]->{data} if ($_[HEAP]->{send_done});
+ my ($kernel, $heap) = @_[KERNEL, HEAP];
+ if ($heap->{send_done}) {
+ $kernel->call($heap->{c_session} => _write_log => 4 => "data flushed, dropping connection");
+ delete $heap->{data};
+ }
}
sub data_throttle {
- $_[HEAP]->{okay_to_send} = 0;
+ $_[HEAP]->{send_rec_okay} = 0;
}
sub data_resume {
- $_[HEAP]->{okay_to_send} = 1;
+ $_[HEAP]->{send_rec_okay} = 1;
$_[KERNEL]->yield('data_send');
}
+sub _drop {
+ my ($kernel, $heap) = @_[KERNEL, HEAP];
+
+ return unless ($heap->{data});
+
+ # if we are fully flushed, go ahead and disconnect
+ if ($heap->{data}->get_driver_out_octets() == 0) {
+ $kernel->call($heap->{c_session} => _write_log => 4 => "data finished, dropping connection");
+ delete $heap->{data};
+ } else {
+ # if not, then we set a flag and the flushed event
+ # drops the connection
+ $heap->{send_done} = 1;
+ }
+}
1;
@@ -1,9 +1,9 @@
package POE::Component::Server::FTP;
-######################################################################
+###########################################################################
### POE::Component::Server::FTP
-### L.M.Orchard ( deus_x@pobox.com )
-### Modified by David Davis ( xantus@cpan.org )
+### L.M.Orchard (deus_x@pobox.com)
+### David Davis (xantus@cpan.org)
###
### TODO:
###
@@ -11,43 +11,52 @@ package POE::Component::Server::FTP;
### This module is free software; you can redistribute it and/or
### modify it under the same terms as Perl itself.
###
-### Changes Copyright (c) 2003 David Davis and Teknikill Software
-######################################################################
+### Changes Copyright (c) 2003-2004 David Davis and Teknikill Software
+###########################################################################
use strict;
use warnings;
our @ISA = qw(Exporter);
-our $VERSION = '0.03';
+our $VERSION = '0.04';
use Socket;
use Carp;
use POE qw(Session Wheel::ReadWrite Filter::Line
- Driver::SysRW Wheel::SocketFactory);
+ Driver::SysRW Wheel::SocketFactory
+ Wheel::Run Filter::Reference);
use POE::Component::Server::FTP::ControlSession;
use POE::Component::Server::FTP::ControlFilter;
-sub DEBUG { 0 }
-
sub spawn {
my $package = shift;
croak "$package requires an even number of parameters" if @_ % 2;
my %params = @_;
my $alias = $params{'Alias'};
$alias = 'ftpd' unless defined($alias) and length($alias);
-
- my $listen_port = $params{listen_port} || 21;
+ $params{'Alias'} = $alias;
+ $params{'ListenPort'} = $params{'ListenPort'} || 21;
+ $params{'TimeOut'} = $params{'TimeOut'} || 0;
+ $params{'DownloadLimit'} = $params{'DownloadLimit'} || 0;
+ $params{'UploadLimit'} = $params{'UploadLimit'} || 0;
+ $params{'LimitSceme'} = $params{'LimitSceme'} || 'none';
POE::Session->create(
#options => {trace=>1},
- args => [ %params ],
+ args => [ \%params ],
package_states => [
'POE::Component::Server::FTP' => {
- _start => '_start',
- _stop => '_stop',
- accept => 'accept',
- accept_error => 'accept_error',
- signals => 'signals'
+ _start => '_start',
+ _stop => '_stop',
+ _write_log => '_write_log',
+ accept => 'accept',
+ accept_error => 'accept_error',
+ signals => 'signals',
+ _bw_limit => '_bw_limit',
+ _dcon_cleanup => '_dcon_cleanup',
+ cmd_stdout => 'cmd_stdout',
+ cmd_stderr => 'cmd_stderr',
+ cmd_error => 'cmd_error',
}
],
);
@@ -57,62 +66,176 @@ sub spawn {
sub _start {
my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
- %{$heap->{params}} = splice @_,ARG0;
-
- $session->option( @{$heap->{params}{SessionOptions}} ) if $heap->{params}{SessionOptions};
- $kernel->alias_set($heap->{params}{alias});
+ %{$heap->{params}} = %{ $_[ARG0] };
+
+ $heap->{_main_pid} = $$;
+
+ $session->option( @{$heap->{params}{'SessionOptions'}} ) if $heap->{params}{'SessionOptions'};
+ $kernel->alias_set($heap->{params}{'Alias'});
# watch for SIGINT
$kernel->sig('INT', 'signals');
# create a socket factory
- $heap->{wheel} = new POE::Wheel::SocketFactory(
+ $heap->{wheel} = POE::Wheel::SocketFactory->new(
BindPort => $heap->{params}{ListenPort}, # on this port
Reuse => 'yes', # and allow immediate port reuse
SuccessEvent => 'accept', # generating this event on connection
FailureEvent => 'accept_error' # generating this event on error
);
- DEBUG && print "Listening to port $heap->{params}{ListenPort} on all interfaces.\n";
+
+ $kernel->call($session->ID => _write_log => { v => 2, msg => "Listening to port $heap->{params}{ListenPort} on all interfaces." });
}
sub _stop {
- DEBUG && print "Server stopped.\n";
+ my ($kernel, $session) = @_[KERNEL, SESSION];
+ $kernel->call($session->ID => _write_log => { v => 2, msg => "Server stopped." });
}
# Accept a new connection
sub accept {
- my ($heap, $accepted_handle, $peer_addr, $peer_port) = @_[HEAP, ARG0, ARG1, ARG2];
+ my ($kernel, $heap, $session, $accepted_handle, $peer_addr, $peer_port) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1, ARG2];
$peer_addr = inet_ntoa($peer_addr);
- my $ip = getsockname($accepted_handle);
- DEBUG && print "Server received connection on $ip from $peer_addr : $peer_port\n";
+ my $ip = inet_ntoa((sockaddr_in(getsockname($accepted_handle)))[1]);
+ my $listen_ip = (defined $heap->{params}{FirewallIP}) ? $heap->{params}{FirewallIP} : $ip;
+
+ $kernel->call($session->ID => _write_log => { v => 2, msg => "Server received connection on $listen_ip from $peer_addr : $peer_port" });
my $opt = { %{$heap->{params}} };
$opt->{Handle} = $accepted_handle;
$opt->{ListenIP} = $ip;
$opt->{PeerAddr} = $peer_addr;
$opt->{PeerPort} = $peer_port;
+
POE::Component::Server::FTP::ControlSession->new($opt);
+
+# $heap->{control_session} = POE::Wheel::Run->new(
+# Program => sub {
+# my $raw;
+# my $size = 4096;
+# #my $filter = POE::Filter::Reference->new();
+# my $filter = POE::Filter::Line->new();
+#
+# POE::Component::Server::FTP::ControlSession->new($opt);
+#
+# #
+# # POE::Filter::Reference does buffering so that you don't have to.
+# #
+# READ: while (sysread( STDIN, $raw, $size )) {
+# my $s = $filter->get( [$raw] );
+#
+# #
+# # It is possible that $filter->get() has returned more than one
+# # structure from the parent process. Each $t represents whatever
+# # was pushed from the parent.
+# #
+# foreach my $t (@$s) {
+# print "-$t\n";
+# #
+# # Here is a stand-in for something that might be doing
+# # real work.
+# #
+# #$t->{fubar} = 'mycmd';
+#
+# #
+# # this part re-freezes the data structure and writes
+# # it back to the parent process.
+# #
+# #my $u = $filter->put( [$t] );
+# #print STDOUT @$u;
+#
+# #
+# # this is the exit condition.
+# #
+# last READ if ( $t->{'cmd'} eq 'shutdown' );
+# }
+# }
+# },
+# ErrorEvent => 'cmd_error',
+# StdoutEvent => 'cmd_stdout',
+# StderrEvent => 'cmd_stderr',
+# StdinFilter => POE::Filter::Line->new(),
+# #StdioFilter => POE::Filter::Reference->new(),
+# #StdinFilter => POE::Filter::Reference->new(),
+# ) or die "$0: can't POE::Wheel::Run->new";
+}
+
+sub _bw_limit {
+ my ($kernel, $heap, $session, $sender, $type, $ip, $bps) = @_[KERNEL, HEAP, SESSION, SENDER, ARG0, ARG1, ARG2];
+ $heap->{$type}{$ip}{$sender->ID} = $bps;
+ my $num = scalar(keys %{$heap->{$type}{$ip}});
+ my $newlimit = ((($type eq 'dl') ? $heap->{params}{'DownloadLimit'} : $heap->{params}{'UploadLimit'}) / $num);
+ return ($bps > $newlimit) ? 1 : 0;
+}
+
+sub _dcon_cleanup {
+ my ($kernel, $heap, $session, $type, $ip, $sid) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1, ARG2];
+ $kernel->call($session->ID => _write_log => { v => 4, msg => "cleaing up $type limiter for $ip ($sid)" });
+ delete $heap->{$type}{$ip}{$sid};
+}
+
+sub cmd_error {
+ my ( $heap, $op, $code, $handle ) = @_[ HEAP, ARG0, ARG1, ARG4 ];
+
+ if ( $op eq 'read' and $code == 0 and $handle eq 'STDOUT' ) {
+ warn "child has closed output";
+ delete $heap->{control_session};
+ }
+}
+
+#
+# demonstrate that something is happening.
+#
+sub cmd_stdout {
+ my ( $heap, $txt ) = @_[ HEAP, ARG0 ];
+
+ print STDERR join ":", 'cmd_stdout ', $txt, "\n";
+
+}
+
+#
+# Just so that we can see what the child writes on errors.
+#
+sub cmd_stderr {
+ my ( $heap, $txt ) = @_[ HEAP, ARG0 ];
+ print STDERR "cmd_stderr: $txt\n";
}
# Handle an error in connection acceptance
sub accept_error {
- my ($operation, $errnum, $errstr) = @_[ARG0, ARG1, ARG2];
- DEBUG && print "Server encountered $operation error $errnum: $errstr\n";
+ my ($kernel, $session, $operation, $errnum, $errstr) = @_[KERNEL, SESSION, ARG0, ARG1, ARG2];
+ $kernel->call($session->ID => write_log => { v => 1, msg => "Server encountered $operation error $errnum: $errstr" });
}
# Handle incoming signals (INT)
sub signals {
- my $signal_name = $_[ARG0];
+ my ($kernel, $session, $signal_name) = @_[KERNEL, SESSION, ARG0];
+
+ $kernel->call($session->ID => _write_log => { v => 1, msg => "Server caught SIG$signal_name" });
- DEBUG && print "Server caught SIG$signal_name\n";
- # do not handle the signal
+ # to stop ctrl-c / INT
+ if ($signal_name eq 'INT') {
+ #$_[KERNEL]->sig_handled();
+ }
+
return 0;
}
+sub _write_log {
+ my ($kernel, $session, $heap, $sender, $o) = @_[KERNEL, SESSION, HEAP, SENDER, ARG0];
+ if ($o->{v} <= $heap->{params}{'LogLevel'}) {
+ my $datetime = localtime();
+ my $sender = (defined $o->{sid}) ? $o->{sid} : $sender->ID;
+ my $type = (defined $o->{type}) ? $o->{type} : 'M';
+ print STDERR "[$datetime][$type$sender] $o->{msg}\n";
+ }
+}
+
+
1;
__END__
@@ -122,23 +245,31 @@ POE::Component::Server::FTP - Event-based FTP server on a virtual filesystem
=head1 SYNOPSIS
- use POE qw(Wheel::ReadWrite Driver::SysRW
- Wheel::SocketFactory Component::Server::FTP);
-
- POE::Component::Server::FTP->spawn
- (
- Alias => 'ftpd',
- ListenPort => 2112,
- FilesystemClass => 'Filesys::Virtual::Plain',
- FilesystemArgs =>
- {
- 'root_path' => '/', # This is actual root for all paths
- 'cwd' => '/', # Initial current working dir
- 'home_path' => '/Users', # Home directory for '~'
- }
- );
+ use POE qw(Component::Server::FTP);
+ use Filesys::Virtual;
+
+ POE::Component::Server::FTP->spawn(
+ Alias => 'ftpd', # ftpd is default
+ ListenPort => 2112, # port to listen on
+ Domain => 'blah.net', # domain shown on connection
+ Version => 'ftpd v1.0', # shown on connection, you can mimic...
+ AnonymousLogin => 'allow', # deny, allow
+ FilesystemClass => 'Filesys::Virtual::Plain', # Currently the only one available
+ FilesystemArgs => {
+ 'root_path' => '/', # This is actual root for all paths
+ 'cwd' => '/', # Initial current working dir
+ 'home_path' => '/home', # Home directory for '~'
+ },
+ # use 0 to disable these Limits
+ DownloadLimit => (50 * 1024), # 50 kb/s per ip/connection (use LimitSceme to configure)
+ UploadLimit => (100 * 1024), # 100 kb/s per ip/connection (use LimitSceme to configure)
+ LimitSceme => 'ip', # ip or per (connection)
+
+ LogLevel => 4, # 4=debug, 3=less info, 2=quiet, 1=really quiet
+ TimeOut => 120, # Connection Timeout
+ );
- $poe_kernel->run();
+ $poe_kernel->run();
=head1 DESCRIPTION
@@ -148,6 +279,7 @@ virtual filesystem interface as implemented by Filesys::Virtual.
=head1 AUTHORS
L.M.Orchard, deus_x@pobox.com
+
David Davis, xantus@cpan.org
=head1 SEE ALSO
@@ -5,6 +5,9 @@ WriteMakefile(
'NAME' => 'POE::Component::Server::FTP',
'VERSION_FROM' => 'FTP.pm', # finds $VERSION
'PREREQ_PM' => {
- Filesys::Virtual => 0.02
+ Filesys::Virtual => 0.03,
+ IO::Scalar => undef,
+ IO::Socket::INET => undef,
+ Socket => undef,
},
);
@@ -6,5 +6,5 @@ with other mainstream ftp servers out there.
To get started quickly, install this module and Filesys::Virtual
and use server.pl from the examples dir.
-
+*Use this ftp server at your own risk*
@@ -4,17 +4,24 @@ use Filesys::Virtual;
use POE qw(Component::Server::FTP);
POE::Component::Server::FTP->spawn(
- Alias => 'ftpd',
- ListenPort => 2112,
- Domain => 'teknikill.net',
- Version => 'ftpd v1.0',
- AnonymousLogin => 'deny', # deny, allow
- FilesystemClass => 'Filesys::Virtual::Plain',
+ Alias => 'ftpd', # ftpd is default
+ ListenPort => 2112, # port to listen on
+ Domain => 'blah.net', # domain shown on connection
+ Version => 'ftpd v1.0', # shown on connection, you can mimic...
+ AnonymousLogin => 'allow', # deny, allow
+ FilesystemClass => 'Filesys::Virtual::Plain', # Currently the only one available
FilesystemArgs => {
- 'root_path' => '/', # This is actual root for all paths
- 'cwd' => '/', # Initial current working dir
- 'home_path' => '/home', # Home directory for '~'
- }
+ 'root_path' => '/', # This is actual root for all paths
+ 'cwd' => '/', # Initial current working dir
+ 'home_path' => '/home', # Home directory for '~'
+ },
+ # use 0 to disable these Limits
+ DownloadLimit => (50 * 1024), # 50 kb/s per ip/connection (use LimitSceme to configure)
+ UploadLimit => (100 * 1024), # 100 kb/s per ip/connection (use LimitSceme to configure)
+ LimitSceme => 'ip', # ip or per (connection)
+
+ LogLevel => 4, # 4=debug, 3=less info, 2=quiet, 1=really quiet
+ TimeOut => 120, # Connection Timeout
);
$poe_kernel->run();