The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

use strict;
use Net::NfDump;
use Data::Dumper;
use Net::IP::LPM;
use Socket;
use Socket6;
use Getopt::Std;
use POSIX;
use Sys::Syslog qw(:DEFAULT setlogsock);
use File::Basename;


# path to ASN database
my $DBDIR       = "/var/db/flowtools";
my $CASNSDB		= "/usr/local/etc/asns.txt";	# cuscom ASN DB
my $BGPDB		= "/var/tmp/asns-$>.gz";
my $BGPDB_MD5	= "/var/tmp/asns-$>.gz.md5";
my $GEO_DB4     = $DBDIR.'/geo/GeoIP.dat';
my $GEO_DB6     = $DBDIR.'/geo/GeoIPv6.dat';
my $FNAME_TMP   = ".nffile_$$.tmp";
my $DEBUG 		= 1;
my $FNAME;
my $VERSION     = "1.10";

# URL of bgpdb database and its checksum privided by libnf.net project 
my $BGPDB_URL       = "http://bgpdb.libnf.net/bgpdb/latest.gz";
my $BGPDB_URL_MD5   = "http://bgpdb.libnf.net/bgpdb/latest.gz.md5";

# The minimum interval for checking for new databas and scatter of 
# checks (in seconds). The check will be performed randomly somwhere  
# between: $BGPDB_CHECK_INTERVAL .. $BGPDB_CHECK_INTERVAL + $BGPDB_CHECK_SCATTER
my $BGPDB_CHECK_INTERVAL	= 7200;
my $BGPDB_CHECK_SCATTER		= 7200;

# path to fetch (wget) and  gunzip utility
my $FETCH		= "wget -q -O %s %s?v=$VERSION";
my $GUNZIP		= "gunzip";

my $LOGNAME		= substr($0, rindex($0, "/") + 1);;
my $LOGFACILITY	= "daemon.info";

# default options 
my $UPDATE_AS 	= 1;
my $UPDATE_GEO 	= 0;


# gloval variables  handles to databases
my ($GEO4, $GEO6, $BGP, %OPTS);


# logging rutine
sub mylog {
	my ($msg, @par) = @_;
	my $lmsg = sprintf($msg, @par);
	if ($DEBUG > 0) {
		printf "%s[%d]: %s\n", strftime("%Y-%m-%d.%H:%M:%S", localtime), $$, $lmsg;
	}
	setlogsock('unix');
	openlog("$LOGNAME\[$$\]", 'ndelay', 'user');
	syslog($LOGFACILITY, $lmsg);
}

# download file from URL 
sub fetch_url($$) {
	my ($file, $url) = @_;

	mylog("Downloading %s", $url) if ($DEBUG > 3);
	my $cmd = sprintf($FETCH, $file, $url);
	mylog("CMD: %s.", $cmd) if ($DEBUG > 5); 
    system($cmd);
    if ($? != 0) {
        return 0;
    }
	return 1;
}
	

# check, download and unzip bgp database 
sub check_bgpdb() {

	# test if both bgpdb and checksum exists 
	if ( -f $BGPDB && -f $BGPDB_MD5 ) {

			my $mtime = (stat($BGPDB_MD5))[10]; # get mtime
			my $time = time();

			# BGPDB os older than $BGPDB_CHECK_INTERVA
			if ($time - $mtime > $BGPDB_CHECK_INTERVAL) {
				srand($time+$$);
				my $rand = int(rand($BGPDB_CHECK_SCATTER));

#				printf("%s %s\n", $time - $mtime - $BGPDB_CHECK_INTERVAL, $rand);
				if ($time - $mtime - $BGPDB_CHECK_INTERVAL < $rand) {
					mylog("BGPDB check skipped in SCATTER interval.") if ($DEBUG > 3);
					return 1;
				}
			} else {
				mylog("BGPDB check skipped in CHECK interval.") if ($DEBUG > 3);
				return 1;
			}

			# read content of md5 and compare it with server version  
			open MD5, "< $BGPDB_MD5";
			my $md5 = join('', <MD5>);
			close MD5;

			if (fetch_url($BGPDB_MD5, $BGPDB_URL_MD5)) {
				# compare checksums 
				open MD5, "< $BGPDB_MD5";
				my $md5_new = join('', <MD5>);
				close MD5;
				if ($md5 eq $md5_new) {
					mylog("BGPDB download skipped - same md5 sum.") if ($DEBUG > 3);
					return 1;
				}
			} else {
        		mylog("Can not dowload data from %s", $BGPDB_URL_MD5);
				return 2;
			}
	}

	if (fetch_url($BGPDB."._tmp", $BGPDB_URL)) {
		mylog("Downloaded BGPDB from %s.", $BGPDB_URL) if ($DEBUG > 0);
		if (!fetch_url($BGPDB_MD5."._tmp", $BGPDB_URL_MD5)) {
			mylog("Can not download data from %s.", $BGPDB_URL_MD5);
			return 0;
		}
		rename($BGPDB_MD5."._tmp", $BGPDB_MD5);
		unlink($BGPDB_MD5."._tmp");
		rename($BGPDB."._tmp", $BGPDB);
		unlink($BGPDB."._tmp");
		return 3;
	} else {
		mylog("Can not download data from %s.", $BGPDB_URL);
		return 0;
	}
}


# lookup in geodb 
sub geo_lookup_raw {
	my ($addr) = @_;
	my ($code) = undef;

	if ( length($addr) == 4 ) {
		$code =  $GEO4->country_code_by_addr(inet_ntop(AF_INET, $addr));
	} else {
		$code = $GEO6->country_code_by_addr_v6(inet_ntop(AF_INET6, $addr));
	}

	return undef if (!defined($code));

	# convert code to 16 bit number
	my ($a, $b) =  unpack('CC', $code);
	
	return 256 * $a + $b;
}

# parse options 
sub usage() {

	printf "\nCommand updates nfdump file and adds AS ang geoIP information\n\n";
	printf "Usage: \n\n";
	printf "   %s  [ -d <level> ] -b -g [ -a -5 -4 -6 ] [ -c <ASN_db_file> ] <nfdump_file>\n\n", $LOGNAME;
	printf "Options: \n\n";
	printf "   -d <level> : debug level (dafault: 1)\n\n";
	printf "   -B do NOT update AS numbers (srcas, dstas) \n";
	printf "   -g update country code (*xsrcport, *xdstport) \n\n";
	printf "   -a <file>  : path to BGPDB file (default: %s) \n", $BGPDB;
	printf "   -5 <file>  : path to BGPDB MD5 (default: %s) \n", $BGPDB_MD5;
	printf "   -c <file>  : path to additional textfile with ASN mapping (default: %s) \n", $CASNSDB;
	printf "   -4 <file>  : path to IPv4 GeoIP database (default: %s) \n", $GEO_DB4;
	printf "   -6 <file>  : path to IPv6 GeoIP database (default: %s) \n\n", $GEO_DB6;
	printf " Part of libnf.net project, version: %s \n", $VERSION;
}


if ( !getopts("d:a:4:6:bg5:", \%OPTS) || !defined($ARGV[0]) ) {
	usage();
	exit 1;
}

$DEBUG = $OPTS{"d"} if (defined($OPTS{"d"}));
$CASNSDB = $OPTS{"c"} if (defined($OPTS{"c"}));
$UPDATE_AS = 1 if (defined($OPTS{"b"}));
$UPDATE_AS = 0 if (defined($OPTS{"B"}));
$UPDATE_GEO = 1 if (defined($OPTS{"g"}));
$BGPDB = $OPTS{"a"} if (defined($OPTS{"a"}));
$BGPDB_MD5 = $OPTS{"5"} if (defined($OPTS{"5"}));
$GEO_DB4 = $OPTS{"4"} if (defined($OPTS{"4"}));
$GEO_DB6 = $OPTS{"6"} if (defined($OPTS{"6"}));
$FNAME = $ARGV[0];
$FNAME_TMP = dirname($FNAME).'/'.$FNAME_TMP;


# instance of source and destination files
my $flow_src = new Net::NfDump(InputFiles => [ $FNAME ], Fields => 'srcip,dstip' );
my $flow_dst = new Net::NfDump(OutputFile => $FNAME_TMP, Fields => 'srcas,dstas,xsrcport,xdstport' );

# Open ASN database
if ($UPDATE_AS) {
	# check and update BGP database
	mylog("Checking BGP database for new version.") if ($DEBUG > 0);
	check_bgpdb();

	if ( ! -f $BGPDB ) { 
		mylog("BGP database not found. Exiting.");
		exit 1;
	}
	mylog("Loading AS database.") if ($DEBUG > 0);
	$BGP = Net::IP::LPM->new();

	open F1, "$GUNZIP < $BGPDB |"; 
	while (<F1>) {
		chomp ; 
		my ($prefix, $as) = split(/ /);
		$BGP->add($prefix, $as);
	}
	close F1;
}

# read additional custom ASNS DB 
if ( -f $CASNSDB) {
	open F1, "< $CASNSDB"; 
	while (<F1>) {
		chomp ; 
		my ($prefix, $as) = split(/ /);
		$BGP->add($prefix, $as);
	}
	close F1;
}

# Open MAXMIND GEO database
if ($UPDATE_GEO) {
	printf("XXXx\n");
	if (!eval("use Geo::IP;")) {
		mylog("%s", $@);
		mylog("Geo::IP module not found. Geo update remains disabled.", $@);
		$UPDATE_GEO = 0;
	} else {
		mylog("Opening country code (IPv4 and IPv6) database.") if ($DEBUG > 0);
		$GEO4 = Geo::IP->open($GEO_DB4);
		$GEO6 = Geo::IP->open($GEO_DB6);
	}
}

# statistics counters
my $flows = 0;
my $time = time();

# exec query 
$flow_src->query();

mylog("Updating records.") if ($DEBUG > 0);
while (my ($srcip, $dstip) = $flow_src->fetchrow_array()) {

	my ($srcas, $dstas, $srccountry, $dstcountry) = (undef, undef, undef, undef);

	# find ASN for SRC and DST address
	if ($UPDATE_AS) { 
		$srcas = $BGP->lookup_cache_raw($srcip);
		$dstas = $BGP->lookup_cache_raw($dstip);
		$srcas = undef if (!defined($srcas) || $srcas eq "");
		$dstas = undef if (!defined($dstas) || $dstas eq "");
	}

	# geolocation lookup
	if ($UPDATE_GEO) {
		$srccountry = geo_lookup_raw($srcip);
		$dstcountry = geo_lookup_raw($dstip);
	}

	# wite data to output file
	$flow_dst->clonerow($flow_src);
	$flow_dst->storerow_array( $srcas, $dstas, $srccountry, $dstcountry );

	# count statistics
	$flows += 1;
}


rename($FNAME_TMP, $FNAME);
mylog("Processed $flows flows in %d secs.", time() - $time) if ($DEBUG > 0);