The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

use Net::NfDump;
use Data::Dumper;
use Net::IP::LPM;
use Geo::IP;
use Socket;
use Socket6;
use Getopt::Std;

# Default locations of the lookup databases (the -a, -4 and -6 options
# replace them).
my $DBDIR     = '/var/db/flowtools';
my $ASN_DB    = "$DBDIR/asns.db";
my $GEO_DB4   = "$DBDIR/geo/GeoIP.dat";
my $GEO_DB6   = "$DBDIR/geo/GeoIPv6.dat";

# Scratch output file, tagged with the process id; it is moved over the
# input file once all records are written.
my $FNAME_TMP = '/tmp/nf_asn_update_' . $$ . '.tmp';

my $VERSION   = '0.04';
my $DEBUG     = 1;	# log verbosity, replaced by -d
my $FNAME;		# nfdump file to update (first command line argument)

# Global handles to the databases, filled in only for the requested updates.
my $GEO4;
my $GEO6;
my $BGP;

# Look up the country of a packed (binary) IP address in the GeoIP databases.
#
# $addr - address in packed form: 4 bytes are looked up in the IPv4 database
#         ($GEO4), 16 bytes in the IPv6 database ($GEO6).
#
# Returns the two-letter country code folded into a 16 bit number
# (256 * first byte + second byte), or undef when the address is missing,
# has an unexpected length, or the database gives no usable code for it.
sub geo_lookup_raw {
	my ($addr) = @_;

	return undef if (!defined($addr));

	my $code;
	if ( length($addr) == 4 ) {
		$code = $GEO4->country_code_by_addr(inet_ntop(AF_INET, $addr));
	} elsif ( length($addr) == 16 ) {
		$code = $GEO6->country_code_by_addr_v6(inet_ntop(AF_INET6, $addr));
	} else {
		# neither an IPv4 nor an IPv6 address; inet_ntop() cannot format it
		return undef;
	}

	# without two bytes there is nothing to encode
	return undef if (!defined($code) || length($code) < 2);

	# convert code to 16 bit number; named lexicals are used instead of
	# $a/$b so the sort() package variables are not shadowed
	my ($hi, $lo) = unpack('CC', $code);

	return 256 * $hi + $lo;
}

# Print the command line help to standard output: a one-line description,
# the supported options with the current database paths, and the version.
sub usage() {

	print  "Command updates nfdump file and adds AS and geoIP information\n";
	print  "Options: \n\n";
	print  " -d <level> : debug level 1 - prints logs output to stdout, 10 - more detailed\n";
	print  " -b update AS numbers (srcas, dstas) \n";
	print  " -g update country code (*xsrcport, *xdstport) \n";
	printf " -a <file>  : overwrite default database ASN file %s \n", $ASN_DB;
	printf " -4 <file>  : overwrite default IPv4 GeoIP database %s \n", $GEO_DB4;
	# -6 selects the IPv6 database (the text used to say IPv4 here)
	printf " -6 <file>  : overwrite default IPv6 GeoIP database %s \n", $GEO_DB6;
	printf " Version: %s \n", $VERSION;
}


# Parse the command line: getopts() fills %OPTS and the first remaining
# argument is the nfdump file to update.  Print the help and exit with
# status 1 when the options are invalid or the file name is missing.
if ( !getopts("d:a:4:6:bg", \%OPTS) || !defined($ARGV[0]) ) {
	usage();
	exit 1;
}

# values given on the command line replace the built-in defaults
$DEBUG = $OPTS{"d"} if (defined($OPTS{"d"}));
$UPDATE_AS = 1 if (defined($OPTS{"b"}));
$UPDATE_GEO = 1 if (defined($OPTS{"g"}));
$ASN_DB = $OPTS{"a"} if (defined($OPTS{"a"}));
$GEO_DB4 = $OPTS{"4"} if (defined($OPTS{"4"}));
$GEO_DB6 = $OPTS{"6"} if (defined($OPTS{"6"}));
$FNAME = $ARGV[0];

# at least one kind of update (-b for AS numbers, -g for country codes)
# has to be requested
if ( !defined($UPDATE_AS) && !defined($UPDATE_GEO) ) {
	printf "Either -b or -g must be used\n";
	exit 1;
}


# Open ASN database.  The databases are opened before the flow objects so a
# missing database stops the run before any output object is set up.
if ($UPDATE_AS) {
	printf "Opening AS database...\n" if ($DEBUG > 0);
	$BGP = Net::IP::LPM->new($ASN_DB);
	die "Cannot open AS database $ASN_DB\n" if (!defined($BGP));
}

# Open MAXMIND GEO database
if ($UPDATE_GEO) {
	printf "Opening country code (IPv4 and IPv6) database...\n" if ($DEBUG > 0);
	$GEO4 = Geo::IP->open($GEO_DB4, GEOIP_MEMORY_CACHE);
	# fail here with the file name rather than on the first lookup
	die "Cannot open IPv4 GeoIP database $GEO_DB4\n" if (!defined($GEO4));
	$GEO6 = Geo::IP->open($GEO_DB6, GEOIP_MEMORY_CACHE);
	die "Cannot open IPv6 GeoIP database $GEO_DB6\n" if (!defined($GEO6));
}

# instance of source and destination files: only the two addresses are read
# from the source, the four updated fields are written to the temporary file
my $flow_src = Net::NfDump->new(InputFiles => [ $FNAME ], Fields => 'srcip,dstip' );
my $flow_dst = Net::NfDump->new(OutputFile => $FNAME_TMP, Fields => 'srcas,dstas,xsrcport,xdstport' );

# statistics counters
my $flows = 0;
my $time = time();

# exec query
$flow_src->query();

# Copy every record from the source to the temporary file, overriding the AS
# and country fields with the values found in the databases.
printf "Updating records...\n" if ($DEBUG > 0);
while (my ($srcip, $dstip) = $flow_src->fetchrow_array()) {

	my ($srcas, $dstas, $srccountry, $dstcountry) = (undef, undef, undef, undef);

	# find ASN for SRC and DST address
	if ($UPDATE_AS) {
		$srcas = $BGP->lookup_cache_raw($srcip);
		$dstas = $BGP->lookup_cache_raw($dstip);
		# an empty result is treated the same as "not found"
		$srcas = undef if (!defined($srcas) || $srcas eq "");
		$dstas = undef if (!defined($dstas) || $dstas eq "");
	}

	# geolocation lookup
	if ($UPDATE_GEO) {
		$srccountry = geo_lookup_raw($srcip);
		$dstcountry = geo_lookup_raw($dstip);
	}

	# write data to output file
	$flow_dst->clonerow($flow_src);
	$flow_dst->storerow_array( $srcas, $dstas, $srccountry, $dstcountry );

	# count statistics
	$flows += 1;
}

# close both files so the output is complete on disk before it is moved
$flow_src->finish();
$flow_dst->finish();

# Replace the input file with the updated copy.  rename() cannot cross
# filesystems, so when it fails the copy is staged next to the input file and
# renamed from there; the input is left untouched if that fails as well.
if (!rename($FNAME_TMP, $FNAME)) {
	require File::Copy;
	my $staged = "$FNAME.$$.tmp";
	if (File::Copy::copy($FNAME_TMP, $staged) && rename($staged, $FNAME)) {
		unlink($FNAME_TMP);
	} else {
		my $err = $!;
		unlink($staged);
		die "Cannot replace $FNAME (updated copy kept in $FNAME_TMP): $err\n";
	}
}

# print statistics
printf "Done. Processed flows=$flows in secs=%d.\n", time() - $time if ($DEBUG > 0);