# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package MySQL::QueryMulti;

use 5.006;
use Carp;
use DBI;
use Moose;
use namespace::autoclean;
use SQL::Statement;
use Data::Dumper;

=head1 NAME

MySQL::QueryMulti - module for querying multiple MySQL databases in parallel

=head1 VERSION

Version 0.05

=cut

our $VERSION = '0.05';

=head1 SYNOPSIS

 my $qm = MySQL::QueryMulti->new;
 $qm->connect(
        [ get_dsn('pet1'), $ENV{DBI_USER}, $pass ],
        [ get_dsn('pet2'), $ENV{DBI_USER}, $pass ],
        ... repeat as necessary ...,
        { AutoInactiveDestroy => 0 }
 );

 $qm->prepare( "select * from pet order by owner" );
 my $sth = $qm->execute;

 while ( my @row = $sth->fetchrow_array ) {
     print "@row\n";
 }


=head1 DESCRIPTION

MySQL::QueryMulti is a module that allows the user to query multiple MySQL
databases in parallel and get an aggregated/concatenated result set back.

Requirements:

 * must have "create temporary table" privileges across all databases
 * schemas must be identical

MySQL::QueryMulti is built using DBI and hence has nearly identical method
calls (connect, prepare, and execute).  See method descriptions below.

The primary use case for this is when you have a sharded database environment.

See link for more info on sharding:

L<https://en.wikipedia.org/wiki/Shard_(database_architecture)>

=cut

# Name of the temporary table that collects the rows returned by every
# database before the final, merged select is run against it.
my $TEMP_TABLE_NAME = 'mytemp';

# Database handles opened by connect(), one per connection spec.
has '_dbh_list' => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 0,
    init_arg => undef,
);

# Statement handles created by prepare(), one per database handle.
has '_sth_list' => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 0,
    default => sub { [ ] }
);

# Extra connection that owns the temporary table.
has '_temp_dbh' => (
    is       => 'rw',
    isa      => 'Object',
    required => 0,
    init_arg => undef,
);

# "insert into <temp table> values (?, ...)" statement text used to copy rows
# into the temporary table.
has '_temp_prepare_stmt' => (
    is       => 'rw',
    isa      => 'Str',
    required => 0,
    init_arg => undef,
);

# Arguments of the most recent prepare() call.
has '_prepare_args' => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 0,
    init_arg => undef,
);

# Arguments (bind values) of the most recent execute() call.
has '_execute_args' => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 0,
    init_arg => undef,
);

# Parsed form of the prepared statement; execute() uses it to rebuild the
# select that is run against the temporary table.
has '_sql_stmt' => (
    is       => 'rw',
    isa      => 'Object',
    required => 0,
    init_arg => undef,
);

# When true (the default) errors are thrown; otherwise methods return a false
# value and the error is available through err/errstr/state.
has 'raise_error' => (
    is       => 'rw',
    isa      => 'Int',
    required => 0,
    default  => 1,
);

# Code of the last error.
has 'err' => (
    is       => 'rw',
    isa      => 'Str',
    required => 0,
    init_arg => undef,
);

# Message of the last error.
has 'errstr' => (
    is       => 'rw',
    isa      => 'Str',
    required => 0,
    init_arg => undef,
);

# State value of the last error (taken from DBI where one is available).
has 'state' => (
    is       => 'rw',
    isa      => 'Str',
    required => 0,
    init_arg => undef,
);

# SQL parser created in BUILD and used by prepare().
has '_sql_parser' => (
    is       => 'rw',
    isa      => 'Object',
    required => 0,
    init_arg => undef
);

# Moose construction hook: creates the SQL parser that prepare() uses to
# inspect statements and stores it on the object.
sub BUILD {
    my $self = shift;

    my $parser = SQL::Parser->new();
    $parser->{RaiseError} = 1;
    $parser->{PrintError} = 0;

    # prepare() fetches the parser through the _sql_parser accessor and
    # nothing else in the class sets it, so it has to be saved here.
    $self->_sql_parser($parser);

    return;
}

=head2 new( %hash );

Object constructor.  Accepts an optional hash of arguments.  


=head3 raise_error( 0|1 )

Allows you to change the behavior of error handling.  The default is to throw 
an exception.  Pass true or false to modify behavior as necessary.

=head2 connect ( [ $dsn, $user, $pass ], [ $dsn2, $user, $pass ], ..., 
{ DBI attributes } )

Method for establishing a connection to a set of databases.  The arguments are
similar to DBI::connect except you pass an array of array references that each 
contain their respective DBI::connect arguments (dsn, user, password).  
Attributes are only specified once (as the last arg) and applied to each 
connection automatically. 

Passing the attributes "RaiseError" and "PrintError" will have no effect.  The
"raise_error" attribute of MySQL::QueryMulti controls that behavior.

Returns true on success or false on error.

 $qm->connect(
        [ get_dsn('pet1'), $ENV{DBI_USER}, $pass ],
        [ get_dsn('pet2'), $ENV{DBI_USER}, $pass ],
        ... repeat as necessary ...,
        { AutoInactiveDestroy => 0 }
 );

=cut

sub connect {
    my $self = shift;
    my @args = @_;      # each element is an array ref that contains arguments
                        # to pass to DBI::connect

    # An optional trailing hash ref carries the DBI attributes shared by all
    # connections.  Work on a copy so the caller's hash is left untouched by
    # the overrides below.
    my $attr = {};
    if ( @args and ref( $args[-1] ) eq 'HASH' ) {
        $attr = { %{ pop(@args) } };
    }

    # Error reporting is driven by raise_error, not by DBI itself.
    $attr->{RaiseError} = 0;
    $attr->{PrintError} = 0;

    # Checked after the attribute hash has been removed, so that a call which
    # passes only attributes is rejected as well.
    if ( @args < 1 ) {
        $self->_err_handler( 1,
            "must provide connection info for at least one database", '' );
        return 0;
    }

    my @dbhs;

    foreach my $aref (@args) {
        if ( ref($aref) ne 'ARRAY' ) {
            $self->_err_handler( 1,
                "connection info must be an array reference", '' );
            return 0;
        }

        if ( @$aref > 3 ) {

            # don't allow attributes to be specified differently per connection
            my $args = do {
                no warnings;    # elements may be undefined
                join( ', ', @$aref );
            };

            $self->_err_handler(
                1,
                "too many arguments detected for connection\n"
                    . "\t[ $args ]\n",
                ''
            );
            return 0;
        }

        my $dbh = eval { DBI->connect( @$aref, $attr ) };
        if ($@) {
            $self->_err_handler( 1, $@, '' );
            return 0;
        }
        elsif ( !defined($dbh) ) {
            $self->_err_handler( $DBI::err, $DBI::errstr, $DBI::state );
            return 0;
        }

        # TODO: verify 'create temporary table' priv is enabled

        push( @dbhs, $dbh );
    }

    # randomly pick one to be the designated temp table owner; it gets its
    # own, additional connection
    my $i = int( rand(@dbhs) );

    my $temp_dbh = eval { DBI->connect( @{ $args[$i] }, $attr ) };
    if ($@) {
        $self->_err_handler( 1, $@, '' );
        return 0;
    }
    elsif ( !defined($temp_dbh) ) {
        $self->_err_handler( $DBI::err, $DBI::errstr, $DBI::state );
        return 0;
    }

    $self->_dbh_list( \@dbhs );

    # _get_temp_dbh() hands this handle to the temp table helpers.
    $self->_temp_dbh($temp_dbh);

    return 1;
}

# Records an error on the object and, when raise_error is true, throws it.
#
# Arguments: error code, error message, state value (the three values every
# caller in this class passes).
sub _err_handler {
    my $self = shift;
    my ( $err, $errstr, $state ) = @_;

    # The err/errstr/state attributes are declared as Str, so undefined
    # values are stored as empty strings.
    $self->err(    defined($err)    ? $err    : '' );
    $self->errstr( defined($errstr) ? $errstr : '' );
    $self->state(  defined($state)  ? $state  : '' );

    if ( $self->raise_error ) {
        confess "ERROR: " . $self->errstr;
    }

    return;
}

=head2 prepare

Identical to DBI::prepare except it does the prepare for all databases in the 
set established by connect.

Returns true on success or false on error.

=cut

sub prepare {
    my $self = shift;
    my @args = @_;      # ($statement, \%attr)

    if ( @args < 1 ) {
        $self->_err_handler( 1, "must provide prepare args", '' );
        return 0;
    }

    $self->_prepare_args( [@args] );    # store prepare args for later use

    my ( $sql, $attr ) = @args;

    # Copy the attributes before forcing async mode so the caller's hash is
    # not modified.
    $attr = { %{ $attr || {} }, async => 1 };

    my $parser = $self->_sql_parser;

    # The parser is created with RaiseError on; trap the exception so the
    # failure goes through raise_error like every other error.
    my $stmt = eval { SQL::Statement->new( $sql, $parser ) };
    if ( $@ or !defined($stmt) ) {
        $self->_err_handler( 1, $@ || "unable to parse statement", '' );
        return 0;
    }

    if ( $stmt->command eq 'CALL' or $stmt->command eq 'LOAD' ) {
        $self->_err_handler( 1, $stmt->command . " is not implemented", '' );
        return 0;
    }

    foreach my $col_def ( @{ $stmt->column_defs || [] } ) {
        next unless exists( $col_def->{name} );
        my $name = $col_def->{name};

        if ( $name eq 'COUNT' or $name eq 'AVG' ) {
            $self->_err_handler( 1,
                "$name aggregate function is not implemented", '' );
            return 0;
        }
    }

    # execute() rebuilds the final select from the parsed statement.
    $self->_sql_stmt($stmt);

    # cleanup for any errors that may have occurred on the last execute
    foreach my $sth ( @{ $self->_sth_list } ) {

        # undef means there is no outstanding query on this handle
        next if !defined( $sth->mysql_async_ready );

        # async query still running: wait for it
        while ( !$sth->mysql_async_ready ) {
            sleep 1;
        }

        # async query done, harvest and discard result
        $sth->mysql_async_result;
    }

    my $dbhs = $self->_dbh_list;
    if ( !$dbhs or !@$dbhs ) {
        $self->_err_handler( 1, "must connect before calling prepare", '' );
        return 0;
    }

    my @sths;
    foreach my $dbh (@$dbhs) {
        my $sth = $dbh->prepare( $sql, $attr );
        if ( !$sth ) {
            $self->_err_handler( $DBI::err, $DBI::errstr, $DBI::state );
            return 0;
        }

        push( @sths, $sth );
    }

    $self->_sth_list( \@sths );

    return 1;
}

=head2 execute

Identical to DBI::execute except it returns either a statement handle or the 
number of rows affected depending on the type of query.  A statement handle is 
returned for select queries.  The number of affected rows for all others.

Returns a statement handle or the number of affected rows on success.  Returns
undef on error.

=cut

sub execute {
    my $self = shift;
    my @args = @_;

    # _create_temp_table() needs the bind values again.
    $self->_execute_args( [@args] );

    my @pending;

    # Start the statement on every database.  prepare() requested async mode,
    # so the results are collected by polling below.
    foreach my $sth ( @{ $self->_sth_list } ) {
        $sth->execute(@args);
        if ( $sth->err ) {
            $self->_err_handler( $sth->err, $sth->errstr, $sth->state );
            return undef;
        }

        push( @pending, $sth );
    }

    my $select_query  = 0;
    my $rows_affected = 0;

    while (@pending) {
        my @still_running;

        foreach my $sth (@pending) {
            if ( !$sth->mysql_async_ready ) {
                push( @still_running, $sth );
                next;
            }

            my $ret = $sth->mysql_async_result;
            if ( $sth->err ) {
                $self->_err_handler( $sth->err, $sth->errstr, $sth->state );
                return undef;
            }

            if ( $sth->{NUM_OF_FIELDS} ) {

                # we have a select query
                if ( !$select_query ) {
                    $select_query = 1;

                    # first result set: start from a fresh temp table; the
                    # helpers return false after reporting an error
                    $self->_drop_temp_table   or return undef;
                    $self->_create_temp_table or return undef;
                }

                while ( my $aref = $sth->fetchrow_arrayref ) {
                    $self->_add_row_to_temp_table($aref) or return undef;
                }
            }
            else {

                # we have an insert, update, or delete query
                $rows_affected += $ret;
            }
        }

        @pending = @still_running;

        # only pause when something is still running
        sleep 1 if @pending;
    }

    if ($select_query) {

        my $select = $self->_get_select_clause( $self->_sql_stmt );
        my $sql    = "$select from $TEMP_TABLE_NAME\n";

        # skip the where clause because it is redundant
        $sql .= $self->_get_group_by( $self->_sql_stmt );
        $sql .= $self->_get_order_by( $self->_sql_stmt );
        $sql .= $self->_get_limit( $self->_sql_stmt );

        my $dbh = $self->_get_temp_dbh;
        my $sth = $dbh->prepare($sql);
        if ( !$sth or $dbh->err ) {
            $self->_err_handler( $dbh->err, $dbh->errstr, $dbh->state );
            return undef;
        }

        $sth->execute;
        if ( $sth->err ) {
            $self->_err_handler( $sth->err, $sth->errstr, $sth->state );
            return undef;
        }

        return $sth;
    }

    return $rows_affected;
}

# Builds the "select ..." clause that is run against the temporary table.
#
# $stmt is the parsed statement: its column_defs method supplies the column
# definitions and its set_quantifier entry tells whether DISTINCT was used.
# Returns the clause as a string without a trailing newline.
sub _get_select_clause {
    my $self = shift;
    my $stmt = shift;

    my @cols;

    foreach my $col_def ( @{ $stmt->column_defs } ) {
        my $col;
        if ( $col_def->{type} ne 'column' ) {

            # anything that is not a plain column: use the original text
            # with all whitespace removed
            $col = $col_def->{fullorg};
            $col =~ s/\s//g;
        }
        else {
            $col
                = defined( $col_def->{fullorg} )
                ? $col_def->{fullorg}
                : $col_def->{value};
        }

        if ( defined( $col_def->{alias} ) ) {
            $col .= " as $col_def->{alias}";
        }

        push( @cols, $col );
    }

    my $distinct = '';
    if ( defined( $stmt->{set_quantifier} )
        and $stmt->{set_quantifier} eq 'DISTINCT' )
    {
        $distinct = 'distinct';
    }

    return "select $distinct " . join( ', ', @cols );
}

# Returns the "limit N\n" clause for the parsed statement $stmt, or an empty
# string when the statement has no limit.
sub _get_limit {
    my $self = shift;
    my $stmt = shift;

    my $limit = $stmt->limit;
    if ( defined($limit) ) {
        return "limit $limit\n";
    }

    return '';
}

# Returns the "order by ...\n" clause for the parsed statement $stmt, or an
# empty string when the statement has no ordering.
#
# $stmt->order() is expected to return a list of single-pair hash refs that
# map a column to its sort direction.
sub _get_order_by {
    my $self = shift;
    my $stmt = shift;

    my @order = $stmt->order();
    my @cols;

    foreach my $o (@order) {
        my $col  = ( keys(%$o) )[0];
        my $sort = $o->{$col};

        push( @cols, "$col $sort" );
    }

    if (@cols) {
        return 'order by ' . join( ', ', @cols ) . "\n";
    }

    return '';
}

# Returns the "group by ...\n" clause built from the group_by entry of the
# parsed statement $stmt, or an empty string when there is none.
sub _get_group_by {
    my $self = shift;
    my $stmt = shift;

    if ( defined( $stmt->{group_by} ) ) {
        my @cols = @{ $stmt->{group_by} };

        return 'group by ' . join( ', ', @cols ) . "\n";
    }

    return '';
}

# Returns the database handle that owns the temporary table.
sub _get_temp_dbh {
    my $self = shift;

    return $self->_temp_dbh;
}

# Creates the empty temporary table with the same columns as the prepared
# query and stores the matching "insert ... values (?, ...)" statement text in
# _temp_prepare_stmt.
#
# Returns 1 on success, 0 after reporting an error.
sub _create_temp_table {
    my $self = shift;

    my $dbh = $self->_get_temp_dbh;

    my ($sql) = @{ $self->_prepare_args };

    # Force "limit 0" so that only the column layout is copied.  The match is
    # case-insensitive so that an upper-case LIMIT is rewritten too.
    if ( $sql !~ s/limit \d+/limit 0/i ) {
        $sql .= ' limit 0';
    }

    my $create_sql = "create temporary table $TEMP_TABLE_NAME as $sql";
    my $sth        = $dbh->prepare($create_sql);
    if ( !$sth ) {
        $self->_err_handler( $dbh->err, $dbh->errstr, $dbh->state );
        return 0;
    }

    $sth->execute( @{ $self->_execute_args } );
    if ( $sth->err ) {
        $self->_err_handler( $sth->err, $sth->errstr, $sth->state );
        return 0;
    }

    # Query the (empty) table to learn how many columns it has.
    $sql = qq{
        select * from $TEMP_TABLE_NAME
    };
    $sth = $dbh->prepare($sql);
    if ( !$sth ) {
        $self->_err_handler( $dbh->err, $dbh->errstr, $dbh->state );
        return 0;
    }

    # NOTE(review): executed so that NUM_OF_FIELDS is populated; confirm this
    # is required with the DBD::mysql prepare mode in use.
    $sth->execute;
    if ( $sth->err ) {
        $self->_err_handler( $sth->err, $sth->errstr, $sth->state );
        return 0;
    }

    my $placeholders = join( ', ', ('?') x $sth->{NUM_OF_FIELDS} );
    $sth->finish;

    my $tmp_prepare_stmt = qq{
        insert into $TEMP_TABLE_NAME values ($placeholders)
    };

    # _add_row_to_temp_table() prepares this text for every row it inserts.
    $self->_temp_prepare_stmt($tmp_prepare_stmt);

    return 1;
}

# Drops the temporary table if it exists.
#
# Returns 1 on success, 0 after reporting an error.
sub _drop_temp_table {
    my $self = shift;

    my $dbh = $self->_get_temp_dbh;

    my $sql = qq{
        drop temporary table if exists $TEMP_TABLE_NAME
    };

    $dbh->do($sql);
    if ( $dbh->err ) {
        $self->_err_handler( $dbh->err, $dbh->errstr, $dbh->state );
        return 0;
    }

    return 1;
}

# Inserts one row into the temporary table.
#
# $aref is an array ref holding the column values of the row, in table order.
# Returns 1 on success, 0 after reporting an error.
sub _add_row_to_temp_table {
    my $self = shift;
    my $aref = shift;

    my $dbh = $self->_get_temp_dbh;

    my $sth = $dbh->prepare( $self->_temp_prepare_stmt );
    if ( !$sth ) {
        $self->_err_handler( $dbh->err, $dbh->errstr, $dbh->state );
        return 0;
    }

    $sth->execute(@$aref);
    if ( $sth->err ) {
        $self->_err_handler( $sth->err, $sth->errstr, $sth->state );
        return 0;
    }

    return 1;
}


=head1 LIMITATIONS

 * This does not provide true parallelism in that it leverages the 
   "async" feature of DBD::MySQL.  You could accomplish true parallelism with 
   threads or the heavier fork/exec, but that adds extra complexity (especially
   if you have to recompile the mysql client libs with threading enabled).  
   This keeps things simple and still provides reasonable performance.
 * Does not work with count or sum aggregate functions.
 * Stored procedures have not been tested so use them at your own risk.

=head1 AUTHOR

John Gravatt, C<< <gravattj at> >>

=head1 BUGS

Please report any bugs or feature requests to C<bug-mysql-querymulti at rt.cpan.org>, or through
the web interface at L<https://rt.cpan.org/NoAuth/ReportBug.html?Queue=MySQL-QueryMulti>.  I will be notified, and then you'll
automatically be notified of progress on your bug as I make changes.

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc MySQL::QueryMulti

You can also look for information at:

=over 4

=item * RT: CPAN's request tracker (report bugs here)

=item * AnnoCPAN: Annotated CPAN documentation

=item * CPAN Ratings

=item * Search CPAN

=back

=head1 LICENSE AND COPYRIGHT

Copyright 2012 John Gravatt.

This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.

See L<http://dev.perl.org/licenses/> for more information.

=cut

__PACKAGE__->meta->make_immutable;    # moose stuff

# A module must end with a true value; make it explicit instead of relying on
# the return value of make_immutable.
1;
