The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
#######################################################################
#   $Header$
#######################################################################
#  $HeadURL$
#   $Source$
#     $Date$
#   $Author$
# $Revision$
#######################################################################
package SNMP::Query::Asynch;

# Pragmas
use strict;
use warnings;

# Standard
use Carp;

# Cpan
use SNMP;

# See chap 17, pg. 404, PBP (Conway 2005)
use version; our $VERSION = qv('0.1_21');


# Whitelist of the option names that are copied from a query's parameter
# hash into the argument list of SNMP::Session->new(). Only keys named here
# are forwarded, so we don't pass bogus parameters to the session
# constructor.
#
# Declared with "our" (like $VERSION above) rather than the obsolete
# "use vars" pragma; it is still the same package variable.
our @valid_sess_params = qw(
      DestHost
      Community
      Version
      RemotePort
      Timeout
      Retries
      RetryNoSuch
      SecName
      SecLevel
      SecEngineId
      ContextEngineId
      Context
      AuthProto
      AuthPass
      PrivProto
      PrivPass
      AuthMasterKey
      PrivMasterKey
      AuthLocalizedKey
      PrivLocalizedKey
      VarFormats
      TypeFormats
      UseLongNames
      UseSprintValue
      UseEnums
      UseNumeric
      BestGuess
      NonIncreasing
    );

#----------------------------------------------------------
# Constructor.
#
# Takes no arguments beyond the class name. Returns a new object with an
# empty query stack and results list, a default limit of 10 operations in
# flight, and every per-run and lifetime counter set to zero.
sub new {
    my ($class) = @_;

    my %state = (
        query_stack          => [],
        results              => [],
        max_in_flight        => 10,
        current_in_flight    => 0,

        # Counters reset by every execute() call.
        this_run_issued      => 0,
        this_run_finished    => 0,

        # Counters that accumulate over the life of the object.
        grand_total_issued   => 0,
        grand_total_finished => 0,
    );

    return bless \%state, $class;
}

#---------------------------------------------------------

# Validates an optional callback parameter.
#
# Arguments: the parameter's name (a hash key) and the hashref of parameters
# in which it may appear.
# Returns 1 when the key is absent or when its value is a CODE ref.
# Croaks when the key is present but holds anything else.
sub _check_param_callback {
    my ( $self, $param_name, $params ) = @_;

    # Callbacks are optional: nothing to validate if the key was not given.
    if ( !exists $params->{$param_name} ) {
        return 1;
    }

    my $ref_type = ref $params->{$param_name};
    if ( $ref_type ne 'CODE' ) {
        croak "Bad parameter for [$param_name] - not a CODE ref";
    }

    return 1;
}

#---------------------------------------------------------

# TODO Fill in the code and use in the add_XXX() or _make_XXX_query() methods
#
# Intended to verify that the named parameter is something the SNMP module
# can use as a VarBind or VarBindList. It is currently an unimplemented
# stub: it accepts its arguments, validates nothing and returns nothing.
sub _check_param_varbinds {
    my ( $self, $param_name, $params ) = @_;    # object, hash key, hashref
    return;
}

#---------------------------------------------------------

# Queues a getbulk operation to be run by a later call to execute().
#
# Takes a single hashref of parameters:
#   VarBinds     - required; must be a true value.
#   QueryType    - optional; defaults to 'getbulk', which is the only type
#                  this method accepts.
#   PreCallback  - optional CODE ref.
#   PostCallback - optional CODE ref.
# The whole hashref is handed to _make_getbulk_query(), which reads the
# remaining keys (NonRepeaters, MaxRepeaters, session parameters).
#
# Returns nothing. Croaks if VarBinds is missing, if QueryType is given but
# is not 'getbulk', or if a callback parameter is not a CODE ref.
sub add_getbulk {
    my ( $self, $params ) = @_;

    # Required for all query ops, so make sure it is present.
    croak "Bad or missing parameter [VarBinds]"
      unless $params->{VarBinds};

    # The method name already says which operation is wanted, so QueryType
    # may be omitted (the SYNOPSIS never passes it). An explicit value is
    # still checked exactly as before.
    my $query_type = 'getbulk';
    if ( defined $params->{QueryType} ) {
        $query_type = $params->{QueryType}
          || croak "Bad or missing parameter [QueryType]";
    }

    # Make sure our callback params are valid.
    $self->_check_param_callback( 'PreCallback',  $params );
    $self->_check_param_callback( 'PostCallback', $params );

    croak "Attempt to add using unknown query type: $query_type\n"
      if $query_type ne 'getbulk';

    push @{ $self->{query_stack} }, $self->_make_getbulk_query($params);

    return;
}


#---------------------------------------------------------

# Builds the closure that issues one getbulk (bulkwalk) operation.
#
# NOTE This method has gotten long and complex.
# TODO Refactor, but think carefully.
#
# Argument: $query_info, a hashref of query parameters.
#   NonRepeaters, MaxRepeaters - must be present, otherwise this croaks.
#   VarBinds, PreCallback, PostCallback - read if given.
#   Any key listed in @valid_sess_params is forwarded to
#   SNMP::Session->new().
#
# Returns a CODE ref. Calling it runs the PreCallback, bumps the "issued"
# counters, creates a session and starts a bulkwalk with a completion
# callback. That callback stores the query info and results, runs the
# PostCallback and (when due) the object's batch callback, and then either
# issues the next queued query or calls SNMP::finish() once nothing is left
# in flight.
sub _make_getbulk_query {
    my ( $self, $query_info ) = @_;

    # These params are required for a getbulk query op.
    croak "Bad or missing parameter [NonRepeaters]"
      unless exists $query_info->{NonRepeaters};
    croak "Bad or missing parameter [MaxRepeaters]"
      unless exists $query_info->{MaxRepeaters};
    my $non_repeaters = $query_info->{NonRepeaters};
    my $max_repeaters = $query_info->{MaxRepeaters};

    # Currently, these are validated in the add() method, so no need here.
    my $preop_callback  = $query_info->{PreCallback};
    my $postop_callback = $query_info->{PostCallback};

    # TODO I may want to add a method to validate
    # and/or transform the VarBinds parameter
    my $varbinds = $query_info->{VarBinds};

    # Parse out the parameters for creating the session.
    # TODO validate them better, probably in a separate subroutine.
    my %sess_params =
      map  { $_ => $query_info->{$_} }
      grep { exists $query_info->{$_} } @valid_sess_params;

    # The closure returned below ends up stored inside $self (on its query
    # stack). If it held a strong reference back to $self the two would
    # form a cycle and the object could never be freed, so weaken our copy
    # before the closures capture it.
    require Scalar::Util;
    Scalar::Util::weaken($self);

    return sub {

        # I wonder if this should be before or after the counter increments?
        $preop_callback->() if $preop_callback;

        $self->{current_in_flight}++;
        $self->{this_run_issued}++;
        $self->{grand_total_issued}++;

        my $callback = sub {
            my $bulkwalk_results = shift;

            # Store the results and info about the query for later. Note
            # that the results array is a flat list of
            # (query info, results) pairs.
            push @{ $self->{results} }, $query_info, $bulkwalk_results;

            # NOTE should I do this callback here, or somewhere else?
            $postop_callback->() if $postop_callback;

            $self->{current_in_flight}--;
            $self->{this_run_finished}++;
            $self->{grand_total_finished}++;

            # Run the batch callback after every BatchSize completions.
            # BatchSize may be set without a BatchCallback, so check both
            # before trying to call it.
            if (   $self->{BatchSize}
                && $self->{BatchCallback}
                && $self->{this_run_finished} % $self->{BatchSize} == 0 )
            {
                $self->{BatchCallback}->();
            }

            # Keep the pipeline full: each completion issues the next
            # queued query, if there is one.
            if ( @{ $self->{query_stack} } ) {
                my $next_query = pop @{ $self->{query_stack} };
                return $next_query->();
            }

            # Queue drained: once the last outstanding operation has
            # completed, tell the SNMP main loop to stop.
            return SNMP::finish() if $self->{current_in_flight} <= 0;
            return 1;
        };

        # NOTE(review): if SNMP::Session->new() can return undef on failure
        # the bulkwalk call below will die - confirm the desired handling.
        my $sess = SNMP::Session->new(%sess_params);

        my $operation_id = $sess->bulkwalk( $non_repeaters, $max_repeaters,
                                            $varbinds, [$callback] );

        # Why, yes. I am being sneaky here. Since $query_info is referenced
        # within the $callback closure, any data stored in the hash it points
        # to should be available, regardless of whether it was stored before
        # or after the closure definition.
        $query_info->{OperationId} = $operation_id;

        return;    # No need to return anything, AFAICT.
    };
}

#---------------------------------------------------------



# Read-only accessors for the object's counters. Each one takes only the
# object and returns the current value of the hash slot of the same name.
sub current_in_flight    { return $_[0]{current_in_flight} }
sub this_run_issued      { return $_[0]{this_run_issued} }
sub this_run_finished    { return $_[0]{this_run_finished} }
sub grand_total_issued   { return $_[0]{grand_total_issued} }
sub grand_total_finished { return $_[0]{grand_total_finished} }

# Shuffles the queued operations so that the next execute() issues them in
# random order. The queue array is reordered in place. Returns nothing.
sub shuffle {
    my ($self) = @_;

    # Loaded on demand and called by its full name: importing it would
    # clash with this method, which has the same name.
    require List::Util;

    my $queue = $self->{query_stack};
    @{$queue} = List::Util::shuffle( @{$queue} );

    return;
}

#---------------------------------------------------------

# Runs every queued operation and returns a reference to the results array.
#
# Takes an optional hashref of parameters:
#   KeepLast      - if true, results of earlier runs are kept; if not given,
#                   the object's own KeepLast setting decides. This comes in
#                   handy if something else is still working on the results
#                   of a previous execution.
#   BatchCallback - optional CODE ref installed as the batch callback for
#                   this run (croaks if it is not a CODE ref).
#   BatchSize     - batch size installed alongside BatchCallback.
#   InFlight      - maximum number of simultaneous operations; defaults to
#                   the object's max_in_flight.
#   MasterTimeout - if true, passed to SNMP::MainLoop() as its timeout.
#
# The query stack is restored afterwards, so the same set of queries can be
# executed again.
sub execute {
    my $self   = shift;
    my $params = shift || {};

    # You can explicitly set KeepLast, or the object's default is used.
    my $keep_last =
      defined $params->{KeepLast} ? $params->{KeepLast} : $self->{KeepLast};
    @{ $self->{results} } = () unless $keep_last;

    # Install 'batch' callback if applicable. The check croaks on a bad
    # value; a missing callback simply clears any previous one.
    $self->_check_param_callback( 'BatchCallback', $params );
    $self->{BatchCallback} = $params->{BatchCallback};
    $self->{BatchSize}     = $params->{BatchSize};

    # Determine our maximum concurrency level for this run
    my $max_in_flight = $params->{InFlight} || $self->{max_in_flight};

    # Make a copy of the stack in case we want to run the same query
    my $query_stack_ref  = $self->{query_stack};
    my @query_stack_copy = @{$query_stack_ref};

    # Set some counters
    $self->{current_in_flight} = 0;
    $self->{this_run_issued}   = 0;
    $self->{this_run_finished} = 0;

    # Begin issuing operations, up to the concurrency limit. The rest are
    # issued one by one from the completion callbacks.
    while ( @{$query_stack_ref} ) {
        my $query = pop @{$query_stack_ref};
        $query->();
        last if $self->{current_in_flight} >= $max_in_flight;
    }

    # Wait for the ops to complete, or time-out (if specified). Skip the
    # main loop entirely when nothing was issued: no completion callback
    # would ever run to call SNMP::finish(), so there is nothing to wait for.
    if ( $self->{current_in_flight} > 0 ) {
        if ( $params->{MasterTimeout} ) {

            # Pass SNMP::finish as a CODE ref for the main loop to invoke.
            # Writing &SNMP::finish() here would call it immediately and
            # hand MainLoop its return value instead of a callback.
            SNMP::MainLoop( $params->{MasterTimeout}, \&SNMP::finish );
        }
        else {
            SNMP::MainLoop();
        }
    }

    # Reset the stack for the next run.
    $self->{query_stack} = \@query_stack_copy;

    return $self->get_results_ref();
}

#---------------------------------------------------------

# Accessor for the results of executing the queries: returns the object's
# internal results array itself (as a reference), not a copy of it.
sub get_results_ref {
    return $_[0]{results};
}

1;
__END__

=pod

=head1 NAME

SNMP::Query::Asynch - Fast asynchronous execution of batches of SNMP queries

=head1 VERSION

Version 0.01

=head1 SYNOPSIS

 use SNMP::Query::Asynch;
  
 my @varbinds = qw(
        ifDescr ifInOctets ifOutOctets ifAlias ifType
        ifName  ifInErrors ifOutErrors ifSpeed
        ifAdminStatus      ifOperStatus
    );
 
 my $query = SNMP::Query::Asynch->new();
 
 # You should create and populate @hosts to make this synopsis code work.
 # It's an AoH, fairly simple. For example...
 my @hosts = create_hosts_array('snmp_hosts.csv');
 
 foreach my $host (@hosts) {
     
     # Add a getbulk operation to the queue.
     $query->add_getbulk({            
             # Params passed directly to SNMP::Session->new()
             DestHost     => $host->{HOSTIP},
             Community    => $host->{COMMUNITY},
             Version      => $host->{SNMPVER},  # getbulk only supports 2 or 3.

             # Params concerning the type of query operation
             # See POD for SNMP::Session->bulkwalk() in this case.
             MaxRepeaters => 20,
             NonRepeaters => 0, 
 
             # The varbinds to be operated on - can be a reference to anything 
             # supported by the corresponding query operation in SNMP::Session.
             VarBinds     => \@varbinds, 
         });
     
 }
 
 # Execute the queries that were added, get a reference to the results array.
 my $results = $query->execute({ 
         InFlight      => 50, # Simultaneous operations
         MasterTimeout => 60, # Seconds until unfinished operations are aborted.
     });
 
 # See what the results look like.
 use Data::Dumper; 
 print Dumper $results;


=head1 DESCRIPTION

This module allows for a fairly simple, streamlined means of executing large
numbers of SNMP operations as fast as your systems can handle. It extensively
uses net-snmp's asynchronous operation interfaces and callbacks to keep as much
data flowing as you need. 

Perl's support of closures and anonymous subroutines provide the means for 
sophisticated, elegant control of query operations before and after execution. 
There are also facilities to install callbacks that are run after pre-set 
numbers (batches) of operations are completed.

These callbacks can be used to log progress, update the user, transfer results
from memory to disk (or even another thread or process!) or anything you can
think of! If there's some feature you desire, do not hesitate to ask me!!!

Please be aware - my primary design concern is speed and flexibility. I have 
certain non-scientific, subjective benchmarks I use to decide if some 
modification is worth-while, but so far the design of the internals of this 
module has lent itself to feature additions and enhancements very well. 

=head1 SUBROUTINES/METHODS

=head2 new

Constructs a new query object with an empty queue.

=head2 add_getbulk

Adds a getbulk query operation to the queue.

=head3 Parameters passed to SNMP::Session->new()

=over

=item DestHost

=item Community

=item Version

=item RemotePort

=item Timeout

=item Retries

=item RetryNoSuch

=item SecName

=item SecLevel

=item SecEngineId

=item ContextEngineId

=item Context

=item AuthProto

=item AuthPass

=item PrivProto

=item PrivPass

=item AuthMasterKey

=item PrivMasterKey

=item AuthLocalizedKey

=item PrivLocalizedKey

=item VarFormats

=item TypeFormats

=item UseLongNames

=item UseSprintValue

=item UseEnums

=item UseNumeric

=item BestGuess

=item NonIncreasing

=back

=head2 execute

Executes all operations in the queue.

=head2 shuffle

Shuffles the operations in the queue so they are executed in random order.

=head2 current_in_flight

Returns the number of operations currently issued which have not yet 
been completed.

Please note: completed means that the operation was issued and either results 
data or an error condition were received. If the operation was interrupted 
before that happens, then it is not counted as completed. This typically only 
happens when the call to execute() was interrupted by a fatal error or the
query-object's master timeout was exceeded.

=head2 this_run_issued

Returns the number of operations that have been issued during the current 
execute() call. If called after execute() has completed, returns the number 
issued during the most recent execute() call.

Each call to execute() resets this value to zero.

=head2 this_run_finished

Returns the number of operations that have been completed during the current 
execute() call. If called after execute() has completed, returns the number 
completed during the most recent execute() call.

Each call to execute() resets this value to zero.

=head2 grand_total_issued

Returns the number of operations that have been issued during all calls to
execute() since the query object was created. This value is *never* reset.

=head2 grand_total_finished

Returns the number of operations that have been completed during all calls to
execute() since the query object was created. This value is *never* reset.

=head2 get_results_ref

Returns a reference to the query object's internal results array.

NOTE: Yes, I *know* that providing access to an object's internal data is poor 
OO design, but it gets the job done right now. I do plan on converting the 
results array into its own type of object with OO-kosher semantics, but only 
if that does not substantially impact overall speed.

That said, use this method with caution because in the future it will likely 
be changed to return something completely different than an array-ref or maybe
even be removed and replaced with a different method. I may also have a moment
of insanity and make the results-object tied so as to look like an array. But
I doubt it unless people ask for it enough.

=head1 EXPORTS

Nothing.

=head1 SEE ALSO

SNMP
SNMP::Effective
SNMP::Multi
MRTG
RTG
YATG

=head1 AUTHOR

Steve Scaffidi, C<< <sscaffidi at cpan.org> >>

=head1 BUGS

Please report any bugs or feature requests to
C<bug-snmp-query-asynch at rt.cpan.org>, or through the web interface at
L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=SNMP-Query-Asynch>.
I will be notified, and then you'll automatically be notified of progress on
your bug as I make changes.

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc SNMP::Query::Asynch

You can also look for information at:

=over 4

=item * AnnoCPAN: Annotated CPAN documentation

L<http://annocpan.org/dist/SNMP-Query-Asynch>

=item * CPAN Ratings

L<http://cpanratings.perl.org/d/SNMP-Query-Asynch>

=item * RT: CPAN's request tracker

L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=SNMP-Query-Asynch>

=item * Search CPAN

L<http://search.cpan.org/dist/SNMP-Query-Asynch>

=back

=head1 ACKNOWLEDGEMENTS

=head1 DEPENDENCIES

=head1 BUGS AND LIMITATIONS

=head1 INCOMPATIBILITIES

=head1 LICENSE AND COPYRIGHT

Copyright 2008 Steve Scaffidi, all rights reserved.

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

=cut