=head1 Introduction to Coro

This tutorial will introduce you to the main features of the Coro module
family.

It first introduces some basic concepts, and later gives a short overview
of the module family.


=head1 What is Coro?

Coro started as a simple module that implemented a specific form of
first class continuations called Coroutines. These basically allow you
to capture the current point of execution and jump to another point, while
allowing you to return at any time, as a kind of non-local jump, not unlike
C's C<setjmp>/C<longjmp>. This is nowadays known as a L<Coro::State>.

One natural application for these is to include a scheduler, resulting in
cooperative threads, which is the main use case for Coro today. Still,
much of the documentation and custom refers to these threads as
"coroutines" or often just "coros".

A thread is very much like a stripped-down perl interpreter, or a
process: Unlike a full interpreter process, a thread doesn't have its own
variable or code namespaces - everything is shared. That means that when
one thread modifies a variable (or any value, e.g. through a reference),
then other threads immediately see this change when they look at the same
variable or location.

Cooperative means that these threads must cooperate with each other, when
it comes to CPU usage - only one thread ever has the CPU, and if another
thread wants the CPU, the running thread has to give it up. The latter
happens either explicitly, by calling a function to do so, or implicitly, when
waiting on a resource (such as a Semaphore, or the completion of some I/O
request). This threading model is popular in scripting languages (such as
Python or Ruby), and this implementation is typically far more efficient
than threads implemented in other languages.

Perl itself uses rather confusing terminology - what perl calls a "thread"
(or "ithread") is actually called a "process" everywhere else: The
so-called "perl threads" are actually artifacts of the unix process
emulation code used on Windows, which is consequently why they are
actually processes and not threads. The biggest difference is that neither
variables nor code are shared between processes or ithreads.


=head1 Cooperative Threads

Cooperative threads are what the Coro module gives you. Obviously, you have
to C<use> it first:

   use Coro;

To create a thread, you can use the C<async> function that automatically
gets exported from that module:

   async {
      print "hello\n";
   };

Async expects a code block as first argument (in indirect object
notation). You can actually pass it extra arguments, and these will end up
in C<@_> when executing the codeblock, but since it is a closure, you can
also just refer to any lexical variables that are currently visible.

The above lines create a thread, but if you save them in a file and
execute it as a perl program, you will not get any output.

The reason is that, although you created a thread, and the thread is
ready to execute (because C<async> puts it into the so-called I<ready
queue>), it never gets any CPU time to actually execute, as the main
program - which also is a thread almost like any other - never gives up
the CPU but instead exits the whole program, by running off the end of
the file. Since Coro threads are cooperative, the main thread has to
cooperate, and give up the CPU.

To explicitly give up the CPU, use the C<cede> function (which is often
called C<yield> in other thread implementations):

   use Coro;

   async {
      print "hello\n";
   };

   cede;

Running the above prints C<hello> and exits.
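
As mentioned earlier, C<async> also accepts extra arguments after the
block. Here is a minimal sketch (the variable names and the greeting are
made up for illustration) that uses both C<@_> and a lexical variable
captured by the closure:

   use Coro;

   my $greeting = "hello";        # lexical, visible inside the closure

   async {
      my ($name) = @_;            # extra arguments to async end up in @_
      print "$greeting, $name\n";
   } "world";

   cede;                          # give the new thread a chance to run

This should print C<hello, world>.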

Now, this is not very interesting, so let's try a slightly more
interesting program:

   use Coro;

   async {
      print "async 1\n";
      cede;
      print "async 2\n";
   };

   print "main 1\n";
   cede;
   print "main 2\n";
   cede;

Running this program prints:

   main 1
   async 1
   main 2
   async 2

This nicely illustrates the non-local jump ability: the main program
prints the first line, and then yields the CPU to whatever other
threads there are. And there is one other, which runs and prints
"async 1", and itself yields the CPU. Since the only other thread
available is the main program, it continues running and so on.

Let's look at the example in more detail: C<async> first creates a new
thread. All new threads start in a suspended state. To make them run,
they need to be put into the ready queue, which is the second thing that
C<async> does. Each time a thread gives up the CPU, Coro runs a so-called
I<scheduler>. The scheduler selects the next thread from the ready queue,
removes it from the queue, and runs it.

C<cede> also does two things: first it puts the running thread into the
ready queue, and then it jumps into the scheduler. This has the effect of
giving up the CPU, but also ensures that, eventually, the thread gets run
again.

In fact, C<cede> could be implemented like this:

   sub my_cede {
      $Coro::current->ready;
      schedule;
   }

This works because C<$Coro::current> always contains the currently
running thread, and the scheduler itself can be called directly via
C<Coro::schedule>.

What is the effect of just calling C<schedule> without putting the current
thread into the ready queue first? Simple: the scheduler selects the
next ready thread and runs it. And the current thread, as it hasn't been
put into the ready queue, will go to sleep until something wakes it
up. If. Ever.

The following example remembers the current thread in a variable,
creates a thread and then puts the main program to sleep.

The newly created thread uses rand to wake up the main thread by
calling its C<ready> method - or not.

   use Coro;

   my $wakeme = $Coro::current;

   async {
      $wakeme->ready if 0.5 > rand;
   };

   schedule;

Now, when you run it, one of two things happens: Either the C<async> thread
wakes up the main thread again, in which case the program silently exits,
or it doesn't, in which case you get something like this:

   FATAL: deadlock detected.
         PID SC  RSS USES Description              Where
    31976480 -C  19k    0 [main::]                 [program:9]
    32223768 UC  12k    1                          [Coro.pm:691]
    32225088 -- 2068    1 [coro manager]           [Coro.pm:691]
    32225184 N-  216    0 [unblock_sub scheduler]  -

Why is that? Well, when the C<async> thread runs into the end of its
block, it will be terminated (via a call to C<Coro::terminate>) and the
scheduler is called again. Since the C<async> thread hasn't woken up the
main thread, and there aren't any other threads, there is nothing to wake
up, and the program cannot continue. Since there I<are> threads that
I<could> be running (main) but none are I<ready> to do so, Coro signals a
I<deadlock> - no progress is possible. Usually you also get a listing of
all threads, which might help you track down the problem.

However, there is an important case where progress I<is>, in fact,
possible, despite no threads being ready - namely in an event-based
program. In such a program, some threads could wait for I<external>
events, such as a timeout, or some data to arrive on a socket.

Since a deadlock in such a case would not be very useful, there is a
module named L<Coro::AnyEvent> that integrates threads into an event
loop. It configures Coro in a way that, instead of C<die>ing with an error
message, it instead runs the event loop in the hope of receiving an event
that will wake up some thread.
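
As a minimal sketch of this, assuming L<AnyEvent> and some event loop
backend are installed, the following program does not report a deadlock
even though, most of the time, no thread is ready. It relies on
C<Coro::AnyEvent::sleep>, which blocks only the calling thread until a
timer event arrives:

   use Coro;
   use Coro::AnyEvent;

   async {
      Coro::AnyEvent::sleep 1;    # only this thread sleeps
      print "async woke up\n";
   };

   Coro::AnyEvent::sleep 2;       # the main thread waits for a timer, too
   print "main woke up\n";

While both threads wait, the event loop runs, and each timer event puts
the corresponding thread back into the ready queue.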


=head2 Semaphores and other locks

Using only C<ready>, C<cede> and C<schedule> to synchronise threads is
difficult, especially if many threads are ready at the same time. Coro
supports a number of primitives to help synchronising threads in easier
ways. The first such primitive is L<Coro::Semaphore>, which implements
counting semaphores (binary semaphores are available as L<Coro::Signal>,
and there are L<Coro::SemaphoreSet> and L<Coro::RWLock> primitives as
well).

Counting semaphores, in a sense, store a count of resources. You can
remove/allocate/reserve a resource by calling the C<< ->down >> method,
which decrements the counter, and you can add or free a resource by
calling the C<< ->up >> method, which increments the counter. If the
counter is C<0>, then C<< ->down >> cannot decrement the semaphore - it is
locked - and the thread will wait until a count becomes available again.

Here is an example:

   use Coro;

   my $sem = new Coro::Semaphore 0; # a locked semaphore

   async {
      print "unlocking semaphore\n";
      $sem->up;
   };

   print "trying to lock semaphore\n";
   $sem->down;
   print "we got it!\n";

This program creates a I<locked> semaphore (a semaphore with count C<0>)
and tries to lock it (by trying to decrement its counter in the C<down>
method). Since the semaphore count is already exhausted, this will block
the main thread until the semaphore becomes available.

This yields the CPU to the only other ready thread in the process, the
one created with C<async>, which unlocks the semaphore (and instantly
terminates itself by returning).

Since the semaphore is now available, the main program locks it and
continues: "we got it!".

Counting semaphores are most often used to lock resources, or to exclude
other threads from accessing or using a resource. For example, consider
a very costly function (that temporarily allocates a lot of ram, for
example). You wouldn't want to have many threads calling this function at
the same time, so you use a semaphore:

   my $lock = new Coro::Semaphore; # unlocked initially - default is 1

   sub costly_function {
      $lock->down; # acquire semaphore

      # do costly operation that blocks

      $lock->up; # unlock it
   }

No matter how many threads call C<costly_function>, only one will run
the body of it, all others will wait in the C<down> call. If you want to
limit the number of concurrent executions to five, you could create the
semaphore with an initial count of C<5>.

Why does the comment mention an "operation that blocks"? Again, that's
because Coro's threads are cooperative: unless C<costly_function>
willingly gives up the CPU, other threads of control will simply not
run. This makes locking superfluous in cases where the function itself
never gives up the CPU, but when dealing with the outside world, this is
rare.
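
To illustrate limiting concurrency with an initial count greater than
one, here is a minimal sketch. The names C<$slots>, C<$running> and
C<$peak> are made up for this example, C<cede> merely stands in for a
real blocking operation, and the C<join> method (which waits for a thread
to finish) is explained later in this document:

   use Coro;
   use Coro::Semaphore;

   my $slots   = new Coro::Semaphore 2;   # two threads may hold it at once
   my $running = 0;                        # threads currently inside
   my $peak    = 0;                        # highest value of $running seen

   my @workers;
   for my $id (1 .. 5) {
      push @workers, async {
         $slots->down;
         $peak = $running if ++$running > $peak;
         cede;                             # stand-in for a blocking operation
         --$running;
         $slots->up;
      };
   }

   $_->join for @workers;                  # wait until all workers are done
   print "peak concurrency: $peak\n";

Although five threads are started, this should report a peak of C<2>, as
the remaining threads wait inside C<down>.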

Now consider what happens when the code C<die>s after executing C<down>,
but before C<up>. This will leave the semaphore in a locked state, which
often isn't what you want - imagine the caller expecting a failure and
wrapping the call into an C<eval {}>.

So normally you would want to free the lock again if execution somehow
leaves the function, whether "normally" or via an exception. Here the
C<guard> method proves useful:

   my $lock = new Coro::Semaphore; # unlocked initially

   sub costly_function {
      my $guard = $lock->guard; # acquire guard

      # do costly operation that blocks
   }

The C<guard> method C<down>s the semaphore and returns a so-called guard
object. Nothing happens as long as there are references to it (i.e. it is
in scope somehow), but when all references are gone, for example, when
C<costly_function> returns or throws an exception, it will automatically
call C<up> on the semaphore, no way to forget it. Even when the thread
gets C<cancel>ed by another thread will the guard object ensure that the
lock is freed.
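
A minimal sketch of this behaviour, using a hypothetical function
C<risky> that always throws, and the C<count> method of
L<Coro::Semaphore> to inspect the semaphore afterwards:

   use Coro;
   use Coro::Semaphore;

   my $lock = new Coro::Semaphore;   # unlocked initially, count is 1

   sub risky {
      my $guard = $lock->guard;      # count drops to 0
      die "something went wrong\n";  # $guard goes out of scope here
   }

   eval { risky () };
   print "count after failure: ", $lock->count, "\n";

This should print a count of C<1>, i.e. the semaphore is available again
despite the exception.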

This concludes this introduction to semaphores and locks. Apart from
L<Coro::Semaphore> and L<Coro::Signal>, there is also a reader-writer lock
(L<Coro::RWLock>) and a semaphore set (L<Coro::SemaphoreSet>). All of
these come with their own manpage.


=head2 Channels

Semaphores are fine, but usually you want to communicate by exchanging
data as well. Of course, you can just use some locks and an array of sorts
and use those to communicate, but there is a useful abstraction for
communication between threads: L<Coro::Channel>. Channels are the Coro
equivalent of a unix pipe (and very similar to AmigaOS message ports :) -
you can put stuff into it on one side, and read data from it on the other.

Here is a simple example that creates a thread and sends numbers to
it. The thread calculates the square of each number and puts that into
another channel, which the main thread reads the result from:

   use Coro;

   my $calculate = new Coro::Channel;
   my $result    = new Coro::Channel;

   async {
      # endless loop
      while () {
         my $num = $calculate->get; # read a number
         $num **= 2; # square it
         $result->put ($num); # put the result into the result queue
      }
   };

   for (1, 2, 5, 10, 77) {
      $calculate->put ($_);
      print "$_ ** 2 = ", $result->get, "\n";
   }

Gives:

   1 ** 2 = 1
   2 ** 2 = 4
   5 ** 2 = 25
   10 ** 2 = 100
   77 ** 2 = 5929

Both C<get> and C<put> methods can block the current thread: C<get> first
checks whether there I<is> some data available, and if not, it blocks the
current thread until some data arrives. C<put> can also block, as each
Channel has a "maximum item capacity", i.e. you cannot store more than a
specific number of items, which can be configured when the Channel gets
created.

In the above example, C<put> never blocks, as the default capacity
of a Channel is very high. So the for loop first puts data into the
channel, then tries to C<get> the result. Since the async thread hasn't
put anything in there yet (on the first iteration it hasn't even run
yet), the result Channel is still empty, so the main thread blocks.

Since the only other runnable/ready thread at this point is the squaring
thread, it will be woken up, will C<get> the number, square it and put it
into the result channel, waking up the main thread again. It will still
continue to run, as waking up other threads just puts them into the ready
queue, nothing less, nothing more.

Only when the async thread tries to C<get> the next number from the
calculate channel will it block (because nothing is there yet) and the
main thread will continue running. And so on.

This illustrates a general principle used by Coro: a thread will I<only
ever block> when it has to. Neither the Coro module itself nor any of its
submodules will ever give up the CPU unless they have to, because they
wait for some event to happen.

Be careful, however: when multiple threads put numbers into C<$calculate>
and read from C<$result>, they won't know which result is theirs. The
solution for this is to either use a semaphore, or send not just the
number, but also your own private result channel.
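
The second approach could be sketched as follows. This is only one
possible arrangement: each request is an array reference holding the
number and a private reply channel (the name C<$reply> and the request
layout are assumptions made for this example), and C<join>, explained
later, waits for the client threads to finish:

   use Coro;
   use Coro::Channel;

   my $calculate = new Coro::Channel;

   async {
      while () {
         # each request carries the number and where to send the result
         my ($num, $reply) = @{ $calculate->get };
         $reply->put ($num ** 2);
      }
   };

   my @clients;
   for my $num (3, 4) {
      push @clients, async {
         my $reply = new Coro::Channel;      # private to this client
         $calculate->put ([$num, $reply]);
         print "$num ** 2 = ", $reply->get, "\n";
      };
   }

   $_->join for @clients;

Since every client reads only from its own channel, results cannot be
mixed up, no matter how many clients there are.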


=head2 What is mine, what is ours?

What, exactly, constitutes a thread? Obviously it contains the current
point of execution. Not so obviously, it also has to include all
lexical variables, that means, every thread has its own set of lexical
variables.

To see why this is necessary, consider this program:

   use Coro;

   sub printit {
      my ($string) = @_;

      cede;

      print $string;
   }

   async { printit "Hello, " };
   async { printit "World!\n" };

   cede; cede; # do it

The above prints C<Hello, World!\n>. If C<printit> didn't have
its own per-thread C<$string> variable, it would probably print
C<World!\nWorld!\n>, which is rather unexpected, and would make it very
difficult to make good use of threads.

To make things run smoothly, there are quite a number of other things that
are per-thread:

=over 4

=item $_, @_, $@ and the regex result vars, $&, %+, $1, $2, ...

C<$_> is used much like a local variable, so it gets localised
per-thread. The same is true for regex results (C<$1>, C<$2> and so on).

C<@_> contains the arguments, so like lexicals, it also must be
per-thread.

C<$@> is not obviously required to be per-thread, but it is quite useful.

=item $/ and the default output file handle

Threads most often block when doing I/O. Since C<$/> is used when reading
lines, it would be very inconvenient if it were a shared variable, so it
is per-thread.

The default output handle (see C<select>) is a difficult case: sometimes
being global is preferable, sometimes per-thread is preferable. Since
per-thread seems to be more common, it is per-thread.

=item $SIG{__DIE__} and $SIG{__WARN__}

If these weren't per-thread, then common constructs such as:

   eval {
      local $SIG{__DIE__} = sub { ... };
      ...
   };

would not allow coroutine switching. Since exception-handling is
per-thread, those variables should be per-thread as well.

=item Lots of other esoteric stuff

For example, C<$^H> is per-thread. Most of the additional per-thread state
is not directly visible to Perl, but required to make the interpreter
work. You won't normally notice these.

=back

Everything else is shared between all threads. For example, the globals
C<$a> and C<$b> are shared. When does that matter? When using C<sort>,
these variables become special, and therefore, switching threads when
sorting might have surprising results.

Other examples are C<$!> (errno), C<$.> (the current input line number),
C<$,>, C<$\>, C<$"> and many other special variables.

While in some cases a good argument could be made for localising them to
the thread, they are rarely used, and sometimes hard to localise.

Future versions of Coro might include more per-thread state when it
becomes a problem.
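
The difference between shared and per-thread state can be sketched with
a small example (the names C<$total> and C<add_up> are made up for
illustration, and C<join>, which waits for a thread to finish, is
explained later). The package variable is shared by both threads, while
the lexicals C<$n> and C<$i> exist once per call:

   use Coro;

   our $total = 0;                 # package variable - shared

   sub add_up {
      my ($n) = @_;                # lexical - private to each call
      for my $i (1 .. $n) {
         $total += $i;
         cede;                     # let the other thread run in between
      }
   }

   my @threads;
   push @threads, async { add_up 2 };
   push @threads, async { add_up 3 };

   $_->join for @threads;
   print "total = $total\n";

Both threads add to the same C<$total>, so this should print
C<total = 9>, even though their loops are interleaved.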


=head2 Debugging

Sometimes it can be useful to find out what each thread is doing (or which
threads exist in the first place). The L<Coro::Debug> module has (among
other goodies), a function that allows you to print a "ps"-like listing -
you have seen it in action earlier when Coro detected a deadlock.

You use it like this:

   use Coro::Debug;

   Coro::Debug::command "ps";

Remember the example with the two channels and a worker thread that
squared numbers? Running "ps" just after C<< $calculate->get >> outputs
something similar to this:

        PID SC  RSS USES Description              Where
    8917312 -C  22k    0 [main::]                 [introscript:20]
    8964448 N-  152    0 [coro manager]           -
    8964520 N-  152    0 [unblock_sub scheduler]  -
    8591752 UC  152    1                          [introscript:12]
   11546944 N-  152    0 [EV idle process]        -

Interesting - there is more going on in the background than one would
expect. Ignoring the extra threads, the main thread has pid
C<8917312>, and the one started by C<async> has pid C<8591752>.

The latter is also the only thread that doesn't have a description,
simply because we haven't set one. Setting one is easy, just put it into
C<< $Coro::current->{desc} >>:

   async {
      $Coro::current->{desc} = "cruncher";
      ...
   };

This can be rather useful when debugging a program, or when using the
interactive debug shell of L<Coro::Debug>.


=head1 The Real World - Event Loops

Coro really wants to run in a program using some event loop. In fact, most
real-world programs using Coro threads are written with a combination of
event-based and thread-based techniques, as it is easy to get the best of
both worlds with Coro.

Coro integrates automatically into any event loop supported by L<AnyEvent>
(see L<Coro::AnyEvent> for details), but can take special advantage of the
L<EV> and L<Event> modules.

Here is a simple finger client, using whatever event loop L<AnyEvent>
comes up with:

   use Coro;
   use Coro::Socket;

   sub finger {
      my ($user, $host) = @_;

      my $fh = new Coro::Socket PeerHost => $host, PeerPort => "finger"
         or die "$user\@$host: $!";

      print $fh "$user\n";

      print "$user\@$host: $_" while <$fh>;
      print "$user\@$host: done\n";
   }

   # now finger a few accounts
   for (
      (async { finger "abc", "cornell.edu" }),
      (async { finger "sebbo", "world.std.com" }),
      (async { finger "trouble", "noc.dfn.de" }),
   ) {
      $_->join; # wait for the result
   }

There are a few new things here. First of all, there is
L<Coro::Socket>. This module works much the same way as
L<IO::Socket::INET>, except that it is coroutine-aware. This means that
L<IO::Socket::INET>, when waiting for the network, will block the whole
process - that means all threads, which is clearly undesirable.

On the other hand, L<Coro::Socket> knows how to give up the CPU to other
threads when it waits for the network, which makes parallel execution
possible.

The other new thing is the C<join> method: All we want to do in this
example is start three C<async> threads and only exit when they have
done their job. This could be done using a counting semaphore, but it is
much simpler to synchronously wait for them to C<terminate>, which is
exactly what the C<join> method does.

It doesn't matter that the three C<async>s will probably finish in a
different order than the for loop C<join>s them - when the thread is still
running, C<join> simply waits. If the thread has already terminated, it
will simply fetch its return status.
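
As a small sketch of the latter (the values are arbitrary), C<join>
hands back whatever the thread's code block returned:

   use Coro;

   my $worker = async {
      cede;                        # pretend to do some work
      return (6 * 7, "done");
   };

   my ($answer, $status) = $worker->join;
   print "$answer $status\n";

This should print C<42 done>.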

If you are experienced in event-based programming, you will see that the
finger client above doesn't quite follow the normal pattern, where you start
some work, and then run the event loop (e.g. C<EV::loop>).

In fact, nontrivial programs follow this pattern even with Coro, so a Coro
program that uses EV usually looks like this:

   use EV;
   use Coro;

   # start coroutines or event watchers

   EV::loop; # and loop

And in fact, for debugging, you often do something like this:

   use EV;
   use Coro::Debug;

   my $shell = new_unix_server Coro::Debug "/tmp/myshell";

   EV::loop; # and loop

This runs your program, but also an interactive shell on the unix domain
socket in F</tmp/myshell>. You can use the F<socat> program to access it:

   # socat readline /tmp/myshell
   coro debug session. use help for more info

   > ps
           PID SC  RSS USES Description              Where
     136672312 RC  19k 177k [main::]                 [myprog:28]
     136710424 -- 1268   48 [coro manager]           [Coro.pm:349]
   > help
   ps [w|v]                show the list of all coroutines (wide, verbose)
   bt <pid>                show a full backtrace of coroutine <pid>
   eval <pid> <perl>       evaluate <perl> expression in context of <pid>
   trace <pid>             enable tracing for this coroutine
   untrace <pid>           disable tracing for this coroutine
   kill <pid> <reason>     throws the given <reason> string in <pid>
   cancel <pid>            cancels this coroutine
   ready <pid>             force <pid> into the ready queue
   <anything else>         evaluate as perl and print results
   <anything else> &       same as above, but evaluate asynchronously
                           you can use (find_coro <pid>) in perl expressions
                           to find the coro with the given pid, e.g.
                           (find_coro 9768720)->ready
   loglevel <int>          enable logging for messages of level <int> and lower
   exit                    end this session

Microsoft victims can of course use the even less secure C<new_tcp_server>
constructor.


=head2 The Real World - File I/O

Disk I/O, while often much faster than the network, nevertheless can take
quite a long time in which the CPU could do other things, if only one were
able to do something else in the meantime.

Fortunately, the L<IO::AIO> module on CPAN allows you to move these
I/O calls into the background, letting you do useful work in the
foreground. It is event-/callback-based, but Coro has a nice wrapper
around it, called L<Coro::AIO>, which lets you use its functions
naturally from within threads:

   use Fcntl;
   use Coro::AIO;

   my $fh = aio_open "$filename~", O_WRONLY | O_CREAT, 0600
      or die "$filename~: $!";

   aio_write $fh, 0, (length $data), $data, 0;
   aio_fsync $fh;
   aio_close $fh;
   aio_rename "$filename~", "$filename";

The above creates a new file, writes data into it, syncs the data to disk
and atomically replaces a base file with a new copy.


=head2 Inversion of control - rouse functions

Last but not least, let me talk about inversion of control. The "control" refers
to "who calls whom", who is in control of the program. In this program,
the main program is in control and passes this to all functions it calls:

   use LWP::Simple;

   # pass control to get
   my $res = get "http://example.org/";
   # control returned to us

   print $res;

When switching to event-based programs, instead of "us calling them",
"they call us" - this is the inversion of control from the title:

   use AnyEvent::HTTP;

   # do not pass control for long - http_get immediately returns
   http_get "http://example.org/", sub {
      print $_[0];
   };

   # we stay in control and can do other things

Event based programming can be nice, but sometimes it's just easier to
write down some processing in "linear" fashion, without callbacks. Coro
provides some special functions to reduce typing:

   use AnyEvent::HTTP;

   # do not pass control for long - http_get immediately returns
   http_get "http://example.org/", Coro::rouse_cb;

   # we stay in control and can do other things...
   # ...such as wait for the result
   my ($res) = Coro::rouse_wait;

C<Coro::rouse_cb> creates and returns a special callback. You can pass
this callback to any function that would expect a callback.

C<Coro::rouse_wait> waits (blocks the current thread) until the most
recently created callback has been called, and returns whatever was passed
to it.

These two functions allow you to I<mechanically> invert the control from
"callback based style" used by most event-based libraries to "blocking
style", whenever you wish to.

The pattern is simple: instead of...

   some_func ..., sub {
      my @res = @_;
      ...
   };

... you write:

   some_func ..., Coro::rouse_cb;
   my @res = Coro::rouse_wait;
   ...

Callback-based interfaces are plenty, and the rouse functions allow you to
use them in an often more convenient way.
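
As a complete, minimal sketch of this pattern, assuming L<AnyEvent> and
an event loop backend are installed, here is a callback-based AnyEvent
timer turned into a blocking wait (the half-second delay is arbitrary):

   use Coro;
   use AnyEvent;
   use Coro::AnyEvent;

   # the callback-based API: AnyEvent calls cb when the timer fires
   my $timer = AnyEvent->timer (after => 0.5, cb => Coro::rouse_cb);

   Coro::rouse_wait;     # blocks this thread until the callback was called
   print "timer fired\n";

Only the calling thread blocks in C<Coro::rouse_wait>; other threads and
event watchers keep running in the meantime.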


=head1 Other Modules

This introduction only mentions a few methods and modules; Coro has many
other functions (see the L<Coro> manpage) and modules (documented in the
C<SEE ALSO> section of the L<Coro> manpage).

Noteworthy modules are L<Coro::LWP> (for parallel LWP requests, but see
L<AnyEvent::HTTP> for a better HTTP-only alternative), L<Coro::BDB>, for
when you need an asynchronous database, L<Coro::Handle>, when you need
to use any file handle in a coroutine (popular to access C<STDIN> and
C<STDOUT>) and L<Coro::EV>, the optimised interface to L<EV> (which gets
used automatically by L<Coro::AnyEvent>).

There are a number of Coro-related modules that might be useful for your problem
(see L<http://search.cpan.org/search?query=Coro&mode=module>). And since Coro
integrates so well into AnyEvent, it's often easy to adapt existing AnyEvent modules
(see L<http://search.cpan.org/search?query=AnyEvent&mode=module>).


=head1 AUTHOR

   Marc Lehmann <schmorp@schmorp.de>
   http://home.schmorp.de/