package Mail::Decency::Policy::Throttle;
use Moose;
extends 'Mail::Decency::Policy::Core';
use version 0.74; our $VERSION = qv( "v0.1.4" );
use mro 'c3';
use Scalar::Util qw/ weaken /;
use POSIX qw/ ceil /;
use Data::Dumper;
=head1 NAME
Mail::Decency::Policy::Throttle
=head1 DESCRIPTION
Throtle mail sending for dedicated sources (sender ip, sender username, sender address, "account")
=over
=item * client address
The IP of the client connecting to the server.
=item * sender username, sender domain, sender address
The username is the (if any provided) sasl username
=item * recipient domain, recipient address
Caution with this. You don't want your incoming mails to be rejected!
=item * account
An account is a kind of context. You can associate each row in the database with an column called "account". The counter for throtteling will then be applied to this value instead of sender or such.
Example usecase: There are domain1.tld and domain2.tld. The task is to limit the max amount for sending to 500 Mails per day for both domains together.
=back
=head1 CONFIG
Those are the config params in YAML
---
disable: 0
# Wheter affect ONLY those having a sasl username set.
# Use this if you have one mailserver for incoming AND outgoing
# mails. This does not check the validity of the sasl user!
require_sasl_username: 1
# Wheter use the counters generated by the Mail::Decency::LogParser::Stats
# module which has to be up and running.
# Your limits might be a little streched, depending on the speed of
# the log parser (+1 or +2 or something).
# However, this is the only way to assure only not-rejected mails
# get accounted for.
# If disabled, ANY mail seen will increment the counter (if a later
# configuration setting does reject it, it still counts!)
use_stats_cache: 0
# The default limits, if no exception is in the exception database.
# You can use:
# * client_address (ip of sending client)
# * sender_domain (domain part of sender)
# * sender_address (email address of sender)
# * sasl_username (the sasl username, if any)
# * sender_domain (domain part of sender)
# * recipient_domain (domain part of the recipient)
# * recipient_address (email address of recipient)
# * account (the account.. see above)
default_limits:
# the following can be read as:
# * Account per sender domain
# * It is not allowed to send more then:
# * 1 Mail per 10 Seconds
# * 50 Mails per 10 Minutes
# * 1000 Mails per day
sender_domain:
-
maximum: 1
interval: 10
-
maximum: 50
interval: 600
-
maximum: 1000
interval: 86400
account:
-
maximum: 50
interval: 600
# which exception database to use (see above)
# use only those you really have to. Don't activate all
# without actually having data!
exception_databases:
- sender_domain
- sender_address
# The reject messages per interval (above)
# Don't forget the rejection code (better use 4xx for
# temporary, instead of 5xx for hard)
reject_messages:
10:
message: 'Sorry, nicht mehr als eine Mail in 10 Sekunden'
code: 450
600:
message: 'Sorry, nicht mehr als 50 Mails in 10 Minuten'
code: 450
86400:
message: 'Sorry, nicht mehr als 1000 Mails pro Tag'
code: 450
# The default error message which will be used if none is set
# for the interval.. comes in handy if you use exception
# database with custom intervals
# Variables you can use are:
# * %maximum% (limit of mails in interval)
# * %interval% (interval in seconds)
# * %interval_minutes% (interval in minutes, round up)
# * %interval_hours% (interval in hours, round up)
# * %interval_days% (interval in days, round up)
default_reject_message:
message: 'Sorry, nicht mehr als %maximum% Mails in %interval_minutes% Minuten'
code: 450
=head1 DATABASE
CREATE TABLE throttle_client_address (
id INTEGER PRIMARY KEY,
client_address VARCHAR( 255 ),
interval INTEGER,
maximum INTEGER,
account VARCHAR( 25 )
);
CREATE UNIQUE INDEX throttle_client_address_uk ON throttle_client_address( client_address, interval );
CREATE TABLE throttle_sender_domain (
id INTEGER PRIMARY KEY,
sender_domain VARCHAR( 255 ),
interval INTEGER,
maximum INTEGER,
account VARCHAR( 25 )
);
CREATE UNIQUE INDEX throttle_sender_domain_uk ON throttle_sender_domain( sender_domain, interval );
CREATE TABLE throttle_sender_address(
id INTEGER PRIMARY KEY,
sender_address VARCHAR( 255 ),
interval INTEGER,
maximum INTEGER,
account VARCHAR( 25 )
);
CREATE UNIQUE INDEX throttle_sender_address_uk ON throttle_sender_address( sender_address, interval );
CREATE TABLE throttle_sasl_username(
id INTEGER PRIMARY KEY,
sasl_username VARCHAR( 255 ),
interval INTEGER,
maximum INTEGER,
account VARCHAR( 25 )
);
CREATE UNIQUE INDEX throttle_sasl_username_uk ON throttle_sasl_username( sasl_username, interval );
CREATE TABLE throttle_recipient_domain(
id INTEGER PRIMARY KEY,
recipient_domain VARCHAR( 255 ),
interval INTEGER,
maximum INTEGER,
account VARCHAR( 25 )
);
CREATE UNIQUE INDEX throttle_recipient_domain_uk ON throttle_recipient_domain( recipient_domain, interval );
CREATE TABLE throttle_recipient_address(
id INTEGER PRIMARY KEY,
recipient_address VARCHAR( 255 ),
interval INTEGER,
maximum INTEGER,
account VARCHAR( 25 )
);
CREATE UNIQUE INDEX throttle_recipient_address_uk ON throttle_recipient_address( recipient_address, interval );
CREATE TABLE throttle_account(
id INTEGER PRIMARY KEY,
account VARCHAR( 255 ),
interval INTEGER,
maximum INTEGER
);
CREATE UNIQUE INDEX throttle_account_uk ON throttle_account( account, interval );
=cut
our @REGULAR_DATABASES = qw/ client_address sender_domain sasl_username sender_address recipient_domain recipient_address /;
our @ALL_DATABASES = ( @REGULAR_DATABASES, 'account' );
our %ALLOWED_DATABASES = map { ( $_ => 1 ) } @ALL_DATABASES;
=head1 CLASS ATTRIBUTES
=head2 default_reject_message : HashRef
Defauilt reject message and default reject code
{
message => 'Sorry, Limit reached (%maximum% mails in %interval% seconds)',
code => 450
}
=cut
has default_reject_message => ( is => 'rw', isa => 'HashRef', default => sub { {
message => 'Sorry, Limit reached (%maximum% mails in %interval% seconds)',
code => 450
} } );
=head2 reject_messages : HashRef[HashRef]
Reject messages per interval
{
10 => {
message => 'Sorry, Limit reached .. not more then 99 in 10 seconds',
code => 450
},
86400 => {
message => 'Sorry, Limit reached .. not more then 101 in 24 hours',
code => 450
}
}
=cut
has reject_messages => ( is => 'rw', isa => 'HashRef[HashRef]', default => sub { {} } );
=head2 default_limits : HashRef
Limits per attribute ( sender_domain, client_address, recipient_domain, account, .. )
{
sender_domain => [
{
maximum => 99,
interval => 10,
}
],
}
=cut
has default_limits => ( is => 'rw', isa => 'HashRef' );
=head2 default_limit_databases : ArrayRef[HashRef]
Which limit databases .. internal usage
=cut
has default_limit_databases => ( is => 'rw', isa => 'ArrayRef[HashRef]', default => sub { [] } );
=head2 exception_database : HashRef
Exception from default values (eg per account, per sender_domain, whatever)
=cut
has exception_database => ( is => 'rw', isa => 'HashRef', default => sub { {} } );
=head2 used_databases : ArrayRef[Str]
Internal usage..
=cut
has used_databases => ( is => 'rw', isa => 'ArrayRef[Str]', default => sub { [] } );
=head2 use_accounts : Bool
Wheter accounts (see description) should be used or not (cost performance)
=cut
has use_accounts => ( is => 'rw', isa => 'Bool', default => 0 );
=head2 counter_increment : CodeRef
Internal usage..
=cut
has counter_increment => ( is => 'rw', isa => 'CodeRef' );
=head2 counter_read : CodeRef
Internal usage..
=cut
has counter_read => ( is => 'rw', isa => 'CodeRef' );
=head2 require_sasl_username : CodeRef
Enable throttle module only if sasl_username is given (this happens if the SMTP connection contains AUTH information.. )
=cut
has require_sasl_username => ( is => 'rw', isa => 'Bool', default => 0 );
=head2 schema_definition
Database ..
=cut
has schema_definition => ( is => 'ro', isa => 'HashRef[HashRef]', default => sub {
{
throttle => {
client_address => {
client_address => [ varchar => 39 ],
interval => 'integer',
maximum => 'integer',
account => [ varchar => 100 ],
-unique => [ 'client_address', 'interval' ]
},
sender_domain => {
sender_domain => [ varchar => 255 ],
interval => 'integer',
maximum => 'integer',
account => [ varchar => 100 ],
-unique => [ 'sender_domain', 'interval' ]
},
sender_address => {
sender_address => [ varchar => 255 ],
interval => 'integer',
maximum => 'integer',
account => [ varchar => 100 ],
-unique => [ 'sender_address', 'interval' ]
},
sasl_username => {
sasl_username => [ varchar => 255 ],
interval => 'integer',
maximum => 'integer',
account => [ varchar => 100 ],
-unique => [ 'sasl_username', 'interval' ]
},
recipient_domain => {
recipient_domain => [ varchar => 255 ],
interval => 'integer',
maximum => 'integer',
account => [ varchar => 100 ],
-unique => [ 'recipient_domain', 'interval' ]
},
recipient_address => {
recipient_address => [ varchar => 255 ],
interval => 'integer',
maximum => 'integer',
account => [ varchar => 100 ],
-unique => [ 'recipient_address', 'interval' ]
},
account => {
account => [ varchar => 100 ],
interval => 'integer',
maximum => 'integer',
account => [ varchar => 100 ],
-unique => [ 'account', 'interval' ]
},
}
};
} );
=head1 METHODS
=head2 init
=cut
sub init {
my ( $self ) = @_;
$self->next::method();
my %used_database = ();
# having exception databases ? which to use ?
my $min_ok = 0;
if ( $self->config->{ exception_databases } ) {
die "exception_databases has to be an array\n"
unless ref( $self->config->{ exception_databases } ) eq 'ARRAY';
# having non empty amount of databases
if ( scalar @{ $self->config->{ exception_databases } } > 0 ) {
# check database
foreach my $db( @{ $self->config->{ exception_databases } } ) {
# oops, unknwon database name!
die "Unknown database '$db' in 'exception_databases' (allowed databases: ". join( ", ", @ALL_DATABASES ). ")\n"
unless $ALLOWED_DATABASES{ $db };
# found ok, rememeber
$min_ok++;
$used_database{ $db } ++;
$self->exception_database->{ $db } ++;
}
}
}
# oops, no having any
if ( $self->config->{ default_limits } ) {
die "default_limits has to be a hash\n"
unless ref( $self->config->{ default_limits } ) eq 'HASH';
# parse limit config..
while( my ( $db, $ref ) = each %{ $self->config->{ default_limits } } ) {
die "Unknown database '$db' in 'default_limits' (allowed databases: ". join( ", ", @ALL_DATABASES ). ")\n"
unless $ALLOWED_DATABASES{ $db };
# remember as used
$used_database{ $db } ++;
$min_ok ++;
}
$self->default_limits( $self->config->{ default_limits } );
# add ORDERED list of default limit databaseds
foreach my $db( @ALL_DATABASES ) {
push @{ $self->default_limit_databases }, $db
if $self->default_limits->{ $db };
}
}
# min reqs not satisfied
die "Min requirements for Throttle: 'default_limits' and/or 'exception_databases'\n"
unless $min_ok;
# build up ORDERED list of actual used databases
foreach my $db( @ALL_DATABASES ) {
push @{ $self->used_databases }, $db
if $used_database{ $db };
}
# enabled account style ?
$self->use_accounts( 1 ) if $used_database{ account };
# read oyt reject messages (per interval and default)
$self->reject_messages( $self->config->{ reject_messages } || {} );
$self->default_reject_message( $self->config->{ default_reject_message } )
if $self->config->{ default_reject_message };
# init count method - either via LogParser::Stats or stand alone
weaken( my $self_weak = $self );
if ( $self->config->{ use_stats_cache } ) {
my %map_to_logparser_cache = qw(
client_address ip
sender_address from_address
recipient_address to_address
sender_domain from_domain
recipient_domain to_domain
);
$self->counter_read( sub {
my ( $name, $value, $interval ) = @_; # client_address, 127.0.0.1, 3600
my $time = time();
$time -= $time % $interval;
my $attr = $map_to_logparser_cache{ $name };
return $self_weak->cache->get( 'lp-stats-sent-'. $attr. '-'. $value. '-'. $time ) || 0;
} );
$self->counter_increment( sub {} );
}
else {
$self->counter_read( sub {
my ( $name, $value, $interval ) = @_;
my $time = time();
$time -= $time % $interval;
return $self_weak->cache->get( 'throttle-'. $name. '-'. $value. '-'. $time ) || 0;
} );
$self->counter_increment( sub {
my ( $name, $value, $interval ) = @_;
my $time = time();
$time -= $time % $interval;
my $amount = $self->counter_read->( $name, $value, $interval );
return $self_weak->cache->set( 'throttle-'. $name. '-'. $value. '-'. $time, $amount+1 );
} );
}
# wheter require require_sasl_username
$self->require_sasl_username( $self->config->{ require_sasl_username } ? 1 : 0 );
return ;
}
=head2 handle
=cut
sub handle {
my ( $self, $server, $attrs_ref ) = @_;
# check for sasl username
return if $self->require_sasl_username && ! $attrs_ref->{ sasl_username };
#
# CACHES
#
# check all caches
my %cache_name = ();
foreach my $db( @{ $self->used_databases } ) {
next if $db eq 'account';
$cache_name{ $db } = "Throttle-$db-$attrs_ref->{ $db }";
my $cached = $self->cache->get( $cache_name{ $db } );
if ( $cached ) {
$self->logger->debug0( "Cache hit for '$db' = '$attrs_ref->{ $db }'" );
$self->go_final_state( $cached );
}
}
# check accont cache
my $account;
if ( $self->use_accounts ) {
# and again: for any database (should be faster that way then one loop, cause we
# reduce database reads this way!
foreach my $db( @{ $self->used_databases } ) {
next if $db eq 'account';
# try find account
$account = $self->get_account( $db => $attrs_ref->{ $db } );
next unless $account;
# found account
$cache_name{ account } = "Throttle-account-$account";
# try cache
if ( defined( my $cached = $self->cache->get( $cache_name{ account } ) ) ) {
$self->logger->debug3( "Cache hit for account '$account' of '$db' = '$attrs_ref->{ $db }'" );
$self->go_final_state( $cached );
}
# anyway, we know who the account is for now
last;
}
}
#
# DATABASE
#
$attrs_ref->{ account } = $account || "**UNDEFINED ACCOUNT**";
# try database now
CHECK_DATABASE:
foreach my $db( @{ $self->used_databases } ) {
next if $db eq 'account' && ! $account;
my @limits;
my $exception = 0;
# we could find in exception database..
if ( $self->exception_database->{ $db } ) {
@limits = $self->database->search( throttle => $db, {
$db => $attrs_ref->{ $db }
} );
$exception ++ if @limits;
}
# no exceptions? -> try fallback to defaults
if ( ! @limits && $self->default_limits->{ $db } ) {
@limits = @{ $self->default_limits->{ $db } };
}
# no limits at all -> bye
next CHECK_DATABASE unless @limits;
$self->logger->debug3( "Using ". ( $exception ? "Exception" : "Default" ). " limits for $attrs_ref->{ $db } ($db)" )
if @limits;
# go through all limits -> first rejection counts
foreach my $limit_ref( @limits ) {
# now ..
my $now = time();
# get counter, which is the max counter for all db items
my $count = $self->counter_read->( $db, $attrs_ref->{ $db }, $limit_ref->{ interval } );
$self->logger->debug3( " Limit $count / $limit_ref->{ maximum } in $limit_ref->{ interval }" );
# limit reached
if ( $limit_ref->{ maximum } >= 0 && $count >= $limit_ref->{ maximum } ) {
# get reject message (either special for this limit or default)
my $reject_message_ref =
$self->reject_messages->{ $limit_ref->{ interval } }
|| $self->default_reject_message;
my $reject_message = $reject_message_ref->{ message };
# parse output message
$reject_message =~ s/%interval%/$limit_ref->{ interval }/g;
my $interval_minutes = ceil( $limit_ref->{ interval } / 60 );
$reject_message =~ s/%interval_minutes%/$interval_minutes/g;
my $interval_hours = ceil( $limit_ref->{ interval } / 3600 );
$reject_message =~ s/%interval_hours%/$interval_hours/g;
my $interval_days = ceil( $limit_ref->{ interval } / 86400 );
$reject_message =~ s/%interval_days%/$interval_days/g;
$reject_message =~ s/%maximum%/$limit_ref->{ maximum }/g;
# write state to cache
my $timeout = $limit_ref->{ interval } - ( $now % $limit_ref->{ interval } );
$self->cache->set( $cache_name{ $db }, $reject_message, $timeout );
# log event
$self->logger->debug0( " !! Limit reached for '$db' = '$attrs_ref->{ $db }' ($limit_ref->{ interval })!!" );
# get reject code
my $reject_code = $reject_message_ref->{ code } || 450;
# set final state (throws exception)
$self->go_final_state( $reject_code => $reject_message );
}
else {
$self->logger->debug3( " No limit issue" );
}
# increase counter now
$self->counter_increment->( $db, $attrs_ref->{ $db }, $limit_ref->{ interval } );
}
}
}
=head2 get_account $db, $value
returns account by attribute lookup
my $account = $self->get_account( sender_domain => 'sender.tld' );
=cut
sub get_account {
my ( $self, $db, $value ) = @_;
# from cache ??
my $cache_name = "Throttle-account-$db-$value";
if ( defined( my $cached = $self->cache->get( $cache_name ) ) ) {
return $cached;
}
# from database ..
my $ref = $self->database->get( throttle => $db => { $db => $value } );
if ( $ref && $ref->{ account } ) {
# save to cache
$self->cache->set( $cache_name => $ref->{ account } );
return $ref->{ account };
}
# nothing found
return;
}
=head1 AUTHOR
Ulrich Kautz <uk@fortrabbit.de>
=head1 COPYRIGHT
Copyright (c) 2010 the L</AUTHOR> as listed above
=head1 LICENCSE
This library is free software and may be distributed under the same terms as perl itself.
=cut
1;