NAME

Parallel::Depend : Parallel-dependent dispatch of perl or shell code.

SYNOPSIS

    package Mine;

    use base qw( Parallel::Depend Whatever::Else );

    my $manager = Mine->constructify( @whatever );

    my @argz =
    (
        # assign values to attributes

        restart     => '',  # restart in-process queue
        force       => '',  # ignore/overwrite previous execution

        verbose => 1,       # quiet (0), progress (1), detail (2).
        debug   => 0,       # DB::single set before first parse, dispatch

        nofork      => '',  # single-stream, useful for testing.
        maxjobs     => 8,   # 0 => unlimited, x < 0 => nofork.
        fork_ttys   => '',  # used for $DB::fork_TTY with perl debugger

        autoload    => '',  # choose between autoload and shell

        logdir      => "$Bin/../var/log",   # stderr, stdout files
        rundir      => "$Bin/../var/run",   # job status files

        sched   => <<'END'

        this : that     # this runs after that
        that : other    # that runs after other

        # multiple dependencies per line or 1 : 1.

        foo bar : bletch blort
        foo : bim
        foo : bam
        bim bam : this that

        # redundant but harmless if included.

        foo bar : blort bletch

        # without aliases jobs are dispatched as manager
        # object methods, perl function calls, code
        # blocks, AUTOLOAD, or shell depending on where
        # the job can be found.
        # aliases are expanded in the same fashion but
        # are passed the job name as an argument.

        foo = frobnicate                # $manager->frobnicate( 'foo' )

        bar = Some::Package::function   # $coderef->( 'bar' )

        bim = { your code here }        # $anon_sub->( 'bim' )

        bam = /path/to/shell            # system( '/path/to/shell', 'bam' )

        this    = ./blah -a -b          # system( './blah -a -b', 'this' )

        # example of reusing an alias: zip up to
        # maxjobs files in parallel.

        /tmp/backup/bigdump.aa  = squish
        /tmp/backup/bigdump.ab  = squish
        /tmp/backup/bigdump.ac  = squish
        /tmp/backup/bigdump.ad  = squish
        /tmp/backup/bigdump.ae  = squish
        /tmp/backup/bigdump.af  = squish

        /tmp/backup/bigdump.aa :
        /tmp/backup/bigdump.ab :
        /tmp/backup/bigdump.ac :
        /tmp/backup/bigdump.ad :
        /tmp/backup/bigdump.ae :
        /tmp/backup/bigdump.af :

        # groups are sub-schedules that have their
        # own namespace for jobs, are skipped entirely
        # on restart if the group completes successfully,
        # and can set their own attributes.

        pass2 < maxjob % 4          >   # throttle heavy-duty jobs.
        pass2 < fee fie foe fum :   >   # all these can run in
        pass2 < this that other :   >   # parallel, no harm splitting them up
        pass2 < this  = squish      >   # locally defined aliases: these are
        pass2 < that  = squash      >   # *not* the same as the jobs above.
        pass2 < other = frobnicate  >

        # attributes can be set and apply to all nested
        # levels. helpful within groups.
        #
        # for example, extract the contents of some
        # files, process them, then cleanup the 
        # results. each group uses its own alias
        # to handle the files.

        maxjob % 0              # unlimited forks

        after  < maxjob % 2 >   # two-way only for after group

        prior  < alias % extract >  # default alias within the group
        after  < alias % cleanup >
        during < alias % process >

        prior  < file1 : >  # $mgr->extract( 'file1' )
        prior  < file2 : >  # $mgr->extract( 'file2' )

        during < file1 : >  # $mgr->process( 'file1' )
        during < file2 : >  # $mgr->process( 'file2' )

        after  < file1 : >  # $mgr->cleanup( 'file1' )
        after  < file2 : >  # $mgr->cleanup( 'file2' )

        # assign job-specific attributes -- mainly to control
        # verbosity or flag jobs as installing ad-hoc schedules.

        this ~ ad_hoc       # default for all attributes is 1
        this ~ verbose 0    # or add your own value
        that ~ verbose 2

        # as you might have guessed by now, text after
        # an un-escaped hash sign is treated as a comment.
END
    );

    my $outcome
    = eval
    {
        $manager->prepare ( @argz );    # parse, validate the queue.
        $manager->validate;             # check for deadlocks
        $manager->execute;              # do the deed

        "The End"
        
    }
    or die $@;

    or just:

    $manager->prepare( @argz )->validate->execute;

    # if you want to derive a new object from the
    # manager and use it to execute the que or add
    # ad_hoc jobs (e.g., factory class) then you
    # must share the queue with the new object.

    sub derive_new_object
    {
        my $mgr     = shift;

        my $derived = $mgr->new( @_ );

        $mgr->share_queue( $derived );

        $derived
    }

    # at this point the derived object uses 
    # the same queue as the original $manager
    # (not a clone, the same one). executing
    # with $derived will have the same effect
    # on the queue as $manager.

    $derived->execute;

DESCRIPTION

Parallel::Depend does parallel, dependent dispatch of perl methods, perl functions, inline code blocks, or external shell commands. The schedule syntax is derived from Make but does not require that all jobs be wrapped in shell code to execute; it also supports sub-schedules ("groups") that are dispatched as a unit.

Execution history, including stdout and stderr of each job, is kept in per-job files for simpler access after the fact.

Schedule Syntax

The schedule can contain dependencies, aliases, attribute assignments, group definitions, and perl-style comments.

Dependencies ":"

Dependencies between jobs use a ':' syntax much like make:

        # commenting the schedule is often helpful.

    foo : bar
    foo : bletch
    bim : foo
    bam : foo

or

    # produces the same result as above:

    foo : bar bletch

    bim bam : foo

Job names are non-whitespace ( /\S/ if you like regexen) and are separated by whitespace. If you need whitespace arguments in order to dispatch the job then see "aliases" below.

Job Aliases "="

Jobs are normally dispatched as-is as either method names, perl functions, perl code blocks, or to the shell via system( $job ).

Processing a number of inputs through the same cycle, passing arguments to the methods, or including shell commands with multiple arguments requires aliasing the job name:

        job1 = make -wk -c /foo bar bletch;
        job2 = Some::Module::mysub
        job3 = methodname
        job4 = { print "this is a perl code block"; 0 }

        job3 : job2 job1

Will eventually call Some::Module::mysub( 'job2' ) and $mgr->methodname( 'job3' ); job1 will be handled as system( 'make ...' ). job4 will not be executed since there is no dependency rule for it.

Passing a number of arguments to the same routine is done by aliasing them the same way.

Say you want to gzip a large number of files, running the zips n-way parallel:

    my $sched
    = q
    {
        /path/to/file1  = gzip
        /path/to/file2  = gzip
        /path/to/file3  = gzip

        /path/to/file1  :
        /path/to/file2  :
        /path/to/file3  :
    };

    My::Class->prepare
    (
        sched   => $sched,
        maxjob  => 4
    )
    ->execute;

Types of aliases

Method Alias

If $mgr->can( $alias ) then the alias will be dispatched as

    $mgr->$alias( $job );

For example

    /path/to/file = squish
    /path/to/file :

will dispatch:

    $mgr->squish( '/path/to/file' );
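
What the method itself looks like is entirely up to the manager class. A minimal sketch (the method name matches the alias above; the gzip call is purely illustrative):

    sub squish
    {
        my ( $mgr, $path ) = @_;    # the job name arrives as the argument

        # a zero return marks the job as successful;
        # a true return or a die marks it as failed.

        system( 'gzip', '-9', $path ) == 0
        or die "gzip failed for '$path': $?";

        0
    }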

Shell Alias

    /path/to/file   = /bin/gzip -9v

Will call

    system '/bin/gzip -9v', '/path/to/file'

(splitting the command line on un-escaped whitespace is hell, the shell can do it well enough for itself).

Perl Function Alias

If you don't want to pass the queue manager object (i.e., functional interface) just include the package with '::':

    /path/to/file1  = My::Class::Util::gzip

will end up calling

    $coderef->( '/path/to/file1' );

Code Block Alias

For complete -- if somewhat dangerous -- control use a code block. This will be compiled on the fly into a subroutine and run with the job name as its argument.

Using

    foo     = { a block of perly code }
   

Will eval "sub $alias" to get a subref and dispatch $subref->( 'foo' ). This is one way to dodge passing around the manager object if necessary.

AUTOLOAD

If the 'autoload' attribute is set, the manager can 'AUTOLOAD', and the alias looks like a method call, then unalias will trust to luck and compile

    sub { $mgr->$alias( $job ) }

For you. This can be helpful for allowing nested class structures to derive their own objects to handle specific parts of processing: just pass class name as the job, alias it to the method being dispatched, and have your AUTOLOAD take the argument as the class to dispatch into. This case is where sharing the queue also comes in handy.

It is also useful with modules that depend on autoload side effects to function.
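
A minimal sketch of the sort of manager class this describes, assuming the 'autoload' attribute is set and treating the job name as a class to hand the work to (the class and method names here are hypothetical):

    package My::Manager;
    use base qw( Parallel::Depend );

    our $AUTOLOAD;

    sub AUTOLOAD
    {
        my ( $mgr, $job ) = @_;

        ( my $method = $AUTOLOAD ) =~ s/.*:://;

        # the schedule aliases the job -- here a class name --
        # to a method; dispatch into that class and return
        # zero for success.

        $job->can( $method )
        or die "Bogus $method: not implemented by '$job'";

        $job->$method;

        0
    }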

Groups (sub-schedules) "< ... >"

Groups are schedules within the schedule:

        group : job1

        group < job2 job3 job4 : job5 job6 >
        group < job3 : job5 >

Groups provide a separate namespace for their jobs, attributes, and aliases and are skipped wholesale on restart.

The most common use of groups is to process a set of data through multiple stages. This can be done easily by putting each stage in a group, assigning a hardwired alias within the group, and adding the data. For example:

    my @fixed =
    (
        'maxjob % 4',
        'prior  < alias % /path/to/gzip -dv >',
        'during < alias % process >',
        'after  < alias % /path/to/gzip -9v >',

        'during : prior',
        'after  : during',
    );

    my @variable
    = map
    {
        (
            "prior  < $_ : >",
            "during < $_ : >",
            "after  < $_ : >",
        )
    }
    glob '/tmp/incoming/*';

    $mgr->prepare( sched => [ @fixed, @variable ] );
    $mgr->execute;

Will run all of the jobs 4-way parallel through gzip -d, the process method, and gzip -9.

On a restart, if the "prior" group completed then it will be stubbed and none of its contents run. This helps avoid expensive side-effects in preparing the jobs.

Rolling Your Own

The unalias method returns an id string ($name) for tracking the job and a closure to execute for running it:

        my ( $name, $sub )  = $mgr->unalias( 'job1' );

        my $return = $mgr->runjob( $sub );

The default runjob simply dispatches $sub->() but it might be overloaded to wrap, eval, or otherwise manage the execution. Overloading unalias gives you complete control over how the jobs are processed. This can be helpful for cases where the method + argument is not sufficient.
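
For example, runjob might be overloaded to trap a die in the closure and report it as a failed job rather than letting it kill the child outright. A sketch, not the stock implementation:

    sub runjob
    {
        my ( $mgr, $sub ) = @_;

        my $result = eval { $sub->() };

        if( $@ )
        {
            warn "$$: job died: $@";

            return 1;   # non-zero => failure
        }

        $result
    }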

Setting Schedule Attributes

Settings are used to override defaults in the schedule preparation. Defaults are taken from hard-coded values in Parallel::Depend, parameters passed into prepare as arguments, or the parent que's attributes for sub-queues or groups. Settings use '%' to separate the attribute and its value (mainly because '=' was already used for aliases):

        verbose % 1
        maxjob  % 1

The main use of these in top-level schedules is as an alternative to argument passing. In sub-queues they are the only way to override the que attributes. One good example of these is setting maxjob to 2-3 in order to allow multiple groups to start in the main schedule and then to 1 in the groups to avoid flooding the system.

Groups inherit the attributes of their enclosing group; everyone inherits from the global settings.

For example:

    maxjob % 4

    expensive < maxjob % 2 >
    cheap     < maxjob % 8 >

Leaves most things running 4-way parallel, with jobs in the "expensive" group throttled to 2-way and ones in "cheap" run 8-way.

Sometimes attributes need to be set for one job. This is usually to up verbosity to check a failure or to flag the job as installing an ad-hoc schedule:

    alias % frobnicate

    /input/path = check_filesystem

    /input/path ~ ad_hoc

    /input/path : extract
    cleanup : /input/path

Will run "find_files" without forking so that it can call $mgr->ad_hoc( ... ) to install a scheudle on the fly. Both "extract" and "cleanup" will be passed to "frobnicate", where the input path will be passed to "check_filesystem".

The "check_filesystem" call will be made without forking. This allows the method to install jobs for files it finds. This also creates a group "check_filesystem._" to contain the results of the ad-hoc schedule (see ad_hoc method).

Ad-Hoc schedules

There are times when you want a job to run in order to determine what happens next. Examples are searching the filesystem for inputs or processing the chunked output of a previous stage. In both cases it is easier to have a job add to the running schedule.

The "ad_hoc" call adds jobs into the current schedule. For this to work the current job cannot be forked. This means that either the entire queue runs with "nofork" set or that the job is flagged as "ad_hoc". The difference is that ad_hoc is a flag for only one job:

    /input/dir ~ ad_hoc

    /input/dir = find_files

    /input/dir : download
    load_data  : /input/dir

This will fork download, wait until it completes, then dispatch find_files without forking. When the sub-schedule installed by find_files completes then load_data will be called.

The ad_hoc call installs a new group 'job._', which can contain its own attributes, aliases, or groups:

    use File::Basename qw( basename );

    sub find_files
    {
        my ( $mgr, $path ) = @_;    # $path is '/input/dir'

        my @jobz
        = map
        {
            my $base    = basename $_;
            my ( $grp ) = $base =~ m{ [.] (\w+) [.] gz $ }x;

            # "load_$grp" is an illustrative alias derived from
            # the file suffix: one alias line and one dependency
            # line per file found.

            ( "$_ = load_$grp", "$_ :" )
        }
        glob "$path/*.gz"
        or die "Invalid input: '$path' has no files";

        $mgr->ad_hoc( @jobz );  # install them as this job's sub-schedule
    }

Arguments

sched

The schedule can be passed as a single argument (string or reference) or with the "sched" key as a hash value:

        sched => [ schedule as separate lines in an array ]

        sched => "newline delimited schedule, one item per line";

Or it can be passed a hash of configuration information with the required key "sched" whose value is the schedule scalar described above.

The dependencies are described much like a Makefile, with targets waiting for other jobs to complete on the left and the dependencies on the right. Schedule lines can have single dependencies like:

        waits_for : depends_on

or multiple dependencies:

        wait1 wait2 : dep1 dep2 dep3

or no dependencies:

        runs_immediately :

Jobs on the righthand side of the dependency ("depends_on" or "dep1 dep2 dep3", above) will automatically be added to the list of runnable jobs. This avoids having to add special rules for them.

Dependencies without a wait_for argument are an error (e.g., ": foo" will croak during prepare).

It is also possible to alias job strings:

        foo = /usr/bin/find -type f -name 'core' | xargs rm -f

        ...

        foo : bar

        ...

will wait until bar has finished, unalias foo to the command string and pass the expanded version wholesale to the system command. Aliases can include fully qualified perl subroutines (e.g., "Foo::Bar::subname"), methods accessible via the $que object (e.g., "subname"), or code blocks (e.g., "{ returns_nonzero; 0 }"). If no subroutine, method or perl block can be extracted from the alias then it is passed to the shell for execution via the shellexec method.

If the schedule entry requires newlines (e.g., for better display of long dependency lists) newlines can be embedded in it if the schedule is passed into prepare as an array reference:

        my $sched =
        [
                "foo =  bar
                                bletch
                                blort
                ",
        ];

        ...

        Parallel::Depend->prepare( sched => $sched ... );

will handle the extra whitespace properly. Multi-line dependencies, aliases or groups are not allowed if the schedule is passed in as a string.

One special alias is "group". This is a standard method used to handle grouped jobs as a sub-que. Groups are assigned using the '~' character and by having the group name aliased to group. This guarantees that the jobs do not start until the group is ready and that anything that depends on the group will not be run until all of the group jobs have completed.

For example:

        # main schedule has unlimited number of concurrent jobs.

        maxjob % 0

        gname ~ job1 job2 job3

        gname : startup

        # optional, default for handling groups is to alias
        # them to the $mgr->group method.

        gname = group

        gname : startup

        # the group runs single-file, with maxjob set to 1

        gname < maxjob % 1 >
        gname < job1 job2 job3 >

        shutdown : gname

Will run job[123] together after "startup" completes and will cause "shutdown" to wait until all of them have finished.

See the "Schedules" section for more details.

verbose

Turns on verbose execution for preparation and execution.

All output controlled by verbosity is output to STDOUT; errors, roadkill, etc, are written to STDERR.

verbose == 0 only displays a few fixed preparation and execution messages. This is mainly intended for production systems with large numbers of jobs where searching a large output would be troublesome.

verbose == 1 displays the input schedule contents during preparation and fork/reap messages as jobs are started.

verbose == 2 is intended for monitoring automatically generated queues and debugging new schedules. It displays the input lines as they are processed, forks/reaps, exit status and results of unalias calls before the jobs are exec-ed.

verbose can also be specified in the schedule, with schedule settings overriding the args. If no verbose setting is made then debug runs with verbose == 1, non-debug execution with verbose == 0.

Also "verbose % X" in the schedule, with X as the new verbosity.

validate

Runs the full prepare but does not fork any jobs, pidfiles get a "Debugging $job" entry in them and an exit of 1. This can be used to test the schedule or debug side-effects of overloaded methods. See also: verbose, above.
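
For example, a schedule can be checked without dispatching anything real via the validate method (a sketch; the eval covers either a die or a false return on a deadlocked schedule):

    my $que = My::Class->prepare( sched => $sched );

    eval { $que->validate }
    or warn "Schedule will not complete: $@";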

rundir & logdir

These are where the pidfiles and stdout/stderr of forked jobs are placed, along with stdout (i.e., verbose) messages from the que object itself.

These can be supplied via the schedule using aliases "rundir" and "logdir". Lacking any input from the schedule or arguments all output goes into the #! file's directory (see FindBin(1)).

Note: The last option is handy for running code via soft link w/o having to provide the arguments each time. The RBTMU.pm module in examples can be used in a single #! file, soft linked in to any number of directories with various .tmu files and then run to load the various groups of files.

maxjob

This is the maximum number of concurrent processes that will be run at any one time while executing the queue. If more jobs are runnable than process slots then jobs will be started in lexical order by their name until no slots are left.

Also: "maxjob % X" in the schedule with X as the maximum number of concurrent jobs.

restart, noabort

These control the execution by skipping jobs that have completed or depend on those that have failed.

The restart option scans pidfiles for jobs which have a zero exit in them; these are marked for skipping on the next pass. It also ignores zero-sized pidfiles to allow for restarts without having to remove the initial pidfiles created automatically in prepare.

The noabort option causes execution to behave much like "make -k": instead of aborting completely on a non-zero exit the execution will complete any jobs that do not depend on the failed job.

Combining noabort with restart can help debug new schedules or handle balky ones that require multiple restarts.

These can be given any true value; the default for both is false.
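
For example, re-running a partially failed queue might look like this sketch (same schedule and directories as the original run):

    my $mgr = My::Class->prepare
    (
        sched   => $sched,
        restart => 1,   # skip jobs with a zero exit in their pidfiles
        noabort => 1,   # keep going past failures, make -k style
    );

    $mgr->execute;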

Also: "maxjob % X" in the schedule with X as the maximum number of concurrent jobs.

Note on schedule arguments and aliases

verbose, debug, rundir, logdir, and maxjob can all be supplied via arguments or within the schedule as settings (e.g., "maxjob % 2" as a schedule entry). Entries hard-coded into the schedule override those supplied via the arguments. This was done mainly so that maxjob could be used in test schedules without risk of accidentally bringing a system to its knees during testing. Setting debug in this way can help during testing; setting verbose to 0 on automatically generated queues with thousands of entries can also be a big help.

Hard-coding "restart" would require either a new directory for each new execution of the schedule or explicit cleanup of the pidfiles (either by hand or a final job in the schedule).

Hard-coding "noabort" is probably harmless.

Hard-coding "debug" will effectively disable any real execution of the que.

Note for debugging

$que->{attrib} contains the current que settings. Its contents should probably not be modified, but displaying it (e.g., via Dumper $que->{attrib}) can be helpful in debugging que behavior.

Description

Parallel scheduler with simplified make syntax for job dependencies and substitutions. Like make, targets have dependencies that must be completed before they can be run. Unlike make there are no statements for the targets; the targets are themselves executables.

The use of pidfiles with status information allows running the queue in "restart" mode. This skips any jobs with zero exit status in their pidfiles, stops and re-runs or waits for any running jobs and launches anything that wasn't started. This should allow a schedule to be re-run with a minimum of overhead.

The pidfile serves three purposes:

Restarts
        On restart any leftover pidfiles with
        a zero exit status in them can be skipped.
Waiting
        Any process used to monitor the result of
        a job can simply perform a blocking read
        for the exit status to know when the job
        has completed. This avoids the monitoring
        system having to poll the status.
Tracking
        Tracking the empty pidfiles gives a list of
        the pending jobs. This is mainly useful with
        large queues where running in verbose mode
        would generate excessive output.

Each job is executed via fork/exec (or sub call, see notes for unalias and runjob). The parent writes out a pidfile with initially two lines: pid and command line. It then closes the pidfile. The child keeps the file open and writes its exit status to the file if the job completes; the parent writes the returned status to the file also. This makes it rather hard to "lose" the completion and force an abort on restart.

Schedules

The configuration syntax is make-like. The two sections give aliases and the schedule itself. Aliases and targets look like make rules:

        target = expands_to

        target : dependency

example:

        a = /somedir/abjob.ksh
        b = /somedir/another.ksh
        c = /somedir/loader

        a : /somedir/startup.ksh
        b : /somedir/startup.ksh

        c : a b

        /somedir/validate : a b c

Will use the various path expansions for "a", "b" and "c" in the targets and rules, running /somedir/abjob.ksh only after /somedir/startup.ksh has exited zero, the same for /somedir/another.ksh. The file /somedir/loader gets run only after both abjob.ksh and another.ksh are done with and the validate program gets run only after all of the other three are done with.

A job can be assigned a single alias, which must be on a single line of the input schedule (or a single row in schedules passed in as arrays). The alias is expanded at runtime to determine what gets dispatched for the job.

The main uses of aliases would be to simplify re-use of scripts. One example is the case where the same code gets run multiple times with different arguments:

        # comments are introduced by '#', as usual.
        # blank lines are also ignored.

        a = /somedir/process 1  # process is called with various arg's
        b = /somedir/process 2
        c = /somedir/process 3
        d = /somedir/process 4
        e = /somedir/process 5
        f = /somedir/process 6

        a : /otherdir/startup   # startup.ksh isn't aliased
        b : /otherdir/startup
        c : /otherdir/startup

        d : a b
        e : b c
        f : d e

        cleanup : a b c d e f

Would allow any variety of arguments to be run for the a-f code simply by changing the aliases, the dependencies remain the same.

If the alias for a job is a perl subroutine call then the job tag is passed to it as the single argument. This simplifies the re-use above to:

        file1.gz = loadfile
        file2.gz = loadfile
        file3.gz = loadfile

        file1.gz file2.gz file3.gz : /some/dir/download_files

Will call $mgr->loadfile passing it "file1.gz" and so on for each of the files listed -- after the download_files script exits cleanly.
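
What "loadfile" does is up to the manager class; a minimal sketch might be nothing more than:

        sub loadfile
        {
            my ( $mgr, $file ) = @_;    # e.g., 'file1.gz'

            # whatever loading means locally; a zero
            # return marks the job as successful.

            print "$$: loading $file\n";

            0
        }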

Another example is a case of loading fact tables after the dimensions complete:

        fact1   = loadfile
        fact2   = loadfile
        fact3   = loadfile
        dim1    = loadfile
        dim2    = loadfile
        dim3    = loadfile

        fact1 fact2 fact3 : dim1 dim2 dim3

Would load all of the dimensions at once and the facts afterward. Note that stub entries are not required for the dimensions, they are added as runnable jobs when the rule is read. The rules above could also have been stated as:

        fact1 fact2 fact3 dim1 dim2 dim3 : loadfile

        fact1 fact2 fact3 : dim1 dim2 dim3

The difference is entirely one of artistic taste for a scalar schedule. If the schedule is passed in as an array reference then it will usually be easier to push dependencies on one-by-one rather than building them as longer lines.

Single-line code blocks can also be used as aliases. One use of these is to wrap legacy code that returns non-zero on success:

        a = { ! returns1; }

or

        a = { eval{returns1}; $@ ? 1 : 0 }

to reverse the return value or pass non-zero if the job died. The blocks can also be used for simple dispatch logic:

        a = { $::switchvar ? subone("a") : subtwo("a") }

allows the global $::switchvar to decide if subone or subtwo is passed the argument. Note that the global is required since the dispatch will be made within the Parallel::Depend package.

Altering the package for subroutines that depend on package lexicals can also be handled using a block:

        a = { package MyPackage; somesub }

Another alias is "PHONY", which is used for placeholder jobs. These are unaliased to sub{0} and are intended to simplify grouping of jobs in the schedule:

        waitfor = PHONY

        waitfor : job1
        waitfor : job2
        waitfor : job3
        waitfor : job4

        job5 job6 job7 : waitfor

will generate a stub that immediately returns zero for the "waitfor" job. This allows the remaining jobs to be hard coded -- or the job1-4 strings to be long file paths -- without having to generate huge lines or dynamically build the job5-7 line.

One example of phony jobs simplifying schedule generation is loading of arbitrary files. A final step bringing the database online for users could be coded as:

        online : loads

with lines for the loads added one by one as the files are found:

        push @schedule, "loads : $path", "$path = loadfile";

could call a subroutine "loadfile" for each of the paths without the "online" operation needing to be updated for each path found.

The other standard alias is "STUB". This simply prints out the job name and is intended for development where tracking schedule execution is useful. Jobs aliased to "STUB" return a closure "sub{print $job; 0}" and an id string of the job tag.

In many cases PHONY jobs work but become overly verbose. The usual cause is that a large number of jobs are tied together at both the beginning and ending stages, causing double-entries for each one, for example:

        job1 : startup
        job2 : startup
        job3 : startup
        ...
        jobN : startup

        shutdown : job1
        shutdown : job2
        shutdown : job3
        shutdown : jobN

Even if the jobs are listed on a single line each, double listing is a frequent source of errors. Groups are designed to avoid most of this difficulty. Jobs in a group have an implicit starting and ending since they are only run within the group. For example, if the jobs above were in a group:

        middle = group                  # alias is optional

        middle < job1 : job2 >
        middle < job3 : job2 >

        middle : startup
        shutdown : middle

This will wait until the "middle" job becomes runnable (i.e., when startup has finished) and will prepare the schedule contained in the angle-brackets. The entire schedule is prepared and executed after the middle job has forked and uses a local copy of the queued jobs and dependencies. This allows the "middle" group to contain a complete schedule -- complete with sub-sub-schedules if necessary.

The normal method for handling group names is the "group" method. If the group name has not already been aliased when the group is parsed then it will be aliased to "group". This allows another method to handle dispatching the jobs if necessary (e.g., one that uses a separate run or log directory).

It is important to note that the schedule defined by a group is run separately from the main schedule in a forked process. This localizes any changes to the que object and effects on jobs skipped, etc. It also means that the group's schedule should not have any dependencies outside of the group or it will deadlock (and so may the main schedule).

Note: Group names should be simple tags, and must avoid '=' and ':' characters in the job name in order to be parsed properly.

Overloading unalias for special job expansion.

Up to this point all of the schedule processing has been handled automatically. There may be cases where specialized processing of the jobs may be simpler. One example is where the "jobs" are known to be data files being loaded into a database, another is where the subroutine calls must come from an object other than the que itself.

In this case the unalias or runjob methods can be overloaded. Because runjob will automatically handle calling subroutines within perl vs. passing strings to the shell, most of the overloading can be done in unalias.

If unalias returns a code reference then it will be used to execute the code. One way to handle file processing for, say, rb_tmu loading dimension files before facts would be a schedule like:

        dim1 = tmu_loader
        dim2 = tmu_loader
        dim3 = tmu_loader
        fact1 = tmu_loader
        fact2 = tmu_loader

        fact2 fact1 : dim1 dim2 dim3

This would call $mgr->tmu_loader( 'dim1' ), etc, allowing the jobs to be paths to files that need to be loaded.

The problem with this approach is that the file names can change for each run, requiring more complicated code.

In this case it may be easier to overload the unalias method to process file names for itself. This might lead to the schedule:

        fact2 fact1 : dim1 dim2 dim3

and nothing more, with an overloaded unalias along the lines of:

                sub unalias
                {
                        my ( $mgr, $job ) = @_;

                        # the job is the path to a data file; deriving
                        # the matching tmu config file from it is a
                        # matter of local convention (a guess is shown).

                        my $datapath    = $job;
                        ( my $tmufile   = $job ) =~ s{ [.] \w+ (?:[.]gz)? $ }{.tmu}x;

                        -e $tmufile or croak "$$: Missing: $tmufile";

                        # unzip zipped files, otherwise just redirect them

                        my $cmd = $datapath =~ /[.]gz$/ ?
                                "gzip -dc $datapath | rb_ptmu $tmufile \$RB_USER" :
                                "rb_tmu $tmufile \$RB_USER < $datapath"
                        ;

                        # caller gets back an id string of the file
                        # (could be the command but that can get a bit
                        # long) and the closure that deals with the
                        # string itself.

                        ( $datapath, sub { shellexec $cmd } );
                }

In this case all the schedule needs to contain are paths to the data files being loaded. The unalias method deals with all of the rest at runtime.

Aside: This can be easily implemented by way of a simple convention and one soft link. The tmu (or sqlldr) config. files for each group of files can be placed in a single directory, along with a soft link to the #! code that performs the load. The shell code can then use '.' for locating new data files and "dirname $0" to locate the loader configurations. Given any reasonable naming convention for the data and loader files this allows a single executable to handle multiple data groups -- even multiple loaders -- relatively simply.

Since code references are processed within perl this will not be passed to the shell. It will be run in the forked process, with the return value of the closure being passed back to the parent process.

Using an if-ladder various subroutines can be chosen from when the job is unaliased (in the parent) or in the subroutine called (in the child).

Aliases can pass shell variables.

Since the executed code is fork-execed it can contain any useful environment variables also:

        a = process --seq 1 --foo=$BAR

will interpolate $BAR at fork-time in the child process (i.e., by the shell handling the exec portion).

The scheduling module provides methods for managing the preparation, validation and execution of schedule objects. Since these are separated they can be manipulated by the caller as necessary.

One example would be to read in a set of schedules, run the first one to completion, modify the second one based on the output of the first. This might happen when jobs are used to load data that is not always present. The first schedule would run the data extract/import/tally graphs. Code could then check if the tally shows any work for the intermittent data and stub out the processing of it by aliasing the job to "/bin/true":

        /somedir/somejob.ksh = /bin/true

        prepare = /somedir/extract.ksh

        load = /somedir/batchload.ksh


        /somedir/somejob.ksh : prepare
        /somedir/ajob.ksh : prepare
        /somedir/bjob.ksh : prepare

        load : /somedir/somejob.ksh /somedir/ajob.ksh /somedir/bjob.ksh

In this case /somedir/somejob.ksh will be stubbed to exit zero immediately. This will not interfere with any of the scheduling patterns, just reduce any delays in the schedule.

Note on calling convention for closures from unalias.

        my ( $name, $sub ) = $mgr->unalias( $job );

The former is printed for error and log messages, the latter is executed via &$sub in the child process.

The default closures vary somewhat in the arguments they are passed for handling the job and how they are called:

        $run = sub { $sub->( $job ) };              # $package->can( $subname )

        $run = sub { $que->$sub( $job ) };          # $mgr->can( $run )

        $run = sub { __PACKAGE__->$sub( $job ) };   # __PACKAGE__->can( $run )

        $run = eval "sub $block";                   # allows perl block code

The first case comes up because Foo::bar in a schedule is unlikely to successfully process any package arguments. The __PACKAGE__ situation is only going to show up in cases where execute has been overloaded, and the subroutines may need to know which package context they were unaliased in.

The first case can be configured to pass the package in by changing it to:

        $run = sub { $package->$sub( $job ) };

This will pass the package as $_[0].

The first test is necessary because:

        $object->can( 'Foo::bar' )

always returns \&Foo::bar, which called as $que->$sub puts a stringified version of the object into $_[0], and getting something like "2/8" is unlikely to be useful as an argument.

The last is mainly designed to handle subroutines that have multiple arguments which need to be computed at runtime:

        foo = { do_this( $dir, $blah); do_that }

or when scheduling legacy code that might not exit zero on its own:

        foo = { some_old_sub(@argz); 0 }

The exit from the block will be used for the non-zero exit status test in the parent when the job is run.

Notes on methods

Summary by subroutine call, with notes on overloading and general use.

ready

Return a list of what is runnable in the queue. These will be any queued jobs which have no keys in their queued subhash. For example, the schedule entry

        "foo : bar"

leaves

        $queued->{foo}{bar} = 1.

foo will not be ready to execute until keys %{ $queued->{foo} } is false (i.e., $queued->{foo}{bar} is deleted in the complete method).

This is used in two places: as a sanity check of the schedule after the input is complete and in the main scheduling loop.

If this is not true when we are done reading the configuration then the schedule is bogus.

Overloading this might allow some extra control over priority where maxjob is set by modifying the sort to include a priority (e.g., number of waiting jobs).
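
A sketch of that sort of overload, assuming the inherited ready and depend methods behave as described here:

        sub ready
        {
            my $mgr = shift;

            # count how many jobs wait on each runnable job
            # and start the most-depended-on ones first,
            # breaking ties lexically.

            my %weight
            = map { $_ => scalar( () = $mgr->depend( $_ ) ) }
            $mgr->SUPER::ready;

            sort { $weight{$b} <=> $weight{$a} or $a cmp $b } keys %weight
        }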

queued, depend

queued hands back the keys of the que's "queued" hash. This is the list of jobs which are waiting to run. The keys are sorted lexically to give a consistent return value.

depend hands back the keys of que's "depend" hash for a particular job. This is a list of the jobs that depend on the job.

Only reason to overload these would be in a multi-stage system where one queue depends on another. It may be useful to prune the second queue if something abnormal happens in the first (sort of like make -k continuing to compile).

Trick would be for the caller to use something like:

        $q1->dequeue( $_ ) for $q0->depend( $job_that_failed );

        croak "Nothing left to run" unless $q1;

Note that the sort allows for priority among tags when the number of jobs is limited via maxjob. Jobs can be given tags like "00_", "01_" or "aa_", with hotter jobs getting lexically lower tag values.

dequeue

Once a job has been started it needs to be removed from the queue immediately. This is necessary because the queue may be checked any number of times while the job is still running.

For the golf-inclined this reduces to

        delete $_[0]->{queued}{$_[1]}

For now this looks prettier.

Compare this to the complete method which is run after the job completes and deals with pidfile and cleanup issues.

complete

Deal with job completion. Internal tasks are to update the dependencies, external cleanups (e.g., zipping files) can be handled by adding a "cleanup" method to the queue.

Thing here is to find all the jobs that depend on whatever just got done and remove their dependency on this job.

$depend->{$job} was built in the constructor via:

                push @{ $depend->{$_} }, $job for @dependz;

Which assembles an array of what depends on this job. Here we just delete from the queued entries anything that depends on this job. After this is done the runnable jobs will have no dependencies (i.e., keys %{ $q{queued}{$job} } will be an empty list).

A "cleanup" can be added for post-processing (e.g., gzip-ing processed data files or unlinking scratch files). It will be called with the que and job string being cleaned up after.

unalias, runjob

unalias is passed a single argument of a job tag and returns two items: a string used to identify the job and a closure that executes it. The string is used for all log and error messages; the closure executed via "&$sub" in the child process.

The default runjob accepts a scalar to be executed and dispatches it via "&$run". This is broken out as a separate method purely for overloading (e.g., for even later binding due to mod's in unalias).

For the most part, closures should be capable of encapsulating any logic necessary so that changes to this subroutine will not be necessary.

precheck

Isolate the steps of managing the pidfiles and checking for a running job.

This varies enough between operating systems that it'll make for less hacking if this is in one place or can be overridden.

This returns true if the pidfile contains the pid for a running job. Depending on the operating system this can also check if the pid is a copy of this job running.

If the pids have simply wrapped then someone will have to clean this up by hand. Problem is that on Solaris (at least through 2.7) there isn't any good way to check the command line in /proc.

On HP it's worse, since there isn't any /proc/pid. There we need to use a process module or parse ps.

On solaris the /proc directory helps:

        croak "$$: job $job is already running: /proc/$dir"
                if( -e "/proc/$pid" );}

but all we can really check is that the pid is running, not that it is our job.

On linux we can also check the command line to be sure the pid hasn't wrapped and been re-used (not all that far fetched on a system with 30K blast searches a day for example).
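
A sketch of that check on Linux (the $pid and $job values are the ones from the pidfile handling described above; matching on the job string is local policy):

        if( -e "/proc/$pid" )
        {
            open my $fh, '<', "/proc/$pid/cmdline"
            or die "$$: unable to read /proc/$pid/cmdline: $!";

            my $cmdline = do { local $/; <$fh> };

            # /proc cmdline fields are NUL-separated; a running
            # copy of this job will mention the job string.

            ( my $running = $cmdline ) =~ tr/\0/ /;

            die "$$: job '$job' is already running as $pid"
            if index( $running, $job ) >= 0;
        }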

Catch: If we zero the pidfile here then $q->validate->execute fails because the file is open for append during the execution and we get two sets of pid entries. The empty pidfiles are useful however, and are a good check for writability.

Fix: deal with it via if block in execute.

prepare

Read the schedule and generate a queue from it.

Lines arrive as:

        job = alias expansion of job

or

        job : depend on other jobs

Any '#' and all text after it on a line is stripped, regardless of quotes or backslashes, and blank lines are ignored.

Basic sanity checks are that none of the jobs is currently running, no job depends on itself to start, and there is at least one job which is initially runnable (i.e., has no dependencies).

Caller gets back a blessed object w/ sufficient info to actually run the scheduled jobs.

The only reason for overloading this would be to add some boilerplate to the parser. The one here is sufficient for the default grammar, with only aliases and dependencies of single-word tags.

Note: the "ref $proto || $proto" trick allows this to be used as a method in some derived class. in that case the caller will get back an object blessed into the same class as the calling object. This simplifies daisy-chaining the construction and saves the deriving class from having to duplicate all of this code in most cases.

Alternate uses for unalias

This can be used as the basis for a general-purpose dispatcher. For example, Schedule::Cron passes the command line directly to the scheduler. Something like:

        package Foo;

        use Schedule::Cron;
        use Parallel::Depend;

        sub dispatcher
        {
                my $cmd = shift;

                if( my ( $name, $sub ) = Parallel::Depend->unalias($cmd) )
                {
                        print "$$: Dispatching $name";

                        &$sub;
                }
        }

permits cron lines to include shell paths, perl subs or blocks:

        * * * * *       Some::Module::subname
        * * * * *       { this block gets run  also }
        * * * * *       methodname

This works in part because unalias does a check for its first argument being a reference or not before attempting to unalias it. If a blessed item has an "alias" hash within it then that will be used to unalias the job strings:

        use base qw( Parallel::Depend );

        my $blessificant = bless { alias => { foo => 'bar' } }, __PACKAGE__;

        my ( $string, $sub ) = $blessificant->unalias( $job );

will return a subroutine that uses the aliased strings to find method names, etc.

debug

Stub out the execution, used to check if the queue will complete. Basic trick is to make a copy of the object and then run the que with "norun" set.

This uses Dumper to get a deep copy of the object so that the original queue isn't consumed by the debug process, which saves having to prepare the schedule twice to debug then execute it.

two simplest uses are:

        if( my $que = Parallel::Depend->prepare( @blah )->validate ) {...}

or

        eval { Parallel::Depend->prepare( @blah )->debug->execute }

depending on your taste in error handling.

execute

Actually do the deed. There is no reason to overload this that I can think of.

group

This is passed a group name via aliasing the group in a schedule, for example:

    dims    = group # alias added automatically
    facts   = group # alias added automatically

    dims    < dim1 dim2 dim3 : >
    facts   < fact1 fact2 : >

    facts : dims

will call $mgr->group( 'dims' ) first then $mgr->group( 'facts' ).

Known Bugs/Features

The block-eval of code can yield all sorts of oddities if the block has side effects (e.g., exit()). The one-line format also imposes some strict limits on blocks for now unless the schedule is passed in as an arrayref.

Dependencies between jobs in separate groups are not yet supported due to validation issues. The call to prepare will mangle dependencies between jobs to keep the groups in order, but you cannot have a job in one group depend explicitly on any job in another group -- this includes nested groups.

Author

Steven Lembark, Workhorse Computing lembark@wrkhors.com

Copyright

(C) 2001-2009 Steven Lembark, Workhorse Computing

This code is released under the same terms as Perl itself. Please see the Perl-5.10 distribution (or later) for a full description.

In any case, this code is released as-is, with no implied warranty of fitness for a particular purpose or warranty of merchantability.

See Also

perl(1)

perlobj(1) perlfork(1) perlreftut(1)

Other scheduling modules:

Parallel::Queue(1) Schedule::Cron(1)
