The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#
#  Copyright 2014 MongoDB, Inc.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

use v5.8.0;
use strict;
use warnings;

package MongoDB::_Protocol;

use version;
our $VERSION = 'v1.4.1';

use MongoDB::_Constants;
use MongoDB::Error;

use constant {
    OP_REPLY        => 1,    # Reply to a client request. responseTo is set
    OP_MSG          => 1000, # generic msg command followed by a string
    OP_UPDATE       => 2001, # update document
    OP_INSERT       => 2002, # insert new document
    RESERVED        => 2003, # formerly used for OP_GET_BY_OID
    OP_QUERY        => 2004, # query a collection
    OP_GET_MORE     => 2005, # Get more data from a query. See Cursors
    OP_DELETE       => 2006, # Delete documents
    OP_KILL_CURSORS => 2007, # Tell database client is done with a cursor
};

use constant {
    PERL58           => $] lt '5.010',
    MIN_REPLY_LENGTH => 4 * 5 + 8 + 4 * 2,
    MAX_REQUEST_ID   => 2**31 - 1,
};

# Perl < 5.10, pack doesn't have endianness modifiers, and the MongoDB wire
# protocol mandates little-endian order. For 5.10, we can use modifiers but
# before that we only work on platforms that are natively little-endian.  We
# die during configuration on big endian platforms on 5.8

use constant {
    P_HEADER => PERL58 ? "l4" : "l<4",
};

# These ops all include P_HEADER already
use constant {
    P_UPDATE       => PERL58 ? "l5Z*l"   : "l<5Z*l<",
    P_INSERT       => PERL58 ? "l5Z*"    : "l<5Z*",
    P_QUERY        => PERL58 ? "l5Z*l2"  : "l<5Z*l<2",
    P_GET_MORE     => PERL58 ? "l5Z*la8" : "l<5Z*l<a8",
    P_DELETE       => PERL58 ? "l5Z*l"   : "l<5Z*l<",
    P_KILL_CURSORS => PERL58 ? "l6(a8)*" : "l<6(a8)*",
    P_REPLY_HEADER => PERL58 ? "l5a8l2"  : "l<5a8l<2",
};

# struct MsgHeader {
#     int32   messageLength; // total message size, including this
#     int32   requestID;     // identifier for this message
#     int32   responseTo;    // requestID from the original request
#                            //   (used in reponses from db)
#     int32   opCode;        // request type - see table below
# }
#
# Approach for MsgHeader is to write a header with 0 for length, then
# fix it up after the message is constructed.  E.g.
#     my $msg = pack( P_INSERT, 0, int(rand(2**32-1)), 0, OP_INSERT, 0, $ns ) . $bson_docs;
#     substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );

# struct OP_UPDATE {
#     MsgHeader header;             // standard message header
#     int32     ZERO;               // 0 - reserved for future use
#     cstring   fullCollectionName; // "dbname.collectionname"
#     int32     flags;              // bit vector. see below
#     document  selector;           // the query to select the document
#     document  update;             // specification of the update to perform
# }

use constant {
    U_UPSERT       => 0,
    U_MULTI_UPDATE => 1,
};

sub write_update {
    my ( $ns, $selector, $update, $flags ) = @_;
    utf8::encode($ns);

    my $bitflags = 0;
    if ($flags) {
        $bitflags =
          ( $flags->{upsert} ? 1 << U_UPSERT : 0 )
          | ( $flags->{multi} ? 1 << U_MULTI_UPDATE : 0 );
    }

    my $msg =
        pack( P_UPDATE, 0, int( rand( 2**32 - 1 ) ), 0, OP_UPDATE, 0, $ns, $bitflags )
      . $selector
      . $update;
    substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
    return $msg;
}

# struct OP_INSERT {
#     MsgHeader header;             // standard message header
#     int32     flags;              // bit vector - see below
#     cstring   fullCollectionName; // "dbname.collectionname"
#     document* documents;          // one or more documents to insert into the collection
# }

use constant { I_CONTINUE_ON_ERROR => 0, };

sub write_insert {
    my ( $ns, $bson_docs, $flags ) = @_;
    utf8::encode($ns);

    my $bitflags = 0;
    if ($flags) {
        $bitflags = ( $flags->{continue_on_error} ? 1 << I_CONTINUE_ON_ERROR : 0 );
    }

    my $msg =
      pack( P_INSERT, 0, int( rand( 2**32 - 1 ) ), 0, OP_INSERT, $bitflags, $ns )
      . $bson_docs;
    substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
    return $msg;
}

# struct OP_QUERY {
#     MsgHeader header;                 // standard message header
#     int32     flags;                  // bit vector of query options.  See below for details.
#     cstring   fullCollectionName ;    // "dbname.collectionname"
#     int32     numberToSkip;           // number of documents to skip
#     int32     numberToReturn;         // number of documents to return
#                                       //  in the first OP_REPLY batch
#     document  query;                  // query object.  See below for details.
#   [ document  returnFieldsSelector; ] // Optional. Selector indicating the fields
#                                       //  to return.  See below for details.
# }

use constant {
    Q_TAILABLE          => 1,
    Q_SLAVE_OK          => 2,
    Q_NO_CURSOR_TIMEOUT => 4,
    Q_AWAIT_DATA        => 5,
    Q_EXHAUST           => 6, # unsupported (PERL-282)
    Q_PARTIAL           => 7,
};

sub write_query {
    my ( $ns, $query, $fields, $skip, $batch_size, $flags ) = @_;

    utf8::encode($ns);

    my $bitflags = 0;
    if ($flags) {
        $bitflags =
            ( $flags->{tailable}   ? 1 << Q_TAILABLE          : 0 )
          | ( $flags->{slave_ok}   ? 1 << Q_SLAVE_OK          : 0 )
          | ( $flags->{await_data} ? 1 << Q_AWAIT_DATA        : 0 )
          | ( $flags->{immortal}   ? 1 << Q_NO_CURSOR_TIMEOUT : 0 )
          | ( $flags->{partial}    ? 1 << Q_PARTIAL           : 0 );
    }

    my $request_id = int( rand( MAX_REQUEST_ID ) );

    my $msg =
        pack( P_QUERY, 0, $request_id, 0, OP_QUERY, $bitflags, $ns, $skip, $batch_size )
      . $query
      . ( defined $fields && length $fields ? $fields : '' );
    substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
    return ( $msg, $request_id );
}

# struct {
#     MsgHeader header;             // standard message header
#     int32     ZERO;               // 0 - reserved for future use
#     cstring   fullCollectionName; // "dbname.collectionname"
#     int32     numberToReturn;     // number of documents to return
#     int64     cursorID;           // cursorID from the OP_REPLY
# }

# We treat cursor_id as an opaque string so we don't have to depend
# on 64-bit integer support

sub write_get_more {
    my ( $ns, $cursor_id, $batch_size ) = @_;
    utf8::encode($ns);
    my $request_id = int( rand( MAX_REQUEST_ID ) );
    my $msg =
      pack( P_GET_MORE, 0, $request_id, 0, OP_GET_MORE, 0, $ns, $batch_size,
        _pack_cursor_id($cursor_id) );
    substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
    return ( $msg, $request_id );
}

# struct {
#     MsgHeader header;             // standard message header
#     int32     ZERO;               // 0 - reserved for future use
#     cstring   fullCollectionName; // "dbname.collectionname"
#     int32     flags;              // bit vector - see below for details.
#     document  selector;           // query object.  See below for details.
# }

use constant { D_SINGLE_REMOVE => 0, };

sub write_delete {
    my ( $ns, $selector, $flags ) = @_;
    utf8::encode($ns);

    my $bitflags = 0;
    if ($flags) {
        $bitflags = ( $flags->{just_one} ? 1 << D_SINGLE_REMOVE : 0 );
    }

    my $msg =
      pack( P_DELETE, 0, int( rand( 2**32 - 1 ) ), 0, OP_DELETE, 0, $ns, $bitflags )
      . $selector;
    substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
    return $msg;
}

# legacy alias
{
    no warnings 'once';
    *write_remove = \&write_delete;
}

# struct {
#     MsgHeader header;            // standard message header
#     int32     ZERO;              // 0 - reserved for future use
#     int32     numberOfCursorIDs; // number of cursorIDs in message
#     int64*    cursorIDs;         // sequence of cursorIDs to close
# }

sub write_kill_cursors {
    my (@cursors) = map _pack_cursor_id($_), @_;
    my $msg = pack( P_KILL_CURSORS,
        0, int( rand( 2**32 - 1 ) ),
        0, OP_KILL_CURSORS, 0, scalar(@cursors), @cursors );
    substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
    return $msg;
}

# struct {
#   // MessageHeader
#     int32   messageLength; // total message size, including this
#     int32   requestID;     // identifier for this message
#     int32   responseTo;    // requestID from the original request
#     int32   opCode;        // request type - see table below
#   // OP_REPLY fields
#     int32     responseFlags;  // bit vector - see details below
#     int64     cursorID;       // cursor id if client needs to do get more's
#     int32     startingFrom;   // where in the cursor this reply is starting
#     int32     numberReturned; // number of documents in the reply
#     document* documents;      // documents
# }

# We treat cursor_id as an opaque string so we don't have to depend
# on 64-bit integer support

# flag bits relevant to drivers
use constant {
    R_CURSOR_NOT_FOUND => 0,
    R_QUERY_FAILURE    => 1,
    R_AWAIT_CAPABLE    => 3,
};

sub parse_reply {
    my ( $msg, $request_id ) = @_;

    MongoDB::ProtocolError->throw("response was truncated")
        if length($msg) < MIN_REPLY_LENGTH;

    my (
        $len, $msg_id, $response_to, $opcode, $bitflags, $cursor_id, $starting_from,
        $number_returned
    ) = unpack( P_REPLY_HEADER, $msg );

    # pre-check all conditions using a modifier in one statement for speed;
    # disambiguate afterwards only if an error exists

    do {

        if ( length($msg) < $len ) {
            MongoDB::ProtocolError->throw("response was truncated");
        }

        if ( $opcode != OP_REPLY ) {
            MongoDB::ProtocolError->throw("response was not OP_REPLY");
        }

        if ( $response_to != $request_id ) {
            MongoDB::ProtocolError->throw(
                "response ID ($response_to) did not match request ID ($request_id)");
        }
        }
        if ( length($msg) < $len )
        || ( $opcode != OP_REPLY )
        || ( $response_to != $request_id );

    # returns non-zero cursor_id as blessed object to identify it as an
    # 8-byte opaque ID rather than an ambiguous Perl scalar. N.B. cursors
    # from commands are handled differently: they are perl integers or
    # else Math::BigInt objects

    substr( $msg, 0, MIN_REPLY_LENGTH, '' ),
        return {
        flags => {
            cursor_not_found => vec( $bitflags, R_CURSOR_NOT_FOUND, 1 ),
            query_failure    => vec( $bitflags, R_QUERY_FAILURE,    1 ),
        },
        cursor_id => (
            ( $cursor_id eq CURSOR_ZERO )
            ? 0
            : bless( \$cursor_id, "MongoDB::_CursorID" )
          ),
        starting_from   => $starting_from,
        number_returned => $number_returned,
        docs            => $msg,
        };
}

#--------------------------------------------------------------------------#
# utility functions
#--------------------------------------------------------------------------#

# CursorID's can come in 3 forms:
#
# 1. MongoDB::CursorID object (a blessed reference to an 8-byte string)
# 2. A perl scalar (an integer)
# 3. A Math::BigInt object (64 bit integer on 32-bit perl)
#
# The _pack_cursor_id function converts any of them to a packed Int64 for
# use in OP_GET_MORE or OP_KILL_CURSORS
sub _pack_cursor_id {
    my $cursor_id = shift;
    if ( ref($cursor_id) eq "MongoDB::_CursorID" ) {
        $cursor_id = $$cursor_id;
    }
    elsif ( ref($cursor_id) eq "Math::BigInt" ) {
        my $as_hex = $cursor_id->as_hex; # big-endian hex
        substr( $as_hex, 0, 2, '' );     # remove "0x"
        my $len = length($as_hex);
        substr( $as_hex, 0, 0, "0" x ( 16 - $len ) ) if $len < 16; # pad to quad length
        $cursor_id = pack( "H*", $as_hex );                        # packed big-endian
        $cursor_id = reverse($cursor_id);                          # reverse to little-endian
    }
    elsif (HAS_INT64) {
        # pack doesn't have endianness modifiers before perl 5.10.
        # We die during configuration on big-endian platforms on 5.8
        $cursor_id = pack( $] lt '5.010' ? "q" : "q<", $cursor_id );
    }
    else {
        # we on 32-bit perl *and* have a cursor ID that fits in 32 bits,
        # so pack it as long and pad out to a quad
        $cursor_id = pack( $] lt '5.010' ? "l" : "l<", $cursor_id ) . ( "\0" x 4 );
    }

    return $cursor_id;
}

1;

# vim: ts=4 sts=4 sw=4 et: