--- flow2rrd.pl 2004/12/29 12:03:13 1.15
+++ flow2rrd.pl 2004/12/29 14:10:38 1.16
@@ -71,6 +71,7 @@
my %getopt_spec = (
'h|help' => \$opt->{-help},
'v|version' => \$opt->{-version},
+ 'V|verbose' => \$opt->{-verbose},
'f|config=s' => \$opt->{-config},
's|store' => \$opt->{-store},
'g|graph' => \$opt->{-graph},
@@ -83,6 +84,7 @@
"available options are:\n" .
" -h,--help print out this usage page\n" .
" -v,--version print program version\n" .
+ " -V,--verbose print verbose messages\n" .
" -f,--config FILE read this configuration file only\n" .
" -s,--store store NetFlow values into RRD\n" .
" -g,--graph produce RRD graphs\n" .
@@ -111,10 +113,11 @@
$cs->parse($txt);
my $tree = $cs->unpack();
undef $cs;
+#print Data::Dumper->Dump([$tree]);
# extract configuration elements
my $cfg = {
- 'Database' => undef,
+ 'Database' => {},
'Host' => [],
'Protocol' => {},
'Service' => {},
@@ -122,8 +125,26 @@
};
foreach my $dir (@{$tree}) {
if ($dir->[0] eq 'Database') {
- die "Database already defined" if (defined($cfg->{'Database'}));
- $cfg->{'Database'} = $dir->[1];
+ die "Database already defined" if (defined($cfg->{'Database'}->{-file}));
+ $cfg->{'Database'}->{-file} = $dir->[1];
+ my $seq = $dir->[2];
+ foreach my $dir2 (@{$seq}) {
+ if ($dir2->[0] eq 'Stepping') {
+ $cfg->{'Database'}->{-step} = $dir2->[1];
+ }
+ elsif ($dir2->[0] eq 'Storage') {
+ my $s = [];
+ foreach my $spec (@{$dir2}[1..$#{$dir2}]) {
+ if (my ($res, $dur) = ($spec =~ m/^(\S+):(\S+)$/)) {
+ push(@{$s}, { -res => $res, -dur => $dur});
+ }
+ else {
+ die "invalid storage specification \"$spec\"";
+ }
+ }
+ $cfg->{'Database'}->{-storage} = $s;
+ }
+ }
}
elsif ($dir->[0] eq 'Protocol') {
die "Protocol \"$dir->[1]\" already defined" if (defined($cfg->{'Protocol'}->{$dir->[1]}));
@@ -224,27 +245,83 @@
return $ds_name;
}
+# conversion/canonicalization of time specifications
+sub cv_time {
+ my ($t) = @_;
+ if ($t =~ m|^now(.*)$|) {
+ $t = time() + &cv_time($1);
+ }
+ elsif ($t =~ m|^([-+])(.+)$|) {
+ $t = &cv_time($2);
+ eval "\$t = $1 \$t";
+ }
+ elsif ($t =~ m|(\d{2})-([A-Za-z]{3})-(\d{4})|) {
+ $t = str2time($t);
+ }
+ elsif ($t =~ m|^([\d.]+)([smhDWMY])$|) {
+ $t = $1;
+ if ($2 eq 's') { $t *= 1; }
+ elsif ($2 eq 'm') { $t *= 60; }
+ elsif ($2 eq 'h') { $t *= 60*60; }
+ elsif ($2 eq 'D') { $t *= 24*60*60; }
+ elsif ($2 eq 'W') { $t *= 7*24*60*60; }
+ elsif ($2 eq 'M') { $t *= 30*24*60*60; }
+ elsif ($2 eq 'Y') { $t *= 365*24*60*60; }
+ }
+ elsif ($t =~ m|^([\d.]+)$|) {
+ $t = $1;
+ }
+ else {
+ $t = 0;
+ }
+ return $t;
+}
+
+# conversion/canonicalization of limit specifications
+sub cv_limit {
+ my ($l) = @_;
+ if ($l eq '') {
+ $l = 0;
+ }
+ elsif ($l =~ m|^([-+])(.*)$|) {
+ $l = &cv_limit($2);
+ eval "\$l = $1 \$l";
+ }
+ elsif ($l =~ m|^(\d+)([KMGT])$|) {
+ $l = $1;
+ if ($2 eq 'K') { $l *= 1024; }
+ if ($2 eq 'M') { $l *= 1024*1024; }
+ if ($2 eq 'G') { $l *= 1024*1024*1024; }
+ if ($2 eq 'T') { $l *= 1024*1024*1024*1024; }
+ }
+ return $l;
+}
+
##
## ==== OPERATION MODE 1: STORE DATA ====
##
if ($opt->{-store}) {
- my $step = 1*60; # 1 min
+ my $step = &cv_time($cfg->{'Database'}->{-step});
# initialize data
my $ctx = &data_init($cfg);
# scan flow-tools stream on STDIN for NetFlow records
+ my $flows = 0;
+ my $tick = 0;
+ my $ticktick = 0;
+ my $done = 0;
+ my @done = ();
Cflow::verbose(0);
Cflow::find(sub { &foreach_record($cfg, $ctx) }, "-");
sub foreach_record {
my ($cfg, $ctx) = @_;
- # determine time slot
- my $t = $Cflow::endtime;
+ # at start of time slot, load accumulated data
if (not defined($ctx->{-endtime})) {
# initial setup, so initialize time slot tracking
- $ctx->{-starttime} = int($t / $step) * $step;
+ $ctx->{-starttime} = int($Cflow::endtime / $step) * $step;
$ctx->{-endtime} = $ctx->{-starttime} + $step;
# load data
@@ -252,7 +329,7 @@
}
# at end of time slot, store accumulated data
- if ($t >= $ctx->{-endtime}) {
+ if ($Cflow::endtime >= $ctx->{-endtime}) {
# store data
&rrd_store($cfg, $ctx);
@@ -263,6 +340,22 @@
# accumulate data
&data_accumulate($cfg, $ctx);
+
+ # statistics
+ if ($opt->{-verbose}) {
+ $flows++;
+ $done++;
+ $ticktick++;
+ my $tick_new = ($ticktick > 1000 ? time() : 0);
+ if ($tick < $tick_new) {
+ push(@done, $done);
+ shift(@done) if (@done > 20);
+ my $sum = 0; map { $sum += $_ } @done; $sum /= scalar(@done);
+ printf(STDERR "Storing: %10d flows, %6.1f flows/sec (average: %6.1f flows/sec)\r", $flows, $done, $sum);
+ $tick = $tick_new;
+ $done = 0;
+ }
+ }
}
&rrd_store($cfg, $ctx);
@@ -297,7 +390,7 @@
# create RRD file (if still not existing)
sub rrd_create {
my ($cfg, $time) = @_;
- return if (-f $cfg->{'Database'});
+ return if (-f $cfg->{'Database'}->{-file});
# determine RRD data sources (DS)
my @ds = ();
@@ -321,17 +414,14 @@
my $rra = sprintf('RRA:LAST:0:%d:%d', $steps, $rows);
push(@rra, $rra);
}
- &mkrra($step, 1*60, 7*24*60*60); # 1 min res. for 1 week
- &mkrra($step, 2*60, 14*24*60*60); # 2 min res. for 2 weeks
- &mkrra($step, 5*60, 30*24*60*60); # 5 min res. for 1 month
- &mkrra($step, 15*60, 3*30*24*60*60); # 15 min res. for 3 months
- &mkrra($step, 1*60*60, 6*30*24*60*60); # 1 hour res. for 6 months
- &mkrra($step, 2*60*60, 365*24*60*60); # 2 hour res. for 1 year
- &mkrra($step, 6*60*60, 2*365*24*60*60); # 6 hour res. for 2 years
- &mkrra($step, 12*60*60, 4*365*24*60*60); # 12 hour res. for 4 years
+ foreach my $s (@{$cfg->{'Database'}->{-storage}}) {
+ my $res = &cv_time($s->{-res});
+ my $dur = &cv_time($s->{-dur});
+ &mkrra($step, $res, $dur);
+ }
# create RRD database
- RRDs::create($cfg->{'Database'}, '--start', $time, '--step', $step, @ds, @rra);
+ RRDs::create($cfg->{'Database'}->{-file}, '--start', $time, '--step', $step, @ds, @rra);
my $err = RRDs::error();
die "failed to create RRD file: $err" if (defined($err));
}
@@ -345,7 +435,7 @@
# load data from RRD
my ($rrd_start, $rrd_step, $rrd_names, $rrd_data) = RRDs::fetch(
- $cfg->{'Database'},
+ $cfg->{'Database'}->{-file},
'LAST',
'--resolution', $step,
'--start', $ctx->{-endtime},
@@ -371,32 +461,26 @@
LOOP:
foreach my $host (@{$cfg->{'Host'}}) {
foreach my $target (@{$host->{-target}->{-order}}) {
- my $matched = 0;
- my $inbound; $inbound = undef;
+ my $inbound;
my $np = $ctx->{-network}->{$host->{-name}.":".$target};
if ($np->match_string($Cflow::srcip)) { $inbound = 0; }
elsif ($np->match_string($Cflow::dstip)) { $inbound = 1; }
if (defined($inbound)) {
foreach my $service (@{$host->{-target}->{$target}->{-service}}) {
- my $services = $cfg->{'Service'}->{$service};
- foreach my $s (@{$services}) {
- my $proto = $cfg->{'Protocol'}->{$s->{-proto}};
- my $port = $s->{-port};
- if ($Cflow::protocol == $proto) {
+ foreach my $s (@{$cfg->{'Service'}->{$service}}) {
+ if ($Cflow::protocol == $cfg->{'Protocol'}->{$s->{-proto}}) {
+ my $port = $s->{-port};
if ( $port eq '*'
or (( $inbound and $port == $Cflow::dstport)
or (not $inbound and $port == $Cflow::srcport))) {
- $matched = 1;
+ # flow matched target/service, so accumulate data
+ my $ds_name = &make_rrd_ds_name($host->{-name}, $target, $service);
+ if ($inbound) { $ctx->{-track}->{"${ds_name}_i"} += $Cflow::bytes; }
+ else { $ctx->{-track}->{"${ds_name}_o"} += $Cflow::bytes; }
+ $matched_total++;
+ last LOOP;
}
}
- if ($matched) {
- # flow matched target/service, so accumulate data
- my $ds_name = &make_rrd_ds_name($host->{-name}, $target, $service);
- if ($inbound) { $ctx->{-track}->{"${ds_name}_i"} += $Cflow::bytes; }
- else { $ctx->{-track}->{"${ds_name}_o"} += $Cflow::bytes; }
- $matched_total++;
- last LOOP;
- }
}
}
}
@@ -426,7 +510,7 @@
$ctx->{-track}->{$ds_name} = 0;
}
RRDs::update(
- $cfg->{'Database'},
+ $cfg->{'Database'}->{-file},
'--template', $ds_list,
sprintf("%d", $ctx->{-endtime}).":".$dv_list
);
@@ -452,36 +536,6 @@
# post-process parameters
my $img_format = ($img_file =~ m|\.png$| ? "PNG" : "GIF");
- sub cv_time {
- my ($t) = @_;
- if ($t =~ m|^now(.*)$|) {
- $t = time() + &cv_time($1);
- }
- elsif ($t =~ m|^([-+])(.+)$|) {
- $t = &cv_time($2);
- eval "\$t = $1 \$t";
- }
- elsif ($t =~ m|(\d{2})-([A-Za-z]{3})-(\d{4})|) {
- $t = str2time($t);
- }
- elsif ($t =~ m|^([\d.]+)([smhdwMY])$|) {
- $t = $1;
- if ($2 eq 's') { $t *= 1; }
- elsif ($2 eq 'm') { $t *= 60; }
- elsif ($2 eq 'h') { $t *= 60*60; }
- elsif ($2 eq 'd') { $t *= 24*60*60; }
- elsif ($2 eq 'w') { $t *= 7*24*60*60; }
- elsif ($2 eq 'M') { $t *= 30*24*60*60; }
- elsif ($2 eq 'Y') { $t *= 365*24*60*60; }
- }
- elsif ($t =~ m|^([\d.]+)$|) {
- $t = $1;
- }
- else {
- $t = 0;
- }
- return $t;
- }
if ($graph_start =~ m/^\-(.+)/) {
$graph_end = &cv_time($graph_end);
$graph_start = $graph_end - &cv_time($1);
@@ -494,24 +548,6 @@
$graph_start = &cv_time($graph_start);
$graph_end = &cv_time($graph_end);
}
- sub cv_limit {
- my ($l) = @_;
- if ($l eq '') {
- $l = 0;
- }
- elsif ($l =~ m|^([-+])(.*)$|) {
- $l = &cv_limit($2);
- eval "\$l = $1 \$l";
- }
- elsif ($l =~ m|^(\d+)([KMGT])$|) {
- $l = $1;
- if ($2 eq 'K') { $l *= 1024; }
- if ($2 eq 'M') { $l *= 1024*1024; }
- if ($2 eq 'G') { $l *= 1024*1024*1024; }
- if ($2 eq 'T') { $l *= 1024*1024*1024*1024; }
- }
- return $l;
- }
$graph_ulimit = &cv_limit($graph_ulimit);
$graph_llimit = &cv_limit($graph_llimit);
@@ -561,8 +597,8 @@
my $cdef_o = '';
foreach my $service (@{$host->{-target}->{$target}->{-service}}) {
my $ds_name = &make_rrd_ds_name($host->{-name}, $target, $service);
- push(@def, sprintf("DEF:%s_o=%s:%s_o:LAST", $ds_name, $cfg->{'Database'}, $ds_name));
- push(@def, sprintf("DEF:%s_i=%s:%s_i:LAST", $ds_name, $cfg->{'Database'}, $ds_name));
+ push(@def, sprintf("DEF:%s_o=%s:%s_o:LAST", $ds_name, $cfg->{'Database'}->{-file}, $ds_name));
+ push(@def, sprintf("DEF:%s_i=%s:%s_i:LAST", $ds_name, $cfg->{'Database'}->{-file}, $ds_name));
$cdef_o = ($cdef_o eq '' ? "${ds_name}_o" : "${ds_name}_o,$cdef_o,+");
$cdef_i = ($cdef_i eq '' ? "${ds_name}_i" : "${ds_name}_i,$cdef_i,+");
}
@@ -655,8 +691,8 @@
my $i = 0;
foreach my $service (@{$host->{-target}->{$target}->{-service}}) {
my $ds_name = &make_rrd_ds_name($host->{-name}, $target, $service);
- push(@def, sprintf("DEF:%s_o=%s:%s_o:LAST", $ds_name, $cfg->{'Database'}, $ds_name));
- push(@def, sprintf("DEF:%s_i=%s:%s_i:LAST", $ds_name, $cfg->{'Database'}, $ds_name));
+ push(@def, sprintf("DEF:%s_o=%s:%s_o:LAST", $ds_name, $cfg->{'Database'}->{-file}, $ds_name));
+ push(@def, sprintf("DEF:%s_i=%s:%s_i:LAST", $ds_name, $cfg->{'Database'}->{-file}, $ds_name));
push(@cdef, sprintf("CDEF:data%d_o=%s_o,8,*,+1,*", $i, $ds_name));
push(@cdef, sprintf("CDEF:data%d_i=%s_i,8,*,-1,*", $i, $ds_name));
my $color_o; eval "\$color_o = 0x".$colors->[$i];
|