The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#
#  Copyright 2014 MongoDB, Inc.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

package MongoDB::_Topology;

use version;
our $VERSION = 'v1.4.4';

use Moo;
use MongoDB::BSON;
use MongoDB::Error;
use MongoDB::Op::_Command;
use MongoDB::ReadPreference;
use MongoDB::_Constants;
use MongoDB::_Link;
use MongoDB::_Types qw(
    BSONCodec
    NonNegNum
    TopologyType
);
use Types::Standard qw(
    Bool
    HashRef
    InstanceOf
    Num
    Str
    Maybe
);
use MongoDB::_Server;
use Config;
use List::Util qw/first/;
use Safe::Isa;
use Time::HiRes qw/time usleep/;
use Try::Tiny;

use namespace::clean;

#--------------------------------------------------------------------------#
# attributes
#--------------------------------------------------------------------------#

has uri => (
    is       => 'ro',
    required => 1,
    isa => InstanceOf['MongoDB::_URI'],
);

has max_wire_version => (
    is       => 'ro',
    required => 1,
    isa => Num,
);

has min_wire_version => (
    is       => 'ro',
    required => 1,
    isa => Num,
);

has credential => (
    is       => 'ro',
    required => 1,
    isa => InstanceOf['MongoDB::_Credential'],
);

has type => (
    is      => 'ro',
    writer  => '_set_type',
    default => 'Unknown',
    isa => TopologyType,
);

has replica_set_name => (
    is      => 'ro',
    default => '',
    writer  => '_set_replica_set_name', # :-)
    isa => Str,
);

has heartbeat_frequency_sec => (
    is      => 'ro',
    default => 60,
    isa => NonNegNum,
);

has last_scan_time => (
    is      => 'ro',
    default => EPOCH,
    writer  => '_set_last_scan_time',
    isa => Num,
);

has local_threshold_sec => (
    is      => 'ro',
    default => 0.015,
    isa => Num,
);

has socket_check_interval_sec => (
    is      => 'ro',
    default => 5,
    isa => Num,
);

has server_selection_timeout_sec => (
    is      => 'ro',
    default => 60,
    isa => Num,
);

has server_selection_try_once => (
    is      => 'ro',
    default => 1,
    isa => Bool,
);

has ewma_alpha => (
    is      => 'ro',
    default => 0.2,
    isa => Num,
);

has link_options => (
    is      => 'ro',
    default => sub { {} },
    isa => HashRef,
);

has bson_codec => (
    is       => 'ro',
    default  => sub { MongoDB::BSON->new },
    isa => BSONCodec,
);

has number_of_seeds => (
    is      => 'lazy',
    builder => '_build_number_of_seeds',
    isa => Num,
);

has max_election_id => (
    is      => 'rw',
    isa     => Maybe[ InstanceOf['MongoDB::OID'] ],
    writer  => '_set_max_election_id',
);

has max_set_version => (
    is     => 'rw',
    isa    => Maybe [Num],
    writer => '_set_max_set_version',
);

# compatible wire protocol
has is_compatible => (
    is => 'ro',
    writer => '_set_is_compatible',
    isa => Bool,
);

has current_primary => (
    is => 'rwp',
    clearer => '_clear_current_primary',
    init_arg => undef,
);

has stale => (
    is => 'rwp',
    init_arg => undef,
    default => 1,
);

# servers, links and rtt_ewma_sec are all hashes on server address

has servers => (
    is      => 'ro',
    default => sub { {} },
    isa => HashRef[InstanceOf['MongoDB::_Server']],
);

has links => (
    is      => 'ro',
    default => sub { {} },
    isa => HashRef[InstanceOf['MongoDB::_Link']],
);

has rtt_ewma_sec => (
    is      => 'ro',
    default => sub { {} },
    isa => HashRef[Num],
);

#--------------------------------------------------------------------------#
# builders
#--------------------------------------------------------------------------#

sub _build_number_of_seeds {
    my ($self) = @_;
    return scalar @{ $self->uri->hostpairs };
}

sub BUILD {
    my ($self) = @_;
    my $type = $self->type;
    my @addresses = @{ $self->uri->hostpairs };

    if ( my $set_name = $self->replica_set_name ) {
        if ( $type eq 'Single' || $type eq 'ReplicaSetNoPrimary' ) {
            # these are valid, so nothing to do here
        }
        elsif ( $type eq 'Unknown' ) {
            $self->_set_type('ReplicaSetNoPrimary');
        }
        else {
            MongoDB::InternalError->throw(
                "deployment with set name '$set_name' may not be initialized as type '$type'");
        }
    }

    if ( $type eq 'Single' && @addresses > 1 ) {
        MongoDB::InternalError->throw(
            "topology type 'Single' cannot be used with multiple addresses: @addresses");
    }

    $self->_add_address_as_unknown($_) for @addresses;

    return;
}

#--------------------------------------------------------------------------#
# public methods
#--------------------------------------------------------------------------#

sub all_servers { return values %{ $_[0]->servers } }

sub check_address {
    my ( $self, $address ) = @_;

    my $link = $self->links->{$address};
    if ( $link && $link->is_connected ) {
        $self->_update_topology_from_link($link);
    }
    else {
        # initialize_link will call update_topology_from_link
        $self->_initialize_link($address);
    }

    return;
}

sub close_all_links {
    my ($self) = @_;
    delete $self->links->{ $_->address } for $self->all_servers;
    return;
}

sub get_readable_link {
    my ( $self, $read_pref ) = @_;

    my $mode = $read_pref ? lc $read_pref->mode : 'primary';
    my $method =
      ( $self->type eq "Single" || $self->type eq "Sharded" )
      ? '_find_any_server'
      : "_find_${mode}_server";

    if ($mode eq 'primary' && $self->current_primary) {
        my $link = $self->_get_server_link( $self->current_primary, $method );
        return $link if $link;
    }

    while ( my $server = $self->_selection_timeout( $method, $read_pref ) ) {
        my $link = $self->_get_server_link( $server, $method, $read_pref );
        if ($link) {
            $self->_set_current_primary($server)
              if $mode eq 'primary'
              && ( $self->type eq "ReplicaSetWithPrimary"
                || 1 == keys %{ $self->servers } );
            return $link;
        }
    }

    my $rp = $read_pref ? $read_pref->as_string : 'primary';
    MongoDB::SelectionError->throw(
        "No readable server available for matching read preference $rp. MongoDB server status:\n"
          . $self->_status_string );
}

sub get_specific_link {
    my ( $self, $address ) = @_;

    my $server = $self->servers->{$address};
    if ( $server && ( my $link = $self->_get_server_link($server) ) ) {
        return $link;
    }
    else {
        MongoDB::SelectionError->throw("Server $address is no longer available");
    }
}

sub get_writable_link {
    my ($self) = @_;

    my $method =
      ( $self->type eq "Single" || $self->type eq "Sharded" )
      ? '_find_any_server'
      : "_find_primary_server";


    if ($self->current_primary) {
        my $link = $self->_get_server_link( $self->current_primary, $method );
        return $link if $link;
    }

    while ( my $server = $self->_selection_timeout($method) ) {
        my $link = $self->_get_server_link( $server, $method );
        if ($link) {
            $self->_set_current_primary($server)
              if $self->type eq "ReplicaSetWithPrimary"
              || 1 == keys %{ $self->servers };
            return $link;
        }
    }

    MongoDB::SelectionError->throw(
        "No writable server available.  MongoDB server status:\n" . $self->_status_string );
}

sub mark_server_unknown {
    my ( $self, $server, $error ) = @_;
    $self->_reset_address_to_unknown( $server->address, $error );
    return;
}

sub mark_stale {
    my ($self) = @_;
    $self->_set_stale(1);
    return;
}

sub scan_all_servers {
    my ($self) = @_;

    my ( $next, @ordinary, @to_check );
    my $start_time = time;
    my $cooldown_time = $start_time - COOLDOWN_SECS;

    # anything not updated since scan start is eligible for a check; when all servers
    # are updated, the loop terminates; Unknown servers aren't checked if
    # they are in the cooldown window since we don't want to wait the connect
    # timeout each attempt when they are unlikely to have changed status
    while (1) {
        @to_check =
          grep {
            $_->type eq 'Unknown'
              ? !$_->updated_since($cooldown_time)
              : !$_->updated_since($start_time)
          } $self->all_servers;

        last unless @to_check;

        if ( $next = first { $_->type eq 'RSPrimary' } @to_check ) {
            $self->check_address( $next->address );
        }
        elsif ( $next = first { $_->type eq 'PossiblePrimary' } @to_check ) {
            $self->check_address( $next->address );
        }
        elsif ( @ordinary = grep { $_->type ne 'Unknown' && $_->type ne 'RSGhost' } @to_check ) {
            $self->_check_oldest_server(@ordinary);
        }
        else {
            $self->_check_oldest_server(@to_check);
        }
    }

    $self->_set_last_scan_time( time );
    $self->_set_stale( 0 );
    $self->_check_wire_versions;
    return;
}

sub status_struct {
    my ($self) = @_;
    my $status = { topology_type => $self->type, };
    $status->{replica_set_name} = $self->replica_set_name if $self->replica_set_name;

    # convert from [sec, microsec] array to floating point
    $status->{last_scan_time} = $self->last_scan_time;

    my $rtt_hash = $self->rtt_ewma_sec;
    my $ss = $status->{servers} = [];
    for my $server ( $self->all_servers ) {
        my $addr = $server->address;
        my $server_struct = $server->status_struct;
        if ( defined $rtt_hash->{$addr} ) {
            $server_struct->{ewma_rtt_sec} = $rtt_hash->{$addr};
        }
        push @$ss, $server_struct;
    }
    return $status;
}

#--------------------------------------------------------------------------#
# private methods
#--------------------------------------------------------------------------#

sub _add_address_as_unknown {
    my ( $self, $address, $last_update, $error ) = @_;
    $error = $error ? "$error" : "";
    $error =~ s/ at \S+ line \d+.*//ms;

    return $self->servers->{$address} = MongoDB::_Server->new(
        address          => $address,
        last_update_time => $last_update || EPOCH,
        error            => $error,
    );
}

sub _check_for_primary {
    my ($self) = @_;
    if ( 0 == $self->_primaries ) {
        $self->_set_type('ReplicaSetNoPrimary');
        $self->_clear_current_primary;
        return 0;
    }
    return 1;
}

sub _check_oldest_server {
    my ( $self, @to_check ) = @_;

    my @ordered =
      map { $_->[0] }
      sort { $a->[1] <=> $b->[1] || rand() <=> rand() } # random if equal
      map { [ $_, $_->last_update_time ] }         # ignore partial secs
      @to_check;

    $self->check_address( $ordered[0]->address );

    return;
}

sub _check_wire_versions {
    my ($self) = @_;

    my $compat = 1;
    for my $server ( grep { $_->is_available } $self->all_servers ) {
        my ( $server_min_wire_version, $server_max_wire_version ) =
          @{ $server->is_master }{qw/minWireVersion maxWireVersion/};

        if (   ( $server_min_wire_version || 0 ) > $self->max_wire_version
            || ( $server_max_wire_version || 0 ) < $self->min_wire_version )
        {
            $compat = 0;
        }
    }
    $self->_set_is_compatible($compat);

    return;
}

sub _dump {
    my ($self) = @_;
    print $self->_status_string . "\n";
}

sub _eligible {
    my ( $self, $read_pref, @candidates ) = @_;

    return @candidates
      if $read_pref->has_empty_tag_sets;

    # given a tag set list, if a tag set matches at least one
    # candidate, then all candidates matching that tag set are eligible
    for my $ts ( @{ $read_pref->tag_sets } ) {
        my @eligible = grep { $_->matches_tag_set($ts) } @candidates;
        return @eligible if @eligible;
    }

    return;
}

sub _find_any_server {
    my ( $self, undef, @candidates ) = @_;
    push @candidates, $self->all_servers unless @candidates;
    return $self->_get_server_in_latency_window(
        [ grep { $_->is_available } @candidates ] );
}


sub _find_nearest_server {
    my ( $self, $read_pref, @candidates ) = @_;
    push @candidates, ( $self->_primaries, $self->_secondaries ) unless @candidates;
    my @suitable = $self->_eligible( $read_pref, @candidates );
    return $self->_get_server_in_latency_window( \@suitable );
}

sub _find_primary_server {
    my ( $self, undef, @candidates ) = @_;
    return $self->current_primary
      if $self->current_primary;
    push @candidates, $self->all_servers unless @candidates;
    return first { $_->is_writable } @candidates;
}

sub _find_primarypreferred_server {
    my ( $self, $read_pref, @candidates ) = @_;
    return $self->_find_primary_server(@candidates)
      || $self->_find_secondary_server( $read_pref, @candidates );
}

sub _find_secondary_server {
    my ( $self, $read_pref, @candidates ) = @_;
    push @candidates, $self->_secondaries unless @candidates;
    my @suitable = $self->_eligible( $read_pref, @candidates );
    return $self->_get_server_in_latency_window( \@suitable );
}

sub _find_secondarypreferred_server {
    my ( $self, $read_pref, @candidates ) = @_;
    return $self->_find_secondary_server( $read_pref, @candidates )
      || $self->_find_primary_server(@candidates);
}

sub _get_server_in_latency_window {
    my ( $self, $servers ) = @_;
    return unless @$servers;
    return $servers->[0] if @$servers == 1;

    # order servers by RTT EWMA
    my $rtt_hash = $self->rtt_ewma_sec;
    my @sorted =
      sort { $a->{rtt} <=> $b->{rtt} }
      map { { server => $_, rtt => $rtt_hash->{ $_->address } } } @$servers;

    # lowest RTT is always in the windows
    my @in_window = shift @sorted;

    # add any other servers in window and return a random one
    my $max_rtt = $in_window[0]->{rtt} + $self->local_threshold_sec;
    push @in_window, grep { $_->{rtt} <= $max_rtt } @sorted;
    return $in_window[ int( rand(@in_window) ) ]->{server};
}

sub _get_server_link {
    my ( $self, $server, $method, $read_pref ) = @_;
    my $address = $server->address;
    my $link    = $self->links->{$address};

    # if no link, make a new connection or give up
    $link = $self->_initialize_link($address) unless $link && $link->connected;
    return unless $link;

    # for idle links, refresh the server and verify validity
    if ( time - $link->last_used > $self->socket_check_interval_sec ) {
        $self->check_address($address);

        # topology might have dropped the server
        $server = $self->servers->{$address}
          or return;

        my $fresh_link = $self->links->{$address};
        return $fresh_link if !$method;

        # verify selection criteria
        return $self->$method( $read_pref, $server ) ? $fresh_link : undef;
    }

    return $link;
}

sub _initialize_link {
    my ( $self, $address ) = @_;

    my $link = try {
        MongoDB::_Link->new( %{$self->link_options}, address => $address )->connect;
    }
    catch {
        # if connection failed, update topology with Unknown description
        $self->_reset_address_to_unknown( $address, $_ );
        return;
    };

    return unless $link;

    # connection succeeded, so register link and get a server description
    $self->links->{$address} = $link;
    $self->_update_topology_from_link($link);

    # after update, server might or might not exist in the topology;
    # if not, return nothing
    return unless my $server = $self->servers->{$address};

    # we have a link and the server is a valid member, so
    # try to authenticate; if authentication fails, all
    # servers are considered invalid and we throw an error
    if ( $self->type eq 'Single' || first { $_ eq $server->type } qw/Standalone Mongos RSPrimary RSSecondary/ ) {
        try {
            $self->credential->authenticate($link, $self->bson_codec);
        }
        catch {
            my $err = $_;
            $self->_reset_address_to_unknown( $_->address, $err ) for $self->all_servers;
            MongoDB::AuthError->throw("Authentication to $address failed: $err");
        };
    }

    return $link;
}

sub _primaries {
    return grep { $_->type eq 'RSPrimary' } $_[0]->all_servers;
}

sub _remove_address {
    my ( $self, $address ) = @_;
    if ( $self->current_primary &&  $self->current_primary->address eq $address ) {
        $self->_clear_current_primary;
    }
    delete $self->$_->{$address} for qw/servers links rtt_ewma_sec/;
    return;
}

sub _remove_server {
    my ( $self, $server ) = @_;
    $self->_remove_address( $server->address );
    return;
}

sub _reset_address_to_unknown {
    my ( $self, $address, $error, $update_time ) = @_;
    $update_time ||= time;

    $self->_remove_address($address);
    my $desc = $self->_add_address_as_unknown( $address, $update_time, $error );
    $self->_update_topology_from_server_desc($address, $desc);

    return;
}

sub _secondaries {
    return grep { $_->type eq 'RSSecondary' } $_[0]->all_servers;
}

sub _status_string {
    my ($self) = @_;
    my $status = '';
    if ( $self->type =~ /^Replica/ ) {
        $status .= sprintf( "Topology type: %s; Set name: %s, Member status:\n",
            $self->type, $self->replica_set_name );
    }
    else {
        $status .= sprintf( "Topology type: %s; Member status:\n", $self->type );
    }

    $status .= join( "\n", map { "  $_" } map { $_->status_string } $self->all_servers ) . "\n";
    return $status;
}

# this implements the server selection timeout around whatever actual method
# is used for returning a link
sub _selection_timeout {
    my ( $self, $method, $read_pref ) = @_;

    my $start_time = my $loop_end_time = time();
    my $max_time = $start_time + $self->server_selection_timeout_sec;

    if ( $self->last_scan_time + $self->heartbeat_frequency_sec < $start_time ) {
        $self->_set_stale(1);
    }

    while (1) {
        if ( $self->stale ) {
            my $scan_ready_time = $self->last_scan_time + MIN_HEARTBEAT_FREQUENCY_SEC;

            # if not enough time left to wait to check; then caller throws error
            return if !$self->server_selection_try_once && $scan_ready_time > $max_time;

            # loop_end_time is a proxy for time() to avoid overhead
            my $sleep_time = $scan_ready_time - $loop_end_time;

            usleep( 1e6 * $sleep_time ) if $sleep_time > 0;

            $self->scan_all_servers;
        }

        unless ( $self->is_compatible ) {
            $self->_set_stale(1);
            MongoDB::ProtocolError->throw(
                "Incompatible wire protocol version. This version of the MongoDB driver is not compatible with the server. You probably need to upgrade this library."
            );
        }

        my $server = $self->$method($read_pref);

        return $server if $server;

        $self->_set_stale(1);
        $loop_end_time = time();

        if ( $self->server_selection_try_once ) {
            # if already tried once; then caller throws error
            return if $self->last_scan_time > $start_time;
        }
        else {
            # if selection timed out; then caller throws error
            return if $loop_end_time > $max_time;
        }
    }
}

my $PRIMARY = MongoDB::ReadPreference->new;

sub _update_topology_from_link {
    my ( $self, $link ) = @_;

    my $start_time = time;
    my $is_master = eval {
        my $op = MongoDB::Op::_Command->_new(
            db_name         => 'admin',
            query           => [ ismaster => 1 ],
            query_flags     => {},
            bson_codec      => $self->bson_codec,
            read_preference => $PRIMARY,
        );
        # just for this command, use connect timeout as socket timeout;
        # this violates encapsulation, but requires less API modification
        # to support this specific exception to the socket timeout
        local $link->{socket_timeout} = $link->{connect_timeout};
        $op->execute( $link )->output;
    };
    if ( $@ ) {
        local $_ = $@;
        warn "During MongoDB topology update for @{[$link->address]}: $_"
            if WITH_ASSERTS;
        $self->_reset_address_to_unknown( $link->address, $_ );
        # retry a network error if server was previously known to us
        if (    $_->$_isa("MongoDB::NetworkError")
            and $link->server
            and $link->server->type ne 'Unknown'
            and $link->server->type ne 'PossiblePrimary' )
        {
            # the earlier reset to unknown avoids us reaching this branch again
            # and recursing forever
            $self->check_address( $link->address );
        }
        return;
    };

    return unless $is_master;

    my $end_time = time;
    my $rtt_sec = $end_time - $start_time;

    my $new_server = MongoDB::_Server->new(
        address          => $link->address,
        last_update_time => $end_time,
        rtt_sec           => $rtt_sec,
        is_master        => $is_master,
    );

    $self->_update_topology_from_server_desc( $link->address, $new_server );

    return;
}

sub _update_topology_from_server_desc {
    my ( $self, $address, $new_server ) = @_;

    # ignore spurious result not in the set; this isn't strictly necessary
    # for single-threaded operation, but spec tests expect it and if we
    # have async monitoring in the future, late responses could come back
    # after a server has been removed
    return unless $self->servers->{$address};

    $self->_update_ewma( $address, $new_server );

    # must come after ewma update
    $self->servers->{$address} = $new_server;

    my $method = "_update_" . $self->type;
    $self->$method( $address, $new_server );

    # if link is still around, tag it with server specifics
    $self->_update_link_metadata( $address, $new_server );

    return $new_server;
}

sub _update_ewma {
    my ( $self, $address, $new_server ) = @_;

    if ( $new_server->type eq 'Unknown' ) {
        delete $self->rtt_ewma_sec->{$address};
    }
    else {
        my $old_avg = $self->rtt_ewma_sec->{$address};
        my $alpha   = $self->ewma_alpha;
        my $rtt_sec  = $new_server->rtt_sec;
        $self->rtt_ewma_sec->{$address} =
          defined($old_avg) ? ( $alpha * $rtt_sec + ( 1 - $alpha ) * $old_avg ) : $rtt_sec;
    }

    return;
}

sub _update_link_metadata {
    my ( $self, $address, $server ) = @_;

    # if the link didn't get dropped from the topology during the update, we
    # attach the server so the link knows where it came from
    if ( $self->links->{$address} ) {
        $self->links->{$address}->set_metadata($server);
    }

    return;
}

sub _update_rs_with_primary_from_member {
    my ( $self, $new_server ) = @_;

    if (  !$self->servers->{ $new_server->address }
        || $self->replica_set_name ne $new_server->set_name )
    {
        $self->_remove_server($new_server);
    }

    # require 'me' that matches expected address
    if ( $new_server->me && $new_server->me ne $new_server->address ) {
        $self->_remove_server($new_server);
        $self->_check_for_primary;
        return;
    }

    if ( ! $self->_check_for_primary ) {

        # flag possible primary to amend scanning order
        my $primary = $new_server->primary;
        if (   length($primary)
            && $self->servers->{$primary}
            && $self->servers->{$primary}->type eq 'Unknown' )
        {
            $self->servers->{$primary}->_set_type('PossiblePrimary');
        }
    }

    return;
}

sub _update_rs_with_primary_from_primary {
    my ( $self, $new_server ) = @_;

    if ( !length $self->replica_set_name ) {
        $self->_set_replica_set_name( $new_server->set_name );
    }
    elsif ( $self->replica_set_name ne $new_server->set_name ) {
        # We found a primary but it doesn't have the setName
        # provided by the user or previously discovered
        $self->_remove_server($new_server);
        return;
    }

    my $election_id = $new_server->is_master->{electionId};
    my $set_version = $new_server->is_master->{setVersion};
    my $max_election_id = $self->max_election_id;
    my $max_set_version = $self->max_set_version;

    if ( defined $set_version && defined $election_id ) {
        if (
               defined $max_election_id
            && defined $max_set_version
            && (
                $max_set_version > $set_version
                || (   $max_set_version == $set_version
                    && $max_election_id->value gt $election_id->value )
            )
          )
        {
            # stale primary

            $self->_remove_address( $new_server->address );
            $self->_add_address_as_unknown( $new_server->address );
            $self->_check_for_primary;
            return;
        }
        $self->_set_max_election_id( $election_id );
    }

    if ( defined $set_version
        && ( !defined $max_set_version || $set_version > $max_set_version ) )
    {
        $self->_set_max_set_version($set_version);
    }

    # possibly invalidate an old primary (even if more than one!)
    for my $old_primary ( $self->_primaries ) {
        if ( $old_primary->address ne $new_server->address ) {
            $self->_reset_address_to_unknown(
                $old_primary->address,
                "no longer primary; update needed",
                $old_primary->last_update_time
            );
        }
    }

    # unknown set members need to be added to the topology
    my %set_members =
      map { $_ => undef } map { @{ $new_server->$_ } } qw/hosts passives arbiters/;

    $self->_add_address_as_unknown($_)
      for grep { !exists $self->servers->{$_} } keys %set_members;

    # topology servers no longer in the set need to be removed
    $self->_remove_address($_)
      for grep { !exists $set_members{$_} } keys %{ $self->servers };

    return;
}

sub _update_rs_without_primary {
    my ( $self, $new_server ) = @_;

    if ( !length $self->replica_set_name ) {
        $self->_set_replica_set_name( $new_server->set_name );
    }
    elsif ( $self->replica_set_name ne $new_server->set_name ) {
        $self->_remove_server($new_server);
        return;
    }

    # unknown set members need to be added to the topology
    my %set_members =
      map { $_ => undef } map { @{ $new_server->$_ } } qw/hosts passives arbiters/;

    $self->_add_address_as_unknown($_)
      for grep { !exists $self->servers->{$_} } keys %set_members;

    # require 'me' that matches expected address
    if ( $new_server->me && $new_server->me ne $new_server->address ) {
        $self->_remove_server($new_server);
        return;
    }

    # flag possible primary to amend scanning order
    my $primary = $new_server->primary;
    if (   length($primary)
        && $self->servers->{$primary}
        && $self->servers->{$primary}->type eq 'Unknown' )
    {
        $self->servers->{$primary}->_set_type('PossiblePrimary');
    }

    return;
}

#--------------------------------------------------------------------------#
# update methods by topology types: behavior in each depends on new server
# type received
#--------------------------------------------------------------------------#

sub _update_ReplicaSetNoPrimary {
    my ( $self, $address, $new_server ) = @_;

    my $server_type = $new_server->type;

    if ( $server_type eq 'RSPrimary' ) {
        $self->_set_type('ReplicaSetWithPrimary');
        $self->_update_rs_with_primary_from_primary($new_server);
        # topology changes might have removed all primaries
        $self->_check_for_primary;
    }
    elsif ( grep { $server_type eq $_ } qw/RSSecondary RSArbiter RSOther/ ) {
        $self->_update_rs_without_primary($new_server);
    }
    elsif ( grep { $server_type eq $_ } qw/Standalone Mongos/ ) {
        $self->_remove_server($new_server);
    }
    else {
        # Unknown or RSGhost are no-ops
    }

    return;
}

sub _update_ReplicaSetWithPrimary {
    my ( $self, $address, $new_server ) = @_;

    my $server_type = $new_server->type;

    if ( $server_type eq 'RSPrimary' ) {
        $self->_update_rs_with_primary_from_primary($new_server);
    }
    elsif ( grep { $server_type eq $_ } qw/RSSecondary RSArbiter RSOther/ ) {
        $self->_update_rs_with_primary_from_member($new_server);
    }
    elsif ( grep { $server_type eq $_ } qw/Unknown Standalone Mongos/ ) {
        $self->_remove_server($new_server)
          unless $server_type eq 'Unknown';
    }
    else {
        # RSGhost is no-op
    }

    # topology changes might have removed all primaries
    $self->_check_for_primary;

    return;
}

sub _update_Sharded {
    my ( $self, $address, $new_server ) = @_;

    my $server_type = $new_server->type;

    if ( grep { $server_type eq $_ } qw/Unknown Mongos/ ) {
        # no-op
    }
    else {
        $self->_remove_server($new_server);
    }

    return;
}

sub _update_Single {
    my ( $self, $address, $new_server ) = @_;
    # Per the spec, TopologyType Single never changes type or membership
    return;
}

sub _update_Unknown {
    my ( $self, $address, $new_server ) = @_;

    my $server_type = $new_server->type;

    if ( $server_type eq 'Standalone' ) {
        if ( $self->number_of_seeds == 1 ) {
            $self->_set_type('Single');
        }
        else {
            # a standalone server with multiple seeds is a replica set member
            # in maintenance mode; we drop it and may pick it up later if it
            # rejoins the replica set.
            $self->_remove_address($address);
        }
    }
    elsif ( $server_type eq 'Mongos' ) {
        $self->_set_type('Sharded');
    }
    elsif ( $server_type eq 'RSPrimary' ) {
        $self->_set_type('ReplicaSetWithPrimary');
        $self->_update_rs_with_primary_from_primary($new_server);
        # topology changes might have removed all primaries
        $self->_check_for_primary;
    }
    elsif ( grep { $server_type eq $_ }  qw/RSSecondary RSArbiter RSOther/ ) {
        $self->_set_type('ReplicaSetNoPrimary');
        $self->_update_rs_without_primary($new_server);
    }
    else {
        # Unknown or RSGhost are no-ops
    }

    return;
}

1;

# vim: ts=4 sts=4 sw=4 et: