Net-Async-Redis

Changes

Revision history for Net-Async-Redis

6.000     2024-01-28 05:00:01+08:00 Asia/Singapore
    [New features]

    - Initial OpenTelemetry support, thanks to JJATRIA's work on implementing it (see OpenTelemetry::SDK).
    Note that this is expected to evolve in future as more async support is added to the OpenTelemetry
    SDK.

    [Bugs fixed]

    - some configuration features, such as client-side caching, could end up triggering a rapid memory
    leak due to interactions with Syntax::Keyword::Dynamically; the connection code has been refactored
    to avoid this (partially related to https://rt.cpan.org/Public/Bug/Display.html?id=148742)

5.001     2023-11-15 13:42:42+08:00 Asia/Singapore
    [New features]

    - helper methods for some standard cluster commands with no keys - these previously failed and
    generated warnings, now they accumulate results from all primary nodes and return the aggregated
    results
    - fall back to a random node when no keys are found for other commands
    - we now use the same primary connection across multiple slots, which reduces the connection count
    when the shards are highly fragmented

5.000     2023-11-05 21:22:40+08:00 Asia/Singapore
    [Refactoring]

    - migrated to Object::Pad, for better OO handling (still uses a hashref-based object
    implementation, for backwards compatibility). Since this has the potential to be a big
    change, and may affect backward compatibility, the major version has been increased,
    and the Object::Pad migration will be released in steps during this version series
    (e.g. conversion to `field` instead of `$self->{accessor}`, move Net::Async::Redis::Commands
    to a rôle)
    - MULTI handling rewritten to handle cluster mode and mixed regular/transactional commands
    better

    [Bugs fixed]

    - previous versions exhibited excessive memory usage and poor performance when attempting
    to call MULTI within MULTI; the refactoring now includes tests to cover this scenario

4.002     2023-10-26 11:39:40+08:00 Asia/Singapore
    [Bugs fixed]

    - previously unable to connect to a single-node cluster, since the CLUSTER SLOTS
    output did not include any host/IP information (single-node clusters are not recommended,
    but useful for testing and local development!)
    - subscription on a cluster would fail since the subscription key isn't sharded
    - connection closure by the Redis server should be handled better now

4.001     2023-09-21 06:39:51+08:00 Asia/Singapore
    [Bugs fixed]

    - MULTI/EXEC were emitting a log message at `info` level

4.000     2023-08-27 01:12:09+08:00 Asia/Singapore
    [New features]

    - latest commands imported via `COMMAND LIST`, covering Redis 7.2
    - support for key spec parsing via https://redis.io/docs/reference/key-specs/

    [Bugs fixed]

    - MULTI/EXEC support overhaul for Net::Async::Redis::Cluster

3.024     2023-06-29 12:19:45+08:00 Asia/Singapore
    [New features]

    - helper method to allow Net::Async::Redis::Cluster::XS to ensure it's getting XS-backed nodes

3.023     2023-05-08 04:37:32+08:00 Asia/Singapore
    [New features]

    - support for sharded pubsub (`SSUBSCRIBE` / `SPUBLISH`)

    [Bugs fixed]

    - exception during subscription cancellation, thanks to Pavel Shaydo for patch+test
    - subscription handling should now work properly and consistently on both RESP2/RESP3
    - `XREAD`/`XREADGROUP` now returns empty arrayref when no items, for consistency
    - cancelling a cached `GET` could cause an exception due to the request Future going out of scope

3.022     2022-03-21 02:11:58+08:00 Asia/Kuala_Lumpur
    [New features]

    - Added latest commands as of 7.0 RC2

3.021     2022-02-24 23:13:49+08:00 Asia/Kuala_Lumpur
    [New features]

    - to improve compatibility in future, the approach for handling multi-word commands
    such as `XGROUP CREATE` now applies to all commands, including cases such as `MEMORY DOCTOR`
    so you can use `->memory('DOCTOR')` or `->memory_doctor` interchangeably. This is
    due to the command list we auto-extract from the Redis website potentially changing
    the structure again in future.

3.020     2022-02-17 17:47:08+08:00 Asia/Kuala_Lumpur
    [Bugs fixed]

    - the ->xgroup method was still not available in Net::Async::Redis::Cluster due to
    an incomplete fix in 3.019, thanks to Nael Alolwani for catching and fixing this

3.019     2022-02-16 15:42:23+08:00 Asia/Kuala_Lumpur
    [Bugs fixed]

    - the ->xgroup method was not available due to 7.0 changes, added back in

3.018     2022-02-01 11:58:52+08:00 Asia/Kuala_Lumpur
    [New features]

    - Added latest commands from Redis 7.0 release candidate

3.017     2021-10-18 00:15:14+08:00 Asia/Kuala_Lumpur
    [New features]

    - Added ->client_no_evict from latest Redis

3.016     2021-09-26 02:23:34+08:00 Asia/Kuala_Lumpur
    [New features]
    - Added readonly methods from latest Redis, e.g. `->sort_ro` and `->bitfield_ro`

3.015     2021-07-27 03:47:43+08:00 Asia/Kuala_Lumpur
    [Bugs fixed]
    - The CLUSTER INFO command generated warnings (RT138053, thanks to Nael for
    reporting)

3.014     2021-07-20 18:37:09+08:00 Asia/Kuala_Lumpur
    [New features]

    - ->expiretime and ->eval_ro from latest commands list

3.013     2021-04-27 10:28:39+08:00 Asia/Kuala_Lumpur
    No new features.

    Bugs fixed:

    - removed `dynamically` usage for `$log->{context}` since it can trigger a
    memory leak and also affects performance, see:
    https://github.com/binary-com/perl-Myriad/issues/117 for an example

3.012     2021-04-22 21:26:23+08:00 Asia/Kuala_Lumpur
    New features:

    - automatic connection and queuing is now more reliable, you should only
    need `await $redis->connected` in a few cases now
    - access to client-side invalidation events, for implementing observables
    - enable keyspace notifications in cluster mode

    Bugs fixed:

    - various issues with client-side caching resolved
    - in cluster mode, XREAD could fail due to incorrect node lookup for key
    (Github #23, thanks to Marc Frank for the patch)
    - the stream read/write lengths were using the same internal key,
    so any changes to one value would affect the other as well
    - XINFO was mapping to the wrong key in cluster mode, thanks to Eyad for
    catching and fixing this

3.011     2021-02-03 03:43:05+08:00 Asia/Kuala_Lumpur
    New features:

    - ->failover added from latest commands list

3.010     2021-02-01 04:56:52+08:00 Asia/Kuala_Lumpur
    New features:

    - ->bootstrap will now fail if the cluster's node addresses are not initialised.
    - cluster->execute_command will now redirect the command to the correct node
      if a 'MOVED' error is received.
    - latest commands from 6.2
    - this includes ->getdel, ->getex, ->zrandfield

3.009     2021-01-27 06:33:18+08:00 Asia/Kuala_Lumpur
    No new features.

    Bugs fixed:

    - `XGROUP` KEY_FINDER index was wrong.
    - `PUBLISH`/`SUBSCRIBE` were not in the KEY_FINDER index.
    - Net::Async::Redis::Cluster was ignoring the protocol compatibility layer.

3.008     2021-01-17 01:29:19+08:00 Asia/Kuala_Lumpur
    New features:

    - latest commands from 6.2
    - this includes ->xautoclaim, ->client_unpause and the `count => 'any'` option
    on various commands

3.007     2020-12-31 07:23:30+08:00 Asia/Kuala_Lumpur
    New features:

    - latest commands from 6.2
    - this includes ->geosearch, ->client_info, ->client_trackinginfo etc.

3.006     2020-12-04 05:33:59+08:00 Asia/Kuala_Lumpur
    New features:

    - latest commands from 6.0.9
    - ->copy is now supported (from latest Redis release)
    - stream handling now documents `MKSTREAM` and `NOMKSTREAM`,
    and updates the trimming to mention `~` (approximate) and `=`
    (exact) modes for removing old entries

3.005     2020-11-16 05:20:49+08:00 Asia/Kuala_Lumpur
    New features:

    - ->zdiff and ->zdiffstore commands from latest Redis release
    - provide RESP2/RESP3 compatibility when hashrefs option is not enabled,
    otherwise some commands such as ->xreadgroup can return inconsistent
    results between versions (thanks Eyad, Paul and Nael for the patch)
    - cluster mode now provides basic slot calculation caching, since this
    yields a small performance improvement

3.004     2020-11-04 12:40:40+08:00 Asia/Kuala_Lumpur
    No new features.

    Bugs fixed:

    - top-level hashes also now follow the `hashrefs` configuration option,
    this affects methods such as `->xreadgroup`.

3.003     2020-11-03 13:58:00+08:00 Asia/Kuala_Lumpur
    No new features.

    Bugs fixed:

    - RESP3 protocol changes (enabled by default in earlier versions of this module)
    meant that you'd end up with hashrefs or arrayrefs depending on what version
    of Redis you were connecting to. This behaviour is now controlled by the
    `hashrefs` configuration option, and is disabled by default even under RESP3.

3.002     2020-10-12 00:19:11+08:00 Asia/Kuala_Lumpur
    New features:

    - latest commands from 6.0.8, including LMOVE/BLMOVE

3.001     2020-09-22 01:51:28+08:00 Asia/Kuala_Lumpur
    New features:

    - RESP3 no longer establishes a separate client-side caching connection,
    since it can share the main connection
    - `AUTH username password` from latest Redis now supported

    Bugs fixed:

    - proper differentiation between regular arrays and pubsub messages
    when in pubsub mode on a RESP3 connection
    - avoid establishing client-side caching connection until we need one,
    could also have resulted in an attempt to connect to the wrong Redis server
    if using `->configure(host => ...); ->configure(port => ...);`
    - on a failure, continue with next item in pipeline - otherwise subsequent
    requests would get stuck

3.000     2020-09-14 02:00:10+08:00 Asia/Kuala_Lumpur
    New features:

    - RESP3 support, for Redis 6
    - pubsub is now allowed on the same connection as
    other traffic if the connection is in RESP3 mode
    - protocol is autodetected via `HELLO` command,
    pass `protocol => 'resp2'` to disable this

2.007     2020-09-05 02:08:25+08:00 Asia/Kuala_Lumpur
    New features:

    - applies client_name on connection if configured
    - opentracing can be controlled by `->configure(opentracing => 1 || 0)`

    Note that OpenTracing support is now *disabled* by default, since it
    incurs a small (~5%) performance penalty. The USE_OPENTRACING env var
    is still supported for enabling/disabling globally.

2.006     2020-08-26 10:23:47+08:00 Asia/Kuala_Lumpur
    New features:

    - latest command updates from Redis 6.0.6 - main change here is ->lpos
    renaming "first" to "rank", see https://redis.io/commands/lpos
    - Redis database selection via URI or `database` ->configure parameter
    - OpenTracing support via OpenTracing::Any

2.005     2020-06-28 00:20:52+08:00 Asia/Kuala_Lumpur
    New features:

    - LPOS method added from latest Redis release

    Bugs fixed:

    - URI parameter without `redis://` prefix is now upgraded,
    allowing `->new(uri => 'localhost:1234')` (Github issue #7,
    thanks LeoNerd)
    - pipelining encoding bug (Github issue #14, thanks to dankroboth
    for reporting and highlighting the faulty code)

2.004_001 2020-06-01 06:46:51+08:00 Asia/Kuala_Lumpur (TRIAL RELEASE)
    New features:

    - initial client-side caching support for ->get calls,
    pass client_side_caching_size => $size to enable.
    Please note that this key is likely to change in a future version.

2.004     2020-06-01 05:29:32+08:00 Asia/Kuala_Lumpur
    New features:

    - acl_getuser and related commands added
    - client-side caching documentation updated

2.003     2020-05-01 00:37:12+08:00 Asia/Kuala_Lumpur
    New features:

    - latest commands from Redis 6.0 GA release

    This is intended to be a stable release with next phase of
    development aimed at supporting the newer RESP3 protocol:
    https://github.com/antirez/RESP3

2.002_005 2020-04-29 02:50:10+08:00 Asia/Kuala_Lumpur (TRIAL RELEASE)
    No new features.

    Bugs fixed:

    - the ->multi return value was a nested Future, changing the result
    compared to previous versions - thanks to Nael for reporting
    - subscribe/psubscribe interleaved with ping or other requests
    could get confused about the pending queue

2.002_004 2020-04-23 02:19:48+08:00 Asia/Kuala_Lumpur (TRIAL RELEASE)
    New features:

    - pipeline parameter now does something, set to 0 to allow "unlimited"
    pipelined requests (as in keep trying until the Redis server stops listening
    to process the backlog)

2.002_003 2020-04-23 01:59:30+08:00 Asia/Kuala_Lumpur (TRIAL RELEASE)
    No new features.

    Bugs fixed:

    - MULTI implementation broke in the previous version

2.002_002 2020-04-14 01:40:41+08:00 Asia/Kuala_Lumpur (TRIAL RELEASE)
    New features:

    - Latest Redis has new ACL methods, including those in Commands.pm
    - simple ping latency example for testing

2.002_001 2020-04-03 22:02:06+08:00 Asia/Kuala_Lumpur (TRIAL RELEASE)
    New features:

    - Redis cluster support, https://redis.io/topics/cluster-spec
    - updated to latest Redis 5.x commands

2.001     2019-07-22 13:36:07+08:00 Asia/Kuala_Lumpur
    New features:

    - improved performance when issuing many Redis commands at once
    - ->watch_keyspace now provides a Ryu::Source instance
    - latest command parameters from redis.io (ABSTTL etc. for `restore`
    and TYPE for `scan`)

    New examples:

    - moving-window sum

2.000     2019-06-16 22:44:26+08:00 Asia/Kuala_Lumpur
    New features:

    - now requires Future::AsyncAwait

1.014     2019-02-04 02:05:21+08:00 Asia/Kuala_Lumpur
    New features:

    - increase default buffer sizes from 8KB to 1MB, to improve efficiency
    in the common case where there are only a few active Redis connections
    - support configurable buffer sizes via `stream_read_len` / `stream_write_len`

    New examples:

    - `incr-ratelimit-aa.pl` and `consumer-groups-aa.pl` for Future::AsyncAwait basic
    examples

    Bugs fixed:

    - the example for `->subscription` was wrong, thanks to Michael Mueller for catching
    and patching!
    - retain completion Future when executing commands

1.013     2018-12-10 02:03:19+08:00 Asia/Kuala_Lumpur
    New features:

    - support for `NOACK` and type parameter for `client_kill`, as provided in
    latest Redis 5.x release

1.012     2018-10-19 11:03:35+08:00 Asia/Kuala_Lumpur
    No new features.

    Examples:

    - Improved documentation for consumer-groups.pl example
    (tested against 5.0 release)

1.011     2018-10-06 20:50:12+08:00 Asia/Kuala_Lumpur
    No new features.

    Bug fixes:

    - error handling improved, previously requests were not marked as failed
    - protocol handling for 'undef' arrays (treated as `undef` now)

    Examples:

    - consumer-groups.pl example for XADD/XREADGROUP

1.010     2018-10-06 15:26:00+08:00 Asia/Kuala_Lumpur
    New commands:

    - CLIENT ID
    - CLIENT UNBLOCK

    from Redis 5.0rc5.

1.009     2018-10-03 19:26:59+08:00 Asia/Kuala_Lumpur
    Updated to latest Redis 5.0 commands from last RC prior to 5.0 release.

1.008     2018-09-27 11:28:37+08:00 Asia/Kuala_Lumpur
    No new features.

    Bug fixes:

    - The connection was cached even if the connection failed or was interrupted, so reconnection
    attempts were failing as well; now the connection object will get deleted on failure
    - Subscriptions were not cancelled when the connection was interrupted,
    now they are cancelled properly

    (thanks to Eyad Arnabeh for reporting and fixing)

1.007     2018-09-05 10:11:44+08:00 Asia/Kuala_Lumpur
    No new features.

    Bug fixes:

    - Support auth parameter, rather than insisting that auth information be passed as
    part of the URI (thanks to Nael Alolwani for reporting)
    - "Pipeline depth" notifications were logged at `info` level, these are an internal
    diagnostic and not useful for application code, these are now `trace` level messages

1.006     2018-07-12 22:39:10+08:00 Asia/Kuala_Lumpur
    No new features.

    Bug fixes:

    - PSUBSCRIBE wasn't working (reported by Bill Marriott and leonerd, fix+tests by leonerd,
    I did little more than hit the merge button - thanks!)

    Dependencies:

    - Math::Random::Secure dep was missing (thanks MANWAR!)

1.005     2018-01-06 18:18:22+08:00 Asia/Kuala_Lumpur
    No new features.

    Dependencies:

    - Class::Method::Modifiers, and make sure we have a recent version of Future.pm

1.004     2017-12-29 20:10:42+08:00 Asia/Kuala_Lumpur
    No new features.

    Performance improvements:

    - DISCARD/EXEC are now pipelined, we don't need to wait for the
    write before sending more commands


1.003     2017-12-26 18:00:36+08:00 Asia/Kuala_Lumpur
    New features:

    - support queuing for MULTI and other commands
    - support for passing a Redis URI
    - minor performance improvements in protocol implementation and event handling

    Bugs fixed:

    - it was possible for cancelled/failed items within ->multi to cause the protocol
    handling to go out of sync, this is now fixed.
    - multi-word commands were generated incorrectly, these now work again

    Examples:

    - `examples/job-hash-worker.pl` - reliable priority-queue worker with job
    details in separate hash key
    - `examples/job-hash-submit.pl` - submit requests for priority-queue workers


1.002     2017-12-11 06:06:47+08:00 Asia/Kuala_Lumpur
    Dependencies:

    - Explicit Log::Any dependency to ensure that context support is available

    New features:

    - PSUBSCRIBE support

1.001     2017-12-11 05:38:20+08:00 Asia/Kuala_Lumpur
    Dependencies:

    - Added Test::HexString and Test::Deep to test deps list

    New features:

    - Support for MULTI transactions via ->multi method

    Examples:

    - `examples/job-worker.pl` - Simple list-based job queue
    - `examples/multi.pl` - Transaction using MULTI

1.000     2017-12-09 16:51:40+08:00 Asia/Kuala_Lumpur
    First stable release. Note that there are some API changes, but
    previous code should work mostly unmodified.

    New features:

    - Dropped Protocol::Redis in favour of local implementation
    - Separated out commands into an autogenerated class to make
    it easier to keep up to date with protocol changes
    - Better support for subscriptions (see `examples/pub|sub.pl`)
    - Dropped JSON::MaybeXS and Mixin::Event::Dispatch deps, events
    are now handled through Ryu

    Known issues:

    - All data is assumed to be binary, callers need their own
    Encode::encode()/::decode() calls - future versions are likely
    to include an `->encoding` setting and/or `_utf8` versions of
    string methods.

0.003     2015-11-15 03:16:27+00:00 Europe/London
    No new features.

    Dependencies:

    - Added List::Util 1.29, for pairmap

0.002     2015-11-13 16:35:59+00:00 Europe/London
    Bugs fixed:

    - [RT107134](https://rt.cpan.org/Public/Bug/Display.html?id=107134) Include JSON::MaybeXS dependency

0.001     2015-09-14 02:30:30+01:00 Europe/London
    Initial CPAN release

LICENSE

This software is copyright (c) 2015 by Tom Molesworth.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

Terms of the Perl programming language system itself

a) the GNU General Public License as published by the Free
   Software Foundation; either version 1, or (at your option) any
   later version, or
b) the "Artistic License"

--- The GNU General Public License, Version 1, February 1989 ---

This software is Copyright (c) 2015 by Tom Molesworth.

This is free software, licensed under:

  The GNU General Public License, Version 1, February 1989

                    GNU GENERAL PUBLIC LICENSE
                     Version 1, February 1989

 Copyright (C) 1989 Free Software Foundation, Inc.
 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA

 Everyone is permitted to copy and distribute verbatim copies
 of this license document, but changing it is not allowed.

                            Preamble

  The license agreements of most software companies try to keep users
at the mercy of those companies.  By contrast, our General Public
License is intended to guarantee your freedom to share and change free
software--to make sure the software is free for all its users.  The
General Public License applies to the Free Software Foundation's
software and to any other program whose authors commit to using it.
You can use it for your programs, too.

  When we speak of free software, we are referring to freedom, not
price.  Specifically, the General Public License is designed to make
sure that you have the freedom to give away or sell copies of free
software, that you receive source code or can get it if you want it,
that you can change the software or use pieces of it in new free
programs; and that you know you can do these things.

  To protect your rights, we need to make restrictions that forbid
anyone to deny you these rights or to ask you to surrender the rights.
These restrictions translate to certain responsibilities for you if you
distribute copies of the software, or if you modify it.

  For example, if you distribute copies of a such a program, whether
gratis or for a fee, you must give the recipients all the rights that
you have.  You must make sure that they, too, receive or can get the
source code.  And you must tell them their rights.

  We protect your rights with two steps: (1) copyright the software, and
(2) offer you this license which gives you legal permission to copy,
distribute and/or modify the software.

  Also, for each author's protection and ours, we want to make certain
that everyone understands that there is no warranty for this free
software.  If the software is modified by someone else and passed on, we
want its recipients to know that what they have is not the original, so
that any problems introduced by others will not reflect on the original
authors' reputations.

  The precise terms and conditions for copying, distribution and
modification follow.

                    GNU GENERAL PUBLIC LICENSE
   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION

  0. This License Agreement applies to any program or other work which
contains a notice placed by the copyright holder saying it may be
distributed under the terms of this General Public License.  The
"Program", below, refers to any such program or work, and a "work based
on the Program" means either the Program or any work containing the
Program or a portion of it, either verbatim or with modifications.  Each
licensee is addressed as "you".

  1. You may copy and distribute verbatim copies of the Program's source
code as you receive it, in any medium, provided that you conspicuously and
appropriately publish on each copy an appropriate copyright notice and
disclaimer of warranty; keep intact all the notices that refer to this
General Public License and to the absence of any warranty; and give any
other recipients of the Program a copy of this General Public License
along with the Program.  You may charge a fee for the physical act of
transferring a copy.

  2. You may modify your copy or copies of the Program or any portion of
it, and copy and distribute such modifications under the terms of Paragraph
1 above, provided that you also do the following:

    a) cause the modified files to carry prominent notices stating that
    you changed the files and the date of any change; and

    b) cause the whole of any work that you distribute or publish, that
    in whole or in part contains the Program or any part thereof, either
    with or without modifications, to be licensed at no charge to all
    third parties under the terms of this General Public License (except
    that you may choose to grant warranty protection to some or all
    third parties, at your option).

    c) If the modified program normally reads commands interactively when
    run, you must cause it, when started running for such interactive use
    in the simplest and most usual way, to print or display an
    announcement including an appropriate copyright notice and a notice
    that there is no warranty (or else, saying that you provide a
    warranty) and that users may redistribute the program under these
    conditions, and telling the user how to view a copy of this General
    Public License.

    d) You may charge a fee for the physical act of transferring a
    copy, and you may at your option offer warranty protection in
    exchange for a fee.

Mere aggregation of another independent work with the Program (or its
derivative) on a volume of a storage or distribution medium does not bring
the other work under the scope of these terms.

  3. You may copy and distribute the Program (or a portion or derivative of
it, under Paragraph 2) in object code or executable form under the terms of
Paragraphs 1 and 2 above provided that you also do one of the following:

    a) accompany it with the complete corresponding machine-readable
    source code, which must be distributed under the terms of
    Paragraphs 1 and 2 above; or,

    b) accompany it with a written offer, valid for at least three
    years, to give any third party free (except for a nominal charge
    for the cost of distribution) a complete machine-readable copy of the
    corresponding source code, to be distributed under the terms of
    Paragraphs 1 and 2 above; or,

    c) accompany it with the information you received as to where the
    corresponding source code may be obtained.  (This alternative is
    allowed only for noncommercial distribution and only if you
    received the program in object code or executable form alone.)

Source code for a work means the preferred form of the work for making
modifications to it.  For an executable file, complete source code means
all the source code for all modules it contains; but, as a special
exception, it need not include source code for modules which are standard
libraries that accompany the operating system on which the executable
file runs, or for standard header files or definitions files that
accompany that operating system.

  4. You may not copy, modify, sublicense, distribute or transfer the
Program except as expressly provided under this General Public License.
Any attempt otherwise to copy, modify, sublicense, distribute or transfer
the Program is void, and will automatically terminate your rights to use
the Program under this License.  However, parties who have received
copies, or rights to use copies, from you under this General Public
License will not have their licenses terminated so long as such parties
remain in full compliance.

  5. By copying, distributing or modifying the Program (or any work based
on the Program) you indicate your acceptance of this license to do so,
and all its terms and conditions.

  6. Each time you redistribute the Program (or any work based on the
Program), the recipient automatically receives a license from the original
licensor to copy, distribute or modify the Program subject to these
terms and conditions.  You may not impose any further restrictions on the
recipients' exercise of the rights granted herein.

  7. The Free Software Foundation may publish revised and/or new versions
of the General Public License from time to time.  Such new versions will
be similar in spirit to the present version, but may differ in detail to
address new problems or concerns.

Each version is given a distinguishing version number.  If the Program
specifies a version number of the license which applies to it and "any
later version", you have the option of following the terms and conditions
either of that version or of any later version published by the Free
Software Foundation.  If the Program does not specify a version number of
the license, you may choose any version ever published by the Free Software
Foundation.

  8. If you wish to incorporate parts of the Program into other free
programs whose distribution conditions are different, write to the author
to ask for permission.  For software which is copyrighted by the Free
Software Foundation, write to the Free Software Foundation; we sometimes
make exceptions for this.  Our decision will be guided by the two goals
of preserving the free status of all derivatives of our free software and
of promoting the sharing and reuse of software generally.

                            NO WARRANTY

  9. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW.  EXCEPT WHEN
OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED
OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.  THE ENTIRE RISK AS
TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU.  SHOULD THE
PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING,
REPAIR OR CORRECTION.

  10. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES,
INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING
OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED
TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY
YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER
PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGES.

                     END OF TERMS AND CONDITIONS

        Appendix: How to Apply These Terms to Your New Programs

  If you develop a new program, and you want it to be of the greatest
possible use to humanity, the best way to achieve this is to make it
free software which everyone can redistribute and change under these
terms.

  To do so, attach the following notices to the program.  It is safest to
attach them to the start of each source file to most effectively convey
the exclusion of warranty; and each file should have at least the
"copyright" line and a pointer to where the full notice is found.

    <one line to give the program's name and a brief idea of what it does.>
    Copyright (C) 19yy  <name of author>

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 1, or (at your option)
    any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA  02110-1301 USA


Also add information on how to contact you by electronic and paper mail.

If the program is interactive, make it output a short notice like this
when it starts in an interactive mode:

    Gnomovision version 69, Copyright (C) 19xx name of author
    Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
    This is free software, and you are welcome to redistribute it
    under certain conditions; type `show c' for details.

The hypothetical commands `show w' and `show c' should show the
appropriate parts of the General Public License.  Of course, the
commands you use may be called something other than `show w' and `show
c'; they could even be mouse-clicks or menu items--whatever suits your
program.

You should also get your employer (if you work as a programmer) or your
school, if any, to sign a "copyright disclaimer" for the program, if
necessary.  Here a sample; alter the names:

  Yoyodyne, Inc., hereby disclaims all copyright interest in the
  program `Gnomovision' (a program to direct compilers to make passes
  at assemblers) written by James Hacker.

  <signature of Ty Coon>, 1 April 1989
  Ty Coon, President of Vice

That's all there is to it!


--- The Perl Artistic License 1.0 ---

This software is Copyright (c) 2015 by Tom Molesworth.

This is free software, licensed under:

  The Perl Artistic License 1.0





                         The "Artistic License"

                                Preamble

The intent of this document is to state the conditions under which a
Package may be copied, such that the Copyright Holder maintains some
semblance of artistic control over the development of the package,
while giving the users of the package the right to use and distribute
the Package in a more-or-less customary fashion, plus the right to make
reasonable modifications.

Definitions:

        "Package" refers to the collection of files distributed by the
        Copyright Holder, and derivatives of that collection of files
        created through textual modification.

        "Standard Version" refers to such a Package if it has not been
        modified, or has been modified in accordance with the wishes
        of the Copyright Holder as specified below.

        "Copyright Holder" is whoever is named in the copyright or
        copyrights for the package.

        "You" is you, if you're thinking about copying or distributing
        this Package.

        "Reasonable copying fee" is whatever you can justify on the
        basis of media cost, duplication charges, time of people involved,
        and so on.  (You will not be required to justify it to the
        Copyright Holder, but only to the computing community at large
        as a market that must bear the fee.)

        "Freely Available" means that no fee is charged for the item
        itself, though there may be fees involved in handling the item.
        It also means that recipients of the item may redistribute it
        under the same conditions they received it.

1. You may make and give away verbatim copies of the source form of the
Standard Version of this Package without restriction, provided that you
duplicate all of the original copyright notices and associated disclaimers.

2. You may apply bug fixes, portability fixes and other modifications
derived from the Public Domain or from the Copyright Holder.  A Package
modified in such a way shall still be considered the Standard Version.

3. You may otherwise modify your copy of this Package in any way, provided
that you insert a prominent notice in each changed file stating how and
when you changed that file, and provided that you do at least ONE of the
following:

    a) place your modifications in the Public Domain or otherwise make them
    Freely Available, such as by posting said modifications to Usenet or
    an equivalent medium, or placing the modifications on a major archive
    site such as uunet.uu.net, or by allowing the Copyright Holder to include
    your modifications in the Standard Version of the Package.

    b) use the modified Package only within your corporation or organization.

    c) rename any non-standard executables so the names do not conflict
    with standard executables, which must also be provided, and provide
    a separate manual page for each non-standard executable that clearly
    documents how it differs from the Standard Version.

    d) make other distribution arrangements with the Copyright Holder.

4. You may distribute the programs of this Package in object code or
executable form, provided that you do at least ONE of the following:

    a) distribute a Standard Version of the executables and library files,
    together with instructions (in the manual page or equivalent) on where
    to get the Standard Version.

    b) accompany the distribution with the machine-readable source of
    the Package with your modifications.

    c) give non-standard executables non-standard names, and clearly
    document the differences in manual pages (or equivalent), together
    with instructions on where to get the Standard Version.

    d) make other distribution arrangements with the Copyright Holder.

5. You may charge a reasonable copying fee for any distribution of this
Package.  You may charge any fee you choose for support of this
Package.  You may not charge a fee for this Package itself.  However,
you may distribute this Package in aggregate with other (possibly
commercial) programs as part of a larger (possibly commercial) software
distribution provided that you do not advertise this Package as a
product of your own.  You may embed this Package's interpreter within
an executable of yours (by linking); this shall be construed as a mere
form of aggregation, provided that the complete Standard Version of the
interpreter is so embedded.

6. The scripts and library files supplied as input to or produced as
output from the programs of this Package do not automatically fall
under the copyright of this Package, but belong to whoever generated
them, and may be sold commercially, and may be aggregated with this
Package.  If such scripts or library files are aggregated with this
Package via the so-called "undump" or "unexec" methods of producing a
binary executable image, then distribution of such an image shall
neither be construed as a distribution of this Package nor shall it
fall under the restrictions of Paragraphs 3 and 4, provided that you do
not represent such an executable image as a Standard Version of this
Package.

7. C subroutines (or comparably compiled subroutines in other
languages) supplied by you and linked into this Package in order to
emulate subroutines and variables of the language defined by this
Package shall not be considered part of this Package, but are the
equivalent of input as in Paragraph 6, provided these subroutines do
not change the language in any way that would cause it to fail the
regression tests for the language.

8. Aggregation of this Package with a commercial distribution is always
permitted provided that the use of this Package is embedded; that is,
when no overt attempt is made to make this Package's interfaces visible
to the end user of the commercial distribution.  Such use shall not be
construed as a distribution of this Package.

9. The name of the Copyright Holder may not be used to endorse or promote
products derived from this software without specific prior written permission.

10. THIS PACKAGE IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR
IMPLIED WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.

                                The End

MANIFEST

# This file was automatically generated by Dist::Zilla::Plugin::Manifest v6.031.
Changes
LICENSE
MANIFEST
META.json
META.yml
Makefile.PL
README
cpanfile
dist.ini
examples/cache-events.pl
examples/client-cache-streams.pl
examples/consumer-groups-aa-cluster.pl
examples/consumer-groups-aa.pl
examples/consumer-groups.pl
examples/hincr.pl
examples/incr-ratelimit-aa.pl
examples/incr-ratelimit.pl
examples/job-benchmark.pl
examples/job-hash-submit.pl
examples/job-hash-worker.pl
examples/job-queue.pl
examples/job-submit.pl
examples/job-worker.pl
examples/lrange.pl
examples/moving-sum.pl
examples/multi.pl
examples/ping.pl
examples/pub.pl
examples/redis-cli
examples/sub.pl
lib/Net/Async/Redis.pm
lib/Net/Async/Redis.pod
lib/Net/Async/Redis/Cluster.pm
lib/Net/Async/Redis/Cluster.pod
lib/Net/Async/Redis/Cluster/Multi.pm
lib/Net/Async/Redis/Cluster/Node.pm
lib/Net/Async/Redis/Cluster/Node.pod
lib/Net/Async/Redis/Commands.pm
lib/Net/Async/Redis/Multi.pm
lib/Net/Async/Redis/Protocol.pm
lib/Net/Async/Redis/Server.pm
lib/Net/Async/Redis/Server.pod
lib/Net/Async/Redis/Server/Connection.pm
lib/Net/Async/Redis/Server/Connection.pod
lib/Net/Async/Redis/Server/Database.pm
lib/Net/Async/Redis/Subscription.pm
lib/Net/Async/Redis/Subscription/Message.pm
script/commands.pl
script/docker-cluster.sh
script/extract-keyspec-examples.sh
script/generate-perl-for-commands.pl
share/commands.yaml
t/00-check-deps.t
t/00-compile.t
t/00-report-prereqs.dd
t/00-report-prereqs.t
t/basic.t
t/client_side_caching.t
t/cluster.t
t/hash_slot.t
t/keyspec.t
t/multi.t
t/pipeline.t
t/protocol.t
t/protocol_compatibility_test.t
t/psubscribe.t
t/pubsub.t
t/watch_keyspace.t
xt/author/distmeta.t
xt/author/eol.t
xt/author/minimum-version.t
xt/author/mojibake.t
xt/author/no-tabs.t
xt/author/pod-syntax.t
xt/author/portability.t
xt/author/test-version.t
xt/release/common_spelling.t
xt/release/cpan-changes.t

META.json

{
   "abstract" : "Redis support for IO::Async",
   "author" : [
      "Tom Molesworth <TEAM@cpan.org>"
   ],
   "dynamic_config" : 0,
   "generated_by" : "Dist::Zilla version 6.031, CPAN::Meta::Converter version 2.150010",
   "license" : [
      "perl_5"
   ],
   "meta-spec" : {
      "url" : "http://search.cpan.org/perldoc?CPAN::Meta::Spec",
      "version" : 2
   },
   "name" : "Net-Async-Redis",
   "no_index" : {
      "directory" : [
         "eg",
         "share",
         "shares",
         "t",
         "xt"
      ]
   },
   "prereqs" : {
      "configure" : {
         "requires" : {
            "ExtUtils::MakeMaker" : "0",
            "File::ShareDir::Install" : "0.06"
         }
      },
      "develop" : {
         "requires" : {
            "Dist::Zilla" : "5",
            "Dist::Zilla::PluginBundle::Author::TEAM" : "0",
            "HTML::TreeBuilder" : "0",
            "Net::Async::HTTP" : "0",
            "Software::License::Perl_5" : "0",
            "Template" : "0",
            "Test::CPAN::Changes" : "0.19",
            "Test::CPAN::Meta" : "0",
            "Test::EOL" : "0",
            "Test::MinimumVersion" : "0",
            "Test::Mojibake" : "0",
            "Test::More" : "0.88",
            "Test::NoTabs" : "0",
            "Test::Pod" : "1.41",
            "Test::Portability::Files" : "0",
            "Test::Version" : "1"
         }
      },
      "runtime" : {
         "recommends" : {
            "OpenTelemetry" : "0.018",
            "OpenTelemetry::SDK" : "0.020"
         },
         "requires" : {
            "Cache::LRU" : "0.04",
            "Class::Method::Modifiers" : "0",
            "Digest::CRC" : "0.22",
            "Dir::Self" : "0",
            "File::ShareDir" : "0",
            "File::ShareDir::Install" : "0",
            "Future" : "0.47",
            "Future::AsyncAwait" : "0.48",
            "Future::Queue" : "0.50",
            "IO::Async" : "0",
            "List::BinarySearch::XS" : "0.09",
            "List::Util" : "1.55",
            "Log::Any" : "1.708",
            "Math::Random::Secure" : "0",
            "Metrics::Any" : "0",
            "Object::Pad" : "0.800",
            "Path::Tiny" : "0",
            "Ryu" : "2.009",
            "Ryu::Async" : "0.019",
            "Syntax::Keyword::Dynamically" : "0.07",
            "Syntax::Keyword::Match" : "0",
            "Syntax::Keyword::Try" : "0.21",
            "URI" : "0",
            "URI::redis" : "0",
            "YAML::XS" : "0.88",
            "curry" : "0",
            "parent" : "0"
         },
         "suggests" : {
            "OpenTracing::Any" : "1.003"
         }
      },
      "test" : {
         "recommends" : {
            "CPAN::Meta" : "2.120900"
         },
         "requires" : {
            "ExtUtils::MakeMaker" : "0",
            "File::Spec" : "0",
            "IO::Handle" : "0",
            "IPC::Open3" : "0",
            "Test::CheckDeps" : "0.010",
            "Test::Deep" : "0",
            "Test::Fatal" : "0.016",
            "Test::HexString" : "0",
            "Test::MockModule" : "0",
            "Test::More" : "0.98",
            "Variable::Disposition" : "0.004"
         }
      }
   },
   "provides" : {
      "Net::Async::Redis" : {
         "file" : "lib/Net/Async/Redis.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Cluster" : {
         "file" : "lib/Net/Async/Redis/Cluster.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Cluster::Multi" : {
         "file" : "lib/Net/Async/Redis/Cluster/Multi.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Cluster::Node" : {
         "file" : "lib/Net/Async/Redis/Cluster/Node.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Commands" : {
         "file" : "lib/Net/Async/Redis/Commands.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Multi" : {
         "file" : "lib/Net/Async/Redis/Multi.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Protocol" : {
         "file" : "lib/Net/Async/Redis/Protocol.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Server" : {
         "file" : "lib/Net/Async/Redis/Server.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Server::Connection" : {
         "file" : "lib/Net/Async/Redis/Server/Connection.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Server::Database" : {
         "file" : "lib/Net/Async/Redis/Server/Database.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Subscription" : {
         "file" : "lib/Net/Async/Redis/Subscription.pm",
         "version" : "6.000"
      },
      "Net::Async::Redis::Subscription::Message" : {
         "file" : "lib/Net/Async/Redis/Subscription/Message.pm",
         "version" : "6.000"
      }
   },
   "release_status" : "stable",
   "resources" : {
      "bugtracker" : {
         "web" : "https://github.com/team-at-cpan/Net-Async-Redis/issues"
      },
      "homepage" : "https://github.com/team-at-cpan/Net-Async-Redis",
      "repository" : {
         "type" : "git",
         "url" : "https://github.com/team-at-cpan/Net-Async-Redis.git",
         "web" : "https://github.com/team-at-cpan/Net-Async-Redis"
      }
   },
   "version" : "6.000",
   "x_authority" : "cpan:TEAM",
   "x_contributors" : [
      "Eyad Arnabeh <eyadof@gmail.com>",
      "Tom Molesworth <tom@audioboundary.com>",
      "Paul \"LeoNerd\" Evans <leonerd@leonerd.org.uk>",
      "Tom Molesworth <tom@deriv.com>",
      "tm604 <tom@perlsite.co.uk>",
      "Marc Frank <mfrank@sipxpert.de>",
      "Michael Mueller <michael.mueller@binary.com>",
      "Mohammad S Anwar <mohammad.anwar@yahoo.com>",
      "shaydo-deriv <129739908+shaydo-deriv@users.noreply.github.com>"
   ],
   "x_generated_by_perl" : "v5.38.0",
   "x_serialization_backend" : "Cpanel::JSON::XS version 4.37",
   "x_spdx_expression" : "Artistic-1.0-Perl OR GPL-1.0-or-later"
}

META.yml

---
abstract: 'Redis support for IO::Async'
author:
  - 'Tom Molesworth <TEAM@cpan.org>'
build_requires:
  ExtUtils::MakeMaker: '0'
  File::Spec: '0'
  IO::Handle: '0'
  IPC::Open3: '0'
  Test::CheckDeps: '0.010'
  Test::Deep: '0'
  Test::Fatal: '0.016'
  Test::HexString: '0'
  Test::MockModule: '0'
  Test::More: '0.98'
  Variable::Disposition: '0.004'
configure_requires:
  ExtUtils::MakeMaker: '0'
  File::ShareDir::Install: '0.06'
dynamic_config: 0
generated_by: 'Dist::Zilla version 6.031, CPAN::Meta::Converter version 2.150010'
license: perl
meta-spec:
  url: http://module-build.sourceforge.net/META-spec-v1.4.html
  version: '1.4'
name: Net-Async-Redis
no_index:
  directory:
    - eg
    - share
    - shares
    - t
    - xt
provides:
  Net::Async::Redis:
    file: lib/Net/Async/Redis.pm
    version: '6.000'
  Net::Async::Redis::Cluster:
    file: lib/Net/Async/Redis/Cluster.pm
    version: '6.000'
  Net::Async::Redis::Cluster::Multi:
    file: lib/Net/Async/Redis/Cluster/Multi.pm
    version: '6.000'
  Net::Async::Redis::Cluster::Node:
    file: lib/Net/Async/Redis/Cluster/Node.pm
    version: '6.000'
  Net::Async::Redis::Commands:
    file: lib/Net/Async/Redis/Commands.pm
    version: '6.000'
  Net::Async::Redis::Multi:
    file: lib/Net/Async/Redis/Multi.pm
    version: '6.000'
  Net::Async::Redis::Protocol:
    file: lib/Net/Async/Redis/Protocol.pm
    version: '6.000'
  Net::Async::Redis::Server:
    file: lib/Net/Async/Redis/Server.pm
    version: '6.000'
  Net::Async::Redis::Server::Connection:
    file: lib/Net/Async/Redis/Server/Connection.pm
    version: '6.000'
  Net::Async::Redis::Server::Database:
    file: lib/Net/Async/Redis/Server/Database.pm
    version: '6.000'
  Net::Async::Redis::Subscription:
    file: lib/Net/Async/Redis/Subscription.pm
    version: '6.000'
  Net::Async::Redis::Subscription::Message:
    file: lib/Net/Async/Redis/Subscription/Message.pm
    version: '6.000'
recommends:
  OpenTelemetry: '0.018'
  OpenTelemetry::SDK: '0.020'
requires:
  Cache::LRU: '0.04'
  Class::Method::Modifiers: '0'
  Digest::CRC: '0.22'
  Dir::Self: '0'
  File::ShareDir: '0'
  File::ShareDir::Install: '0'
  Future: '0.47'
  Future::AsyncAwait: '0.48'
  Future::Queue: '0.50'
  IO::Async: '0'
  List::BinarySearch::XS: '0.09'
  List::Util: '1.55'
  Log::Any: '1.708'
  Math::Random::Secure: '0'
  Metrics::Any: '0'
  Object::Pad: '0.800'
  Path::Tiny: '0'
  Ryu: '2.009'
  Ryu::Async: '0.019'
  Syntax::Keyword::Dynamically: '0.07'
  Syntax::Keyword::Match: '0'
  Syntax::Keyword::Try: '0.21'
  URI: '0'
  URI::redis: '0'
  YAML::XS: '0.88'
  curry: '0'
  parent: '0'
resources:
  bugtracker: https://github.com/team-at-cpan/Net-Async-Redis/issues
  homepage: https://github.com/team-at-cpan/Net-Async-Redis
  repository: https://github.com/team-at-cpan/Net-Async-Redis.git
version: '6.000'
x_authority: cpan:TEAM
x_contributors:
  - 'Eyad Arnabeh <eyadof@gmail.com>'
  - 'Tom Molesworth <tom@audioboundary.com>'
  - 'Paul "LeoNerd" Evans <leonerd@leonerd.org.uk>'
  - 'Tom Molesworth <tom@deriv.com>'
  - 'tm604 <tom@perlsite.co.uk>'
  - 'Marc Frank <mfrank@sipxpert.de>'
  - 'Michael Mueller <michael.mueller@binary.com>'
  - 'Mohammad S Anwar <mohammad.anwar@yahoo.com>'
  - 'shaydo-deriv <129739908+shaydo-deriv@users.noreply.github.com>'
x_generated_by_perl: v5.38.0
x_serialization_backend: 'YAML::Tiny version 1.74'
x_spdx_expression: 'Artistic-1.0-Perl OR GPL-1.0-or-later'

Makefile.PL

# This file was automatically generated by Dist::Zilla::Plugin::MakeMaker v6.031.
use strict;
use warnings;



use ExtUtils::MakeMaker;

use File::ShareDir::Install;
$File::ShareDir::Install::INCLUDE_DOTFILES = 1;
$File::ShareDir::Install::INCLUDE_DOTDIRS = 1;
install_share dist => "share";


my %WriteMakefileArgs = (
  "ABSTRACT" => "Redis support for IO::Async",
  "AUTHOR" => "Tom Molesworth <TEAM\@cpan.org>",
  "CONFIGURE_REQUIRES" => {
    "ExtUtils::MakeMaker" => 0,
    "File::ShareDir::Install" => "0.06"
  },
  "DISTNAME" => "Net-Async-Redis",
  "LICENSE" => "perl",
  "NAME" => "Net::Async::Redis",
  "PREREQ_PM" => {
    "Cache::LRU" => "0.04",
    "Class::Method::Modifiers" => 0,
    "Digest::CRC" => "0.22",
    "Dir::Self" => 0,
    "File::ShareDir" => 0,
    "File::ShareDir::Install" => 0,
    "Future" => "0.47",
    "Future::AsyncAwait" => "0.48",
    "Future::Queue" => "0.50",
    "IO::Async" => 0,
    "List::BinarySearch::XS" => "0.09",
    "List::Util" => "1.55",
    "Log::Any" => "1.708",
    "Math::Random::Secure" => 0,
    "Metrics::Any" => 0,
    "Object::Pad" => "0.800",
    "Path::Tiny" => 0,
    "Ryu" => "2.009",
    "Ryu::Async" => "0.019",
    "Syntax::Keyword::Dynamically" => "0.07",
    "Syntax::Keyword::Match" => 0,
    "Syntax::Keyword::Try" => "0.21",
    "URI" => 0,
    "URI::redis" => 0,
    "YAML::XS" => "0.88",
    "curry" => 0,
    "parent" => 0
  },
  "TEST_REQUIRES" => {
    "ExtUtils::MakeMaker" => 0,
    "File::Spec" => 0,
    "IO::Handle" => 0,
    "IPC::Open3" => 0,
    "Test::CheckDeps" => "0.010",
    "Test::Deep" => 0,
    "Test::Fatal" => "0.016",
    "Test::HexString" => 0,
    "Test::MockModule" => 0,
    "Test::More" => "0.98",
    "Variable::Disposition" => "0.004"
  },
  "VERSION" => "6.000",
  "test" => {
    "TESTS" => "t/*.t"
  }
);


my %FallbackPrereqs = (
  "Cache::LRU" => "0.04",
  "Class::Method::Modifiers" => 0,
  "Digest::CRC" => "0.22",
  "Dir::Self" => 0,
  "ExtUtils::MakeMaker" => 0,
  "File::ShareDir" => 0,
  "File::ShareDir::Install" => 0,
  "File::Spec" => 0,
  "Future" => "0.47",
  "Future::AsyncAwait" => "0.48",
  "Future::Queue" => "0.50",
  "IO::Async" => 0,
  "IO::Handle" => 0,
  "IPC::Open3" => 0,
  "List::BinarySearch::XS" => "0.09",
  "List::Util" => "1.55",
  "Log::Any" => "1.708",
  "Math::Random::Secure" => 0,
  "Metrics::Any" => 0,
  "Object::Pad" => "0.800",
  "Path::Tiny" => 0,
  "Ryu" => "2.009",
  "Ryu::Async" => "0.019",
  "Syntax::Keyword::Dynamically" => "0.07",
  "Syntax::Keyword::Match" => 0,
  "Syntax::Keyword::Try" => "0.21",
  "Test::CheckDeps" => "0.010",
  "Test::Deep" => 0,
  "Test::Fatal" => "0.016",
  "Test::HexString" => 0,
  "Test::MockModule" => 0,
  "Test::More" => "0.98",
  "URI" => 0,
  "URI::redis" => 0,
  "Variable::Disposition" => "0.004",
  "YAML::XS" => "0.88",
  "curry" => 0,
  "parent" => 0
);


unless ( eval { ExtUtils::MakeMaker->VERSION(6.63_03) } ) {
  delete $WriteMakefileArgs{TEST_REQUIRES};
  delete $WriteMakefileArgs{BUILD_REQUIRES};
  $WriteMakefileArgs{PREREQ_PM} = \%FallbackPrereqs;
}

delete $WriteMakefileArgs{CONFIGURE_REQUIRES}
  unless eval { ExtUtils::MakeMaker->VERSION(6.52) };

WriteMakefile(%WriteMakefileArgs);

{
package
MY;
use File::ShareDir::Install qw(postamble);
}

README

NAME

    Net::Async::Redis - talk to Redis servers via IO::Async

SYNOPSIS

        use Net::Async::Redis;
        use Future::AsyncAwait;
        use IO::Async::Loop;
        my $loop = IO::Async::Loop->new;
        $loop->add(my $redis = Net::Async::Redis->new);
        my $value = await $redis->get('some_key');
        $value ||= await $redis->set(some_key => 'some_value');
        print "Value: $value\n";
    
        # You can also use ->then chaining, see the Future documentation for more details
        $redis->connect->then(sub {
            $redis->get('some_key')
        })->then(sub {
            my $value = shift;
            return Future->done($value) if $value;
            $redis->set(some_key => 'some_value')
        })->on_done(sub {
            print "Value: " . shift;
        })->get;

DESCRIPTION

    Provides client access for dealing with Redis servers.

    See Net::Async::Redis::Commands for the full list of commands; this
    list is autogenerated from the official documentation here:

    https://redis.io/commands

    This is intended to be a near-complete low-level client module for
    asynchronous Redis support. See Net::Async::Redis::Server for a
    (limited) Perl server implementation.

    This is an unofficial Perl port, and not endorsed by the Redis server
    maintainers in any way.

 Supported features

    Current features include:

      * all commands <https://redis.io/commands> as of 7.2 (August 2023),
      see https://redis.io/commands for the methods and parameters

      * pub/sub support <https://redis.io/topics/pubsub>, see "METHODS -
      Subscriptions" including sharded pubsub

      * pipelining <https://redis.io/topics/pipelining>, see
      "pipeline_depth"

      * transactions <https://redis.io/topics/transactions>, see "METHODS -
      Transactions"

      * streams <https://redis.io/topics/streams-intro> and consumer
      groups, via "XADD" in Net::Async::Redis::Commands and related methods

      * client-side caching <https://redis.io/topics/client-side-caching>,
      see "METHODS - Clientside caching"

      * RESP3 protocol
      <https://github.com/antirez/RESP3/blob/master/spec.md> for Redis 6
      and above, allowing pubsub on the same connection as regular
      commands

      * cluster support via Net::Async::Redis::Cluster, including key
      specifications from https://redis.io/docs/reference/key-specs/ to
      route commands to the correct node(s)

      * see Net::Async::Redis::XS for a faster XS version (can be 40x
      faster than the pure Perl version, particularly when parsing large
      "xreadgroup" responses)

 Connecting

    As with any other IO::Async::Notifier-based module, you'll need to add
    this to an IO::Async::Loop:

        my $loop = IO::Async::Loop->new;
        $loop->add(
            my $redis = Net::Async::Redis->new
        );

    then connect to the server:

        use Future::AsyncAwait;
        await $redis->connect;
        # You could achieve a similar result by passing client_name in
        # constructor or ->connect parameters
        await $redis->client_setname("example client");

 Key-value handling

    One of the most common Redis scenarios is as a key/value store. The
    "get" and "set" methods are typically used here:

        use Future::AsyncAwait;
        await $redis->connect;
        await $redis->set(some_key => 'some value');
        my ($value) = await $redis->get('some_key');
        print "Read back value [$value]\n";

    See the next section for more information on what these methods are
    actually returning.

 Requests and responses

    Requests are implemented as methods on the Net::Async::Redis object.
    These typically return a Future which will resolve once ready:

        my $future = $redis->incr("xyz")
            ->on_done(sub {
                print "result of increment was " . shift . "\n"
            });

    For synchronous code, call ->get on that Future:

        print "Database has " . $redis->dbsize->get . " total keys\n";

    This means you can end up with ->get being called on the result of
    ->get. Note that these are two very different methods:

     $redis
      ->get('some key') # this is being called on $redis, and is issuing a GET request
      ->get # this is called on the returned Future, and blocks until the value is ready

    Typical async code would not be expected to use the "get" in Future
    method extensively; often only calling it in one place at the top level
    in the code.
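
    As a rough sketch of that top-level pattern, the Redis calls can live
    in an async sub, leaving a single blocking ->get at the outermost
    level. The helper name bump_counter and the key name are made up for
    illustration, and a Redis server is assumed to be reachable with the
    default connection settings:

```perl
use strict;
use warnings;
use Future::AsyncAwait;
use IO::Async::Loop;
use Net::Async::Redis;

my $loop = IO::Async::Loop->new;
$loop->add(my $redis = Net::Async::Redis->new);

# Hypothetical helper: everything inside only ever uses 'await',
# so it never blocks the event loop
async sub bump_counter {
    my ($key) = @_;
    # Establishes a connection if one is not already available
    await $redis->connected;
    return await $redis->incr($key);
}

# The only Future->get call in the script sits at the top level
my $count = bump_counter('example:counter')->get;
print "Counter is now $count\n";
```

    Keeping the blocking call in one place means the same helper can be
    reused unchanged from other async code.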

  RESP3 and RESP2 compatibility

    In RESP3 some of the responses are structured differently from RESP2.
    Net::Async::Redis guarantees the same structure unless you have
    explicitly requested the new types using the "configure" hashrefs
    option, which is disabled by default.

    Generally RESP3 is recommended if you have Redis version 6 or later
    installed: it allows subscription operations to share the same
    connection as regular Redis traffic.
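
    A minimal sketch of opting in to the RESP3 types, assuming a Redis 6
    or later server on the default local port. The key and field names
    are invented for illustration, and the shape of the result is
    reported rather than assumed:

```perl
use strict;
use warnings;
use Future::AsyncAwait;
use IO::Async::Loop;
use Net::Async::Redis;

my $loop = IO::Async::Loop->new;
$loop->add(
    my $redis = Net::Async::Redis->new(
        protocol => 'resp3',  # ask for RESP3 rather than autodetecting
        hashrefs => 1,        # request key/value pairs as hashrefs
    )
);

(async sub {
    await $redis->connect;
    await $redis->hset('example:hash', colour => 'blue');
    my $data = await $redis->hgetall('example:hash');
    # Report what came back instead of assuming its structure
    printf "hgetall returned %s\n", ref($data) || 'a plain scalar';
})->()->get;
```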

 Error handling

    Since Future is used for deferred results, failure is indicated by a
    failing Future with failure category of redis.

    The "catch" in Future feature may be useful for handling these:

     $redis->lpush(key => $value)
         ->catch(
             redis => sub { warn "probably an incorrect type, cannot push value"; Future->done }
         )->get;

    Note that this module uses Future::AsyncAwait internally.
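
    Because failures are ordinary Future failures in the redis category,
    the category matching can be tried without a server. The sketch below
    is only an illustration: it substitutes a hand-built failing Future
    for a real request, and the error text is invented rather than taken
    from this module:

```perl
use strict;
use warnings;
use Future;

# Stand-in for a failed request such as $redis->lpush(...): a Future
# that has already failed with the 'redis' category. The message text
# is made up for this example.
my $request = Future->fail('WRONGTYPE example failure', 'redis');

my $outcome = $request->catch(
    # Only invoked when the failure category is 'redis'
    redis => sub {
        my ($message) = @_;
        warn "Redis request failed: $message\n";
        # Handlers must return a Future; this one recovers with a value
        Future->done('recovered');
    },
)->get;

print "Outcome: $outcome\n";
```

    Failures in any other category would pass through the handler
    untouched and be raised by ->get.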

CONSTANTS

 OPENTRACING_ENABLED

    Defaults to false; this can be controlled by the USE_OPENTRACING
    environment variable. This provides a way to set the default
    opentracing mode for all Net::Async::Redis instances - you can
    enable/disable for a specific instance via "configure":

     $redis->configure(opentracing => 1);

    When enabled, this will create a span for every Redis request. See
    OpenTracing::Any for details.

 OPENTELEMETRY_ENABLED

    Defaults to false; this can be controlled by the USE_OPENTELEMETRY
    environment variable. This provides a way to set the default
    opentelemetry mode for all Net::Async::Redis instances - you can
    enable/disable for a specific instance via "configure":

     $redis->configure(opentelemetry => 1);

    When enabled, this will create a span for every Redis request. See
    OpenTelemetry or https://opentelemetry.io for details.

METHODS

    NOTE: For a full list of the Redis methods supported by this module,
    please see Net::Async::Redis::Commands.

 configure

    Applies configuration parameters - currently supports:

      * host

      * port

      * auth

      * database

      * pipeline_depth

      * stream_read_len

      * stream_write_len

      * on_disconnect

      * client_name

      * opentracing

      * opentelemetry

      * protocol - either 'resp2' or 'resp3', default is autodetect

      * hashrefs - RESP3 (Redis 6.0+) supports more data types, currently
      the only difference this makes to us is that it now supports hashrefs
      for key/value pairs. This is disabled by default to ensure
      compatibility across newer+older versions.

    Note that enabling hashrefs will cause connections to fail if the
    server does not support RESP3.
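
    For illustration, several of these parameters can be combined, either
    in the constructor or through a later configure call. The values
    below are placeholders rather than recommendations, and no commands
    are issued here:

```perl
use strict;
use warnings;
use IO::Async::Loop;
use Net::Async::Redis;

my $loop = IO::Async::Loop->new;
$loop->add(
    my $redis = Net::Async::Redis->new(
        host        => '127.0.0.1',
        port        => 6379,
        client_name => 'example-client',
    )
);

# Parameters can also be changed after construction; here the read
# buffer is reduced from the documented 1MB default to 64KB
$redis->configure(stream_read_len => 64 * 1024);

printf "Will connect to %s:%s as %s (read buffer %d bytes)\n",
    $redis->host, $redis->port, $redis->client_name, $redis->stream_read_len;
```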

 host

    Returns the host or IP address for the Redis server.

 port

    Returns the port used for connecting to the Redis server.

 database

    Returns the database index used when connecting to the Redis server.

    See the "select" in Net::Async::Redis::Commands method for details.

 uri

    Returns the Redis endpoint URI instance.

 stream_read_len

    Returns the buffer size when reading from a Redis connection.

    Defaults to 1MB; reduce this if you're dealing with a lot of
    connections and want to minimise memory usage. Alternatively, if you're
    reading large amounts of data and spend too much time in needless
    epoll_wait calls, try a larger value.

 stream_write_len

    Returns the buffer size when writing to Redis connections, in bytes.
    Defaults to 1MB.

    See "stream_read_len".

 client_name

    Returns the name used for this client when connecting.

METHODS - Connection

 connect

    Connects to the Redis server.

    Will use the "configure"d parameters if available, but as a convenience
    can be passed additional parameters which will then be applied as if
    you had called "configure" with those beforehand. This also means that
    they will be preserved for subsequent "connect" calls.
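
    A short sketch of passing parameters directly to connect, assuming a
    server is listening on the placeholder address shown:

```perl
use strict;
use warnings;
use Future::AsyncAwait;
use IO::Async::Loop;
use Net::Async::Redis;

my $loop = IO::Async::Loop->new;
$loop->add(my $redis = Net::Async::Redis->new);

(async sub {
    # Host and port here are placeholders for a locally running server
    await $redis->connect(
        host => '127.0.0.1',
        port => 6379,
    );
    # The parameters are applied as if passed to ->configure first,
    # so the accessors report them afterwards
    printf "Connected to %s:%s\n", $redis->host, $redis->port;
})->()->get;
```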

 connected

    Establishes a connection if needed, otherwise returns an
    immediately-available Future instance.

 endpoint

    The string describing the remote endpoint.

 local_endpoint

    A string describing the local endpoint, usually host:port.

METHODS - Subscriptions

    See https://redis.io/topics/pubsub for more details on this topic.
    There are also more details on the internal implementation in Redis here:
    https://making.pusher.com/redis-pubsub-under-the-hood/.

    NOTE: On Redis versions prior to 6.0, you will need a separate
    connection for subscriptions; you cannot share a connection for regular
    requests once any of the "subscribe" or "psubscribe" methods have been
    called on an existing connection.

    With Redis 6.0, a newer protocol version (RESP3) is used by default,
    and this is quite happy to support pubsub activity on the same
    connection as other traffic.

 psubscribe

    Subscribes to a pattern.

    Example:

     # Subscribe to 'info::*' channels, i.e. any channel whose name
     # starts with the info:: prefix, and print each message
     # with a timestamp.
     $redis_connection->psubscribe('info::*')
        ->then(sub {
            my $sub = shift;
            $sub->events
                ->map('payload')
                ->each(sub {
                    print localtime . ' ' . $_ . "\n";
                })->retain
        })->get;
     # this will block until the subscribe is confirmed. Note that you can't publish on
     # a connection that's handling subscriptions due to Redis protocol restrictions.
     $other_redis_connection->publish('info::example', 'a message here')->get;

    Returns a Future which resolves to a Net::Async::Redis::Subscription
    instance.

 subscribe

    Subscribes to one or more channels.

    Returns a Future which resolves to a Net::Async::Redis::Subscription
    instance.

    Example:

     # Subscribe to 'notifications' channel,
     # print the first 5 messages, then unsubscribe
     $redis->subscribe('notifications')
        ->then(sub {
            my $sub = shift;
            $sub->events
                ->map('payload')
                ->take(5)
                ->say
                ->completed
        })->then(sub {
            $redis->unsubscribe('notifications')
        })->get

 ssubscribe

    Subscribes to one or more sharded channels.

    This behaves similarly to "subscribe", but applies to messages received
    on a specific shard. This is mostly relevant in a cluster context,
    where subscriptions can be localised to one shard (group of nodes) in
    the cluster to improve performance.

    More details are in the sharded pubsub documentation
    <https://redis.io/topics/pubsub#sharded-pubsub>.

    Returns a Future which resolves to a Net::Async::Redis::Subscription
    instance.

    Example:

     # Subscribe to the sharded 'notifications' channel,
     # print the first 5 messages, then unsubscribe
     $redis->ssubscribe('notifications')
        ->then(sub {
            my $sub = shift;
            $sub->events
                ->map('payload')
                ->take(5)
                ->say
                ->completed
        })->then(sub {
            $redis->sunsubscribe('notifications')
        })->get

METHODS - Transactions

 multi

    Executes the given code in a Redis MULTI transaction.

    This will cause each of the requests to be queued on the server, then
    applied in a single atomic transaction.

    Note that the commands will resolve only after the transaction is
    committed: for example, when the "set" command is issued, Redis will
    return QUEUED. This information is not used as the result - we only
    pass through the immediate response if there was an error. The Future
    representing the response will be marked as done once the EXEC command
    is applied and we have the results back.

    Example:

     $redis->multi(sub {
      my $tx = shift;
      $tx->incr('some::key')->on_done(sub { print "Final value for incremented key was " . shift . "\n"; });
      $tx->set('other::key' => 'test data')
     })->then(sub {
      my ($success, $failure) = @_;
      return Future->fail("Had $failure failures, expecting everything to succeed") if $failure;
      print "$success succeeded\n";
      return Future->done;
     })->retain;
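
    The same flow can be sketched with Future::AsyncAwait, which the
    bundled examples already use. This is only a sketch: the helper name
    bump_counter and the "$key.updated" key are hypothetical, and it
    assumes "multi" resolves to the ($success, $failure) pair shown in
    the example above.

```perl
use strict;
use warnings;
use Future::AsyncAwait;

# Hypothetical helper: increments a counter and records a timestamp
# in one MULTI transaction, then returns the new counter value.
async sub bump_counter {
    my ($redis, $key) = @_;
    my $incr;   # Future for the INCR result; only done once EXEC has returned
    my ($success, $failure) = await $redis->multi(sub {
        my $tx = shift;
        $incr = $tx->incr($key);
        $tx->set("$key.updated" => time);
    });
    die "Had $failure failures, expecting everything to succeed\n" if $failure;
    # The transaction has been committed, so the per-command Future is ready
    return await $incr;
}
```

    Calling bump_counter($redis, 'some::key') returns a Future, so it can
    be awaited from another async sub or chained with ->then.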

METHODS - Clientside caching

    Enable clientside caching by passing a true value for
    client_side_caching_enabled in "configure" or "new". This is currently
    experimental, and only operates on "get" in Net::Async::Redis::Commands
    requests.

    See https://redis.io/topics/client-side-caching for more details on
    this feature.

 clientside_cache_events

    A Ryu::Source which emits key names as they are invalidated.

    With client-side caching enabled, can be used to monitor which keys are
    changing.

 client_side_cache_ready

    Returns a Future representing the client-side cache connection status,
    if there is one.

 client_side_cache

    Returns the Cache::LRU instance used for the client-side cache.

 is_client_side_cache_enabled

    Returns true if the client-side cache is enabled.

 client_side_cache_size

    Returns the current client-side cache size, as a number of entries.

METHODS - Generic

 keys

 watch_keyspace

    A convenience wrapper around the keyspace notifications API.

    Provides the necessary setup to establish a PSUBSCRIBE subscription on
    the __keyspace@*__ namespace, setting the configuration required for
    this to start emitting events, and then calls $code with each event.

    Note that this will switch the connection into pubsub mode on versions
    of Redis older than 6.0, so it will no longer be available for any
    other activity. This limitation does not apply on Redis 6 or above.

    Use * to listen for all keyspace changes.

    Resolves to a Ryu::Source instance.
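
    Keyspace notifications arrive on channels named
    __keyspace@<db>__:<key>, with the event name (such as set or del) as
    the message payload. As a small sketch of how such a channel name can
    be split apart, the helper below is hypothetical and not part of this
    distribution:

```perl
use strict;
use warnings;

# Hypothetical helper: split a keyspace notification channel name
# into the database index and the key that changed.
sub parse_keyspace_channel {
    my ($channel) = @_;
    my ($db, $key) = $channel =~ /\A__keyspace\@(\d+)__:(.*)\z/s
        or return;   # not a keyspace notification channel
    return ($db, $key);
}

my ($db, $key) = parse_keyspace_channel('__keyspace@0__:user:42');
print "db=$db key=$key\n";   # db=0 key=user:42
```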

 pipeline_depth

    Number of requests awaiting responses before we start queuing. This
    defaults to an arbitrary value of 100 requests.

    Note that this does not apply when in transaction (MULTI) mode.

    See https://redis.io/topics/pipelining for more details on this
    concept.
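
    To illustrate the idea, here is a standalone sketch of a depth-limited
    pipeline. It is not the code used by this module: the names $depth,
    send_request and on_response are made up for the example, and no
    network traffic is involved.

```perl
use strict;
use warnings;

my $depth = 3;    # plays the role of pipeline_depth
my @inflight;     # requests sent and awaiting a response
my @queued;       # requests held back until a slot frees up

sub send_request {
    my ($cmd) = @_;
    if (@inflight < $depth) {
        push @inflight, $cmd;   # a real client would write to the socket here
    } else {
        push @queued, $cmd;     # pipeline is full, so hold this one back
    }
}

sub on_response {
    my $done = shift @inflight;                 # responses arrive in request order
    push @inflight, shift @queued if @queued;   # promote the next queued request
    return $done;
}

send_request("GET key$_") for 1..5;
print scalar(@inflight), " in flight, ", scalar(@queued), " queued\n";   # 3 in flight, 2 queued
on_response();
print scalar(@inflight), " in flight, ", scalar(@queued), " queued\n";   # 3 in flight, 1 queued
```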

 opentracing

    Indicates whether OpenTracing::Any support is enabled.

 opentelemetry

    Indicates whether OpenTelemetry support is enabled.

METHODS - Deprecated

    These are still supported, but no longer recommended.

METHODS - Internal

 on_message

    Called for each incoming message.

    Passes off the work to "handle_pubsub_message" or the next queue item,
    depending on whether we're dealing with subscriptions at the moment.

 next_in_pipeline

    Attempt to process next pending request when in pipeline mode.

 on_error_message

    Called when there's an error response.

 handle_pubsub_message

    Deal with an incoming pubsub-related message.

 stream

    Represents the IO::Async::Stream instance for the active Redis
    connection.

 notify_close

    Called when the socket is closed.

 command_label

    Generate a label for the given command list.

 span_for_future

    See https://opentelemetry.io/docs/specs/semconv/database/redis/ for
    current semantic conventions around Redis.

 execute_command

    Queues the given command for execution.

 ryu

    A Ryu::Async instance for source/sink creation.

 future

    Factory method for creating new Future instances.

 wire_protocol

    Returns the Net::Async::Redis::Protocol instance used for encoding and
    decoding messages.

 enable_clientside_cache

    Used internally to prepare for client-side caching: subscribes to the
    invalidation events.

 _init

 _add_to_loop

 retrieve_full_command_list

    Iterates through all commands defined in Redis, extracting the
    information about that command using COMMAND INFO.

    The data is formatted for internal use, converting information such as
    flags into hashrefs for easier lookup.

    This information is also used by "extract_keys_for_command".

    Returns a hashref, where each key represents a method name
    (space-separated commands such as CLUSTER NODES are returned as
    cluster_nodes). The values are a restructured form of
    https://redis.io/commands/command.
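
    The naming rule for the hashref keys can be sketched as follows. The
    helper method_name_for is hypothetical and only covers the
    space-to-underscore case described above:

```perl
use strict;
use warnings;

# Hypothetical helper: turn a Redis command name into the
# method-style key described above.
sub method_name_for {
    my ($command) = @_;
    (my $name = lc $command) =~ tr/ /_/;
    return $name;
}

print method_name_for('CLUSTER NODES'), "\n";   # cluster_nodes
print method_name_for('GET'), "\n";             # get
```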

 extract_keys_for_command

    Given a command arrayref and a definition for the server, this will
    return a list of any keys found in that command.

    Since the logic for this is slightly slow, we are caching the result
    unless a specific definition is provided: this is an internal
    implementation detail and not something to rely on.

    (the key specification is a relatively new Redis feature - an optimised
    version of this logic will be added to Net::Async::Redis::XS in due
    course, which should reduce the need for caching)

    Returns a list of keys.
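
    For illustration only, the sketch below extracts keys using the older
    first key / last key / step fields that Redis COMMAND reports for each
    command. This is a simplification: as noted above, the module works
    from the newer key specifications, and the helper name
    keys_by_position is made up for this example.

```perl
use strict;
use warnings;

# Hypothetical helper: given a command arrayref and the legacy
# (first key, last key, step) positions, return the key arguments.
sub keys_by_position {
    my ($command, $first, $last, $step) = @_;
    return () unless $first > 0 and $step > 0;   # command takes no keys
    # A negative "last" counts back from the end of the argument list
    $last = $#$command + 1 + $last if $last < 0;
    my @keys;
    for (my $i = $first; $i <= $last && $i <= $#$command; $i += $step) {
        push @keys, $command->[$i];
    }
    return @keys;
}

# MSET alternates keys and values: first key 1, last key -1, step 2
print join(',', keys_by_position([qw(MSET a 1 b 2)], 1, -1, 2)), "\n";   # a,b
# GET has a single key at position 1
print join(',', keys_by_position([qw(GET foo)], 1, 1, 1)), "\n";         # foo
```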

 ssl_options

    Extracts the SSL-related options as a hashref for passing to
    $loop->connect.

SEE ALSO

    Some other Redis implementations on CPAN:

      * Mojo::Redis2 - nonblocking, using the Mojolicious framework,
      actively maintained

      * MojoX::Redis - changelog mentions that this was obsoleted by
      Mojo::Redis, although there have been new versions released since
      then

      * RedisDB - another synchronous (blocking) implementation, handles
      pub/sub and autoreconnect

      * Cache::Redis - wrapper around RedisDB

      * Redis::Fast - wraps hiredis, faster than Redis

      * Redis::Jet - also XS-based, docs mention very early development
      stage but appears to support pipelining and can handle newer commands
      via ->command.

      * Redis - synchronous (blocking) implementation, handles pub/sub and
      autoreconnect

      * HiRedis::Raw - another hiredis wrapper

AUTHOR

    Tom Molesworth <TEAM@cpan.org>

CONTRIBUTORS

    With thanks to the following for contributing patches, bug reports,
    tests and feedback:

      * BINARY@cpan.org

      * PEVANS@cpan.org

      * @eyadof

      * Nael Alolwani

      * Marc Frank

      * @pnevins

LICENSE

    Copyright Tom Molesworth and others 2015-2024. Licensed under the same
    terms as Perl itself.

cpanfile

requires 'parent', 0;
requires 'curry', 0;
requires 'Path::Tiny', 0;
requires 'Dir::Self', 0;
requires 'Object::Pad', '>= 0.800';
requires 'Future', '>= 0.47';
requires 'Future::AsyncAwait', '>= 0.48';
requires 'Future::Queue', '>= 0.50';
requires 'Syntax::Keyword::Try', '>= 0.21';
requires 'Syntax::Keyword::Match', 0;
requires 'Syntax::Keyword::Dynamically', '>= 0.07';
requires 'IO::Async', 0;
requires 'Ryu', '>= 2.009';
requires 'Ryu::Async', '>= 0.019';
requires 'List::Util', '>= 1.55';
requires 'Log::Any', '>= 1.708';
requires 'URI', 0;
requires 'URI::redis', 0;
requires 'Class::Method::Modifiers', 0;
requires 'Math::Random::Secure', 0;
requires 'Metrics::Any', 0;

# Client-side caching
requires 'Cache::LRU', '>= 0.04';

# Cluster support
requires 'Digest::CRC', '>= 0.22';
requires 'List::BinarySearch::XS', '>= 0.09';
requires 'YAML::XS', '>= 0.88';

requires 'File::ShareDir';
requires 'File::ShareDir::Install';

suggests 'OpenTracing::Any', '>= 1.003';
recommends 'OpenTelemetry', '>= 0.018';
recommends 'OpenTelemetry::SDK', '>= 0.020';

on 'test' => sub {
    requires 'Test::More', '>= 0.98';
    requires 'Test::Fatal', '>= 0.016';
    requires 'Test::HexString', 0;
    requires 'Test::Deep', 0;
    requires 'Test::MockModule', 0;
    requires 'Variable::Disposition', '>= 0.004';
};

on 'develop' => sub {
    requires 'Net::Async::HTTP';
    requires 'Template';
    requires 'HTML::TreeBuilder';
};

dist.ini

name    = Net-Async-Redis
author  = Tom Molesworth <TEAM@cpan.org>
license = Perl_5
copyright_holder = Tom Molesworth
copyright_year   = 2015
main_module	= lib/Net/Async/Redis.pm

[@Author::TEAM]
max_target_perl = 5.020

examples/cache-events.pl

#!/usr/bin/env perl
use strict;
use warnings;

use Net::Async::Redis;
use IO::Async::Loop;

use Future::AsyncAwait;
use Syntax::Keyword::Try;
use Future::Utils qw(fmap_void fmap_concat repeat);

use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'info';

my $loop = IO::Async::Loop->new;

$loop->add(
    my $redis = Net::Async::Redis->new(
        client_side_cache_size => 100,
    )
);

await $redis->connected;
$redis->clientside_cache_events
    ->each(sub {
        $log->infof('Key change detected for %s', $_)
    });
$log->infof('Set key');
await $redis->set('clientside.cached' => 1);
$log->infof('Get key');
await $redis->get('clientside.cached');
$log->infof('Apply more changes');
await $redis->set('clientside.cached' => 2);
await $redis->llen('clientside.cached.lpush');
await $redis->lpush('clientside.cached.lpush' => 1,2,3);
await $redis->hgetall('clientside.cached.hset');
await $redis->hset('clientside.cached.hset' => abc => 123);
await $redis->hset('clientside.cached.hset' => def => 123);
await $redis->hset('clientside.cached.hset' => ghi => 123);

await $redis->del(
    'example_stream',
);
await $redis->xgroup(
    CREATE => 'example_stream',
    primary_group => '0',
    'MKSTREAM'
);
my ($item) = await $redis->xreadgroup(
    # No BLOCK option is given, so this returns immediately,
    # with an empty result if nothing is waiting
    GROUP       => 'primary_group',
    $$,
    COUNT       => 1,
    STREAMS     => 'example_stream',
    '>'
);

await $redis->xadd(
    example_stream => '*',
    key => 'value',
);
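# Reuse $item from above rather than redeclaring it
($item) = await $redis->xread(
    # '$' means only entries added after this call; without BLOCK
    # the call returns immediately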
    COUNT       => 1,
    STREAMS     => 'example_stream',
    '$',
);
await $redis->xadd(
    example_stream => '*',
    key => 'value',
);
$log->infof('Start loop');
$loop->run;

examples/client-cache-streams.pl

#!/usr/bin/env perl
use strict;
use warnings;

use Net::Async::Redis;
use IO::Async::Loop;

use Future::AsyncAwait;
use Syntax::Keyword::Try;
use Future::Utils qw(fmap_void fmap_concat repeat);

use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'info';

my $loop = IO::Async::Loop->new;

$loop->add(
    my $redis = Net::Async::Redis->new(
        client_side_cache_size => 100,
    )
);

await $redis->connected;
$redis->clientside_cache_events
    ->each(sub {
        $log->infof('Key change detected for %s', $_)
    });
$log->infof('Set key');
await $redis->set('clientside.cached' => 1);
$log->infof('Get key');
await $redis->get('clientside.cached');
$log->infof('Apply more changes');
await $redis->set('clientside.cached' => 2);
await $redis->llen('clientside.cached.lpush');
await $redis->lpush('clientside.cached.lpush' => 1,2,3);
await $redis->hgetall('clientside.cached.hset');
await $redis->hset('clientside.cached.hset' => abc => 123);
await $redis->hset('clientside.cached.hset' => def => 123);
await $redis->hset('clientside.cached.hset' => ghi => 123);

await $redis->del(
    'example_stream',
);
await $redis->xgroup(
    CREATE => 'example_stream',
    primary_group => '0',
    'MKSTREAM'
);

$log->infof('Read group');
my ($item) = await $redis->xreadgroup(
    GROUP       => 'primary_group',
    $$,
    COUNT       => 1,
    STREAMS     => 'example_stream',
    '>'
);
$log->infof('item is %s', $item);

# $item is undef when there was nothing to read, so check before dereferencing
unless($item and $item->@*) {
    my ($item) = await $redis->xinfo_stream(
        'example_stream',
    );
    $log->infof('xinfo is %s', $item);
}

$log->infof('xadd');
await $redis->xadd(
    example_stream => '*',
    key => 'value',
);
{
    my ($item) = await $redis->xreadgroup(
        GROUP       => 'primary_group',
        $$,
        COUNT       => 1,
        STREAMS     => 'example_stream',
        '>'
    );
    $log->infof('new item is %s', $item);
    $log->infof('ack %s', my $id = $item->[0][1][0][0]);
    # Wait for the acknowledgement rather than dropping the Future
    await $redis->xack(
        example_stream => 'primary_group',
        $id
    );
}
if(0) {
$log->infof('Read ');
my ($item) = await $redis->xread(
    COUNT       => 1,
    STREAMS     => 'example_stream',
    '$',
);
}
$log->infof('xadd');
await $redis->xadd(
    example_stream => '*',
    key => 'value',
);
$log->infof('Start loop');
$loop->run;

examples/consumer-groups-aa-cluster.pl

#!/usr/bin/env perl 
use strict;
use warnings;

# Details on the concepts and how the Redis commands work here can be found
# in https://redis.io/topics/streams-intro

use Net::Async::Redis::Cluster;
use IO::Async::Loop;
use IO::Async::Timer::Periodic;
use Time::HiRes ();

use Future::AsyncAwait;
use Syntax::Keyword::Try;
use Future::Utils qw(fmap_void fmap_concat repeat);

use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'info';

my $loop = IO::Async::Loop->new;

# We start with a primary connection (for initial set up and queuing new messages)
# plus several workers (for retrieving messages via XREADGROUP)
my $worker_count = 4;
my @conn = map {;
    $loop->add(
        my $redis = Net::Async::Redis::Cluster->new
    );
    $redis;
} 0..$worker_count;
Future->needs_all(
    map $_->bootstrap(
        host => '127.0.0.1',
        port => '7000',
    ), @conn
)->get;

# This is used to shut down all the various moving parts when we want to quit.
my $active = 1;

# IDs that we expect to receive someday
my $pending = {};
# IDs that we have already processed, note that this can get very large if we
# increase the message limit
my $handled = {};

# Counts of jobs per worker (index 0 will always be zero, since no worker runs there)
my @jobs_processed_by_worker_index = (0) x ($worker_count + 1);

# How many messages we have seen
my $message_count = 0;

# We'll continuously generate new messages likely averaging about 0.5ms
# between each one, network roundtrip won't be included since it'll end
# up pipelining multiple requests if we get a backlog.
my $background_publish = do {
    my $idx = 0;
    async sub {
        my ($primary) = @_;
        $log->infof('Starting background publish');
        try {
            while($active) {
                # Don't try to add more to the queue if we think we have more than 1k
                # sat around waiting to be processed.
                my $blocked = (1000 < keys %$pending);
                if($blocked) {
                    # We add in a 0.5s delay if we're blocked
                    await $loop->delay_future(after => (0.5 * $blocked) + (0.001 * rand));
                } else {
                    my $id = (++$idx) . '-' . int(1_000_000 * rand);
                    # Note that we mark this as pending *before* we try sending it.
                    # Doing this in the ->on_done handler introduces a race condition:
                    # Redis might send the new item to a worker before we get the
                    # response back from the XADD command.
                    $pending->{$id} = 1;
                    try {
                        await Future->needs_any(
                            # maximum 0..1ms delay, we want lots of these going out
                            $loop->delay_future(after => 0.001 * rand),
                            $primary->xadd(
                                # * means 'just use an autogenerated ID', which is fine for our needs
                                example_stream => '*',
                                id => $id,
                            )->retain
                            ->without_cancel
                        )
                    } catch {
                        # If something goes wrong, we should drop that item from our
                        # pending list...
                        delete $pending->{$id}
                    }
                }
            }
        } catch {
            $log->errorf('Failed in our background publishing loop, active was %d: %s', $active, $@);
        }
        $log->infof('Ending background publish');
    }
};

(async sub {
#    await Future->wait_all(
#        map $_->connect, @conn
#    );
    $log->debug("All instances connected, starting test");

    my ($primary) = @conn;
    $log->infof('Clearing out old streams');
    # Some of these steps may fail, so we use ->wait_all to ignore the status.
    # On the first run, the streams and groups are not expected to exist.
    await Future->wait_all(
        $primary->xgroup(
            DESTROY => 'example_stream',
            'primary_group'
        )->on_ready(sub { $log->debugf('ready primary_group') }),
        $primary->xgroup(
            DESTROY => 'example_stream',
            'secondary_group',
        )->on_ready(sub { $log->debugf('ready secondary_group') }),
        $primary->del(
            'example_stream',
        )->on_ready(sub { $log->debugf('ready example_stream') }),
    );

    $log->infof('About to add some data to streams');
    my $start = Time::HiRes::time;
    ($background_publish->($primary))->retain;

    $loop->add(
        IO::Async::Timer::Periodic->new(
            interval => 1,
            on_tick => sub {
                my $elapsed = Time::HiRes::time - $start;
                $active = 0 if $message_count > 30_000;
                $log->infof("%d messages after %d seconds, %.2f/sec, pending %d, workers %s",
                    $message_count, $elapsed, $message_count / ($elapsed || 1),
                    0 + keys %$pending,
                    [ @jobs_processed_by_worker_index[1..$worker_count] ]
                );
            }
        )->start
    );
    $log->infof('Set up 2 consumer groups');
    await Future->needs_all(
        $primary->xgroup(
            CREATE => 'example_stream',
            primary_group => '0'
        ),
        $primary->xgroup(
            CREATE => 'example_stream',
            secondary_group => '0'
        ),
    );

    $log->infof('Start workers');
    await fmap_concat(async sub {
        my ($idx) = @_;
        my ($worker_id) = 'worker_' . $idx;
        my ($redis) = $conn[$idx];
        while($active) {
            try {
                my ($item) = await $redis->xreadgroup(
                    # Wait up to 2 seconds for a message
                    BLOCK       => 2000,
                    GROUP       => 'primary_group',
                    $worker_id,
                    COUNT       => 1,
                    STREAMS     => 'example_stream',
                    '>'
                );
                # We'll receive undef if we had no message to process
                next unless $item;

                # Things are returned in a curiously-nested form, unpack that here
                my ($stream, $items) = map @$_, @$item;
                my ($id, $content) = map @$_, @$items;
                my (%data) = @{$content || []};
                $log->debugf('Stream was %s for ID %s and data was %s', $stream, $id, \%data);

                # Sanity check what we received
                warn "already handled " . $data{id} if $handled->{$data{id}}++;
                delete $pending->{$data{id}} or warn "deleting thing that should have existed - " . $data{id};

                ++$jobs_processed_by_worker_index[$idx];
                ++$message_count;
                # Claim this message as processed
                await $redis->xack(
                    $stream => 'primary_group',
                    $id
                )
            } catch {
                warn "Failed in read group stuff - $@";
                die $@;
            }
        }
    }, foreach    => [1..$worker_count],
      concurrent => $worker_count);
})->()->get;

examples/consumer-groups-aa.pl

#!/usr/bin/env perl 
use strict;
use warnings;

# Details on the concepts and how the Redis commands work here can be found
# in https://redis.io/topics/streams-intro

use Net::Async::Redis;
use IO::Async::Loop;
use IO::Async::Timer::Periodic;
use Time::HiRes ();

use Future::AsyncAwait;
use Syntax::Keyword::Try;
use Future::Utils qw(fmap_void fmap_concat repeat);

use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'info';

my $loop = IO::Async::Loop->new;

# We start with a primary connection (for initial set up and queuing new messages)
# plus several workers (for retrieving messages via XREADGROUP)
my $worker_count = 4;
my @conn = map {;
    $loop->add(
        my $redis = Net::Async::Redis->new
    );
    $redis;
} 0..$worker_count;

# This is used to shut down all the various moving parts when we want to quit.
my $active = 1;

# IDs that we expect to receive someday
my $pending = {};
# IDs that we have already processed, note that this can get very large if we
# increase the message limit
my $handled = {};

# Counts of jobs per worker (index 0 will always be zero, since no worker runs there)
my @jobs_processed_by_worker_index = (0) x ($worker_count + 1);

# How many messages we have seen
my $message_count = 0;

# We'll continuously generate new messages likely averaging about 0.5ms
# between each one, network roundtrip won't be included since it'll end
# up pipelining multiple requests if we get a backlog.
my $background_publish = do {
    my $idx = 0;
    async sub {
        my ($primary) = @_;
        $log->infof('Starting background publish');
        try {
            while($active) {
                # Don't try to add more to the queue if we think we have more than 1k
                # sat around waiting to be processed.
                my $blocked = (1000 < keys %$pending);
                if($blocked) {
                    # We add in a 0.5s delay if we're blocked
                    await $loop->delay_future(after => (0.5 * $blocked) + (0.001 * rand));
                } else {
                    my $id = (++$idx) . '-' . int(1_000_000 * rand);
                    # Note that we mark this as pending *before* we try sending it.
                    # Doing this in the ->on_done handler introduces a race condition:
                    # Redis might send the new item to a worker before we get the
                    # response back from the XADD command.
                    $pending->{$id} = 1;
                    try {
                        await Future->needs_any(
                            # maximum 0..1ms delay, we want lots of these going out
                            $loop->delay_future(after => 0.001 * rand),
                            $primary->xadd(
                                # * means 'just use an autogenerated ID', which is fine for our needs
                                example_stream => '*',
                                id => $id,
                            )->retain
                            ->without_cancel
                        )
                    } catch {
                        # If something goes wrong, we should drop that item from our
                        # pending list...
                        delete $pending->{$id}
                    }
                }
            }
        } catch {
            $log->errorf('Failed in our background publishing loop, active was %d: %s', $active, $@);
        }
        $log->infof('Ending background publish');
    }
};

(async sub {
    await Future->wait_all(
        map $_->connect, @conn
    );
    $log->debug("All instances connected, starting test");

    my ($primary) = @conn;
    $log->infof('Clearing out old streams');
    # Some of these steps may fail, so we use ->wait_all to ignore the status.
    # On the first run, the streams and groups are not expected to exist.
    await Future->wait_all(
        $primary->xgroup(
            DESTROY => 'example_stream',
            'primary_group'
        )->on_ready(sub { $log->debugf('ready primary_group') }),
        $primary->xgroup(
            DESTROY => 'example_stream',
            'secondary_group',
        )->on_ready(sub { $log->debugf('ready secondary_group') }),
        $primary->del(
            'example_stream',
        )->on_ready(sub { $log->debugf('ready example_stream') }),
    );

    $log->infof('About to add some data to streams');
    my $start = Time::HiRes::time;
    ($background_publish->($primary))->retain;

    $loop->add(
        IO::Async::Timer::Periodic->new(
            interval => 1,
            on_tick => sub {
                my $elapsed = Time::HiRes::time - $start;
                $active = 0 if $message_count > 30_000;
                $log->infof("%d messages after %d seconds, %.2f/sec, pending %d, workers %s",
                    $message_count, $elapsed, $message_count / ($elapsed || 1),
                    0 + keys %$pending,
                    [ @jobs_processed_by_worker_index[1..$worker_count] ]
                );
            }
        )->start
    );
    $log->infof('Set up 2 consumer groups');
    await Future->needs_all(
        $primary->xgroup(
            CREATE => 'example_stream',
            primary_group => '0'
        ),
        $primary->xgroup(
            CREATE => 'example_stream',
            secondary_group => '0'
        ),
    );

    $log->infof('Start workers');
    await fmap_concat(async sub {
        my ($idx) = @_;
        my ($worker_id) = 'worker_' . $idx;
        my ($redis) = $conn[$idx];
        while($active) {
            try {
                my ($item) = await $redis->xreadgroup(
                    # Wait up to 2 seconds for a message
                    BLOCK       => 2000,
                    GROUP       => 'primary_group',
                    $worker_id,
                    COUNT       => 1,
                    STREAMS     => 'example_stream',
                    '>'
                );
                # We'll receive undef if we had no message to process
                next unless $item;

                # Things are returned in a curiously-nested form, unpack that here
                my ($stream, $items) = map @$_, @$item;
                my ($id, $content) = map @$_, @$items;
                my (%data) = @{$content || []};
                $log->debugf('Stream was %s for ID %s and data was %s', $stream, $id, \%data);

                # Sanity check what we received
                warn "already handled " . $data{id} if $handled->{$data{id}}++;
                delete $pending->{$data{id}} or warn "deleting thing that should have existed - " . $data{id};

                ++$jobs_processed_by_worker_index[$idx];
                ++$message_count;
                # Claim this message as processed
                await $redis->xack(
                    $stream => 'primary_group',
                    $id
                )
            } catch {
                warn "Failed in read group stuff - $@";
                die $@;
            }
        }
    }, foreach    => [1..$worker_count],
      concurrent => $worker_count);
})->()->get;

examples/consumer-groups.pl

#!/usr/bin/env perl 
use strict;
use warnings;

# Details on the concepts and how the Redis commands work here can be found
# in https://redis.io/topics/streams-intro

use Net::Async::Redis;
use IO::Async::Loop;
use IO::Async::Timer::Periodic;
use Time::HiRes ();

use Future::Utils qw(fmap_void fmap_concat repeat);

use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'info';

use Future::Utils qw(fmap0);

my $loop = IO::Async::Loop->new;

# We start with a primary connection (for initial set up and queuing new messages)
# plus several workers (for retrieving messages via XREADGROUP)
my $worker_count = 4;
my @conn = map {;
    $loop->add(
        my $redis = Net::Async::Redis->new
    );
    $redis;
} 0..$worker_count;

# This is used to shut down all the various moving parts when we want to quit.
my $active = 1;

# IDs that we expect to receive someday
my $pending = {};
# IDs that we have already processed, note that this can get very large if we
# increase the message limit
my $handled = {};

# Counts of jobs per worker (index 0 will always be zero, since no worker runs there)
my @jobs_processed_by_worker_index = (0) x ($worker_count + 1);

# How many messages we have seen
my $message_count = 0;

Future->wait_all(
    map $_->connect, @conn
)->then(sub {
    $log->debug("All instances connected, starting test");

    my ($primary) = @conn;
    $log->infof('Clearing out old streams');
    # Some of these steps may fail, so we use ->wait_all to ignore the status.
    # On the first run, the streams and groups are not expected to exist.
    Future->wait_all(
        $primary->xgroup(
            DESTROY => 'example_stream',
            'primary_group'
        )->on_ready(sub { $log->debugf('ready primary_group') }),
        $primary->xgroup(
            DESTROY => 'example_stream',
            'secondary_group',
        )->on_ready(sub { $log->debugf('ready secondary_group') }),
        $primary->del(
            'example_stream',
        )->on_ready(sub { $log->debugf('ready example_stream') }),
    )->then(sub {
        $log->infof('About to add some data to streams');
        my $start = Time::HiRes::time;
        # We'll continuously generate new messages likely averaging about 0.5ms
        # between each one, network roundtrip won't be included since it'll end
        # up pipelining multiple requests if we get a backlog.
        my $idx = 0;
        (
            repeat {
                # Don't try to add more to the queue if we think we have more than 1k
                # sat around waiting to be processed.
                my $blocked = (1000 < keys %$pending);
                Future->needs_any(
                    # We add in a 0.5s delay if we're blocked, otherwise 0..1ms delay
                    $loop->delay_future(after => (0.5 * $blocked) + (0.001 * rand)),

                    $blocked ? () : do {
                        my $id = (++$idx) . '-' . int(1_000_000 * rand);
                        # Note that we mark this as pending *before* we try sending it.
                        # Doing this in the ->on_done handler introduces a race condition:
                        # Redis might send the new item to a worker before we get the
                        # response back from the XADD command.
                        $pending->{$id} = 1;
                        $primary->xadd(
                            # * means 'just use an autogenerated ID', which is fine for our needs
                            example_stream => '*',
                            id => $id,
                        )->on_fail(sub {
                            # If something goes wrong, we should drop that item from our
                            # pending list...
                            delete $pending->{$id}
                        })
                         ->retain
                         ->without_cancel
                     }
                )
            } while => sub { $active }
        )->retain;

        $loop->add(
            IO::Async::Timer::Periodic->new(
                interval => 1,
                on_tick => sub {
                    my $elapsed = Time::HiRes::time - $start;
                    $active = 0 if $message_count > 30_000;
                    $log->infof("%d messages after %d seconds, %.2f/sec, pending %d, workers %s",
                        $message_count, $elapsed, $message_count / ($elapsed || 1),
                        0 + keys %$pending,
                        [ @jobs_processed_by_worker_index[1..$worker_count] ]
                    );
                }
            )->start
        );
        $log->infof('Set up 2 consumer groups');
        Future->needs_all(
            $primary->xgroup(
                CREATE => 'example_stream',
                primary_group => '0'
            ),
            $primary->xgroup(
                CREATE => 'example_stream',
                secondary_group => '0'
            ),
        )
    })->then(sub {
        $log->infof('Start workers');
        fmap_concat {
            my ($idx) = @_;
            my ($worker_id) = 'worker_' . $idx;
            my ($redis) = $conn[$idx];
            repeat {
                $redis->xreadgroup(
                    # Wait up to 2 seconds for a message
                    BLOCK       => 2000,
                    GROUP       => 'primary_group',
                    $worker_id,
                    COUNT       => 1,
                    STREAMS     => 'example_stream',
                    '>'
                )->then(sub {
                    my ($item) = @_;
                    # We'll receive undef if we had no message to process
                    return Future->done unless $item;

                    # Things are returned in a curiously-nested form, unpack that here
                    my ($stream, $items) = map @$_, @$item;
                    my ($id, $content) = map @$_, @$items;
                    my (%data) = @{$content || []};
                    $log->debugf('Stream was %s for ID %s and data was %s', $stream, $id, \%data);

                    # Sanity check what we received
                    warn "already handled " . $data{id} if $handled->{$data{id}}++;
                    delete $pending->{$data{id}} or warn "deleting thing that should have existed - " . $data{id};

                    ++$jobs_processed_by_worker_index[$idx];
                    ++$message_count;
                    # Claim this message as processed
                    $redis->xack(
                        $stream => 'primary_group',
                        $id
                    )
                })->on_fail(sub { $log->errorf('failed future - %s', [ @_ ]) })
            } while => sub { $active }
        } foreach    => [1..$worker_count],
          concurrent => $worker_count
    })
})->get;

examples/hincr.pl  view on Meta::CPAN

#!/usr/bin/env perl 
use strict;
use warnings;

use Net::Async::Redis;
use Future::AsyncAwait;
use IO::Async::Loop;

use List::Util qw(min max);
use Time::HiRes ();

my $loop = IO::Async::Loop->new;
$loop->add(my $redis = Net::Async::Redis->new);

my $shutdown = $loop->new_future;
$loop->watch_signal(
    TERM => sub { $shutdown->done unless $shutdown->is_ready },
    QUIT => sub { $shutdown->done unless $shutdown->is_ready },
);

(async sub {
    await $redis->connect;
    my ($min, $max, $avg, $last);
    my $f = (async sub {
        while(1) {
            await $loop->delay_future(after => 1);
            printf "Ping time - latest %.3fms, min/avg/max %.3fms/%.3fms/%.3fms\n", $last, $min, $avg, $max;
        }
    })->();
    my $count = 0;
    my $sum = 0;
    until($shutdown->is_ready) {
        my $start = Time::HiRes::time();
        await Future->wait_all(
            map { $redis->hincrbyfloat("some_key::" . rand(), rand(), int(100 * rand())) } 1..2000
        );
        my $elapsed = 1000.0 * (Time::HiRes::time() - $start);
        ++$count;
        $sum += $elapsed;
        $min = min($elapsed, $min // ());
        $max = max($elapsed, $max // ());
        $avg = $sum / $count;
        $last = $elapsed;
    }
    $f->cancel;
})->()->get;

examples/incr-ratelimit-aa.pl  view on Meta::CPAN

#!/usr/bin/env perl 
use strict;
use warnings;

use Future::AsyncAwait 0.18;
use Syntax::Keyword::Try 0.07;
use Net::Async::Redis;
use IO::Async::Loop::Epoll;
use IO::Async::Timer::Periodic;

use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'info';

use Future::Utils qw(fmap0);

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop::Epoll->new;

my %conn;
for my $idx (1..4) {
    $loop->add(
        my $redis = Net::Async::Redis->new
    );
    $conn{$redis} = $redis;
}

my $incr_count = 0;
my $start = Time::HiRes::time;
$loop->add(
    IO::Async::Timer::Periodic->new(
        interval => 2,
        on_tick => sub {
            my $elapsed = Time::HiRes::time - $start;
            $log->infof("%d INCR calls after %d seconds, %.2f/sec",
                $incr_count, $elapsed, $incr_count / ($elapsed || 1)
            );
        }
    )->start
);
Future->wait_all(
    map $_->connect, values %conn
)->then(sub {
    $log->debug("All instances connected, starting test");
    Future->wait_all(
        map {
            my $key = "ratelimit." . $_;
            my $redis = $conn{$_};
            fmap0(async sub {
                await $loop->delay_future(
                    after => 0.001 * rand
                );
                my $count = await $redis->incr($key);
                ++$incr_count;
                $count == 1
                ? await $redis->expire($key => 5)
                : ()
            }, foreach => [1..100000], concurrent => 10)->on_fail(sub { warn "failed for $key - @_" })
        } keys %conn
    )
})->get;

examples/incr-ratelimit.pl  view on Meta::CPAN

#!/usr/bin/env perl 
use strict;
use warnings;

use Net::Async::Redis;
use IO::Async::Loop::Epoll;
use IO::Async::Timer::Periodic;

use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'info';

use Future::Utils qw(fmap0);

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop::Epoll->new;

my %conn;
for my $idx (1..4) {
    $loop->add(
        my $redis = Net::Async::Redis->new
    );
    $conn{$redis} = $redis;
}

my $incr_count = 0;
my $start = Time::HiRes::time;
$loop->add(
    IO::Async::Timer::Periodic->new(
        interval => 2,
        on_tick => sub {
            my $elapsed = Time::HiRes::time - $start;
            $log->infof("%d INCR calls after %d seconds, %.2f/sec",
                $incr_count, $elapsed, $incr_count / ($elapsed || 1)
            );
        }
    )->start
);
Future->wait_all(
    map $_->connect, values %conn
)->then(sub {
    $log->debug("All instances connected, starting test");
    Future->wait_all(
        map {
            my $key = "ratelimit." . $_;
            my $redis = $conn{$_};
            (fmap0 {
                $loop->delay_future(
                    after => 0.001 * rand
                )->then(sub {
                    $redis->incr($key)->then(sub {
                        my ($count) = @_;
                        ++$incr_count;
                        $count == 1
                        ? $redis->expire($key => 5)
                        : Future->done
                    })
                })
            } foreach => [1..100000], concurrent => 10)->on_fail(sub { warn "failed for $key - @_" })
        } keys %conn
    )
})->get;

examples/job-benchmark.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;

use feature qw(say);

=head1 NAME

job-benchmark.pl - test performance of some job queuing implementations

=head1 SYNOPSIS

 job-benchmark.pl --timeout 30

=cut

no indirect;

use Net::Async::Redis;
use IO::Async::Loop;
use IO::Async::Timer::Periodic;
use Time::HiRes ();
use Future::Utils qw(repeat fmap0);
use Future qw(call);

use Math::Random::Secure qw(irand);
use Getopt::Long;
use Pod::Usage;
use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'info';
use Test::More;

my %config;
GetOptions(
    # String/integer options need a value spec, and `h` can only alias one option
    'u|uri=s'     => \$config{uri},
    'p|port=i'    => \$config{port},
    'h|host=s'    => \$config{host},
    'a|auth=s'    => \$config{auth},
    'help|?'      => \my $help,
    't|timeout=i' => \my $timeout,
) or pod2usage(1);
pod2usage(2) if $help;

STDOUT->autoflush(1);
$SIG{PIPE} = 'IGNORE';

sub uuid {
    # UUIDv4 (random)
    return sprintf '%04x%04x-%04x-%04x-%02x%02x-%04x%04x%04x',
        (map { Math::Random::Secure::irand(2**16) } 1..3),
        (Math::Random::Secure::irand(2**16) & 0x0FFF) | 0x4000,
        # RFC 4122 variant: top two bits of this byte must be 10
        (Math::Random::Secure::irand(2**8) & 0x3F) | 0x80,
        (Math::Random::Secure::irand(2**8)),
        (map { Math::Random::Secure::irand(2**16) } 1..3)
}

my @child;
for my $idx (1..4) {
    if(my $pid = fork // die) {
        push @child, $pid;
    } else {
        # child
        my $client_id = uuid();
        local $log->{context}{client_id} = $client_id;
        undef $IO::Async::Loop::ONE_TRUE_LOOP;
        my $loop = IO::Async::Loop->new;

        # Our client has a single Redis connection, a UUID to
        # represent the client, and expects to see job announcements
        # on the pubsub channel client::$client_id. For each
        # announcement, the payload represents the job ID, and we get
        # the actual details from the job hash.
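        #
        # Summarised as Redis commands (a reading of the code in this file,
        # not a separate specification), one job round-trip looks roughly like:
        #
        #   submitter: MULTI
        #              HMSET job::$id reply $client_id left $x right $y
        #              LPUSH jobs::pending $id
        #              EXEC
        #   server:    BRPOPLPUSH jobs::pending jobs::active 0
        #              HGETALL job::$id
        #   handler:   MULTI
        #              HSET job::$id result $result
        #              PUBLISH client::$client_id $id
        #              LREM jobs::active 1 $id
        #              EXEC
        #   client:    HMGET job::$id left right result
        #              DEL job::$id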
        $loop->add(
            my $client = Net::Async::Redis->new(
                client_name => 'client:' . $client_id,
            )
        );
        $loop->add(
            my $subscriber = Net::Async::Redis->new(
                client_name => 'subscriber:' . $client_id,
            )
        );
        $loop->add(
            my $submitter = Net::Async::Redis->new(
                client_name => 'submitter:' . $client_id,
            )
        );
        my $processed = 0;
        my $start = Time::HiRes::time;
        $loop->add(
            my $timer = IO::Async::Timer::Periodic->new(
                interval => 1,
                on_tick => sub {
                    my $runtime = Time::HiRes::time() - $start;
                    $log->infof('Client %s has %d processed, %.2f/sec', $client_id, $processed, $processed / ($runtime || 1));
                }
            )
        );
        $timer->start;

        $log->infof("Client awaiting Redis connections");
        Future->wait_all(
            $client->connect,
            $submitter->connect,
            $subscriber->connect
        )->get;
        $log->infof("Subscribing to notifications");
        my $count = 0;
        $subscriber->subscribe('client::' . $client_id)
            ->then(sub {
                my ($sub) = @_;
                # Every time someone tells us they finished a job, we pull back the details
                # and check the results
                my %pending_job;
                $sub->events
                    ->map('payload')
                    ->each(sub {
                        my ($id) = @_;
                        # $log->infof('Completion notification for %s', $id);
                        $client->hmget('job::' . $id, qw(left right result))->then(sub {
                            my ($x, $y, $result) = @{$_[0]};
                            my $expected = delete $pending_job{$id};
                            die 'invalid left' unless $x eq $expected->{left};
                            die 'invalid right' unless $y eq $expected->{right};
                            die 'invalid result' unless $result eq $x + $y;
                            ++$processed;
                            # $log->infof('Job result for %s was %s', $id, $result);
                            my $f = $client->del('job::' . $id);
                            $expected->{completion}->done($result);
                            $f
                        })->on_fail(sub { $log->errorf("A failure! %s", shift) })->retain;
                    });

                $log->infof("Redis connections established, starting client operations");
                my $queue = 'jobs::pending';
                (fmap0 {
                    my $f = $loop->new_future;
                    $submitter->multi(sub {
                        my $tx = shift;
                        my $id = uuid();
                        my $x = irand(10000);
                        my $y = irand(10000);
                        $pending_job{$id} = {
                            left => $x,
                            right => $y,
                            completion => $f,
                        };
                        $tx->hmset(
                            'job::' . $id,
                            reply => $client_id,
                            left  => $x,
                            right => $y
                        );
                        $tx->lpush($queue, $id);
                    })->then(sub { $f }),
                } generate => sub { return 1 if ++$count < 10000; return })
            })->get;
        $log->infof('Client completed');
        exit 0;
    }
}
$log->infof('Total of %d child workers', 0 + @child);

my $loop = IO::Async::Loop->new;

$loop->add(
    my $redis = Net::Async::Redis->new(
        client_name => 'server',
    )
);
$loop->add(
    my $handler = Net::Async::Redis->new(
        client_name => 'handler',
    )
);
$redis->configure(map { defined $config{$_} ? ($_ => $config{$_}) : () } keys %config);

Future->wait_any(
    Future->needs_all(
        $redis->connect,
        $handler->connect,
    ),
    ($timeout ? $loop->timeout_future(after => $timeout) : ()),
)->get;
my $processed = 0;
my $start = Time::HiRes::time;
$loop->add(
    my $timer = IO::Async::Timer::Periodic->new(
        interval => 1,
        on_tick => sub {
            my $runtime = Time::HiRes::time() - $start;
            $log->infof('Server %d processed, %.2f/sec', $processed, $processed / ($runtime || 1));
        }
    )
);
$timer->start;

sub worker {
    my (%details) = @_;
    return Future->fail('missing parameter `left`') unless defined $details{left};
    return Future->fail('missing parameter `right`') unless defined $details{right};
    return Future->done($details{left} + $details{right});
}

my $src_queue = 'jobs::pending';
my $dst_queue = 'jobs::active';
$redis->del($src_queue)->get;
$redis->del($dst_queue)->get;
(fmap0 {
    $redis->brpoplpush(
        $src_queue => $dst_queue, 0
    )->then(sub {
        # BRPOPLPUSH yields only the element that was moved
        my ($id) = @_;
        $log->debugf('Received job %s from queue %s', $id, $src_queue);
        $redis->hgetall('job::' . $id)->then(sub {
            my ($items) = @_;
            my %details = @$items;
            worker(%details)->then(sub {
                my ($result) = @_;
                $handler->multi(sub {
                    my $tx = shift;
                    ++$processed;
                    $tx->hset('job::' . $id, result => $result);
                    # warn "sending to client::$details{reply}";
                    $tx->publish('client::' . $details{reply}, $id)->on_done(sub {
                        warn "no subscribers for $id => $details{reply} - @_" unless $_[0];
                    });
                    $tx->lrem($dst_queue => 1, $id);
                })
            })
        })
    })->on_fail(sub { warn "failed to process jobs - @_" })
} generate => sub { 1 }, concurrent => 1)->get;

exit 0;

examples/job-hash-submit.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;

use feature qw(say);

=head1 NAME

job-hash-submit.pl - simple Redis job submitter with data stored in hashes

=head1 SYNOPSIS

 job-hash-submit.pl --timeout 30

=cut

use Net::Async::Redis;
use IO::Async::Loop;
use Future::Utils qw(repeat);

use Getopt::Long;
use Pod::Usage;
use Math::Random::Secure qw(irand);

GetOptions(
    'p|port=i' => \my $port,
    'h|host=s' => \my $host,
    'a|auth=s' => \my $auth,
    'help|?' => \my $help,
    't|timeout=i' => \my $timeout,
) or pod2usage(1);
pod2usage(2) if $help;

# Defaults
$timeout //= 30;
$host //= 'localhost';
$port //= 6379;

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop->new;

$loop->add(
    my $redis = Net::Async::Redis->new
);
$loop->add(
    my $subscriber = Net::Async::Redis->new
);

sub uuid {
    return sprintf '%04x%04x-%04x-%04x-%02x%02x-%04x%04x%04x',
        (map { Math::Random::Secure::irand(2**16) } 1..3),
        (Math::Random::Secure::irand(2**16) & 0x0FFF) | 0x4000,
        # RFC 4122 variant: top two bits of this byte must be 10
        (Math::Random::Secure::irand(2**8) & 0x3F) | 0x80,
        (Math::Random::Secure::irand(2**8)),
        (map { Math::Random::Secure::irand(2**16) } 1..3)
}

Future->wait_any(
    Future->needs_all(
        $redis->connect,
        $subscriber->connect,
    ),
    $loop->timeout_future(after => $timeout),
)->get;

my $client_id = uuid();
$subscriber->subscribe('client::' . $client_id)
    ->then(sub {
        my ($sub) = @_;
        my $completion = $sub->events
            ->take(1)
            ->map('payload')
            ->say
            ->completed;
        my $queue = 'jobs::pending';
        Future->needs_all(
            $redis->multi(sub {
                my $tx = shift;
                my $id = uuid();
                $tx->hset('job::' . $id, reply => $client_id);
                $tx->lpush($queue, $id);
            }),
            $completion
        )
    })->get;

examples/job-hash-worker.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;

use feature qw(say);

=head1 NAME

job-hash-worker.pl - simple Redis job worker with data stored in hashes

=head1 SYNOPSIS

 job-hash-worker.pl --timeout 30

=cut

use Net::Async::Redis;
use IO::Async::Loop;
use Future::Utils qw(repeat);

use Getopt::Long;
use Pod::Usage;
use Log::Any qw($log);

my %config;
GetOptions(
    # String/integer options need a value spec, and `h` can only alias one option
    'u|uri=s'     => \$config{uri},
    'p|port=i'    => \$config{port},
    'h|host=s'    => \$config{host},
    'a|auth=s'    => \$config{auth},
    'help|?'      => \my $help,
    't|timeout=i' => \my $timeout,
) or pod2usage(1);
pod2usage(2) if $help;

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop->new;

$loop->add(
    my $redis = Net::Async::Redis->new
);
$redis->configure(map { defined $config{$_} ? ($_ => $config{$_}) : () } keys %config);

Future->wait_any(
    $redis->connect,
    ($timeout ? $loop->timeout_future(after => $timeout) : ()),
)->get;

my $src_queue = 'jobs::pending';
my $dst_queue = 'jobs::active';
STDOUT->autoflush(1);
print "Awaiting items...\n";
(repeat {
    $redis->brpoplpush(
        $src_queue => $dst_queue, 0
    )->then(sub {
        # BRPOPLPUSH yields only the element that was moved
        my ($id) = @_;
        $log->debugf('Received job %s from queue %s', $id, $src_queue);
        $redis->hgetall('job::' . $id)->then(sub {
            my ($items) = @_;
            warn "Have - $_\n" for @$items;
            my %details = @$items;
            $redis->multi(sub {
                my $tx = shift;
                $tx->publish('client::' . $details{reply}, 'done');
                $tx->lrem($dst_queue => 1, $id);
                $tx->del('job::' . $id);
            })
        })
    })
} while => sub { 1 })->get;

examples/job-queue.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;

use Net::Async::Redis;
use IO::Async::Loop::Epoll;
use IO::Async::Timer::Periodic;

use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'debug';

use Future::Utils qw(fmap0);

my $loop = IO::Async::Loop::Epoll->new;

my %conn;
for my $idx (1..1000) {
    $loop->add(
        my $redis = Net::Async::Redis->new
    );
    $conn{$redis} = $redis;
}

my $incr_count = 0;
my $start = Time::HiRes::time;
$loop->add(
    IO::Async::Timer::Periodic->new(
        interval => 2,
        on_tick => sub {
            my $elapsed = Time::HiRes::time - $start;
            $log->infof("%d INCR calls after %d seconds, %.2f/sec",
                $incr_count, $elapsed, $incr_count / ($elapsed || 1)
            );
        }
    )->start
);
Future->wait_all(
    map $_->connect, values %conn
)->then(sub {
    $log->debug("All instances connected, starting test");
    Future->wait_all(
        map {
            my $key = "ratelimit." . $_;
            my $redis = $conn{$_};
            (fmap0 {
                $loop->delay_future(
                    after => 0.025 * rand
                )->then(sub {
                    $log->debugf("Incr $key");
                    $redis->incr($key)->then(sub {
                        my ($count) = @_;
                        ++$incr_count;
                        $log->debugf("%s => %d", $key => $count);
                        $count == 1
                        ? $redis->expire($key => 5)
                        : Future->done
                    }, sub { $log->errorf("Error! %s", @_) })
                }, sub { warn "here? @_" })
            } foreach => [1..10000], concurrent => 10)
        } keys %conn
    )
})->get;

examples/job-submit.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;

use feature qw(say);

=head1 NAME

job-submit.pl - simple Redis job submitter

=head1 SYNOPSIS

 job-submit.pl job_details [more_job_details ...]

=cut

use Net::Async::Redis;
use IO::Async::Loop;

use Getopt::Long;
use Pod::Usage;

GetOptions(
    'p|port=i' => \my $port,
    'h|host=s' => \my $host,
    'a|auth=s' => \my $auth,
    'help|?' => \my $help,
    't|timeout=i' => \my $timeout,
) or pod2usage(1);
pod2usage(2) if $help;

# Defaults
$timeout //= 30;
$host //= 'localhost';
$port //= 6379;

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop->new;

$loop->add(
    my $redis = Net::Async::Redis->new
);

my (@details) = @ARGV or die 'need at least one job to submit';

Future->wait_any(
    $redis->connect,
    $loop->timeout_future(after => $timeout),
)->get;

my $src_queue = 'jobs::pending';
STDOUT->autoflush(1);
print "Submitting job...\n";
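# LPUSH returns the length of the list after the push, not a job ID
my ($queue_length) = $redis->lpush(
    $src_queue, @details
)->get;

print "Queued - $src_queue now holds $queue_length item(s)\n";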

examples/job-worker.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;

use feature qw(say);

=head1 NAME

job-worker.pl - simple Redis job worker where each job is fully contained in a list item

=head1 SYNOPSIS

 job-worker.pl channel_name other_channel third_channel

=cut

use Net::Async::Redis;
use IO::Async::Loop;
use Future::Utils qw(repeat);

use Getopt::Long;
use Pod::Usage;

GetOptions(
    'p|port=i' => \my $port,
    'h|host=s' => \my $host,
    'a|auth=s' => \my $auth,
    'help|?' => \my $help,
    't|timeout=i' => \my $timeout,
) or pod2usage(1);
pod2usage(2) if $help;

# Defaults
$timeout //= 30;
$host //= 'localhost';
$port //= 6379;

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop->new;

$loop->add(
    my $redis = Net::Async::Redis->new
);

my (@channels) = @ARGV or die 'need at least one channel to listen on';

Future->wait_any(
    $redis->connect,
    $loop->timeout_future(after => $timeout),
)->get;

my $src_queue = 'jobs::pending';
my $dst_queue = 'jobs::active';
STDOUT->autoflush(1);
print "Awaiting items...\n";
(repeat {
    $redis->brpoplpush(
        $src_queue => $dst_queue, 0
    )->then(sub {
        my ($id, @details) = @_;
        print "Have job - $id with details @details\n";
        warn for @details;
        $redis->lrem($dst_queue => 1, $id);
    })
} while => sub { 1 })->get;

examples/lrange.pl  view on Meta::CPAN

#!/usr/bin/env perl 
use strict;
use warnings;

=head1 NAME

lrange.pl

=head1 DESCRIPTION

Combined LRANGE/LTRIM.
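
The LRANGE and LTRIM calls in this script are issued as two separate commands, so
another client could modify the list between them. Where that matters, one possible
variant is to wrap both in a single transaction. The fragment below is a sketch only,
not tested against this script: it borrows the C<multi> callback style shown in
F<examples/multi.pl>, and is meant to replace the corresponding two calls inside the
C<async sub>. The variable C<$el> is the same name the script already uses.

    my $el;
    await $redis->multi(sub {
        my $tx = shift;
        # Each queued command returns a Future that resolves once EXEC completes
        $tx->lrange(REDIS_KEY, 0, 9)->on_done(sub { ($el) = @_ });
        $tx->ltrim(REDIS_KEY, 10, -1);
    });
    $log->infof('Elements are %s', $el);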

=cut

use Future::AsyncAwait 0.28;
use Syntax::Keyword::Try 0.07;
use Net::Async::Redis;
use IO::Async::Loop::Epoll;
use IO::Async::Timer::Periodic;

use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'info';

use POSIX qw(floor);
use List::Util qw(sum0);
use Future::Utils qw(fmap0);

my $loop = IO::Async::Loop::Epoll->new;

$loop->add(
    my $redis = Net::Async::Redis->new
);
$loop->add(
    my $sub = Net::Async::Redis->new
);

use constant REDIS_KEY => 'example::lrange';

(async sub {
    await $redis->connected;
    await $redis->del(REDIS_KEY);
    await $redis->rpush(REDIS_KEY, 'a'..'z');
    my $el = await $redis->lrange(REDIS_KEY, 0, 9);
    $log->infof('Elements are %s', $el);
    await $redis->ltrim(REDIS_KEY, 10, -1);
    my $remaining = await $redis->lrange(REDIS_KEY, 0, -1);
    $log->infof('Remaining %s', $remaining);
})->()->get;


examples/moving-sum.pl  view on Meta::CPAN

#!/usr/bin/env perl 
use strict;
use warnings;

=head1 NAME

moving-sum.pl

=head1 DESCRIPTION

Provides a simple example for managing "moving-sum" calculations with Redis:

=over 4

=item * total value tracked in one key

=item * sorted sets used to record elements

=item * total is incremented as new values are added

=item * as old values drop out of the desired window, the total is decremented accordingly

=back

The tracked total uses L<Net::Async::Redis::Commands/incrbyfloat> to provide an atomic update
when the new values are added and removed. This can be tracked using keyspace notifications,
or simply polled if the update interval is low enough.
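
As an illustration of the bookkeeping only, the sketch below replays the same
add-then-expire arithmetic against an in-memory list instead of Redis. The names
C<add_item>, C<@window> and C<$total> are hypothetical and do not appear in the
script. The window of 500 score units matches the C<$score - 500> cutoff used in the
code, which is 5 seconds at the script's resolution of 1/100 second.

    use strict;
    use warnings;
    use List::Util qw(sum0);

    my @window;     # [ score, value ] pairs, standing in for the sorted set
    my $total = 0;  # running sum, standing in for the "current" key

    sub add_item {
        my ($score, $value) = @_;
        push @window, [ $score, $value ];
        # Anything at or below this score has dropped out of the window
        my $target = $score - 500;
        my @expired = grep { $_->[0] <= $target } @window;
        @window = grep { $_->[0] > $target } @window;
        # A single delta covers both the new value and the expired ones
        $total += $value - sum0 map { $_->[1] } @expired;
        return $total;
    }

    add_item(1000, 10);
    add_item(1200, 5);
    add_item(1600, 7);   # the item scored 1000 expires here
    print "$total\n";    # 12, i.e. 5 + 7

Applying one combined delta is what lets the script issue a single C<INCRBYFLOAT>
per new element rather than one update per expired element.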

=cut

use Future::AsyncAwait 0.28;
use Syntax::Keyword::Try 0.07;
use Net::Async::Redis;
use IO::Async::Loop::Epoll;
use IO::Async::Timer::Periodic;

use Log::Any qw($log);
use Log::Any::Adapter qw(Stdout), log_level => 'info';

use POSIX qw(floor);
use List::Util qw(sum0);
use Future::Utils qw(fmap0);

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop::Epoll->new;

$loop->add(
    my $redis = Net::Async::Redis->new
);
$loop->add(
    my $sub = Net::Async::Redis->new
);

use constant REDIS_KEY_PREFIX => 'example::moving_sum::';

(async sub {
    await $redis->connected;
    my $id = 0;
    my $total = 0;
    my $add = async sub {
        my ($item) = @_;
        my $time = Time::HiRes::time;
        my $k = REDIS_KEY_PREFIX . 'elements';
        my $score = floor($time * 100);

        # First we add the new item and update our local score tracking
        await $redis->zadd($k, $score => join ':', $id++, $item);

        # Now we find any older items...
        my $target = $score - 500;
        my @el = (await $redis->zrangebyscore($k => 0, $target))->@*;
        $log->debugf('Have elements: %s', \@el);
        my $diff = $item - sum0 map { /:([0-9]+)$/ } @el;
        $total += $diff;
        await $redis->incrbyfloat(REDIS_KEY_PREFIX . 'current', $diff);
        # ... and clear those out
        await $redis->zremrangebyscore($k => 0, $target);
        my @remaining = (await $redis->zrange($k => 0, -1))->@*;
        $log->debugf('Remaining: %s', \@remaining);
        my $expected = sum0 map { /:([0-9]+)$/ } @remaining;
        $log->infof('Resulting score was %d and we expected %d', $total, $expected);
        $log->errorf('Score mismatch - somehow our internal tracking does not match what was in Redis, difference is %s', $expected - $total) unless $expected == $total;
    };
    my $f = (async sub {
        my $k = REDIS_KEY_PREFIX . 'elements';
        await $redis->del(map { REDIS_KEY_PREFIX . $_ } qw(elements current));
        while(1) {
            await $loop->delay_future(after => 0.002 + rand(0.11));
            await $add->(int rand(100));
        }
    })->();
    (async sub {
        $log->infof('Set up current update notifications');
        await $sub->watch_keyspace(REDIS_KEY_PREFIX . 'current', async sub {
            my ($op, $k) = @_;
            my $v = await $redis->get($k);
            $log->infof('Notified current update: %s = %s', $k, $v);
        });
    })->()->retain;
    await $f;
})->()->get;

examples/multi.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;

use feature qw(say);

use Net::Async::Redis;
use IO::Async::Loop;

use Getopt::Long;
use Pod::Usage;

GetOptions(
    'p|port=i' => \my $port,
    'h|host=s' => \my $host,
    'a|auth=s' => \my $auth,
    'help|?' => \my $help,
    't|timeout=i' => \my $timeout,
) or pod2usage(1);
pod2usage(2) if $help;

# Defaults
$timeout //= 30;
$host //= 'localhost';
$port //= 6379;

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop->new;

$loop->add(
    my $redis = Net::Async::Redis->new
);

Future->wait_any(
    $redis->connect
        ->then(sub {
            $redis->multi(sub {
                my $tx = shift;
                $tx->incr('test::some_key')->on_done(sub {
                    print "Incr - @_\n";
                });
                $tx->set('test::other_key' => 'key value')->on_done(sub {
                    print "Set - @_\n";
                });
                $tx->del('test::deleted_key')->on_done(sub {
                    print "Del - @_\n";
                });
            })
        })->on_done(sub {
            say $_ // '<undef>' for @_;
        }),
    $loop->timeout_future(after => $timeout),
)->get;

examples/ping.pl  view on Meta::CPAN

use strict;
use warnings;

use Net::Async::Redis;
use Future::AsyncAwait;
use IO::Async::Loop;

use List::Util qw(min max);
use Time::HiRes ();

my $loop = IO::Async::Loop->new;
$loop->add(my $redis = Net::Async::Redis->new);

my $shutdown = $loop->new_future;
$loop->watch_signal(
    TERM => sub { $shutdown->done unless $shutdown->is_ready },
    QUIT => sub { $shutdown->done unless $shutdown->is_ready },
);

(async sub {
    await $redis->connect;
    my ($min, $max, $avg, $last);
    my $f = (async sub {
        while(1) {
            await $loop->delay_future(after => 1);
            printf "Ping time - latest %.3fms, min/avg/max %.3fms/%.3fms/%.3fms\n", $last, $min, $avg, $max;
        }
    })->();
    my $count = 0;
    my $sum = 0;
    until($shutdown->is_ready) {
        my $start = Time::HiRes::time();
        await $redis->ping;
        my $elapsed = 1000.0 * (Time::HiRes::time() - $start);
        ++$count;
        $sum += $elapsed;
        $min = min($elapsed, $min // ());
        $max = max($elapsed, $max // ());
        $avg = $sum / $count;
        $last = $elapsed;
    }
    $f->cancel;
})->()->get;

examples/pub.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;

use feature qw(say);

=head1 NAME

pub.pl - simple Redis publish example

=head1 SYNOPSIS

 pub.pl channel_name message_content
 pub.pl -h localhost channel_name message_content
 pub.pl -a some_password channel_name message_content

=cut

use Net::Async::Redis;
use IO::Async::Loop;

use Getopt::Long;
use Pod::Usage;

use Log::Any::Adapter qw(Stderr), log_level => 'trace';

GetOptions(
    'p|port=i' => \my $port,
    'h|host=s' => \my $host,
    'a|auth=s' => \my $auth,
    'help|?' => \my $help,
    't|timeout=i' => \my $timeout,
) or pod2usage(1);
pod2usage(2) if $help;

# Defaults
$timeout //= 30;
$host //= 'localhost';
$port //= 6379;

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop->new;

# Pass the connection options through so that -h, -p and -a take effect
$loop->add(
    my $redis = Net::Async::Redis->new(
        host => $host,
        port => $port,
        (defined $auth ? (auth => $auth) : ()),
    )
);

my ($channel, $msg) = @ARGV or die 'need a channel';
Future->wait_any(
    $redis->connect
        ->then(sub {
            $redis->publish($channel => $msg)
        })->on_done(sub {
            say $_ // '<undef>' for @_;
        }),
    $loop->timeout_future(after => $timeout),
)->get;

examples/redis-cli  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;

use feature qw(say);

use Net::Async::Redis;
use IO::Async::Loop;

use Getopt::Long;
use Pod::Usage;

GetOptions(
    'p|port=i' => \my $port,
    'h|host=s' => \my $host,
    'a|auth=s' => \my $auth,
    'help|?' => \my $help,
    't|timeout=i' => \my $timeout,
) or pod2usage(1);
pod2usage(2) if $help;

# Defaults
$timeout //= 30;
$host //= 'localhost';
$port //= 6379;

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop->new;

$loop->add(
    my $redis = Net::Async::Redis->new
);

my ($cmd, @args) = @ARGV or die 'need a command';
$cmd =~ tr/ /_/;
$cmd = lc $cmd;
die 'Unknown command ' . $cmd unless $redis->can($cmd);

Future->wait_any(
    $redis->connect
        ->then(sub {
            $redis->$cmd(@args)
        })->on_done(sub {
            say $_ // '<undef>' for @_;
        }),
    $loop->timeout_future(after => $timeout),
)->get;

examples/sub.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;

use feature qw(say);

=head1 NAME

sub.pl - simple Redis subscription example

=head1 SYNOPSIS

 sub.pl channel_name other_channel third_channel

=cut

use Net::Async::Redis;
use IO::Async::Loop;

use Getopt::Long;
use Pod::Usage;

use Log::Any::Adapter qw(Stdout), log_level => 'trace';

GetOptions(
    'p|port=i' => \my $port,
    'h|host=s' => \my $host,
    'a|auth=s' => \my $auth,
    'help|?' => \my $help,
    't|timeout=i' => \my $timeout,
) or pod2usage(1);
pod2usage(2) if $help;

# Defaults
$timeout //= 30;
$host //= 'localhost';
$port //= 6379;

$SIG{PIPE} = 'IGNORE';
my $loop = IO::Async::Loop->new;

$loop->add(
    my $redis = Net::Async::Redis->new
);

my (@channels) = @ARGV or die 'need at least one channel to listen on';

Future->wait_any(
    $redis->connect
        ->then(sub {
            $redis->subscribe(@channels)
                ->then(sub {
                    Future->needs_all(
                        map $_->events
                            ->sprintf_methods('%s => %s', qw(channel payload))
                            ->say
                            ->completed
                            ->on_done(sub {
                                say $_ // '<undef>' for @_;
                            }), @_
                    )
                })
        }),
    $loop->timeout_future(after => $timeout),
)->get;
