[asterisk-commits] dlee: branch dlee/stasis-core r381701 - in /team/dlee/stasis-core: include/as...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Feb 18 16:20:29 CST 2013


Author: dlee
Date: Mon Feb 18 16:20:25 2013
New Revision: 381701

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=381701
Log:
Initial stasis-core work, for review

Added:
    team/dlee/stasis-core/include/asterisk/stasis.h   (with props)
    team/dlee/stasis-core/main/stasis.c   (with props)
    team/dlee/stasis-core/main/stasis_cache.c   (with props)
    team/dlee/stasis-core/main/stasis_message.c   (with props)
    team/dlee/stasis-core/tests/test_stasis.c   (with props)
Modified:
    team/dlee/stasis-core/include/asterisk/channel.h
    team/dlee/stasis-core/include/asterisk/channel_internal.h
    team/dlee/stasis-core/main/asterisk.c
    team/dlee/stasis-core/main/channel.c
    team/dlee/stasis-core/main/channel_internal_api.c
    team/dlee/stasis-core/main/manager.c
    team/dlee/stasis-core/main/pbx.c

Modified: team/dlee/stasis-core/include/asterisk/channel.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/include/asterisk/channel.h?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/include/asterisk/channel.h (original)
+++ team/dlee/stasis-core/include/asterisk/channel.h Mon Feb 18 16:20:25 2013
@@ -125,7 +125,6 @@
 
 #include "asterisk/abstract_jb.h"
 #include "asterisk/astobj2.h"
-
 #include "asterisk/poll-compat.h"
 
 #if defined(__cplusplus) || defined(c_plusplus)
@@ -151,6 +150,7 @@
 #include "asterisk/channelstate.h"
 #include "asterisk/ccss.h"
 #include "asterisk/framehook.h"
+#include "asterisk/stasis.h"
 
 #define DATASTORE_INHERIT_FOREVER	INT_MAX
 
@@ -4102,4 +4102,130 @@
 void ast_channel_dialed_causes_clear(const struct ast_channel *chan);
 
 struct ast_flags *ast_channel_flags(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief Structure representing a snapshot of channel state.
+ *
+ * While not enforced programmatically, this object is shared across multiple
+ * threads, and should be treated as an immutable object.
+ */
+struct ast_channel_snapshot {
+	AST_DECLARE_STRING_FIELDS(
+		AST_STRING_FIELD(name);			/*!< ASCII unique channel name */
+		AST_STRING_FIELD(accountcode);		/*!< Account code for billing */
+		AST_STRING_FIELD(peeraccount);		/*!< Peer account code for billing */
+		AST_STRING_FIELD(userfield);		/*!< Userfield for CEL billing */
+		AST_STRING_FIELD(uniqueid);		/*!< Unique Channel Identifier */
+		AST_STRING_FIELD(linkedid);		/*!< Linked Channel Identifier -- gets propagated by linkage */
+		AST_STRING_FIELD(parkinglot);		/*!< Default parking lot, if empty, default parking lot */
+		AST_STRING_FIELD(hangupsource);		/*!< Who is responsible for hanging up this channel */
+		AST_STRING_FIELD(appl);			/*!< Current application */
+		AST_STRING_FIELD(data);			/*!< Data passed to current application */
+		AST_STRING_FIELD(context);		/*!< Dialplan: Current extension context */
+		AST_STRING_FIELD(exten);		/*!< Dialplan: Current extension number */
+		AST_STRING_FIELD(caller_name);		/*!< Caller ID Name */
+		AST_STRING_FIELD(caller_number);	/*!< Caller ID Number */
+		AST_STRING_FIELD(connected_name);	/*!< Connected Line Name */
+		AST_STRING_FIELD(connected_number);	/*!< Connected Line Number */
+	);
+
+	struct timeval creationtime;	/*!< The time of channel creation */
+	enum ast_channel_state state;	/*!< State of line */
+	int priority;			/*!< Dialplan: Current extension priority */
+	int amaflags;			/*!< AMA flags for billing */
+	int hangupcause;		/*!< Why the channel was hung up. See causes.h */
+	struct ast_flags flags;		/*!< channel flags of AST_FLAG_ type */
+};
+
+/*!
+ * \since 12
+ * \brief Generate a snapshot of the channel state. This is an ao2 object, so
+ * ao2_cleanup() to deallocate.
+ *
+ * \param chan The channel from which to generate a snapshot
+ *
+ * \retval pointer on success (must be released with ao2_cleanup())
+ * \retval NULL on error
+ */
+struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief Compare two channel state snapshots, in their entirety.
+ *
+ * \param obj The first snapshot to compare
+ * \param arg The second snapshot to compare
+ * \param flags astobj2 comparison flags
+ *
+ * \retval CMP_MATCH | CMP_STOP if the objects match.
+ * \retval 0 if objects don't match.
+ */
+int ast_channel_snapshot_cmp(void *obj, void *arg, int flags);
+
+/*!
+ * \since 12
+ * \brief Message type for \ref ast_channel_snapshot.
+ *
+ * \retval Message type for \ref ast_channel_snapshot.
+ */
+struct stasis_message_type *ast_channel_snapshot(void);
+
+/*!
+ * \since 12
+ * \brief A topic which publishes the events for a particular channel.
+ *
+ * \param chan Channel.
+ *
+ * \retval Topic for channel's events.
+ * \retval \c NULL if \a chan is \c NULL.
+ */
+struct stasis_topic *ast_channel_events(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief A topic which publishes the events for all channels.
+ * \retval Topic for all channel events.
+ */
+struct stasis_topic *ast_channel_events_all(void);
+
+/*!
+ * \since 12
+ * \brief A caching topic which caches \ref ast_channel_snapshot messages from
+ * ast_channel_events_all(void).
+ *
+ * \retval Caching topic for all channel events.
+ */
+struct stasis_caching_topic *ast_channel_events_all_cached(void);
+
+/*!
+ * \since 12
+ * \brief Variable set event.
+ */
+struct ast_channel_varset_event {
+	char *channel_name;	/*!< Name of the channel, or "none" when there is no channel */
+	char *uniqueid;		/*!< Unique id of the channel, or "none" when there is no channel */
+	char *variable;		/*!< Name of the variable that was set */
+	char *value;		/*!< Value the variable was set to */
+};
+
+/*!
+ * \since 12
+ * \brief Message type for \ref ast_channel_varset_event messages.
+ *
+ * \retval Message type for \ref ast_channel_varset_event messages.
+ */
+struct stasis_message_type *ast_channel_varset_event(void);
+
+/*!
+ * \since 12
+ * \brief Publish a \ref ast_channel_varset_event for a channel.
+ *
+ * \param chan Channel to publish the event for, or \c NULL for 'none'.
+ * \param variable Name of the variable being set
+ * \param value Value.
+ */
+void ast_channel_send_varset_event(struct ast_channel *chan,
+				   const char *variable, const char *value);
+
 #endif /* _ASTERISK_CHANNEL_H */

Modified: team/dlee/stasis-core/include/asterisk/channel_internal.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/include/asterisk/channel_internal.h?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/include/asterisk/channel_internal.h (original)
+++ team/dlee/stasis-core/include/asterisk/channel_internal.h Mon Feb 18 16:20:25 2013
@@ -23,4 +23,5 @@
 void ast_channel_internal_finalize(struct ast_channel *chan);
 int ast_channel_internal_is_finalized(struct ast_channel *chan);
 void ast_channel_internal_cleanup(struct ast_channel *chan);
+void ast_channel_internal_setup_events(struct ast_channel *chan);
 

Added: team/dlee/stasis-core/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/include/asterisk/stasis.h?view=auto&rev=381701
==============================================================================
--- team/dlee/stasis-core/include/asterisk/stasis.h (added)
+++ team/dlee/stasis-core/include/asterisk/stasis.h Mon Feb 18 16:20:25 2013
@@ -1,0 +1,444 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_STASIS_H
+#define _ASTERISK_STASIS_H
+
+/*! \file
+ *
+ * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
+ * detailed documentation.
+ *
+ * \author David M. Lee, II <dlee at digium.com>
+ * \since 12
+ *
+ * \page stasis Stasis Message Bus API
+ *
+ * \par Intro
+ *
+ * The Stasis Message Bus is a loosely typed mechanism for distributing messages
+ * within Asterisk. It is designed to be:
+ *  - Loosely coupled; new message types can be added in separate modules
+ *  - Easy to use; publishing and subscribing are straightforward operations
+ *  - Consistent memory management; all message bus objects are AO2 managed objects, using
+ *    ao2_ref() and ao2_cleanup() to manage the reference counting.
+ *
+ * There are three main concepts for using the Stasis Message Bus:
+ *  - \ref stasis_message
+ *  - \ref stasis_topic
+ *  - \ref stasis_subscription
+ *
+ * \par stasis_message
+ *
+ * Central to the Stasis Message Bus is the \ref stasis_message, the messages
+ * that are sent on the bus. These messages have:
+ *  - a type (as defined by a \ref stasis_message_type)
+ *  - a value - a \c void pointer to an AO2 object
+ *  - a timestamp when it was created
+ *
+ * Once a \ref stasis_message has been created, it is immutable and cannot
+ * change. The same goes for the value of the message (although this cannot be
+ * enforced in code). Messages themselves are reference-counted, AO2 objects,
+ * along with their values. By being both reference counted and immutable,
+ * messages can be shared throughout the system without any concerns for
+ * threading. (Well, the objects must be allocated with \ref
+ * AO2_ALLOC_OPT_LOCK_MUTEX so that the reference counting operations are thread
+ * safe. But other than that, no worries).
+ *
+ * The type of a message is defined by an instance of \ref stasis_message_type,
+ * which can be created by calling stasis_message_type_create(). Message types
+ * are named, which is useful in debugging. It is recommended that the string
+ * name for a message type match the name of the struct that's stored in the
+ * message. For example, name for \ref stasis_cache_update's message type is \c
+ * "stasis_cache_update".
+ *
+ * \par stasis_topic
+ *
+ * A \ref stasis_topic is an object to which \ref stasis_subscription's may be
+ * subscribed, and \ref stasis_message's may be published. Any message published
+ * to the topic is dispatched to all of its subscribers. The topic itself may be
+ * named, which is useful in debugging.
+ *
+ * Topics themselves are reference counted objects, and automagically
+ * unsubscribe all of their subscribers when they are destroyed. Topics are also
+ * thread safe, so no worries about publishing/subscribing/unsubscribing to a
+ * topic concurrently from multiple threads. It's also designed to handle the
+ * case of unsubscribing from a topic from within the subscription handler.
+ *
+ * \par Forwarding
+ *
+ * There is one special case of topics that's worth noting: forwarding
+ * messages. It's a fairly common use case to want to forward all the messages
+ * published on one topic to another one (for example, an aggregator topic that
+ * publishes all the events from a set of other topics). This can be
+ * accomplished easily using stasis_forward_all(). This sets up the forwarding
+ * between the two topics, and returns a \ref stasis_subscription, which can be
+ * unsubscribed to stop the forwarding.
+ *
+ * \par Caching
+ *
+ * Another common use case is to want to cache certain messages that are
+ * published on the bus. Usually these events are snapshots of the current state
+ * in the system, and it's desirable to query that state from the cache without
+ * locking the original object. It's also desirable for subscribers of the
+ * caching topic to receive messages that have both the old cache value and the
+ * new value being put into the cache. For this, we have
+ * stasis_caching_topic_create(), providing it with the topic which publishes
+ * the messages that you wish to cache, and a function that can identify
+ * cacheable messages.
+ *
+ * The returned \ref stasis_caching_topic provides a topic that forwards
+ * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref
+ * stasis_cache_update message which provides the old snapshot (or \c NULL if
+ * this is a new cache entry), and the new snapshot (or \c NULL if the entry was
+ * removed from the cache). A stasis_cache_clear_create() message must be sent
+ * to the topic in order to remove entries from the cache.
+ *
+ * As with all things Stasis, the \ref stasis_caching_topic is a reference
+ * counted AO2 object.
+ *
+ * \par stasis_subscription
+ *
+ * Any topic may be subscribed to by simply providing stasis_subscribe() the
+ * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
+ * data that is passed back to the handler. Invocations on the subscription's
+ * handler are serialized, but different invocations may occur on different
+ * threads (this usually isn't important unless you use thread locals or
+ * something similar).
+ *
+ * Since the topic (by necessity) holds a reference to the subscription,
+ * reference counting alone is insufficient to terminate a subscription. In
+ * order to stop receiving messages, call stasis_unsubscribe() with your \ref
+ * stasis_subscription. This will remove the topic's reference to the
+ * subscription, and allow it to be destroyed when all of the other references
+ * are cleaned up.
+ */
+
+/*! @{ */
+
+/*!
+ * \brief Metadata about a \ref stasis_message.
+ * \since 12
+ */
+struct stasis_message_type;
+
+/*!
+ * \brief Register a new message type.
+ *
+ * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
+ * with it.
+ *
+ * \param name Name of the new type.
+ * \return Pointer to the new type.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message_type *stasis_message_type_create(const char *name);
+
+/*!
+ * \brief Gets the name of a given message type
+ * \param type The type to get.
+ * \return Name of the type.
+ * \return \c NULL if \a type is \c NULL.
+ * \since 12
+ */
+const char *stasis_message_type_name(const struct stasis_message_type *type);
+
+/*!
+ * \brief Opaque type for a Stasis message.
+ * \since 12
+ */
+struct stasis_message;
+
+/*!
+ * \brief Create a new message.
+ *
+ * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
+ * with it. Messages are also immutable, and must not be modified after they
+ * are initialized. Especially the \a data in the message.
+ *
+ * \param type Type of the message
+ * \param data Immutable data that is the actual contents of the message
+ * \return New message
+ * \return \c NULL on error
+ * \since 12
+ */
+struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
+
+/*!
+ * \brief Get the message type for a \ref stasis_message.
+ * \param msg Message to get the type of
+ * \return Type of \a msg
+ * \return \c NULL if \a msg is \c NULL.
+ * \since 12
+ */
+struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
+
+/*!
+ * \brief Get the data contained in a message.
+ * \param msg Message.
+ * \return Immutable data pointer
+ * \return \c NULL if msg is \c NULL.
+ * \since 12
+ */
+void *stasis_message_data(const struct stasis_message *msg);
+
+/*!
+ * \brief Get the time when a message was created.
+ * \param msg Message.
+ * \return Pointer to the \a timeval when the message was created.
+ * \return \c NULL if msg is \c NULL.
+ * \since 12
+ */
+const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief A topic to which messages may be posted, and subscribers, well, subscribe
+ * \since 12
+ */
+struct stasis_topic;
+
+/*!
+ * \brief Create a new topic.
+ * \param name Name of the new topic.
+ * \return New topic instance.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_topic *stasis_topic_create(const char *name);
+
+/*!
+ * \brief Return the name of a topic.
+ * \param topic Topic.
+ * \return Name of the topic.
+ * \return \c NULL if topic is \c NULL.
+ * \since 12
+ */
+const char *stasis_topic_name(const struct stasis_topic *topic);
+
+/*!
+ * \brief Publish a message to a topic's subscribers.
+ * \param topic Topic.
+ * \param message Message to publish.
+ * \since 12
+ */
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
+
+/*!
+ * \brief Publish a message from a specified topic to all the subscribers of a
+ * possibly different topic.
+ * \param topic Topic to publish message to.
+ * \param publisher_topic Original topic message was from.
+ * \param message Message
+ * \since 12
+ */
+void stasis_forward_message(struct stasis_topic *topic,
+			    struct stasis_topic *publisher_topic,
+			    struct stasis_message *message);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief Callback function type for Stasis subscriptions.
+ * \param data Data field provided with subscription.
+ * \param topic Topic to which the message was published.
+ * \param message Published message.
+ * \since 12
+ */
+typedef void (*stasis_subscription_cb)(void *data, struct stasis_topic *topic, struct stasis_message *message);
+
+/*!
+ * \brief Opaque type for a Stasis subscription.
+ * \since 12
+ */
+struct stasis_subscription;
+
+/*!
+ * \brief Create a subscription.
+ *
+ * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
+ * up this reference), the subscription must be explicitly unsubscribed from its
+ * topic using stasis_unsubscribe().
+ *
+ * The invocations of the callback are serialized, but may not always occur on
+ * the same thread. The invocation order of different subscriptions is
+ * unspecified.
+ *
+ * \param topic Topic to subscribe to.
+ * \param callback Callback function for subscription messages.
+ * \param data Data to be passed to the callback, in addition to the message.
+ * \return New \ref stasis_subscription object.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
+					     stasis_subscription_cb callback,
+					     void *data);
+
+/*!
+ * \brief Cancel a subscription.
+ *
+ * Note that in an asynchronous system, there may still be messages queued or
+ * in transit to the subscription's callback. These will still be delivered.
+ * There will be a final 'SubscriptionCancelled' message, indicating the
+ * delivery of the final message.
+ *
+ * \param subscription Subscription to cancel.
+ * \since 12
+ */
+void stasis_unsubscribe(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Create a subscription which forwards all messages from one topic to
+ * another.
+ *
+ * Note that the \a topic parameter of the invoked callback will be the \a topic
+ * the message was sent to, not the topic the subscriber subscribed to.
+ *
+ * \param from_topic Topic to forward.
+ * \param to_topic Destination topic of forwarded messages.
+ * \return New forwarding subscription.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief A topic wrapper, which caches certain messages.
+ * \since 12
+ */
+struct stasis_caching_topic;
+
+/*!
+ * \brief Message type for cache update messages.
+ * \return Message type for cache update messages.
+ * \since 12
+ */
+struct stasis_message_type *stasis_cache_update(void);
+
+/*!
+ * \brief Cache update message
+ * \since 12
+ */
+struct stasis_cache_update {
+	/*! \brief Topic that published \c new_snapshot */
+	struct stasis_topic *topic;
+	/*! \brief Convenience reference to snapshot type */
+	struct stasis_message_type *type;
+	/*! \brief Old value from the cache */
+	struct stasis_message *old_snapshot;
+	/*! \brief New value */
+	struct stasis_message *new_snapshot;
+};
+
+/*!
+ * \brief A message which instructs the caching topic to remove an entry from its cache.
+ * \param type Message type.
+ * \param id Unique id of the snapshot to clear.
+ * \return Message which, when sent to the \a topic, will clear the item from the cache.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id);
+
+/*!
+ * \brief Callback to extract a unique identity from a snapshot message.
+ *
+ * This identity is unique to the underlying object of the snapshot, such as the
+ * UniqueId field of a channel.
+ *
+ * \param message Message to extract id from.
+ * \return String representing the snapshot's id.
+ * \return \c NULL if the message_type of the message isn't a handled snapshot.
+ * \since 12
+ */
+typedef const char *(*snapshot_get_id)(struct stasis_message *message);
+
+/*!
+ * \brief Create a topic which monitors and caches messages from another topic.
+ *
+ * The idea is that some topics publish 'snapshots' of some other object's state
+ * that should be cached. When these snapshot messages are received, the cache
+ * is updated, and a stasis_cache_update() message is forwarded, which has both
+ * the original snapshot message and the new message.
+ *
+ * \param original_topic Topic publishing snapshot messages.
+ * \param id_fn Callback to extract the id from a snapshot message.
+ * \return New topic which changes snapshot messages to stasis_cache_update()
+ *         messages, and forwards all other messages from the original topic.
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
+
+/*!
+ * \brief Returns the topic of cached events from a caching topic.
+ * \param caching_topic The caching topic.
+ * \return The topic that publishes cache update events, along with passthrough events
+ *         from the underlying topic.
+ * \return \c NULL if \a caching_topic is \c NULL.
+ * \since 12
+ */
+struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Retrieve an item from the cache.
+ * \param caching_topic The topic returned from stasis_caching_topic_create().
+ * \param id Identity of the snapshot to retrieve.
+ * \return Message from the cache. The cache still owns the message, so
+ *         ao2_ref() if you want to keep it.
+ * \return \c NULL if message is not found.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic,
+					struct stasis_message_type *type,
+					const char *id);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief Initialize the Stasis subsystem
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_init(void);
+
+/*!
+ * \private
+ * \brief called by stasis_init() for cache initialization.
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_cache_init(void);
+
+/*! @} */
+
+#endif /* _ASTERISK_STASIS_H */

Propchange: team/dlee/stasis-core/include/asterisk/stasis.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dlee/stasis-core/include/asterisk/stasis.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Propchange: team/dlee/stasis-core/include/asterisk/stasis.h
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: team/dlee/stasis-core/main/asterisk.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/main/asterisk.c?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/main/asterisk.c (original)
+++ team/dlee/stasis-core/main/asterisk.c Mon Feb 18 16:20:25 2013
@@ -240,6 +240,7 @@
 #include "asterisk/aoc.h"
 #include "asterisk/uuid.h"
 #include "asterisk/sorcery.h"
+#include "asterisk/stasis.h"
 
 #include "../defaults.h"
 
@@ -4120,6 +4121,11 @@
 		exit(1);
 	}
 
+	if (stasis_init()) {
+		printf("Stasis initialization failed.\n%s", term_quit());
+		exit(1);
+	}
+
 	ast_makesocket();
 	sigemptyset(&sigs);
 	sigaddset(&sigs, SIGHUP);

Modified: team/dlee/stasis-core/main/channel.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/main/channel.c?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/main/channel.c (original)
+++ team/dlee/stasis-core/main/channel.c Mon Feb 18 16:20:25 2013
@@ -151,6 +151,15 @@
 
 /*! \brief All active channels on the system */
 static struct ao2_container *channels;
+
+/*! \brief Message type for channel snapshot events */
+static struct stasis_message_type *__ast_channel_snapshot;
+
+static struct stasis_message_type *__ast_channel_varset_event;	/*!< Message type for channel variable-set events */
+
+struct stasis_topic *__ast_channel_events_all;	/*!< Topic named "ast_channel_events_all", created in ast_channels_init() */
+
+struct stasis_caching_topic *__ast_channel_events_all_cached;	/*!< Caching topic built on top of __ast_channel_events_all */
 
 /*! \brief map AST_CAUSE's to readable string representations
  *
@@ -213,6 +222,102 @@
 	{ AST_CAUSE_PROTOCOL_ERROR, "PROTOCOL_ERROR", "Protocol error, unspecified" },
 	{ AST_CAUSE_INTERWORKING, "INTERWORKING", "Interworking, unspecified" },
 };
+
+/*!
+ * \brief Publish a snapshot of a channel's current state on the channel's topic.
+ *
+ * \param chan Channel to snapshot. Its topic (ast_channel_events()) must
+ *             already be set up.
+ */
+static void stasis_publish_channel_state(struct ast_channel *chan)
+{
+	RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+	snapshot = ast_channel_snapshot_create(chan);
+	if (!snapshot) {
+		ast_log(LOG_ERROR, "Allocation error\n");
+		return;
+	}
+
+	message = stasis_message_create(ast_channel_snapshot(), snapshot);
+	if (!message) {
+		return;
+	}
+
+	ast_assert(ast_channel_events(chan) != NULL);
+	stasis_publish(ast_channel_events(chan), message);
+}
+
+/*! \brief Destructor for \ref ast_channel_varset_event; frees the copied strings. */
+static void ast_channel_varset_event_dtor(void *obj)
+{
+	struct ast_channel_varset_event *event = obj;
+	ast_free(event->channel_name);
+	ast_free(event->uniqueid);
+	ast_free(event->variable);
+	ast_free(event->value);
+}
+
+/*!
+ * \brief Publish a \ref ast_channel_varset_event.
+ *
+ * The event goes to the channel's own topic, or to the all-channels topic when
+ * \a chan is \c NULL (the channel name and unique id are then "none").
+ * Nothing is published if the event or its message cannot be allocated.
+ *
+ * \param chan Channel to publish the event for, or \c NULL for 'none'.
+ * \param name Name of the variable being set.
+ * \param value Value.
+ */
+void ast_channel_send_varset_event(struct ast_channel *chan, const char *name, const char *value)
+{
+	RAII_VAR(struct ast_channel_varset_event *, event, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+	event = ao2_alloc(sizeof(*event), ast_channel_varset_event_dtor);
+	if (!event) {
+		return;
+	}
+
+	if (chan) {
+		event->channel_name = ast_strdup(ast_channel_name(chan));
+		event->uniqueid = ast_strdup(ast_channel_uniqueid(chan));
+	} else {
+		event->channel_name = ast_strdup("none");
+		event->uniqueid = ast_strdup("none");
+	}
+	event->variable = ast_strdup(name);
+	event->value = ast_strdup(value);
+
+	msg = stasis_message_create(ast_channel_varset_event(), event);
+	if (!msg) {
+		return;
+	}
+
+	if (chan) {
+		stasis_publish(ast_channel_events(chan), msg);
+	} else {
+		stasis_publish(ast_channel_events_all(), msg);
+	}
+}
+
+/*!
+ * \brief Publish a cache-clear message for a channel's snapshot.
+ *
+ * \param chan Channel whose cached snapshot should be removed.
+ */
+static void stasis_publish_cache_clear(struct ast_channel *chan)
+{
+	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+	message = stasis_cache_clear_create(ast_channel_snapshot(), ast_channel_uniqueid(chan));
+	if (!message) {
+		return;
+	}
+
+	stasis_publish(ast_channel_events(chan), message);
+}
 
 struct ast_variable *ast_channeltype_list(void)
 {
@@ -1073,6 +1145,8 @@
 		ast_channel_linkedid_set(tmp, ast_channel_uniqueid(tmp));
 	}
 
+	ast_channel_internal_setup_events(tmp);
+
 	if (!ast_strlen_zero(name_fmt)) {
 		char *slash, *slash2;
 		/* Almost every channel is calling this function, and setting the name via the ast_string_field_build() call.
@@ -1145,34 +1219,7 @@
 	 * a lot of data into this func to do it here!
 	 */
 	if (ast_get_channel_tech(tech) || (tech2 && ast_get_channel_tech(tech2))) {
-		/*** DOCUMENTATION
-			<managerEventInstance>
-				<synopsis>Raised when a new channel is created.</synopsis>
-				<syntax>
-					<xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelState'])" />
-					<xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelStateDesc'])" />
-				</syntax>
-			</managerEventInstance>
-		***/
-		ast_manager_event(tmp, EVENT_FLAG_CALL, "Newchannel",
-			"Channel: %s\r\n"
-			"ChannelState: %d\r\n"
-			"ChannelStateDesc: %s\r\n"
-			"CallerIDNum: %s\r\n"
-			"CallerIDName: %s\r\n"
-			"AccountCode: %s\r\n"
-			"Exten: %s\r\n"
-			"Context: %s\r\n"
-			"Uniqueid: %s\r\n",
-			ast_channel_name(tmp),
-			state,
-			ast_state2str(state),
-			S_OR(cid_num, ""),
-			S_OR(cid_name, ""),
-			ast_channel_accountcode(tmp),
-			S_OR(exten, ""),
-			S_OR(context, ""),
-			ast_channel_uniqueid(tmp));
+		stasis_publish_channel_state(tmp);
 	}
 
 	ast_channel_internal_finalize(tmp);
@@ -2893,39 +2940,9 @@
 	ast_channel_unlock(chan);
 
 	ast_cc_offer(chan);
-	/*** DOCUMENTATION
-		<managerEventInstance>
-			<synopsis>Raised when a channel is hung up.</synopsis>
-				<syntax>
-					<parameter name="Cause">
-						<para>A numeric cause code for why the channel was hung up.</para>
-					</parameter>
-					<parameter name="Cause-txt">
-						<para>A description of why the channel was hung up.</para>
-					</parameter>
-				</syntax>
-		</managerEventInstance>
-	***/
-	ast_manager_event(chan, EVENT_FLAG_CALL, "Hangup",
-		"Channel: %s\r\n"
-		"Uniqueid: %s\r\n"
-		"CallerIDNum: %s\r\n"
-		"CallerIDName: %s\r\n"
-		"ConnectedLineNum: %s\r\n"
-		"ConnectedLineName: %s\r\n"
-		"AccountCode: %s\r\n"
-		"Cause: %d\r\n"
-		"Cause-txt: %s\r\n",
-		ast_channel_name(chan),
-		ast_channel_uniqueid(chan),
-		S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, "<unknown>"),
-		S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, "<unknown>"),
-		S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, "<unknown>"),
-		S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, "<unknown>"),
-		ast_channel_accountcode(chan),
-		ast_channel_hangupcause(chan),
-		ast_cause2str(ast_channel_hangupcause(chan))
-		);
+
+	stasis_publish_channel_state(chan);
+	stasis_publish_cache_clear(chan);
 
 	if (ast_channel_cdr(chan) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_BRIDGED) &&
 		!ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_POST_DISABLED) &&
@@ -7435,47 +7452,7 @@
 	 * we override what they are saying the state is and things go amuck. */
 	ast_devstate_changed_literal(AST_DEVICE_UNKNOWN, (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DISABLE_DEVSTATE_CACHE) ? AST_DEVSTATE_NOT_CACHABLE : AST_DEVSTATE_CACHABLE), name);
 
-	/* setstate used to conditionally report Newchannel; this is no more */
-	/*** DOCUMENTATION
-		<managerEventInstance>
-			<synopsis>Raised when a channel's state changes.</synopsis>
-			<syntax>
-				<parameter name="ChannelState">
-					<para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
-				</parameter>
-				<parameter name="ChannelStateDesc">
-					<enumlist>
-						<enum name="Down"/>
-						<enum name="Rsrvd"/>
-						<enum name="OffHook"/>
-						<enum name="Dialing"/>
-						<enum name="Ring"/>
-						<enum name="Ringing"/>
-						<enum name="Up"/>
-						<enum name="Busy"/>
-						<enum name="Dialing Offhook"/>
-						<enum name="Pre-ring"/>
-						<enum name="Unknown"/>
-					</enumlist>
-				</parameter>
-			</syntax>
-		</managerEventInstance>
-	***/
-	ast_manager_event(chan, EVENT_FLAG_CALL, "Newstate",
-		"Channel: %s\r\n"
-		"ChannelState: %d\r\n"
-		"ChannelStateDesc: %s\r\n"
-		"CallerIDNum: %s\r\n"
-		"CallerIDName: %s\r\n"
-		"ConnectedLineNum: %s\r\n"
-		"ConnectedLineName: %s\r\n"
-		"Uniqueid: %s\r\n",
-		ast_channel_name(chan), ast_channel_state(chan), ast_state2str(ast_channel_state(chan)),
-		S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""),
-		S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""),
-		S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""),
-		S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""),
-		ast_channel_uniqueid(chan));
+	stasis_publish_channel_state(chan);
 
 	return 0;
 }
@@ -8644,6 +8621,12 @@
 
 static void channels_shutdown(void)
 {
+	ao2_cleanup(__ast_channel_snapshot);
+	__ast_channel_snapshot = NULL;
+	ao2_cleanup(__ast_channel_events_all);
+	__ast_channel_events_all = NULL;
+	ao2_cleanup(__ast_channel_events_all_cached);
+	__ast_channel_events_all_cached = NULL;
 	ast_data_unregister(NULL);
 	ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel));
 	if (channels) {
@@ -8653,6 +8636,16 @@
 	}
 }
 
+/*!
+ * \brief Id callback given to stasis_caching_topic_create() for channel events.
+ *
+ * \param message Message to extract an id from.
+ *
+ * \return The snapshot's uniqueid when \a message is a channel snapshot.
+ * \return \c NULL for any other message type.
+ */
+static const char *channel_snapshot_get_id(struct stasis_message *message)
+{
+	struct ast_channel_snapshot *snap;
+
+	if (stasis_message_type(message) == ast_channel_snapshot()) {
+		snap = stasis_message_data(message);
+		return snap->uniqueid;
+	}
+
+	return NULL;
+}
+
 void ast_channels_init(void)
 {
 	channels = ao2_container_alloc(NUM_CHANNEL_BUCKETS,
@@ -8661,6 +8654,12 @@
 		ao2_container_register("channels", channels, prnt_channel_key);
 	}
 
+	__ast_channel_snapshot = stasis_message_type_create("ast_channel_snapshot");
+	__ast_channel_varset_event = stasis_message_type_create("ast_channel_varset_event");
+
+	__ast_channel_events_all = stasis_topic_create("ast_channel_events_all");
+	__ast_channel_events_all_cached = stasis_caching_topic_create(__ast_channel_events_all, channel_snapshot_get_id);
+
 	ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel));
 
 	ast_data_register_multiple_core(channel_providers, ARRAY_LEN(channel_providers));
@@ -8668,6 +8667,7 @@
 	ast_plc_reload();
 
 	ast_register_atexit(channels_shutdown);
+
 }
 
 /*! \brief Print call group and pickup group ---*/
@@ -11279,3 +11279,99 @@
 {
 	ao2_unlink(channels, chan);
 }
+
+/*! \brief Destructor passed to ao2_alloc() for channel snapshots; releases the string field storage. */
+static void ast_channel_snapshot_dtor(void *obj)
+{
+	struct ast_channel_snapshot *doomed = obj;
+
+	ast_string_field_free_memory(doomed);
+}
+
+/*!
+ * \brief Build a snapshot object capturing the current state of a channel.
+ *
+ * Copies the channel's string properties (name, ids, dialplan location,
+ * caller and connected-line identity) and its scalar state into a newly
+ * allocated ao2 object.
+ *
+ * \param chan Channel to read from.
+ *
+ * \return New snapshot; the caller owns the returned reference.
+ * \return \c NULL if the allocation or the string field initialization fails.
+ */
+struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan)
+{
+	RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+
+	snapshot = ao2_alloc(sizeof(*snapshot), ast_channel_snapshot_dtor);
+	/* ao2_alloc() can fail; never hand a NULL object to the string field macros */
+	if (!snapshot || ast_string_field_init(snapshot, 1024)) {
+		return NULL;
+	}
+
+	ast_string_field_set(snapshot, name, ast_channel_name(chan));
+	ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan));
+	ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan));
+	ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan));
+	ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan));
+	ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan));
+	ast_string_field_set(snapshot, parkinglot, ast_channel_parkinglot(chan));
+	ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan));
+	/* appl and data are only copied when the channel has them set */
+	if (ast_channel_appl(chan)) {
+		ast_string_field_set(snapshot, appl, ast_channel_appl(chan));
+	}
+	if (ast_channel_data(chan)) {
+		ast_string_field_set(snapshot, data, ast_channel_data(chan));
+	}
+	ast_string_field_set(snapshot, context, ast_channel_context(chan));
+	ast_string_field_set(snapshot, exten, ast_channel_exten(chan));
+
+	/* Party ids that are not marked valid are recorded as empty strings */
+	ast_string_field_set(snapshot, caller_name,
+		S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""));
+	ast_string_field_set(snapshot, caller_number,
+		S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""));
+
+	ast_string_field_set(snapshot, connected_name,
+		S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
+	ast_string_field_set(snapshot, connected_number,
+		S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
+
+	snapshot->creationtime = ast_channel_creationtime(chan);
+	snapshot->state = ast_channel_state(chan);
+	snapshot->priority = ast_channel_priority(chan);
+	snapshot->amaflags = ast_channel_amaflags(chan);
+	snapshot->hangupcause = ast_channel_hangupcause(chan);
+	snapshot->flags = *ast_channel_flags(chan);
+
+	/* The RAII_VAR cleanup releases one reference on return; take another for the caller */
+	ao2_ref(snapshot, +1);
+	return snapshot;
+}
+
+/*!
+ * \brief Compare two channel snapshots.
+ *
+ * The string fields are compared first; if they match, the remaining
+ * members (from \c creationtime to the end of the structure) are compared
+ * bytewise.
+ *
+ * \param obj First snapshot.
+ * \param arg Second snapshot.
+ * \param flags Must be 0; any other value makes the function return 0.
+ *
+ * \return 0 if the snapshots compare equal (or \a flags is non-zero),
+ *         non-zero otherwise.
+ *
+ * \note NOTE(review): returning 0 on equality follows memcmp() rather than
+ *       the usual ao2 callback convention - confirm what callers expect.
+ */
+int ast_channel_snapshot_cmp(void *obj, void *arg, int flags)
+{
+	const struct ast_channel_snapshot *one = obj, *two = arg;
+	int res;
+	size_t non_str_len, offset;
+
+	if (flags != 0) {
+		/* No support for OBJ_KEY matching */
+		return 0;
+	}
+
+	/* Compare string fields */
+	res = ast_string_fields_cmp(one, two);
+	if (res) {
+		return res;
+	}
+
+	/*
+	 * Compare the rest of the structure.  The offset is a byte count, so
+	 * the arithmetic has to be done on char pointers: adding it to a
+	 * struct pointer would scale it by sizeof(*one) and make memcmp()
+	 * read far outside both objects.
+	 */
+	offset = (const char *)&one->creationtime - (const char *)one;
+	non_str_len = sizeof(*one) - offset;
+	return memcmp((const char *)one + offset, (const char *)two + offset, non_str_len);
+}
+
+/*! \brief Accessor for the stasis message type created as "ast_channel_varset_event" in ast_channels_init(). */
+struct stasis_message_type *ast_channel_varset_event(void)
+{
+	return __ast_channel_varset_event;
+}
+
+/*! \brief Accessor for the stasis message type created as "ast_channel_snapshot" in ast_channels_init(). */
+struct stasis_message_type *ast_channel_snapshot(void)
+{
+	return __ast_channel_snapshot;
+}
+
+/*! \brief Accessor for the "ast_channel_events_all" topic; per-channel topics are forwarded to it. */
+struct stasis_topic *ast_channel_events_all(void)
+{
+	return __ast_channel_events_all;
+}
+
+/*! \brief Accessor for the caching topic created from the all-channels topic with channel_snapshot_get_id() as its id callback. */
+struct stasis_caching_topic *ast_channel_events_all_cached(void)
+{
+	return __ast_channel_events_all_cached;
+}

Modified: team/dlee/stasis-core/main/channel_internal_api.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/main/channel_internal_api.c?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/main/channel_internal_api.c (original)
+++ team/dlee/stasis-core/main/channel_internal_api.c Mon Feb 18 16:20:25 2013
@@ -195,6 +195,8 @@
 	char dtmf_digit_to_emulate;			/*!< Digit being emulated */
 	char sending_dtmf_digit;			/*!< Digit this channel is currently sending out. (zero if not sending) */
 	struct timeval sending_dtmf_tv;		/*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
+	struct stasis_topic *topic;			/*!< Topic for all channel's events */
+	struct stasis_subscription *forwarder;		/*!< Subscription for event forwarding to all topic */
 };
 
 /* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */
@@ -1364,6 +1366,12 @@
 	}
 
 	ast_string_field_free_memory(chan);
+
+	stasis_unsubscribe(chan->forwarder);
+	chan->forwarder = NULL;
+
+	ao2_cleanup(chan->topic);
+	chan->topic = NULL;
 }
 
 void ast_channel_internal_finalize(struct ast_channel *chan)
@@ -1375,3 +1383,16 @@
 {
 	return chan->finalized;
 }
+
+/*! \brief Accessor for the channel's own stasis topic (the \c topic member of \a chan). */
+struct stasis_topic *ast_channel_events(struct ast_channel *chan)
+{
+	return chan->topic;
+}
+
+/*!
+ * \brief Create the channel's stasis topic and forward it to the all-channels topic.
+ *
+ * Asserts that neither the topic nor the forwarding subscription has been
+ * set up yet. The new topic is named after the channel's uniqueid.
+ *
+ * \param chan Channel whose \c topic and \c forwarder members are filled in.
+ */
+void ast_channel_internal_setup_events(struct ast_channel *chan)
+{
+	struct stasis_topic *own_topic;
+
+	ast_assert(chan->topic == NULL);
+	ast_assert(chan->forwarder == NULL);
+
+	own_topic = stasis_topic_create(chan->uniqueid);
+	chan->topic = own_topic;
+	chan->forwarder = stasis_forward_all(own_topic, ast_channel_events_all());
+}

Modified: team/dlee/stasis-core/main/manager.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-core/main/manager.c?view=diff&rev=381701&r1=381700&r2=381701
==============================================================================
--- team/dlee/stasis-core/main/manager.c (original)
+++ team/dlee/stasis-core/main/manager.c Mon Feb 18 16:20:25 2013
@@ -91,6 +91,7 @@
 #include "asterisk/aoc.h"
 #include "asterisk/stringfields.h"
 #include "asterisk/presencestate.h"
+#include "asterisk/stasis.h"
 
 /*** DOCUMENTATION
 	<manager name="Ping" language="en_US">
@@ -963,6 +964,73 @@
                         manager.conf will be present upon starting a new session.</para>
 		</description>
 	</manager>
+	<managerEvent language="en_US" name="Newchannel">
+		<managerEventInstance class="EVENT_FLAG_CALL">
+			<synopsis>Raised when a new channel is created.</synopsis>
+			<syntax>
+				<parameter name="Channel">
+				</parameter>
+				<parameter name="ChannelState">
+					<para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
+				</parameter>
+				<parameter name="ChannelStateDesc">
+					<enumlist>
+						<enum name="Down"/>
+						<enum name="Rsrvd"/>

[... 1868 lines stripped ...]



More information about the asterisk-commits mailing list