[Asterisk-code-review] res pjsip mwi: fix unsolicited mwi blocks PJSIP stack (asterisk[master])

Alexei Gradinari asteriskteam at digium.com
Mon Aug 8 12:58:52 CDT 2016


Alexei Gradinari has uploaded a new change for review.

  https://gerrit.asterisk.org/3428

Change subject: res_pjsip_mwi: fix unsolicited mwi blocks PJSIP stack
......................................................................

res_pjsip_mwi: fix unsolicited mwi blocks PJSIP stack

The PJSIP taskprocessors could be overflowed on startup
if there are many (thousands) realtime endpoints
configured with unsolicited mwi.
The PJSIP stack could be totally unresponsive for a few minutes
after boot completed.

This patch creates a separate PJSIP serializers pool for mwi
and makes unsolicited mwi use serializers from this pool.
This patch also adds 2 new global options to tune taskprocessor
alert levels: 'mwi_tps_queue_high' and 'mwi_tps_queue_low'.

This patch also adds new global option 'mwi_disable_initial_unsolicited'
to disable sending unsolicited mwi to all endpoints on startup.
If disabled then unsolicited mwi will start processing
on next endpoint's contact update.

ASTERISK-26230 #close

Change-Id: I4c8ecb82c249eb887930980a800c9f87f28f861a
---
M CHANGES
M configs/samples/pjsip.conf.sample
A contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py
M include/asterisk/res_pjsip.h
M res/res_pjsip.c
M res/res_pjsip/config_global.c
M res/res_pjsip_mwi.c
7 files changed, 333 insertions(+), 6 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/28/3428/1

diff --git a/CHANGES b/CHANGES
index 3eadadf..b353e96 100644
--- a/CHANGES
+++ b/CHANGES
@@ -389,6 +389,15 @@
    Additional information can be found in the sample configuration file at
    config/samples/voicemail.conf.sample.
 
+res_pjsip_mwi
+------------------
+ * Added "mwi_tps_queue_high" and "mwi_tps_queue_low" global configuration
+   options to tune taskprocessor alert levels.
+ * Added "mwi_disable_initial_unsolicited" global configuration option
+   to disable sending unsolicited MWI to all endpoints on startup.
+   Additional information can be found in the sample configuration file at
+   config/samples/pjsip.conf.sample.
+
 ------------------------------------------------------------------------------
 --- Functionality changes from Asterisk 13.10.0 to Asterisk 13.11.0 ----------
 ------------------------------------------------------------------------------
diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample
index 99bdfb9..eac0549 100644
--- a/configs/samples/pjsip.conf.sample
+++ b/configs/samples/pjsip.conf.sample
@@ -964,6 +964,27 @@
                                 ; set to this value if there is no better option (such as
                                 ; auth/realm) to be used
 
+                    ; Asterisk Task Processor Queue Size
+                    ; On a heavily loaded system with DB storage you may need to increase
+                    ; the taskprocessor queue limits.
+                    ; If the taskprocessor queue size reaches the high water level,
+                    ; the alert is triggered.
+                    ; If the alert is set the pjsip distributor stops processing incoming
+                    ; requests until the alert is cleared.
+                    ; The alert is cleared when taskprocessor queue size drops to the
+                    ; low water clear level.
+                    ; The next options set taskprocessor queue levels for MWI.
+;mwi_tps_queue_high=500 ; Taskprocessor high water alert trigger level.
+;mwi_tps_queue_low=450  ; Taskprocessor low water clear alert level.
+                    ; The default is -1 for 90% of high water level.
+
+                    ; Unsolicited MWI
+                    ; If there are endpoints configured with unsolicited MWI
+                    ; then res_pjsip_mwi module tries to send MWI to all endpoints on startup.
+;mwi_disable_initial_unsolicited=no ; Disable sending unsolicited mwi to all endpoints on startup.
+                    ; If disabled then unsolicited mwi will start processing
+                    ; on the endpoint's next contact update.
+
 ; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl
 ;==========================ACL SECTION OPTIONS=========================
 ;[acl]
diff --git a/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py b/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py
new file mode 100644
index 0000000..d3efa22
--- /dev/null
+++ b/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py
@@ -0,0 +1,35 @@
+"""pjsip: add global MWI options
+
+Revision ID: c7a44a5a0851
+Revises: 4a6c67fa9b7a
+Create Date: 2016-08-03 15:08:22.524727
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'c7a44a5a0851'
+down_revision = '4a6c67fa9b7a'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import ENUM
+
+YESNO_NAME = 'yesno_values'
+YESNO_VALUES = ['yes', 'no']
+
+
+def upgrade():
+    ############################# Enums ##############################
+
+    # The yesno_values enum type has already been created by an earlier revision, so build the
+    # postgres ENUM object with create_type=False to avoid the "already created" error - works okay with mysql
+    yesno_values = ENUM(*YESNO_VALUES, name=YESNO_NAME, create_type=False)
+
+    op.add_column('ps_globals', sa.Column('mwi_tps_queue_high', sa.Integer))
+    op.add_column('ps_globals', sa.Column('mwi_tps_queue_low', sa.Integer))
+    op.add_column('ps_globals', sa.Column('mwi_disable_initial_unsolicited', yesno_values))
+
+def downgrade():  # reverts upgrade(): drops the three ps_globals columns it added
+    op.drop_column('ps_globals', 'mwi_tps_queue_high')
+    op.drop_column('ps_globals', 'mwi_tps_queue_low')
+    op.drop_column('ps_globals', 'mwi_disable_initial_unsolicited')
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 9bb2a82..cd6b33d 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -2417,6 +2417,32 @@
 void ast_sip_unregister_supplement(struct ast_sip_supplement *supplement);
 
 /*!
+ * \brief Retrieve the global MWI taskprocessor high water alert trigger level.
+ *
+ * \since 13.12.0
+ *
+ * \retval the system MWI taskprocessor high water alert trigger level
+ */
+unsigned int ast_sip_get_mwi_tps_queue_high(void);
+
+/*!
+ * \brief Retrieve the global MWI taskprocessor low water clear alert level.
+ *
+ * \since 13.12.0
+ *
+ * \retval the system MWI taskprocessor low water clear alert level
+ */
+int ast_sip_get_mwi_tps_queue_low(void);
+
+/*!
+ * \brief Retrieve the global setting 'disable sending unsolicited mwi on startup'.
+ * \since 13.12.0
+ *
+ * \retval non-zero if the initial unsolicited MWI on startup is disabled.
+ */
+unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void);
+
+/*!
  * \brief Retrieve the system debug setting (yes|no|host).
  *
  * \note returned string needs to be de-allocated by caller.
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index ffbf880..8a9a19d 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -1499,6 +1499,48 @@
 						set to this value if there is no better option (such as auth/realm) to be
 						used.</synopsis>
 				</configOption>
+				<configOption name="mwi_tps_queue_high" default="500">
+					<synopsis>MWI taskprocessor high water alert trigger level.</synopsis>
+					<description>
+						<para>On a heavily loaded system you may need to adjust the
+						taskprocessor queue limits.  If any taskprocessor queue size
+						reaches its high water level then pjsip will stop processing
+						new requests until the alert is cleared.  The alert clears
+						when all alerting taskprocessor queues have dropped to their
+						low water clear level.
+						</para>
+					</description>
+				</configOption>
+				<configOption name="mwi_tps_queue_low" default="-1">
+					<synopsis>MWI taskprocessor low water clear alert level.</synopsis>
+					<description>
+						<para>On a heavily loaded system you may need to adjust the
+						taskprocessor queue limits.  If any taskprocessor queue size
+						reaches its high water level then pjsip will stop processing
+						new requests until the alert is cleared.  The alert clears
+						when all alerting taskprocessor queues have dropped to their
+						low water clear level.
+						</para>
+						<note><para>Set to -1 for the low water level to be 90% of
+						the high water level.</para></note>
+					</description>
+				</configOption>
+				<configOption name="mwi_disable_initial_unsolicited" default="no">
+					<synopsis>Enable/Disable sending unsolicited MWI to all endpoints on startup.</synopsis>
+					<description>
+						<para>When the initial unsolicited MWI notifications are
+						enabled on startup then the initial notifications
+						get sent at startup.  If you have a lot of endpoints
+						(thousands) that use unsolicited MWI then you may
+						want to consider disabling the initial startup
+						notifications.
+						</para>
+						<para>When the initial unsolicited MWI notifications are
+						disabled on startup then the notifications will start
+						on the endpoint's next contact update.
+						</para>
+					</description>
+				</configOption>
 			</configObject>
 		</configFile>
 	</configInfo>
diff --git a/res/res_pjsip/config_global.c b/res/res_pjsip/config_global.c
index 6bb6888..8a1b0d4 100644
--- a/res/res_pjsip/config_global.c
+++ b/res/res_pjsip/config_global.c
@@ -24,6 +24,7 @@
 #include "asterisk/res_pjsip.h"
 #include "include/res_pjsip_private.h"
 #include "asterisk/sorcery.h"
+#include "asterisk/taskprocessor.h"
 #include "asterisk/ast_version.h"
 #include "asterisk/res_pjsip_cli.h"
 
@@ -43,6 +44,9 @@
 #define DEFAULT_UNIDENTIFIED_REQUEST_COUNT 5
 #define DEFAULT_UNIDENTIFIED_REQUEST_PERIOD 5
 #define DEFAULT_UNIDENTIFIED_REQUEST_PRUNE_INTERVAL 30
+#define DEFAULT_MWI_TPS_QUEUE_HIGH AST_TASKPROCESSOR_HIGH_WATER_LEVEL
+#define DEFAULT_MWI_TPS_QUEUE_LOW -1
+#define DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED 0
 
 static char default_useragent[256];
 
@@ -79,6 +83,14 @@
 	unsigned int unidentified_request_period;
 	/* Interval at which expired unidentifed requests will be pruned */
 	unsigned int unidentified_request_prune_interval;
+	struct {
+		/*! Taskprocessor high water alert trigger level */
+		unsigned int tps_queue_high;
+		/*! Taskprocessor low water clear alert level. */
+		int tps_queue_low;
+		/*! Nonzero to disable sending unsolicited mwi to all endpoints on startup */
+		unsigned int disable_initial_unsolicited;
+	} mwi;
 };
 
 static void global_destructor(void *obj)
@@ -314,6 +326,53 @@
 	}
 }
 
+
+unsigned int ast_sip_get_mwi_tps_queue_high(void)
+{
+	unsigned int tps_queue_high;
+	struct global_config *cfg;
+
+	cfg = get_global_cfg();
+	if (!cfg) { /* no global config object: fall back to the compiled-in default */
+		return DEFAULT_MWI_TPS_QUEUE_HIGH;
+	}
+
+	tps_queue_high = cfg->mwi.tps_queue_high; /* copy the value out before releasing cfg */
+	ao2_ref(cfg, -1); /* drop our reference (assumes get_global_cfg() returned one - TODO confirm) */
+	return tps_queue_high;
+}
+
+int ast_sip_get_mwi_tps_queue_low(void)
+{
+	int tps_queue_low;
+	struct global_config *cfg;
+
+	cfg = get_global_cfg();
+	if (!cfg) { /* no global config object: fall back to the compiled-in default (-1) */
+		return DEFAULT_MWI_TPS_QUEUE_LOW;
+	}
+
+	tps_queue_low = cfg->mwi.tps_queue_low; /* copy the value out before releasing cfg */
+	ao2_ref(cfg, -1); /* drop our reference (assumes get_global_cfg() returned one - TODO confirm) */
+	return tps_queue_low;
+}
+
+unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void)
+{
+	unsigned int disable_initial_unsolicited;
+	struct global_config *cfg;
+
+	cfg = get_global_cfg();
+	if (!cfg) { /* no global config object: fall back to the compiled-in default (0) */
+		return DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED;
+	}
+
+	disable_initial_unsolicited = cfg->mwi.disable_initial_unsolicited; /* copy before releasing cfg */
+	ao2_ref(cfg, -1); /* drop our reference (assumes get_global_cfg() returned one - TODO confirm) */
+	return disable_initial_unsolicited;
+}
+
+
 /*!
  * \internal
  * \brief Observer to set default global object if none exist.
@@ -450,6 +509,15 @@
 		OPT_UINT_T, 0, FLDSET(struct global_config, unidentified_request_prune_interval));
 	ast_sorcery_object_field_register(sorcery, "global", "default_realm", DEFAULT_REALM,
 		OPT_STRINGFIELD_T, 0, STRFLDSET(struct global_config, default_realm));
+	ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_high",
+		__stringify(DEFAULT_MWI_TPS_QUEUE_HIGH),
+		OPT_UINT_T, 0, FLDSET(struct global_config, mwi.tps_queue_high));
+	ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_low",
+		__stringify(DEFAULT_MWI_TPS_QUEUE_LOW),
+		OPT_INT_T, 0, FLDSET(struct global_config, mwi.tps_queue_low));
+	ast_sorcery_object_field_register(sorcery, "global", "mwi_disable_initial_unsolicited",
+		DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED ? "yes" : "no",
+		OPT_BOOL_T, 1, FLDSET(struct global_config, mwi.disable_initial_unsolicited));
 
 	if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
 		return -1;
diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c
index d86c96c..9553cd7 100644
--- a/res/res_pjsip_mwi.c
+++ b/res/res_pjsip_mwi.c
@@ -35,6 +35,7 @@
 #include "asterisk/module.h"
 #include "asterisk/logger.h"
 #include "asterisk/astobj2.h"
+#include "asterisk/taskprocessor.h"
 #include "asterisk/sorcery.h"
 #include "asterisk/stasis.h"
 #include "asterisk/app.h"
@@ -51,6 +52,12 @@
 #define MWI_SUBTYPE "simple-message-summary"
 
 #define MWI_DATASTORE "MWI datastore"
+
+/*! Number of serializers in pool if one not supplied. */
+#define MWI_SERIALIZER_POOL_SIZE 8
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE];
 
 static void mwi_subscription_shutdown(struct ast_sip_subscription *sub);
 static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf);
@@ -118,6 +125,117 @@
 	 */
 	char id[1];
 };
+
+/*!
+ * \internal
+ * \brief Unreference every serializer in the mwi pool and clear its slot.
+ * \since 13.12.0
+ *
+ * \return Nothing
+ */
+static void mwi_serializer_pool_shutdown(void)
+{
+	int idx;
+
+	for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+		ast_taskprocessor_unreference(mwi_serializer_pool[idx]); /* slot may be NULL after a partial setup; presumably tolerated - TODO confirm */
+		mwi_serializer_pool[idx] = NULL; /* get_mwi_serializer() treats a NULL slot 0 as "no pool" */
+	}
+}
+
+/*!
+ * \internal
+ * \brief Create MWI_SERIALIZER_POOL_SIZE named serializers for the mwi pool.
+ * \since 13.12.0
+ *
+ * \retval 0 on success.
+ * \retval -1 if any serializer could not be created (the pool is shut down again).
+ */
+static int mwi_serializer_pool_setup(void)
+{
+	char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+	int idx;
+
+	for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+		/* Create name with seq number appended. */
+		ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi");
+
+		mwi_serializer_pool[idx] = ast_sip_create_serializer_named(tps_name);
+		if (!mwi_serializer_pool[idx]) {
+			mwi_serializer_pool_shutdown(); /* release the serializers created so far */
+			return -1;
+		}
+	}
+	return 0;
+}
+
+/*!
+ * \internal
+ * \brief Pick the mwi pool serializer with the smallest queue (no extra reference is taken here).
+ * \since 13.12.0
+ *
+ * \return The pool entry with the smallest queue size, or NULL if slot 0 of the pool is empty.
+ */
+static struct ast_taskprocessor *get_mwi_serializer(void)
+{
+	int idx;
+	int pos;
+
+	if (!mwi_serializer_pool[0]) { /* slot 0 is NULL when setup failed or the pool was shut down */
+		return NULL;
+	}
+
+	for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+		if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) {
+			pos = idx; /* strict '<' keeps the lowest index when queue sizes tie */
+		}
+	}
+
+	return mwi_serializer_pool[pos];
+}
+
+/*!
+ * \internal
+ * \brief Apply the configured high/low alert levels to every serializer in the mwi pool.
+ * \since 13.12.0
+ *
+ * \retval 0 after the levels were applied (a per-serializer failure only logs a warning).
+ * \retval -1 if slot 0 of the pool is empty (pool not set up).
+ */
+static int mwi_serializer_set_alert_levels(void)
+{
+	int idx;
+	long tps_queue_high;
+	long tps_queue_low;
+
+	if (!mwi_serializer_pool[0]) { /* nothing to configure without a pool */
+		return -1;
+	}
+
+	tps_queue_high = ast_sip_get_mwi_tps_queue_high();
+	if (tps_queue_high <= 0) { /* invalid high level: warn and use the taskprocessor default */
+		ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n",
+			tps_queue_high);
+		tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+	}
+
+	tps_queue_low = ast_sip_get_mwi_tps_queue_low();
+	if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) { /* -1 itself is accepted; below -1 or above high is not */
+		ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n",
+			tps_queue_low);
+		tps_queue_low = -1; /* documented in the option help as "90% of the high water level" */
+	}
+
+	for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+		if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) {
+			ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n",
+				idx);
+		}
+	}
+
+	return 0;
+}
+
 
 static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
 		struct stasis_message *msg);
@@ -945,7 +1063,7 @@
 	struct mwi_subscription *mwi_sub = obj;
 	struct ast_taskprocessor *serializer = mwi_sub->is_solicited
 		? ast_sip_subscription_get_serializer(mwi_sub->sip_sub)
-		: NULL;
+		: get_mwi_serializer();
 
 	if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) {
 		ao2_ref(mwi_sub, -1);
@@ -1063,7 +1181,7 @@
 		return 0;
 	}
 
-	if (ast_sip_push_task(NULL, serialized_notify, ao2_bump(mwi_sub))) {
+	if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) {
 		ao2_ref(mwi_sub, -1);
 	}
 
@@ -1149,6 +1267,7 @@
 {
 	ast_free(default_voicemail_extension);
 	default_voicemail_extension = ast_sip_get_default_voicemail_extension();
+	mwi_serializer_set_alert_levels();
 }
 
 static struct ast_sorcery_observer global_observer = {
@@ -1175,15 +1294,21 @@
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
+	if (mwi_serializer_pool_setup()) {
+		ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
+	}
+
 	create_mwi_subscriptions();
 	ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
 	ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
 	ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
 
-	if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
-		ast_sip_push_task(NULL, send_initial_notify_all, NULL);
-	} else {
-		stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+	if (!ast_sip_get_mwi_disable_initial_unsolicited()) {
+		if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
+			ast_sip_push_task(NULL, send_initial_notify_all, NULL);
+		} else {
+			stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+		}
 	}
 
 	return AST_MODULE_LOAD_SUCCESS;
@@ -1193,6 +1318,7 @@
 {
 	ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
 	ao2_ref(unsolicited_mwi, -1);
+	mwi_serializer_pool_shutdown();
 	ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
 	ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
 	ast_sip_unregister_subscription_handler(&mwi_handler);

-- 
To view, visit https://gerrit.asterisk.org/3428
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4c8ecb82c249eb887930980a800c9f87f28f861a
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-Owner: Alexei Gradinari <alex2grad at gmail.com>



More information about the asterisk-code-review mailing list