[Asterisk-code-review] taskprocessor: Enable subsystems and overload by subsystem (asterisk[16])

George Joseph asteriskteam at digium.com
Fri Feb 15 13:10:55 CST 2019


George Joseph has uploaded this change for review. ( https://gerrit.asterisk.org/11002


Change subject: taskprocessor:  Enable subsystems and overload by subsystem
......................................................................

taskprocessor:  Enable subsystems and overload by subsystem

To prevent one subsystem's taskprocessors from causing others
to stall, new capabilities have been added to taskprocessors.

* Any taskprocessor name that contains a '/' will have the part
  before the first '/' saved as its "subsystem".
  Examples:
  "sorcery/acl-0000006a" and "sorcery/aor-00000019"
  will be grouped to subsystem "sorcery".
  "pjsip/distributor-00000025" and "pjsip/distributor-00000026"
  will be grouped to subsystem "pjsip".
  Taskprocessors with no '/' have an empty subsystem.
  The "core show taskprocessors" CLI command now shows the subsystem.

* When a taskprocessor enters high-water alert status and it
  has a non-empty subsystem, the subsystem name will be marked as
  "overloaded".

* When a taskprocessor leaves high-water alert status and it
  has a non-empty subsystem, the subsystem name will be unmarked as
  "overloaded" once no taskprocessor in that subsystem remains in
  high-water alert.

* A new API ast_taskprocessor_is_subsystem_overloaded() has been
  added that checks if a specific subsystem is overloaded.

* The pjsip distributor now uses
  ast_taskprocessor_is_subsystem_overloaded("pjsip") to determine
  if it should pause accepting new messages.

REMINDER: The taskprocessor code doesn't take any action itself based
on high-water alerts or overloading.  It's up to taskprocessor users
to check and take action themselves.  Currently only the pjsip
distributor does this.

Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56
---
M include/asterisk/taskprocessor.h
M main/stasis.c
M main/taskprocessor.c
M res/res_pjsip/pjsip_distributor.c
4 files changed, 97 insertions(+), 11 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/02/11002/1

diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index f74989a..a97957f 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -342,6 +342,18 @@
 unsigned int ast_taskprocessor_alert_get(void);
 
 /*!
+ * \brief Test if any taskprocessors in the subsystem are in high water alert.
+ * \since 13.26.0
+ * \since 16.3.0
+ *
+ *\param subsystem The subsystem to test
+ *
+ * \retval 0 if no taskprocessors for this subsystem are in high water alert.
+ * \retval non-zero if some task processors for this subsystem are in high water alert.
+ */
+int ast_taskprocessor_is_subsystem_overloaded(const char *subsystem);
+
+/*!
  * \brief Set the high and low alert water marks of the given taskprocessor queue.
  * \since 13.10.0
  *
diff --git a/main/stasis.c b/main/stasis.c
index fea9ac5..4ca89b1 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -677,7 +677,7 @@
 		char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
 
 		/* Create name with seq number appended. */
-		ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
+		ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c/%s",
 			use_thread_pool ? 'p' : 'm',
 			stasis_topic_name(topic));
 
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 30aeddb..a0bfef4 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -89,8 +89,12 @@
 	unsigned int high_water_alert:1;
 	/*! Indicates if the taskprocessor is currently suspended */
 	unsigned int suspended:1;
+	/*! \brief Anything before the first '/' in the name (if there is one) */
+	char *subsystem;
 	/*! \brief Friendly name of the taskprocessor */
-	char name[0];
+	char *name;
+	/*! \brief Buffer for subsystem and name */
+	char buffer[0];
 };
 
 /*!
@@ -122,6 +126,8 @@
 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
 static struct ao2_container *tps_singletons;
 
+static AST_VECTOR_RW(,char *) overloaded_subsystems;
+
 /*! \brief CLI <example>taskprocessor ping <blah></example> operation requires a ping condition */
 static ast_cond_t cli_ping_cond;
 
@@ -271,6 +277,9 @@
 static void tps_shutdown(void)
 {
 	ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
+	/* Entries are allocated with ast_strdup(), so they must be released with ast_free(). */
+	AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
+	AST_VECTOR_RW_FREE(&overloaded_subsystems);
 	ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
 	tps_singletons = NULL;
 }
@@ -285,6 +293,12 @@
 		return -1;
 	}
 
+	if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
+		ao2_ref(tps_singletons, -1);
+		ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
+		return -1;
+	}
+
 	ast_cond_init(&cli_ping_cond, NULL);
 
 	ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
@@ -478,6 +492,7 @@
 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
 	char name[256];
+	char subsystem[256];
 	int tcount;
 	unsigned long qsize;
 	unsigned long maxqsize;
@@ -485,8 +500,8 @@
 	struct ao2_container *sorted_tps;
 	struct ast_taskprocessor *tps;
 	struct ao2_iterator iter;
-#define FMT_HEADERS		"%-45s %10s %10s %10s %10s %10s\n"
-#define FMT_FIELDS		"%-45s %10lu %10lu %10lu %10lu %10lu\n"
+#define FMT_HEADERS		"%-16s %-45s %10s %10s %10s %10s %10s\n"
+#define FMT_FIELDS		"%-16s %-45s %10lu %10lu %10lu %10lu %10lu\n"
 
 	switch (cmd) {
 	case CLI_INIT:
@@ -511,15 +526,16 @@
 		return CLI_FAILURE;
 	}
 
-	ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
+	ast_cli(a->fd, "\n" FMT_HEADERS, "Subsystem", "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
 	tcount = 0;
 	iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
 	while ((tps = ao2_iterator_next(&iter))) {
+		ast_copy_string(subsystem, tps->subsystem, sizeof(subsystem));
 		ast_copy_string(name, tps->name, sizeof(name));
 		qsize = tps->tps_queue_size;
 		maxqsize = tps->stats.max_qsize;
 		processed = tps->stats._tasks_processed_count;
-		ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
+		ast_cli(a->fd, FMT_FIELDS, subsystem, name, processed, qsize, maxqsize,
 			tps->tps_queue_low, tps->tps_queue_high);
 		ast_taskprocessor_unreference(tps);
 		++tcount;
@@ -548,6 +564,70 @@
 	return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
 }
 
+int ast_taskprocessor_is_subsystem_overloaded(const char *subsystem)
+{
+	char *s;
+
+	if (ast_strlen_zero(subsystem)) {
+		return 0;
+	}
+
+	AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
+	s = AST_VECTOR_GET_CMP(&overloaded_subsystems, subsystem, !strcmp);
+	AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+
+	return !!s;
+}
+
+/*!
+ * \internal
+ * \brief Record that a taskprocessor in \a subsystem entered high water alert.
+ *
+ * overloaded_subsystems holds one entry per taskprocessor currently in
+ * high water alert, so duplicate names are intentional.  A subsystem stays
+ * overloaded until every one of its taskprocessors has left the alert.
+ *
+ * \param subsystem Subsystem name; empty names are ignored.
+ */
+static void add_subsystem_alert(const char *subsystem)
+{
+	char *s;
+
+	if (ast_strlen_zero(subsystem)) {
+		return;
+	}
+
+	s = ast_strdup(subsystem);
+	if (!s) {
+		return;
+	}
+	AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
+	if (AST_VECTOR_APPEND(&overloaded_subsystems, s)) {
+		/* The vector could not grow; don't leak the copy. */
+		ast_free(s);
+	}
+	AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+}
+
+/*!
+ * \internal
+ * \brief Record that a taskprocessor in \a subsystem left high water alert.
+ *
+ * Only one matching entry is removed, so the subsystem remains overloaded
+ * while other taskprocessors in it are still in high water alert.
+ *
+ * \param subsystem Subsystem name; empty names are ignored.
+ */
+static void remove_subsystem_alert(const char *subsystem)
+{
+	if (ast_strlen_zero(subsystem)) {
+		return;
+	}
+	AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
+	AST_VECTOR_REMOVE_CMP_UNORDERED(&overloaded_subsystems, subsystem, !strcmp, ast_free);
+	AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+}
+
 /*! Count of the number of taskprocessors in high water alert. */
 static unsigned int tps_alert_count;
 
@@ -571,11 +651,26 @@
 	ast_rwlock_wrlock(&tps_alert_lock);
 	old = tps_alert_count;
 	tps_alert_count += delta;
 	if (DEBUG_ATLEAST(3)
 		/* and tps_alert_count becomes zero or non-zero */
 		&& !old != !tps_alert_count) {
 		ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
 			tps->name, tps_alert_count ? "triggered" : "cleared");
 	}
+
+	/*
+	 * Update the subsystem on every alert change of this taskprocessor
+	 * (delta is assumed to be +1 on entering and -1 on leaving high water
+	 * alert), not only when the global count crosses zero.  Otherwise a
+	 * subsystem can stay marked, or never get marked, while another
+	 * subsystem is also in alert.
+	 */
+	if (tps->subsystem[0] != '\0') {
+		if (delta > 0) {
+			add_subsystem_alert(tps->subsystem);
+		} else if (delta < 0) {
+			remove_subsystem_alert(tps->subsystem);
+		}
+	}
 	ast_rwlock_unlock(&tps_alert_lock);
 }
@@ -747,8 +808,15 @@
 static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
 {
 	struct ast_taskprocessor *p;
+	char *subsystem_separator;
+	size_t subsystem_length = 0;
 
-	p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor);
+	subsystem_separator = strchr(name, '/');
+	if (subsystem_separator) {
+		subsystem_length = subsystem_separator - name;
+	}
+
+	p = ao2_alloc(sizeof(*p) + strlen(name) + subsystem_length + 2, tps_taskprocessor_dtor);
 	if (!p) {
 		ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
 		return NULL;
@@ -758,6 +826,12 @@
 	p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
 	p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
 
+	if (subsystem_length) {
+		strncpy(p->buffer, name, subsystem_length);
+	}
+	p->buffer[subsystem_length] = '\0';
+	p->subsystem = p->buffer;
+	p->name = p->buffer + subsystem_length + 1;
 	strcpy(p->name, name); /*SAFE*/
 
 	ao2_ref(listener, +1);
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index d356e37..cbd49c3 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -534,7 +534,7 @@
 		ao2_cleanup(dist);
 		return PJ_TRUE;
 	} else {
-		if (ast_taskprocessor_alert_get()) {
+		if (ast_taskprocessor_is_subsystem_overloaded("pjsip")) {
 			/*
 			 * When taskprocessors get backed up, there is a good chance that
 			 * we are being overloaded and need to defer adding new work to

-- 
To view, visit https://gerrit.asterisk.org/11002
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: 16
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56
Gerrit-Change-Number: 11002
Gerrit-PatchSet: 1
Gerrit-Owner: George Joseph <gjoseph at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20190215/83cdba44/attachment-0001.html>


More information about the asterisk-code-review mailing list