Thread::Queue::Multiplex is a subclass of Thread::Queue::Duplex (TQD) that implements a "publish and subscribe" communications model for threads. Subscribers register with the queue, which records as the subscriber ID either the ID the subscriber provides or, if none is provided, 1 plus the TID of the subscriber's thread. As the publisher publishes messages to the queue, each subscriber receives a copy of the message. If the publication is not simplex, the publisher expects all subscribers to read and respond to the message; otherwise, the publisher simply continues its processing. Thread::Queue::Multiplex provides publish() counterparts for all of the Thread::Queue::Duplex enqueue() methods, e.g., publish_simplex(), publish_urgent(), publish_and_wait(), publish_and_wait_until(), etc.
Subscribers receive and reply to messages using the existing TQD dequeue() and respond() methods. In addition, modified versions of the enqueue() methods let publishers direct a message to a single subscriber, or to a subset of subscribers, by specifying a scalar subscriber ID (for single-subscriber messages) or an arrayref of unique subscriber IDs (for multi-subscriber messages).
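The scalar-versus-arrayref convention for directed messages can be illustrated with a small, self-contained sketch. The helper resolve_targets() and the %subscribers hash are hypothetical names invented for this illustration and are not part of the module's API; rejecting unknown IDs is likewise an assumption, not documented behavior.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Thread::Queue::Multiplex): turn the
# first argument of a directed enqueue() call into a list of target
# subscriber IDs.
sub resolve_targets {
    my ($target, $subscribers) = @_;   # $subscribers: hashref of registered IDs

    # An arrayref names several subscribers; a plain scalar names one.
    my @wanted = ref($target) eq 'ARRAY' ? @$target : ($target);

    my (%seen, @targets);
    for my $id (@wanted) {
        # Assumption: an unregistered ID is treated as an error here.
        die "unknown subscriber ID '$id'\n" unless exists $subscribers->{$id};
        push @targets, $id unless $seen{$id}++;   # keep each ID only once
    }
    return @targets;
}

my %subscribers = (alpha => 1, beta => 1, gamma => 1);

my @single = resolve_targets('beta', \%subscribers);
my @group  = resolve_targets([ 'alpha', 'gamma', 'alpha' ], \%subscribers);

print "single: @single\n";
print "group:  @group\n";
```

Normalizing both forms into one list up front means the rest of a publisher's logic would only ever need to handle a list of IDs.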
Thread::Queue::Multiplex overrides some of the internal behavior of Thread::Queue::Duplex by:
* adding a shared hash to hold the list of unique subscriber IDs (provided either explicitly with subscribe(), or derived from 1 + threads->self()->tid() when the subscriber subscribe()s), each mapped to a threads::shared array that holds the IDs of messages published to that subscriber. (Note: tid() + 1 is used to avoid an ID of zero for the root thread.)
* adding a shared hash to hold the list of message IDs, each mapped to a threads::shared array containing [message ID, flags, refcount, @params], where flags indicates the urgent and/or simplex status of the request, and refcount indicates the number of subscribers assigned to the request. A special refcount value of -1 indicates that only the first subscriber to retrieve/process the request should respond (to mimic the behavior of Thread::Queue::Duplex); the publisher requests this by calling any of the enqueue methods with a subscriber ID of -1.
* adding a shared hash to hold the list of message IDs, each mapped to a threads::shared hash containing a reference count of subscribers for the message and a map of subscriber IDs to their responses. This "pending response" hash is used to accumulate all subscriber responses; when the reference count of a message reaches zero, the hash of responses is posted to the final response message mapping hash.
* adding a shared hash to hold the map of thread IDs to subscriber IDs. Note: each thread can have only a single subscriber.
* changing the message mapping hash to map a unique message ID to a hash of unique subscriber IDs, each mapped to its response (if any), i.e.,
$msg_map = {
    $msgid => {
        $subID1 => $subID1_response,
        $subID2 => $subID2_response,
        # etc.
    }
}
* returning a copy of the subscriber mapping hash when the publisher dequeues the response to a message; the publisher is responsible for iterating over the hash to read each subscriber's results.
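The "pending response" bookkeeping described in the list above can be sketched in plain Perl. This is only an illustration under stated assumptions: ordinary hashes stand in for the threads::shared structures, no locking is shown, the special refcount of -1 is not handled, and the names %pending, %msg_map, track_message() and record_response() are hypothetical rather than taken from the module's source.

```perl
use strict;
use warnings;

# Plain (non-shared) hashes stand in for the module's threads::shared
# structures; every name below is hypothetical.
my %pending;    # message ID => { refcount => N, responses => { subID => response } }
my %msg_map;    # message ID => { subID => response }, the final response map

# Called when a message is published to a set of subscribers.
sub track_message {
    my ($msgid, @subids) = @_;
    $pending{$msgid} = { refcount => scalar @subids, responses => {} };
    return;
}

# Called when one subscriber responds. Returns 1 when this was the last
# outstanding response and the results were posted to %msg_map, else 0.
sub record_response {
    my ($msgid, $subid, $response) = @_;
    my $entry = $pending{$msgid} or die "no pending message $msgid\n";

    $entry->{responses}{$subid} = $response;
    return 0 if --$entry->{refcount} > 0;

    # Reference count reached zero: post the accumulated responses.
    my $finished = delete $pending{$msgid};
    $msg_map{$msgid} = $finished->{responses};
    return 1;
}

track_message(42, 'sub1', 'sub2');
my $done_first  = record_response(42, 'sub1', 'ok from sub1');
my $done_second = record_response(42, 'sub2', 'ok from sub2');

# The publisher iterates over the per-subscriber results.
for my $subid (sort keys %{ $msg_map{42} }) {
    print "$subid => $msg_map{42}{$subid}\n";
}
```

In this sketch the response map only becomes visible to the publisher once every assigned subscriber has answered, which mirrors the description of the hash of responses being posted when the reference count drops to zero.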
A normal processing sequence for Thread::Queue::Multiplex might be:
#
# Thread A (the client):
#
...marshal parameters for a coroutine...
my $id = $tqm->publish('function_name', @paramlist);
my $results = $tqm->dequeue_response($id);
while (my ($subID, $subresult) = each %$results) {
    ...process $subresult...
}
#
# Thread B (a subscriber):
#
while (1) {
    my $call = $tqm->dequeue;
    my ($id, $func, @params) = @$call;
    $tqm->respond($id, $self->$func(@params));
}
SYNOPSIS
use Thread::Queue::Multiplex;
#
# create new queue, limiting the max pending requests
# to 20
#
my $tqm = Thread::Queue::Multiplex->new(MaxPending => 20);
#
# register as a subscriber
#
$tqm->subscribe('myID');
#
# unregister as a subscriber
#
$tqm->unsubscribe();
#
# wait for $count subscribers to register
#
$tqm->wait_for_subscribers($count, $timeout);
#
# get the list of current subscriber IDs
#
my @subids = $tqm->get_subscribers();
#
# change the max pending limit
#
$tqm->set_max_pending($limit);
#
# publish elements, returning a unique queue ID
# (used in the client)
#
my $id = $tqm->publish("foo", "bar");
#
# publish elements, and wait for a response
# (used in the client)
#
my $resp = $tqm->publish_and_wait("foo", "bar");
#
# publish elements, and wait for a response
# until $timeout secs (used in the client)
#
my $resp = $tqm->publish_and_wait_until($timeout, "foo", "bar");
#
# publish elements at head of queue, returning a
# unique queue ID (used in the client)
#
my $id = $tqm->publish_urgent("foo", "bar");
#
# publish elements at head of queue and wait for response
#
my $resp = $tqm->publish_urgent_and_wait("foo", "bar");
#
# publish elements at head of queue and wait for
# response until $timeout secs
#
my $resp = $tqm->publish_urgent_and_wait_until($timeout, "foo", "bar");
#
# publish elements for simplex operation (no response)
# returning the queue object
#
$tqm->publish_simplex("foo", "bar");
$tqm->publish_simplex_urgent("foo", "bar");
#
#######################################################
#
# subscribers use the existing TQD dequeue() methods
#
#######################################################
#
# modified versions of the TQD base enqueue methods
# to support directed messaging to a single subscriber
# or group of subscribers
#
#######################################################
#
# enqueue elements to a specific subscriber, returning
# a unique queue ID (used in the client)
#
my $id = $tqm->enqueue($subID, "foo", "bar");
#
# enqueue elements to 2 subscribers, and wait for a response
# (used in the client)
#
my $resp = $tqm->enqueue_and_wait([ $subID1, $subID2 ], "foo", "bar");
#
# enqueue elements, and wait for a response
# until $timeout secs (used in the client)
#
my $resp = $tqm->enqueue_and_wait_until($subID, $timeout, "foo", "bar");
#
# SEE Thread::Queue::Duplex for the various publisher enqueue()
# and wait() methods,
# and the subscriber dequeue() methods
#
Requirements:
· Perl