The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__

#if defined ZMQ_HAVE_NORM

#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "v2_decoder.hpp"
#include "v2_encoder.hpp"

#include <normApi.h>

namespace zmq
{
    class io_thread_t;
    class session_base_t;

    class norm_engine_t : public io_object_t, public i_engine
    {
        public:
            norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
            ~norm_engine_t ();

            // create NORM instance, session, etc
            int init(const char* network_, bool send, bool recv);
            void shutdown();

            //  i_engine interface implementation.
            //  Plug the engine to the session.
            virtual void plug (zmq::io_thread_t *io_thread_,
                               class session_base_t *session_);

            //  Terminate and deallocate the engine. Note that 'detached'
            //  events are not fired on termination.
            virtual void terminate ();

            //  This method is called by the session to signalise that more
            //  messages can be written to the pipe.
            virtual void restart_input ();

            //  This method is called by the session to signalise that there
            //  are messages to send available.
            virtual void restart_output ();

            virtual void zap_msg_available () {};

            virtual const char *get_endpoint () const;

            // i_poll_events interface implementation.
            // (we only need in_event() for NormEvent notification)
            // (i.e., don't have any output events or timers (yet))
            void in_event ();

        private:
            void unplug();
            void send_data();
            void recv_data(NormObjectHandle stream);


            enum {BUFFER_SIZE = 2048};

            // Used to keep track of streams from multiple senders
            class NormRxStreamState
            {
                public:
                    NormRxStreamState(NormObjectHandle normStream,
                                      int64_t          maxMsgSize);
                    ~NormRxStreamState();

                    NormObjectHandle GetStreamHandle() const
                        {return norm_stream;}

                    bool Init();

                    void SetRxReady(bool state)
                        {rx_ready = state;}
                    bool IsRxReady() const
                        {return rx_ready;}

                    void SetSync(bool state)
                        {in_sync = state;}
                    bool InSync() const
                        {return in_sync;}

                    // These are used to feed data to decoder
                    // and its underlying "msg" buffer
                    char* AccessBuffer()
                        {return (char*)(buffer_ptr + buffer_count);}
                    size_t GetBytesNeeded() const
                        {return (buffer_size - buffer_count);}
                    void IncrementBufferCount(size_t count)
                        {buffer_count += count;}
                    msg_t* AccessMsg()
                        {return zmq_decoder->msg();}
                    // This invokes the decoder "decode" method
                    // returning 0 if more data is needed,
                    // 1 if the message is complete, If an error
                    // occurs the 'sync' is dropped and the
                    // decoder re-initialized
                    int Decode();

                    class List
                    {
                        public:
                            List();
                            ~List();

                            void Append(NormRxStreamState& item);
                            void Remove(NormRxStreamState& item);

                            bool IsEmpty() const
                                {return (NULL == head);}

                            void Destroy();

                            class Iterator
                            {
                                public:
                                    Iterator(const List& list);
                                    NormRxStreamState* GetNextItem();
                                private:
                                    NormRxStreamState* next_item;
                            };
                            friend class Iterator;

                        private:
                            NormRxStreamState*  head;
                            NormRxStreamState*  tail;

                    };  // end class zmq::norm_engine_t::NormRxStreamState::List

                    friend class List;

                    List* AccessList()
                        {return list;}


                private:
                    NormObjectHandle            norm_stream;
                    int64_t                     max_msg_size;
                    bool                        in_sync;
                    bool                        rx_ready;
                    v2_decoder_t*               zmq_decoder;
                    bool                        skip_norm_sync;
                    unsigned char*              buffer_ptr;
                    size_t                      buffer_size;
                    size_t                      buffer_count;

                    NormRxStreamState*          prev;
                    NormRxStreamState*          next;
                    NormRxStreamState::List*    list;

            };  // end class zmq::norm_engine_t::NormRxStreamState

            session_base_t*         zmq_session;
            options_t               options;
            NormInstanceHandle      norm_instance;
            handle_t                norm_descriptor_handle;
            NormSessionHandle       norm_session;
            bool                    is_sender;
            bool                    is_receiver;
            // Sender state
            msg_t                   tx_msg;
            v2_encoder_t            zmq_encoder;    // for tx messages (we use v2 for now)
            NormObjectHandle        norm_tx_stream;
            bool                    tx_first_msg;
            bool                    tx_more_bit;
            bool                    zmq_output_ready; // zmq has msg(s) to send
            bool                    norm_tx_ready;    // norm has tx queue vacancy
            // TBD - maybe don't need buffer if can access zmq message buffer directly?
            char                    tx_buffer[BUFFER_SIZE];
            unsigned int            tx_index;
            unsigned int            tx_len;

            // Receiver state
            // Lists of norm rx streams from remote senders
            bool                    zmq_input_ready; // zmq ready to receive msg(s)
            NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception
            NormRxStreamState::List rx_ready_list;   // rx streams ready for NormStreamRead()
            NormRxStreamState::List msg_ready_list;  // rx streams w/ msg ready for push to zmq


    };  // end class norm_engine_t
}

#endif // ZMQ_HAVE_NORM

#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__