The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id: /mirror/plagger/trunk/plagger/lib/Plagger/Plugin/Aggregator/Xango.pm 2173 2006-05-05T07:51:03.432950Z miyagawa  $
#
# Copyright (c) 2006 Daisuke Maki <dmaki@cpan.org>
# All rights reserved.

package Plagger::Plugin::Aggregator::Xango;
use strict;
use base qw( Plagger::Plugin::Aggregator::Simple );
use POE;
use Xango::Broker::Push;
# BEGIN { sub Xango::DEBUG { 1 } } # uncomment to get Xango debug messages

our $VERSION = '0.1';

sub register {
    my($self, $context) = @_;

    my %xango_args = (
        Alias => 'xgbroker',
        HandlerAlias => 'xghandler',
        HttpCompArgs => [ Agent => "Plagger/$Plagger::VERSION (http://plagger.org/)", Timeout => $self->conf->{timeout} || 10 ],
        %{$self->conf->{xango_args} || {}},
    );
    $self->{xango_alias} = $xango_args{Alias};
    Plagger::Plugin::Aggregator::Xango::Crawler->spawn(
        Plugin => $self,
        UseCache => exists $self->conf->{use_cache} ?
            $self->conf->{use_cache} : 1,
        BrokerAlias => $xango_args{Alias},
        MaxRedirect => $self->conf->{max_redirect} || 3,
    );
    Xango::Broker::Push->spawn(%xango_args);
    $context->register_hook(
        $self,
        'customfeed.handle'   => \&aggregate,
        'aggregator.finalize' => \&finalize,
    );
}

sub aggregate {
    my($self, $context, $args) = @_;

    my $url = $args->{feed}->url;
    return unless $url =~ m!^https?://!i;

    $self->{_url2feed}->{$url} = $args->{feed}; # map from url to feed object

    $context->log(info => "Fetch $url");
    POE::Kernel->post($self->{xango_alias}, 'enqueue_job', Xango::Job->new(uri => URI->new($url), redirect => 0));
}

sub handle_feed {
    my($self, $url, $xml_ref) = @_;
    $self->SUPER::handle_feed($url, $xml_ref, $self->{_url2feed}->{$url});
}

sub finalize {
    my($self, $context, $args) = @_;
    POE::Kernel->run;
}

package Plagger::Plugin::Aggregator::Xango::Crawler;
use strict;
use Feed::Find;
use POE;
use Storable qw(freeze thaw);
use XML::Feed;

sub apply_policy { 1 }
sub spawn  {
    my $class = shift;
    my %args  = @_;

    POE::Session->create(
        heap => {
            PLUGIN => $args{Plugin}, USE_CACHE => $args{UseCache},
            BROKER_ALIAS => $args{BrokerAlias},
            MaxRedirect => $args{MaxRedirect},
        },
        package_states => [
            $class => [ qw(_start _stop apply_policy prep_request handle_response) ]
        ]
    );
}

sub _start { $_[KERNEL]->alias_set('xghandler') }
sub _stop  { }
sub prep_request {
    return unless $_[HEAP]->{USE_CACHE};

    my $job = $_[ARG0];
    my $req = $_[ARG1];
    my $plugin = $_[HEAP]->{PLUGIN};

    my $ref = $plugin->cache->get($job->uri);
    if ($ref) {
        $req->if_modified_since($ref->{LastModified})
            if $ref->{LastModified};
        $req->header('If-None-Match', $ref->{ETag})
            if $ref->{ETag};
    }
}

sub handle_response {
    my $job = $_[ARG0];
    my $plugin = $_[HEAP]->{PLUGIN};

    my $redirect = $job->notes('redirect') + 1;
    return if $redirect > $_[HEAP]->{MaxRedirect};

    my $r = $job->notes('http_response');
    my $url    = $job->uri;
    if ($r->code =~ /^30[12]$/) {
        $url = $r->header('location');
        return unless $url =~ m!^https?://!i;
        $_[KERNEL]->post($_[HEAP]->{BROKER_ALIAS}, 'enqueue_job', Xango::Job->new(uri => URI->new($url), redirect => $redirect));
	return;
    } else {
        return unless $r->is_success;

        my $ct = $r->content_type;
        if ( $Feed::Find::IsFeed{$ct} ) {
            $plugin->handle_feed($url, $r->content_ref);
        } else {
            my @feeds = Feed::Find->find_in_html($r->content_ref, $url);
            if (@feeds) {
                my $feed_url = $feeds[0];
                return unless $feed_url =~ m!^https?://!i;

                # OMG we should alias Feed so it can be looked up with $feed_url, too
                $plugin->{_url2feed}->{$feed_url} = $plugin->{_url2feed}->{$url};

                $_[KERNEL]->post($_[HEAP]->{BROKER_ALIAS}, 'enqueue_job', Xango::Job->new(uri => URI->new($feed_url), redirect => $redirect));
            }
            return;
        }
    }

    if ($_[HEAP]->{USE_CACHE}) {
        $plugin->cache->set(
            $job->uri,
            {ETag => $r->header('ETag'),
                LastModified => $r->header('Last-Modified')}
        );
    }
}

1;