[Asterisk-code-review] taskprocessor: Enable subsystems and overload by subsystem (asterisk[13])

Joshua C. Colp asteriskteam at digium.com
Tue Feb 26 07:03:55 CST 2019


Joshua C. Colp has submitted this change and it was merged. ( https://gerrit.asterisk.org/11001 )

Change subject: taskprocessor:  Enable subsystems and overload by subsystem
......................................................................

taskprocessor:  Enable subsystems and overload by subsystem

To prevent one subsystem's taskprocessors from causing others
to stall, new capabilities have been added to taskprocessors.

* Any taskprocessor name that has a '/' will have the part
  before the '/' saved as its "subsystem".
  Examples:
  "sorcery/acl-0000006a" and "sorcery/aor-00000019"
  will be grouped to subsystem "sorcery".
  "pjsip/distributor-00000025" and "pjsip/distributor-00000026"
  will be grouped to subsystem "pjsip".
  Taskprocessors with no '/' have an empty subsystem.

* When a taskprocessor enters high-water alert status and it
  has a non-empty subsystem, the subsystem alert count will
  be incremented.

* When a taskprocessor leaves high-water alert status and it
  has a non-empty subsystem, the subsystem alert count will be
  decremented.

* A new API ast_taskprocessor_get_subsystem_alert() has been
  added that returns the number of taskprocessors in alert for
  the subsystem.

* A new CLI command "core show taskprocessor alerted subsystems"
  has been added.

* A new unit test was added.

REMINDER: The taskprocessor code itself doesn't take any action
based on high-water alerts or overloading.  It's up to taskprocessor
users to check and take action themselves.  Currently only the pjsip
distributor does this.

* A new pjsip/global option "taskprocessor_overload_trigger"
  has been added that allows the user to select the trigger
  mechanism the distributor uses to pause accepting new requests.
  "none": Don't pause on any overload condition.
  "global": Pause on ANY taskprocessor overload (the default and
  current behavior)
  "pjsip_only": Pause only on pjsip taskprocessor overloads.

* The core pjsip pool was renamed from "SIP" to "pjsip" so it can
  be properly grouped into the "pjsip" subsystem.

* Stasis taskprocessor names were changed to use "stasis" as
  the subsystem.

* Sorcery core taskprocessor names were changed to "sorcery" to
  match the object taskprocessors.

Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56
---
M CHANGES
M configs/samples/pjsip.conf.sample
A contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py
M include/asterisk/taskprocessor.h
M main/sorcery.c
M main/stasis.c
M main/taskprocessor.c
M main/threadpool.c
M res/res_pjsip.c
M res/res_pjsip/config_global.c
M res/res_pjsip/include/res_pjsip_private.h
M res/res_pjsip/pjsip_distributor.c
M tests/test_taskprocessor.c
13 files changed, 525 insertions(+), 12 deletions(-)

Approvals:
  Richard Mudgett: Looks good to me, but someone else must approve
  Kevin Harwell: Looks good to me, but someone else must approve
  Joshua C. Colp: Looks good to me, approved; Approved for Submit



diff --git a/CHANGES b/CHANGES
index 6083b1a..5b418ce 100644
--- a/CHANGES
+++ b/CHANGES
@@ -20,6 +20,15 @@
    types defined in the "disallowed" list are not sent to the application. Note
    that if a type is specified in both lists "disallowed" takes precedence.
 
+res_pjsip
+------------------
+ * A new configuration parameter "taskprocessor_overload_trigger" has been
+   added to the pjsip.conf "globals" section.  The distributor currently stops
+   accepting new requests when any taskprocessor overload is triggered.  The
+   new option allows you to completely disable overload detection (NOT
+   RECOMMENDED), keep the current behavior, or trigger only on pjsip
+   taskprocessor overloads.
+
 ------------------------------------------------------------------------------
 --- Functionality changes from Asterisk 13.24.0 to Asterisk 13.25.0 ----------
 ------------------------------------------------------------------------------
diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample
index a04ce05..9483af5 100644
--- a/configs/samples/pjsip.conf.sample
+++ b/configs/samples/pjsip.conf.sample
@@ -1111,6 +1111,17 @@
                     ; event when a device refreshes its registration
                     ; (default: "yes")
 
+;taskprocessor_overload_trigger=global
+                ; Set the trigger the distributor will use to detect
+                ; taskprocessor overloads.  When triggered, the distributor
+                ; will not accept any new requests until the overload has
+                ; cleared.
+                ; "global": (default) Any taskprocessor overload will trigger.
+                ; "pjsip_only": Only pjsip taskprocessor overloads will trigger.
+                ; "none":  No overload detection will be performed.
+                ; WARNING: The "none" and "pjsip_only" options should be used
+                ; with extreme caution and only to mitigate specific issues.
+                ; Under certain conditions they could make things worse.
 
 ; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl
 ;==========================ACL SECTION OPTIONS=========================
diff --git a/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py b/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py
new file mode 100644
index 0000000..6a5b9b2
--- /dev/null
+++ b/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py
@@ -0,0 +1,42 @@
+"""taskprocessor_overload_trigger
+
+Revision ID: f3c0b8695b66
+Revises: 0838f8db6a61
+Create Date: 2019-02-15 15:03:50.106790
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'f3c0b8695b66'
+down_revision = '0838f8db6a61'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import ENUM
+
+PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME = 'pjsip_taskprocessor_overload_trigger_values'
+PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES = ['none', 'global', 'pjsip_only']
+
+def upgrade():
+    context = op.get_context()
+
+    if context.bind.dialect.name == 'postgresql':
+        enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
+                    name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME)
+        enum.create(op.get_bind(), checkfirst=False)
+
+    op.add_column('ps_globals',
+        sa.Column('taskprocessor_overload_trigger',
+            sa.Enum(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
+            name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME,
+            create_type=False)))
+
+def downgrade():
+    if op.get_context().bind.dialect.name == 'mssql':
+        op.drop_constraint('ck_ps_globals_taskprocessor_overload_trigger_pjsip_taskprocessor_overload_trigger_values', 'ps_globals')
+    op.drop_column('ps_globals', 'taskprocessor_overload_trigger')
+
+    if op.get_context().bind.dialect.name == 'postgresql':
+        enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
+                    name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME)
+        enum.drop(op.get_bind(), checkfirst=False)
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index f74989a..5278595 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -341,6 +341,19 @@
  */
 unsigned int ast_taskprocessor_alert_get(void);
 
+
+/*!
+ * \brief Get the current taskprocessor high water alert count by subsystem.
+ * \since 13.26.0
+ * \since 16.3.0
+ *
+ * \param subsystem The subsystem name
+ *
+ * \retval 0 if no taskprocessors are in high water alert.
+ * \retval non-zero if some task processors are in high water alert.
+ */
+unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem);
+
 /*!
  * \brief Set the high and low alert water marks of the given taskprocessor queue.
  * \since 13.10.0
diff --git a/main/sorcery.c b/main/sorcery.c
index 7b01374..3a43fa7 100644
--- a/main/sorcery.c
+++ b/main/sorcery.c
@@ -375,7 +375,7 @@
 	};
 	ast_assert(wizards == NULL);
 
-	threadpool = ast_threadpool_create("Sorcery", NULL, &options);
+	threadpool = ast_threadpool_create("sorcery", NULL, &options);
 	if (!threadpool) {
 		return -1;
 	}
diff --git a/main/stasis.c b/main/stasis.c
index 5835a5a..fbcf37e 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -679,7 +679,7 @@
 		char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
 
 		/* Create name with seq number appended. */
-		ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
+		ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
 			use_thread_pool ? 'p' : 'm',
 			stasis_topic_name(topic));
 
@@ -2595,7 +2595,7 @@
 	threadpool_opts.auto_increment = 1;
 	threadpool_opts.max_size = cfg->threadpool_options->max_size;
 	threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
-	pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+	pool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
 	ao2_ref(cfg, -1);
 	if (!pool) {
 		ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 84c4d2b..9c04edd 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -91,7 +91,11 @@
 	unsigned int high_water_alert:1;
 	/*! Indicates if the taskprocessor is currently suspended */
 	unsigned int suspended:1;
-	/*! \brief Friendly name of the taskprocessor */
+	/*! \brief Anything before the first '/' in the name (if there is one) */
+	char *subsystem;
+	/*! \brief Friendly name of the taskprocessor.
+	 * Subsystem is appended after the name's NULL terminator.
+	 */
 	char name[0];
 };
 
@@ -114,6 +118,16 @@
 	void *user_data;
 };
 
+/*!
+ * Keep track of which subsystems are in alert
+ * and how many of their taskprocessors are overloaded.
+ */
+struct subsystem_alert {
+	unsigned int alert_count;
+	char subsystem[0];
+};
+static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *) overloaded_subsystems;
+
 #ifdef LOW_MEMORY
 #define TPS_MAX_BUCKETS 61
 #else
@@ -140,10 +154,12 @@
 
 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 
 static struct ast_cli_entry taskprocessor_clis[] = {
 	AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
 	AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
+	AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
 };
 
 struct default_taskprocessor_listener_pvt {
@@ -273,6 +289,8 @@
 static void tps_shutdown(void)
 {
 	ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
+	AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
+	AST_VECTOR_RW_FREE(&overloaded_subsystems);
 	ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
 	tps_singletons = NULL;
 }
@@ -287,6 +305,12 @@
 		return -1;
 	}
 
+	if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
+		ao2_ref(tps_singletons, -1);
+		ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
+		return -1;
+	}
+
 	ast_cond_init(&cli_ping_cond, NULL);
 
 	ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
@@ -550,6 +574,157 @@
 	return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
 }
 
+static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
+{
+	return !strcmp(alert->subsystem, subsystem);
+}
+
+static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
+{
+	return strcmp(a->subsystem, b->subsystem);
+}
+
+unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
+{
+	struct subsystem_alert *alert;
+	unsigned int count = 0;
+	int idx;
+
+	AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
+	idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
+	if (idx >= 0) {
+		alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
+		count = alert->alert_count;
+	}
+	AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+
+	return count;
+}
+
+static void subsystem_alert_increment(const char *subsystem)
+{
+	struct subsystem_alert *alert;
+	int idx;
+
+	if (ast_strlen_zero(subsystem)) {
+		return;
+	}
+
+	AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
+	idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
+	if (idx >= 0) {
+		alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
+		alert->alert_count++;
+		AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+		return;
+	}
+
+	alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
+	if (!alert) {
+		AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+		return;
+	}
+	alert->alert_count = 1;
+	strcpy(alert->subsystem, subsystem); /* Safe */
+
+	if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
+		ast_free(alert);
+	}
+	AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+}
+
+static void subsystem_alert_decrement(const char *subsystem)
+{
+	struct subsystem_alert *alert;
+	int idx;
+
+	if (ast_strlen_zero(subsystem)) {
+		return;
+	}
+
+	AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
+	idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
+	if (idx < 0) {
+		ast_log(LOG_ERROR,
+			"Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
+		AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+		return;
+	}
+	alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
+
+	alert->alert_count--;
+	if (alert->alert_count <= 0) {
+		AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
+		ast_free(alert);
+	}
+
+	AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+}
+
+static void subsystem_copy(struct subsystem_alert *alert,
+	struct subsystem_alert_vector *vector)
+{
+	struct subsystem_alert *alert_copy;
+	alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
+	if (!alert_copy) {
+		return;
+	}
+	alert_copy->alert_count = alert->alert_count;
+	strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
+	if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
+		ast_free(alert_copy);
+	}
+}
+
+static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	struct subsystem_alert_vector sorted_subsystems;
+	int i;
+
+#define FMT_HEADERS_SUBSYSTEM		"%-32s %12s\n"
+#define FMT_FIELDS_SUBSYSTEM		"%-32s %12u\n"
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "core show taskprocessor alerted subsystems";
+		e->usage =
+			"Usage: core show taskprocessor alerted subsystems\n"
+			"	Shows a list of task processor subsystems that are currently alerted\n";
+		return NULL;
+	case CLI_GENERATE:
+		return NULL;
+	}
+
+	if (a->argc != e->args) {
+		return CLI_SHOWUSAGE;
+	}
+
+	if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
+		return CLI_FAILURE;
+	}
+
+	AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
+	for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
+		subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
+	}
+	AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+
+	ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
+
+	for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
+		struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
+		ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
+	}
+
+	ast_cli(a->fd, "\n%lu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
+
+	AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
+	AST_VECTOR_FREE(&sorted_subsystems);
+
+	return CLI_SUCCESS;
+}
+
+
 /*! Count of the number of taskprocessors in high water alert. */
 static unsigned int tps_alert_count;
 
@@ -579,6 +754,15 @@
 		ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
 			tps->name, tps_alert_count ? "triggered" : "cleared");
 	}
+
+	if (tps->subsystem[0] != '\0') {
+		if (delta > 0) {
+			subsystem_alert_increment(tps->subsystem);
+		} else {
+			subsystem_alert_decrement(tps->subsystem);
+		}
+	}
+
 	ast_rwlock_unlock(&tps_alert_lock);
 }
 
@@ -749,8 +933,17 @@
 static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
 {
 	struct ast_taskprocessor *p;
+	char *subsystem_separator;
+	size_t subsystem_length = 0;
+	size_t name_length;
 
-	p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor);
+	name_length = strlen(name);
+	subsystem_separator = strchr(name, '/');
+	if (subsystem_separator) {
+		subsystem_length = subsystem_separator - name;
+	}
+
+	p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
 	if (!p) {
 		ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
 		return NULL;
@@ -760,7 +953,9 @@
 	p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
 	p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
 
-	strcpy(p->name, name); /*SAFE*/
+	strcpy(p->name, name); /* Safe */
+	p->subsystem = p->name + name_length + 1;
+	ast_copy_string(p->subsystem, name, subsystem_length + 1);
 
 	ao2_ref(listener, +1);
 	p->listener = listener;
diff --git a/main/threadpool.c b/main/threadpool.c
index 2ab0936..56fbb2c 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -413,7 +413,7 @@
 		return NULL;
 	}
 
-	ast_str_set(&control_tps_name, 0, "%s-control", name);
+	ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
 
 	pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
 	ast_free(control_tps_name);
@@ -919,6 +919,7 @@
 	struct ast_taskprocessor *tps;
 	RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
+	char *fullname;
 
 	pool = threadpool_alloc(name, options);
 	if (!pool) {
@@ -935,7 +936,9 @@
 		return NULL;
 	}
 
-	tps = ast_taskprocessor_create_with_listener(name, tps_listener);
+	fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
+	sprintf(fullname, "%s/pool", name); /* Safe */
+	tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
 	if (!tps) {
 		return NULL;
 	}
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 812c291..32277b1 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -1844,6 +1844,26 @@
 				<configOption name="send_contact_status_on_update_registration" default="yes">
 					<synopsis>Enable sending AMI ContactStatus event when a device refreshes its registration.</synopsis>
 				</configOption>
+				<configOption name="taskprocessor_overload_trigger">
+					<synopsis>Trigger scope for taskprocessor overloads</synopsis>
+					<description><para>
+						This option specifies the trigger the distributor will use for
+						detecting taskprocessor overloads.  When it detects an overload condition,
+						the distributor will stop accepting new requests until the overload is
+						cleared.
+						</para>
+						<enumlist>
+							<enum name="global"><para>(default) Any taskprocessor overload will trigger.</para></enum>
+							<enum name="pjsip_only"><para>Only pjsip taskprocessor overloads will trigger.</para></enum>
+							<enum name="none"><para>No overload detection will be performed.</para></enum>
+						</enumlist>
+						<warning><para>
+						The "none" and "pjsip_only" options should be used
+						with extreme caution and only to mitigate specific issues.
+						Under certain conditions they could make things worse.
+						</para></warning>
+					</description>
+				</configOption>
 			</configObject>
 		</configFile>
 	</configInfo>
@@ -4267,7 +4287,7 @@
 	char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
 
 	/* Create name with seq number appended. */
-	ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip-group-serializer");
+	ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/group-serializer");
 
 	return ast_sip_create_serializer_group_named(tps_name, shutdown_group);
 }
@@ -4282,7 +4302,7 @@
 	char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
 
 	/* Create name with seq number appended. */
-	ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip-serializer");
+	ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/serializer");
 
 	return ast_sip_create_serializer_group_named(tps_name, NULL);
 }
@@ -4993,7 +5013,7 @@
 	/* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */
 	sip_get_threadpool_options(&options);
 	options.thread_start = sip_thread_start;
-	sip_threadpool = ast_threadpool_create("SIP", NULL, &options);
+	sip_threadpool = ast_threadpool_create("pjsip", NULL, &options);
 	if (!sip_threadpool) {
 		goto error;
 	}
diff --git a/res/res_pjsip/config_global.c b/res/res_pjsip/config_global.c
index afeeb1d..365d9aa 100644
--- a/res/res_pjsip/config_global.c
+++ b/res/res_pjsip/config_global.c
@@ -51,6 +51,7 @@
 #define DEFAULT_IGNORE_URI_USER_OPTIONS 0
 #define DEFAULT_USE_CALLERID_CONTACT 0
 #define DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION 1
+#define DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL
 
 /*!
  * \brief Cached global config object
@@ -110,6 +111,8 @@
 	unsigned int use_callerid_contact;
 	/*! Nonzero if need to send AMI ContactStatus event when a contact is updated */
 	unsigned int send_contact_status_on_update_registration;
+	/*! Trigger the distributor should use to pause accepting new dialogs */
+	enum ast_sip_taskprocessor_overload_trigger overload_trigger;
 };
 
 static void global_destructor(void *obj)
@@ -483,6 +486,58 @@
 	return send_contact_status_on_update_registration;
 }
 
+enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void)
+{
+	enum ast_sip_taskprocessor_overload_trigger trigger;
+	struct global_config *cfg;
+
+	cfg = get_global_cfg();
+	if (!cfg) {
+		return DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER;
+	}
+
+	trigger = cfg->overload_trigger;
+	ao2_ref(cfg, -1);
+	return trigger;
+}
+
+static int overload_trigger_handler(const struct aco_option *opt,
+	struct ast_variable *var, void *obj)
+{
+	struct global_config *cfg = obj;
+	if (!strcasecmp(var->value, "none")) {
+		cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_NONE;
+	} else if (!strcasecmp(var->value, "global")) {
+		cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL;
+	} else if (!strcasecmp(var->value, "pjsip_only")) {
+		cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY;
+	} else {
+		ast_log(LOG_WARNING, "Unknown overload trigger '%s' specified for %s\n",
+				var->value, var->name);
+		return -1;
+	}
+	return 0;
+}
+
+static const char *overload_trigger_map[] = {
+	[TASKPROCESSOR_OVERLOAD_TRIGGER_NONE] = "none",
+	[TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL] = "global",
+	[TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY] = "pjsip_only"
+};
+
+const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger)
+{
+	return ARRAY_IN_BOUNDS(trigger, overload_trigger_map) ?
+		overload_trigger_map[trigger] : "";
+}
+
+static int overload_trigger_to_str(const void *obj, const intptr_t *args, char **buf)
+{
+	const struct global_config *cfg = obj;
+	*buf = ast_strdup(ast_sip_overload_trigger_to_str(cfg->overload_trigger));
+	return 0;
+}
+
 /*!
  * \internal
  * \brief Observer to set default global object if none exist.
@@ -646,6 +701,9 @@
 	ast_sorcery_object_field_register(sorcery, "global", "send_contact_status_on_update_registration",
 		DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION ? "yes" : "no",
 		OPT_YESNO_T, 1, FLDSET(struct global_config, send_contact_status_on_update_registration));
+	ast_sorcery_object_field_register_custom(sorcery, "global", "taskprocessor_overload_trigger",
+		overload_trigger_map[DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER],
+		overload_trigger_handler, overload_trigger_to_str, NULL, 0, 0);
 
 	if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
 		return -1;
diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h
index 20d6ba4..273a4fa 100644
--- a/res/res_pjsip/include/res_pjsip_private.h
+++ b/res/res_pjsip/include/res_pjsip_private.h
@@ -480,4 +480,14 @@
  */
 int ast_sip_persistent_endpoint_add_to_regcontext(const char *regcontext);
 
+enum ast_sip_taskprocessor_overload_trigger {
+	TASKPROCESSOR_OVERLOAD_TRIGGER_NONE = 0,
+	TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL,
+	TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY
+};
+
+enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void);
+
+const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger);
+
 #endif /* RES_PJSIP_PRIVATE_H_ */
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index d67b942..94f8e17 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -50,6 +50,7 @@
 static unsigned int unidentified_period;
 static unsigned int unidentified_prune_interval;
 static int using_auth_username;
+static enum ast_sip_taskprocessor_overload_trigger overload_trigger;
 
 struct unidentified_request{
 	struct timeval first_seen;
@@ -524,7 +525,10 @@
 		ao2_cleanup(dist);
 		return PJ_TRUE;
 	} else {
-		if (ast_taskprocessor_alert_get()) {
+		if ((overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL &&
+			ast_taskprocessor_alert_get())
+			|| (overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY &&
+			ast_taskprocessor_get_subsystem_alert("pjsip"))) {
 			/*
 			 * When taskprocessors get backed up, there is a good chance that
 			 * we are being overloaded and need to defer adding new work to
@@ -1186,6 +1190,8 @@
 
 	ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
 
+	overload_trigger = ast_sip_get_taskprocessor_overload_trigger();
+
 	/* Clean out the old task, if any */
 	ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
 	/* Have to do something with the return value to shut up the stupid compiler. */
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index 6428746..70cb556 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -46,6 +46,8 @@
 	ast_mutex_t lock;
 	/*! Boolean indicating that the task was run */
 	int task_complete;
+	/*! Milliseconds to wait before returning */
+	unsigned long wait_time;
 };
 
 static void task_data_dtor(void *obj)
@@ -69,6 +71,7 @@
 	ast_cond_init(&task_data->cond, NULL);
 	ast_mutex_init(&task_data->lock);
 	task_data->task_complete = 0;
+	task_data->wait_time = 0;
 
 	return task_data;
 }
@@ -83,7 +86,11 @@
 static int task(void *data)
 {
 	struct task_data *task_data = data;
+
 	SCOPED_MUTEX(lock, &task_data->lock);
+	if (task_data->wait_time > 0) {
+		usleep(task_data->wait_time * 1000);
+	}
 	task_data->task_complete = 1;
 	ast_cond_signal(&task_data->cond);
 	return 0;
@@ -165,6 +172,143 @@
 	return AST_TEST_PASS;
 }
 
+/*!
+ * \brief Baseline test for subsystem alert
+ */
+AST_TEST_DEFINE(subsystem_alert)
+{
+	RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+#define TEST_DATA_ARRAY_SIZE 10
+#define LOW_WATER_MARK 3
+#define HIGH_WATER_MARK 6
+	struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
+	int res = 0;
+	int i;
+	long queue_count;
+	unsigned int alert_level;
+	unsigned int subsystem_alert_level;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = "subsystem_alert";
+		info->category = "/main/taskprocessor/";
+		info->summary = "Test of subsystem alerts";
+		info->description =
+			"Ensures alerts are generated properly.";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
+
+	if (!tps) {
+		ast_test_status_update(test, "Unable to create test taskprocessor\n");
+		return AST_TEST_FAIL;
+	}
+
+	ast_taskprocessor_alert_set_levels(tps, LOW_WATER_MARK, HIGH_WATER_MARK);
+	ast_taskprocessor_suspend(tps);
+
+	for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
+		task_data[i] = task_data_create();
+		if (!task_data[i]) {
+			ast_test_status_update(test, "Unable to create task_data\n");
+			res = -1;
+			goto data_cleanup;
+		}
+		task_data[i]->wait_time = 500;
+
+		ast_test_status_update(test, "Pushing task %d\n", i);
+		if (ast_taskprocessor_push(tps, task, task_data[i])) {
+			ast_test_status_update(test, "Failed to queue task\n");
+			res = -1;
+			goto data_cleanup;
+		}
+
+		queue_count = ast_taskprocessor_size(tps);
+		alert_level = ast_taskprocessor_alert_get();
+		subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
+
+		if (queue_count == HIGH_WATER_MARK) {
+			if (subsystem_alert_level) {
+				ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
+			}
+			if (alert_level) {
+				ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
+			}
+		} else if (queue_count < HIGH_WATER_MARK) {
+			if (subsystem_alert_level > 0) {
+				ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
+				res = -1;
+			}
+			if (alert_level > 0) {
+				ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
+				res = -1;
+			}
+		} else {
+			if (subsystem_alert_level == 0) {
+				ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
+				res = -1;
+			}
+			if (alert_level == 0) {
+				ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
+				res = -1;
+			}
+		}
+	}
+
+	ast_taskprocessor_unsuspend(tps);
+
+	for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
+		ast_test_status_update(test, "Waiting on task %d\n", i);
+		if (task_wait(task_data[i])) {
+			ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
+			res = -1;
+			goto data_cleanup;
+		}
+
+		queue_count = ast_taskprocessor_size(tps);
+		alert_level = ast_taskprocessor_alert_get();
+		subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
+
+		if (queue_count == LOW_WATER_MARK) {
+			if (!subsystem_alert_level) {
+				ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
+			}
+			if (!alert_level) {
+				ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
+			}
+		} else if (queue_count > LOW_WATER_MARK) {
+			if (subsystem_alert_level == 0) {
+				ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
+				res = -1;
+			}
+			if (alert_level == 0) {
+				ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
+				res = -1;
+			}
+		} else {
+			if (subsystem_alert_level > 0) {
+				ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
+				res = -1;
+			}
+			if (alert_level > 0) {
+				ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
+				res = -1;
+			}
+		}
+
+	}
+
+data_cleanup:
+	for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
+		ao2_cleanup(task_data[i]);
+	}
+
+	return res ? AST_TEST_FAIL : AST_TEST_PASS;
+}
+
 #define NUM_TASKS 20000
 
 /*!
@@ -749,6 +893,7 @@
 {
 	ast_test_unregister(default_taskprocessor);
 	ast_test_unregister(default_taskprocessor_load);
+	ast_test_unregister(subsystem_alert);
 	ast_test_unregister(taskprocessor_listener);
 	ast_test_unregister(taskprocessor_shutdown);
 	ast_test_unregister(taskprocessor_push_local);
@@ -759,6 +904,7 @@
 {
 	ast_test_register(default_taskprocessor);
 	ast_test_register(default_taskprocessor_load);
+	ast_test_register(subsystem_alert);
 	ast_test_register(taskprocessor_listener);
 	ast_test_register(taskprocessor_shutdown);
 	ast_test_register(taskprocessor_push_local);

-- 
To view, visit https://gerrit.asterisk.org/11001
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-MessageType: merged
Gerrit-Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56
Gerrit-Change-Number: 11001
Gerrit-PatchSet: 9
Gerrit-Owner: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Friendly Automation (1000185)
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua C. Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
Gerrit-Reviewer: Richard Mudgett <rmudgett at digium.com>