# Copyright(C) 2006 David Muir Sharnoff <muir@idiom.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# This software is available without the GPL: please write if you need
# a non-GPL license. All submissions of patches must come with a
# copyright grant so that David Sharnoff remains able to change the
# license at will.
package Qpsmtpd::Plugin::Quarantine::Batch;
require Exporter;
use OOPS;
use strict;
use Qpsmtpd::Plugin::Quarantine::Common;
use Qpsmtpd::Plugin::Quarantine::Sendmail;
use Mail::SendVarious;
use Mail::SendVarious qw(make_message $mail_error);
use Scalar::Util qw(refaddr);
use IO::Pipe;
use Time::CTime;
my $mailq_timefmt = "%a %b %d %X";
our @ISA = qw(Exporter);
our @EXPORT = qw(cronjob sendqueued mailq);
our @EXPORT_OK = qw(
find_oldest_bucket prune_headers
prune_recipients generate_recipients
prune_senders generate_senders
walk_eval
indent);
my $debug = 0;
my $recipients_deleted = 0;
my $recipients_settings = 0;
my $recipients_count = 0;
my $senders_deleted = 0;
my $senders_count = 0;
my $senders_with_settings = 0;
my $stride = 100;
sub cronjob
{
my $start = time;
print "# upgrades?\n";
upgrade();
print "# cleaning out messages\n" if $debug;
my $messages_deleted = 0;
for(;;) {
my $done;
my $del;
my $diskused = message_store_size();
transaction(sub {
my $oops = get_oops();
my $oldest = find_oldest_bucket($oops);
if ($oldest and (time - $oldest) / 86400 > $defaults{message_longevity}) {
printf "# Oldest bucket is dated %s, must prune headers\n", scalar(localtime($oldest)) if $debug;
$del = prune_headers($oops);
} elsif ($diskused / 1024000 > $defaults{message_store_size}) {
printf "# Oldest bucket is dated %s, we're over our disk quote -- must prune headers\n", scalar(localtime($oldest)) if $debug;
$del = prune_headers($oops);
} else {
printf "# Oldest bucket is dated %s, we're done\n", scalar(localtime($oldest)) if $debug;
$done = 1;
}
$oops->commit() if $del;
});
$messages_deleted += $del;
last if $done;
}
print "Messages deleted: $messages_deleted\n\n";
print "\n\n";
print "# cleaning up recipients...\n" if $debug;
prune_recipients();
print "Recipients deleted: $recipients_deleted\n";
print "Recipients kept: $recipients_count\n";
print "Recipients with settings: $recipients_settings\n";
print "\n\n";
print "# cleaning up senders...\n" if $debug;
prune_senders();
print "Senders kept: $senders_count\n";
print "Senders with settings: $senders_with_settings\n";
print "Senders deleted: $senders_deleted\n";
print "\n\n";
printf "Time for batch run: %d (seconds)\n", time - $start;
}
sub upgrade
{
transaction(sub {
{
print " Upgrade oops?\n";
my $oops = OOPS->new(oops_args(), auto_ugprade => 1);
$oops->commit;
}
my $version;
{
my $oops = get_oops();
my $qd = $oops->{quarantine};
$version = $qd->{version};
}
if ($version <= 0.31) {
my $oops = get_oops();
my $qd = $oops->{quarantine};
print "Fixing 3600 hours/day problem\n";
my $time = time;
my $b0 = $qd->{buckets};
my $b0count = 0;
for my $day (sort { $a <=> $b } keys %{$b0}) {
print " Remapping ".scalar(gmtime($day*86400))."\n";
my $b1count = 0;
my $b1 = $b0->{$day};
for my $bucket (keys %{$b1}) {
my $oldtime = $day * 86400 + $bucket * 24;
print " Bucket at ".scalar(gmtime($oldtime))."\n";
my $b2 = $b1->{$bucket};
my $count = 0;
for my $header_checksum (keys %$b2) {
$qd->{buckets3}{int($oldtime / 86400)}{int(($oldtime % 86400) / 3600)}{$header_checksum} = $b2->{$header_checksum};
$oops->virtual_object($qd->{buckets3}{int($oldtime / 86400)}, 1);
$oops->virtual_object($qd->{buckets3}{int($oldtime / 86400)}{int(($oldtime % 86400) / 3600)}, 1);
$count++;
}
print " $count headers moved\n";
$b1count += $count;
delete $b1->{$bucket};
}
print " $b1count moved\n";
$b0count += $b1count;
delete $b0->{$day};
}
print "Total moved: $b0count\n";
$oops->commit();
}
if ($version < 0.34) {
$| = 1;
transaction(sub {
my $oops = get_oops();
print "Counting up message storage space...\n";
my $qd = $oops->{quarantine};
$qd->{diskused} = {}
unless $qd->{diskused};
bless $qd->{diskused}, 'Quarantine::DiskUsage';
$oops->virtual_object($qd->{diskused}, 1);
});
my $tsize = 0;
my $tcount = 0;
my $size;
my $count;
my @buf;
require Qpsmtpd::Plugin::Quarantine;
walk_eval(
50,
sub {
my $oops = shift;
return $oops->{quarantine}{bodies};
},
sub {
my ($oops, @bodies) = @_;
my $size = 0;
my $count = 0;
my $qd = $oops->{quarantine};
for my $bdsum (@bodies) {
my $pbody = $qd->{bodies}{$bdsum};
return unless $pbody;
return if $pbody->{size};
$pbody->{size} = length($pbody->{body});
$size += $pbody->{size};
$count += 1;
print "." if ($tcount + $count) % 10 == 0;
}
$qd->{diskused}{$$ % $defaults{size_storage_array_size}} += $size + $count * $defaults{message_size_overhead};
$tsize += $size;
$tcount += $count;
print "C";
},
allatonce => 1,
);
printf "\n%d messages using %.1fMB\n", $tcount, $tsize / 1024000;
}
if ($version < 0.37) {
print "Running database fsck\n";
use OOPS::Fsck;
$OOPS::Fsck::check_batchsize = 2000;
fsck(oops_args());
print "Done with fsck\n";
}
if ($version < 0.37) {
print "Running database GC\n";
use OOPS::GC;
$OOPS::GC::too_many_todo = 50_000;
$OOPS::GC::work_length = 10_000;
$OOPS::GC::clear_batchsize = 4000;
$OOPS::GC::virtual_hash_slice = 3_000;
$OOPS::GC::maximum_spill_size = 10_000;
}
update_version();
});
}
sub update_version
{
my ($oops) = @_;
my $doit = sub {
my $qd = $oops->{quarantine};
require Qpsmtpd::Plugin::Quarantine;
$qd->{version} = $Qpsmtpd::Plugin::Quarantine::VERSION;
};
if ($oops) {
&$doit();
} else {
transaction(sub {
$oops = get_oops();
&$doit();
$oops->commit;
});
}
}
sub find_oldest_bucket
{
my ($oops) = @_;
my $qd = $oops->{quarantine};
my $b0 = $qd->{buckets3};
my ($b0first) = sort { $a <=> $b } keys %{$b0};
my $b1 = $b0->{$b0first};
my ($b1first) = sort { $a <=> $b } keys %{$b1};
my $bucket = $b1->{$b1first};
return ($b0, $b0first, $b1, $b1first, $bucket) if wantarray;
return $b0first * 86400 + $b1first * 3600;
}
sub message_store_size
{
transaction(sub {
my $oops = get_oops();
my $qd = $oops->{quarantine};
my $size = 0;
for my $v (values %{$qd->{diskused}}) {
$size += $v;
}
printf "Disk space used %.1fMB\n", $size / 1024000;
return $size;
});
}
my $mqueue_sent;
my $mqueue_unsent;
sub sendqueued
{
walk_eval($defaults{mqueue_stride_length}, sub {
my $oops = shift;
return $oops->{quarantine}{mqueue};
}, \&mqueue_agent, allatonce => 1);
}
sub mqueue_agent
{
my ($oops, @mqueue) = @_;
for my $mqueue (@mqueue) {
my $mq = $oops->{mqueue}{$mqueue};
next unless time - $mq->{last_attempt} >= $defaults{mqueue_minimum_gap};
$oops->lock($oops->{mqueue}{$mqueue});
}
for my $mqueue (@mqueue) {
my $mq = $oops->{mqueue}{$mqueue};
next unless time - $mq->{last_attempt} >= $defaults{mqueue_minimum_gap};
mqueue_agent2($oops, $mqueue);
}
}
sub mqueue_agent2
{
my ($oops, $mqueue) = @_;
my $mq = $oops->{mqueue}{$mqueue} || return;
if (sendmail(%$mq, debuglogger => sub { 1 }, errorlogger => sub { 1 })) {
delete $oops->{mqueue}{$mqueue};
$mqueue_sent++;
return;
}
$mq->{last_attempt} = time;
$mq->{attempt_count}++;
$mq->{last_error} = $mail_error;
if (time - $mq->{first_attempt} >= $mq->{mqueue_maximum_keep}
and $mq->{attempt_count} >= $defaults{mqueue_minimum_attempts})
{
delete $oops->{mqueue}{$mqueue};
if ($mq->{from} ne "<>" && $mq->{from} ne $defaults{bounce_from} && $mq->{from} =~ /^mailer-daemon\@/i) {
my (undef, $mes) = make_message(%$mq);
sendmail_or_postpone(
from => $defaults{bounce_from},
subject => "Returned mail: $mq->{last_error}",
to => $mq->{from},
body => <<END,
We attempted to send a message on your behalf but we could
not do so. The specific problem we had was:
$mq->{last_error}
The message we were trying to send was:
$mes
END
debuglogger => sub { 1 },
);
}
}
}
sub mqueue_postcommit
{
send_postponed();
}
sub mailq
{
my $oops = get_oops(readonly => 1, less_caching => 1);
my $qd = $oops->{quarantine};
my $count = 0;
my $size = 0;
for my $mqueue (keys %{$qd->{mqueue}}) {
my $mq = $qd->{mqueue}{$mqueue};
my ($from, $message, @to) = make_message(%$mq);
printf "%15s %6d %20s %s\n", $mqueue, length($message), strftime($mailq_timefmt, localtime($mq->{first_attempt})), $from;
print " ($mq->{last_error})\n";
for my $t (@to) {
print "\t\t\t\t\t $t\n";
}
$count++;
$size += length($message);
}
printf "-- %d Kbytes in %d Requests.\n", $size / 1024, $count;
}
sub prune_headers
{
my ($oops, $messages) = @_;
$messages = $defaults{delete_batchsize}
unless $messages;
print "Pruning $messages messages\n" if $debug > 2;
my $qd = $oops->{quarantine};
my ($b0, $b0first, $b1, $b1first, $bucket);
for(;;) {
($b0, $b0first, $b1, $b1first, $bucket) = find_oldest_bucket($oops);
last if $bucket && %$bucket;
if (%$b1) {
print "Deleting b1first $b1first\n" if $debug >2;
delete $b1->{$b1first};
redo;
}
if (%$b0) {
print "Deleting b0first $b0first\n" if $debug >2;
delete $b0->{$b0first};
redo;
}
die "no messages";
}
my $pruned = 0;
my ($hcksum, $pheader);
while (($hcksum, $pheader) = each(%$bucket)) {
return --$pruned if $pruned++ >= $messages;
my $wasdone = $pheader->{done};
my $pbody = $pheader->{body};
my $psender = $pheader->{sender};
my $recipients = $pheader->{recipients};
print STDERR <<END if $debug > 3;
Removing....
From $psender->{address}
From: $pheader->{from}To: $pheader->{to}Subject: $pheader->{subject}Date: $pheader->{date}
END
%$pheader = ();
if (refaddr($pbody->{last_reference}) == refaddr($pheader)) {
delete $pbody->{last_reference};
my $bcksum = $pbody->{cksum};
delete $qd->{bodies}{$bcksum};
$qd->{diskused}{$$ % $defaults{size_storage_array_size}}
-= $pbody->{size};
print STDERR "(body too)\n\n" if $debug > 3
} else {
print STDERR "\n" if $debug > 3;
}
delete $bucket->{$hcksum};
delete $qd->{headers}{$hcksum};
delete $psender->{headers}{$hcksum};
for my $r (@{$pheader->{recipients}}) {
my $rd = $qd->{recipients}{$r};
if ($rd->{headers}{$hcksum}) {
delete $rd->{headers}{$hcksum};
$rd->{mcount}-- unless $wasdone;
}
}
}
print "Only pruned $pruned messages\n" if $debug >2;
delete $b1->{$b1first};
return $pruned;
}
sub prune_recipients
{
walk_eval($defaults{recipent_stride_length}, sub { my $oops = shift; return $oops->{quarantine}{recipients} }, \&recipient_agent);
}
sub recipient_agent
{
my ($oops, $recipient) = @_;
my $qd = $oops->{quarantine};
my $rd = $qd->{recipients}{$recipient};
unless ($rd) {
print STDERR "That's odd, cannot find recipient '$recipient'\n";
delete $qd->{recipients}{$recipient};
return;
}
unless ($rd->{headers}) {
print STDERR "Recipient $recipient invalid, deleting\n";
delete $qd->{recipients}{$recipient};
$recipients_deleted++;
return;
}
my $msgcount = %{$rd->{headers}} ? scalar(%{$rd->{headers}}) : 0;
print "Recipient: $recipient..." if $debug;
printf " (has %d messages)", $msgcount if $debug;
printf " %d days idle...", (time - $rd->{last_timestamp})/86400 if $debug;
print " has settings" if $debug && $rd->has_settings;
if (
(
(time - $rd->{last_timestamp}) / 86400 > $defaults{keep_idle_recipients}
&&
! $msgcount
)
||
(
(time - $rd->{last_timestamp}) / 86400 > $defaults{message_longevity}
&&
! $rd->has_settings()
&&
! $msgcount
)
) {
delete $qd->{recipients}{$recipient};
$recipients_deleted++;
print " DELETE" if $debug;
} else {
$recipients_settings++ if $rd->has_settings();
$recipients_count++;
}
print "\n" if $debug;
}
sub prune_senders
{
walk_eval($defaults{sender_stride_length}, sub { my $oops = shift; return $oops->{quarantine}{senders} }, \&sender_agent);
}
sub sender_agent
{
my ($oops, $sender) = @_;
my $qd = $oops->{quarantine};
my $psender = $qd->{senders}{$sender};
unless ($psender) {
print STDERR "That's odd, cannot find sender '$sender'\n";
delete $qd->{senders}{$sender};
return;
}
print "Sender: $sender" if $debug;
my ($ip, $tstamp);
my $kept;
while (($ip, $tstamp) = each %{$psender->{send_ip_used}}) {
printf " (from %s %d ago)", $ip, (time - $tstamp)/86400 if $debug;
if (time - $tstamp > 86400 * $defaults{renotify_sender_ip} * 2) {
print "[D]" if $debug;
delete $psender->{send_ip_used}{$ip};
} else {
$kept++;
}
}
my $spams_sent;
my $today = time / 86400;
my $count = 0;
for my $spamday (keys %{$psender->{spams_sent_perday}}) {
if ($today - $spamday > $defaults{sender_history_to_keep}) {
delete $psender->{spams_sent_perday}{$spamday};
next;
}
$spams_sent += $psender->{spams_sent_perday}{$spamday};
$count++;
}
delete $psender->{spams_sent_perday} unless $spams_sent;
printf " %d spams in %d days", $spams_sent, $count if $debug;
if ($spams_sent >= $defaults{report_senders_after}) {
print "\n" if $debug;
print "Sender $sender has sent $spams_sent in the last $defaults{sender_history_to_keep} days\n";
my ($hsum, $pheader);
while (($hsum, $pheader) = each %{$psender->{headers}}) {
print "\nFor example:\n";
indent($pheader->{header});
indent($pheader->{body}{body}, limit => 100);
last;
}
}
my $has_settings = $psender->has_settings();
printf " kept:%d ss/day:%d settings:%d headers:%s", $kept, scalar(%$psender->{spams_sent_perday}), !!$has_settings, scalar(%{$psender->{headers}}) if $debug;
if (! $kept && ! scalar(%$psender->{spams_sent_perday}) && ! $has_settings && ! scalar(%{$psender->{headers}})) {
print " DELETE" if $debug;
delete $qd->{senders}{$sender};
$senders_deleted++;
} else {
$senders_count++;
$senders_with_settings++ if $has_settings;
}
print "\n" if $debug;
}
sub indent
{
my ($text, %args) = @_;
my $tab = $args{indent} || "\t";
my $limit = $args{limit} || 0;
while (--$limit != 0 && $text =~ /^(.*)/gm) {
print "$tab$1\n";
}
}
sub walk_eval
{
my ($stride, $get_hash, $agent, %opts) = @_;
my $done = 0;
my $last = undef;
$stride ||= 100;
while (! $done) {
transaction(sub {
my $oops = get_oops();
my $hash = &$get_hash($oops);
my @items = walk_hash(%$hash, $stride, $last);
if ($opts{allatonce}) {
&$agent($oops, @items);
} else {
for my $item (@items) {
&$agent($oops, $item);
}
}
$oops->commit();
$last = $items[$#items];
$done = 1 unless @items == $stride;
});
}
}
1;