#!/usr/bin/perl -w
use strict;
use Net::NfDump;
use Data::Dumper;
use Net::IP::LPM;
use Socket;
use Socket6;
use Getopt::Std;
use POSIX;
use Sys::Syslog qw(:DEFAULT setlogsock);
use File::Basename;
# Base directory of the local databases (used below for the GeoIP files)
my $DBDIR = "/var/db/flowtools";
my $CASNSDB = "/usr/local/etc/asns.txt"; # custom ASN DB
# Local copy of the BGP database and of its checksum; $> is the effective
# UID, so the file names differ per user
my $BGPDB = "/var/tmp/asns-$>.gz";
my $BGPDB_MD5 = "/var/tmp/asns-$>.gz.md5";
# Default paths of the IPv4 and IPv6 GeoIP country databases
my $GEO_DB4 = $DBDIR.'/geo/GeoIP.dat';
my $GEO_DB6 = $DBDIR.'/geo/GeoIPv6.dat';
# Name of the temporary output file ($$ is the PID); the directory of the
# processed file is prepended once the command line has been parsed
my $FNAME_TMP = ".nffile_$$.tmp";
# Debug level (can be changed with -d)
my $DEBUG = 1;
# Name of the nfdump file to update (taken from the command line)
my $FNAME;
my $VERSION = "1.10";
# URL of the bgpdb database and its checksum provided by the libnf.net project
my $BGPDB_URL = "http://bgpdb.libnf.net/bgpdb/latest.gz";
my $BGPDB_URL_MD5 = "http://bgpdb.libnf.net/bgpdb/latest.gz.md5";
# The minimum interval for checking for a new database and the scatter of
# checks (in seconds). The check will be performed randomly somewhere
# between: $BGPDB_CHECK_INTERVAL .. $BGPDB_CHECK_INTERVAL + $BGPDB_CHECK_SCATTER
my $BGPDB_CHECK_INTERVAL = 7200;
my $BGPDB_CHECK_SCATTER = 7200;
# Command templates for the fetch (wget) and gunzip utilities. $FETCH is a
# sprintf format taking the output file and the URL, in that order.
my $FETCH = "wget -q -O %s %s?v=$VERSION";
my $GUNZIP = "gunzip";
# Program name without the leading directory part (used in syslog and usage)
my $LOGNAME = substr($0, rindex($0, "/") + 1);;
# Priority string handed to syslog() by mylog()
my $LOGFACILITY = "daemon.info";
# default options
my $UPDATE_AS = 1;
my $UPDATE_GEO = 0;
# global variables: handles to the databases and the parsed options
my ($GEO4, $GEO6, $BGP, %OPTS);
# Logging routine.
#
# Formats a message sprintf-style and sends it to syslog. When $DEBUG is
# greater than 0 the message is also printed to STDOUT, prefixed with a
# timestamp and the PID.
#
#   $msg - sprintf format string
#   @par - values substituted into $msg
sub mylog {
    my ($msg, @par) = @_;
    my $lmsg = sprintf($msg, @par);

    if ($DEBUG > 0) {
        printf "%s[%d]: %s\n", strftime("%Y-%m-%d.%H:%M:%S", localtime), $$, $lmsg;
    }

    setlogsock('unix');
    openlog("$LOGNAME\[$$\]", 'ndelay', 'user');
    # syslog() treats its second argument as a format string. The message
    # is already formatted, so pass it as data; otherwise a '%' inside it
    # (for example from an error text or a URL) would be interpreted again.
    syslog($LOGFACILITY, '%s', $lmsg);
}
# Download a URL into a local file using the external $FETCH command.
#
#   $file - path of the file to write
#   $url  - URL to download
#
# Returns 1 when the command exited with status 0, otherwise 0.
sub fetch_url($$) {
    my ($file, $url) = @_;

    mylog("Downloading %s", $url) if ($DEBUG > 3);

    # The command line is run through the shell, so both values are
    # single-quoted (embedded quotes escaped) to keep paths with spaces or
    # shell metacharacters intact.
    my @quoted = map {
        my $arg = $_;
        $arg =~ s/'/'\\''/g;
        "'" . $arg . "'";
    } ($file, $url);

    my $cmd = sprintf($FETCH, @quoted);
    mylog("CMD: %s.", $cmd) if ($DEBUG > 5);

    system($cmd);
    return ($? == 0) ? 1 : 0;
}
# Check whether the local BGP database should be refreshed and download a
# new copy when needed.
#
# When both the database and its checksum file exist, the age of the
# checksum file decides whether the server is contacted at all: nothing is
# done before $BGPDB_CHECK_INTERVAL seconds have passed, and after that the
# check is spread randomly over the following $BGPDB_CHECK_SCATTER seconds.
# The server checksum is then compared with the local one and the database
# is downloaded only when they differ.
#
# All downloads go to "._tmp" files first. The installed checksum is only
# replaced after the matching database has been installed, so a failed
# download is retried on the next run instead of being masked by an
# already updated checksum file.
#
# Returns:
#   0 - download of the database (or of its checksum) failed
#   1 - nothing to do (check skipped or database is up to date)
#   2 - the checksum could not be downloaded for comparison
#   3 - a new database has been installed
sub check_bgpdb() {
    my $db_tmp  = $BGPDB . "._tmp";
    my $md5_tmp = $BGPDB_MD5 . "._tmp";

    # Read a whole file; an unreadable file is treated as empty content.
    my $slurp = sub {
        my ($path) = @_;
        open(my $fh, '<', $path) or return '';
        my $content = join('', <$fh>);
        close($fh);
        return $content;
    };

    # test if both bgpdb and checksum exist
    if ( -f $BGPDB && -f $BGPDB_MD5 ) {
        my $mtime = (stat($BGPDB_MD5))[9];    # element 9 is mtime (10 is ctime)
        my $now   = time();
        my $age   = $now - $mtime;

        if ($age <= $BGPDB_CHECK_INTERVAL) {
            mylog("BGPDB check skipped in CHECK interval.") if ($DEBUG > 3);
            return 1;
        }

        # older than the check interval - scatter the checks randomly
        srand($now + $$);
        my $rand = int(rand($BGPDB_CHECK_SCATTER));
        if ($age - $BGPDB_CHECK_INTERVAL < $rand) {
            mylog("BGPDB check skipped in SCATTER interval.") if ($DEBUG > 3);
            return 1;
        }

        # fetch the server checksum into a temporary file and compare it
        # with the installed one
        if (!fetch_url($md5_tmp, $BGPDB_URL_MD5)) {
            unlink($md5_tmp);
            mylog("Can not dowload data from %s", $BGPDB_URL_MD5);
            return 2;
        }

        if ($slurp->($BGPDB_MD5) eq $slurp->($md5_tmp)) {
            # Same content: install the fresh copy anyway so that its
            # modification time restarts the check interval.
            rename($md5_tmp, $BGPDB_MD5);
            unlink($md5_tmp);
            mylog("BGPDB download skipped - same md5 sum.") if ($DEBUG > 3);
            return 1;
        }
        unlink($md5_tmp);
    }

    if (!fetch_url($db_tmp, $BGPDB_URL)) {
        unlink($db_tmp);
        mylog("Can not download data from %s.", $BGPDB_URL);
        return 0;
    }
    mylog("Downloaded BGPDB from %s.", $BGPDB_URL) if ($DEBUG > 0);

    if (!fetch_url($md5_tmp, $BGPDB_URL_MD5)) {
        unlink($db_tmp, $md5_tmp);
        mylog("Can not download data from %s.", $BGPDB_URL_MD5);
        return 0;
    }

    # Install the database first. If installing the checksum fails later,
    # the old checksum no longer matches the server and the next check
    # simply downloads again.
    if (!rename($db_tmp, $BGPDB)) {
        mylog("Can not rename %s to %s: %s", $db_tmp, $BGPDB, $!);
        unlink($db_tmp, $md5_tmp);
        return 0;
    }
    rename($md5_tmp, $BGPDB_MD5);
    unlink($md5_tmp);

    return 3;
}
# Look up the country of an address in the GeoIP databases.
#
#   $addr - address in packed binary form: 4 bytes are looked up in the
#           IPv4 database ($GEO4), 16 bytes in the IPv6 database ($GEO6)
#
# Returns the two-character country code packed into a 16 bit number
# (256 * first character + second character, e.g. "US" -> 21843), or undef
# when the address is missing or has another length, the needed database
# handle is not open, or no usable code was found.
sub geo_lookup_raw {
    my ($addr) = @_;

    return undef if (!defined($addr));

    my $code;
    if (length($addr) == 4) {
        return undef if (!defined($GEO4));
        $code = $GEO4->country_code_by_addr(inet_ntop(AF_INET, $addr));
    } elsif (length($addr) == 16) {
        return undef if (!defined($GEO6));
        $code = $GEO6->country_code_by_addr_v6(inet_ntop(AF_INET6, $addr));
    } else {
        # inet_ntop() would croak on any other length
        return undef;
    }

    return undef if (!defined($code) || length($code) < 2);

    # convert code to 16 bit number
    my ($hi, $lo) = unpack('CC', $code);
    return 256 * $hi + $lo;
}
# Print the help text (synopsis, options with their current defaults and
# the version) to STDOUT.
sub usage() {
    printf "\nCommand updates nfdump file and adds AS and geoIP information\n\n";
    printf "Usage: \n\n";
    printf " %s [ -d <level> ] [ -b | -B ] [ -g ] [ -a <file> ] [ -5 <file> ] [ -4 <file> ] [ -6 <file> ] [ -c <file> ] <nfdump_file>\n\n", $LOGNAME;
    printf "Options: \n\n";
    printf " -d <level> : debug level (default: 1)\n\n";
    printf " -b update AS numbers (srcas, dstas) (default) \n";
    printf " -B do NOT update AS numbers (srcas, dstas) \n";
    printf " -g update country code (*xsrcport, *xdstport) \n\n";
    printf " -a <file> : path to BGPDB file (default: %s) \n", $BGPDB;
    printf " -5 <file> : path to BGPDB MD5 (default: %s) \n", $BGPDB_MD5;
    printf " -c <file> : path to additional textfile with ASN mapping (default: %s) \n", $CASNSDB;
    printf " -4 <file> : path to IPv4 GeoIP database (default: %s) \n", $GEO_DB4;
    printf " -6 <file> : path to IPv6 GeoIP database (default: %s) \n\n", $GEO_DB6;
    printf " Part of libnf.net project, version: %s \n", $VERSION;
}
# Parse the command line. The getopts specification has to list every
# switch that is evaluated below: -c <file> and -B were handled (and shown
# in the help text) but missing from the specification, so using them
# aborted the program with the usage message.
if ( !getopts("d:a:4:6:bBg5:c:", \%OPTS) || !defined($ARGV[0]) ) {
    usage();
    exit 1;
}
$DEBUG      = $OPTS{"d"} if (defined($OPTS{"d"}));
$CASNSDB    = $OPTS{"c"} if (defined($OPTS{"c"}));
# -B is evaluated after -b, so it wins when both are given
$UPDATE_AS  = 1 if (defined($OPTS{"b"}));
$UPDATE_AS  = 0 if (defined($OPTS{"B"}));
$UPDATE_GEO = 1 if (defined($OPTS{"g"}));
$BGPDB      = $OPTS{"a"} if (defined($OPTS{"a"}));
$BGPDB_MD5  = $OPTS{"5"} if (defined($OPTS{"5"}));
$GEO_DB4    = $OPTS{"4"} if (defined($OPTS{"4"}));
$GEO_DB6    = $OPTS{"6"} if (defined($OPTS{"6"}));
# file to update; the temporary output file is created next to it
$FNAME = $ARGV[0];
$FNAME_TMP = dirname($FNAME).'/'.$FNAME_TMP;
# Reader for the original file and writer for the temporary output file.
# Only the source and destination addresses are read; the writer sets the
# AS numbers and the two fields that carry the country codes.
# (Class->new is used instead of the ambiguous indirect "new Class" form.)
my $flow_src = Net::NfDump->new(InputFiles => [ $FNAME ], Fields => 'srcip,dstip' );
my $flow_dst = Net::NfDump->new(OutputFile => $FNAME_TMP, Fields => 'srcas,dstas,xsrcport,xdstport' );
# Open ASN database: refresh the BGP database if it is due, then load it
# and the optional custom ASN file into the prefix lookup table. Both files
# hold one "<prefix> <as>" pair per line, separated by a single space.
if ($UPDATE_AS) {
    # check and update BGP database
    mylog("Checking BGP database for new version.") if ($DEBUG > 0);
    check_bgpdb();
    if ( ! -f $BGPDB ) {
        mylog("BGP database not found. Exiting.");
        exit 1;
    }

    mylog("Loading AS database.") if ($DEBUG > 0);
    $BGP = Net::IP::LPM->new();

    # The database is unpacked through the shell ("gunzip < file" works
    # regardless of the file name suffix), so the path is single-quoted.
    (my $quoted_db = $BGPDB) =~ s/'/'\\''/g;
    my $bgp_fh;
    if (!open($bgp_fh, '-|', "$GUNZIP < '$quoted_db'")) {
        mylog("Can not run %s on %s: %s. Exiting.", $GUNZIP, $BGPDB, $!);
        exit 1;
    }
    while (my $line = <$bgp_fh>) {
        chomp $line;
        my ($prefix, $as) = split(/ /, $line);
        # skip empty or incomplete lines
        next if (!defined($prefix) || !defined($as));
        $BGP->add($prefix, $as);
    }
    if (!close($bgp_fh)) {
        # keep going with whatever was loaded, but make the problem visible
        mylog("Unpacking of %s failed (exit status %d).", $BGPDB, $? >> 8);
    }

    # read additional custom ASNS DB; this is done only while AS updating
    # is enabled because the lookup table does not exist otherwise
    if ( -f $CASNSDB ) {
        if (open(my $casns_fh, '<', $CASNSDB)) {
            while (my $line = <$casns_fh>) {
                chomp $line;
                my ($prefix, $as) = split(/ /, $line);
                next if (!defined($prefix) || !defined($as));
                $BGP->add($prefix, $as);
            }
            close($casns_fh);
        } else {
            mylog("Can not open %s: %s", $CASNSDB, $!);
        }
    }
}
# Open MAXMIND GEO database. Geo::IP is an optional dependency and is
# therefore loaded at run time; the geo update is switched off when the
# module or one of the databases is not available.
if ($UPDATE_GEO) {
    # A string eval of "use Geo::IP;" yields no true value even when the
    # module loads fine, so the load is done with require and an explicit
    # true value at the end of the block.
    if (!eval { require Geo::IP; 1 }) {
        mylog("%s", $@);
        mylog("Geo::IP module not found. Geo update remains disabled.");
        $UPDATE_GEO = 0;
    } else {
        mylog("Opening country code (IPv4 and IPv6) database.") if ($DEBUG > 0);
        $GEO4 = eval { Geo::IP->open($GEO_DB4) };
        $GEO6 = eval { Geo::IP->open($GEO_DB6) };
        if (!defined($GEO4) || !defined($GEO6)) {
            mylog("Can not open GeoIP database (%s, %s). Geo update remains disabled.",
                $GEO_DB4, $GEO_DB6);
            $UPDATE_GEO = 0;
        }
    }
}
# statistics counters
my $flows = 0;
my $time = time();

# exec query
$flow_src->query();
mylog("Updating records.") if ($DEBUG > 0);

# Copy every record to the temporary file and set the AS numbers and
# country codes on the copy.
while (my ($srcip, $dstip) = $flow_src->fetchrow_array()) {
    my ($srcas, $dstas, $srccountry, $dstcountry);

    # find ASN for SRC and DST address
    if ($UPDATE_AS) {
        $srcas = $BGP->lookup_cache_raw($srcip);
        $dstas = $BGP->lookup_cache_raw($dstip);
        # an empty result means "no AS found"
        $srcas = undef if (!defined($srcas) || $srcas eq "");
        $dstas = undef if (!defined($dstas) || $dstas eq "");
    }

    # geolocation lookup
    if ($UPDATE_GEO) {
        $srccountry = geo_lookup_raw($srcip);
        $dstcountry = geo_lookup_raw($dstip);
    }

    # write data to output file
    $flow_dst->clonerow($flow_src);
    $flow_dst->storerow_array( $srcas, $dstas, $srccountry, $dstcountry );

    # count statistics
    $flows += 1;
}

# Close both files before the original is replaced, so that the temporary
# file is complete at the moment it takes over the original name.
$flow_src->finish();
$flow_dst->finish();

if (!rename($FNAME_TMP, $FNAME)) {
    mylog("Can not rename %s to %s: %s", $FNAME_TMP, $FNAME, $!);
    unlink($FNAME_TMP);
    exit 1;
}

mylog("Processed %d flows in %d secs.", $flows, time() - $time) if ($DEBUG > 0);