package Net::Flowdock::Stream;
  # Package metadata, stored in fully-qualified package variables:
  # the CPAN authority (author id) and the distribution version.
  $Net::Flowdock::Stream::AUTHORITY = 'cpan:DOY';
  $Net::Flowdock::Stream::VERSION = '0.01';
use Moose;
# ABSTRACT: Streaming API for Flowdock

use JSON;
use MIME::Base64;
use Net::HTTPS::NB;

# API token used for authentication. BUILD uses it in preference to
# email/password when it is set.
has token => (
    is  => 'ro',
    isa => 'Str',
);

# Account email address; combined with `password` by BUILD when no token
# is supplied.
has email => (
    is  => 'ro',
    isa => 'Str',
);

# Account password; combined with `email` by BUILD when no token is
# supplied.
has password => (
    is  => 'ro',
    isa => 'Str',
);

# Names of the flows to listen to (required).
#
# No `is` option is declared, so Moose generates no plain accessor; the
# public `flows` method is the native Array delegation below and returns
# the names as a list rather than as an array reference.
has flows => (
    traits   => ['Array'],
    isa      => 'ArrayRef[Str]',
    required => 1,
    handles  => {
        flows => 'elements',
    },
);

# Timeout, in (fractional) seconds, passed to select() when polling the
# socket for readability. Defaults to 0.01.
has socket_timeout => (
    is      => 'ro',
    isa     => 'Num',
    default => 0.01,
);

# When true, every raw event is emitted with warn() before it is decoded.
# Read-write so it can be toggled on a live object.
has debug => (
    is      => 'rw',
    isa     => 'Bool',
    default => 0,
);

# Private: the open connection to the streaming server, used by the read
# and poll helpers.
has _socket => (
    is  => 'rw',
    isa => 'Net::HTTPS::NB',
);

# Private: bytes received from the server that have not yet been split
# into events. `_append_readbuf` appends to the buffer in place via the
# native String trait.
has _readbuf => (
    traits  => ['String'],
    is      => 'rw',
    isa     => 'Str',
    default => '',
    handles => {
        _append_readbuf => 'append',
    },
);

# Private: list of event hashrefs. Each object gets its own fresh array
# because the default is a coderef.
# NOTE(review): nothing in the code visible here reads or writes this
# attribute - confirm whether it is still needed.
has _events => (
    is      => 'rw',
    isa     => 'ArrayRef[HashRef]', # XXX make these into objects
    default => sub { [] },
);

# Moose construction hook: open the streaming connection.
#
# Builds the HTTP Basic credentials (the token if set, otherwise
# "email:password"), connects to the Flowdock streaming host, requests the
# configured flows and stores the socket in `_socket` for later reads.
#
# Dies when no usable credentials were supplied, when the connection
# cannot be established, or when the server does not answer with 200.
sub BUILD {
    my $self = shift;

    my ($token, $email, $pass)
        = ($self->token, $self->email, $self->password);

    # Test each credential explicitly: a list assignment used as a
    # condition is always true here, which made the error branch
    # unreachable.
    my $auth;
    if (defined $token && length $token) {
        $auth = $token;
    }
    elsif (defined $email && defined $pass) {
        $auth = "$email:$pass";
    }
    else {
        die "You must supply either your token or your email and password";
    }

    # The constructor returns undef and sets $@ when it cannot connect.
    my $s = Net::HTTPS::NB->new(Host => 'stream.flowdock.com')
        or die "Unable to connect: $@";

    my $flows = join(',', $self->flows);
    $s->write_request(
        GET => "/flows?filter=$flows",
        # Empty line terminator: the default would put "\n" inside the
        # header value (and wrap long credentials across lines).
        Authorization => 'Basic ' . MIME::Base64::encode($auth, ''),
        Accept        => 'application/json',
    );

    my ($code, $message) = $s->read_response_headers;
    die "Unable to connect: $message"
        unless $code == 200;

    $self->_socket($s);
}


# Return the next event received on the stream as a decoded data
# structure, or nothing when no complete event is available yet.
#
# The call does not wait for data beyond the `socket_timeout` used to
# poll the socket. It dies if the underlying read fails or the server
# disconnects.
sub get_next_event {
    my $self = shift;

    # One read can deliver several events; hand out what is already
    # buffered before asking the socket for more.
    my $buffered = $self->_process_readbuf;
    return $buffered if defined $buffered;

    return unless $self->_socket_is_readable;

    $self->_read_next_chunk;

    return $self->_process_readbuf;
}

# Report whether the socket has data that can be read right now.
#
# Returns 1 when data is available and nothing otherwise, including when
# select() is interrupted (EINTR/EAGAIN). Waits at most `socket_timeout`
# seconds. Dies on any other select() failure or if the socket has no
# file descriptor.
sub _socket_is_readable {
    my $self = shift;

    my $socket = $self->_socket;

    # An SSL layer may hold bytes it has already read from the wire;
    # select() on the descriptor cannot see those.
    return 1 if $socket->can('pending') && $socket->pending;

    my $fd = fileno($socket);
    die "Error reading from socket: socket is not open"
        unless defined $fd;

    my $rin = '';
    vec($rin, $fd, 1) = 1;

    my $res = select(my $rout = $rin, undef, undef, $self->socket_timeout);

    if ($res == -1) {
        return if $!{EAGAIN} || $!{EINTR};
        die "Error reading from socket: $!";
    }

    return if $res == 0;

    # Test the descriptor's own bit rather than the truth of the whole
    # bit-vector string.
    return 1 if vec($rout, $fd, 1);

    return;
}


# Read up to 4096 bytes of response body from the socket and append them
# to the read buffer.
#
# Returns nothing. A read interrupted by EINTR/EAGAIN, or one that
# yielded no body data this time (-1), leaves the buffer untouched. Dies
# on any other read error and when the server closes the connection.
sub _read_next_chunk {
    my $self = shift;

    my $nbytes = $self->_socket->read_entity_body(my $buf, 4096);
    if (!defined $nbytes) {
        return if $!{EINTR} || $!{EAGAIN};
        die "Error reading from server: $!";
    }
    die "Disconnected" if $nbytes == 0;
    return if $nbytes == -1;

    # Keep the data: events are split out of the buffer later.
    $self->_append_readbuf($buf);

    return;
}


# Extract the next complete event from the read buffer.
#
# Records in the buffer are terminated by a carriage return (\x0d). The
# first complete record is removed from the buffer, decoded from JSON and
# returned. Returns nothing when the buffer holds no complete record.
# Dies (from decode_json) if a record is not valid JSON; that record has
# already been removed from the buffer by then.
sub _process_readbuf {
    my $self = shift;

    my $buf = $self->_readbuf;
    while ($buf =~ s/\A([^\x0d]*)\x0d//) {
        my $chunk = $1;

        # Store the shortened buffer, otherwise the same record would be
        # returned again on every call.
        $self->_readbuf($buf);

        # A whitespace-only record carries no JSON; skip it rather than
        # let decode_json die on it.
        next unless $chunk =~ /\S/;

        warn "New event:\n$chunk" if $self->debug;
        return decode_json($chunk);
    }

    return;
}


# Finalize the class, then remove the Moose sugar (has, extends, ...)
# from the package namespace.
__PACKAGE__->meta->make_immutable;
no Moose;

# A module file must end with a true value for use/require to succeed.
1;

__END__


=head1 NAME

Net::Flowdock::Stream - Streaming API for Flowdock

=head1 VERSION

version 0.01


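=head1 SYNOPSIS

  my $stream = Net::Flowdock::Stream->new(
      token => '...',
      flows => ['myorg/testing'],
  );

  while (1) {
      if (my $event = $stream->get_next_event) {
          # $event is the data structure decoded from the server's JSON
          process($event);
      }
  }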

=head1 DESCRIPTION

This module implements the streaming API for
L<Flowdock|https://www.flowdock.com/>. It provides a non-blocking method which
you can call to get the next available event in the stream. You can then
integrate this method into your existing event-driven app.


=head1 ATTRIBUTES

=head2 token

Your account's API token, for authentication. Required unless C<email> and
C<password> are provided.

=head2 email

Your account's email address, for authentication. Required unless C<token> is
provided.

=head2 password

Your account's password, for authentication. Required unless C<token> is
provided.

=head2 flows

An arrayref of flows that should be listened to for events. Note that the flow
names must include the organization, so C<myorg/testing>, not just C<testing>.

=head1 METHODS

=head2 get_next_event

Returns the next event that has been received in the stream. This call is
nonblocking, and will return undef if no events are currently available.

=head1 AUTHOR

Jesse Luehrs <doy at tozt dot net>


