In this example, we will unzip, validate and load free five minute price files from the [http://www.electricityinfo.co.nz/comitFta/ftapage.main WITS free to air site]. As noted in the introduction, we try and move as much of the application functionality to the framework as possible. The ETLp framework will execute each phase one after the other.
The Pipeline has two modes - iterative and serial specified with the type parameter.
In the iterative mode, the framework will iterate over a number of files, calling each phase in turn, once for each file.
With the serial mode, each phase is executed once.
In this example, because we are processing a series of files, we wll be running an iterative pipeline. We start by creating a config file:
<five_min_prices> type = iterative <config> filename_format = (5_minute_prices.*\.csv)(?:\.gz)?$ incoming_dir = data/incoming archive_dir = data/archive fail_dir = data/fail table_name = five_minute_prices controlfile_dir = conf/control controlfile = five_minute_prices.ctl on_error = die next = wits price_datamart </config> <pre_process> <item> name = decompress type = gunzip </item> <item> name = validate price file type = validate file_type = csv skip = 1 </item> </pre_process> <process> <item> name = load price file type = csv_loader skip = 1 </item> <item> name = price_proc type = plsql <parameters> name = filename value = %basename(filename)% </parameters> <parameters> name = message value = %message% </parameters> </item> </process> <post_process> <item> name=compress price file type=gzip </item> </postprocess> </five_min_prices>
The file uses the Apache-style syntax. It is similar to XML in that it allows nested sections, but parameters are specified using name value pairs. Our outmost tags, "five_min_prices" denote a section within the configuration file. If we grabbed more than one file type from the WITS site, we would add each type to its own section within the configuration file
The file is divided into two categories:
The configuration section (config) sets up our environment.
The processing sections do the actual processing.
The Standard ETLp gzip, gunzip, validation and load functions require specific configuration values:
filename_format is a reqular expression that will be used to find the files we wish to load. The file may have a .gz extension.
Note the parentheses around (5_minute_prices.*\.csv). The canonical name of the file (the name when all other extraneous information such as the compressed extension) must be specified within parenthesis. This applies even to files whether there is no transformation of the name.
incoming_dir specifies where the files will be found.
If the path is relative (i.e. no leading /), then the path will be relative to the application root directory - i.e. one directory above the etlp script.
If the path has a leading slash, then it is an absolute specification, and the data does not have top reside under the application tree.
archive_dir specifies where successfully loaded files will be moved to.
fail_dir is where files that fail processing are moved to.
table_name defines the table that the data will be loded into.
controlfile is the name of the definition file that specifies the definition of the source files to be processed. It can also have rules for each field that will define allowable values.
controlfile_dir is the directrory where the controlfile is found.
on_error specifes a default action to be taken whenever an error is encountered. This can be over-ridden at the individual processing item level. The allowable values are
die: stop all processing.
skip: stop futher processing of the current file, and move onto the next one.
ignore: ignore the error and continue with further processing.
This is the controlfile we use to specify the file definition:
grid_point N varchar(8) trading_date N date(%d/%m/%Y) trading_period N integer;range(1,50) market_time N qr/^(20|21|22|23|[01]\d|\d)(([:][0-5]\d){1,2})$/ price N float;range(0,) island N varchar(2) area N varchar(2) market_flag N varchar(1) runtime N date(%d/%m/%Y %H:%M:%S)
The first field defines the field name. The second field specifies whether the field is nullable or not. The third field specifies the validation rules. There can be multiple rules per field, with each rule separated by a semi-colon.
grid_point. Can contain up to 8 characters.
trading_date. A date, using the specified POSIX date format.
trading_period. (1) An integer. (2) The minimum value is 1 and maximum value is 50.
market_time. A valid time using a 24 hour clock. Note if none of the predefined validations suit our needs, we can defined custom regular expressions to validate the data.
price. (1) A floating point number. (2) The minimum value is 0 and there is no maximum value.
island. A two character field.
area. A two character field.
market_flag. A single character field.
run_time. A date, using the specified POSIX date format.
The field names should match the corresponding column names in the table. The table will require one extra integer field called file_id. This allows an individual file to be tracked throughout it's rocessing history.
If any records fail validation, the errors are logged in the audit tables and are also emailed:
ERROR - Error processing /home/etl/data/incoming/5_minute_prices_WWD1103_20100609.csv: 5_minute_prices_WWD1103_20100609.csv failed validation: Line number: 13 field name:island field value:NNI error:Length must be less than or equal to 2 characters Line number: 30 field name:trading_date field value:09/13/2010 error:Invalid date for pattern: %d/%m/%Y
All processing belongs to one of three phases, where each phase can consist of one or more items, which are run sequentially. Not all phases are required.
name: decompress
type: gunzip
description: Decompress the current file if it is gzipped.
name: validate price file
type: validate
description: Validate the file against the file rules (see above).
name: load price file
type: csv_loader
description: Loads the data into the table.
name: price_proc
type: plsql
description: An Oracle stored procedure
name: compress
type: gzip
description: The processed file is compressed using the gzip algorithm.
The validation and csv_loader items both have the setting "skip = 1". This is because the data files have a header record and we skip over one line.
If csv_loader completes successfully, the file is moved to the archive_dir.
If any of the processing steps raise an error, the file is moved to the fail_dir,
The PL/SQL call is the only invocation of custom code. Everything else is handled automatically by the framework.
The framework removes the burden of capturing the results and audit information from the developer. It records when processes are run, how long they took and what the results were. Any errors are captured and recorded and can be viewed in the audit browser. Any errors or warnings are automatically mailed to the email address defined in the environment configuration file.
Invoking the ETL is simple. Call the following from either a scheduler or the command line, making sure the OS environment (ORACLE HOME, etc) is configured:
etlp <config_file> <section>
e.g.
etlp wits five_min_prices
The next parameter in the top section of the configuration specifies the next pipeline to be called on the completion of the current one. i.e.
next = wits price_datamart
will invoke the call
etlp wits price_datamart
To install ETLp, copy and paste the appropriate command in to your terminal.
cpanm
cpanm ETLp
CPAN shell
perl -MCPAN -e shell install ETLp
For more information on module installation, please visit the detailed CPAN module installation guide.