The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl

use strict;
use warnings;

use Test::More;
use t::TestHTTP;

use IO::Async::Test;
use IO::Async::Loop;
use Digest::MD5 qw( md5_hex );

use HTTP::Response;

use Net::Async::Webservice::S3;

my $loop = IO::Async::Loop->new;
testing_loop( $loop );

my $s3 = Net::Async::Webservice::S3->new(
   max_retries => 1,
   http => my $http = TestHTTP->new,
   access_key => 'K'x20,
   secret_key => 's'x40,
);

$loop->add( $s3 );

sub await_multipart_initiate_and_respond
{
   my ( $key ) = @_;

   my $req;
   wait_for { $req = $http->pending_request };

   is( $req->method,         "POST",                    "Initiate request method for $key" );
   is( $req->uri->authority, "bucket.s3.amazonaws.com", "Initiate request URI authority for $key" );
   is( $req->uri->path,      "/$key",                   "Initiate request URI path for $key" );
   is( $req->uri->query,     "uploads",                 "Initiate request URI query for $key" );

   # Technically this isn't a valid S3 UploadId but nothing checks the
   # formatting so it's probably OK
   $http->respond(
      HTTP::Response->new( 200, "OK", [
         Content_Type => "application/xml",
      ], <<"EOF" )
<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Bucket>bucket</Bucket>
  <Key>$key</Key>
  <UploadId>ABCDEFG</UploadId>
</InitiateMultipartUploadResult>
EOF
   );

   return ( $req );
}

sub await_multipart_part_and_respond
{
   my ( $key, $part_num, $content ) = @_;

   my $req;
   wait_for { $req = $http->pending_request };

   is( $req->method,     "PUT",    "Part request method for $key" );
   is( $req->uri->path,  "/$key",  "Part request URI path for $key" );
   is( $req->uri->query, "partNumber=$part_num&uploadId=ABCDEFG",
      "Part request URI query for $key" );
   is( $req->content,    $content, "Part request body for $key" );

   my $md5 = md5_hex( $req->content );

   $http->respond(
      HTTP::Response->new( 200, "OK", [
            ETag => qq("$md5"),
         ], "" )
   );

   return ( $req );
}

sub await_multipart_complete_and_respond
{
   my ( $key, $want_etags ) = @_;

   my $req;
   wait_for { $req = $http->pending_request };

   is( $req->method,         "POST",                    "Complete request method for $key" );
   is( $req->uri->authority, "bucket.s3.amazonaws.com", "Complete request URI authority for $key" );
   is( $req->uri->path,      "/$key",                   "Complete request URI path for $key" );
   is( $req->uri->query,     "uploadId=ABCDEFG",        "Complete request URI query for $key" );

   is( $req->content_length, length( $req->content ), "Complete request has Content-Length" );
   like( $req->header( "Content-MD5" ), qr/^[0-9A-Z+\/]{22}==$/i,
      "Complete request has valid Base64-encoded MD5 sum" );

   my $libxml = XML::LibXML->new;
   my $xpc = XML::LibXML::XPathContext->new( $libxml->parse_string( $req->content ) );
   my %got_etags;
   foreach my $n ( $xpc->findnodes( "/CompleteMultipartUpload/Part" ) ) {
      $got_etags{$xpc->findvalue( "./PartNumber", $n )} = $xpc->findvalue( "./ETag", $n );
   }
   is_deeply( \%got_etags, $want_etags, "Complete request body etags for $key" ) if $want_etags;

   $http->respond(
      HTTP::Response->new( 200, "OK", [
         Content_Type => "application/xml",
      ], <<"EOF" )
<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Location>http://bucket.s3.amazonaws.com/three</Location>
  <Bucket>bucket</Bucket>
  <Key>$key</Key>
  <ETag>"3858f62230ac3c915f300c664312c11f-2"</ETag>
</CompleteMultipartUploadResult>
EOF
   );

   return ( $req );
}

# Multipart put from strings
{
   my @parts = ( "The first part", "The second part" );

   my @written;
   my $f = $s3->put_object(
      bucket => "bucket",
      key    => "four",
      gen_parts => sub { return unless @parts; shift @parts },
      on_write => sub { push @written, $_[0] },
   );
   $f->on_fail( sub { die @_ } );

   await_multipart_initiate_and_respond "four";

   # Now wait on the chunks
   foreach ( [ 1, "The first part" ], [ 2, "The second part" ] ) {
      await_multipart_part_and_respond "four", @$_;
   }

   await_multipart_complete_and_respond "four",
      {
         1 => '"797d5e4fc29677594bbc843c096073f6"',
         2 => '"09c94e3dd1d6d832a44f6ef38505a2a9"',
      };

   wait_for { $f->is_ready };

   my ( $etag, $len ) = $f->get;
   is( $etag, '"3858f62230ac3c915f300c664312c11f-2"', 'result of multipart put' );
   is( $len, 29, '$length of multipart put' );

   is_deeply( \@written,
              [ 14, 14+15 ], 'on_write invoked on each chunk with total' );
}

# Multipart put from Future->string
{
   my @parts = (
      my $value1_f = Future->new,
      my $value2_f = Future->new,
   );

   my $f = $s3->put_object(
      bucket => "bucket",
      key    => "five",
      gen_parts => sub { return unless @parts; shift @parts },
   );
   $f->on_fail( sub { die @_ } );

   $loop->later( sub { $value1_f->done( "The content that " ) } );
   $loop->later( sub { $value2_f->done( "comes later" ) } );

   await_multipart_initiate_and_respond "five";

   foreach ( [ 1, "The content that " ], [ 2, "comes later" ] ) {
      await_multipart_part_and_respond "five", @$_;
   }

   await_multipart_complete_and_respond "five",
      {
         1 => '"bd71e9fef5ab11c8b0c2da23f6d29c46"',
         2 => '"6ab4f91cfb0588abbb269a65a838d54f"',
      };

   wait_for { $f->is_ready };

   my ( $etag, $len ) = $f->get;
   is( $len, 28, '$length from multipart put from Future' );
}

# Multipart put from CODE/size pairs
{
   my @parts = (
      [ sub { substr( "Content generated ", $_[0], $_[1] ) }, 18 ],
      [ sub { substr( "by code", $_[0], $_[1] ) }, 7 ],
   );

   my $f = $s3->put_object(
      bucket => "bucket",
      key    => "six",
      gen_parts => sub { return unless @parts; @{ shift @parts } },
   );
   $f->on_fail( sub { die @_ } );

   await_multipart_initiate_and_respond "six";

   foreach ( [ 1, "Content generated " ], [ 2, "by code" ] ) {
      await_multipart_part_and_respond "six", @$_;
   }

   await_multipart_complete_and_respond "six",
      {
         1 => '"2d7db94bfed210fef0e7a7f89d79ecc3"',
         2 => '"f554ebe9e8a03b051dc8ac36685f6d9f"',
      };

   wait_for { $f->is_ready };

   my ( $etag, $len ) = $f->get;
   is( $len, 25, '$length from multipart put from CODE' );
}

# Multipart put from Future->CODE/size pair
{
   my @parts = (
      my $value1_f = Future->new,
      my $value2_f = Future->new,
   );

   my $f = $s3->put_object(
      bucket => "bucket",
      key    => "seven",
      gen_parts => sub { return unless @parts; shift @parts },
   );
   $f->on_fail( sub { die @_ } );

   $loop->later( sub { $value1_f->done(
      sub { substr( "The content that comes ", $_[0], $_[1] ) }, 23 ) } );
   $loop->later( sub { $value2_f->done(
      sub { substr( "later from Future->CODE", $_[0], $_[1] ) }, 23 ) } );

   await_multipart_initiate_and_respond "seven";

   foreach ( [ 1, "The content that comes " ], [ 2, "later from Future->CODE" ] ) {
      await_multipart_part_and_respond "seven", @$_;
   }

   await_multipart_complete_and_respond "seven",
      {
         1 => '"9cfc17e76eab703a4241df165e403504"',
         2 => '"470f2cf68ea83fa4c497b8ca345f9661"',
      };

   wait_for { $f->is_ready };
}

# Multipart put from value automatically split
{
   $s3->configure( part_size => 16 );

   my $f = $s3->put_object(
      bucket => "bucket",
      key    => "one.split",
      value  => "Content too long for one chunk",
   );
   $f->on_fail( sub { die @_ } );

   await_multipart_initiate_and_respond "one.split";

   foreach ( [ 1, "Content too long" ], [ 2, " for one chunk" ] ) {
      await_multipart_part_and_respond "one.split", @$_;
   }

   await_multipart_complete_and_respond "one.split",
      {
         1 => '"5747c2b9ef2e762db6439e19c7dc08d5"',
         2 => '"c8a4d06065b7628e62fa7fb48616e825"',
      };

   wait_for { $f->is_ready };

   my ( $etag ) = $f->get;
   is( $etag, '"3858f62230ac3c915f300c664312c11f-2"', 'result of multipart put from value' );

   # Restore it
   $s3->configure( part_size => 100*1024*1024 );
}

# Multipart put with extra metadata
{
   my @parts = ( "Part #1", "Part #2" );

   my $f = $s3->put_object(
      bucket => "bucket",
      key    => "FOUR",
      gen_parts => sub { return unless @parts; shift @parts },
      meta   => {
         A      => "a",
      },
   );
   $f->on_fail( sub { die @_ } );

   my ( $request ) = await_multipart_initiate_and_respond "FOUR";
   is( $request->header( "X-Amz-Meta-A" ), "a", '$request has X-Amz-Meta-A header' );

   # Now wait on the chunks
   foreach ( [ 1, "Part #1" ], [ 2, "Part #2" ] ) {
      await_multipart_part_and_respond "FOUR", @$_;
   }

   await_multipart_complete_and_respond "FOUR";

   wait_for { $f->is_ready };
   $f->get;
}

# Multipart put with no actual parts
{
   my $f = $s3->put_object(
      bucket => "bucket",
      key    => "FIVE",
      gen_parts => sub { () },
   );
   $f->on_fail( sub { die @_ } );

   my $req;
   wait_for { $req = $http->pending_request };

   is( $req->method, "PUT" );

   my $md5 = md5_hex( $req->content );

   $http->respond(
      HTTP::Response->new( 200, "OK", [
            ETag => qq("$md5"),
         ], "" )
   );

   wait_for { $f->is_ready };
   $f->get;
}

# Test that a configured timeout is only used for meta-operations
{
   my $f;
   my $req;
   my @parts;

   $s3->configure( timeout => 10 );

   @parts = ( "Part #1", "Part #2" );
   $f = $s3->put_object(
      bucket => "bucket",
      key    => "-four-",
      gen_parts => sub { return unless @parts; shift @parts },
   );
   $f->on_fail( sub { die @_ } );

   ( $req ) = await_multipart_initiate_and_respond "-four-";
   is( $req->header( "X-NaHTTP-Timeout" ), 10, 'Initiate request has timeout set for configured' );

   # Now wait on the chunks
   foreach ( [ 1, "Part #1" ], [ 2, "Part #2" ] ) {
      ( $req ) = await_multipart_part_and_respond "-four-", @$_;
      is( $req->header( "X-NaHTTP-Timeout" ), undef, 'Part request has no timeout for configured' );
   }

   ( $req ) = await_multipart_complete_and_respond "-four-";
   is( $req->header( "X-NaHTTP-Timeout" ), 10, 'Complete request has timeout set for configured' );
   wait_for { $f->is_ready };
   $f->get;

   # Now, direct arguments
   @parts = ( "Part #1", "Part #2" );
   $f = $s3->put_object(
      bucket => "bucket",
      key    => "-four-",
      gen_parts => sub { return unless @parts; shift @parts },
      part_timeout => 30,
      meta_timeout => 20,
   );
   $f->on_fail( sub { die @_ } );

   ( $req ) = await_multipart_initiate_and_respond "-four-";
   is( $req->header( "X-NaHTTP-Timeout" ), 20, 'Initiate request has timeout set for direct' );

   # Now wait on the chunks
   foreach ( [ 1, "Part #1" ], [ 2, "Part #2" ] ) {
      ( $req ) = await_multipart_part_and_respond "-four-", @$_;
      is( $req->header( "X-NaHTTP-Timeout" ), 30, 'Part request timeout set for direct' );
   }

   ( $req ) = await_multipart_complete_and_respond "-four-";
   is( $req->header( "X-NaHTTP-Timeout" ), 20, 'Complete request has timeout set for direct' );


   wait_for { $f->is_ready };
   $f->get;
}

done_testing;