The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Kamaitachi;
use 5.008001;
use Moose;

our $VERSION = '0.05';

use IO::Handle;
use IO::Socket::INET;
use Socket qw/IPPROTO_TCP TCP_NODELAY SOCK_STREAM/;
use Danga::Socket;
use Danga::Socket::Callback;
use Data::AMF;
use Text::Glob qw/glob_to_regex/;

use Kamaitachi::Socket;
use Kamaitachi::Session;

with 'MooseX::LogDispatch';

has port => (
    is      => 'rw',
    isa     => 'Int',
    default => sub { 1935 },
);

has sessions => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub { [] },
);

has parser => (
    is      => 'rw',
    isa     => 'Object',
    lazy    => 1,
    default => sub {
        Data::AMF->new( version => 0 ),
    },
);

has buffer_size => (
    is      => 'rw',
    isa     => 'Int',
    default => sub { 8192 },
);

has services => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub { [] },
);

no Moose;

=head1 NAME

Kamaitachi - perl flash media server

=head1 SYNOPSIS

    use Kamaitachi;
    
    my $kamaitachi = Kamaitachi->new( port => 1935 );
    
    $kamaitachi->register_services(
        'servive1' => 'Your::Service::Class1',
        'servive2' => 'Your::Service::Class2',
    );
    $kamaitachi->run;

=head1 DESCRIPTION

Kamaitachi is perl implementation of Adobe's RTMP(Real Time Messaging Protocol).

Now kamaitachi supports Remoting and MediaStreaming via RTMP. SharedObject is not implemented yet.

This 0.x is development *alpha* version. API Interface and design are stil fluid.

If you want to use kamaitachi, look at example directory. it contains both server script and client swf.

=head1 DEVELOPMENT

GitHub: http://github.com/typester/kamaitachi

Issues: http://karas.unknownplace.org/ditz/kamaitachi/

IRC: #kamaitachi @ chat.freenode.net

=head1 METHODS

=head2 new

    Kamaitachi->new( %options );

Create kamaitachi server object.

Available option parameters are:

=over 4

=item port

port number to listen (default 1935)

=item buffer_size

socket buffer size to read (default 8192)

=back

=cut

sub BUILD {
    my $self = shift;

    my $ssock = IO::Socket::INET->new(
        LocalPort => $self->port,
        Type      => SOCK_STREAM,
        Blocking  => 0,
        ReuseAddr => 1,
        Listen    => 10,
    ) or die $!;
    IO::Handle::blocking($ssock, 0);

    Danga::Socket->AddOtherFds(
        fileno($ssock) => sub {
            my $csock = $ssock->accept or return;

            $self->logger->debug(sprintf("Listen child making a Client for %d.", fileno($csock)));

            IO::Handle::blocking($csock, 0);
            setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack('l', 1)) or die;

            my $session = Kamaitachi::Session->new(
                id      => fileno($csock),
                context => $self,
            );
            $self->sessions->[ $session->id ] = $session;

            Kamaitachi::Socket->new(
                handle        => $csock,
                context       => $self,
                session       => $session,
                on_read_ready => sub {
                    my $socket = shift;

                    my $bref = $socket->read( $self->buffer_size );
                    unless (defined $bref) {
                        $socket->close;
                        return;
                    }

                    $session->io->push($$bref);
                    $session->handler->( $session );
                },
            );
        }
    );
}

=head2 register_services

    $kamaitachi->register_services(
        'rpc/echo'    => 'Service::Echo',
        'rpc/chat'    => 'Service::Chat',
        'stream/live' => 'Service::LiveStreaming',
        'stream/rec'  => Service::LiveStreamingRecorder->new( record_output_dir => $dir ),
    );

Register own service classes.

=cut

sub register_services {
    my ($self, @args) = @_;

    local $Text::Glob::strict_wildcard_slash = 0;

    while (@args) {
        my $key   = shift @args;
        my $class = shift @args;

        unless (ref($class)) {
            eval qq{ use $class };
            die $@ if $@;

            $class = $class->new;
        }

        push @{ $self->services }, [ glob_to_regex($key), $class ];
    }
}

=head2 run

    $kamaitachi->run

Start kamaitachi

=cut

sub run {
    my $self = shift;

    Danga::Socket->AddTimer(
        0,
        sub {
            my $poll
                = $Danga::Socket::HaveKQueue ? 'kqueue'
                : $Danga::Socket::HaveEpoll  ? 'epoll'
                :                              'poll';
            $self->logger->debug(
                "started kamaitachi port $self->{port} with $poll"
            );
        }
    );

    Danga::Socket->EventLoop;
}

=head1 AUTHOR

Daisuke Murase <typester@cpan.org>

Hideo Kimura <hide@cpan.org>

=head1 COPYRIGHT

This program is free software; you can redistribute
it and/or modify it under the same terms as Perl itself.

The full text of the license can be found in the
LICENSE file included with this module.

=cut

__PACKAGE__->meta->make_immutable;