The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# Ginger::Reference::Request::IO::Mongrel2
# Version 0.01
# Copyright (C) 2013 David Helkowski

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.  You may also can
# redistribute it and/or modify it under the terms of the Perl
# Artistic License.
  
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

=head1 NAME

Ginger::Reference::Request::IO::Mongrel2 - Ginger::Reference Component

=head1 VERSION

0.02

=cut

package Ginger::Reference::Request::IO::Mongrel2;
use strict;
use ZMQ::LibZMQ3;
use Class::Core 0.03 qw/:all/;
use Data::Dumper;
#use ZMQ::Constants ':all';
use ZMQ::Constants qw/ZMQ_PULL ZMQ_PUB ZMQ_IDENTITY ZMQ_RCVMORE ZMQ_POLLIN ZMQ_MSG_MORE/;
#use URI::Simple;
use CGI;
use Text::TNetstrings qw/:all/;
use threads;
use threads::shared;
use XML::Bare qw/xval/;
use Carp;

use vars qw/$VERSION/;
$VERSION = "0.02";

my $stop :shared;

sub wait_threads {
    my ( $core, $self ) = @_;
    while( 1 ) {
        my @joinable = threads->list(threads::joinable);
        my @running = threads->list(threads::running);
        
        for my $thr ( @joinable ) { $thr->join(); }
        last if( !@running );
        sleep(1) if( !$stop );
    }
}

sub init {
    my ( $core, $self ) = @_;
    $self->{'running'} = 1;
    $stop = 0;
    my $app = $self->{'obj'}{'_app'};
    my $sman = $self->{'session_man'} = $app->get_mod( mod => 'session_man' );
    if( !$sman ) {
        confess( "Cannot find session manager" );
    }
}

sub end {
    my ( $core, $self ) = @_;
    #$self->{'closed'} = 1;
    #zmq_close( $self->{'incoming'} );
    #zmq_close( $self->{'outgoing'} );
    #zmq_term( $self->{'ctx'} );
    $stop = 1;
    $self->wait_threads();
}

sub run {
    my ( $core, $self ) = @_;
    
    my $app = $core->get_app();
    #$self->{'request_man'} = $app->get_mod( mod => 'request_man' );
    my $log = $self->{'log'} = $app->get_mod( mod => 'log' );
    my $conf = $self->{'_xml'};
    my $threads = xval $conf->{'threads'}, 2;
    
    #my $session_man = $app->get_mod( mod => 'session_man' );
    #my $session_hash = $session_man->{'sessions'};
    
    $log->note( text => "Spawning $threads thread(s) to handle incoming requests" );
    for( my $i=0;$i<$threads;$i++ ) {
        threads->create( \&server, $core, $self, $i );
    }
}

#my $server;
my %tids :shared;
my %servers;

sub server {
    my ( $core, $self, $sid ) = @_;
    
    my $app = $core->get_app();
    
    $self->{'request_man'} = $app->get_mod( mod => 'request_man' );
    
    #print "Start session hash:".$session_hash."\n";
    
    #$SIG{'INT'} = 'default';
    
    my $thr = threads->self();
    my $tid = $thr->tid();
    $app->init_threads( tid => $tid );    
    {
        lock %tids;
        $tids{ $tid } = $sid;
        $servers{ $tid } = $self;
    }
    
    $self->{'id'} = $sid;
    #my $app = $self->{'obj'}{'_app'};
    my $glob = $self->{'obj'}{'_glob'};
    
    my $log = $self->{'log'}; # this is a thread specific copy of the log module due to the way perl threading works
        
    my $sman = $self->{'session_man'};
    
    #$sman->{'sessions'} = $session_hash;
    
    my $rman = $self->{'request_man'};
    #my $router = $self->{'router'};
    my $ctx = $self->{'ctx'} = zmq_init();
    my $incoming = $self->{'incoming'} = zmq_socket( $ctx, ZMQ_PULL );
    
    my $xml = $self->{'_xml'};
    my $ip = xval $xml->{'ip'};
    my $inconf = $xml->{'incoming'};
    my $outconf = $xml->{'outgoing'};
    my $inport = 6768;
    my $inid = 'blah';
    if( $inconf ) {
        $inport = xval $inconf->{'port'}, 6768;
        $inid = xval $inconf->{'id'}, 'blah';
    }
    my $outport = 6769;
    my $outid = 'blah2';
    if( $outconf ) {
        $outport = xval $outconf->{'port'}, 6769;
        $outid = xval $outconf->{'id'}, 'blah2';
    }
    $log->note( text => "Server $sid started - Listening on ip $ip - in: $inport($inid) - out: $outport($outid)" );
    
    zmq_connect( $incoming, "tcp://$ip:$inport" );
    zmq_setsockopt( $incoming, ZMQ_IDENTITY, $inid ); # Indentity should not be hardcoded
    my $outgoing = $self->{'outgoing'} = zmq_socket( $ctx, ZMQ_PUB );
    zmq_connect( $outgoing, "tcp://$ip:$outport" );
    zmq_setsockopt( $outgoing, ZMQ_IDENTITY, $outid );
    
    #my $q = new CGI;
    while(1) {
        last if( $stop );
        #sleep(1);
        zmq_poll( [ { socket => $incoming, events => ZMQ_POLLIN, callback => \&handle_request } ], 1000 );
    }
    $log->note( text => "Server $sid ending" );
    
    zmq_close( $incoming );
    zmq_close( $outgoing );
    zmq_term( $ctx );
}

sub handle_request {
    my $sid;
    my $thr = threads->self();
    my $tid = $thr->tid();
    my $self;
    {
        lock %tids;
        $sid = $tids{ $tid };
        $self = $servers{ $tid };
    }
    
    my $app = $self->{'obj'}{'_app'};
    
    my $log = $self->{'log'};
    my $sman = $self->{'session_man'};
    
    my $rman = $self->{'request_man'};
    my $incoming = $self->{'incoming'};
    my $outgoing = $self->{'outgoing'};
    
    #my $sid = $server->{'id'};
    #print "Server: $sid - $tid\n";
    
    my $buffer = '';
    while( 1 ) {
        my $part;
        zmq_recv( $incoming, $part, 32768 ); # TODO: the size should be configurable
        $buffer .= $part;
        my $rc = zmq_getsockopt( $incoming, ZMQ_RCVMORE );
        last if( !$rc );
        print "Extra part; size of part1 = " . length( $part ) . "\n";
    }
    
    #print "Sid: $sid\n";
    $buffer =~ m/^([^ ]+) ([^ ]+) ([^ ]+) (.+)$/s;
    my $sender = $1;
    my $id = $2;
    my $path = $3;
    my $data = $4;
    my $type = 'get';
    
    #print "Sending: $sender Id $id\n";
    # $data =~ s/\0*$//; remove the null from the end for printing more easily
    
    $data =~ m/^([0-9]+):/; 
    
    my $tnetlen = $1;
    my $lenlen = length( $tnetlen );
    
    my $extra = length( $data ) - ( $tnetlen + $lenlen + 2 );
    my $post = '';
    my $postvars = {};
    
    if( $extra > 0 ) {
        my $xstr = substr( $data, $tnetlen + $lenlen + 2 );
        $xstr =~ s/,\0*$//;
        if( $xstr eq '0:' ) {
        }
        elsif( $xstr eq '21:{"type":"disconnect"}' ) {
            $type = "disconnect_notice";
            #print "Disconnect of $id\n";
            return; # TODO; pass notice on to routing to potentially terminate long running reports
        }
        else {
            $post = $xstr;
            $post =~ s/^([0-9]+)://; 
        }
    }
    
    my $hash = decode_tnetstrings( $data );
    
    my $queryhash = 0;
    if( $hash && defined $hash->{'QUERY'} ) {
        $queryhash = url2hash( $hash->{'QUERY'} );
    }
    
    my $content_type = $hash->{'content-type'};
    
    if( $content_type && $content_type =~ m|^multipart/form-data; boundary=(.+)$| ) {
        my $bound = "--$1";#\r\n
        if( $hash->{'x-mongrel2-upload-start'} ) {
            if( ! $hash->{'x-mongrel2-upload-done'} ) {
                $type = "largepost_notice";#$hash->{'x-mongrel2-upload-start'};
                my $postid;
                if( $queryhash && ( $postid = $queryhash->{'postid'} ) ) {
                    # store off the temp file location with the postid so that it can be retrieved by a later status check
                    # TODO
                }
                # potentially it is pointless to pass the upload start notice on to routing,
                # but current we are doing so, comment out the following line to avoid passing it on
                # next;
            }
            else {
                $type = "largepost";
                # open the file and parse it
                my $tmpfile = $hash->{'x-mongrel2-upload-start'};
                process_postfile( $postvars, $bound, $tmpfile );
            }
        }
        else {
            $type = 'post';
            my @postarr = split( $bound, $post );
            shift @postarr;
            while( @postarr ) {
                my $part = shift @postarr;
                last if( $part =~ m"^--" );
                my $parthash = process_multipart( $part );
                my $partname = $parthash->{'name'};
                if( !@{$parthash->{'headers'}} ) {
                    $postvars->{ $partname } = $parthash->{'body'};
                }
                else {
                    $postvars->{ $partname } = $parthash;
                }
            }
        }
    }
    elsif( $hash->{'METHOD'} eq 'POST' ) {
        $type = 'post';
        $postvars = url2hash( $post );
    }
    
    my $session = 0;
    
    my $r = $rman->new_request(
        path => $path,
        query => $queryhash,
        postvars => $postvars,
        id => $id,
        ip => $hash->{'x-forwarded-for'},
        type => $type # either 'post', 'get', or 'disconnect_notice'
        );
    $log->{'r'} = $r;
    
    $log->note( text => "Recieved request to $path");
    
    my $cookieman = $r->get_mod( mod => 'cookie_man' );
    
    if( $hash->{'cookie'} ) {
        $cookieman->parse( raw => $hash->{'cookie'} );
    }
    
    my $router = $r->get_mod( mod => 'web_router' );

    my $res = $router->route( session_man => $sman );
    if( $type =~ m/notice/ ) {
        next;
    }
    
    my $typeinfo = $r->get_type();
    my $restype = $typeinfo->get_res('type');
    
    my $code = $r->get_code();
    my $body = $r->get_body();
    
    my $headers = $r->get_headers();
    
    $headers .= $cookieman->set_header();
    my $raw;
    use bytes;
    if( $body ne '' ) {
        my $blen = length( $body );
        $headers .= "Content-Length: $blen\r\n";
        $raw = "$headers\r\n$body";
    }
    else {
        $raw = $headers . "\r\n\r\n";
    }
    
    my $idlen = length( $id );
    my $msg   = "blah2 $idlen:$id, HTTP/1.1 $code\r\n$raw";
    my $len   = length( $msg );
    
    if( $len > 100000 ) {
        # Theoretically this should work, but Mongrel2 apparently does not support this
        #for( my $st = 0; $st < $len; $st+= 100000 ) {
        #    my $end = $st + 100000 - 1;
        #    if( $end > $len ) { $end = $len; }
        #    if( $end < $len ) {
        #        $log->note( text => "Sending $st - $end" );
        #        zmq_send( $outgoing, substr( $msg, $st, $end ), -1, ZMQ_MSG_MORE );
        #    }
        #    else {
        #        $log->note( text => "Sending $st - $end" );
        #        zmq_send( $outgoing, substr( $msg, $st, $end ) );
        #    }
        #}
        
        # This -seems- to work more often than the zmq_send below for large messages
        my $ob = zmq_msg_init_data( $msg );
        zmq_sendmsg( $outgoing, $ob );
        zmq_msg_close( $ob );
    }
    else {
        zmq_send( $outgoing, $msg );
    }
    
    $r->end();
    
    my $rlen = $r->{'end'} - $r->{'start'};
    $rlen *= 100000;
    $rlen = int( $rlen );
    $rlen /= 100;
    
    $log->note( text => "Request finished; len=${rlen}ms" );
}

sub url2hash {
    my $url = shift;
    my $hash;
    
    my @parts = split('&', $url );
    for my $part ( @parts ) {
        next if( ! defined $part );
        if( $part =~ m/(.+)=(.+)/ ) {
            my $key = $1;
            my $val = $2;
            $key =~ s/%([a-zA-Z0-9]{2})/pack('H2',$1)/ge;$key =~ s/\+/ /g;
            $val =~ s/%([a-zA-Z0-9]{2})/pack('H2',$1)/ge;$val =~ s/\+/ /g;
            if( $key =~ m/^(.+)\[([0-9]+)\]$/ ) {
                my $arr = $hash->{ $1 } ||= [];
                $arr->[$2] = $val;
            }
            else {
                $hash->{ $key } = $val;
            }
        }
    }
    return $hash;
}

sub process_postfile {
    my ( $postvars, $bound, $tmpfile ) = @_;
    if( ! -e $tmpfile ) {
        # perhaps not running on the same server?
        return;
    }
    # TODO: process the contents of tmpfile
    # - store short variables in postvars
    # - store pointers to the region of the tmpfile that matters for larger files
}

sub process_multipart {
    # Example parts:
    # Content-Disposition: form-data; name="pw"
    # Content-Disposition: form-data; name="myfile"; filename="test.txt"
    # Content-Type: text/plain
    my $part = shift;
    my $headers = [];
    $part =~ s/\r\n//;
    my $name = '';
    while( $part =~ m/^(Content-[^:]+): ([^\r]+)\r\n(.+)/s ) {
        my $varname = $1;
        my $val = $2;
        $part = $3;
        if( $varname eq 'Content-Disposition' ) {
            #print "Val:$val\n";
            if( $val =~ m/form-data; name="([^"]+)"(; filename="([^"]+)")?/ ) {
                my $disp_name = $1;
                my $disp_file = $2;
                if( $disp_file ) { # we have a filename
                }
                else {
                    $name = $disp_name;
                }
            }
        }
        else {
            push( @$headers, { name => $name, varname => $varname, val => $val } );
        }
    }
    $part =~ s/^\r\n//;
    $part =~ s/\r\n$//;
    
    return {
        name => $name,
        headers => $headers,
        body => $part  
    }
}

1;

__END__

=head1 SYNOPSIS

Component of L<Ginger::Reference> that handles recieving web requests from Mongrel2 via ZeroMQ.

=head1 DESCRIPTION

The following things are handled by this module:

=over 4

=item * Receiving requests from Mongrel2

=item * Parsing the raw Mongrel2 incoming request into a structure format

=item * Decoding post data if it is set

=item * Routing request via web_router module

=item * Sending the results of a routed request back out to Mongrel2

=item * Sending desired cookies to be set to Mongrel2

=head2 Known Bugs

=over 4

=item * File uploads are not handled properly

=back

=head1 LICENSE

  Copyright (C) 2013 David Helkowski
  
  This program is free software; you can redistribute it and/or
  modify it under the terms of the GNU General Public License as
  published by the Free Software Foundation; either version 2 of the
  License, or (at your option) any later version.  You may also can
  redistribute it and/or modify it under the terms of the Perl
  Artistic License.
  
  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

=cut