[Asterisk-code-review] res_pjsip/res_pjsip_mwi: use centralized serializer pools (...asterisk[16])

George Joseph asteriskteam at digium.com
Thu Oct 10 09:14:27 CDT 2019


George Joseph has submitted this change and it was merged. ( https://gerrit.asterisk.org/c/asterisk/+/13002 )

Change subject: res_pjsip/res_pjsip_mwi: use centralized serializer pools
......................................................................

res_pjsip/res_pjsip_mwi: use centralized serializer pools

Both res_pjsip and res_pjsip_mwi made use of serializer pools. However, they
both implemented their own serializer pool functionality that was pretty much
identical in each of the source files. This patch removes the duplicated code,
and uses the new 'ast_serializer_pool' object instead.

Additionally res_pjsip_mwi enables a shutdown group on the pool since if the
timing was right the module could be unloaded while taskprocessor threads still
needed to execute, thus causing a crash.

Change-Id: I959b0805ad024585bbb6276593118be34fbf6e1d
---
M include/asterisk/res_pjsip.h
M res/res_pjsip.c
M res/res_pjsip_mwi.c
3 files changed, 64 insertions(+), 194 deletions(-)

Approvals:
  Joshua Colp: Looks good to me, but someone else must approve
  Benjamin Keith Ford: Looks good to me, but someone else must approve
  George Joseph: Looks good to me, approved
  Friendly Automation: Approved for Submit



diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index e295765..a78311d 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -2954,6 +2954,11 @@
 long ast_sip_threadpool_queue_size(void);
 
 /*!
+ * \brief Retrieve the SIP threadpool object
+ */
+struct ast_threadpool *ast_sip_threadpool(void);
+
+/*!
  * \brief Retrieve transport state
  * \since 13.7.1
  *
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 7123938..3e11a6b 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -34,6 +34,7 @@
 #include "asterisk/utils.h"
 #include "asterisk/astobj2.h"
 #include "asterisk/module.h"
+#include "asterisk/serializer.h"
 #include "asterisk/threadpool.h"
 #include "asterisk/taskprocessor.h"
 #include "asterisk/uuid.h"
@@ -2829,7 +2830,7 @@
 #define SERIALIZER_POOL_SIZE		8
 
 /*! Pool of serializers to use if not supplied. */
-static struct ast_taskprocessor *serializer_pool[SERIALIZER_POOL_SIZE];
+static struct ast_serializer_pool *sip_serializer_pool;
 
 static pjsip_endpoint *ast_pjsip_endpoint;
 
@@ -4568,71 +4569,10 @@
 	return ast_sip_create_serializer_group(name, NULL);
 }
 
-/*!
- * \internal
- * \brief Shutdown the serializers in the default pool.
- * \since 14.0.0
- *
- * \return Nothing
- */
-static void serializer_pool_shutdown(void)
-{
-	int idx;
-
-	for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) {
-		ast_taskprocessor_unreference(serializer_pool[idx]);
-		serializer_pool[idx] = NULL;
-	}
-}
-
-/*!
- * \internal
- * \brief Setup the serializers in the default pool.
- * \since 14.0.0
- *
- * \retval 0 on success.
- * \retval -1 on error.
- */
-static int serializer_pool_setup(void)
-{
-	char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
-	int idx;
-
-	for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) {
-		/* Create name with seq number appended. */
-		ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/default");
-
-		serializer_pool[idx] = ast_sip_create_serializer(tps_name);
-		if (!serializer_pool[idx]) {
-			serializer_pool_shutdown();
-			return -1;
-		}
-	}
-	return 0;
-}
-
-static struct ast_taskprocessor *serializer_pool_pick(void)
-{
-	int idx;
-	int pos = 0;
-
-	if (!serializer_pool[0]) {
-		return NULL;
-	}
-
-	for (idx = 1; idx < SERIALIZER_POOL_SIZE; ++idx) {
-		if (ast_taskprocessor_size(serializer_pool[idx]) < ast_taskprocessor_size(serializer_pool[pos])) {
-			pos = idx;
-		}
-	}
-
-	return serializer_pool[pos];
-}
-
 int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
 {
 	if (!serializer) {
-		serializer = serializer_pool_pick();
+		serializer = ast_serializer_pool_get(sip_serializer_pool);
 	}
 
 	return ast_taskprocessor_push(serializer, sip_task, task_data);
@@ -4713,7 +4653,7 @@
 {
 	if (!serializer) {
 		/* Caller doesn't care which PJSIP serializer the task executes under. */
-		serializer = serializer_pool_pick();
+		serializer = ast_serializer_pool_get(sip_serializer_pool);
 		if (!serializer) {
 			/* No serializer picked to execute the task */
 			return -1;
@@ -5071,6 +5011,11 @@
 	return ast_threadpool_queue_size(sip_threadpool);
 }
 
+struct ast_threadpool *ast_sip_threadpool(void)
+{
+	return sip_threadpool;
+}
+
 #ifdef TEST_FRAMEWORK
 AST_TEST_DEFINE(xml_sanitization_end_null)
 {
@@ -5142,7 +5087,7 @@
 	 * These calls need the pjsip endpoint and serializer to clean up.
 	 * If they're not set, then there's nothing to clean up anyway.
 	 */
-	if (ast_pjsip_endpoint && serializer_pool[0]) {
+	if (ast_pjsip_endpoint && sip_serializer_pool) {
 		ast_res_pjsip_cleanup_options_handling();
 		ast_res_pjsip_cleanup_message_filter();
 		ast_sip_destroy_distributor();
@@ -5278,7 +5223,9 @@
 		goto error;
 	}
 
-	if (serializer_pool_setup()) {
+	sip_serializer_pool = ast_serializer_pool_create(
+		"pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1);
+	if (!sip_serializer_pool) {
 		ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n");
 		goto error;
 	}
@@ -5351,7 +5298,7 @@
 
 	/* These functions all check for NULLs and are safe to call at any time */
 	ast_sip_destroy_scheduler();
-	serializer_pool_shutdown();
+	ast_serializer_pool_destroy(sip_serializer_pool);
 	ast_threadpool_shutdown(sip_threadpool);
 
 	return AST_MODULE_LOAD_DECLINE;
@@ -5382,7 +5329,7 @@
 	 */
 	ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL);
 	ast_sip_destroy_scheduler();
-	serializer_pool_shutdown();
+	ast_serializer_pool_destroy(sip_serializer_pool);
 	ast_threadpool_shutdown(sip_threadpool);
 
 	return 0;
diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c
index 82f6e46..bd25ea7 100644
--- a/res/res_pjsip_mwi.c
+++ b/res/res_pjsip_mwi.c
@@ -36,6 +36,7 @@
 #include "asterisk/logger.h"
 #include "asterisk/astobj2.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/serializer.h"
 #include "asterisk/sorcery.h"
 #include "asterisk/stasis.h"
 #include "asterisk/mwi.h"
@@ -57,8 +58,11 @@
 /*! Number of serializers in pool if one not supplied. */
 #define MWI_SERIALIZER_POOL_SIZE 8
 
+/*! Max timeout for all threads to join during an unload. */
+#define MAX_UNLOAD_TIMEOUT_TIME 10 /* Seconds */
+
 /*! Pool of serializers to use if not supplied. */
-static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE];
+static struct ast_serializer_pool *mwi_serializer_pool;
 
 static void mwi_subscription_shutdown(struct ast_sip_subscription *sub);
 static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf);
@@ -129,117 +133,6 @@
 	char id[1];
 };
 
-/*!
- * \internal
- * \brief Shutdown the serializers in the mwi pool.
- * \since 13.12.0
- *
- * \return Nothing
- */
-static void mwi_serializer_pool_shutdown(void)
-{
-	int idx;
-
-	for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
-		ast_taskprocessor_unreference(mwi_serializer_pool[idx]);
-		mwi_serializer_pool[idx] = NULL;
-	}
-}
-
-/*!
- * \internal
- * \brief Setup the serializers in the mwi pool.
- * \since 13.12.0
- *
- * \retval 0 on success.
- * \retval -1 on error.
- */
-static int mwi_serializer_pool_setup(void)
-{
-	char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
-	int idx;
-
-	for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
-		/* Create name with seq number appended. */
-		ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi");
-
-		mwi_serializer_pool[idx] = ast_sip_create_serializer(tps_name);
-		if (!mwi_serializer_pool[idx]) {
-			mwi_serializer_pool_shutdown();
-			return -1;
-		}
-	}
-	return 0;
-}
-
-/*!
- * \internal
- * \brief Pick a mwi serializer from the pool.
- * \since 13.12.0
- *
- * \retval least queue size task processor.
- */
-static struct ast_taskprocessor *get_mwi_serializer(void)
-{
-	int idx;
-	int pos;
-
-	if (!mwi_serializer_pool[0]) {
-		return NULL;
-	}
-
-	for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
-		if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) {
-			pos = idx;
-		}
-	}
-
-	return mwi_serializer_pool[pos];
-}
-
-/*!
- * \internal
- * \brief Set taskprocessor alert levels for the serializers in the mwi pool.
- * \since 13.12.0
- *
- * \retval 0 on success.
- * \retval -1 on error.
- */
-static int mwi_serializer_set_alert_levels(void)
-{
-	int idx;
-	long tps_queue_high;
-	long tps_queue_low;
-
-	if (!mwi_serializer_pool[0]) {
-		return -1;
-	}
-
-	tps_queue_high = ast_sip_get_mwi_tps_queue_high();
-	if (tps_queue_high <= 0) {
-		ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n",
-			tps_queue_high);
-		tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
-	}
-
-	tps_queue_low = ast_sip_get_mwi_tps_queue_low();
-	if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
-		ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n",
-			tps_queue_low);
-		tps_queue_low = -1;
-	}
-
-	for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
-		if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) {
-			ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n",
-				idx);
-		}
-	}
-
-	return 0;
-}
-
-
 static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
 		struct stasis_message *msg);
 
@@ -1218,7 +1111,7 @@
 	struct mwi_subscription *mwi_sub = obj;
 	struct ast_taskprocessor *serializer = mwi_sub->is_solicited
 		? ast_sip_subscription_get_serializer(mwi_sub->sip_sub)
-		: get_mwi_serializer();
+		: ast_serializer_pool_get(mwi_serializer_pool);
 
 	if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) {
 		ao2_ref(mwi_sub, -1);
@@ -1233,7 +1126,8 @@
 	struct mwi_subscription *mwi_sub = userdata;
 
 	if (stasis_subscription_final_message(sub, msg)) {
-		if (ast_sip_push_task(NULL, serialized_cleanup, ao2_bump(mwi_sub))) {
+		if (ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool),
+				serialized_cleanup, ao2_bump(mwi_sub))) {
 			ao2_ref(mwi_sub, -1);
 		}
 		return;
@@ -1396,7 +1290,8 @@
 		return 0;
 	}
 
-	if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) {
+	if (ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool),
+			serialized_notify, ao2_bump(mwi_sub))) {
 		ao2_ref(mwi_sub, -1);
 	}
 
@@ -1518,7 +1413,8 @@
 		return;
 	}
 
-	ast_sip_push_task(NULL, send_initial_notify_all, NULL);
+	ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool),
+		send_initial_notify_all, NULL);
 
 	stasis_unsubscribe(sub);
 }
@@ -1527,7 +1423,8 @@
 {
 	ast_free(default_voicemail_extension);
 	default_voicemail_extension = ast_sip_get_default_voicemail_extension();
-	mwi_serializer_set_alert_levels();
+	ast_serializer_pool_set_alerts(mwi_serializer_pool,
+		ast_sip_get_mwi_tps_queue_high(), ast_sip_get_mwi_tps_queue_low());
 }
 
 static struct ast_sorcery_observer global_observer = {
@@ -1548,14 +1445,16 @@
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
-	if (mwi_serializer_pool_setup()) {
+	mwi_serializer_pool = ast_serializer_pool_create("pjsip/mwi",
+		MWI_SERIALIZER_POOL_SIZE, ast_sip_threadpool(), MAX_UNLOAD_TIMEOUT_TIME);
+	if (!mwi_serializer_pool) {
 		ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
 	}
 
 	solicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS,
 		mwi_sub_hash, NULL, mwi_sub_cmp);
 	if (!solicited_mwi) {
-		mwi_serializer_pool_shutdown();
+		ast_serializer_pool_destroy(mwi_serializer_pool);
 		ast_sip_unregister_subscription_handler(&mwi_handler);
 		return AST_MODULE_LOAD_DECLINE;
 	}
@@ -1563,7 +1462,7 @@
 	unsolicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS,
 		mwi_sub_hash, NULL, mwi_sub_cmp);
 	if (!unsolicited_mwi) {
-		mwi_serializer_pool_shutdown();
+		ast_serializer_pool_destroy(mwi_serializer_pool);
 		ast_sip_unregister_subscription_handler(&mwi_handler);
 		ao2_ref(solicited_mwi, -1);
 		return AST_MODULE_LOAD_DECLINE;
@@ -1576,7 +1475,8 @@
 	if (!ast_sip_get_mwi_disable_initial_unsolicited()) {
 		create_mwi_subscriptions();
 		if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
-			ast_sip_push_task(NULL, send_initial_notify_all, NULL);
+			ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool),
+				send_initial_notify_all, NULL);
 		} else {
 			struct stasis_subscription *sub;
 
@@ -1586,6 +1486,15 @@
 		}
 	}
 
+	if (!mwi_serializer_pool) {
+		/*
+		 * If the mwi serializer pool was unable to be established then the module will
+		 * use the default serializer pool. If this happens prevent manual unloading
+		 * since there would now exist the potential for a crash on unload.
+		 */
+		ast_module_shutdown_ref(ast_module_info->self);
+	}
+
 	return AST_MODULE_LOAD_SUCCESS;
 }
 
@@ -1594,13 +1503,22 @@
 	ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
 	ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
 
-	ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
-	ao2_ref(unsolicited_mwi, -1);
-	unsolicited_mwi = NULL;
+	if (unsolicited_mwi) {
+		ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
+		ao2_ref(unsolicited_mwi, -1);
+		unsolicited_mwi = NULL;
+	}
 
-	ao2_cleanup(solicited_mwi);
+	if (solicited_mwi) {
+		ao2_ref(solicited_mwi, -1);
+		solicited_mwi = NULL;
+	}
 
-	mwi_serializer_pool_shutdown();
+	if (ast_serializer_pool_destroy(mwi_serializer_pool)) {
+		ast_log(LOG_WARNING, "Unload incomplete. Try again later\n");
+		return -1;
+	}
+	mwi_serializer_pool = NULL;
 
 	ast_sip_unregister_subscription_handler(&mwi_handler);
 

-- 
To view, visit https://gerrit.asterisk.org/c/asterisk/+/13002
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: 16
Gerrit-Change-Id: I959b0805ad024585bbb6276593118be34fbf6e1d
Gerrit-Change-Number: 13002
Gerrit-PatchSet: 3
Gerrit-Owner: Kevin Harwell <kharwell at digium.com>
Gerrit-Reviewer: Benjamin Keith Ford <bford at digium.com>
Gerrit-Reviewer: Friendly Automation
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
Gerrit-MessageType: merged
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20191010/ac11e3b3/attachment-0001.html>


More information about the asterisk-code-review mailing list