package MySQL::Slurp;

=head1 NAME

MySQL::Slurp - Use PIPEs to write directly to a MySQL table
=head1 CAVEAT

  MySQL::Slurp only works on systems that support FIFOs and
  does not support Windows ... yet.


    use 5.008;
    use Carp;                                         
    use List::MoreUtils qw(any);
    use File::Temp;
    use Moose;
        with 'MooseX::Getopt';          # For NoGetopt tags

    use Mknod;  # function mknod creates FIFO / named pipe      
    use MySQL::Slurp::Writer;

    use DBI;

=head1 VERSION



    our $VERSION = 0.28;


    use MySQL::Slurp;

    my $slurper= MySQL::Slurp->new( 
        database => 'test' , 
        table    => 'table_1' , 
        buffer   => 10000 ,
        args     => []    ,


  # OR,
    my $slurper = MySQL::Slurp->new( 
        database => 'test', 
        table => 'table_1' 

    $slurper->slurp();         # slurp from <STDIN>

  #     implements buffer and locks
    $slurper->write( @records );    

    $slurper->print( "Fred\tFlinstone\n" );
    print { $slurper->{writer} } "Fred\tFlinstone\n";  


  # In coordinated environents
    my $slurper1 = MySQL::Slurp::Writer->new( ... );
    my $slurper2 = MySQL::Slurp::Writer->new( ... );

    $slurper1->write( @a );  # In thread 1.
    $slurper2->write( @b );  # In thread 2.


MySQL::Slurp provides methods for writing directly to a MySQL table
using a convenient interface. This module creates a writable FIFO and
uses C<mysqlimport> or C<LOAD DATA INFILE> to load whatever is written
to that FIFO.  This is the fastest method for importing data into a 
MySQL table.  This module makes it easy.  The user needs only C<open>,
C<write>, and C<close> a MySQL::Slurp object.

This module also provides a C<slurp> method for reaing directly from 
<STDIN> and writing to the table.  This allows you to do tasks such 
as the following: 

    cat data.tsv | perl   

This is very handy for large ETL jobs.

Unike using L<DBI> for trapping errors, catching errors with 
mysqlimport can be troublesome with inconsitent data.  It is 
recommended that you check you data before writing to the MySQL::Slurp
handle or use a suitable L<DBI> method.  

The module also implements buffering and locking using 
L<MySQL::Slurp::Writer>.  This allows for multi-process and multi-

=head1 METHODS

=head2 new 

Creates a new MySQL::Slurp object


=item database (required)  

name of the MySQL database containing the target table. 


    has 'database' => ( 
            is             => 'rw', 
            isa            => 'Str', 
            required       => 1,  
            documentation  => 'Target database' ,

=item table (required)

Name of MySQL table to write to.   


    has 'table'    => ( 
            is            => 'rw', 
            isa           => 'Str', 
            required      => 1 ,
            documentation => 'Target table' ,

=item tmp  

The (name of the) temporary directory in which the FIFO/pipe is created.  This is created by L<File::Temp>


  # Replace with File::Temp::newdir as of Version 0.27_02
  # tmp no longer is a command-line option. This is handled automatically.
    has 'tmp'  => ( 
            is            => 'rw' , 
            isa           => 'File::Temp::Dir' , 
            required      => 1 , 
            default       => 
                sub { 
                    File::Temp->newdir( 'mysqlslurp.XXXXXX', TMPDIR => 1 ); 
                    } ,
            documentation => "Temporary directory for " . __PACKAGE__  ,
            metaclass      => 'NoGetopt' , 

    has dir => (
        is             => 'ro' ,
        isa            => 'Str' ,
        required       => 1 ,
        lazy           => 1 ,
        default        => sub { $_[0]->tmp->dirname } ,
        documentation  => 'Location of temporary mysqlslurp directory' ,
        metaclass      => 'NoGetopt' ,  

=item buffer  

default: 1  ( buffer one line, i.e. no buffering )

Maximum number of records that are stored in the buffer before locking
the fifo and flushing to the MySQL table.  By default, there is no 
buffering,  Buffer = 1.  This attribute is used by L<MySQL::Slurp::Writer>

There is no checking for memory limits.  The user is responsible for 
using a sensible value.


    has 'buffer' => ( 
            is            => 'rw' ,
            isa           => 'Int' ,
            required      => 1 ,
            default       => 1 ,
            documentation => 'Records processed before flushing to the file handle ( default: 1)' 

=item method 

default: dbi 

Method to use for importing.  Supports c<mysqlimport>, c<mysql> and 
c<dbi> for mysqlimport, mysql and dbi loading methods, respectively.

c<dbi> is the default method.  This method uses the DBI module and is the
most portable.

C<mysql> uses the C<mysql> command line application.

C<mysqlimport> uses the mysqlimport appication.  Since the mysqlimport 
uses multiple threads.  This is faster than the DBI method.  It reads 
settings from C<~/.my.cnf>.    


    has 'method' => (
            is            => 'rw' ,
            isa           => 'Str' ,
            required      => 1 ,
            default       => 'dbi' ,
            documentation => 'Method to use mysqlimport|mysql|dbi|dbi-delayed' ,

=item args      

Options to pass to mysqlimport.  C<args> is an array ref and should appear
exactly as it does in the command line invocation of mysqlimport.  Applies
to C<mysqlimport> method only


    has 'args' => ( 
            is            => 'rw' , 
            isa           => 'ArrayRef' , 
            required      => 0 , 
            default       => sub { [] } ,
            metaclass     => 'NoGetopt' ,
            documentation => 'Flags to pass to mysqlimport.' 

=item verbose 

Whether to display verbose output 


    has verbose => (
        is            => 'ro' ,
        isa           => 'Bool' ,
        required      => 1 ,
        lazy          => 1 ,
        default       => 
            sub { 
                if ( 
                    any { $_ =~ /[^\w] (-v|--verbose) [ \w$ ]/x } @{ $_[0]->args }                  ) {
                    return 1 ;
                } else {
                    return 0 ;
            } ,
        documentation => 'Force writing on error' ,
        metaclass     => 'NoGetopt' ,  

=item force 

Continue even if errors are encountered 


    has force => (
        is            => 'ro' ,
        isa           => 'Bool' ,
        required      => 1 ,
        lazy          => 1 ,
        default       => 
            sub { 
                if ( 
                    any { $_ =~ /[^\w] (-f|--force)   [ \w$ ]/x } @{ $_[0]->args }                  ) {
                    return 1 ;
                } else {
                    return 0 ;
            } ,
        documentation => 'Force writing on error' ,
        metaclass     => 'NoGetopt' ,  


=head2 open

Opens a connection to the MySQL table through a temporary FIFO.  
Returns a GlobRef that can be directly written to.  This calls
internal methods L<_mkfifo>, <_import>, <_install_writer>,  After the
C<MySQL::Slurp> object is open, one can print directly to the table.


  #   Turns object into a MooseX::GlobRef::Object
    sub open {

      # mkfifo  
        $_[0]->_mkfifo;  # Create FIFO

      # import 
        $_[0]->_import;  # Install reading end of FIFO  

        # $_[0]->_install_globref;
        $_[0]->_install_writer ;

        return $_[0];


=head2 print

Writes arguments directly to the MySQL database.  Buffering is off by default,
see the L<buffer> attribute.


    sub print {

        $_[0]->writer->print( @_[1..$#_] );


=head2 slurp

Read from <STDIN> and write to the database  table.


    sub slurp {

        while( <STDIN> ) {
            $_[0]->print( $_ );
    } # END METHOD: slurp 

=head2 close

Closes and removes the pipe and temporary table.  
Calls C<MySQL::Slurp::Writer::close> and L<_rmfifo>.


    sub close {

        print "Closing filehandle\n" if ( $_[0]->verbose );

        # $_[0]->writer->flush;
        # $_[0]->_rmfifo;



=head2 writer

The slow holding the L<MySQL::Slurp::Writer> used for buffering the 
writing and thread-safe.


    has 'writer' => (
            is            => 'rw' ,
            isa           => 'MySQL::Slurp::Writer' ,
            required      => 0 ,
            metaclass     => 'NoGetopt' ,
            documentation => 'IO::File::flock filehandle to the pipe' ,

=head2 dbh

The database handle to the target table.  The handle is created using
the defaults in your C<~/.my.cnf> file.  Compression is used and query 
is streamed, C<mysql_use_result=1>.


    has 'dbh' => (
            is           => 'ro' ,
            isa          => 'DBI::db' ,
            required     => 0 ,
            lazy         => 1 ,
            default      => sub {

                my $dsn = join( ';', 
                        "DBI:mysql:database=" . $_[0]->database ,
                        # "host=" . $_[0]->host ,
                        "mysql_read_default_file=~/.my.cnf" ,
                        "mysql_compression=1" ,

                return DBI->connect( $dsn );

            } ,
            documentation => 'Database handle' ,
            metaclass     => 'NoGetopt' ,  

  # Since FIFO is required to match the table name, and it is
  # dependent on the other features, we do not make it an 
  # attribute but install it as a method ... lazy

=head2 fifo

The fifo used for import.  This is simply the path.  It is set to the 
name of the C<tmp> directory and the name of the C<table>.  


    has fifo => ( 
        is            => 'ro'  , 
        isa           => 'Str' ,
        required      => 1 ,
        lazy          => 1 ,
        default       => sub { $_[0]->dir  . "/" . $_[0]->table . ".txt" } ,
        documentation => 'Location of the fifo' ,
        metaclass     => 'NoGetopt' ,  

# ---------------------------------------------------------------------
# ---------------------------------------------------------------------


Do not use these methods directly.

=head2 _mkfifo

Creates the FIFO at C<[tmp]/mysqlslurp/[table].txt>.  This will die if 
a pipe, file, directory exists with the same descriptor.

The fifo is created as with default mode 0777. 


    sub _mkfifo {

        print "Making FIFO ... " . $_[0]->fifo . "\n" if ( $_[0]->verbose );

        unlink( $_[0]->fifo ) if ( -p $_[0]->fifo and $_[0]->force );

        croak( "A FIFO already exists for that table.  Delete with 'rm -f "
            . $_[0]->fifo . "' before proceeding\n" ) if ( -e $_[0]->fifo );

       # MAKE FIFO
        croak( "Cannot cannot directory ... " . $_[0]->dir )
         if ( ! -e $_[0]->dir ); 

         mknod( $_[0]->fifo , S_IFIFO|0644 ) 
            or croak( "Cannot make FIFO" );


=head2 _rmfifo

Removes the FIFO.  Used in cleaning up after the upload.


    sub _rmfifoX {
        print  "Removing FIFO ... " . $_[0]->fifo . "\n" if ( $_[0]->verbose ); 

        if ( -p $_[0]->fifo ) {
            unlink $_[0]->fifo or warn( "Cannot remove fifo " . $_[0]->fifo );

        # if ( -d $_[0]->dir ) {   # and  ! $_[0]->dir_exists ) {
        #    rmtree( $_[0]->dir );
        # }


=head2 _import

This is the heart of C<MySQL::Slurp>, reading from the fifo and writing
from the table.


    sub _import {                                       

        my $sql = 
           "LOAD DATA LOCAL INFILE \'" . $_[0]->fifo . "\' " . 
           "INTO TABLE " . $_[0]->database . "." . $_[0]->table ; 

        if ( $_[0]->method eq 'mysqlimport' ) {
            my $command = 'mysqlimport --local ' 
                . join( 
                    " ", 
                    @{ $_[0]->args }, $_[0]->database, $_[0]->fifo, "&" 

            print "Executing ... \"$command\" \n" if ($_[0]->verbose);
            system( "$command" );

        } elsif ( $_[0]->method eq 'mysql' ) {

            my $command = "mysql --local-infile -e\"$sql\"" ;

            print "Executing ... \"$command\" \n" if ($_[0]->verbose);

            system( "$command &" ); # Command must be placed in background

      # METHOD: dbi 
        } elsif ( $_[0]->method eq 'dbi' ) {

            print "Forking $sql \n" if ($_[0]->verbose);
            my $pid = fork;
            if ( $pid ) {
              # Parent: Do nothing but continue 
            } elsif ( defined $pid ) { 
              # Execute statement in child
                $_[0]->dbh->do( $sql ); 
                exit 0;

        } elsif ( $_[0]->method eq 'dbi-delayed' ) {

            croak( "dbi-delayed method not yet available" );

        } else {

            croak( $_[0]->method . " method not supported " );



=head2 _install_writer


    sub _install_writer {

       # $_[0]->writer( IO::File->new( $_[0]->fifo, ">" ) );
            filename => $_[0]->fifo ,
            buffer   => $_[0]->buffer ,

       return $_[0]->writer; # important for some reason.


=head2 _write 

Print to L<MySQL::Slurp::Writer> object located in C<$_[0]->writer>.


    sub _write {
        print { $_[0]->writer } @_[1..$#_] ;





MySQL::Slurp is believed to be thread safe if using the 'write' method.
Directly accessing the IO::File pipe is not considered Thread safe.

=head1 TODO

- use MooseX::Attribute::Defaults::GNU for object attributes

- remove reliance on installation of mysqlimport, by XS wrapping the C 

- create a version to run on windows with named pipes(?)

- create method for INSERT DELAYED 

=head1 SEE ALSO

MySQL::Slurp relies on the L<Moose> metaobject package. 

mysqlimport at L<>, currently 

=head1 AUTHOR

Christopher Brown, E<lt><gt>



Copyright (C) 2008 by Open Data

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.8 or,
at your option, any later version of Perl 5 you may have available.
