The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Starman::Server::ModCluster;

use 5.008_001;
use strict;
use warnings;
use base 'Starman::Server';
use Net::MCMP;
use IO::Socket::Multicast;
use IPC::Shareable;

sub pre_loop_hook {
	my $self = shift;

	# register for a new mcmp

	my @mcmp_uri;
	tie @mcmp_uri, 'IPC::Shareable', 'mcmp_uris',
	  {
		create    => 1,
		exclusive => 0,
		mode      => 0644,
		destroy   => 0,
	  };

	@mcmp_uri = ();

	if ( exists $self->{options}->{mc_uri}
		&& defined $self->{options}->{mc_uri} )
	{
		foreach my $uri ( split ',', $self->{options}->{mc_uri} ) {
			push @mcmp_uri, $uri;
			my $mcmp = $self->mcmp($uri);

			$self->mcmp_config($mcmp);
			$self->mcmp_enable_app($mcmp);
			$self->mcmp_status($mcmp);

		}
	}

	if ( $self->{options}->{mc_multicast_enable} ) {
		$self->start_mc_multicast();
	}

	if ( @mcmp_uri || $self->{options}->{mc_multicast_enable} ) {
		$self->start_mc_status;
	}
	else {
		$self->fatal("multicast disabled and no mcmp uris defined");
	}

	$self->SUPER::pre_loop_hook(@_);
}

sub pre_server_close_hook {
	my $self = shift;

	my @mcmp_uri;

	tie @mcmp_uri, 'IPC::Shareable', 'mcmp_uris',
	  {
		create    => 0,
		exclusive => 0,
		mode      => 0644,
		destroy   => 1,
	  };

	foreach my $uri (@mcmp_uri) {
		my $mcmp = $self->mcmp($uri);
		$self->mcmp_remove_app($mcmp);
		$self->mcmp_remove_route($mcmp);

	}

	$self->SUPER::pre_server_close_hook(@_);
}

sub start_mc_multicast {
	my $self = shift;
	local $!;
	my $pid = fork;
	if ( !defined $pid ) {
		$self->fatal("Unable to fork mod_cluster multicast child [$!]");
	}

	if ( $pid == 0 ) {

		my @mcmp_uri;

		tie @mcmp_uri, 'IPC::Shareable', 'mcmp_uris',
		  {
			create    => 0,
			exclusive => 0,
			mode      => 0644,
			destroy   => 0,
		  };

		$SIG{'INT'} = $SIG{'TERM'} = $SIG{'QUIT'} = sub {
			$self->log( 4, "exiting mod_cluster multicast handler" );
			exit;
		};
		$SIG{'PIPE'} = 'IGNORE';
		$SIG{'CHLD'} = 'DEFAULT';
		$SIG{'HUP'}  = 'DEFAULT';

		$self->log( 4, "mod_cluster multicast handler forked ($$)" );
		$0 = "Starman::ModCluster multicast handler";

		$self->{options}->{mc_multicast_port}    ||= 23364;
		$self->{options}->{mc_multicast_address} ||= '224.0.1.105';

		my $s = IO::Socket::Multicast->new(
			LocalPort => $self->{options}->{mc_multicast_port},
			LocalAddr => $self->{options}->{mc_multicast_address},
			Proto     => 'udp',
			ReuseAddr => 1
		) || die "Couldnt create mcast object: $!\n";
		$s->mcast_add( $self->{options}->{mc_multicast_address} )
		  || die "Couldn't set group: $!\n";

		while (1) {
			my $data;
			next unless $s->recv( $data, 1024 );

			my ($addr)  = $data =~ /X-Manager-Address: (.*?)\s/;
			my ($proto) = $data =~ /X-Manager-Protocol: (.*?)\s/;

			if ( defined $addr && defined $proto ) {
				my $mcast_uri = "$proto://$addr";

				# dont want to add same ips over and over
				my $duplicate = 0;
				foreach my $uri (@mcmp_uri) {
					if ( $uri eq $mcast_uri ) {
						$duplicate = 1;
						last;
					}
				}
				unless ($duplicate) {
					my $mcmp = $self->mcmp($mcast_uri);
					$self->mcmp_config($mcmp);
					$self->mcmp_enable_app($mcmp);
					$self->mcmp_status($mcmp);
					push @mcmp_uri, $mcast_uri;
				}
			}

		}

		$self->log( 4, "exiting mod_cluster multicast handler" );
		exit;
	}
}

sub start_mc_status {
	my $self = shift;

	local $!;
	my $pid = fork;
	if ( !defined $pid ) {
		$self->fatal("Unable to fork mod_cluster status child [$!]");
	}

	if ( $pid == 0 ) {
		$SIG{'INT'} = $SIG{'TERM'} = $SIG{'QUIT'} = sub {
			$self->log( 4, "exiting mod_cluster status reporter" );

			# just exit, no need to have stop hook
			exit;
		};
		$SIG{'PIPE'} = 'IGNORE';
		$SIG{'CHLD'} = 'DEFAULT';
		$SIG{'HUP'}  = 'DEFAULT';

		$self->log( 4, "mod_cluster status reporter forked ($$)" );
		$0 = "Starman::ModCluster status reporter";

		my @mcmp_uri;

		tie @mcmp_uri, 'IPC::Shareable', 'mcmp_uris',
		  {
			create    => 0,
			exclusive => 0,
			mode      => 0644,
			destroy   => 0,
		  };

		while (1) {
			foreach my $uri (@mcmp_uri) {
				my $mcmp     = $self->mcmp($uri);
				my $response = $self->mcmp_status($mcmp);
			}
			sleep( $self->{options}->{mc_status_interval} || 30 );
		}

		$self->log( 4, "exiting mod_cluster status reporter" );
		exit;
	}
}

sub mcmp {
	my ( $self, $uri ) = @_;

	unless ($uri) {
		$self->fatal("uri is required at mcmp helper");
	}

	my $mcmp = Net::MCMP->new(
		{ uri => $uri, debug => $self->{options}->{mc_debug} || 0 } );
	return $mcmp;

}

sub mcmp_config {
	my ( $self, $mcmp ) = @_;

	unless ( exists $self->{options}->{mc_host} ) {
		if ( defined $self->{options}->{host} ) {
			$self->{options}->{mc_host} = $self->{options}->{mc_host};
		}
		else {
			$self->fatal('missing mc_host option');
		}
	}

	unless ( exists $self->{options}->{mc_port} ) {
		if ( defined $self->{options}->{port} ) {
			$self->{options}->{mc_port} = $self->{options}->{port};
		}
		elsif ( exists $ENV{SERVER_PORT} && defined $ENV{SERVER_PORT} ) {
			$self->{options}->{mc_port} = $ENV{SERVER_PORT};
		}
		else {
			$self->fatal('missing mc_port option');
		}
	}

	unless ( exists $self->{options}->{mc_type}
		&& defined $self->{options}->{mc_type} )
	{
		$self->{options}->{mc_type} = $ENV{'psgi.url_scheme'} || 'http';
	}

	unless ( exists $self->{options}->{mc_alias}
		&& defined $self->{options}->{mc_alias} )
	{
		$self->{options}->{mc_alias} = 'StarmanServer';
	}

	my @uris;

	if ( ref $self->{options}->{mc_uri} eq 'ARRAY' ) {
		$self->{options}->{mc_uri} = join ',', @{ $self->{options}->{mc_uri} };
	}

	foreach my $key (qw/mc_context mc_alias/) {
		if ( exists $self->{options}->{$key}
			&& ref $self->{options}->{$key} eq 'ARRAY' )
		{
			$self->{options}->{$key} = join ',', @{ $self->{options}->{$key} };
		}
	}

	return $mcmp->config(
		{
			Balancer            => $self->{options}->{mc_balancer},
			WaitWorker          => $self->{options}->{mc_wait_worker},
			MaxAttempts         => $self->{options}->{mc_max_attempts},
			JvmRoute            => $self->{options}->{mc_node_name},
			Domain              => $self->{options}->{mc_domain},
			Host                => $self->{options}->{mc_host},
			Port                => $self->{options}->{mc_port},
			Type                => $self->{options}->{mc_type},
			FlushPackets        => $self->{options}->{mc_flush_packets},
			FlushWait           => $self->{options}->{mc_flush_wait},
			Ping                => $self->{options}->{mc_ping},
			Smax                => $self->{options}->{mc_smax},
			Ttl                 => $self->{options}->{mc_ttl},
			Timeout             => $self->{options}->{mc_timeoutt},
			Context             => $self->{options}->{mc_context},
			Alias               => $self->{options}->{mc_alias},
			StickySession       => $self->{options}->{mc_sticky_session},
			StickySessionCookie => $self->{options}->{mc_sticky_session_cookie},
			StickySessionPath   => $self->{options}->{mc_sticky_session_path},
			StickySessionRemove => $self->{options}->{mc_sticky_session_remove},
			WaitWorker          => $self->{options}->{mc_wait_worker},
			MaxAttempts         => $self->{options}->{mc_max_attempts},
			FlushPackets        => $self->{options}->{mc_flush_packets},
			FlushWait           => $self->{options}->{mc_flushwait},
			Ping                => $self->{options}->{mc_ping},
			Smax                => $self->{options}->{mc_smax},
			Ttl                 => $self->{options}->{mc_ttl},
			Timeout             => $self->{options}->{mc_timeout},
		}
	);
}

sub mcmp_enable_app {
	my ( $self, $mcmp ) = @_;

	return $mcmp->enable_app(
		{
			JvmRoute => $self->{options}->{mc_node_name},
			Alias    => $self->{options}->{mc_alias},
			Context  => $self->{options}->{mc_context}
		}
	  ),
	  ;
}

sub mcmp_status {
	my ( $self, $mcmp ) = @_;

	$self->log( 4, "sending mcmp status to " . $mcmp->uri );

	my $response = $mcmp->status(
		{
			JvmRoute => $self->{options}->{mc_node_name},
			Load     => 99,
		}
	);

	if ( exists $response->{State} && $response->{State} ne 'OK' ) {
		$self->log( 1, "STATUS response is not OK: " . $response->{Status} );
	}
	else {
  # try to register again, mod_cluster was probably restarted and lost this node
		$self->mcmp_config($mcmp);
		$self->mcmp_enable_app($mcmp);
	}

	return $response;

}

sub mcmp_remove_app {
	my ( $self, $mcmp ) = @_;

	return $mcmp->remove_app(
		{
			JvmRoute => $self->{options}->{mc_node_name},
			Alias    => $self->{options}->{mc_alias},
			Context  => $self->{options}->{mc_context}
		}
	);
}

sub mcmp_remove_route {
	my ( $self, $mcmp ) = @_;
	return $mcmp->remove_route(
		{
			JvmRoute => $self->{options}->{mc_node_name},
		}
	);
}

1;
__END__
=encoding utf-8

=for stopwords

=head1 NAME

Starman::Server::ModCluster - extension to Starman::Server that registers pre_server_close_hook and 
pre_loop_hook of Net::Server to register and remove node from a cluster.


=head1 DESCRIPTION

This module is not intended to use directly. It should be used via L<starman-modcluster> command


=head1 SEE ALSO

L<starman-modcluster> L<Starman>

=head1 AUTHOR

Roman Jurkov, E<lt>winfinit@cpan.org<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2014- by Roman Jurkov

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.18.1 or,
at your option, any later version of Perl 5 you may have available.


=cut