package Email::Blaster;
use strict;
use warnings 'all';
use Carp 'confess';
use forks;
use forks::shared;
use POSIX 'ceil';
use HTTP::Date 'time2iso';
use Time::HiRes qw( gettimeofday usleep );
use Digest::MD5 'md5_hex';
use Email::Blaster::ConfigLoader;
use Email::Blaster::Event;
use Email::Blaster::Event::Type;
use Email::Blaster::Transmission;
our $VERSION = '1.0001';
our $InstanceClass = __PACKAGE__;
our $instance;
my @progress : shared = ( );
#==============================================================================
sub new
{
my ($class) = @_;
no strict 'refs';
return ${"$InstanceClass\::instance"} if ${"$InstanceClass\::instance"};
$class = ref($class) ? ref($class) : $class;
my $s = ${"$InstanceClass\::instance"} = bless {
config => Email::Blaster::ConfigLoader->load(),
continue_running => 1,
}, $class;
$s->config->_init;
# Load up our assembler:
$s->_load_class( $s->config->message_assembler );
$s->{message_assembler} = $s->config->message_assembler->new( );
# Load up our sender:
$s->_load_class( $s->config->mail_sender );
$s->{mail_sender} = $s->config->mail_sender->new( );
return $s;
}# end new()
#==============================================================================
sub run
{
my ($s) = @_;
# Wait until we get a new transmission, then process it:
while( $s->continue_running )
{
my $trans;
warn "Waiting for a transmission...\n";
unless( $trans = $s->find_new_transmission() )
{
sleep(5);
next;
}# end unless();
@progress = ( );
warn "Processing $trans...\n";
$trans->is_started( 1 );
$trans->started_on( time2iso() );
$trans->blaster_hostname( $s->config->hostname );
$trans->update();
# Call our initializer(s):
$s->handle_event(
type => 'init_transmission',
transmission_id => $trans->id
);
# Spread the workload across some workers:
my @workers = ( );
my @sendlogs = $trans->sendlogs;
my @bulk_sendlogs = grep { ! $_->throttled_domain } @sendlogs;
push @workers, $s->_init_throttled_workers( \@bulk_sendlogs );
my $boss = threads->create(sub {
while( $s->continue_running && grep { $_->is_running } @workers )
{
my $running = scalar( grep { $_->is_running } @workers);
my ( $ids, $processed );
SCOPE: {
lock(@progress);
$processed = scalar(@progress);
$ids = [ @progress ];
@progress = ( );
};
$s->mark_sendlogs_as_finished( $ids ) if @$ids;
# Also call our wait_cycle event:
$s->wait_cycle( $running, $processed );
warn "Waiting for $running workers - Finished $processed this round...\n";
sleep(1);
}# end while()
});
# Call our initializer(s):
$s->handle_event(
type => 'begin_transmission',
transmission_id => $trans->id
);
# Wait for our workers to finish:
$_->join foreach ( $boss, @workers );
# Don't forget any straggler sendlogs:
warn "Finished @{[ scalar(@progress) ]} in cleanup...\n";
$s->mark_sendlogs_as_finished( [ @progress ] ) if @progress;
# Mark it as completed:
$trans->is_completed( 1 );
$trans->completed_on( time2iso() );
$trans->update();
# Call our initializer(s):
$s->handle_event(
type => 'end_transmission',
transmission_id => $trans->id
);
}# end while()
}# end run()
#==============================================================================
sub wait_cycle
{
my ($s, $running, $processed) = @_;
# Do nothing here:
1;
}# end wait_cycle()
#==============================================================================
sub send_message
{
my ($s, $sendlog, $transmission) = @_;
my $msg = $s->message_assembler->assemble(
$s,
$sendlog,
$transmission
);
$s->mail_sender->send_message(
blaster => $s,
subject => $msg->{subject},
content => $msg->{content},
transmission => $transmission,
sendlog => $sendlog,
);
}# end send_message()
#==============================================================================
sub _init_throttled_workers
{
my ($s, $bulk_sendlogs) = @_;
my @workers = ( );
my $per_worker = ceil( scalar(@$bulk_sendlogs) / $s->config->max_bulk_workers );
while( my @chunk = splice(@$bulk_sendlogs, 0, $per_worker) )
{
push @workers, threads->create(sub {
my $sendlogs = shift;
my $trans;
map {
$trans ||= $_->transmission;
my $queued_as = $s->send_message(
$_,
$trans
);
lock( @progress );
push @progress, $_->id . ':' . $queued_as;
} @$sendlogs;
return;
}, \@chunk);
}# end while()
return @workers;
}# end _init_throttled_workers()
#==============================================================================
sub mark_sendlogs_as_finished
{
my ($s, $items) = @_;
my $sth = Email::Blaster::Model->db_Main->prepare(<<"SQL");
UPDATE sendlogs SET
is_sent = 1,
queued_as = ?,
sent_on = '@{[ time2iso ]}'
WHERE sendlog_id = ?
SQL
foreach my $item ( @$items )
{
my ($id, $queued_as) = split /:/, $item;
$sth->execute( $queued_as, $id );
}# end foreach()
$sth->finish();
}# end mark_sendlogs_as_finished()
#==============================================================================
sub find_new_transmission;
#==============================================================================
sub current { $instance || shift->new }
sub config { shift->{config} }
sub message_assembler { shift->{message_assembler} }
sub mail_sender { shift->{mail_sender} }
#==============================================================================
sub continue_running
{
my ($s) = shift;
@_ ? $s->{continue_running} = shift : $s->{continue_running};
}# end continue_running()
#==============================================================================
sub handle_event
{
my ($s, %args) = @_;
my ($type) = Email::Blaster::Event::Type->search( event_type_name => delete($args{type}) );
my $event = Email::Blaster::Event->create(
%args,
event_type_id => $type->id,
);
my $group = $type->event_type_name;
map {
$s->_load_class( $_ );
$_->new->run( $event );
} @{ $s->config->handlers->$group->handler };
1;
}# end handle_event()
#==============================================================================
sub _load_class
{
my ($s, $class) = @_;
(my $file = "$class.pm") =~ s/::/\//g;
eval { require $file unless $INC{$file}; 1 } or confess "Cannot load $class: $@";
}# end _load_class()
1;# return true:
__END__
=pod
=head1 NAME
Email::Blaster - Scalable Mass Email System
=head1 SYNOPSIS
Generally, don't use this module from your code. Use the supplied scripts instead.
=head1 DESCRIPTION
Email::Blaster is the latest in a B<long, long> line of mass-emailer systems I
have written since 2002.
This version has many features.
=over 4
=item * Clustering Support
Uses memcached and libevent to do the heavy lifting.
=item * Testing mode.
Send a few messages to yourself before you turn on the firehose.
=item * Domain-based throttling with hourly limits.
Never get blacklisted again because of email flooding too quickly from your network.
=item * Configurable (and subclassable) behaviors and components.
If configuration alone doesn't get you what you want, you can always subclass
something (i.e. MailSender or MaillogWatcher) to get the desired behavior.
=item * Scales Out Well (Clustering).
Designed to spread the work out across many, many, many servers. If your email
list has 1Million subscribers, you could *reliably* send your messages to them
in a matter of minutes.
Add more servers, get more capacity and throughput.
=item * Event handlers (in serial).
Handle server-level events with a simple plugin. Events like server startup and
shutdown, the start or end of a transmission, etc.
More details to follow.
=back
=head1 HANDLING EVENTS
Email::Blaster offers the following events, which can be handled by one or more
subclasses of the appropriate class:
=head2 server_startup
Subclass L<Email::Blaster::ServerStartupHandler> and add the following to your config:
<handlers>
...
<server_startup>
...
<handler>My::StartupHandler</handler>
</server_startup>
=head2 server_shutdown
Subclass L<Email::Blaster::ServerShutdownHandler> and add the following to your config:
<handlers>
...
<server_shutdown>
...
<handler>My::ShutdownHandler</handler>
</server_shutdown>
=head2 init_transmission
Subclass L<Email::Blaster::TransmissionInitHandler> and add the following to your config:
<handlers>
...
<init_transmission>
...
<handler>My::TransmissionInitHandler</handler>
</init_transmission>
=head2 begin_transmission
Subclass L<Email::Blaster::TransmissionBeginHandler> and add the following to your config:
<handlers>
...
<begin_transmission>
...
<handler>My::TransmissionBeginHandler</handler>
</begin_transmission>
=head2 end_transmission
Subclass L<Email::Blaster::TransmissionEndHandler> and add the following to your config:
<handlers>
...
<end_transmission>
...
<handler>My::TransmissionEndHandler</handler>
</end_transmission>
=head2 message_bounced
Subclass L<Email::Blaster::MessageBouncedHandler> and add the following to your config:
<handlers>
...
<message_bounced>
...
<handler>My::MessageBouncedHandler</handler>
</message_bounced>
=head1 SUPPORT
Visit L<http://www.devstack.com/contact/> or email the author at <jdrago_999@yahoo.com>
Commercial support and installation is available.
=head1 AUTHOR
John Drago <jdrago_999@yahoo.com>
=head1 COPYRIGHT AND LICENSE
Copyright (C) 2008 by John Drago
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.10.0 or,
at your option, any later version of Perl 5 you may have available.
=cut