The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Dimedis::Sql::Import;

use strict;
use vars qw($VERSION);
use Carp;

use Dimedis::Sql;
use Dimedis::Sql::CSV;
use Data::Dumper;
use FileHandle;
use File::Path;

$VERSION = '0.22';

##------------------------------------------------------------------------------
# CLASS
#   Dimedis::Sql::Import
#
# PURPOSE
#   Diese Klasse ermöglicht einen Import von Daten aus einem bestimmten
#   Export-Verzeichnis im Filesystem in ein bestehendes Datenbank-Schema
#   (Oracle, MySQL oder mSQL).
#   <p>
#   Es werden alle Daten aus dem angegebenen Export-Verzeichnis importiert,
#   für die es einen Eintrag im übergebenen Type-Hash gibt.
#   <p>
#   Das Export-Verzeichnis enthält für jede exportierte Tabelle ein
#   Unterverzeichnis mit dem Namen der entsprechenden Tabelle.
#   Dort sind die zugehörigen Daten abgelegt:
#   <ul>
#     <li>Die Datei <b>format.conf</b> enthält Informationen (Spaltenname, Typ
#         und Länge) zu den zugehörigen Tabellen-Spalten.<br>
#     <li>Die eigentlichen Daten sind in der CSV-Datei <b>data.dump</b>
#         abgelegt, wobei die einzelnen Spalten durch Tabs voneinander getrennt
#         sind. Enthält eine Tabelle BLOB- oder CLOB-Spalten, sind die
#         Inhalte dieser Spalten in separaten Dateien (<b>blob_1.bin -
#         blob_n.bin</b>) gespeichert. In der CSV-Datei ist dann für diese
#         Spalten nur der Name der zugehörigen Datei abgelegt.
#   </ul>
#   <p>
#   Die Start- und Endzeit des Imports, sowie die übergebenen Parameter
#   und die Statusmeldungen, die während des Imports ausgegeben werden,
#   werden in eine Meta-Datei im Export-Verzeichnis geschrieben.<br>
#   Für jeden Import wird eine neue Meta-Datei mit dem Namen
#   "import_<code>time()</code>.meta" erzeugt.
#   <p>
#   Über den optionalen Config-Parameter <code>inserts_per_transaction</code>
#   kann angegeben werden, nach wievielen DB-Inserts jeweils ein Commit
#   erfolgen soll.
#   (Beispiel: <code>inserts_per_transaction => 1</code> gibt an, dass nach
#   jedem eingefügten Datensatz committed wird).<br>
#   Wird der <code>inserts_per_transaction</code>-Parameter nicht angegeben,
#   erfolgt jeweils ein Commit pro Tabelle. 
#   <p>
#   Beispiel-Aufruf:
#   <pre>
#   |   my $import = Dimedis::Sql::Import->new(
#   |       dbh        => $dbh,
#   |       config     => {
#   |                      data_source    => 'dbi:Oracle:',
#   |                      username       => 'test',
#   |                      directory      => '/tmp/export',
#   |                      type_hash_file => './prod/config/lib.install.sql.general.all_tables.config',
#   |                     #inserts_per_transaction => 1
#   |                     },
#   |   );
#   |
#   |   $import->do_import();
#   </pre>
#
# AUTHOR
#   Sabine Tonn <stonn@dimedis.de>
#
# COPYRIGHT
#   dimedis GmbH, Cologne, Germany
#-------------------------------------------------------------------------------


##------------------------------------------------------------------------------
# METHOD
#   public constructor: new
#
# DESCRIPTION
#   Erzeugt ein neues Import-Objekt
#
# INPUT
#   dbh       -- DB-Handle
#   config      -- Config-Hash
#                    <ul>
#                      <li><b>Key:</b>   <code>data_source</code>
#                      <li><b>Value:</b> Data-Source der Zieldatenbank, in die
#                                        die Daten importiert werden
#                      <br><br>
#                      <li><b>Key:</b>   <code>username</code>
#                      <li><b>Value:</b> Schema-Name der Zieldatenbank, in die
#                                        die Daten importiert werden
#                      <br><br>
#                      <li><b>Key:</b>   <code>directory</code>
#                      <li><b>Value:</b> kompletter Pfad des Verzeichnisses,
#                                        aus dem die zu importierenden Daten
#                                        gelesen werden
#                      <br><br>
#                      <li><b>Key</b>:   <code>type_hash_file</code>
#                      <li><b>Value</b>: kompletter Pfad der Datei, in der das
#                                        Type-Hash abgelegt ist
#                      <br><br>
#                      Optional:<br>
#                      <li><b>Key</b>:   <code>inserts_per_transaction</code>
#                      <li><b>Value</b>: Anzahl der DB-Inserts, nach denen
#                                        jeweils ein Commit gemacht werden soll
#                                        (Default: ein Commit pro Tabelle)
#                    </ul> 
#
# OUTPUT
#   quiet_mode    -- 1 = keine Status-Meldungen zur Laufzeit auf der
#                    Standardausgabe anzeigen
#                    (Default = 0)
#
# RETURN
#   neues Import-Objekt
#-------------------------------------------------------------------------------
sub new {

  my $class = shift;
  my %par   = @_;

  #--- Parameterprüfung
  my $dbh      = $par{dbh}    or croak("'dbh' missing");
  my $config   = $par{config} or croak("'config' missing");

  croak "'data_source' missing"     unless $config->{data_source};
  croak "'username' missing"        unless $config->{username};
  croak "'directory' missing"       unless $config->{directory};
  croak "'type_hash_file' missing"  unless $config->{type_hash_file};

  my $quiet_mode = $par{quiet_mode};

  #--- Type-Hash einlesen
  my $fh = FileHandle->new();
  open( $fh, $config->{type_hash_file} ) or die "Can't open file: $!\n";

  my $data          = join ("", <$fh>);
  my $type_hash_ref = eval ($data);

  close( $fh );
  
  #--- neuen Filehandle für die Meta-Datei erzeugen
  my $fh_meta = FileHandle->new();
  
  open( $fh_meta, "> $config->{directory}/import_".time().".meta" )
    or die "Can't open file: $!\n";

  #------

  my ($from_charset, $to_charset) = $config->{recode} =~ /(.*?)\.\.(.*)/;
  
  $from_charset ||= "latin1";
  $to_charset   ||= "latin1";

  die "Invalid from_charset '$from_charset'"
  	unless $from_charset eq 'utf8' or $from_charset eq 'latin1';
  die "Invalid to_charset '$to_charset'"
  	unless $to_charset eq 'utf8' or $to_charset eq 'latin1';
  die "Recoding utf8..latin1 currently not supported"
  	if $from_charset eq 'utf8' and $to_charset eq 'latin1';

  my $self = {
      dbh                       => $dbh,
      data_source               => $config->{data_source},
      username                  => $config->{username},
      dir                       => $config->{directory},
      type_hash_file            => $config->{type_hash_file},
      inserts_per_transaction   => $config->{inserts_per_transaction},
      from_charset	        => $from_charset,
      to_charset	        => $to_charset,
      type_hash_ref             => $type_hash_ref,
      quiet_mode                => $quiet_mode,
      fh_meta                   => $fh_meta,
  };

  return bless $self, $class;
}

##------------------------------------------------------------------------------
# METHOD
#   public: do_import
#
# DESCRIPTION
#   Importieren der Daten
#-------------------------------------------------------------------------------
sub do_import {

  my $self = shift;
  
  my $dbh     = $self->{dbh};
  my $fh_meta = $self->{fh_meta};
  
  #--- Startzeit und übergebene Parameter in die Meta-Datei schreiben
  print $fh_meta "Import.pm version $VERSION\n\n".
                 "Import started at " . localtime() ." by user $ENV{USER} ".
                 "on $ENV{HOSTNAME}\n\n" .
                 "directory     : $self->{dir}\n" .
                 "data source   : $self->{data_source}\n" .
                 "schema        : $self->{username}\n" .
                 "type hash file: $self->{type_hash_file}\n\n";

  #--- Spalteninformationen zu den bestehenden Tabellen holen
  $self->_get_table_info();

  #--- Daten lesen und in die Datenbank schreiben 
  $self->_insert_data();

  #--- Endezeit in die Meta-Datei schreiben
  print $fh_meta "\nImport finished at " . localtime() ."\n";
}

##------------------------------------------------------------------------------
# METHOD
#   private: _get_table_info
#
# DESCRIPTION
#   Spalteninformationen zu den bestehenden Tabellen holen
#-------------------------------------------------------------------------------
sub _get_table_info {

  my $self = shift;

  my $dbh    = $self->{dbh};
  
  # -------------------------
  #  Tabellennamen ermitteln
  # -------------------------
  $self->_get_table_names();
 
  # -------------------------------------------------------
  #  Hash mit allen verfügbaren Spaltentypen zusammenbauen
  # -------------------------------------------------------
  my $type_info_all = $dbh->type_info_all();

  my $DATA_TYPE_idx = $type_info_all->[0]->{DATA_TYPE};
  my $TYPE_NAME_idx = $type_info_all->[0]->{TYPE_NAME};

  my %data_types;

  my $len = @{$type_info_all};

  #--- Ids und Namen der verfügbaren Spaltentypen holen
  for ( my $i=1; $i < $len; ++$i ) {

    $data_types{$type_info_all->[$i]->[$DATA_TYPE_idx]}
          = lc( $type_info_all->[$i]->[$TYPE_NAME_idx] );
  }

  # --------------------------------------------
  #  Spalten-Infos der einzelnen Tabellen holen
  # --------------------------------------------
  $self->_write_status( "\n" );

  foreach my $table_name ( keys %{ $self->{tables} } ) {

    $self->_write_status(
        ">>> getting column infos for table " . uc( $table_name ) . "...\n"
    );

    #--- Dummy-Statement ausführen, um die Spalteninformationen
    #--- zur aktuellen Tabelle ermitteln zu können
    my $sth = $dbh->prepare ("SELECT * FROM $table_name WHERE 1=0");

    $sth->execute();

    my @column_names  = @{ $sth->{NAME_lc} };
    my @column_types  = @{ $sth->{TYPE} };
    my $column_number = 0;

    foreach my $col ( @column_names ) {

      #--- Bei BLOB-, CLOB- und Serial-Spalten, wird der Typ nicht aus der
      #--- Datenbank sondern aus dem übergebenen Type-Hash geholt
      my $hash_type = $self->{type_hash_ref}{$table_name}{$col};
      my $db_type   = $data_types{$column_types[$column_number]};

      if ( $hash_type =~ /(^blob|^clob|^serial)/i ) {
        $self->{tables}{$table_name}{$col}{type} = $hash_type;
      }
      else {
        $self->{tables}{$table_name}{$col}{type} = $db_type;
      }

      $column_number++;
    }
  }
}

##------------------------------------------------------------------------------
# METHOD
#   private: _get_table_names
#
# DESCRIPTION
#   Namen der bestehenden Tabellen ermitteln
#-------------------------------------------------------------------------------
sub _get_table_names {
  
  my $self = shift;

  my $dbh    = $self->{dbh};
  
  my $schema = uc ( $self->{username} );
  
  my ( $sth, $table_name_key);

  # --------------------------
  #  alle Tabellennamen holen
  # --------------------------
  $self->_write_status( ">>> getting table names for schema $schema...\n" );
  
  #--- Sonderbehandlung für Sybase
  #--- (table_info()-Aufruf funktioniert nicht mit Hashref als Parameter)
  if ( $self->{data_source} =~ m/Sybase/i ) {
    $sth            = $dbh->table_info( $self->{username} );
    $table_name_key = "table_name";
  }
  else {
    my %attr        = (  TABLE_SCHEM => $schema );
    $sth            = $dbh->table_info( \%attr );
    $table_name_key = "TABLE_NAME";
  }

  # ----------------------
  #  Tabellennamen prüfen
  # ----------------------
  my $table_info_hr;
  
  while ( $table_info_hr = $sth->fetchrow_hashref() ) {
    
    my $table_name = lc( $table_info_hr->{$table_name_key} );
   
    #--- überspringen, wenn die Tabelle nicht zum angegebenen Schema gehört
    #--- (bei mySQL wird kein Schema-Name zurückgegeben..)
    #next  if $table_info_hr->{TABLE_SCHEM} ne $schema;

    #--- überspringen, wenn es zur Tabelle keinen Eintrag im
    #--- übergebenen Type-Hash gibt
    if ( not $self->{type_hash_ref}{$table_name} ) {
      
      $self->_write_status(
          "\nWARNING! Table " . uc( $table_name ) .
          " will be skipped due to missing type hash entry!\n"
      );
      next;
    }

    $self->{tables}{ lc( $table_name ) } = {};
  }

}

##------------------------------------------------------------------------------
# METHOD
#   private: _insert_data
#
# DESCRIPTION
#   Daten aus dem Filesystem lesen und in die Datenbank schreiben
#-------------------------------------------------------------------------------
sub _insert_data {

  my $self = shift;
  
  my $insert_count = 0;

  #--- neuen SQL-Handle erzeugen  
  my $dbh          = $self->{dbh};
  my $from_charset = $self->{from_charset};
  my $to_charset   = $self->{to_charset};

  my $sqlh = Dimedis::Sql->new ( 
      dbh           => $dbh, 
      type          => $self->{type_hash_ref},
      debug         => 0,
      serial_write  => 1,
      utf8          => ($to_charset eq 'utf8'),
  );

  $self->_write_status( "\n" );

  # --------------------------------------------------------
  #  alle gefundenen Tabellen durchlaufen, zugehörige Daten
  #  aus dem Filesystem lesen und in die DB einfügen
  # --------------------------------------------------------
  foreach my $table_name ( keys %{ $self->{tables} } ) {

    my $table_dir = $self->{dir} . "/$table_name";
   
    #--- Tabelle überspringen, wenn es kein zugehöriges
    #--- Export-Verzeichnis gibt 
    next  unless -d $table_dir;

    # -----------------------------------------------
    #  bestehende Datensätze aus der Tabelle löschen
    # -----------------------------------------------
    $self->_write_status(
        ">>> deleting data from table " . uc( $table_name ) . "...\n"
    );
   
    my $sth = $dbh->prepare ("DELETE FROM $table_name");
    $sth->execute();

    # ----------------------------------------------------------------------
    #  Spalten-Infos aus der Datei 'format.conf' der aktuelle Tabelle lesen
    # ----------------------------------------------------------------------
    my $VAR1;
   
    my $fh = FileHandle->new();
    open( $fh, "$table_dir/format.conf" ) or die "Can't open file: $!\n";

    my $data = join ("", <$fh>);
    eval ($data);

    close($fh);

    #--- zugehörige Spalten zur aktuellen Tabelle ermitteln
    #--- (aus der Datei format.conf, die beim Export erstellt wurde, holen,
    #---  damit die Reihenfolge der Spalten beim Insert stimmt)
    my $column_lr    = $VAR1;
    my $column_count = @{$column_lr};
    
    my ( @skipped_invalid_columns, @skipped_date_columns,
         $column_name, $column_type
    );
    
    for ( my $i=0; $i < $column_count; $i++ ) {

      $column_name = $column_lr->[$i]->{name};
      $column_type = $self->{tables}{$table_name}{$column_name}{type};
          
      #--- Spalte beim Insert überspringen, wenn es dazu keinen
      #--- Type-Hash-Eintrag gibt
      if ( $column_type eq "" ) {
        push ( @skipped_invalid_columns, uc( $column_name ) );
      }
      #--- Spalte beim Insert überspringen, wenn es eine Date-Spalte ist
      #--- (kann Dimedis::Sql nicht verarbeiten)
      elsif ( $column_type =~ /^date$/i ) {
        push ( @skipped_date_columns, uc( $column_name ) );
      }
    }

    # ----------------------------------------------------------
    #  neues CSV-Objekt erzeugen, um die exportierten Daten aus
    #   der entsprechenden CSV-Datei zu lesen
    # ----------------------------------------------------------
    my $csv = Dimedis::Sql::CSV->new (
        filename  => $table_dir . "/data.dump",
        delimiter => "\t",
	layer	  => ( $self->{from_charset} eq 'utf8' ? ':utf8' : '' ),
    );

    #--- für jede Zeile aus der CSV-Datei wird jetzt das
    #--- Datenhash gefüllt und der Datensatz wirden in die
    #--- Datenbank geschrieben
    $self->_write_status(
        ">>> inserting data into table " . uc( $table_name ) . "...\n\n"
    );
    
    my $data_lr;

    while ( $data_lr = $csv->read_line() ) {

      my %data;

      for ( my $i=0; $i < $column_count; $i++) {

        $column_name = $column_lr->[$i]->{name};
        $column_type = $self->{tables}{$table_name}{$column_name}{type};

        #--- Datums-Spalten und Spalten ohne Type-Hash-Eintrag überspringen
        #--- (die Namen dieser Spalten wurden weiter oben schon in die Liste
        #---  @skipped_invalid_columns oder @skipped_date_columns geschrieben
        #---  und werden später in einer Warnung ausgegeben)
        next  if ( $column_type eq "" or $column_type =~ /^date$/i ); 
        
        #--- Wert auf undef setzen, falls der Inhalt der DB-Spalte NULL war
        undef $data_lr->[$i] if $data_lr->[$i] eq "";

        #--- Sonderbehandlung von BLOB's und CLOB's
        if ( $column_type eq 'clob' or $column_type eq 'blob' ) {
          #-- Behandlung leerer Blobs (für die gibt's keine Datei)
          my $filename = "$table_dir/$data_lr->[$i]";
          $filename = "/dev/null" if not -f $filename;
          if ( $column_type eq 'clob' ) {
	    if ( $from_charset eq $to_charset ) {
	      #-- Dateiname übergeben, keine explizite Konvertierung
	      #-- nötig, also kann Dimedis::Sql das File für uns einlesen.
              $data_lr->[$i] = $filename;
	    } else {
	      #-- Bei unterschiedlichen Zeichensätzen (derzeit gibt's
	      #-- nur latin1->utf8), muß die Datei :raw eingelesen
	      #-- werden, da latin1. Beim Insert wird wg. des fehlenden
	      #-- utf8 Tags automatisch nach utf8 konvertiert.
	      $data_lr->[$i] = $sqlh->blob2memory (
	        $filename,
	        $column_name, $column_type,
	        ":raw"
	      );
	    }
          } elsif ( $column_type eq 'blob' ) {
	    $data_lr->[$i] = $filename;
	  }
        }
        $data{$column_name} = $data_lr->[$i];
      }

      # --------------------
      #  Datensatz einfügen
      # --------------------
      $sqlh->insert (
          table => $table_name,
          data  => \%data,
      );
     
      $insert_count++;
     
      #--- committen?
      if ( $self->{inserts_per_transaction} ne ""
       and $insert_count == $self->{inserts_per_transaction} )
      {
        $dbh->commit();
        $insert_count = 0;
      }
    }

    #--- Warnungen mit den Namen der übersprungenen Spalten ausgeben
    if ( scalar( @skipped_invalid_columns ) > 0 ) {
      $self->_write_status(
          "WARNING! Skipped column(s) "
          . join ( ", ", @skipped_invalid_columns ) .
          " due to missing type hash entry.\n\n"
      );
    }
    if ( scalar( @skipped_date_columns ) > 0 ) {
      $self->_write_status(
          "WARNING! Skipped column(s) "
          . join ( ", ", @skipped_date_columns ) .
          " due to illegal column type 'date'.\n\n"
      );
    }

    #--- Daten committen
    $dbh->commit();
  }

}

##------------------------------------------------------------------------------
# METHOD
#   private: _write_status
#
# DESCRIPTION
#   Status-Meldungen ausgeben
#-------------------------------------------------------------------------------
sub _write_status {
  
  my $self    = shift;
  my $message = shift;
  
  my $fh_meta = $self->{fh_meta};
  
  #--- Meldung auf der Standardausgabe ausgeben, wenn der Quiet-Modus
  #--- ausgeschaltet ist
  print $message  unless $self->{quiet_mode};
  
  #--- Meldung in die Meta-Datei schreiben
  print $fh_meta $message;
}

sub DESTROY {

  my $self = shift;

   #--- Meta-Datei schließen 
  my $fh = $self->{fh_meta};

  close($fh) if $fh;
}