[asterisk-commits] mjordan: branch 12 r428681 - in /branches/12: ./ apps/ channels/ configs/ inc...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Dec 1 09:53:20 CST 2014


Author: mjordan
Date: Mon Dec  1 09:53:02 2014
New Revision: 428681

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=428681
Log:
main/stasis: Allow subscriptions to use a threadpool for message delivery

Prior to this patch, all Stasis subscriptions would receive a dedicated
thread for servicing published messages. In contrast, prior to r400178
(see review https://reviewboard.asterisk.org/r/2881/), the subscriptions
shared a thread pool. It was discovered during some initial work on Stasis
that, for a low subscription count with high message throughput, the
threadpool was not as performant as simply having a dedicated thread per
subscriber.

For situations where a subscriber receives a substantial number of messages
and is always present, the model of having a dedicated thread per subscriber
makes sense. While we still have plenty of subscriptions that would follow
this model, e.g., AMI, CDRs, CEL, etc., there are plenty that also fall into
the following two categories:
* Large number of subscriptions, specifically those tied to endpoints/peers.
* Low number of messages. Some subscriptions exist specifically to coordinate
  a single message - the subscription is created, a message is published, the
  delivery is synchronized, and the subscription is destroyed.
In both of these cases, creating a dedicated thread per subscription is
wasteful (and, with a large number of peers/endpoints, harmful). Shared
delivery threads are far more performant there.

This patch lets a Stasis subscriber choose whether its messages are
dispatched on a dedicated thread or on a threadpool. The threadpool is
configurable through stasis.conf.
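
As a rough illustration (not code from this patch), the resulting dispatch
choice can be modeled in a few lines of standalone C. The enum and the
function name delivery_mode_for() are hypothetical and exist only for this
sketch; the two flags, needs_mailbox and use_thread_pool, are the parameters
of internal_stasis_subscribe() as modified in main/stasis.c.

```c
/* Hypothetical model of where a subscription's callback runs.
 * None of these names exist in Asterisk; they only label the three
 * outcomes described in the patch. */
enum delivery_mode {
	DELIVER_ON_PUBLISHER_THREAD, /* no mailbox: callback runs in the publisher */
	DELIVER_ON_DEDICATED_THREAD, /* mailbox backed by its own taskprocessor */
	DELIVER_ON_THREAD_POOL       /* mailbox backed by a threadpool serializer */
};

/* Mirrors the branch structure of internal_stasis_subscribe():
 * use_thread_pool is only consulted when needs_mailbox is non-zero. */
enum delivery_mode delivery_mode_for(int needs_mailbox, int use_thread_pool)
{
	if (!needs_mailbox) {
		return DELIVER_ON_PUBLISHER_THREAD;
	}
	return use_thread_pool ? DELIVER_ON_THREAD_POOL
		: DELIVER_ON_DEDICATED_THREAD;
}
```

Under this model, stasis_subscribe() corresponds to the flags (1, 0),
stasis_subscribe_pool() to (1, 1), and the caching topic's internal
subscription in main/stasis_cache.c to (0, 0).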

Review: https://reviewboard.asterisk.org/r/4193

ASTERISK-24533 #close
Reported by: xrobau
Tested by: xrobau


Added:
    branches/12/configs/stasis.conf.sample   (with props)
Modified:
    branches/12/UPGRADE.txt
    branches/12/apps/app_queue.c
    branches/12/channels/chan_dahdi.c
    branches/12/channels/chan_iax2.c
    branches/12/channels/chan_mgcp.c
    branches/12/channels/chan_sip.c
    branches/12/channels/chan_skinny.c
    branches/12/channels/sig_pri.c
    branches/12/include/asterisk/stasis.h
    branches/12/include/asterisk/stasis_internal.h
    branches/12/include/asterisk/stasis_message_router.h
    branches/12/main/endpoints.c
    branches/12/main/stasis.c
    branches/12/main/stasis_cache.c
    branches/12/main/stasis_channels.c
    branches/12/main/stasis_message_router.c
    branches/12/res/parking/parking_applications.c
    branches/12/res/parking/parking_bridge_features.c
    branches/12/res/res_jabber.c
    branches/12/res/res_pjsip_mwi.c
    branches/12/res/res_pjsip_pubsub.c
    branches/12/res/res_pjsip_refer.c
    branches/12/res/res_stasis_device_state.c
    branches/12/res/res_xmpp.c
    branches/12/tests/test_stasis.c

Modified: branches/12/UPGRADE.txt
URL: http://svnview.digium.com/svn/asterisk/branches/12/UPGRADE.txt?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/UPGRADE.txt (original)
+++ branches/12/UPGRADE.txt Mon Dec  1 09:53:02 2014
@@ -20,6 +20,16 @@
 === UPGRADE-11.txt  -- Upgrade info for 10 to 11
 ===
 ===========================================================
+
+From 12.7.0 to 12.8.0:
+
+Core:
+ - The core of Asterisk uses a message bus called "Stasis" to distribute
+   information to internal components. For performance reasons, the message
+   distribution was modified to make use of a thread pool instead of a
+   dedicated thread per consumer in certain cases. The initial settings for
+   the thread pool can now be configured using 'stasis.conf'. A sample
+   configuration file is provided in the samples directory.
 
 From 12.6.0 to 12.7.0:
 

Modified: branches/12/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/apps/app_queue.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/apps/app_queue.c (original)
+++ branches/12/apps/app_queue.c Mon Dec  1 09:53:02 2014
@@ -5954,7 +5954,7 @@
 		return -1;
 	}
 
-	queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
+	queue_data->bridge_router = stasis_message_router_create_pool(ast_bridge_topic_all());
 	if (!queue_data->bridge_router) {
 		ao2_ref(queue_data, -1);
 		return -1;
@@ -5969,7 +5969,7 @@
 	stasis_message_router_set_default(queue_data->bridge_router,
 			queue_bridge_cb, queue_data);
 
-	queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all());
+	queue_data->channel_router = stasis_message_router_create_pool(ast_channel_topic_all());
 	if (!queue_data->channel_router) {
 		/* Unsubscribing from the bridge router will remove the only ref of queue_data,
 		 * thus beginning the destruction process

Modified: branches/12/channels/chan_dahdi.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/channels/chan_dahdi.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/channels/chan_dahdi.c (original)
+++ branches/12/channels/chan_dahdi.c Mon Dec  1 09:53:02 2014
@@ -12426,7 +12426,7 @@
 
 			mailbox_specific_topic = ast_mwi_topic(tmp->mailbox);
 			if (mailbox_specific_topic) {
-				tmp->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+				tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
 			}
 		}
 #ifdef HAVE_DAHDI_LINEREVERSE_VMWI

Modified: branches/12/channels/chan_iax2.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/channels/chan_iax2.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/channels/chan_iax2.c (original)
+++ branches/12/channels/chan_iax2.c Mon Dec  1 09:53:02 2014
@@ -12966,7 +12966,7 @@
 
 		mailbox_specific_topic = ast_mwi_topic(peer->mailbox);
 		if (mailbox_specific_topic) {
-			peer->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+			peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
 		}
 	}
 

Modified: branches/12/channels/chan_mgcp.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/channels/chan_mgcp.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/channels/chan_mgcp.c (original)
+++ branches/12/channels/chan_mgcp.c Mon Dec  1 09:53:02 2014
@@ -4199,7 +4199,7 @@
 
 					mailbox_specific_topic = ast_mwi_topic(e->mailbox);
 					if (mailbox_specific_topic) {
-						e->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+						e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
 					}
 				}
 				snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());

Modified: branches/12/channels/chan_sip.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/channels/chan_sip.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/channels/chan_sip.c (original)
+++ branches/12/channels/chan_sip.c Mon Dec  1 09:53:02 2014
@@ -27504,7 +27504,7 @@
 			if (!peer_name) {
 				return;
 			}
-			mailbox->event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, peer_name);
+			mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer_name);
 		}
 	}
 }

Modified: branches/12/channels/chan_skinny.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/channels/chan_skinny.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/channels/chan_skinny.c (original)
+++ branches/12/channels/chan_skinny.c Mon Dec  1 09:53:02 2014
@@ -8318,7 +8318,7 @@
 
 		mailbox_specific_topic = ast_mwi_topic(l->mailbox);
 		if (mailbox_specific_topic) {
-			l->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, l);
+			l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);
 		}
 	}
 

Modified: branches/12/channels/sig_pri.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/channels/sig_pri.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/channels/sig_pri.c (original)
+++ branches/12/channels/sig_pri.c Mon Dec  1 09:53:02 2014
@@ -9145,7 +9145,7 @@
 
 		mailbox_specific_topic = ast_mwi_topic(mbox_id);
 		if (mailbox_specific_topic) {
-			pri->mbox[i].sub = stasis_subscribe(mailbox_specific_topic, sig_pri_mwi_event_cb, pri);
+			pri->mbox[i].sub = stasis_subscribe_pool(mailbox_specific_topic, sig_pri_mwi_event_cb, pri);
 		}
 		if (!pri->mbox[i].sub) {
 			ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",

Added: branches/12/configs/stasis.conf.sample
URL: http://svnview.digium.com/svn/asterisk/branches/12/configs/stasis.conf.sample?view=auto&rev=428681
==============================================================================
--- branches/12/configs/stasis.conf.sample (added)
+++ branches/12/configs/stasis.conf.sample Mon Dec  1 09:53:02 2014
@@ -1,0 +1,9 @@
+[threadpool]
+;initial_size = 5          ; Initial size of the threadpool.
+;                          ; 0 means the threadpool has no threads initially
+;                          ; until a task needs a thread.
+;idle_timeout_sec = 20     ; Number of seconds a thread should be idle before
+;                          ; dying. 0 means threads never time out.
+;max_size = 50             ; Maximum number of threads in the Stasis threadpool.
+;                          ; 0 means no limit to the number of threads in the
+;                          ; threadpool.

Propchange: branches/12/configs/stasis.conf.sample
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: branches/12/configs/stasis.conf.sample
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: branches/12/configs/stasis.conf.sample
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: branches/12/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/branches/12/include/asterisk/stasis.h?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/include/asterisk/stasis.h (original)
+++ branches/12/include/asterisk/stasis.h Mon Dec  1 09:53:02 2014
@@ -520,6 +520,31 @@
 	stasis_subscription_cb callback, void *data);
 
 /*!
+ * \brief Create a subscription whose callbacks occur on a thread pool
+ *
+ * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
+ * up this reference), the subscription must be explicitly unsubscribed from its
+ * topic using stasis_unsubscribe().
+ *
+ * The invocations of the callback are serialized, but will almost certainly not
+ * always happen on the same thread. The invocation order of different subscriptions
+ * is unspecified.
+ *
+ * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to
+ * dispatch items to its \c callback. This form of subscription should be used
+ * when many subscriptions may be made to the specified \c topic.
+ *
+ * \param topic Topic to subscribe to.
+ * \param callback Callback function for subscription messages.
+ * \param data Data to be passed to the callback, in addition to the message.
+ * \return New \ref stasis_subscription object.
+ * \return \c NULL on error.
+ * \since 12.8.0
+ */
+struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
+	stasis_subscription_cb callback, void *data);
+
+/*!
  * \brief Cancel a subscription.
  *
  * Note that in an asynchronous system, there may still be messages queued or

Modified: branches/12/include/asterisk/stasis_internal.h
URL: http://svnview.digium.com/svn/asterisk/branches/12/include/asterisk/stasis_internal.h?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/include/asterisk/stasis_internal.h (original)
+++ branches/12/include/asterisk/stasis_internal.h Mon Dec  1 09:53:02 2014
@@ -52,8 +52,10 @@
  * \param callback Callback function for subscription messages.
  * \param data Data to be passed to the callback, in addition to the message.
  * \param needs_mailbox Determines whether or not the subscription requires a mailbox.
- *  Subscriptions with mailboxes will be delivered on a thread in the Stasis threadpool;
+ *  Subscriptions with mailboxes will be delivered on some non-publisher thread;
  *  subscriptions without mailboxes will be delivered on the publisher thread.
+ * \param use_thread_pool Use the thread pool for the subscription. This is only
+ *  relevant if \c needs_mailbox is non-zero.
  * \return New \ref stasis_subscription object.
  * \return \c NULL on error.
  * \since 12
@@ -62,6 +64,7 @@
 	struct stasis_topic *topic,
 	stasis_subscription_cb callback,
 	void *data,
-	int needs_mailbox);
+	int needs_mailbox,
+	int use_thread_pool);
 
 #endif /* STASIS_INTERNAL_H_ */

Modified: branches/12/include/asterisk/stasis_message_router.h
URL: http://svnview.digium.com/svn/asterisk/branches/12/include/asterisk/stasis_message_router.h?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/include/asterisk/stasis_message_router.h (original)
+++ branches/12/include/asterisk/stasis_message_router.h Mon Dec  1 09:53:02 2014
@@ -59,6 +59,22 @@
 	struct stasis_topic *topic);
 
 /*!
+ * \brief Create a new message router object.
+ *
+ * The subscription created for this message router will dispatch
+ * callbacks on a thread pool.
+ *
+ * \param topic Topic to subscribe the router to.
+ *
+ * \return New \ref stasis_message_router.
+ * \return \c NULL on error.
+ *
+ * \since 12.8.0
+ */
+struct stasis_message_router *stasis_message_router_create_pool(
+	struct stasis_topic *topic);
+
+/*!
  * \brief Unsubscribe the router from the upstream topic.
  *
  * \param router Router to unsubscribe.

Modified: branches/12/main/endpoints.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/main/endpoints.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/main/endpoints.c (original)
+++ branches/12/main/endpoints.c Mon Dec  1 09:53:02 2014
@@ -306,7 +306,7 @@
 	}
 
 	if (!ast_strlen_zero(resource)) {
-		endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint));
+		endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
 		if (!endpoint->router) {
 			return NULL;
 		}

Modified: branches/12/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/main/stasis.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/main/stasis.c (original)
+++ branches/12/main/stasis.c Mon Dec  1 09:53:02 2014
@@ -35,12 +35,14 @@
 #include "asterisk/stasis_internal.h"
 #include "asterisk/stasis.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
 #include "asterisk/vector.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/stasis_bridges.h"
 #include "asterisk/stasis_endpoints.h"
+#include "asterisk/config_options.h"
 
 /*** DOCUMENTATION
 	<managerEvent language="en_US" name="UserEvent">
@@ -60,6 +62,22 @@
 			</see-also>
 		</managerEventInstance>
 	</managerEvent>
+	<configInfo name="stasis" language="en_US">
+		<configFile name="stasis.conf">
+			<configObject name="threadpool">
+				<synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
+				<configOption name="initial_size" default="5">
+					<synopsis>Initial number of threads in the message bus threadpool.</synopsis>
+				</configOption>
+				<configOption name="idle_timeout_sec" default="20">
+					<synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
+				</configOption>
+				<configOption name="max_size" default="50">
+					<synopsis>Maximum number of threads in the threadpool.</synopsis>
+				</configOption>
+			</configObject>
+		</configFile>
+	</configInfo>
 ***/
 
 /*!
@@ -157,6 +175,9 @@
 /*! The number of buckets to use for topic pools */
 #define TOPIC_POOL_BUCKETS 57
 
+/*! Thread pool for subscriptions that don't want a dedicated taskprocessor */
+static struct ast_threadpool *pool;
+
 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 
 /*! \internal */
@@ -302,7 +323,8 @@
 	struct stasis_topic *topic,
 	stasis_subscription_cb callback,
 	void *data,
-	int needs_mailbox)
+	int needs_mailbox,
+	int use_thread_pool)
 {
 	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
 
@@ -315,19 +337,19 @@
 	if (!sub) {
 		return NULL;
 	}
-
 	ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
 	if (needs_mailbox) {
 		/* With a small number of subscribers, a thread-per-sub is
-		 * acceptable. If our usage changes so that we have larger
-		 * numbers of subscribers, we'll probably want to consider
-		 * a threadpool. We had that originally, but with so few
-		 * subscribers it was actually a performance loss instead of
-		 * a gain.
+		 * acceptable. For larger numbers of subscribers, a thread
+		 * pool should be used.
 		 */
-		sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
-			TPS_REF_DEFAULT);
+		if (use_thread_pool) {
+			sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+		} else {
+			sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+				TPS_REF_DEFAULT);
+		}
 		if (!sub->mailbox) {
 			return NULL;
 		}
@@ -356,7 +378,15 @@
 	stasis_subscription_cb callback,
 	void *data)
 {
-	return internal_stasis_subscribe(topic, callback, data, 1);
+	return internal_stasis_subscribe(topic, callback, data, 1, 0);
+}
+
+struct stasis_subscription *stasis_subscribe_pool(
+	struct stasis_topic *topic,
+	stasis_subscription_cb callback,
+	void *data)
+{
+	return internal_stasis_subscribe(topic, callback, data, 1, 1);
 }
 
 static int sub_cleanup(void *data)
@@ -1215,6 +1245,68 @@
 		ast_str_buffer(body));
 }
 
+/*! \brief Threadpool configuration options */
+struct stasis_threadpool_conf {
+	/*! Initial size of the thread pool */
+	int initial_size;
+	/*! Time, in seconds, before we expire a thread */
+	int idle_timeout_sec;
+	/*! Maximum number of threads to allow */
+	int max_size;
+};
+
+/*! \brief Configuration for stasis */
+struct stasis_config {
+	/*! Thread pool configuration options */
+	struct stasis_threadpool_conf *threadpool_options;
+};
+
+static struct aco_type threadpool_option = {
+	.type = ACO_GLOBAL,
+	.name = "threadpool",
+	.item_offset = offsetof(struct stasis_config, threadpool_options),
+	.category = "^threadpool$",
+	.category_match = ACO_WHITELIST,
+};
+
+static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
+
+struct aco_file stasis_conf = {
+	.filename = "stasis.conf",
+	.types = ACO_TYPES(&threadpool_option),
+};
+
+static void stasis_config_destructor(void *obj)
+{
+	struct stasis_config *cfg = obj;
+
+	ast_free(cfg->threadpool_options);
+}
+
+static void *stasis_config_alloc(void)
+{
+	struct stasis_config *cfg;
+
+	cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor);
+	if (!cfg) {
+		return NULL;
+	}
+
+	cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
+	if (!cfg->threadpool_options) {
+		ao2_ref(cfg, -1);
+		return NULL;
+	}
+
+	return cfg;
+}
+
+static AO2_GLOBAL_OBJ_STATIC(globals);
+
+/*! \brief Register information about the configs being processed by this module */
+CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
+        .files = ACO_FILES(&stasis_conf),
+);
 
 /*!
  * @{ \brief Define multi user event message type(s).
@@ -1227,19 +1319,83 @@
 
 /*! @} */
 
+/*! \brief Shutdown function */
+static void stasis_exit(void)
+{
+	ast_threadpool_shutdown(pool);
+	pool = NULL;
+}
+
 /*! \brief Cleanup function for graceful shutdowns */
 static void stasis_cleanup(void)
 {
 	STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
+	aco_info_destroy(&cfg_info);
+	ao2_global_obj_release(globals);
 }
 
 int stasis_init(void)
 {
+	RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
 	int cache_init;
+	struct ast_threadpool_options threadpool_opts = { 0, };
 
 	/* Be sure the types are cleaned up after the message bus */
 	ast_register_cleanup(stasis_cleanup);
+	ast_register_atexit(stasis_exit);
+
+	if (aco_info_init(&cfg_info)) {
+		return -1;
+	}
+
+	aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
+		threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
+		FLDSET(struct stasis_threadpool_conf, initial_size), 0,
+		INT_MAX);
+	aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
+		threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
+		FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
+		INT_MAX);
+	aco_option_register(&cfg_info, "max_size", ACO_EXACT,
+		threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
+		FLDSET(struct stasis_threadpool_conf, max_size), 0,
+		INT_MAX);
+
+	if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
+		struct stasis_config *default_cfg = stasis_config_alloc();
+
+		if (!default_cfg) {
+			return -1;
+		}
+
+		if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
+			ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
+			ao2_ref(default_cfg, -1);
+			return -1;
+		}
+
+		ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
+		ao2_global_obj_replace_unref(globals, default_cfg);
+		cfg = default_cfg;
+	} else {
+		cfg = ao2_global_obj_ref(globals);
+		if (!cfg) {
+			ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
+			return -1;
+		}
+	}
+
+	threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
+	threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
+	threadpool_opts.auto_increment = 1;
+	threadpool_opts.max_size = cfg->threadpool_options->max_size;
+	threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
+	pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+	if (!pool) {
+		ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
+		return -1;
+	}
 
 	cache_init = stasis_cache_init();
 	if (cache_init != 0) {

Modified: branches/12/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/main/stasis_cache.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/main/stasis_cache.c (original)
+++ branches/12/main/stasis_cache.c Mon Dec  1 09:53:02 2014
@@ -881,7 +881,7 @@
 	ao2_ref(cache, +1);
 	caching_topic->cache = cache;
 
-	sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0);
+	sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
 	if (sub == NULL) {
 		return NULL;
 	}

Modified: branches/12/main/stasis_channels.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/main/stasis_channels.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/main/stasis_channels.c (original)
+++ branches/12/main/stasis_channels.c Mon Dec  1 09:53:02 2014
@@ -361,7 +361,7 @@
 	}
 
 	if (forwarded) {
-		struct stasis_subscription *subscription = stasis_subscribe(ast_channel_topic(peer), dummy_event_cb, NULL);
+		struct stasis_subscription *subscription = stasis_subscribe_pool(ast_channel_topic(peer), dummy_event_cb, NULL);
 
 		stasis_publish(ast_channel_topic(peer), msg);
 		stasis_unsubscribe_and_join(subscription);

Modified: branches/12/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/main/stasis_message_router.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/main/stasis_message_router.c (original)
+++ branches/12/main/stasis_message_router.c Mon Dec  1 09:53:02 2014
@@ -206,8 +206,8 @@
 	}
 }
 
-struct stasis_message_router *stasis_message_router_create(
-	struct stasis_topic *topic)
+static struct stasis_message_router *stasis_message_router_create_internal(
+	struct stasis_topic *topic, int use_thread_pool)
 {
 	int res;
 	RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
@@ -224,13 +224,29 @@
 		return NULL;
 	}
 
-	router->subscription = stasis_subscribe(topic, router_dispatch, router);
+	if (use_thread_pool) {
+		router->subscription = stasis_subscribe_pool(topic, router_dispatch, router);
+	} else {
+		router->subscription = stasis_subscribe(topic, router_dispatch, router);
+	}
 	if (!router->subscription) {
 		return NULL;
 	}
 
 	ao2_ref(router, +1);
 	return router;
+}
+
+struct stasis_message_router *stasis_message_router_create(
+	struct stasis_topic *topic)
+{
+	return stasis_message_router_create_internal(topic, 0);
+}
+
+struct stasis_message_router *stasis_message_router_create_pool(
+	struct stasis_topic *topic)
+{
+	return stasis_message_router_create_internal(topic, 1);
 }
 
 void stasis_message_router_unsubscribe(struct stasis_message_router *router)

Modified: branches/12/res/parking/parking_applications.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/parking/parking_applications.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/res/parking/parking_applications.c (original)
+++ branches/12/res/parking/parking_applications.c Mon Dec  1 09:53:02 2014
@@ -832,7 +832,7 @@
 		return -1;
 	}
 
-	if (!(parking_subscription = stasis_subscribe(ast_parking_topic(), park_announce_update_cb, pa_data))) {
+	if (!(parking_subscription = stasis_subscribe_pool(ast_parking_topic(), park_announce_update_cb, pa_data))) {
 		/* Failed to create subscription */
 		park_announce_subscription_data_destroy(pa_data);
 		return -1;

Modified: branches/12/res/parking/parking_bridge_features.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/parking/parking_bridge_features.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/res/parking/parking_bridge_features.c (original)
+++ branches/12/res/parking/parking_bridge_features.c Mon Dec  1 09:53:02 2014
@@ -192,7 +192,7 @@
 	strcpy(subscription_data->parkee_uuid, parkee_uuid);
 	strcpy(subscription_data->parker_uuid, parker_uuid);
 
-	if (!(parked_datastore->parked_subscription = stasis_subscribe(ast_parking_topic(), parker_update_cb, subscription_data))) {
+	if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {
 		return -1;
 	}
 

Modified: branches/12/res/res_jabber.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/res_jabber.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/res/res_jabber.c (original)
+++ branches/12/res/res_jabber.c Mon Dec  1 09:53:02 2014
@@ -3300,7 +3300,7 @@
 static void aji_init_event_distribution(struct aji_client *client)
 {
 	if (!mwi_sub) {
-		mwi_sub = stasis_subscribe(ast_mwi_topic_all(), aji_mwi_cb, client);
+		mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), aji_mwi_cb, client);
 	}
 	if (!device_state_sub) {
 		RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);

Modified: branches/12/res/res_pjsip_mwi.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/res_pjsip_mwi.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/res/res_pjsip_mwi.c (original)
+++ branches/12/res/res_pjsip_mwi.c Mon Dec  1 09:53:02 2014
@@ -142,7 +142,7 @@
 	strcpy(mwi_stasis_sub->mailbox, mailbox);
 	ao2_ref(mwi_sub, +1);
 	ast_debug(3, "Creating stasis MWI subscription to mailbox %s for endpoint %s\n", mailbox, mwi_sub->id);
-	mwi_stasis_sub->stasis_sub = stasis_subscribe(topic, mwi_stasis_cb, mwi_sub);
+	mwi_stasis_sub->stasis_sub = stasis_subscribe_pool(topic, mwi_stasis_cb, mwi_sub);
 	return mwi_stasis_sub;
 }
 

Modified: branches/12/res/res_pjsip_pubsub.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/res_pjsip_pubsub.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/res/res_pjsip_pubsub.c (original)
+++ branches/12/res/res_pjsip_pubsub.c Mon Dec  1 09:53:02 2014
@@ -1934,7 +1934,7 @@
 	if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
 		ast_sip_push_task(NULL, subscription_persistence_load, NULL);
 	} else {
-		stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+		stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
 	}
 
 	ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,

Modified: branches/12/res/res_pjsip_refer.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/res_pjsip_refer.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/res/res_pjsip_refer.c (original)
+++ branches/12/res/res_pjsip_refer.c Mon Dec  1 09:53:02 2014
@@ -550,7 +550,7 @@
 		/* We also will need to detect if the transferee enters a bridge. This is currently the only reliable way to
 		 * detect if the transfer target has answered the call
 		 */
-		refer->progress->bridge_sub = stasis_subscribe(ast_bridge_topic_all(), refer_progress_bridge, refer->progress);
+		refer->progress->bridge_sub = stasis_subscribe_pool(ast_bridge_topic_all(), refer_progress_bridge, refer->progress);
 		if (!refer->progress->bridge_sub) {
 			struct refer_progress_notification *notification = refer_progress_notification_alloc(refer->progress, 200,
 				PJSIP_EVSUB_STATE_TERMINATED);

Modified: branches/12/res/res_stasis_device_state.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/res_stasis_device_state.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/res/res_stasis_device_state.c (original)
+++ branches/12/res/res_stasis_device_state.c Mon Dec  1 09:53:02 2014
@@ -330,7 +330,7 @@
 		return 0;
 	}
 
-	if (!(sub->sub = stasis_subscribe(
+	if (!(sub->sub = stasis_subscribe_pool(
 			ast_device_state_topic(sub->device_name),
 			device_state_cb, sub))) {
 		ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",

Modified: branches/12/res/res_xmpp.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/res_xmpp.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/res/res_xmpp.c (original)
+++ branches/12/res/res_xmpp.c Mon Dec  1 09:53:02 2014
@@ -1606,7 +1606,7 @@
 	xmpp_pubsub_unsubscribe(client, "device_state");
 	xmpp_pubsub_unsubscribe(client, "message_waiting");
 
-	if (!(client->mwi_sub = stasis_subscribe(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
+	if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
 		return;
 	}
 

Modified: branches/12/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/tests/test_stasis.c?view=diff&rev=428681&r1=428680&r2=428681
==============================================================================
--- branches/12/tests/test_stasis.c (original)
+++ branches/12/tests/test_stasis.c Mon Dec  1 09:53:02 2014
@@ -361,6 +361,61 @@
 	return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(subscription_pool_messages)
+{
+	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+	RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
+	int complete;
+	struct stasis_subscription_change *change;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
+		info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	topic = stasis_topic_create("TestTopic");
+	ast_test_validate(test, NULL != topic);
+
+	consumer = consumer_create(0);
+	ast_test_validate(test, NULL != consumer);
+
+	uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
+	ast_test_validate(test, NULL != uut);
+	ao2_ref(consumer, +1);
+	expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
+
+	uut = stasis_unsubscribe(uut);
+	complete = consumer_wait_for_completion(consumer);
+	ast_test_validate(test, 1 == complete);
+
+	ast_test_validate(test, 2 == consumer->messages_rxed_len);
+	ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
+	ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
+
+	change = stasis_message_data(consumer->messages_rxed[0]);
+	ast_test_validate(test, topic == change->topic);
+	ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
+	ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+	change = stasis_message_data(consumer->messages_rxed[1]);
+	ast_test_validate(test, topic == change->topic);
+	ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
+	ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+	return AST_TEST_PASS;
+}
+
 AST_TEST_DEFINE(publish)
 {
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -455,6 +510,55 @@
 	return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(publish_pool)
+{
+	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+	int actual_len;
+	const char *actual;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test publishing with a threadpool";
+		info->description = "Test publishing to a subscriber whose\n"
+			"subscription dictates messages are received through a\n"
+			"threadpool.";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	topic = stasis_topic_create("TestTopic");
+	ast_test_validate(test, NULL != topic);
+
+	consumer = consumer_create(1);
+	ast_test_validate(test, NULL != consumer);
+
+	uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
+	ast_test_validate(test, NULL != uut);
+	ao2_ref(consumer, +1);
+
+	test_data = ao2_alloc(1, NULL);
+	ast_test_validate(test, NULL != test_data);
+	test_message_type = stasis_message_type_create("TestMessage", NULL);
+	test_message = stasis_message_create(test_message_type, test_data);
+
+	stasis_publish(topic, test_message);
+
+	actual_len = consumer_wait_for(consumer, 1);
+	ast_test_validate(test, 1 == actual_len);
+	actual = stasis_message_data(consumer->messages_rxed[0]);
+	ast_test_validate(test, test_data == actual);
+
+	return AST_TEST_PASS;
+}
+
 AST_TEST_DEFINE(unsubscribe_stops_messages)
 {
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -646,6 +750,106 @@
 	ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
 	ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
 	ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
+
+	return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(subscription_interleaving)
+{
+	RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
+
+	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+
+	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+
+	RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+
+	RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
+	RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
+	RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe);
+
+	RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+
+	int actual_len;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test sending interleaved events to a parent topic with different subscribers";
+		info->description = "Test sending events to a parent topic.\n"
+			"This test creates three topics (one parent, two children)\n"
+			"and publishes messages alternately between the children.\n"
+			"It verifies that the messages are received in the expected\n"
+			"order, for different subscription types: one with a dedicated\n"
+			"thread, the other on the Stasis threadpool.\n";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	test_message_type = stasis_message_type_create("test", NULL);
+	ast_test_validate(test, NULL != test_message_type);
+
+	test_data = ao2_alloc(1, NULL);
+	ast_test_validate(test, NULL != test_data);
+
+	test_message1 = stasis_message_create(test_message_type, test_data);
+	ast_test_validate(test, NULL != test_message1);
+	test_message2 = stasis_message_create(test_message_type, test_data);
+	ast_test_validate(test, NULL != test_message2);
+	test_message3 = stasis_message_create(test_message_type, test_data);
+	ast_test_validate(test, NULL != test_message3);
+
+	parent_topic = stasis_topic_create("ParentTestTopic");
+	ast_test_validate(test, NULL != parent_topic);
+	topic1 = stasis_topic_create("Topic1");
+	ast_test_validate(test, NULL != topic1);
+	topic2 = stasis_topic_create("Topic2");
+	ast_test_validate(test, NULL != topic2);
+
+	forward_sub1 = stasis_forward_all(topic1, parent_topic);
+	ast_test_validate(test, NULL != forward_sub1);
+	forward_sub2 = stasis_forward_all(topic2, parent_topic);
+	ast_test_validate(test, NULL != forward_sub2);
+
+	consumer1 = consumer_create(1);
+	ast_test_validate(test, NULL != consumer1);
+
+	consumer2 = consumer_create(1);
+	ast_test_validate(test, NULL != consumer2);
+
+	sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
+	ast_test_validate(test, NULL != sub1);
+	ao2_ref(consumer1, +1);
+
+	sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
+	ast_test_validate(test, NULL != sub2);
+	ao2_ref(consumer2, +1);
+
+	stasis_publish(topic1, test_message1);

[... 169 lines stripped ...]


