WRITING ETLp PLUGINS

All of ETLp's processing is performed by plugins. The framework includes a number of plugins that support common ETL tasks, but it is easy to add your own:

  • Create a new subclass of ETLp::Plugin

  • Return the name of the "type" of functionality the plugin provides from the "type" subroutine. All type names must be unique across the environment, and ETLp will raise an error if there are any type name collisions.

  • All functionality must be provided in the "run" method.

A plugin is invoked by specifying its type in the job item configuration. For example:

    <item>
        name = load file
        # The plugin that registers itself for processing the 
        # csv_loader type will be invoked
        type = csv_loader
    </item>
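
The relationship between an item's type and the plugin that handles it can be pictured as a lookup table with a uniqueness check. The following is only a conceptual sketch, not ETLp's actual registration code; the %plugin_for hash, both subroutine names and the CSVLoader class name are hypothetical:

```perl
use strict;
use warnings;

# Hypothetical registry: maps a type name to the class that claims it.
my %plugin_for;

# Record a plugin class under its type, refusing duplicate type names.
sub register_plugin {
    my ($class, $type) = @_;
    die "Type '$type' is already registered by $plugin_for{$type}\n"
        if exists $plugin_for{$type};
    $plugin_for{$type} = $class;
    return;
}

# Find the plugin class for a parsed item (a hashref with a "type" key).
sub plugin_for_item {
    my ($item) = @_;
    my $class = $plugin_for{ $item->{type} }
        or die "No plugin registered for type '$item->{type}'\n";
    return $class;
}

register_plugin('MyApp::Plugin::Iterative::CSVLoader', 'csv_loader');
print plugin_for_item({ name => 'load file', type => 'csv_loader' }), "\n";
```

Registering a second class under csv_loader would die in this sketch, which mirrors the type name collision error described above.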

The typical template for an iterative plugin is:

    use MooseX::Declare;
    class MyApp::Plugin::Iterative::<<Name>> extends ETLp::Plugin {

        sub type {
            return '<<type>>';
        }

        method run (Str $filename) {
            <<functionality here>>
            return $filename;
        }
    }

  • An iterative plugin must always return the name of the file. This is because the filename can change throughout the lifetime of the file, so the pipeline must always pass the filename to the next plugin.

  • ETLp uses Try::Tiny to catch and handle exceptions. A plugin is not required to use it, but it is recommended in the absence of an alternative. You can also raise an exception via ETLpException->throw('<<my exception>>'). This is preferable to using die or croak, as they would record the entire stack trace in the audit records, which may expose more of a plugin's internals than an application's support staff need.
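
The exception handling advice can be sketched as follows. This is an illustration under stated assumptions rather than ETLp code: StandIn::Exception is a hypothetical substitute for ETLpException so that the sketch runs outside the framework, count_lines is a made-up piece of plugin functionality, and Try::Tiny must be installed from CPAN:

```perl
use strict;
use warnings;
use Try::Tiny;    # CPAN module, not part of core Perl

# Hypothetical stand-in for ETLpException (assumption, not the real class).
package StandIn::Exception {
    sub throw { my ($class, %args) = @_; die bless {%args}, $class }
    sub error { return $_[0]{error} }
}

# Hypothetical functionality that a plugin's run method might perform.
sub count_lines {
    my ($filename) = @_;
    my $lines = 0;
    try {
        open my $fh, '<', $filename or die "$!\n";
        while (my $line = <$fh>) { $lines++ }
        close $fh;
    }
    catch {
        # Re-throw a short message aimed at support staff.
        StandIn::Exception->throw(error => "Unable to read $filename: $_");
    };
    return $lines;
}

my $completed = eval { count_lines('/no/such/dir/input.csv'); 1 };
print $@->error unless $completed;
```

Inside a real plugin the catch block would call ETLpException->throw instead of the stand-in class.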

A template for a serial plugin is not much different:

    use MooseX::Declare;
    class MyApp::Plugin::Serial::<<Name>> extends ETLp::Plugin {

        sub type {
            return '<<type>>';
        }

        method run {
            <<functionality here>>
        }
    }

The following attributes are available to the plugin:

  • config. A hashref of the job configuration.

  • item. A hashref containing the parsed item. All placeholders will have been replaced with the appropriate values.

  • original_item. A hashref containing the raw item, i.e. the placeholders have not been replaced. This gives the plugin developer the opportunity to record or log item elements without worrying about sensitive information being substituted for the placeholders.

  • env_conf. A hashref containing the parsed environment conf. This is only accessible if the environment setting allow_env_vars is true.

  • dbh. The DBI database handle. Be careful to commit or rollback your transactions as appropriate.

  • logger. A Log4perl logger.

  • audit. ETLp's job auditor.
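
The difference between item and original_item matters most when logging. The sketch below builds a log line from the raw item so that placeholders stay unexpanded. StandIn::Plugin is a hypothetical object used only to make the sketch runnable outside ETLp, item_summary is a made-up helper that assumes a flat hashref, and the %password% placeholder syntax is assumed from the %app_root% form used elsewhere in this document:

```perl
use strict;
use warnings;

# Hypothetical stand-in exposing the two item attributes; a real plugin
# inherits these from ETLp::Plugin.
package StandIn::Plugin {
    sub new           { my ($class, %args) = @_; return bless {%args}, $class }
    sub item          { return $_[0]{item} }
    sub original_item { return $_[0]{original_item} }
}

# Summarise the raw item, so substituted values never reach the log.
sub item_summary {
    my ($plugin) = @_;
    my $raw = $plugin->original_item;
    return join ', ', map { sprintf '%s=%s', $_, $raw->{$_} } sort keys %$raw;
}

my $plugin = StandIn::Plugin->new(
    item          => { name => 'load file', password => 's3cret' },
    original_item => { name => 'load file', password => '%password%' },
);
print item_summary($plugin), "\n";    # name=load file, password=%password%
```

Within a plugin's run method, the same summary could be passed to the Log4perl logger, for example $self->logger->info(item_summary($self)).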

AUDITING

ETLp provides three levels of auditing:

  • Audit the job

  • Audit the currently executing item

  • Audit any processing of a file (only applicable to an iterative job)

Generally, the job audit object shouldn't be used, as auditing at this level is handled by the framework, not the plugin itself. The same applies to the item auditor. However, iterative plugins may wish to retrieve audit information about the current file being processed (the file id, for instance):

    my $aud_file_process = $self->audit->item->file_process;  
    # get the file_id associated with the current file. This 
    # might be used later      
    my $file_id = $aud_file_process->get_canonical_id;

For documentation on the audit classes:

    perldoc ETLp::Audit::Job
    perldoc ETLp::Audit::Item
    perldoc ETLp::Audit::FileProcess

EXAMPLE PLUGIN

The following plugin loads the contents of an XML file into a database table. It's somewhat contrived as plugins should provide generalised functionality, and this plugin simply loads a specific type of data. However, the intention is to illustrate how a plugin works.

Consider the file:

    <?xml version="1.0" encoding="UTF-8"?>
    <scores>
        <score>
            <id>1</id>
            <name>Smith</name>
            <value>50.5</value>
        </score>
        <score>
            <id>2</id>
            <name>Jones</name>
            <value>30.75</value>
        </score>
        <score>
            <id>3</id>
            <name>White</name>
            <value>89.3</value>
        </score>
    </scores>

The data will be loaded into the scores table:

    +---------+-------------+
    | Field   | Type        |
    +---------+-------------+
    | id      | int         |
    | name    | varchar(20) |
    | score   | float       |
    | file_id | int         |
    +---------+-------------+

All tables that are populated by the framework should have a file_id column, so that we can join back to the audit records.

    use MooseX::Declare;
    class My::Plugin::Iterative::ScoreXML extends ETLp::Plugin {
        use XML::Simple;
        use File::Copy;
        use File::Basename;
        
        sub type {
            return 'score_xml';
        }
        
        method run (Str $filename) {
            my $aud_file_process = $self->audit->item->file_process;        
            my $file_id          = $aud_file_process->get_canonical_id;
            my $app_config       = $self->config->{config};
            
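            # KeyAttr => [] disables folding of records into a hash, and
            # ForceArray keeps $ref->{score} an arrayref even when the
            # file contains a single <score> record
            my $ref = XMLin($filename, KeyAttr => [], ForceArray => ['score']);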
            
            my $sth = $self->dbh->prepare(
                q{
                    insert into scores (
                        id, name, score, file_id
                    ) values (?, ?, ?, ?)
                }
            );

            # Count the inserted rows so they can be recorded in the audit
            my $row_count = 0;
            foreach my $record (@{$ref->{score}}) {
                $sth->execute($record->{id}, $record->{name},
                        $record->{value}, $file_id);
                $row_count++;
            }
            
            $aud_file_process->record_count($row_count);
            
            $self->dbh->commit;
            
            move($filename, $app_config->{archive_dir}) ||
               ETLpException->throw(error => "Unable to move $filename to "
                  . $app_config->{archive_dir} . ": $!");

            return $app_config->{archive_dir} . '/' . basename($filename);    
        }
    }

Notes

  • The plugin retrieves the file_id from the file process auditor so that it can be saved with the record.

  • In this example we don't try to trap any errors. Any exceptions will be caught by the framework and logged as appropriate, and the file will be moved to the fail directory.

  • Generally, a file is processed by a series of plugins. One of them should move the file to the archive directory. This is generally the plugin that loads the data into the database.

  • This plugin illustrates why the filename should be returned. Subsequent processing will need to know where the file is located.

  • Plugins that load data can record the number of rows loaded via the record_count method in ETLp::Audit::FileProcess.

REGISTERING PLUGINS

Plugins are registered when ETLp is invoked. However, the framework needs to know which of your modules are plugins, and this is done by setting the serial_plugin_ns and iterative_plugin_ns parameters in the env.conf file:

    serial_plugin_ns    = MyApp::Serial::Plugin
    iterative_plugin_ns = MyApp::Iterative::Plugin

The plugins should be placed in your Perl library path. Note that ETLp includes %app_root%/lib by default.
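
Perl locates a module by turning its package name into a relative path beneath a library directory, so a plugin's file location follows from its namespace. The sketch below shows that mapping. The plugin_file helper, the /opt/myapp application root and the Cleanup plugin name are hypothetical, and it assumes a plugin's package name is the configured namespace followed by the plugin's own name:

```perl
use strict;
use warnings;
use File::Spec;

# Map a package name to the file Perl would load from a library directory,
# e.g. A::B::C becomes <lib_dir>/A/B/C.pm.
sub plugin_file {
    my ($lib_dir, $package) = @_;
    my @parts = split /::/, $package;
    $parts[-1] .= '.pm';
    return File::Spec->catfile($lib_dir, @parts);
}

# Hypothetical application root and plugin name.
print plugin_file('/opt/myapp/lib', 'MyApp::Serial::Plugin::Cleanup'), "\n";
```

On a Unix-like system this prints a path ending in MyApp/Serial/Plugin/Cleanup.pm, which is where the module file would need to live under %app_root%/lib.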