###########################################################################
# package Net::SIP::Dispatcher
#
# Manages the sending of SIP packets to the legs (and finding out which
# leg can be used) and the receiving of SIP packets and forwarding to
# the upper layer.
# Handles retransmits
###########################################################################
use strict;
use warnings;
package Net::SIP::Dispatcher;
use fields (
# interface to outside
'receiver', # callback into upper layer
'legs', # \@list of Net::SIP::Legs managed by dispatcher
'eventloop', # Net::SIP::Dispatcher::Eventloop or similar
'outgoing_proxy', # optional fixed outgoing proxy
'domain2proxy', # optional mapping between SIP domains and proxies (otherwise use DNS)
# internals
'do_retransmits', # flag if retransmits will be done (false for stateless proxy)
'queue', # \@list of outstanding Net::SIP::Dispatcher::Packet
'response_cache', # Cache of responses, used to reply to retransmits
'disp_expire', # expire/retransmit timer
);
use Net::SIP::Leg;
use Net::SIP::Util ':all';
use Errno qw(EHOSTUNREACH ETIMEDOUT ENOPROTOOPT EINVAL);
use IO::Socket;
use List::Util 'first';
use Net::DNS;
use Carp 'croak';
use Net::SIP::Debug;
use Scalar::Util 'weaken';
###########################################################################
# create new dispatcher
# Args: ($class,$legs,$eventloop;%args)
# $legs: \@array, see add_leg()
# $eventloop: Net::SIP::Dispatcher::Eventloop or similar
# %args:
# outgoing_proxy: optional outgoing proxy (ip:port)
# do_retransmits: set if the dispatcher has to handle retransmits by itself
# defaults to true
# domain2proxy: mappings { domain => proxy } if a fixed proxy is used
# for specific domains, otherwise lookup will be done per DNS
# proxy can be ip,ip:port or \@list of [ prio,proto,ip,port ] like
# in the DNS SRV record.
# with special domain '*' a default can be specified, so that DNS
# will not be used at all
# Returns: $self
###########################################################################
sub new {
my ($class,$legs,$eventloop,%args) = @_;
my ($outgoing_proxy,$do_retransmits,$domain2proxy)
= delete @args{qw( outgoing_proxy do_retransmits domain2proxy )};
die "bad args: ".join( ' ',keys %args ) if %args;
$eventloop ||= Net::SIP::Dispatcher::Eventloop->new;
# normalize domain2proxy so that its the same format one gets from
# the SRV record
$domain2proxy ||= {};
foreach ( values %$domain2proxy ) {
if ( ref($_) ) { # should be \@list of [ prio,proto,ip,port ]
} elsif ( m{^(?:(udp|tcp):)?([^:]+)(?::(\d+))?$} ) {
my @proto = $1 ? ( $1 ) : ( 'udp','tcp' );
my $host = $2;
my $port = $3 || 5060;
$_ = [ map { [ -1, $_, $host, $port ] } @proto ];
} else {
croak( "invalid entry in domain2proxy: $_" );
}
}
my $self = fields::new($class);
%$self = (
legs => [],
queue => [],
outgoing_proxy => undef,
response_cache => {},
do_retransmits => defined( $do_retransmits ) ? $do_retransmits : 1,
eventloop => $eventloop,
domain2proxy => $domain2proxy,
);
$self->add_leg( @$legs );
if ( $outgoing_proxy ) {
my $leg = $self->_find_leg4addr( $outgoing_proxy )
|| die "cannot find leg for destination $outgoing_proxy";
$self->{outgoing_proxy} = $outgoing_proxy;
}
# regularly prune queue
my $sub = sub {
my ($self,$timer) = @_;
if ( $self ) {
$self->queue_expire( $self->{eventloop}->looptime );
} else {
$timer->cancel;
}
};
my $cb = [ $sub,$self ];
weaken( $cb->[1] );
$self->{disp_expire} = $self->add_timer( 1,$cb,1,'disp_expire' );
return $self;
}
###########################################################################
# set receiver, e.g the upper layer which gets the incoming packets
# received by the dispatcher
# Args: ($self,$receiver)
# $receiver: object which has receive( Net::SIP::Leg,Net::SIP::Packet )
# method to handle incoming SIP packets or callback
# might be undef - in this case the existing receiver will be removed
# Returns: NONE
###########################################################################
sub set_receiver {
my Net::SIP::Dispatcher $self = shift;
if ( my $receiver = shift ) {
if ( my $sub = UNIVERSAL::can($receiver,'receive' )) {
# Object with method receive()
$receiver = [ $sub,$receiver ]
}
$self->{receiver} = $receiver;
} else {
# remove receiver
$self->{receiver} = undef
}
}
###########################################################################
# adds a leg to the dispatcher
# Args: ($self,@legs)
# @legs: can be sockets, \%args for constructing or already
# objects of class Net::SIP::Leg
# Returns: NONE
###########################################################################
sub add_leg {
my Net::SIP::Dispatcher $self = shift;
my $legs = $self->{legs};
foreach my $arg (@_) {
my $leg;
# if it is not a leg yet create one based
# on the arguments
if ( UNIVERSAL::isa( $arg,'Net::SIP::Leg' )) {
# already a leg
$leg = $arg;
} elsif ( UNIVERSAL::isa( $arg,'IO::Handle' )) {
# create from socket
$leg = Net::SIP::Leg->new( sock => $arg );
} elsif ( UNIVERSAL::isa( $arg,'HASH' )) {
# create from %args
$leg = Net::SIP::Leg->new( %$arg );
} else {
croak "invalid spec for leg: $arg";
}
push @$legs, $leg;
if ( my $fd = $leg->fd ) {
my $cb = sub {
# don't crash Dispatcher on bad or unexpected packets
eval {
my ($self,$leg) = @_;
$self || return;
# leg->receive might return undef if the packet wasnt
# read successfully. for tcp connections the receive
# on a listening socket might cause a new leg to be added
# which then will receive the packet (maybe over multiple
# read attempts)
my ($packet,$from) = $leg->receive or do {
DEBUG( 50,"failed to receive on leg" );
return;
};
if ($packet->is_request) {
# add received and rport to top via
$packet->scan_header( via => [ sub {
my ($vref,$hdr) = @_;
return if $$vref++;
my ($d,$h) = sip_hdrval2parts(via => $hdr->{value});
# FIXME: not IPv6 save
my ($host,$port) = $d =~m{^\S+\s+(\S+?)(?::(\d+))?$};
my ($addr,$rport) = $from =~m{^(\S+)(?::(\d+))$};
my %nh;
if ( exists $h->{rport} and ! defined $h->{rport}) {
$nh{rport} = $rport;
}
if ( $host ne $addr or $nh{rport}) {
# either hostname or different IP or required because
# rport was set
$nh{received} = $addr;
}
if (%nh) {
$hdr->{value} = sip_parts2hdrval('via',$d,{ %$h,%nh});
$hdr->set_modified;
}
}, \( my $cvia )]);
}
# handle received packet
$self->receive( $packet,$leg,$from );
};
if ($@) {
DEBUG(1,"dispatcher croaked: $@");
}
};
$cb = [ $cb,$self,$leg ];
weaken( $cb->[1] );
$self->{eventloop}->addFD( $fd, $cb );
}
}
}
###########################################################################
# remove a leg from the dispatcher
# Args: ($self,@legs)
# @legs: Net::SIP::Leg objects
# Returns: NONE
###########################################################################
sub remove_leg {
my Net::SIP::Dispatcher $self = shift;
my $legs = $self->{legs};
foreach my $leg (@_) {
@$legs = grep { $_ != $leg } @$legs;
if ( my $fd = $leg->fd ) {
$self->{eventloop}->delFD( $fd );
}
}
}
###########################################################################
# find legs matching specific criterias
# Args: ($self,%args)
# %args: Hash with some of these keys
# addr: leg must match addr
# port: leg must match port
# proto: leg must match proto
# sock: leg must match sock
# sub: $sub->($leg) must return true
# Returns: @legs
# @legs: all Legs matching the criteria
# Comment:
# if no criteria given it will return all legs
###########################################################################
sub get_legs {
my Net::SIP::Dispatcher $self = shift;
return @{ $self->{legs} } if ! @_; # shortcut
my %args = @_;
my @rv;
foreach my $leg (@{ $self->{legs} }) {
next if $args{addr} && $args{addr} ne $leg->{addr};
next if $args{port} && $args{port} != $leg->{port};
next if $args{proto} && $args{proto} ne $leg->{proto};
next if $args{sock} && $args{sock} != $leg->{sock};
next if $args{sub} && !invoke_callback( $args{sub},$leg );
push @rv,$leg
}
return @rv;
}
###########################################################################
# add timer
# propagates to add_timer of eventloop
# Args: ($self,$when,$cb,$repeat)
# $when: when callback gets called, can be absolute time (epoch, time_t)
# or relative time (seconds)
# $cb: callback
# $repeat: after how much seconds it gets repeated (default 0, e.g never)
# Returns: $timer
# $timer: Timer object, has method cancel for canceling timer
###########################################################################
sub add_timer {
my Net::SIP::Dispatcher $self = shift;
return $self->{eventloop}->add_timer( @_ );
}
###########################################################################
# initiate delivery of a packet, e.g. put packet into delivery queue
# Args: ($self,$packet,%more_args)
# $packet: Net::SIP::Packet which needs to be delivered
# %more_args: hash with some of the following keys
# id: id for packet, used in cancel_delivery
# callback: [ \&sub,@arg ] for calling back on definite delivery
# success (tcp only) or error (timeout,no route,...)
# leg: specify outgoing leg, needed for responses
# dst_addr: specify outgoing addr [ip,port] or sockaddr, needed
# for responses
# do_retransmits: if retransmits should be done, default from
# global value (see new())
# Returns: NONE
# Comment: no return value, but die()s on errors
###########################################################################
sub deliver {
my Net::SIP::Dispatcher $self = shift;
my ($packet,%more_args) = @_;
my $now = delete $more_args{now};
my $do_retransmits = delete $more_args{do_retransmits};
$do_retransmits = $self->{do_retransmits} if !defined $do_retransmits;
DEBUG( 100,"deliver $packet" );
if ( $packet->is_response ) {
# cache response for 32 sec (64*T1)
if ( $do_retransmits ) {
my $cid = join( "\0",
map { $packet->get_header($_) }
qw( cseq call-id from to )
);
$self->{response_cache}{$cid} = {
packet => $packet,
expire => ( $now ||= time()) +32
};
}
}
my $new_entry = Net::SIP::Dispatcher::Packet->new(
packet => $packet,
%more_args
);
$new_entry->prepare_retransmits( $now ) if $do_retransmits;
push @{ $self->{queue}}, $new_entry;
$self->__deliver( $new_entry );
}
###########################################################################
# cancel delivery of all packets with specific id
# Args: ($self,$typ?,$id)
# $typ: what to cancel, e.g. 'id','callid' or 'qentry', optional,
# defaults to 'id' if $id is not ref or 'qentry' if $id is ref
# $id: id to cancel, can also be queue entry
# Returns: bool, true if the was something canceled
###########################################################################
sub cancel_delivery {
my Net::SIP::Dispatcher $self = shift;
my ($callid,$id,$qentry);
if ( @_ == 2 ) {
my $typ = shift;
if ( $typ eq 'callid' ) { $callid = shift }
elsif ( $typ eq 'id' ) { $id = shift }
elsif ( $typ eq 'qentry' ) { $qentry = shift }
else {
croak( "bad typ '$typ', should be id|callid|qentry" );
}
} else {
$id = shift;
if ( ref($id)) {
$qentry = $id;
$id = undef;
}
}
my $q = $self->{queue};
my $qn = @$q;
if ( $qentry ) {
# it's a *::Dispatcher::Packet
DEBUG( 100,"cancel packet id: $qentry->{id}" );
@$q = grep { $_ != $qentry } @$q;
} elsif ( defined $id ) {
no warnings; # $_->{id} can be undef
DEBUG( 100, "cancel packet id $id" );
@$q = grep { $_->{id} ne $id } @$q;
} elsif ( defined $callid ) {
no warnings; # $_->{callid} can be undef
DEBUG( 100, "cancel packet callid $callid" );
@$q = grep { $_->{callid} ne $callid } @$q;
} else {
croak( "cancel_delivery w/o id" );
}
return @$q < $qn; # true if items got deleted
}
###########################################################################
# Receive a packet from a leg and forward it to the upper layer
# if the packet is a request and I have a cached response resend it
# w/o involving the upper layer
# Args: ($self,$packet,$leg,$from)
# $packet: Net::SIP::Packet
# $leg: through which leg it was received
# $from: where the packet comes from (ip:port)
# Returns: NONE
# Comment: if no receiver is defined using set_receiver the packet
# will be silently dropped
###########################################################################
sub receive {
my Net::SIP::Dispatcher $self = shift;
my ($packet,$leg,$from) = @_;
if ( $packet->is_request ) {
my $cache = $self->{response_cache};
if ( %$cache ) {
my $cid = join( "\0",
map { $packet->get_header($_) }
qw( cseq call-id from to )
);
if ( my $response = $cache->{$cid} ) {
# I have a cached response, use it
$self->deliver($response->{packet}, leg => $leg, dst_addr => $from);
return;
}
}
}
invoke_callback( $self->{receiver},$packet,$leg,$from );
}
###########################################################################
# expire the entries on the queue, eg removes expired entries and
# calls callback if necessary
# expires also the response cache
# Args: ($self;$time)
# $time: expire regarding $time, if not given use time()
# Returns: undef|$min_expire
# $min_expire: time when next thing expires (undef if nothing to expire)
###########################################################################
sub queue_expire {
my Net::SIP::Dispatcher $self = shift;
my $now = shift || $self->{eventloop}->looptime;
# expire queue
my $queue = $self->{queue};
my (@nq,$changed,$min_expire);
foreach my $qe (@$queue) {
my $retransmit;
if ( my $retransmits = $qe->{retransmits} ) {
while ( @$retransmits && $retransmits->[0] < $now ) {
$retransmit = shift(@$retransmits);
}
if ( !@$retransmits ) {
# completly expired
DEBUG( 50,"entry %s expired because expire=%.2f but now=%d", $qe->tid,$retransmit,$now );
$changed++;
$qe->trigger_callback( ETIMEDOUT );
# don't put into new queue
next;
}
if ( $retransmit ) {
# need to retransmit the packet
$self->__deliver( $qe );
}
my $next_retransmit = $retransmits->[0];
if ( !defined($min_expire) || $next_retransmit<$min_expire ) {
$min_expire = $next_retransmit
}
}
push @nq,$qe;
}
$self->{queue} = \@nq if $changed;
# expire response cache
my $cache = $self->{response_cache};
foreach my $cid ( keys %$cache ) {
my $expire = $cache->{$cid}{expire};
if ( $expire < $now ) {
delete $cache->{$cid};
} elsif ( !defined($min_expire) || $expire<$min_expire ) {
$min_expire = $expire
}
}
# return time to next expire for optimizations
return $min_expire;
}
###########################################################################
# the real delivery of a queue entry:
# if no leg,addr try to determine them from request-URI
# prepare timeout handling
# Args: ($self,$qentry)
# $qentry: Net::SIP::Dispatcher::Packet
# Returns: NONE
# Comment:
# this might be called several times for a queue entry, eg as a callback
# at the various stages (find leg,addr for URI needs DNS lookup which
# might be done asynchronous, eg callback driven, send might be callback
# driven for tcp connections which need connect, multiple writes...)
###########################################################################
sub __deliver {
my Net::SIP::Dispatcher $self = shift;
my $qentry = shift;
# loop until leg und dst_addr are known, when we call leg->deliver
my $leg = $qentry->{leg}[0];
if ( $leg && @{ $qentry->{leg}}>1 ) {
DEBUG( 50,"picking first of multiple legs: ".join( " ", map { $_->dump } @{ $qentry->{leg}} ));
}
my $dst_addr = $qentry->{dst_addr}[0];
if ( ! $dst_addr || ! $leg) {
# if explicit routes given use first route
# else resolve URI from request
my $uri;
my $packet = $qentry->{packet};
if ( my ($route) = $packet->get_header( 'route' )) {
($uri) = sip_hdrval2parts( route => $route );
} else {
$uri = $packet->uri;
}
DEBUG( 100,"no dst_addr or leg yet, uri='$uri'" );
my $callback = sub {
my ($self,$qentry,@error) = @_;
if ( @error ) {
$qentry->trigger_callback(@error);
return $self->cancel_delivery( $qentry );
} else {
$self->__deliver($qentry);
}
};
return $self->resolve_uri(
$uri,
$qentry->{dst_addr},
$qentry->{leg},
[ $callback, $self,$qentry ],
$qentry->{proto},
);
}
# I have leg and addr, send packet thru leg to addr
my $cb = sub {
my ($self,$qentry,$error) = @_;
$self || return;
if ( !$error && $qentry->{retransmits} ) {
# remove from queue even if timeout
$self->cancel_delivery( $qentry );
}
$qentry->trigger_callback( $error );
};
# adds via on cloned packet, calls cb if definite success (tcp)
# or error
DEBUG( 50,"deliver through leg ".$leg->dump." \@$dst_addr" );
weaken( my $rself = \$self );
$cb = [ $cb,$self,$qentry ];
weaken( $cb->[1] );
$leg->deliver( $qentry->{packet},$dst_addr,$cb );
if ( !$qentry->{retransmits} ) {
# remove from queue if no timeout
$self->cancel_delivery( $qentry );
}
}
###########################################################################
# resolve URI, determine dst_addr and outgoing leg
# Args: ($self,$uri,$dst_addr,$legs,$callback;$allowed_proto,$allowed_legs)
# $uri: URI to resolve
# $dst_addr: reference to list where to put dst_addr
# $legs: reference to list where to put leg
# $callback: called with () if resolved successfully, else called
# with @error
# $allowed_proto: optional \@list of protocols (default udp,tcp). If given only
# only these protocols will be considered and in this order.
# $allowed_legs: optional list of legs which are allowed
# Returns: NONE
###########################################################################
sub resolve_uri {
my Net::SIP::Dispatcher $self = shift;
my ($uri,$dst_addr,$legs,$callback,$allowed_proto,$allowed_legs) = @_;
# packet should be a request packet (see constructor of *::Dispatcher::Packet)
my ($domain,$user,$sip_proto,undef,$param) = sip_uri2parts($uri);
$domain or do {
DEBUG( 50,"bad URI '$uri'" );
return invoke_callback($callback, EHOSTUNREACH );
};
my @proto;
my $default_port = 5060;
# XXXX hack, better would be to really parse URI, see *::Util::sip_hdrval2parts
if ( $sip_proto eq 'sips' ) {
$default_port = 5061;
@proto = 'tcp';
} elsif ( my $p = $param->{transport} ) {
# explicit spec of proto
@proto = lc($p)
} else {
# XXXX maybe we should use tcp first if the packet has a specific
# minimum length, udp should not be used at all if the packet size is > 2**16
@proto = ( 'udp','tcp' );
}
# change @proto so that only the protocols from $allowed_proto are ini it
# and that they are tried in the order from $allowed_proto
if ( $allowed_proto && @$allowed_proto ) {
my @proto_new;
foreach my $ap ( @$allowed_proto ) {
my $p = first { $ap eq $_ } @proto;
push @proto_new,$p if $p;
}
@proto = @proto_new;
@proto or do {
DEBUG( 50,"no protocols allowed for $uri" );
return invoke_callback( $callback, ENOPROTOOPT ); # no proto available
};
}
$dst_addr ||= [];
$allowed_legs ||= [ $self->get_legs ];
if ( @$legs ) {
my %allowed = map { $_ => 1 } @$legs;
@$allowed_legs = grep { $allowed{$_} } @$allowed_legs;
}
@$allowed_legs or do {
DEBUG( 50,"no legs allowed for '$uri'" );
return invoke_callback($callback, EHOSTUNREACH );
};
my $ip_addr;
if ( $domain =~m{^(\d+\.\d+\.\d+\.\d+)(?::(\d+))?$} ) {
# if domain part of URI is IPv4[:port]
$default_port = $2 if defined $2;
$ip_addr = $1;
# e.g. 10.0.3.4 should match *.3.0.10.in-addr.arpa
$domain = join( '.', reverse split( m{\.},$ip_addr )).'.in-addr.arpa';
} else {
$domain =~s{\.*(?::(\d+))?$}{}; # remove trailing dots + port
$default_port = $1 if defined $1;
}
DEBUG( 100,"domain=$domain" );
# do we have a fixed proxy for the domain or upper domain?
if ( ! @$dst_addr ) {
my $d2p = $self->{domain2proxy};
if ( $d2p && %$d2p ) {
my $dom = $domain;
my $addr = $d2p->{$dom}; # exact match
while ( ! $addr) {
$dom =~s{^[^\.]+\.}{} or last;
$addr = $d2p->{ "*.$dom" };
}
$addr ||= $d2p->{ $dom = '*'}; # catch-all
if ( $addr ) {
DEBUG( 50,"setting dst_addr from domain specific proxy for domain $dom" );
@$dst_addr = @$addr;
}
}
}
# do we have a global outgoing proxy?
if ( !@$dst_addr
&& ( my $addr = $self->{outgoing_proxy} )) {
# if we have a fixed outgoing proxy use it
DEBUG( 50,"setting dst_addr+leg to $addr from outgoing_proxy" );
@$dst_addr = ( $addr );
}
# is it an IP address?
if ( !@$dst_addr && $ip_addr ) {
DEBUG( 50,"setting dst_addr from URI because IP address given" );
@$dst_addr = ( $ip_addr );
}
# is param maddr set?
if ( my $ip = $param->{maddr} ) {
@$dst_addr = ( $ip )
if $ip =~m{^[\d\.]+$} && eval { inet_aton($ip) };
}
# entries in form [ prio,proto,ip,port ]
my @resp;
foreach my $addr ( @$dst_addr ) {
if ( ref($addr)) {
push @resp,$addr; # right format: see domain2proxy
} else {
$addr =~m{^(?:(udp|tcp):)?([^:]+)(?::(\d+))?$} || next;
my $host = $2;
my $proto = $1 ? [ $1 ] : \@proto;
my $port = $3 ? $3 : $default_port;
push @resp, map { [ -1,$_,$host,$port ] } @$proto;
}
}
# should we use a fixed transport?
if ( my $proto = $param->{transport} ) {
@resp = grep { lc($_->[1]) eq lc($proto) } @resp;
}
my @param = ( $dst_addr,$legs,$allowed_legs,$default_port,$callback );
return __resolve_uri_final( @param,0,\@resp ) if @resp;
# If no fixed mapping DNS needs to be used
# XXXX no full support for RFC3263, eg we don't support NAPTR
# but query instead directly for _sip._udp.domain.. like in
# RFC2543 specified
return $self->dns_domain2srv(
$domain, \@proto, $sip_proto,
[ \&__resolve_uri_final, @param ]
);
}
sub __resolve_uri_final {
my ($dst_addr,$legs,$allowed_legs,$default_port,$callback,$error,$resp) = @_;
DEBUG_DUMP( 100,$resp );
return invoke_callback( $callback,EHOSTUNREACH )
unless $resp && @$resp;
# for A records we got no port, use default_port
$_->[3] ||= $default_port for(@$resp);
# sort by prio
# FIXME: can contradict order in @proto
@$resp = sort { $a->[0] <=> $b->[0] } @$resp;
@$dst_addr = ();
@$legs = ();
foreach my $r ( @$resp ) {
my $leg = first { $_->can_deliver_to(
proto => $r->[1],
addr => $r->[2],
port => $r->[3]
)} @$allowed_legs;
if ( $leg ) {
push @$dst_addr, "$r->[1]:$r->[2]:$r->[3]";
push @$legs,$leg;
} else {
DEBUG( 50,"no leg for $r->[1]:$r->[2]:$r->[3]" );
}
}
return invoke_callback( $callback, EHOSTUNREACH ) if !@$dst_addr;
invoke_callback( $callback );
}
sub _find_leg4addr {
my Net::SIP::Dispatcher $self = shift;
my $dst_addr = shift;
my ($proto,$ip) = $dst_addr =~m{^(?:(tcp|udp):)?([^:]+)};
my @legs;
foreach my $leg (@{ $self->{legs} }) {
push @legs,$leg if $leg->can_deliver_to( addr => $ip, proto => $proto );
}
return @legs;
}
###########################################################################
# resolve hostname to IP using DNS
# FIXME: should work asynchronously
# Args: ($self,$host,$callback)
# $host: hostname or hash with hostname as keys
# $callback: gets called with (EINVAL) or (undef,result) once finished
# result is IP for single hosts or the input hash ref where the
# IPs are filled in as values
# Returns: NONE
###########################################################################
sub dns_host2ip {
my Net::SIP::Dispatcher $self = shift;
my ($host,$callback) = @_;
if ( ref($host)) {
my $err;
foreach ( keys %$host ) {
if ( my $addr = gethostbyname( $_ )) {
$host->{$_} = inet_ntoa($addr);
} else {
$err = EINVAL;
}
}
invoke_callback( $callback, $err,$host );
} else {
my $addr = gethostbyname( $host );
invoke_callback( $callback, $addr ? ( undef,inet_ntoa($addr) ) : ( $? ));
}
}
###########################################################################
# get SRV records using DNS
# FIXME: should work asynchronously
# Args: ($self,$domain,$proto,$sip_proto,$callback)
# $domain: domain for SRV query
# $proto: which protocols to check
# $sip_proto: sip|sips
# $callback: gets called with result once finished
# result is \@list of [ prio,proto,name,port ]
# Returns: NONE
###########################################################################
sub dns_domain2srv {
my Net::SIP::Dispatcher $self = shift;
my ($domain,$protos,$sip_proto,$callback) = @_;
# FIXME: don't do blocking DNS queries
my $dns = Net::DNS::Resolver->new;
# Try to get SRV records for _sip._udp.domain or _sip._tcp.domain
my (@resp,%addr2ip);
foreach my $proto ( @$protos ) {
if ( my $q = $dns->query( '_'.$sip_proto.'._'.$proto.'.'.$domain,'SRV' )) {
foreach my $rr ( $q->answer ) {
if ( $rr->type eq 'A' ) {
push @{ $addr2ip{$rr->name} }, $rr->address;
} elsif ( $rr->type eq 'SRV' ) {
push @resp,[ $rr->priority, $proto,$rr->target,$rr->port ]
}
}
}
}
# name to addr based on additional records in DNS answer
my @resp_resolved;
for my $r (@resp) {
if ( my $addr = $addr2ip{ $r->[2] } ) {
for (@$addr) {
my @cp = @$r;
$cp[2] = $_;
push @resp_resolved, \@cp;
}
} else {
# either already IP or no additional data for resolving -> later
my @cp = @$r;
# XXX fixme blocking DNS lookup
my $ipn = gethostbyname( $r->[2] ) or do {
DEBUG( 1,"cannot resolve $r->[2]" );
next;
};
$cp[2] = inet_ntoa($ipn);
push @resp_resolved, \@cp;
}
}
@resp = @resp_resolved;
# if no SRV records try to resolve address directly
unless (@resp) {
# try addr directly
my $default_port = $sip_proto eq 'sips' ? 5061:5060;
if ( my $q = $dns->query( $domain,'A' )) {
foreach my $rr ($q->answer ) {
$rr->type eq 'A' || next;
# XXX fixme, check that name in response corresponds to query
# (beware of CNAMEs!)
push @resp,map {
[ -1, $_ , $rr->address,$default_port ]
} @$protos;
}
}
}
my $error = @resp ? 0 : EINVAL;
invoke_callback( $callback,$error,\@resp );
}
###########################################################################
# Net::SIP::Dispatcher::Packet
# Container for Queue entries in Net::SIP::Dispatchers queue
###########################################################################
package Net::SIP::Dispatcher::Packet;
use fields (
'id', # transaction id, used for canceling delivery if response came in
'callid', # callid, used for canceling all deliveries for this call
'packet', # the packet which nees to be delivered
'dst_addr', # to which adress the packet gets delivered, is array-ref because
# the DNS/SRV lookup might return multiple addresses and protocols
'leg', # through which leg the packet gets delivered, same number
# of items like dst_addr
'retransmits', # array of retransmit time stamps, if undef no retransmit will be
# done, if [] no more retransmits can be done (trigger ETIMEDOUT)
# the last element in this array will not used for retransmit, but
# is the timestamp, when the delivery fails permanently
'callback', # callback for DSN (success, ETIMEDOUT...)
'proto', # list of possible protocols, default tcp and udp for sip:
);
use Net::SIP::Debug;
use Net::SIP::Util 'invoke_callback';
###########################################################################
# create new Dispatcher::Packet
# Args: ($class,%args)
# %args: hash with values according to fields
# for response packets leg and dst_addr must be set
# Returns: $self
###########################################################################
sub new {
my ($class,%args) = @_;
my $now = delete $args{now};
my $self = fields::new( $class );
%$self = %args;
$self->{id} ||= $self->{packet}->tid;
$self->{callid} ||= $self->{packet}->callid;
if ( my $addr = $self->{dst_addr} ) {
$self->{dst_addr} = [ $addr ] if !ref($addr)
}
if ( my $leg = $self->{leg} ) {
$self->{leg} = [ $leg ] if UNIVERSAL::can( $leg,'deliver' );
}
$self->{dst_addr} ||= [];
$self->{leg} ||= [];
# figure out retransmit times
my $p = $self->{packet} || die "no packet for delivery";
if ( $p->is_response ) {
unless ( $self->{leg} && $self->{dst_addr} ) {
die "Response packet needs leg and dst_addr"
}
}
return $self;
}
###########################################################################
# prepare retransmit infos if dispatcher handles retransmits itself
# Args: ($self;$now)
# $now: current time
# Returns: NONE
###########################################################################
sub prepare_retransmits {
my Net::SIP::Dispatcher::Packet $self = shift;
my $now = shift;
my $p = $self->{packet};
# RFC3261, 17.1.1.2 (final response to INVITE) -> T1=0.5, T2=4
# RFC3261, 17.1.2.2 (non-INVITE requests) -> T1=0.5, T2=4
# RFC3261, 17.1.1.2 (INVITE request) -> T1=0.5, T2=undef
# no retransmit -> T1=undef
my ($t1,$t2);
if ( $p->is_response ) {
if ( $p->code > 100 && $p->cseq =~m{\sINVITE$} ) {
# this is a final response to an INVITE
# this is the only type of response which gets retransmitted
# (until I get an ACK)
($t1,$t2) = (0.500,4);
}
} elsif ( $p->method eq 'INVITE' ) {
# INVITE request
($t1,$t2) = (0.500,undef);
} elsif ( $p->method eq 'ACK' ) {
# no retransmit of ACKs
} else {
# non-INVITE request
($t1,$t2) = (0.500,4);
}
# no retransmits?
$t1 || return;
$now ||= time();
my $expire = $now + 64*$t1;
my $to = $t1;
my $rtm = $now + $to;
my @retransmits;
while ( $rtm < $expire ) {
push @retransmits, $rtm;
$to *= 2;
$to = $t2 if $t2 && $to>$t2;
$rtm += $to
}
DEBUG( 100,"retransmits $now + ".join( " ", map { $_ - $now } @retransmits ));
$self->{retransmits} = \@retransmits;
}
###########################################################################
# use next dst_addr (eg if previous failed)
# Args: $self
# Returns: $addr
# $addr: new address it will use or undef if no more addresses available
###########################################################################
sub use_next_dstaddr {
my Net::SIP::Dispatcher::Packet $self = shift;
my $addr = $self->{dst_addr} || return;
shift(@$addr);
my $leg = $self->{leg} || return;
shift(@$leg);
return @$addr && $addr->[0];
}
###########################################################################
# trigger callback to upper layer
# Args: ($self;$errno)
# $errno: Errno
# Returns: $callback_done
# $callback_done: true if callback was triggered, if no callback existed
# returns false
###########################################################################
sub trigger_callback {
my Net::SIP::Dispatcher::Packet $self = shift;
my $error = shift;
my $cb = $self->{callback} || return;
invoke_callback( $cb,$error,$self);
return 1;
}
###########################################################################
# return transaction id of packet
# Args: $self
# Returns: $tid
###########################################################################
sub tid {
my Net::SIP::Dispatcher::Packet $self = shift;
return $self->{packet}->tid;
}
1;