<html>
<head>
<base href="https://wiki.asterisk.org/wiki">
<link rel="stylesheet" href="/wiki/s/en/2176/25/9/_/styles/combined.css?spaceKey=AST&forWysiwyg=true" type="text/css">
</head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
<h2><a href="https://wiki.asterisk.org/wiki/display/AST/Stasis+Message+Bus">Stasis Message Bus</a></h2>
<h4>Page <b>edited</b> by <a href="https://wiki.asterisk.org/wiki/display/~dlee">David M. Lee</a>
</h4>
<div id="versionComment">
<b>Comment:</b>
Updated according to https://reviewboard.asterisk.org/r/2437/<br />
</div>
<br/>
<h4>Changes (143)</h4>
<div id="page-diffs">
<table class="diff" cellpadding="0" cellspacing="0">
<tr><td class="diff-unchanged" >{numberedheadings} <br> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{note} <br>This Specification is still in draft form. <br>{note} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">While the basic API's for the Stasis Message Bus are [documented using Doxygen|http://doxygen.asterisk.org/trunk/stasis.html], there's still a lot to be said about how to use those API's within Asterisk. <br></td></tr>
<tr><td class="diff-unchanged" > <br>{toc} <br> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h1. Stasis Core <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h1. Message Bus <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h2. Introduction <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">The overall stasis-core API can be best described as a publish/subscribe message bus. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Asterisk has used a publish/subscribe event subsystem for device state and MWI notifications for some time. This model has served Asterisk well: it easily allows consumers of MWI and device state to process those events without impacting the producers of those notifications. Ideally, AMI - and other interfaces that consume Asterisk state - would use such a mechanism to interact with the Asterisk core and modules. Unfortunately, channel state is not available in such a mechanism, nor is there a generic way to publish a wide variety of messages. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">Data that needs to be published on the message bus is encapsulated in a {{stasis_message}}, which associates the message data (which is simply an AO2 managed {{void *}}) along with a {{stasis_message_type}}. (The message also has the {{timeval}} when it was created, which is generally useful, since messages are received asynchronously). <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Stasis Core fulfills that role. It unifies the current AMI events, channel state, along with the existing device state/MWI to create a generic publish/subscribe message bus. This allows: <br>* AMI to be built on top of the message, isolating changes in the AMI protocol from the Asterisk core and vice versa <br>* New interfaces to be developed rapidly, as they can share the information pipeline that feeds AMI and other existing components <br>* Dramatically simplifies components that have to track channel state, as they no longer have to be distributed throughout the Asterisk codebase <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">Messages are published to a {{stasis_topic}}. To receive the messages published to a topic, use {{stasis_subscribe()}} to create a subscription. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{note:title=Disclaimer} <br>There's a bit of a chicken and the egg problem in the design/implementation right now - do we refactor the existing event system to be Stasis Core? Or is Stasis Core simply an expanded event system that includes channel state and arbitrary keyed messages? <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">When topics are subscribed to/unsubscribed to, {{stasis_subscription_change}} are sent to all of the topic's subscribers. On unsubscribe, the {{stasis_subscription_change}} will be the last message received by the unsubscriber, which can be used to kick off any cleanup that might be necessary. The convenience function {{stasis_subscription_final_message()}} can be used to check if a message is the final unsubscribe message. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Rather than solving this particular problem, this specification is attempting to focus on the contracts that users of Stasis Core (regardless of how it gets constructed) need to know about. As such, the API will look *awfully* similar to the existing event system. In the end, it may end up *being* the existing event system, and the names will stay the same. View this as contracts the API has to provide, rather than a guarantee that it will end up being exactly what is specified below. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">To forward all of the messages from one topic to another, use {{stasis_forward_all()}}. This is useful for creating aggregation topics, like {{[ast_channel_topic_all()|#ast_channel_topic_all()]}}, which collect all of the messages published to a set of other topics. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">(However, by the end of all of this, this should reflect the actual API) <br>{note} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. Routing and Caching <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h2. Terminology <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">In addition to these fundamental concepts, there are a few extra objects in the Stasis Message Bus arsenal. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">|| Term || Definition || <br>| AMI | The Asterisk Manager Interface, a legacy interface that is built on top of Stasis Core | <br>| Stasis HTTP | An interface built on top of Stasis Core that exposes functionality in a REST API | <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">For subscriptions that deal with many different types of messages, the {{stasis_message_router}} can be used to route messages based on type. This gets rid of a lot of ugly looking {{if/else}} chains and replaces them with callback dispatch. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">Another common use case within Asterisk is to have a subscription that monitors state changes for snapshots that are published to a topic. To aid in this, we have {{stasis_caching_topic}}. This is a _topic filter_, which presents its own topic with filtered and modified messages from an original topic. In the case of the caching topic, it watches for snapshot messages, compares them with their prior values. For these messages, it publishes a {{stasis_cache_update}} message with both the old and new values of the snapshot. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h2. Protocol Overview <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. Design Philosophy <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">The Stasis Core is a binary message bus that allows components to publish messages to interested subscribers. Messages are comprised of key/type/value tuples. The type can be anything as simple as an {{int}} to as complex as a struct resident in heap memory. It is up to the subscribers of messages to understand how to decode the various typed values and - if they do not know how to decode the values - to pass on them. Stasis Core provides: <br>* A thread-safe mechanism for publishing messages to subscribers, such that publisher threads are never blocked by message delivery <br>* A cache for objects that allows said objects to be updated and queried by subscribers <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">By convention (since there's no effective way to enforce this in C), messages are immutable. Not only the {{stasis_message}} type itself, but also the data it contains. This allows messages to be shared freely throughout the system without locking. If you find yourself in the situation where you want to modify the contents of a {{stasis_message}}, what you actually want to do is copy the message, and change the copy. While on the surface this seems wasteful and inefficient, the gains in reducing lock contention win the day. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h2. Semantics and Syntax <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">Messages are opaque to the message bus, so you don't have to modify {{stasis.c}} or {{stasis.h}} to add a new topic or message type. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h3. Message Sending/Receiving <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">The {{stasis_topic}} object is thread safe. Subscribes, unsubscribes and publishes may happen from any thread. There are some ordering guarantees about message delivery, but not many. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Any Asterisk module may publish messages on the Stasis Core message bus. Messages are identified by a MessageID field. Subscribers subscribe for messages based on the MessageID field, and register a callback handler that will be called by Stasis Core when a message matching the MessageID field is detected. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">* A {{stasis_publish()}} that begins after a {{stasis_publish()}} to the same topic completes will be delivered in order. <br>** In general, this means that publications from different threads to the same topic are unordered, but publications from the same thread are. <br>* Publications to different topics are not ordered. <br>* The final message delivered to a subscription will be the {{stasis_subscription_final_message()}}. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h3. Threading/Memory Model <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">The callback invocations for a {{stasis_topic}} are serialized, although invocations can happen out of any thread from the Stasis thread pool. This is a comfortable middle ground between allocating a thread for every subscription (which would result in too many threads in the system) and invoking callbacks in parallel (which can cause insanity). If your particular handling can benefit from being parallelized, then you can dispatch it to a thread pool in your callback. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">As the Stasis Core exists as a service available to modules in Asterisk, it is important to define ownership of message objects and what threads they are serviced on. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h1. Topics and messages <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Messages and their data places on the Stasis Core message bus are immutable; that is, once a value is formatted and placed into a message, it cannot be changed. This means that values MUST not be reference counted objects and that publishers MUST not keep a reference to any value placed in a message. Similarly, subscribers MUST not store a reference to values placed on a message; once a message is delivered and processed Stasis Core is responsible for reclaiming the memory associated with any values. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">While Stasis allows for the proliferation of many, many message types, we feel that creating too many message types would complicate the overall API. We have a small number of first class objects, (channel snapshots, bridge snapshots, etc.). If a message needs a first class object plus a small piece of additional data, {{blob}} objects are provided which attach a [JSON object|http://doxygen.asterisk.org/trunk/d4/d05/json_8h.html] to the first class object. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h4. Ownership during Publication <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">The details of topics and messages are documented [on doxygen.asterisk.org|http://doxygen.asterisk.org/trunk/df/deb/group__StasisTopicsAndMessages.html]. Some of the more important items to be aware of: <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">When a publisher places a Stasis message on the bus, it relinquishes ownership of the message object and must not hold any references to the message or values contained within the message. Placing a message on a queue for servicing by the bus occurs within the context of the thread doing the publishing. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. First Class Message Object <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">When a subscriber receives a published message, it is done in the context of a thread that exists on a thread pool. It is up to the subscriber to determine if they want to service the message completely on the thread pool thread, or marshal the data contained in the message onto some other thread. When the defined handler entry point returns, the thread servicing the request is returned to the pool. The message that was handled is implicitly reclaimed when the routine returns; if the message elements need to persist beyond the lifetime of the message handling, the subscriber must copy the message data. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h3. {{ast_channel_snaphshot}} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h4. Ownership during Subscription <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">A snapshot of the primary state of a channel, created by {{ast_channel_snapshot_create()}}. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Subscribing for messages happens within the context of the thread performing the subscribing. A subscription token is handed to the subscribing module that represents that particular subscription session. A subscription may be cancelled through the token given to the subscriber. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. Second Class Message Objects <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{note} <br>Once a subscriber has cancelled a subscription, published events may still be in flight to that subscriber. While no new events may be placed in the subscriber's queue, they should be ready to handle events that are currently in flight. <br>{note} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h3. {{ast_channel_blob}} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h3. Message Layout <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">Often times, you find that you want to attach a piece of data to a snapshot for your message. For these cases, there's {{ast_channel_blob}}, which associated a channel snapshot with a JSON object (which is required to have a {{type}} field identifying it). <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Messages placed on the Stasis Core bus are variable length and may contain any number of fields. Each message is comprised of a header, and a payload. The header provides an identifier for the message, publisher information, and other data to be consumed by subscribers to determine if the message is something they want to process. The payload contains information that is self describing and can be extracted by subscribers. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">These are used to publish messages such as hangup requests, channel variable set events, etc. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{note} <br>Multiple modules may publish messages with the same MessageID field, in which case the messages are treated as being identical. If the contents of the messages are different, it is the job of subscribers to parse out the key/type/value tuples and handle them appropriately. <br>{note} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. Channel Topics <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">For the purposes of this specification, we leave the actual message definitions to the implementation and instead focus on the specification of the Stasis message headers. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. {{ast_channel_topic(chan)}} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h4. Header Definition <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">This is the topic to which channel associated messages are published, including [channel snapshots|#ast_channel_snaphshot]. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">|| Field || Type || Required || Description || <br>| ID | char * | Yes | A unique identifier for the message on the bus. Note that this doesn't have to be a UUID, as it only has to be unique on this instance of Asterisk. A scheme that allows for globally unique identifiers across time is sufficient. | <br>| ResponseID | char * | No (NULL) | If this message is a response to another message placed on the Stasis bus, the identifier of the message it is a response for. | <br>| MessageID | ast_stasis_message_type | Yes | The unique identifier for the message. Determined by the publisher. | <br>| Timestamp | long | Yes | An epoch timestamp for the message. This is provided by Stasis Core when the messages are handled. | <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. {{ast_channel_topic_all()}} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h4. Key/Type/Value Tuples <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">This is an aggregation topic, to which all messages published to individual channel topics are forwarded. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Each value in a message is defined by a key/type/value tuple. The key is a {{char *}} that uniquely identifies for a given message the value in that message. The type is an enumeration value that defines the type the value has. The value is some blob of data to be extracted based on the type. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. {{ast_channel_topic_all_cached()}} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h3. API <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">This is a caching topic wrapping {{ast_channel_topic_all()}}, which caches {{ast_channel_snaphshot}} messages. <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{warning} <br>If any section was up for significant changes, it'd be this one. Everything here is subject to change still. <br>{warning} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h1. Sample Code <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h4. Types <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. Publishing messages <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_stasis_sub}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">{code:c|title=foo.h} <br>#include "asterisk/stasis.h" <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">A subscription token returned from successful subscriptions. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">/*! \brief Some structure containing the content of your message. */ <br>struct ast_foo { <br>        /* stuffz */ <br>}; <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_nv_object}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">/*! \brief Message type for \ref ast_foo */ <br>struct stasis_message_type *ast_foo_type(void); <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">A tuple containing name, a data type, and a value matching that data type. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">/*! <br> * \brief Topic for the foo module. <br> */ <br>struct stasis_topic *ast_foo_topic(void); <br>{code} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_stasis_message_type}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">{code:c|title=foo.c} <br>#include "asterisk.h" <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">An enumeration containing high level message types. Currently, these would be anticipated to encompass: <br>* Mailbox State <br>* Device State <br>* Channel State <br>* Bridge State <br>* Presence <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">ASTERISK_FILE_VERSION(__FILE__, "$Revision$") <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">Note that these do *not* map to AMI events. Rather, they are high level concepts in Asterisk that a large variety of messages could correspond to. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">#include "asterisk/astobj2.h" <br>#include "asterisk/module.h" <br>#include "asterisk/stasis.h" <br>#include "foo.h" <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_stasis_message}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">static struct stasis_message_type *foo_type; <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">An object containing the generic message information. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">struct stasis_topic *foo_topic; <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h4. Subscription <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">struct stasis_message_type *ast_foo_type(void) <br>{ <br>        return foo_type; <br>} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_stasis_subscribe}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">struct stasis_topic *ast_foo_topic(void) <br>{ <br>        return foo_topic; <br>} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{code} <br></td></tr>
<tr><td class="diff-unchanged" >/*! <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;"> * \brief Subscribe to stasis messages <br></td></tr>
<tr><td class="diff-changed-lines" >* <span class="diff-added-words"style="background-color: #dfd;">\brief Convenience function to publish an \ref ast_foo message to the</span> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;"> * \param message_type The type of messages to subscribe to <br> * \param cb The function to be called with messages <br> * \param description Description of the subscription. <br> * \param userdata data to be passed to the event callback <br> * <br> * The rest of the arguments to this function specify additional parameters for <br> * the subscription to filter which messages are passed to this subscriber. The <br> * arguments must be in sets of ast_nv_object, which define a name/type/value <br> * tuple. <br> * \code <br> * <struct ast_nv_object *>, [ struct ast_nv_object * ], SENTINEL <br> * \endcode <br> * and must end with SENTINEL. <br> * <br> * \return This returns a reference to the subscription for use with <br> * un-subscribing later. If there is a failure in creating the <br> * subscription, NULL will be returned. <br> * <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;"> * \ref foo_topic. <br></td></tr>
<tr><td class="diff-unchanged" > */ <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">struct ast_stasis_sub *ast_stasis_subscribe( <br> ast_stasis_message_type message_type, <br> ast_stasis_cb_t callback, <br> void *userdata, <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">static void publish_foo(struct ast_foo *foo) <br></td></tr>
<tr><td class="diff-changed-lines" ><span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">...);</span> <span class="diff-added-words"style="background-color: #dfd;">{</span> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{code} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">        RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_stasis_unsubscribe}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">        msg = stasis_message_create(foo_type, foo); <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">        if (!msg) { <br></td></tr>
<tr><td class="diff-changed-lines" ><span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">{code}</span> <span class="diff-added-words"style="background-color: #dfd;">return;</span> <br></td></tr>
<tr><td class="diff-changed-lines" ><span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">/*!</span> <span class="diff-added-words"style="background-color: #dfd;">}</span> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;"> * \brief Un-subscribe from stasis messages <br></td></tr>
<tr><td class="diff-changed-lines" ><span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">*</span> <span class="diff-added-words"style="background-color: #dfd;"> </span> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;"> * \param message_sub This is the reference to the subscription returned by <br> * ast_stasis_subscribe. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">        stasis_publish(foo_topic, msg); <br></td></tr>
<tr><td class="diff-changed-lines" ><span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">*</span> <span class="diff-added-words"style="background-color: #dfd;">}</span> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;"> * This function will remove a subscription and free the associated data <br> * structures. <br></td></tr>
<tr><td class="diff-changed-lines" ><span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">*</span> <span class="diff-added-words"style="background-color: #dfd;"> </span> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;"> * \return NULL for convenience. <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">/* <br> * Imagine lots of cool foo-related code here, which uses the above <br> * publish_foo message. Other foo-related messages may also be published <br> * to the foo_topic. <br></td></tr>
<tr><td class="diff-unchanged" > */ <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">struct ast_stasis_sub *ast_stasis_unsubscribe(struct ast_stasis_sub *message_sub); <br>{code} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h4. Events <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">static int unload_module(void) <br>{ <br>        ao2_cleanup(foo_type); <br>        foo_type = NULL; <br>        ao2_cleanup(foo_topic); <br>        foo_topic = NULL; <br>        return 0; <br>} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_stasis_message_new}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">static int load_module(void) <br>{ <br>        foo_type = stasis_message_type_create("ast_foo"); <br>        if (!foo_type) { <br>                return AST_MODULE_LOAD_FAILURE; <br>        } <br>        foo_topic = stasis_topic_create("foo"); <br>        if (!foo_topic) { <br>                return AST_MODULE_LOAD_FAILURE; <br>        } <br>        return AST_MODULE_LOAD_SUCCESS; <br>} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "The wonders of foo", <br>                .load = load_module, <br>                .unload = unload_module <br>        ); <br></td></tr>
<tr><td class="diff-unchanged" >{code} <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">/*! <br> * \brief Create a new stasis message <br> * <br> * \param message_type The type of message to create <br> * <br> * The rest of the arguments to this function specify information to add to the <br> * message. These must be ast_nv_objects. <br> * \code <br> * <struct ast_nv_object*>, [ struct ast_nv_object * ], SENTINEL <br> * \endcode <br> * and must end with SENTINEL. <br> * <br> * \return This returns the message that has been created. If there is an error <br> * creating the event, NULL will be returned. <br> * <br> * Example usage: <br> * <br> * \code <br> * if (!(event = ast_stasis_message_new(AST_EVENT_CHANNEL, <br> * ast_create_nv_object_from_channel("channel_one", chan1), <br> * ast_create_nv_object_from_channel("channel_two", chan2), <br> * SENTINEL))) { <br> * return; <br> * } <br> * \endcode <br> * <br> */ <br>struct ast_stasis_message *ast_stasis_message_new(enum ast_stasis_message_type message_type, ...); <br>{code} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_stasis_message_new_response}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. Subscribing (no message router) <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">{code} <br>/*! <br> * \brief Create a new stasis message that responds to a previous message <br> * <br> * \param original The original message to respond to <br> * \param message_type The type of message to create <br> * <br> * The rest of the arguments to this function specify information to add to the <br> * message. These must be ast_nv_objects. <br> * \code <br> * <struct ast_nv_object*>, [ struct ast_nv_object * ], SENTINEL <br> * \endcode <br> * and must end with SENTINEL. <br> * <br> * \return This returns the message that has been created. If there is an error <br> * creating the event, NULL will be returned. <br> * <br> * Example usage: <br> * <br> * \code <br> * if (!(event = ast_stasis_message_new_response(original, AST_EVENT_CHANNEL, <br> * ast_create_nv_object_from_channel("channel_one", chan1), <br> * ast_create_nv_object_from_channel("channel_two", chan2), <br> * SENTINEL))) { <br> * return; <br> * } <br> * \endcode <br> * <br> */ <br>struct ast_stasis_message *ast_stasis_message_new_response(struct ast_stasis_message *original, enum ast_stasis_message_type message_type, ...); <br>{code} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">{code:c|title=bar.c} <br>#include "asterisk/astobj2.h" <br>#include "asterisk/stasis.h" <br>#include "foo.h" <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_stasis_message_destroy}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">struct ast_bar { <br>        struct stasis_subscription *sub; <br>}; <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">static void bar_dtor(void *obj) <br></td></tr>
<tr><td class="diff-changed-lines" ><span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">{code}</span> <span class="diff-added-words"style="background-color: #dfd;">{</span> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">/*! <br> * \brief Destroy a stasis message <br> * <br> * \param message the message to destroy <br> * <br> * \return Nothing <br> * <br> * \note Messages that have been queued should *not* be destroyed by the code that <br> * created the message. It will be automatically destroyed after being <br> * dispatched to the appropriate subscribers. <br> * <br> * \returns NULL as a convenience <br> */ <br>struct ast_stasis_message * ast_stasis_message_destroy(struct ast_stasis_message *message); <br>{code} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">        struct ast_bar *bar = obj; <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_stasis_message_queue}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">        /* Since the subscription holds a reference, unsubscribe <br>         * should happen before destruction. <br>         */ <br>        ast_assert(bar->sub == NULL); <br>} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;"> <br>static void bar_callback(void *data, <br>                         struct stasis_subscription *sub, <br>                         struct stasis_topic *topic, <br>                         struct stasis_message *message) <br>{ <br>        struct ast_bar *bar = data; <br> <br>        if (stasis_subscription_final_message(sub, message)) { <br>                /* Final message; we can clean ourselves up */ <br>                ao2_cleanup(bar); <br>                return; <br>        } <br> <br>        if (ast_foo_type() == stasis_message_type(message)) { <br>                struct ast_foo *foo = stasis_message_data(message); <br>                /* A fooing we will go... */ <br>        } else if (ast_whatever_type() == stasis_message_type(message)) { <br>                struct ast_whatever *whatever = stasis_message_data (message); <br>                /* whatever */ <br>        } <br>} <br> <br>struct ast_bar *ast_bar_create(void) <br>{ <br>        RAII_VAR(struct ast_bar *, bar, NULL, ao2_cleanup); <br> <br>        bar = ao2_alloc(sizeof(*bar), bar_dtor); <br>        if (!bar) { <br>                return NULL; <br>        } <br> <br>        bar->sub = stasis_subscribe(ast_foo_topic(), bar_callback, bar); <br>        if (!bar->sub) { <br>                return NULL; <br>        } <br> <br>        ao2_ref(bar, +1); /* The subscription hold a ref to bar */ <br> <br>        ao2_ref(bar, +1); /* And we're returning a ref to bar */ <br>        return bar; <br>} <br> <br>void ast_bar_shutdown(struct ast_bar *bar) <br>{ <br>        if (!bar) { <br>                return NULL; <br>        } <br>        stasis_unsubscribe(bar->sub); <br>        bar->sub = NULL; <br>} <br></td></tr>
<tr><td class="diff-unchanged" >{code} <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">/*! <br> * \brief Queue a stasis message <br> * <br> * \param message the message to be queued <br> * <br> * \retval zero success <br> * \retval non-zero failure. Note that the caller of this function is <br> * responsible for destroying the event in the case of a failure. <br> * <br> * This function queues a message to be dispatched to all of the appropriate <br> * subscribers. This function will not block while the message is being <br> * dispatched because the message is queued up for a dispatching thread <br> * to handle. <br> */ <br>int ast_stasis_message_queue(struct ast_stasis_message *message); <br>{code} <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;">h5. {{ast_stasis_message_queue_and_cache}} <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">h2. Subscribing (with message router) <br></td></tr>
<tr><td class="diff-unchanged" > <br></td></tr>
<tr><td class="diff-added-lines" style="background-color: #dfd;">{code:c|title=bar2.c} <br>#include "asterisk/astobj2.h" <br>#include "asterisk/stasis.h" <br>#include "asterisk/stasis_message_router.h" <br>#include "foo.h" <br> <br>struct ast_bar { <br>        struct stasis_message_router *router; <br>}; <br> <br>static void bar_dtor(void *obj) <br>{ <br>        struct ast_bar *bar = obj; <br> <br>        /* Since the subscription holds a reference, unsubscribe <br>         * should happen before destruction. <br>         */ <br>        ast_assert(bar->router == NULL); <br>} <br> <br> <br>static void bar_default(void *data, <br>                        struct stasis_subscription *sub, <br>                        struct stasis_topic *topic, <br>                        struct stasis_message *message) <br>{ <br>        struct ast_bar *bar = data; <br>        if (stasis_subscription_final_message(sub, message)) { <br>                /* Final message; we can clean ourselves up */ <br>                ao2_cleanup(bar); <br>        } <br>} <br> <br>static void bar_foo(void *data, <br>                 struct stasis_subscription *sub, <br>                 struct stasis_topic *topic, <br>                 struct stasis_message *message) <br>{ <br>        struct ast_bar *bar = data; <br>        struct ast_foo *foo; <br> <br>        ast_assert(ast_foo_type() == stasis_message_type(message)); <br>        foo = stasis_message_data(message); <br>        /* A fooing we will go... */ <br> <br>} <br> <br>static void bar_whatever(void *data, <br>                         struct stasis_subscription *sub, <br>                         struct stasis_topic *topic, <br>                         struct stasis_message *message) <br>{ <br>        struct ast_bar *bar = data; <br>        struct ast_whatever *whatever; <br> <br>        ast_assert(ast_whatever_type() == stasis_message_type(message)); <br>        whatever = stasis_message_data (message); <br>        /* whatever */ <br>} <br> <br>struct ast_bar *ast_bar_create(void) <br>{ <br>        RAII_VAR(struct ast_bar *, bar, NULL, ao2_cleanup); <br>        int r; <br> <br>        bar = ao2_alloc(sizeof(*bar), bar_dtor); <br>        if (!bar) { <br>                return NULL; <br>        } <br> <br>        bar->router = stasis_message_router_create(ast_foo_topic()); <br>        if (!bar->router) { <br>                return NULL; <br>        } <br> <br>        r = stasis_message_router_set_default(bar->router, bar_default, bar); <br>        if (r != 0) { <br>                ast_bar_shutdown(bar); <br>                return NULL; <br>        } <br>        ao2_ref(bar, +1); /* The subscription hold a ref to bar */ <br> <br>        r |= stasis_message_router_add( <br>                bar->router, ast_foo_type(), bar_foo, bar); <br>        r |= stasis_message_router_add( <br>                bar->router, ast_whatever_type(), bar_whatever, bar); <br>        if (r != 0) { <br>                ast_bar_shutdown(bar); <br>                return NULL; <br>        } <br> <br>        ao2_ref(bar, +1); <br>        return bar; <br>} <br> <br>void ast_bar_shutdown(struct ast_bar *bar) <br>{ <br>        if (!bar) { <br>                return; <br>        } <br>        stasis_message_router_unsubscribe(bar->router); <br>        bar->router = NULL; <br>} <br></td></tr>
<tr><td class="diff-unchanged" >{code} <br></td></tr>
<tr><td class="diff-changed-lines" ><span class="diff-deleted-words"style="color:#999;background-color:#fdd;text-decoration:line-through;">/*!</span> <span class="diff-added-words"style="background-color: #dfd;"> </span> <br></td></tr>
<tr><td class="diff-deleted-lines" style="color:#999;background-color:#fdd;text-decoration:line-through;"> * \brief Queue and cache a message <br> * <br> * \param message the message to be queued and cached <br> * <br> * \details <br> * The purpose of caching messages is so that the core can retain the last known <br> * information for messages that represent some sort of state. That way, when <br> * code needs to find out the current state, it can query the cache. <br> * <br> * \retval 0 success <br> * \retval non-zero failure. <br> */ <br>int ast_stasis_message_queue_and_cache(struct ast_stasis_message *message); <br>{code} <br></td></tr>
<tr><td class="diff-unchanged" >{numberedheadings} <br></td></tr>
</table>
</div> <h4>Full Content</h4>
<div class="notificationGreySide">
<p>While the basic API's for the Stasis Message Bus are <a href="http://doxygen.asterisk.org/trunk/stasis.html" class="external-link" rel="nofollow">documented using Doxygen</a>, there's still a lot to be said about how to use those API's within Asterisk.</p>
<div>
<ul>
<li><a href='#StasisMessageBus-MessageBus'>1. Message Bus</a></li>
<ul>
<li><a href='#StasisMessageBus-RoutingandCaching'>1.1. Routing and Caching</a></li>
<li><a href='#StasisMessageBus-DesignPhilosophy'>1.2. Design Philosophy</a></li>
</ul>
<li><a href='#StasisMessageBus-Topicsandmessages'>2. Topics and messages</a></li>
<ul>
<li><a href='#StasisMessageBus-FirstClassMessageObject'>2.1. First Class Message Object</a></li>
<ul>
<li><a href='#StasisMessageBus-%7B%7Bastchannelsnaphshot%7D%7D'>2.1.1. <tt>ast_channel_snaphshot</tt></a></li>
</ul>
<li><a href='#StasisMessageBus-SecondClassMessageObjects'>2.2. Second Class Message Objects</a></li>
<ul>
<li><a href='#StasisMessageBus-%7B%7Bastchannelblob%7D%7D'>2.2.1. <tt>ast_channel_blob</tt></a></li>
</ul>
<li><a href='#StasisMessageBus-ChannelTopics'>2.3. Channel Topics</a></li>
<li><a href='#StasisMessageBus-%7B%7Bastchanneltopic%28chan%29%7D%7D'>2.4. <tt>ast_channel_topic(chan)</tt></a></li>
<li><a href='#StasisMessageBus-%7B%7Bastchanneltopicall%28%29%7D%7D'>2.5. <tt>ast_channel_topic_all()</tt></a></li>
<li><a href='#StasisMessageBus-%7B%7Bastchanneltopicallcached%28%29%7D%7D'>2.6. <tt>ast_channel_topic_all_cached()</tt></a></li>
</ul>
<li><a href='#StasisMessageBus-SampleCode'>3. Sample Code</a></li>
<ul>
<li><a href='#StasisMessageBus-Publishingmessages'>3.1. Publishing messages</a></li>
<li><a href='#StasisMessageBus-Subscribing%28nomessagerouter%29'>3.2. Subscribing (no message router)</a></li>
<li><a href='#StasisMessageBus-Subscribing%28withmessagerouter%29'>3.3. Subscribing (with message router)</a></li>
</ul>
</ul></div>
<h1><a name="StasisMessageBus-MessageBus"></a>1. Message Bus</h1>
<p>The overall stasis-core API can be best described as a publish/subscribe message bus.</p>
<p>Data that needs to be published on the message bus is encapsulated in a <tt>stasis_message</tt>, which associates the message data (which is simply an AO2 managed <tt>void *</tt>) along with a <tt>stasis_message_type</tt>. (The message also has the <tt>timeval</tt> when it was created, which is generally useful, since messages are received asynchronously).</p>
<p>Messages are published to a <tt>stasis_topic</tt>. To receive the messages published to a topic, use <tt>stasis_subscribe()</tt> to create a subscription.</p>
<p>When topics are subscribed to/unsubscribed to, <tt>stasis_subscription_change</tt> are sent to all of the topic's subscribers. On unsubscribe, the <tt>stasis_subscription_change</tt> will be the last message received by the unsubscriber, which can be used to kick off any cleanup that might be necessary. The convenience function <tt>stasis_subscription_final_message()</tt> can be used to check if a message is the final unsubscribe message.</p>
<p>To forward all of the messages from one topic to another, use <tt>stasis_forward_all()</tt>. This is useful for creating aggregation topics, like <tt><a href="#StasisMessageBus-astchanneltopicall%28%29">ast_channel_topic_all()</a></tt>, which collect all of the messages published to a set of other topics.</p>
<h2><a name="StasisMessageBus-RoutingandCaching"></a>1.1. Routing and Caching</h2>
<p>In addition to these fundamental concepts, there are a few extra objects in the Stasis Message Bus arsenal.</p>
<p>For subscriptions that deal with many different types of messages, the <tt>stasis_message_router</tt> can be used to route messages based on type. This gets rid of a lot of ugly looking <tt>if/else</tt> chains and replaces them with callback dispatch.</p>
<p>Another common use case within Asterisk is to have a subscription that monitors state changes for snapshots that are published to a topic. To aid in this, we have <tt>stasis_caching_topic</tt>. This is a <em>topic filter</em>, which presents its own topic with filtered and modified messages from an original topic. In the case of the caching topic, it watches for snapshot messages, compares them with their prior values. For these messages, it publishes a <tt>stasis_cache_update</tt> message with both the old and new values of the snapshot.</p>
<h2><a name="StasisMessageBus-DesignPhilosophy"></a>1.2. Design Philosophy</h2>
<p>By convention (since there's no effective way to enforce this in C), messages are immutable. Not only the <tt>stasis_message</tt> type itself, but also the data it contains. This allows messages to be shared freely throughout the system without locking. If you find yourself in the situation where you want to modify the contents of a <tt>stasis_message</tt>, what you actually want to do is copy the message, and change the copy. While on the surface this seems wasteful and inefficient, the gains in reducing lock contention win the day.</p>
<p>Messages are opaque to the message bus, so you don't have to modify <tt>stasis.c</tt> or <tt>stasis.h</tt> to add a new topic or message type.</p>
<p>The <tt>stasis_topic</tt> object is thread safe. Subscribes, unsubscribes and publishes may happen from any thread. There are some ordering guarantees about message delivery, but not many.</p>
<ul>
        <li>A <tt>stasis_publish()</tt> that begins after a <tt>stasis_publish()</tt> to the same topic completes will be delivered in order.
        <ul>
                <li>In general, this means that publications from different threads to the same topic are unordered, but publications from the same thread are.</li>
        </ul>
        </li>
        <li>Publications to different topics are not ordered.</li>
        <li>The final message delivered to a subscription will be the <tt>stasis_subscription_final_message()</tt>.</li>
</ul>
<p>The callback invocations for a <tt>stasis_topic</tt> are serialized, although invocations can happen out of any thread from the Stasis thread pool. This is a comfortable middle ground between allocating a thread for every subscription (which would result in too many threads in the system) and invoking callbacks in parallel (which can cause insanity). If your particular handling can benefit from being parallelized, then you can dispatch it to a thread pool in your callback.</p>
<h1><a name="StasisMessageBus-Topicsandmessages"></a>2. Topics and messages</h1>
<p>While Stasis allows for the proliferation of many, many message types, we feel that creating too many message types would complicate the overall API. We have a small number of first class objects, (channel snapshots, bridge snapshots, etc.). If a message needs a first class object plus a small piece of additional data, <tt>blob</tt> objects are provided which attach a <a href="http://doxygen.asterisk.org/trunk/d4/d05/json_8h.html" class="external-link" rel="nofollow">JSON object</a> to the first class object.</p>
<p>The details of topics and messages are documented <a href="http://doxygen.asterisk.org/trunk/df/deb/group__StasisTopicsAndMessages.html" class="external-link" rel="nofollow">on doxygen.asterisk.org</a>. Some of the more important items to be aware of:</p>
<h2><a name="StasisMessageBus-FirstClassMessageObject"></a>2.1. First Class Message Object</h2>
<h3><a name="StasisMessageBus-%7B%7Bastchannelsnaphshot%7D%7D"></a>2.1.1. <tt>ast_channel_snaphshot</tt></h3>
<p>A snapshot of the primary state of a channel, created by <tt>ast_channel_snapshot_create()</tt>. </p>
<h2><a name="StasisMessageBus-SecondClassMessageObjects"></a>2.2. Second Class Message Objects</h2>
<h3><a name="StasisMessageBus-%7B%7Bastchannelblob%7D%7D"></a>2.2.1. <tt>ast_channel_blob</tt></h3>
<p>Often times, you find that you want to attach a piece of data to a snapshot for your message. For these cases, there's <tt>ast_channel_blob</tt>, which associated a channel snapshot with a JSON object (which is required to have a <tt>type</tt> field identifying it).</p>
<p>These are used to publish messages such as hangup requests, channel variable set events, etc.</p>
<h2><a name="StasisMessageBus-ChannelTopics"></a>2.3. Channel Topics</h2>
<h2><a name="StasisMessageBus-%7B%7Bastchanneltopic%28chan%29%7D%7D"></a>2.4. <tt>ast_channel_topic(chan)</tt></h2>
<p>This is the topic to which channel associated messages are published, including <a href="#StasisMessageBus-astchannelsnaphshot">channel snapshots</a>.</p>
<h2><a name="StasisMessageBus-%7B%7Bastchanneltopicall%28%29%7D%7D"></a>2.5. <tt>ast_channel_topic_all()</tt></h2>
<p>This is an aggregation topic, to which all messages published to individual channel topics are forwarded.</p>
<h2><a name="StasisMessageBus-%7B%7Bastchanneltopicallcached%28%29%7D%7D"></a>2.6. <tt>ast_channel_topic_all_cached()</tt></h2>
<p>This is a caching topic wrapping <tt>ast_channel_topic_all()</tt>, which caches <tt>ast_channel_snaphshot</tt> messages.</p>
<h1><a name="StasisMessageBus-SampleCode"></a>3. Sample Code</h1>
<h2><a name="StasisMessageBus-Publishingmessages"></a>3.1. Publishing messages</h2>
<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader" style="border-bottom-width: 1px;"><b>foo.h</b></div><div class="codeContent panelContent">
<pre class="theme: Confluence; brush: c; gutter: false">#include "asterisk/stasis.h"
/*! \brief Some structure containing the content of your message. */
struct ast_foo {
        /* stuffz */
};
/*! \brief Message type for \ref ast_foo */
struct stasis_message_type *ast_foo_type(void);
/*!
* \brief Topic for the foo module.
*/
struct stasis_topic *ast_foo_topic(void);</pre>
</div></div>
<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader" style="border-bottom-width: 1px;"><b>foo.c</b></div><div class="codeContent panelContent">
<pre class="theme: Confluence; brush: c; gutter: false">#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/module.h"
#include "asterisk/stasis.h"
#include "foo.h"
static struct stasis_message_type *foo_type;
struct stasis_topic *foo_topic;
struct stasis_message_type *ast_foo_type(void)
{
        return foo_type;
}
struct stasis_topic *ast_foo_topic(void)
{
        return foo_topic;
}
/*!
* \brief Convenience function to publish an \ref ast_foo message to the
* \ref foo_topic.
*/
static void publish_foo(struct ast_foo *foo)
{
        RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
        msg = stasis_message_create(foo_type, foo);
        if (!msg) {
                return;
        }
        stasis_publish(foo_topic, msg);
}
/*
* Imagine lots of cool foo-related code here, which uses the above
* publish_foo message. Other foo-related messages may also be published
* to the foo_topic.
*/
static int unload_module(void)
{
        ao2_cleanup(foo_type);
        foo_type = NULL;
        ao2_cleanup(foo_topic);
        foo_topic = NULL;
        return 0;
}
static int load_module(void)
{
        foo_type = stasis_message_type_create("ast_foo");
        if (!foo_type) {
                return AST_MODULE_LOAD_FAILURE;
        }
        foo_topic = stasis_topic_create("foo");
        if (!foo_topic) {
                return AST_MODULE_LOAD_FAILURE;
        }
        return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "The wonders of foo",
                .load = load_module,
                .unload = unload_module
        );</pre>
</div></div>
<h2><a name="StasisMessageBus-Subscribing%28nomessagerouter%29"></a>3.2. Subscribing (no message router)</h2>
<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader" style="border-bottom-width: 1px;"><b>bar.c</b></div><div class="codeContent panelContent">
<pre class="theme: Confluence; brush: c; gutter: false">#include "asterisk/astobj2.h"
#include "asterisk/stasis.h"
#include "foo.h"
struct ast_bar {
        struct stasis_subscription *sub;
};
static void bar_dtor(void *obj)
{
        struct ast_bar *bar = obj;
        /* Since the subscription holds a reference, unsubscribe
         * should happen before destruction.
         */
        ast_assert(bar->sub == NULL);
}
static void bar_callback(void *data,
                         struct stasis_subscription *sub,
                         struct stasis_topic *topic,
                         struct stasis_message *message)
{
        struct ast_bar *bar = data;
        if (stasis_subscription_final_message(sub, message)) {
                /* Final message; we can clean ourselves up */
                ao2_cleanup(bar);
                return;
        }
        if (ast_foo_type() == stasis_message_type(message)) {
                struct ast_foo *foo = stasis_message_data(message);
                /* A fooing we will go... */
        } else if (ast_whatever_type() == stasis_message_type(message)) {
                struct ast_whatever *whatever = stasis_message_data (message);
                /* whatever */
        }
}
struct ast_bar *ast_bar_create(void)
{
        RAII_VAR(struct ast_bar *, bar, NULL, ao2_cleanup);
        bar = ao2_alloc(sizeof(*bar), bar_dtor);
        if (!bar) {
                return NULL;
        }
        bar->sub = stasis_subscribe(ast_foo_topic(), bar_callback, bar);
        if (!bar->sub) {
                return NULL;
        }
        ao2_ref(bar, +1); /* The subscription hold a ref to bar */
        ao2_ref(bar, +1); /* And we're returning a ref to bar */
        return bar;
}
void ast_bar_shutdown(struct ast_bar *bar)
{
        if (!bar) {
                return NULL;
        }
        stasis_unsubscribe(bar->sub);
        bar->sub = NULL;
}</pre>
</div></div>
<h2><a name="StasisMessageBus-Subscribing%28withmessagerouter%29"></a>3.3. Subscribing (with message router)</h2>
<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader" style="border-bottom-width: 1px;"><b>bar2.c</b></div><div class="codeContent panelContent">
<pre class="theme: Confluence; brush: c; gutter: false">#include "asterisk/astobj2.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
#include "foo.h"
struct ast_bar {
        struct stasis_message_router *router;
};
static void bar_dtor(void *obj)
{
        struct ast_bar *bar = obj;
        /* Since the subscription holds a reference, unsubscribe
         * should happen before destruction.
         */
        ast_assert(bar->router == NULL);
}
static void bar_default(void *data,
                        struct stasis_subscription *sub,
                        struct stasis_topic *topic,
                        struct stasis_message *message)
{
        struct ast_bar *bar = data;
        if (stasis_subscription_final_message(sub, message)) {
                /* Final message; we can clean ourselves up */
                ao2_cleanup(bar);
        }
}
static void bar_foo(void *data,
                 struct stasis_subscription *sub,
                 struct stasis_topic *topic,
                 struct stasis_message *message)
{
        struct ast_bar *bar = data;
        struct ast_foo *foo;
        ast_assert(ast_foo_type() == stasis_message_type(message));
        foo = stasis_message_data(message);
        /* A fooing we will go... */
}
static void bar_whatever(void *data,
                         struct stasis_subscription *sub,
                         struct stasis_topic *topic,
                         struct stasis_message *message)
{
        struct ast_bar *bar = data;
        struct ast_whatever *whatever;
        ast_assert(ast_whatever_type() == stasis_message_type(message));
        whatever = stasis_message_data (message);
        /* whatever */
}
struct ast_bar *ast_bar_create(void)
{
        RAII_VAR(struct ast_bar *, bar, NULL, ao2_cleanup);
        int r;
        bar = ao2_alloc(sizeof(*bar), bar_dtor);
        if (!bar) {
                return NULL;
        }
        bar->router = stasis_message_router_create(ast_foo_topic());
        if (!bar->router) {
                return NULL;
        }
        r = stasis_message_router_set_default(bar->router, bar_default, bar);
        if (r != 0) {
                ast_bar_shutdown(bar);
                return NULL;
        }
        ao2_ref(bar, +1); /* The subscription hold a ref to bar */
        r |= stasis_message_router_add(
                bar->router, ast_foo_type(), bar_foo, bar);
        r |= stasis_message_router_add(
                bar->router, ast_whatever_type(), bar_whatever, bar);
        if (r != 0) {
                ast_bar_shutdown(bar);
                return NULL;
        }
        ao2_ref(bar, +1);
        return bar;
}
void ast_bar_shutdown(struct ast_bar *bar)
{
        if (!bar) {
                return;
        }
        stasis_message_router_unsubscribe(bar->router);
        bar->router = NULL;
}</pre>
</div></div>
</div>
<div id="commentsSection" class="wiki-content pageSection">
<div style="float: right;" class="grey">
<a href="https://wiki.asterisk.org/wiki/users/removespacenotification.action?spaceKey=AST">Stop watching space</a>
<span style="padding: 0px 5px;">|</span>
<a href="https://wiki.asterisk.org/wiki/users/editmyemailsettings.action">Change email notification preferences</a>
</div>
<a href="https://wiki.asterisk.org/wiki/display/AST/Stasis+Message+Bus">View Online</a>
|
<a href="https://wiki.asterisk.org/wiki/pages/diffpagesbyversion.action?pageId=22086002&revisedVersion=4&originalVersion=3">View Changes</a>
|
<a href="https://wiki.asterisk.org/wiki/display/AST/Stasis+Message+Bus?showComments=true&showCommentArea=true#addcomment">Add Comment</a>
</div>
</div>
</div>
</div>
</div>
</body>
</html>