[asterisk-commits] dlee: branch dlee/stasis-core r381701 - in /team/dlee/stasis-core: include/as...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Feb 18 16:20:29 CST 2013
Author: dlee
Date: Mon Feb 18 16:20:25 2013
New Revision: 381701
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=381701
Log:
Initial stasis-core work, for review
Added:
team/dlee/stasis-core/include/asterisk/stasis.h (with props)
team/dlee/stasis-core/main/stasis.c (with props)
team/dlee/stasis-core/main/stasis_cache.c (with props)
team/dlee/stasis-core/main/stasis_message.c (with props)
team/dlee/stasis-core/tests/test_stasis.c (with props)
Modified:
team/dlee/stasis-core/include/asterisk/channel.h
team/dlee/stasis-core/include/asterisk/channel_internal.h
team/dlee/stasis-core/main/asterisk.c
team/dlee/stasis-core/main/channel.c
team/dlee/stasis-core/main/channel_internal_api.c
team/dlee/stasis-core/main/manager.c
team/dlee/stasis-core/main/pbx.c
Modified: team/dlee/stasis-core/include/asterisk/channel.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/include/asterisk/channel.h?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/include/asterisk/channel.h (original)
+++ team/dlee/stasis-core/include/asterisk/channel.h Mon Feb 18 16:20:25 2013
@@ -125,7 +125,6 @@
#include "asterisk/abstract_jb.h"
#include "asterisk/astobj2.h"
-
#include "asterisk/poll-compat.h"
#if defined(__cplusplus) || defined(c_plusplus)
@@ -151,6 +150,7 @@
#include "asterisk/channelstate.h"
#include "asterisk/ccss.h"
#include "asterisk/framehook.h"
+#include "asterisk/stasis.h"
#define DATASTORE_INHERIT_FOREVER INT_MAX
@@ -4102,4 +4102,130 @@
void ast_channel_dialed_causes_clear(const struct ast_channel *chan);
struct ast_flags *ast_channel_flags(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief Structure representing a snapshot of channel state.
+ *
+ * While not enforced programmatically, this object is shared across multiple
+ * threads, and should be treated as an immutable object.
+ */
+struct ast_channel_snapshot {
+ AST_DECLARE_STRING_FIELDS(
+ AST_STRING_FIELD(name); /*!< ASCII unique channel name */
+ AST_STRING_FIELD(accountcode); /*!< Account code for billing */
+ AST_STRING_FIELD(peeraccount); /*!< Peer account code for billing */
+ AST_STRING_FIELD(userfield); /*!< Userfield for CEL billing */
+ AST_STRING_FIELD(uniqueid); /*!< Unique Channel Identifier */
+ AST_STRING_FIELD(linkedid); /*!< Linked Channel Identifier -- gets propagated by linkage */
+ AST_STRING_FIELD(parkinglot); /*!< Parking lot to use; if empty, the default parking lot */
+ AST_STRING_FIELD(hangupsource); /*!< Who is responsible for hanging up this channel */
+ AST_STRING_FIELD(appl); /*!< Current application */
+ AST_STRING_FIELD(data); /*!< Data passed to current application */
+ AST_STRING_FIELD(context); /*!< Dialplan: Current extension context */
+ AST_STRING_FIELD(exten); /*!< Dialplan: Current extension number */
+ AST_STRING_FIELD(caller_name); /*!< Caller ID Name */
+ AST_STRING_FIELD(caller_number); /*!< Caller ID Number */
+ AST_STRING_FIELD(connected_name); /*!< Connected Line Name */
+ AST_STRING_FIELD(connected_number); /*!< Connected Line Number */
+ );
+
+ struct timeval creationtime; /*!< The time of channel creation */
+ enum ast_channel_state state; /*!< State of line */
+ int priority; /*!< Dialplan: Current extension priority */
+ int amaflags; /*!< AMA flags for billing */
+ int hangupcause; /*!< Why the channel was hung up. See causes.h */
+ struct ast_flags flags; /*!< channel flags of AST_FLAG_ type */
+};
+
+/*!
+ * \since 12
+ * \brief Generate a snapshot of the channel state. This is an ao2 object, so
+ * ao2_cleanup() to deallocate.
+ *
+ * \param chan The channel from which to generate a snapshot
+ *
+ * \retval pointer on success (release with ao2_cleanup())
+ * \retval NULL on error
+ */
+struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief Compare two channel state snapshots, in their entirety.
+ *
+ * \param obj The first snapshot to compare
+ * \param arg The second snapshot to compare
+ * \param flags astobj2 comparison flags
+ *
+ * \retval CMP_MATCH | CMP_STOP if objects match.
+ * \retval 0 if objects don't match.
+ */
+int ast_channel_snapshot_cmp(void *obj, void *arg, int flags);
+
+/*!
+ * \since 12
+ * \brief Message type for \ref ast_channel_snapshot.
+ *
+ * \retval Message type for \ref ast_channel_snapshot.
+ */
+struct stasis_message_type *ast_channel_snapshot(void);
+
+/*!
+ * \since 12
+ * \brief A topic which publishes the events for a particular channel.
+ *
+ * \param chan Channel.
+ *
+ * \retval Topic for channel's events.
+ * \retval \c NULL if \a chan is \c NULL.
+ */
+struct stasis_topic *ast_channel_events(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief A topic which publishes the events for all channels.
+ * \retval Topic for all channel events.
+ */
+struct stasis_topic *ast_channel_events_all(void);
+
+/*!
+ * \since 12
+ * \brief A caching topic which caches \ref ast_channel_snapshot messages from
+ * ast_channel_events_all(void).
+ *
+ * \retval Caching topic for all channel events.
+ */
+struct stasis_caching_topic *ast_channel_events_all_cached(void);
+
+/*!
+ * \since 12
+ * \brief Variable set event.
+ */
+struct ast_channel_varset_event {
+ /* All four strings are released by the event's ao2 destructor. */
+ char *channel_name; /*!< Name of the channel, or "none" when there is no channel */
+ char *uniqueid; /*!< Identifier for the channel, or "none" when there is no channel */
+ char *variable; /*!< Name of the variable being set */
+ char *value; /*!< Value assigned to the variable */
+};
+
+/*!
+ * \since 12
+ * \brief Message type for \ref ast_channel_varset_event messages.
+ *
+ * \retval Message type for \ref ast_channel_varset_event messages.
+ */
+struct stasis_message_type *ast_channel_varset_event(void);
+
+/*!
+ * \since 12
+ * \brief Publish a \ref ast_channel_varset_event for a channel.
+ *
+ * \param chan Channel to publish the event for, or \c NULL for 'none'.
+ * \param variable Name of the variable being set
+ * \param value Value.
+ */
+void ast_channel_send_varset_event(struct ast_channel *chan,
+ const char *variable, const char *value);
+
#endif /* _ASTERISK_CHANNEL_H */
Modified: team/dlee/stasis-core/include/asterisk/channel_internal.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/include/asterisk/channel_internal.h?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/include/asterisk/channel_internal.h (original)
+++ team/dlee/stasis-core/include/asterisk/channel_internal.h Mon Feb 18 16:20:25 2013
@@ -23,4 +23,5 @@
void ast_channel_internal_finalize(struct ast_channel *chan);
int ast_channel_internal_is_finalized(struct ast_channel *chan);
void ast_channel_internal_cleanup(struct ast_channel *chan);
+void ast_channel_internal_setup_events(struct ast_channel *chan);
Added: team/dlee/stasis-core/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/include/asterisk/stasis.h?view=auto&rev=381701
==============================================================================
--- team/dlee/stasis-core/include/asterisk/stasis.h (added)
+++ team/dlee/stasis-core/include/asterisk/stasis.h Mon Feb 18 16:20:25 2013
@@ -1,0 +1,444 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_STASIS_H
+#define _ASTERISK_STASIS_H
+
+/*! \file
+ *
+ * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
+ * detailed documentation.
+ *
+ * \author David M. Lee, II <dlee at digium.com>
+ * \since 12
+ *
+ * \page stasis Stasis Message Bus API
+ *
+ * \par Intro
+ *
+ * The Stasis Message Bus is a loosely typed mechanism for distributing messages
+ * within Asterisk. It is designed to be:
+ * - Loosely coupled; new message types can be added in separate modules
+ * - Easy to use; publishing and subscribing are straightforward operations
+ * - Consistent memory management; all message bus objects are AO2 managed objects, using
+ * ao2_ref() and ao2_cleanup() to manage the reference counting.
+ *
+ * There are three main concepts for using the Stasis Message Bus:
+ * - \ref stasis_message
+ * - \ref stasis_topic
+ * - \ref stasis_subscription
+ *
+ * \par stasis_message
+ *
+ * Central to the Stasis Message Bus is the \ref stasis_message, the messages
+ * that are sent on the bus. These messages have:
+ * - a type (as defined by a \ref stasis_message_type)
+ * - a value - a \c void pointer to an AO2 object
+ * - a timestamp when it was created
+ *
+ * Once a \ref stasis_message has been created, it is immutable and cannot
+ * change. The same goes for the value of the message (although this cannot be
+ * enforced in code). Messages themselves are reference-counted, AO2 objects,
+ * along with their values. By being both reference counted and immutable,
+ * messages can be shared throughout the system without any concerns for
+ * threading. (Well, the objects must be allocated with \ref
+ * AO2_ALLOC_OPT_LOCK_MUTEX so that the reference counting operations are thread
+ * safe. But other than that, no worries).
+ *
+ * The type of a message is defined by an instance of \ref stasis_message_type,
+ * which can be created by calling stasis_message_type_create(). Message types
+ * are named, which is useful in debugging. It is recommended that the string
+ * name for a message type match the name of the struct that's stored in the
+ * message. For example, name for \ref stasis_cache_update's message type is \c
+ * "stasis_cache_update".
+ *
+ * \par stasis_topic
+ *
+ * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be
+ * subscribed, and \ref stasis_message's may be published. Any message published
+ * to the topic is dispatched to all of its subscribers. The topic itself may be
+ * named, which is useful in debugging.
+ *
+ * Topics themselves are reference counted objects, and automagically
+ * unsubscribe all of their subscribers when they are destroyed. Topics are also
+ * thread safe, so no worries about publishing/subscribing/unsubscribing to a
+ * topic concurrently from multiple threads. It's also designed to handle the
+ * case of unsubscribing from a topic from within the subscription handler.
+ *
+ * \par Forwarding
+ *
+ * There is one special case of topics that's worth noting: forwarding
+ * messages. It's a fairly common use case to want to forward all the messages
+ * published on one topic to another one (for example, an aggregator topic that
+ * publishes all the events from a set of other topics). This can be
+ * accomplished easily using stasis_forward_all(). This sets up the forwarding
+ * between the two topics, and returns a \ref stasis_subscription, which can be
+ * unsubscribed to stop the forwarding.
+ *
+ * \par Caching
+ *
+ * Another common use case is to want to cache certain messages that are
+ * published on the bus. Usually these events are snapshots of the current state
+ * in the system, and it's desirable to query that state from the cache without
+ * locking the original object. It's also desirable for subscribers of the
+ * caching topic to receive messages that have both the old cache value and the
+ * new value being put into the cache. For this, we have
+ * stasis_caching_topic_create(), providing it with the topic which publishes
+ * the messages that you wish to cache, and a function that can identify
+ * cacheable messages.
+ *
+ * The returned \ref stasis_caching_topic provides a topic that forwards
+ * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref
+ * stasis_cache_update message which provides the old snapshot (or \c NULL if
+ * this is a new cache entry), and the new snapshot (or \c NULL if the entry was
+ * removed from the cache). A stasis_cache_clear_create() message must be sent
+ * to the topic in order to remove entries from the cache.
+ *
+ * As with all things Stasis, the \ref stasis_caching_topic is a reference
+ * counted AO2 object.
+ *
+ * \par stasis_subscriber
+ *
+ * Any topic may be subscribed to by simply providing stasis_subscribe() the
+ * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
+ * data that is passed back to the handler. Invocations on the subscription's
+ * handler are serialized, but different invocations may occur on different
+ * threads (this usually isn't important unless you use thread locals or
+ * something similar).
+ *
+ * Since the topic (by necessity) holds a reference to the subscription,
+ * reference counting alone is insufficient to terminate a subscription. In
+ * order to stop receiving messages, call stasis_unsubscribe() with your \ref
+ * stasis_subscription. This will remove the topic's reference to the
+ * subscription, and allow it to be destroyed when all of the other references
+ * are cleaned up.
+ */
+
+/*! @{ */
+
+/*!
+ * \brief Metadata about a \ref stasis_message.
+ * \since 12
+ */
+struct stasis_message_type;
+
+/*!
+ * \brief Register a new message type.
+ *
+ * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
+ * with it.
+ *
+ * \param name Name of the new type.
+ * \return Pointer to the new type.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message_type *stasis_message_type_create(const char *name);
+
+/*!
+ * \brief Gets the name of a given message type
+ * \param type The type to get.
+ * \return Name of the type.
+ * \return \c NULL if \a type is \c NULL.
+ * \since 12
+ */
+const char *stasis_message_type_name(const struct stasis_message_type *type);
+
+/*!
+ * \brief Opaque type for a Stasis message.
+ * \since 12
+ */
+struct stasis_message;
+
+/*!
+ * \brief Create a new message.
+ *
+ * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
+ * with it. Messages are also immutable, and must not be modified after they
+ * are initialized. Especially the \a data in the message.
+ *
+ * \param type Type of the message
+ * \param data Immutable data that is the actual contents of the message
+ * \return New message
+ * \return \c NULL on error
+ * \since 12
+ */
+struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
+
+/*!
+ * \brief Get the message type for a \ref stasis_message.
+ * \param msg Message to type
+ * \return Type of \a msg
+ * \return \c NULL if \a msg is \c NULL.
+ * \since 12
+ */
+struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
+
+/*!
+ * \brief Get the data contained in a message.
+ * \param msg Message.
+ * \return Immutable data pointer
+ * \return \c NULL if msg is \c NULL.
+ * \since 12
+ */
+void *stasis_message_data(const struct stasis_message *msg);
+
+/*!
+ * \brief Get the time when a message was created.
+ * \param msg Message.
+ * \return Pointer to the \a timeval when the message was created.
+ * \return \c NULL if msg is \c NULL.
+ * \since 12
+ */
+const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief A topic to which messages may be posted, and subscribers, well, subscribe
+ * \since 12
+ */
+struct stasis_topic;
+
+/*!
+ * \brief Create a new topic.
+ * \param name Name of the new topic.
+ * \return New topic instance.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_topic *stasis_topic_create(const char *name);
+
+/*!
+ * \brief Return the name of a topic.
+ * \param topic Topic.
+ * \return Name of the topic.
+ * \return \c NULL if topic is \c NULL.
+ * \since 12
+ */
+const char *stasis_topic_name(const struct stasis_topic *topic);
+
+/*!
+ * \brief Publish a message to a topic's subscribers.
+ * \param topic Topic.
+ * \param message Message to publish.
+ * \since 12
+ */
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
+
+/*!
+ * \brief Publish a message from a specified topic to all the subscribers of a
+ * possibly different topic.
+ * \param topic Topic to publish message to.
+ * \param publisher_topic Original topic message was from.
+ * \param message Message
+ * \since 12
+ */
+void stasis_forward_message(struct stasis_topic *topic,
+ struct stasis_topic *publisher_topic,
+ struct stasis_message *message);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief Callback function type for Stasis subscriptions.
+ * \param data Data field provided with subscription.
+ * \param topic Topic to which the message was published.
+ * \param message Published message.
+ * \since 12
+ */
+typedef void (*stasis_subscription_cb)(void *data, struct stasis_topic *topic, struct stasis_message *message);
+
+/*!
+ * \brief Opaque type for a Stasis subscription.
+ * \since 12
+ */
+struct stasis_subscription;
+
+/*!
+ * \brief Create a subscription.
+ *
+ * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
+ * up this reference), the subscription must be explicitly unsubscribed from its
+ * topic using stasis_unsubscribe().
+ *
+ * The invocations of the callback are serialized, but may not always occur on
+ * the same thread. The invocation order of different subscriptions is
+ * unspecified.
+ *
+ * \param topic Topic to subscribe to.
+ * \param callback Callback function for subscription messages.
+ * \param data Data to be passed to the callback, in addition to the message.
+ * \return New \ref stasis_subscription object.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data);
+
+/*!
+ * \brief Cancel a subscription.
+ *
+ * Note that in an asynchronous system, there may still be messages queued or
+ * in transit to the subscription's callback. These will still be delivered.
+ * There will be a final 'SubscriptionCancelled' message, indicating the
+ * delivery of the final message.
+ *
+ * \param subscription Subscription to cancel.
+ * \since 12
+ */
+void stasis_unsubscribe(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Create a subscription which forwards all messages from one topic to
+ * another.
+ *
+ * Note that the \a topic parameter of the invoked callback will be the topic
+ * the message was sent to, not the topic the subscriber subscribed to.
+ *
+ * \param from_topic Topic to forward.
+ * \param to_topic Destination topic of forwarded messages.
+ * \return New forwarding subscription.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief A topic wrapper, which caches certain messages.
+ * \since 12
+ */
+struct stasis_caching_topic;
+
+/*!
+ * \brief Message type for cache update messages.
+ * \return Message type for cache update messages.
+ * \since 12
+ */
+struct stasis_message_type *stasis_cache_update(void);
+
+/*!
+ * \brief Cache update message
+ * \since 12
+ */
+struct stasis_cache_update {
+ /*! \brief Topic that published \c new_snapshot */
+ struct stasis_topic *topic;
+ /*! \brief Convenience reference to snapshot type */
+ struct stasis_message_type *type;
+ /*! \brief Old value from the cache, or \c NULL if this is a new cache entry */
+ struct stasis_message *old_snapshot;
+ /*! \brief New value, or \c NULL if the entry was removed from the cache */
+ struct stasis_message *new_snapshot;
+};
+
+/*!
+ * \brief A message which instructs the caching topic to remove an entry from its cache.
+ * \param type Message type.
+ * \param id Unique id of the snapshot to clear.
+ * \return Message which, when sent to the \a topic, will clear the item from the cache.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id);
+
+/*!
+ * \brief Callback to extract a unique identity from a snapshot message.
+ *
+ * This identity is unique to the underlying object of the snapshot, such as the
+ * UniqueId field of a channel.
+ *
+ * \param message Message to extract id from.
+ * \return String representing the snapshot's id.
+ * \return \c NULL if the message_type of the message isn't a handled snapshot.
+ * \since 12
+ */
+typedef const char *(*snapshot_get_id)(struct stasis_message *message);
+
+/*!
+ * \brief Create a topic which monitors and caches messages from another topic.
+ *
+ * The idea is that some topics publish 'snapshots' of some other object's state
+ * that should be cached. When these snapshot messages are received, the cache
+ * is updated, and a stasis_cache_update() message is forwarded, which has both
+ * the original snapshot message and the new message.
+ *
+ * \param original_topic Topic publishing snapshot messages.
+ * \param id_fn Callback to extract the id from a snapshot message.
+ * \return New topic which changes snapshot messages to stasis_cache_update()
+ * messages, and forwards all other messages from the original topic.
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
+
+/*!
+ * \brief Returns the topic of cached events from a caching topic.
+ * \param caching_topic The caching topic.
+ * \return The topic that publishes cache update events, along with passthrough events
+ * from the underlying topic.
+ * \return \c NULL if \a caching_topic is \c NULL.
+ * \since 12
+ */
+struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Retrieve an item from the cache.
+ * \param caching_topic The topic returned from stasis_caching_topic_create().
+ * \param type Message type of the snapshot to retrieve.
+ * \param id Identity of the snapshot to retrieve.
+ * \return Message from the cache. The cache still owns the message, so
+ * ao2_ref() if you want to keep it.
+ * \return \c NULL if message is not found.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic,
+ struct stasis_message_type *type,
+ const char *id);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief Initialize the Stasis subsystem
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_init(void);
+
+/*!
+ * \private
+ * \brief called by stasis_init() for cache initialization.
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_cache_init(void);
+
+/*! @} */
+
+#endif /* _ASTERISK_STASIS_H */
Propchange: team/dlee/stasis-core/include/asterisk/stasis.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/dlee/stasis-core/include/asterisk/stasis.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Propchange: team/dlee/stasis-core/include/asterisk/stasis.h
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: team/dlee/stasis-core/main/asterisk.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/main/asterisk.c?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/main/asterisk.c (original)
+++ team/dlee/stasis-core/main/asterisk.c Mon Feb 18 16:20:25 2013
@@ -240,6 +240,7 @@
#include "asterisk/aoc.h"
#include "asterisk/uuid.h"
#include "asterisk/sorcery.h"
+#include "asterisk/stasis.h"
#include "../defaults.h"
@@ -4120,6 +4121,11 @@
exit(1);
}
+ if (stasis_init()) {
+ printf("Stasis initialization failed.\n%s", term_quit());
+ exit(1);
+ }
+
ast_makesocket();
sigemptyset(&sigs);
sigaddset(&sigs, SIGHUP);
Modified: team/dlee/stasis-core/main/channel.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/main/channel.c?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/main/channel.c (original)
+++ team/dlee/stasis-core/main/channel.c Mon Feb 18 16:20:25 2013
@@ -151,6 +151,15 @@
/*! \brief All active channels on the system */
static struct ao2_container *channels;
+
+/*! \brief Message type for channel snapshot events */
+static struct stasis_message_type *__ast_channel_snapshot;
+
+/*! \brief Message type for channel variable-set events */
+static struct stasis_message_type *__ast_channel_varset_event;
+
+/*! \brief Topic created under the name "ast_channel_events_all" */
+struct stasis_topic *__ast_channel_events_all;
+
+/*! \brief Caching topic wrapped around __ast_channel_events_all */
+struct stasis_caching_topic *__ast_channel_events_all_cached;
/*! \brief map AST_CAUSE's to readable string representations
*
@@ -213,6 +222,69 @@
{ AST_CAUSE_PROTOCOL_ERROR, "PROTOCOL_ERROR", "Protocol error, unspecified" },
{ AST_CAUSE_INTERWORKING, "INTERWORKING", "Interworking, unspecified" },
};
+
+/*!
+ * \brief Publish a snapshot of a channel's current state on the channel's topic.
+ *
+ * Nothing is published if either the snapshot or the message wrapping it
+ * cannot be created.
+ *
+ * \param chan Channel whose state is snapshotted and published.
+ */
+static void stasis_publish_channel_state(struct ast_channel *chan)
+{
+	RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+	snapshot = ast_channel_snapshot_create(chan);
+	if (snapshot == NULL) {
+		ast_log(LOG_ERROR, "Allocation error\n");
+		return;
+	}
+
+	message = stasis_message_create(ast_channel_snapshot(), snapshot);
+	if (message != NULL) {
+		/* The channel's topic is expected to exist by the time we publish. */
+		ast_assert(ast_channel_events(chan) != NULL);
+		stasis_publish(ast_channel_events(chan), message);
+	}
+}
+
+/*!
+ * \brief Destructor for \ref ast_channel_varset_event objects.
+ *
+ * Frees the four strings held by the event.
+ *
+ * \param obj The event being destroyed.
+ */
+static void ast_channel_varset_event_dtor(void *obj)
+{
+	struct ast_channel_varset_event *varset = obj;
+
+	ast_free(varset->channel_name);
+	ast_free(varset->uniqueid);
+	ast_free(varset->variable);
+	ast_free(varset->value);
+}
+
+/*!
+ * \brief Publish a variable-set event for a channel.
+ *
+ * Builds an \ref ast_channel_varset_event holding copies of the channel name,
+ * channel unique id, variable name and value, then publishes it on the
+ * channel's topic. When \a chan is \c NULL the channel fields are set to
+ * "none" and the event is published on the all-channels topic instead.
+ *
+ * Nothing is published if any allocation fails.
+ *
+ * \param chan Channel to publish the event for, or \c NULL for 'none'.
+ * \param name Name of the variable being set.
+ * \param value Value.
+ */
+void ast_channel_send_varset_event(struct ast_channel *chan, const char *name, const char *value)
+{
+	RAII_VAR(struct ast_channel_varset_event *, event, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+	event = ao2_alloc(sizeof(*event), ast_channel_varset_event_dtor);
+	if (!event) {
+		ast_log(LOG_ERROR, "Allocation error\n");
+		return;
+	}
+
+	if (chan) {
+		event->channel_name = ast_strdup(ast_channel_name(chan));
+		/* The unique id, not the name, identifies the channel here. */
+		event->uniqueid = ast_strdup(ast_channel_uniqueid(chan));
+	} else {
+		event->channel_name = ast_strdup("none");
+		event->uniqueid = ast_strdup("none");
+	}
+	event->variable = ast_strdup(name);
+	event->value = ast_strdup(value);
+
+	/*
+	 * A NULL copy only counts as a failure when the source string was not
+	 * NULL; callers that pass a NULL name or value keep their old behavior.
+	 */
+	if (!event->channel_name || !event->uniqueid
+		|| (name && !event->variable)
+		|| (value && !event->value)) {
+		ast_log(LOG_ERROR, "Allocation error\n");
+		return;
+	}
+
+	msg = stasis_message_create(ast_channel_varset_event(), event);
+	if (!msg) {
+		return;
+	}
+
+	if (chan) {
+		stasis_publish(ast_channel_events(chan), msg);
+	} else {
+		stasis_publish(ast_channel_events_all(), msg);
+	}
+}
+
+
+/*!
+ * \brief Publish a cache-clear message for a channel's snapshot.
+ *
+ * The message is built from the channel snapshot message type and the
+ * channel's unique id, and is published on the channel's topic. Nothing is
+ * published if the message cannot be created.
+ *
+ * \param chan Channel whose cached snapshot should be cleared.
+ */
+static void stasis_publish_cache_clear(struct ast_channel *chan)
+{
+	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+	message = stasis_cache_clear_create(ast_channel_snapshot(), ast_channel_uniqueid(chan));
+	if (!message) {
+		/* stasis_cache_clear_create() returns NULL on error; don't publish that. */
+		ast_log(LOG_ERROR, "Unable to create cache clear message\n");
+		return;
+	}
+
+	stasis_publish(ast_channel_events(chan), message);
+}
struct ast_variable *ast_channeltype_list(void)
{
@@ -1073,6 +1145,8 @@
ast_channel_linkedid_set(tmp, ast_channel_uniqueid(tmp));
}
+ ast_channel_internal_setup_events(tmp);
+
if (!ast_strlen_zero(name_fmt)) {
char *slash, *slash2;
/* Almost every channel is calling this function, and setting the name via the ast_string_field_build() call.
@@ -1145,34 +1219,7 @@
* a lot of data into this func to do it here!
*/
if (ast_get_channel_tech(tech) || (tech2 && ast_get_channel_tech(tech2))) {
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a new channel is created.</synopsis>
- <syntax>
- <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelState'])" />
- <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelStateDesc'])" />
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(tmp, EVENT_FLAG_CALL, "Newchannel",
- "Channel: %s\r\n"
- "ChannelState: %d\r\n"
- "ChannelStateDesc: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "AccountCode: %s\r\n"
- "Exten: %s\r\n"
- "Context: %s\r\n"
- "Uniqueid: %s\r\n",
- ast_channel_name(tmp),
- state,
- ast_state2str(state),
- S_OR(cid_num, ""),
- S_OR(cid_name, ""),
- ast_channel_accountcode(tmp),
- S_OR(exten, ""),
- S_OR(context, ""),
- ast_channel_uniqueid(tmp));
+ stasis_publish_channel_state(tmp);
}
ast_channel_internal_finalize(tmp);
@@ -2893,39 +2940,9 @@
ast_channel_unlock(chan);
ast_cc_offer(chan);
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a channel is hung up.</synopsis>
- <syntax>
- <parameter name="Cause">
- <para>A numeric cause code for why the channel was hung up.</para>
- </parameter>
- <parameter name="Cause-txt">
- <para>A description of why the channel was hung up.</para>
- </parameter>
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(chan, EVENT_FLAG_CALL, "Hangup",
- "Channel: %s\r\n"
- "Uniqueid: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "ConnectedLineNum: %s\r\n"
- "ConnectedLineName: %s\r\n"
- "AccountCode: %s\r\n"
- "Cause: %d\r\n"
- "Cause-txt: %s\r\n",
- ast_channel_name(chan),
- ast_channel_uniqueid(chan),
- S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, "<unknown>"),
- S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, "<unknown>"),
- S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, "<unknown>"),
- S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, "<unknown>"),
- ast_channel_accountcode(chan),
- ast_channel_hangupcause(chan),
- ast_cause2str(ast_channel_hangupcause(chan))
- );
+
+ stasis_publish_channel_state(chan);
+ stasis_publish_cache_clear(chan);
if (ast_channel_cdr(chan) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_BRIDGED) &&
!ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_POST_DISABLED) &&
@@ -7435,47 +7452,7 @@
* we override what they are saying the state is and things go amuck. */
ast_devstate_changed_literal(AST_DEVICE_UNKNOWN, (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DISABLE_DEVSTATE_CACHE) ? AST_DEVSTATE_NOT_CACHABLE : AST_DEVSTATE_CACHABLE), name);
- /* setstate used to conditionally report Newchannel; this is no more */
- /*** DOCUMENTATION
- <managerEventInstance>
- <synopsis>Raised when a channel's state changes.</synopsis>
- <syntax>
- <parameter name="ChannelState">
- <para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
- </parameter>
- <parameter name="ChannelStateDesc">
- <enumlist>
- <enum name="Down"/>
- <enum name="Rsrvd"/>
- <enum name="OffHook"/>
- <enum name="Dialing"/>
- <enum name="Ring"/>
- <enum name="Ringing"/>
- <enum name="Up"/>
- <enum name="Busy"/>
- <enum name="Dialing Offhook"/>
- <enum name="Pre-ring"/>
- <enum name="Unknown"/>
- </enumlist>
- </parameter>
- </syntax>
- </managerEventInstance>
- ***/
- ast_manager_event(chan, EVENT_FLAG_CALL, "Newstate",
- "Channel: %s\r\n"
- "ChannelState: %d\r\n"
- "ChannelStateDesc: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "ConnectedLineNum: %s\r\n"
- "ConnectedLineName: %s\r\n"
- "Uniqueid: %s\r\n",
- ast_channel_name(chan), ast_channel_state(chan), ast_state2str(ast_channel_state(chan)),
- S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""),
- S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""),
- S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""),
- S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""),
- ast_channel_uniqueid(chan));
+ stasis_publish_channel_state(chan);
return 0;
}
@@ -8644,6 +8621,12 @@
+/*!
+ * \brief Exit handler registered by ast_channels_init() via ast_register_atexit().
+ *
+ * Releases the global Stasis message types and topics for channel events and
+ * clears the globals, then unregisters the data and CLI providers.
+ */
static void channels_shutdown(void)
{
+/* Every object created in ast_channels_init() is released here, including the varset event type. */
+ao2_cleanup(__ast_channel_snapshot);
+__ast_channel_snapshot = NULL;
+ao2_cleanup(__ast_channel_varset_event);
+__ast_channel_varset_event = NULL;
+ao2_cleanup(__ast_channel_events_all);
+__ast_channel_events_all = NULL;
+ao2_cleanup(__ast_channel_events_all_cached);
+__ast_channel_events_all_cached = NULL;
ast_data_unregister(NULL);
ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel));
if (channels) {
@@ -8653,6 +8636,16 @@
}
}
+/*!
+ * \brief Id callback handed to stasis_caching_topic_create() for channel events.
+ *
+ * \param message Message to identify.
+ *
+ * \return The snapshot's uniqueid when \a message is of the channel snapshot type.
+ * \return \c NULL for any other message type.
+ */
+static const char *channel_snapshot_get_id(struct stasis_message *message)
+{
+	struct ast_channel_snapshot *payload;
+
+	if (ast_channel_snapshot() == stasis_message_type(message)) {
+		payload = stasis_message_data(message);
+		return payload->uniqueid;
+	}
+
+	/* Not a channel snapshot, so there is no id to report */
+	return NULL;
+}
+
+/*!
+ * \brief Set up the global channel container, the Stasis message types and
+ * topics used for channel events, and the channel CLI/data providers.
+ *
+ * Also registers channels_shutdown() to run at exit.
+ */
void ast_channels_init(void)
{
channels = ao2_container_alloc(NUM_CHANNEL_BUCKETS,
@@ -8661,6 +8654,12 @@
ao2_container_register("channels", channels, prnt_channel_key);
}
+/* Message types for channel events. NOTE(review): the creation results are not checked here. */
+__ast_channel_snapshot = stasis_message_type_create("ast_channel_snapshot");
+__ast_channel_varset_event = stasis_message_type_create("ast_channel_varset_event");
+
+/*
+ * Topic for events from all channels, plus a caching topic built on top of it.
+ * channel_snapshot_get_id is passed as the id callback - presumably the cache key; confirm against stasis_cache.c.
+ */
+__ast_channel_events_all = stasis_topic_create("ast_channel_events_all");
+__ast_channel_events_all_cached = stasis_caching_topic_create(__ast_channel_events_all, channel_snapshot_get_id);
+
ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel));
ast_data_register_multiple_core(channel_providers, ARRAY_LEN(channel_providers));
@@ -8668,6 +8667,7 @@
ast_plc_reload();
ast_register_atexit(channels_shutdown);
+
}
/*! \brief Print call group and pickup group ---*/
@@ -11279,3 +11279,99 @@
{
ao2_unlink(channels, chan);
}
+
+/*!
+ * \brief Destructor passed to ao2_alloc() for channel snapshots.
+ *
+ * \param obj The snapshot being destroyed; its string field storage is freed.
+ */
+static void ast_channel_snapshot_dtor(void *obj)
+{
+	struct ast_channel_snapshot *doomed = obj;
+
+	ast_string_field_free_memory(doomed);
+}
+
+/*!
+ * \brief Capture the current state of a channel in a newly allocated snapshot.
+ *
+ * Copies the channel's string properties (name, account codes, ids, parking
+ * lot, hangup source, application and data, dialplan location, caller and
+ * connected-line name/number) and its scalar state into a new ao2 object.
+ *
+ * \param chan Channel to copy from.
+ *
+ * \return New snapshot; the caller owns the returned reference.
+ * \return \c NULL if the object or its string field storage could not be allocated.
+ *
+ * \note NOTE(review): \a chan is not locked here; presumably the caller is
+ * expected to hold the channel lock - confirm.
+ */
+struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan)
+{
+	RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+
+	snapshot = ao2_alloc(sizeof(*snapshot), ast_channel_snapshot_dtor);
+	/* ao2_alloc() can fail; never hand a NULL object to the string field init */
+	if (!snapshot || ast_string_field_init(snapshot, 1024)) {
+		return NULL;
+	}
+
+	ast_string_field_set(snapshot, name, ast_channel_name(chan));
+	ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan));
+	ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan));
+	ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan));
+	ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan));
+	ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan));
+	ast_string_field_set(snapshot, parkinglot, ast_channel_parkinglot(chan));
+	ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan));
+	/* appl and data are only copied when the channel has them set */
+	if (ast_channel_appl(chan)) {
+		ast_string_field_set(snapshot, appl, ast_channel_appl(chan));
+	}
+	if (ast_channel_data(chan)) {
+		ast_string_field_set(snapshot, data, ast_channel_data(chan));
+	}
+	ast_string_field_set(snapshot, context, ast_channel_context(chan));
+	ast_string_field_set(snapshot, exten, ast_channel_exten(chan));
+
+	/* Party name/number are taken only when flagged valid; otherwise "" is stored */
+	ast_string_field_set(snapshot, caller_name,
+		S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""));
+	ast_string_field_set(snapshot, caller_number,
+		S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""));
+
+	ast_string_field_set(snapshot, connected_name,
+		S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
+	ast_string_field_set(snapshot, connected_number,
+		S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
+
+	snapshot->creationtime = ast_channel_creationtime(chan);
+	snapshot->state = ast_channel_state(chan);
+	snapshot->priority = ast_channel_priority(chan);
+	snapshot->amaflags = ast_channel_amaflags(chan);
+	snapshot->hangupcause = ast_channel_hangupcause(chan);
+	snapshot->flags = *ast_channel_flags(chan);
+
+	/* The RAII_VAR cleanup releases one reference on return; add the caller's */
+	ao2_ref(snapshot, +1);
+	return snapshot;
+}
+
+/*!
+ * \brief Compare two channel snapshots.
+ *
+ * The string fields are compared first; if they match, the remainder of the
+ * structure, starting at the creationtime member, is compared byte for byte.
+ *
+ * \param obj First snapshot.
+ * \param arg Second snapshot.
+ * \param flags Must be 0; key-style matching is not supported.
+ *
+ * \return 0 when \a flags is non-zero, or when both comparisons find no difference.
+ * \return The non-zero result of the first comparison that found a difference.
+ */
+int ast_channel_snapshot_cmp(void *obj, void *arg, int flags)
+{
+	const struct ast_channel_snapshot *one = obj;
+	const struct ast_channel_snapshot *two = arg;
+	const char *one_bytes = obj;
+	const char *two_bytes = arg;
+	size_t offset;
+	int res;
+
+	if (flags != 0) {
+		/* No support for OBJ_KEY matching */
+		return 0;
+	}
+
+	/* Compare string fields */
+	res = ast_string_fields_cmp(one, two);
+	if (res) {
+		return res;
+	}
+
+	/*
+	 * Compare the rest of the structure. The offset is a byte count, so it
+	 * must be applied to char pointers; adding it to the struct pointers
+	 * would advance by whole structures and read far out of bounds.
+	 *
+	 * NOTE(review): assumes creationtime is the first member after the
+	 * string fields and that padding bytes are zeroed at allocation - confirm.
+	 */
+	offset = (const char *)&one->creationtime - one_bytes;
+	return memcmp(one_bytes + offset, two_bytes + offset, sizeof(*one) - offset);
+}
+
+/*!
+ * \brief Accessor for the global "ast_channel_varset_event" message type.
+ *
+ * \return The message type created in ast_channels_init(). No reference is
+ * added for the caller.
+ */
+struct stasis_message_type *ast_channel_varset_event(void)
+{
+	return __ast_channel_varset_event;
+}
+
+/*!
+ * \brief Accessor for the global "ast_channel_snapshot" message type.
+ *
+ * \return The message type created in ast_channels_init(), or \c NULL once
+ * channels_shutdown() has released it. No reference is added for the caller.
+ */
+struct stasis_message_type *ast_channel_snapshot(void)
+{
+	return __ast_channel_snapshot;
+}
+
+/*!
+ * \brief Accessor for the global "ast_channel_events_all" topic.
+ *
+ * \return The topic created in ast_channels_init(), or \c NULL once
+ * channels_shutdown() has released it. No reference is added for the caller.
+ */
+struct stasis_topic *ast_channel_events_all(void)
+{
+	return __ast_channel_events_all;
+}
+
+/*!
+ * \brief Accessor for the caching topic built on the all-channels topic.
+ *
+ * \return The caching topic created in ast_channels_init(), or \c NULL once
+ * channels_shutdown() has released it. No reference is added for the caller.
+ */
+struct stasis_caching_topic *ast_channel_events_all_cached(void)
+{
+	return __ast_channel_events_all_cached;
+}
Modified: team/dlee/stasis-core/main/channel_internal_api.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/main/channel_internal_api.c?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/main/channel_internal_api.c (original)
+++ team/dlee/stasis-core/main/channel_internal_api.c Mon Feb 18 16:20:25 2013
@@ -195,6 +195,8 @@
char dtmf_digit_to_emulate; /*!< Digit being emulated */
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
+ struct stasis_topic *topic; /*!< Topic for all channel's events */
+ struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
};
/* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */
@@ -1364,6 +1366,12 @@
}
ast_string_field_free_memory(chan);
+
+ stasis_unsubscribe(chan->forwarder);
+ chan->forwarder = NULL;
+
+ ao2_cleanup(chan->topic);
+ chan->topic = NULL;
}
void ast_channel_internal_finalize(struct ast_channel *chan)
@@ -1375,3 +1383,16 @@
{
return chan->finalized;
}
+
+/*!
+ * \brief Accessor for a channel's own event topic.
+ *
+ * \param chan Channel whose topic is wanted.
+ *
+ * \return The topic stored on \a chan by ast_channel_internal_setup_events(),
+ * or \c NULL if none is set. No reference is added for the caller.
+ */
+struct stasis_topic *ast_channel_events(struct ast_channel *chan)
+{
+	return chan->topic;
+}
+
+/*!
+ * \brief Create a channel's event topic and hook it up to the all-channels topic.
+ *
+ * The topic is named after the channel's uniqueid. A forwarding subscription
+ * from it to ast_channel_events_all() is stored on the channel.
+ *
+ * \param chan Channel to set up; asserted to have neither a topic nor a
+ * forwarder yet.
+ *
+ * \note NOTE(review): neither creation result is checked for failure.
+ */
+void ast_channel_internal_setup_events(struct ast_channel *chan)
+{
+	struct stasis_topic *own_topic;
+
+	ast_assert(chan->topic == NULL);
+	ast_assert(chan->forwarder == NULL);
+
+	own_topic = stasis_topic_create(chan->uniqueid);
+	chan->topic = own_topic;
+	chan->forwarder = stasis_forward_all(own_topic, ast_channel_events_all());
+}
Modified: team/dlee/stasis-core/main/manager.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/main/manager.c?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/main/manager.c (original)
+++ team/dlee/stasis-core/main/manager.c Mon Feb 18 16:20:25 2013
@@ -91,6 +91,7 @@
#include "asterisk/aoc.h"
#include "asterisk/stringfields.h"
#include "asterisk/presencestate.h"
+#include "asterisk/stasis.h"
/*** DOCUMENTATION
<manager name="Ping" language="en_US">
@@ -963,6 +964,73 @@
manager.conf will be present upon starting a new session.</para>
</description>
</manager>
+ <managerEvent language="en_US" name="Newchannel">
+ <managerEventInstance class="EVENT_FLAG_CALL">
+ <synopsis>Raised when a new channel is created.</synopsis>
+ <syntax>
+ <parameter name="Channel">
+ </parameter>
+ <parameter name="ChannelState">
+ <para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
+ </parameter>
+ <parameter name="ChannelStateDesc">
+ <enumlist>
+ <enum name="Down"/>
+ <enum name="Rsrvd"/>
[... 1868 lines stripped ...]
More information about the asterisk-commits
mailing list