The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl
package AnyEvent::MultiDownload;
use strict;
use AnyEvent::HTTP qw/http_get/;
use AnyEvent::Util;
use AnyEvent::Socket;
use AnyEvent::IO;
use AE;
use Moo;
use File::Temp;
use File::Copy;
use File::Basename;
use List::Util qw/shuffle/;
use AnyEvent::Digest;
use utf8;

our $VERSION = '1.08';


# Destination path of the finished download (required). Construction dies
# when something already exists at that path, so an existing file is never
# overwritten.
has content_file => (
    is       => 'ro',
    required => 1,
    isa      => sub { die "文件存在" if -e $_[0] },
);

# Primary download URL (required).
has url => ( is => 'ro', required => 1 );

# Optional array reference of alternative URLs for the same file. They are
# pooled with url in url_status, and every request picks one of them at
# random (shuffle_url). has_mirror reports whether it was given.
has mirror => (
    is        => 'rw',
    predicate => 1,
    # Moo isa checks reject a value only by dying; returning a value has no
    # effect.
    isa => sub {
        die "mirror must be an ARRAY reference\n" unless ref $_[0] eq 'ARRAY';
    },
);

# Optional name of the digest module used to checksum each block (passed to
# AnyEvent::Digest->new). When unset, no per-block digest is computed.
has digest => (
    is  => 'rw',
    # Moo isa checks reject a value only by dying.
    isa => sub {
        die "digest must name Digest::MD5 or a Digest::SHA module\n"
            unless defined $_[0] && $_[0] =~ /Digest::(?:SHA|MD5)/;
    },
);


# Required callback invoked once the whole file has been copied to
# content_file; it receives the size of content_file.
has on_finish => (
    is       => 'rw',
    required => 1,
    # Moo isa checks reject a value only by dying.
    isa => sub {
        die "on_finish must be a CODE reference\n" unless ref $_[0] eq 'CODE';
    },
);

# Callback invoked when the download fails; it receives the error message.
# The default does nothing.
has on_error => (
    is  => 'rw',
    # Moo isa checks reject a value only by dying.
    isa => sub {
        die "on_error must be a CODE reference\n" unless ref $_[0] eq 'CODE';
    },
    default => sub { sub { 1 } },
);

# Callback invoked when a block has been fully received. It gets four
# arguments: response information, the task hash of the block, the block's
# hex digest ('' when digest is unset), and a completion callback. Calling
# the completion callback with a true value accepts the block; a false value
# fails the whole download. The default accepts every block.
has on_seg_finish => (
    is  => 'rw',
    # Moo isa checks reject a value only by dying.
    isa => sub {
        die "on_seg_finish must be a CODE reference\n"
            unless ref $_[0] eq 'CODE';
    },
    default => sub {
        return sub { $_[-1]->(1) }
    },
);

# Temporary file that receives every block before the result is copied to
# content_file. Built on first use; deleted automatically (UNLINK => 1) and
# written with autoflush enabled.
has fh => (
    is      => 'rw',
    lazy    => 1,
    default => sub {
        my $tmp = File::Temp->new( UNLINK => 1 );
        $tmp->autoflush(1);
        $tmp;
    },
);

# Tunables (all read/write):
#   retry_interval - seconds to wait before a failed request is retried
#   max_retries    - retries allowed for a block before the download fails
#   seg_size       - block size in bytes
#   timeout        - per-request timeout handed to http_get
#   recurse        - redirect limit handed to http_get
#   headers        - extra request headers (hash reference)
#   max_per_host   - number of ranged block requests started at once
# Internal state:
#   tasks          - pending block descriptions built by split_range
#   error          - most recent error message
#   task_lists     - request guards indexed by block number
has retry_interval => ( is => 'rw', default => sub { 10 } );
has max_retries    => ( is => 'rw', default => sub { 5 } );
has seg_size       => ( is => 'rw', default => sub { 1 * 1024 * 1024 } );
has timeout        => ( is => 'rw', default => sub { 60 } );
has recurse        => ( is => 'rw', default => sub { 6 } );
has headers        => ( is => 'rw', default => sub { {} } );
has tasks          => ( is => 'rw', default => sub { [] } );
has error          => ( is => 'rw', default => sub {} );
has task_lists     => ( is => 'rw', default => sub { [] } );
has max_per_host   => ( is => 'rw', default => sub { 8 } );
# Every known URL (url plus any mirrors) mapped to a failure counter that
# starts at 0 and is incremented whenever a request against that URL fails.
# shuffle_url draws from its keys.
has url_status => (
    is      => 'rw',
    lazy    => 1,
    default => sub {
        my $self = shift;
        my @all  = $self->has_mirror
            ? ( @{ $self->mirror }, $self->url )
            : ( $self->url );
        my %failures_of = map { $_ => 0 } @all;
        return \%failures_of;
    },
);


# Start the download; the outcome is reported through on_finish / on_error.
#
# A plain GET is sent to a URL chosen by shuffle_url. When its 200 header
# arrives, split_range divides the file into seg_size blocks. This request
# keeps streaming block 0 and is aborted once that block is complete. Up to
# max_per_host ranged requests are started for the following blocks through
# fetch_chunk. Statuses 500, 503 and 59x on this first request are retried
# after retry_interval seconds, at most max_retries times.
#
# Any arguments are ignored.
sub multi_get_file {
    my $self = shift;

    # Synchronises all block downloads. send(1) means failure; the plain
    # send reached through begin/end means every block completed.
    my $cv = AE::cv {
        my $done = shift;

        if ( $done->recv ) {
            # Unrecoverable error: release the stored request guards, drop
            # the partial temp file and report the error.
            $self->task_lists(undef);
            $self->clean;
            $self->on_error->( $self->error );
        }
        else {
            $self->move_to;
            $self->on_finish->( $self->size );
        }

        undef $done;
    };

    my $first_task;
    my $run; $run = sub {
        my $retry = shift || 0;

        # Never start a new attempt once the download has concluded.
        return if $cv->ready;

        my $url = $self->shuffle_url;
        my $ev; $ev = http_get $url,
            headers   => $self->headers,
            timeout   => $self->timeout,
            recurse   => $self->recurse,
            on_header => sub {
                my ($hdr) = @_;
                if ( $hdr->{Status} == 200 ) {
                    my $len = $hdr->{'content-length'};

                    # Without a length the file cannot be split into blocks.
                    # Fail the download and abort this request instead of
                    # carrying on with an undefined size.
                    if ( !defined $len ) {
                        $self->error("Cannot find a content-length header.");
                        $cv->send(1);
                        return 0;
                    }

                    # One task per block.
                    $self->split_range($len);

                    # This request downloads block 0 itself; its completion
                    # is signalled through on_seg_finish in on_body below.
                    $first_task = shift @{ $self->tasks };
                    $first_task->{chunk} = $first_task->{chunk} || 0;
                    $first_task->{ofs}   = $first_task->{ofs}   || 0;
                    return 1 if $len <= $self->seg_size;

                    # The remaining blocks are fetched with ranged requests.
                    for ( 1 .. $self->max_per_host ) {
                        my $chunk_task = shift @{ $self->tasks };
                        last unless defined $chunk_task;
                        $cv->begin;
                        $self->fetch_chunk( $cv, $chunk_task );
                    }
                }
                return 1;
            },
            on_body => sub {
                my ( $partial_body, $hdr ) = @_;

                # The download already concluded (e.g. another block
                # failed): stop writing into the temp file.
                return 0 if $cv->ready;

                if ( $self->on_body($first_task)->( $partial_body, $hdr ) ) {
                    # Block 0 is complete when the whole (small) file has
                    # arrived, or once seg_size bytes have been stored.
                    if ( ( $hdr->{'content-length'} <= $self->seg_size
                            and $first_task->{size} == $hdr->{'content-length'} )
                        or $first_task->{size} >= $self->seg_size )
                    {
                        $self->on_seg_finish->(
                            $hdr,
                            $first_task,
                            $self->digest ? $first_task->{ctx}->hexdigest : '',
                            sub {
                                my ($result) = @_;
                                if ($result) {
                                    $cv->end;    # block 0 done
                                }
                                else {
                                    # Size matched but the check failed:
                                    # give up on the whole download.
                                    $self->error("The 0 block the compared failure");
                                    $self->url_status->{$url}++;
                                    $cv->send(1);
                                    AE::log debug => $self->error;
                                    return;
                                }
                            }
                        );
                        # Abort: the rest of the file comes from fetch_chunk.
                        return 0;
                    }
                }
                return 1;
            },
            sub {
                my ( undef, $hdr ) = @_;
                undef $ev;

                # Nothing left to do (no retry, no error report) once the
                # download has concluded.
                return if $cv->ready;

                my $status = $hdr->{Status};
                if ( $retry > $self->max_retries ) {
                    $self->error(
                        sprintf( "Status: %s, Reason: %s.",
                            $status ? $status : '500',
                            $hdr->{Reason} ? $hdr->{Reason} : ' ', )
                    );
                    $cv->send(1);
                    return;
                }

                # A 200 (possibly turned into 598 by our own abort in
                # on_body) means block 0 was handled above.
                # NOTE(review): a transport failure in the middle of block 0
                # probably also reports OrigStatus 200 and is ignored here.
                return
                    if ( $hdr->{OrigStatus} and $hdr->{OrigStatus} == 200 )
                    or $hdr->{Status} == 200;

                if ( $status == 500 or $status == 503 or $status =~ /^59/ ) {
                    my $w; $w = AE::timer( $self->retry_interval, 0, sub {
                        # Restart block 0 from its first byte.
                        $first_task->{pos}  = $first_task->{ofs};
                        $first_task->{size} = 0;
                        $first_task->{ctx}  = undef;
                        $run->(++$retry);
                        undef $w;
                    });
                }
                else {
                    $self->error(
                        sprintf( "Status: %s, Reason: %s.",
                            $status ? $status : '500',
                            $hdr->{Reason} ? $hdr->{Reason} : ' ', )
                    );
                    $cv->send(1);
                    return;
                }
            };
    };
    $cv->begin;
    $run->(0);
}

# Pick one of the known URLs (the keys of url_status: url plus mirrors) at
# random. Every URL is a candidate regardless of its failure count.
sub shuffle_url {
    my ($self) = @_;
    my @candidates = shuffle keys %{ $self->url_status };
    return @candidates ? $candidates[0] : ();
}

# Build an http_get on_body callback that stores one block in the temp file.
#
# $task - block description (see split_range). Its pos (write offset) and
#         size (bytes stored so far) advance as data arrives. When digest is
#         set, its ctx accumulates an AnyEvent::Digest of the block.
#
# The callback returns 1 to keep receiving data and 0 to abort the request.
# It returns 0 when the status is neither 200 nor 206, when the task already
# holds more than seg_size bytes, or when positioning or writing fails.
sub on_body {
    my ( $self, $task ) = @_;
    return sub {
        my ( $partial_body, $hdr ) = @_;
        return 0 unless ( $hdr->{Status} == 206 || $hdr->{Status} == 200 );

        my $len = length($partial_body);

        # Block 0 is streamed from a plain GET that keeps sending past
        # seg_size; keep only what still fits in this block.
        my $room = $self->seg_size - $task->{size};
        if ( $len > $room ) {
            # Already past the block: a negative substr length would keep
            # most of the buffer and write it over the next block's area.
            return 0 if $room < 0;
            $partial_body = substr( $partial_body, 0, $room );
            $len = $room;
        }

        # syswrite bypasses PerlIO buffering, so position with sysseek
        # instead of the buffered seek.
        my $out = $self->fh;
        defined sysseek( $out, $task->{pos}, 0 ) or return 0;
        my $written = syswrite( $out, $partial_body );
        return 0 unless defined $written && $written == $len;

        # Record what was written.
        if ( $self->digest ) {
            $task->{ctx} ||= AnyEvent::Digest->new( $self->digest );
            $task->{ctx}->add_async($partial_body);
        }
        $task->{pos}  += $len;
        $task->{size} += $len;
        return 1;
    };
}
# Download one block with a ranged GET against a URL chosen by shuffle_url.
#
# $cv    - the download's condvar; begin() was called for this worker.
#          send(1) fails the download; end() is called when the worker
#          finds no further task to take.
# $task  - block description produced by split_range.
# $retry - attempts already made for this block (default 0).
#
# A block whose size matches is handed to on_seg_finish. When accepted, the
# next pending task is started on the same worker. Statuses 59x, 500, 502,
# 503, or a 200/206 with the wrong size, are retried through retry(). Other
# statuses, or more than max_retries attempts, fail the download.
sub fetch_chunk {
    my ( $self, $cv, $task, $retry ) = @_;
    $retry ||= 0;

    # The download has already concluded (e.g. another block failed and
    # task_lists was cleared); do not start new requests.
    return if $cv->ready;

    my $url    = $self->shuffle_url;
    my $writer = $self->on_body($task);
    my $expect = $task->{tail} - $task->{ofs} + 1;

    my $ev; $ev = http_get $url,
        timeout    => $self->timeout,
        recurse    => $self->recurse,
        persistent => 1,
        keepalive  => 1,
        headers    => {
            %{ $self->headers },
            Range => $task->{range},
        },
        on_body => sub {
            my ( $partial_body, $hdr ) = @_;
            # Stop writing once the download has concluded.
            return 0 if $cv->ready;
            # A 200 answer means the server ignored Range and is sending the
            # file from byte 0; writing it at this block's offset would
            # corrupt the file, so abort (the request is then retried).
            return 0 if $hdr->{Status} == 200 && $task->{ofs} > 0;
            return $writer->( $partial_body, $hdr );
        },
        sub {
            my ( undef, $hdr ) = @_;
            my $status = $hdr->{Status};
            undef $ev;

            # Nothing left to do once the download has concluded.
            return if $cv->ready;

            if ( $retry > $self->max_retries ) {
                $self->error(
                    sprintf( "Too many failures, Status: %s, Reason: %s.",
                        $status ? $status : '500',
                        $hdr->{Reason} ? $hdr->{Reason} : ' ', )
                ) if !$self->error;
                $self->url_status->{$url}++;
                $cv->send(1);
                return;
            }

            # Size is correct: let on_seg_finish verify the block. A failed
            # check ends the whole download; a passed one moves this worker
            # on to the next pending task.
            if ( ( $status == 200 || $status == 206 ) && $task->{size} == $expect ) {
                $self->on_seg_finish->(
                    $hdr,
                    $task,
                    $self->digest ? $task->{ctx}->hexdigest : '',
                    sub {
                        my ($result) = @_;
                        if ( !$result ) {
                            $self->error("The $task->{chunk} block the compared failure");
                            $self->url_status->{$url}++;
                            $cv->send(1);
                            AE::log debug => $self->error;
                            return;
                        }

                        AE::log debug => "地址 $url 的块 $task->{chunk} 下载完成 $$";
                        if ( my $next = shift @{ $self->tasks } ) {
                            $self->fetch_chunk( $cv, $next );
                        }
                        else {
                            $cv->end;    # this worker is done
                        }
                        return;
                    }
                );
                return;
            }

            $self->error(
                sprintf( "Chunk %s the size is wrong, expect the size: %s actual size: %s, The %s try again,  Status: %s, Reason: %s.",
                    $task->{chunk},
                    $expect,
                    $task->{size},
                    $retry,
                    $status ? $status : '500',
                    $hdr->{Reason} ? $hdr->{Reason} : ' ', )
            );
            AE::log warn => $self->error;
            $self->url_status->{$url}++;

            # Statuses that may succeed on a later attempt are retried;
            # anything else fails the download immediately.
            if ( $status =~ /^(59.|503|500|502|200|206|)$/ ) {
                $self->retry( $cv, $task, $retry );
            }
            else {
                $cv->send(1);
                return;
            }
        };
    $self->task_lists->[ $task->{chunk} ] = $ev;
}

# Schedule another attempt at $task after retry_interval seconds. Before
# refetching, the block is rewound: the write position goes back to its
# start, the stored size to 0, and any digest context is discarded.
# $retry is the number of attempts made so far; the next one gets $retry + 1.
sub retry {
    my ( $self, $cv, $task, $retry ) = @_;
    my $timer;
    $timer = AE::timer( $self->retry_interval, 0, sub {
        @{$task}{qw(pos size ctx)} = ( $task->{ofs}, 0, undef );
        $self->fetch_chunk( $cv, $task, $retry + 1 );
        undef $timer;
    });
}

# Split a file of $length bytes into seg_size blocks. One task per block is
# stored in $self->tasks, indexed by block number. Each task is a hash:
#   chunk - block number (0-based)
#   ofs   - offset of the block's first byte
#   pos   - current write position, initially ofs
#   tail  - offset of the block's last byte (the file's last byte for the
#           final block)
#   range - Range header value, "bytes=ofs-tail"
#   size  - bytes downloaded so far, initially 0
# Returns the tasks array reference. An undefined or non-positive $length
# produces no tasks. Dies if seg_size is not positive, since that would
# never advance the offset.
sub split_range {
    my ( $self, $length ) = @_;

    my $seg_size = $self->seg_size;
    die "seg_size must be a positive number\n"
        unless defined $seg_size && $seg_size > 0;

    my $tasks = $self->tasks;
    my $chunk = 0;
    my $ofs   = 0;
    while ( defined $length && $ofs < $length ) {
        my $tail = $ofs + $seg_size - 1;
        $tail = $length - 1 if $tail > $length - 1;

        $tasks->[$chunk] = {
            chunk => $chunk,
            ofs   => $ofs,
            pos   => $ofs,
            tail  => $tail,
            range => 'bytes=' . $ofs . '-' . $tail,
            size  => 0,
        };

        $ofs += $seg_size;
        $chunk++;
    }
    return $tasks;
}

# Publish the finished download. Close the temp handle, create the
# destination directory if missing, copy the temp file to content_file,
# then delete the temp file and forget the handle.
# Dies if the directory cannot be created or the copy fails.
sub move_to {
    my $self = shift;

    my $tmp  = $self->fh;
    my $from = $tmp->filename;
    my $to   = $self->content_file;

    close $tmp;

    my $dir = File::Basename::dirname($to);
    if ( !-e $dir ) {
        # Load File::Path explicitly instead of relying on another module
        # having loaded it.
        require File::Path;
        File::Path::make_path($dir);
        die "Failed to create directory $dir" unless -d $dir;
    }

    # Method calls do not interpolate in strings, so report the real paths.
    File::Copy::copy( $from, $to )
        or die "Failed to copy $from to $to: $!";

    unlink $from;
    delete $self->{fh};
}

# Discard the temp file after a failed download: close it, delete it from
# disk, and drop the handle so a later fh call would build a fresh one.
sub clean {
    my ($self) = @_;
    my $tmp = $self->fh;
    close $tmp;
    unlink $tmp->filename;
    delete $self->{fh};
}

# Size in bytes of content_file as reported by -s (false when the file is
# missing or empty); 0 when content_file is undefined.
sub size {
    my ($self) = @_;
    my $path = $self->content_file;
    return defined $path ? -s $path : 0;
}

1;

__END__

=pod
 
=encoding utf8

=head1 NAME

AnyEvent::MultiDownload - 非阻塞的多连接、多地址文件下载模块

=head1 SYNOPSIS

这是一个完全非阻塞的多连接、多地址文件下载模块, 可以像下面的例子一样同时下载多个文件, 并且整个过程都由异步事件触发, 不会阻塞主进程.

下面是个简单的例子, 同时从多个地址下载同一个文件.

    use AE;
    use AnyEvent::MultiDownload;

    my @urls = (
        'http://mirrors.163.com/ubuntu-releases/12.04/ubuntu-12.04.2-desktop-i386.iso',
        'http://releases.ubuntu.com/12.04.2/ubuntu-12.04.2-desktop-i386.iso',
    );
    
    my $cv = AE::cv;
    my $MultiDown = AnyEvent::MultiDownload->new( 
        url     => pop @urls, 
        mirror  => \@urls, 
        content_file  => '/tmp/ubuntu.iso',
        seg_size => 1 * 1024 * 1024, # 1M
        on_seg_finish => sub {
            my ($hdr, $chunk_obj, $md5, $cb) = @_;
            $cb->(1);
        },
        on_finish => sub {
            my $len = shift;
            $cv->send;
        },
        on_error => sub {
            my $error = shift;
            $cv->send;
        }
    )->multi_get_file;
    
    $cv->recv;


下面是同时异步下载多个文件的例子, 整个过程都是异步的.

    use AE;
    use AnyEvent::MultiDownload;
    
    my $cv = AE::cv;
    
    $cv->begin;
    my $MultiDown = AnyEvent::MultiDownload->new( 
        url     => 'http://xxx1',
        content_file  => "/tmp/file2",
        on_finish => sub {
            my $len = shift;
            $cv->end;
        },
        on_error => sub {
            my $error = shift;
            $cv->end;
        }
    );
    $MultiDown->multi_get_file;
    
    $cv->begin;
    my $MultiDown1 = AnyEvent::MultiDownload->new( 
        content_file  => "/tmp/file1",
        url     => 'http://xxx', 
        on_finish => sub {
            my $len = shift;
            $cv->end;
        },
        on_error => sub {
            my $error = shift;
            $cv->end;
        }
    );
    $MultiDown1->multi_get_file;
    
    $cv->recv;

以上是同时下载多个文件的实例.

=head1 METHODS

创建一个下载对象 (可以同时使用多个下载地址).

    my @urls = (
        'http://mirrors.163.com/ubuntu-releases/12.04/ubuntu-12.04.2-desktop-i386.iso',
        'http://releases.ubuntu.com/12.04.2/ubuntu-12.04.2-desktop-i386.iso',
    );
    my $MultiDown = AnyEvent::MultiDownload->new( 
            url     => shift @urls, 
            mirror  => \@urls, 
            content_file  => $content_file,
            seg_size => 1 * 1024 * 1024, # 1M
            on_seg_finish => sub {
                my ($hdr, $chunk_obj, $md5, $cb) = @_;
                $cb->(1);
            },
            on_finish => sub {
                my $len = shift;
                $cv->send;
            },
            on_error => sub {
                my $error = shift;
                $cv->send;
            },
    
    );

=over 8

=item url => 下载的主地址

这个是下载用的 master 的地址, 是主地址, 这个参数必须有.

=item content_file => 下载后的存放地址

这个地址用于指定, 下载完了, 存放在什么位置. 这个参数必须有.

=item mirror => 镜像地址

备用的下载地址. 需要一个数组引用, 其中放入这个文件的其它下载地址. 每个请求都会从主地址和这些地址中随机选择一个, 所以某个块下载失败重试时可能会换到其它地址. 本参数不是必须的.

=item seg_size => 下载块的大小

seg_size 是每个块的大小 (字节), 默认是 1M. 文件会按照这个大小切成一个个块分别下载, 然后合并. 本参数不是必须的.

=item digest

用于指定块校验所使用的模块, 支持 Digest::MD5 和 Digest::SHA (包括 Digest::SHA1). 不指定时不计算块的摘要.

=item retry_interval => 重试的间隔 

重试的间隔, 默认为 10 s.

=item max_retries => 最多重试的次数

重试每个块所能重试的次数, 默认为 5 次.

=item max_per_host => 每个主机最多的连接数量

启动时同时发起的分块请求的最大数量 (不包括第一个块的请求), 默认为 8. 每个请求随机选择一个地址. 注意: 虽然名字叫 per_host, 目前它限制的是所有地址加在一起的并发分块请求数, 模块没有按单个主机分别控制连接数.

=item headers => 自定义的 header

如果你想自己定义传送的 header , 就在这个参数中加就好了, 默认是一个哈希引用.

=item timeout

下载多久算超时, 可选参数, 默认为 60s.

=item recurse 重定向

请求过程中遇到重定向时, 最多跟随多少次重定向, 默认为 6.

=back

=head2 multi_get_file()

事件开始的方法. 只有调用这个函数时, 这个下载的事件才开始执行.

=head1 callback

=head2 on_seg_finish

每下载完一个块 (大小由 seg_size 决定, 默认 1M) 时会回调一次, 你可以用它检查每块下载的完整性. 只有 200 和 206 响应才会触发这个回调.

回调传四个参数: 本块下载时响应的 header; 当前块信息的引用 (包含 chunk 第几块, ofs 块的开始位置, pos 当前写入位置, size 已下载的大小等); 块的摘要 (十六进制字符串, 没有设置 digest 时为空字符串); 最后一个参数是处理完后要调用的回调. 以 1 调用该回调表示检查通过; 以 0 调用表示检查失败, 整个下载会以失败结束 (调用 on_error).

模块会自动检查块的大小, 所以回调里不需要再比较大小. 如果设置了 digest, 模块会计算每块的 MD5 或 SHA 摘要并传给这个回调, 供你和预期值对比. 本参数不是必须的, 没有这个回调时默认认为检查通过.

=head2 on_finish

当整个文件下载完成时的回调, 下载完成的回调会传一个下载的文件大小的参数过来. 这个回调必须存在.

=head2 on_error

当整个文件下载过程出错时回调, 参数为错误信息. 这个参数不是必须的 (默认什么也不做), 但建议总是提供, 因为不能保证每次下载都能正常完成.


=head1 AUTHOR

扶凯 fukai <iakuf@163.com>

=cut