<p>George Joseph has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.asterisk.org/11001">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">taskprocessor: Enable subsystems and overload by subsystem<br><br>To prevent one subsystem's taskprocessors from causing others<br>to stall, new capabilities have been added to taskprocessors.<br><br>* Any taskprocessor name that has a '/' will have the part<br> before the '/' saved as its "subsystem".<br> Examples:<br> "sorcery/acl-0000006a" and "sorcery/aor-00000019"<br> will be grouped to subsystem "sorcery".<br> "pjsip/distributor-00000025" and "pjsip/distributor-00000026"<br> will be grouped to subsystem "pjsip".<br> Taskprocessors with no '/' have an empty subsystem.<br> The "core show taskprocessors" CLI command now shows the subsystem.<br><br>* When a taskprocessor enters high-water alert status and it<br> has a non-empty subsystem, the subsystem name will be marked as<br> "overloaded".<br><br>* When a taskprocessor leaves high-water alert status and it<br> has a non-empty subsystem, the subsystem name will be unmarked as<br> "overloaded".<br><br>* A new API ast_taskprocessor_is_subsystem_overloaded() has been<br> added that checks if a specific subsystem is overloaded.<br><br>* The pjsip distributor now uses<br> ast_taskprocessor_is_subsystem_overloaded("pjsip") to determine<br> if it should pause accepting new messages.<br><br>REMINDER: The taskprocessor code doesn't take any action itself based<br>on high-water alerts or overloading. It's up to taskprocessor users<br>to check and take action themselves. 
Currently only the pjsip<br>distributor does this.<br><br>Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56<br>---<br>M include/asterisk/taskprocessor.h<br>M main/stasis.c<br>M main/taskprocessor.c<br>M res/res_pjsip.c<br>M res/res_pjsip/pjsip_distributor.c<br>5 files changed, 99 insertions(+), 13 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/01/11001/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;"><span>diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h</span><br><span>index f74989a..a97957f 100644</span><br><span>--- a/include/asterisk/taskprocessor.h</span><br><span>+++ b/include/asterisk/taskprocessor.h</span><br><span>@@ -342,6 +342,18 @@</span><br><span> unsigned int ast_taskprocessor_alert_get(void);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Test if any taskprocessors in the subsystem are in high water alert.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 13.26.0</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 16.3.0</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ *\param subsystem The subsystem to test</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 if no taskprocessors for this subsystem are in high water alert.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval non-zero if some task processors for this subsystem are in high water alert.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int ast_taskprocessor_is_subsystem_overloaded(const char *subsystem);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief 
Set the high and low alert water marks of the given taskprocessor queue.</span><br><span> * \since 13.10.0</span><br><span> *</span><br><span>diff --git a/main/stasis.c b/main/stasis.c</span><br><span>index 5835a5a..1d046ce 100644</span><br><span>--- a/main/stasis.c</span><br><span>+++ b/main/stasis.c</span><br><span>@@ -679,7 +679,7 @@</span><br><span> char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];</span><br><span> </span><br><span> /* Create name with seq number appended. */</span><br><span style="color: hsl(0, 100%, 40%);">- ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c/%s",</span><br><span> use_thread_pool ? 'p' : 'm',</span><br><span> stasis_topic_name(topic));</span><br><span> </span><br><span>diff --git a/main/taskprocessor.c b/main/taskprocessor.c</span><br><span>index 84c4d2b..aa42b39 100644</span><br><span>--- a/main/taskprocessor.c</span><br><span>+++ b/main/taskprocessor.c</span><br><span>@@ -91,8 +91,12 @@</span><br><span> unsigned int high_water_alert:1;</span><br><span> /*! Indicates if the taskprocessor is currently suspended */</span><br><span> unsigned int suspended:1;</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! \brief Anyything before the first '/' in the name (if there is one) */</span><br><span style="color: hsl(120, 100%, 40%);">+ char *subsystem;</span><br><span> /*! \brief Friendly name of the taskprocessor */</span><br><span style="color: hsl(0, 100%, 40%);">- char name[0];</span><br><span style="color: hsl(120, 100%, 40%);">+ char *name;</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! \brief Buffer for subsystem and name */</span><br><span style="color: hsl(120, 100%, 40%);">+ char buffer[0];</span><br><span> };</span><br><span> </span><br><span> /*!</span><br><span>@@ -124,6 +128,8 @@</span><br><span> /*! 
\brief tps_singletons is the astobj2 container for taskprocessor singletons */</span><br><span> static struct ao2_container *tps_singletons;</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+static AST_VECTOR_RW(,char *) overloaded_subsystems;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*! \brief CLI <example>taskprocessor ping <blah></example> operation requires a ping condition */</span><br><span> static ast_cond_t cli_ping_cond;</span><br><span> </span><br><span>@@ -273,6 +279,8 @@</span><br><span> static void tps_shutdown(void)</span><br><span> {</span><br><span> ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, free);</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_FREE(&overloaded_subsystems);</span><br><span> ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");</span><br><span> tps_singletons = NULL;</span><br><span> }</span><br><span>@@ -287,6 +295,12 @@</span><br><span> return -1;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_ref(tps_singletons, -1);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> ast_cond_init(&cli_ping_cond, NULL);</span><br><span> </span><br><span> ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));</span><br><span>@@ -480,6 +494,7 @@</span><br><span> static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)</span><br><span> {</span><br><span> 
char name[256];</span><br><span style="color: hsl(120, 100%, 40%);">+ char subsystem[256];</span><br><span> int tcount;</span><br><span> unsigned long qsize;</span><br><span> unsigned long maxqsize;</span><br><span>@@ -487,8 +502,8 @@</span><br><span> struct ao2_container *sorted_tps;</span><br><span> struct ast_taskprocessor *tps;</span><br><span> struct ao2_iterator iter;</span><br><span style="color: hsl(0, 100%, 40%);">-#define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s\n"</span><br><span style="color: hsl(0, 100%, 40%);">-#define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu\n"</span><br><span style="color: hsl(120, 100%, 40%);">+#define FMT_HEADERS "%-16s %-45s %10s %10s %10s %10s %10s\n"</span><br><span style="color: hsl(120, 100%, 40%);">+#define FMT_FIELDS "%-16s %-45s %10lu %10lu %10lu %10lu %10lu\n"</span><br><span> </span><br><span> switch (cmd) {</span><br><span> case CLI_INIT:</span><br><span>@@ -513,15 +528,16 @@</span><br><span> return CLI_FAILURE;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_cli(a->fd, "\n" FMT_HEADERS, "Subsystem", "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");</span><br><span> tcount = 0;</span><br><span> iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);</span><br><span> while ((tps = ao2_iterator_next(&iter))) {</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_copy_string(subsystem, tps->subsystem, sizeof(subsystem));</span><br><span> ast_copy_string(name, tps->name, sizeof(name));</span><br><span> qsize = tps->tps_queue_size;</span><br><span> maxqsize = tps->stats.max_qsize;</span><br><span> processed = tps->stats._tasks_processed_count;</span><br><span style="color: hsl(0, 100%, 40%);">- ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,</span><br><span 
style="color: hsl(120, 100%, 40%);">+ ast_cli(a->fd, FMT_FIELDS, subsystem, name, processed, qsize, maxqsize,</span><br><span> tps->tps_queue_low, tps->tps_queue_high);</span><br><span> ast_taskprocessor_unreference(tps);</span><br><span> ++tcount;</span><br><span>@@ -550,6 +566,45 @@</span><br><span> return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int ast_taskprocessor_is_subsystem_overloaded(const char *subsystem)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ char *s;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+ s = AST_VECTOR_GET_CMP(&overloaded_subsystems, subsystem, !strcmp);</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return !!s;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+static void add_subsystem_alert(const char *subsystem)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ char *s;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (ast_strlen_zero(subsystem)</span><br><span style="color: hsl(120, 100%, 40%);">+ || ast_taskprocessor_is_subsystem_overloaded(subsystem)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ s = ast_strdup(subsystem);</span><br><span 
style="color: hsl(120, 100%, 40%);">+ if (!s) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_APPEND(&overloaded_subsystems, s);</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+static void remove_subsystem_alert(const char *subsystem)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (ast_strlen_zero(subsystem)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_REMOVE_CMP_UNORDERED(&overloaded_subsystems, subsystem, !strcmp, ast_free);</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*! Count of the number of taskprocessors in high water alert. 
*/</span><br><span> static unsigned int tps_alert_count;</span><br><span> </span><br><span>@@ -573,11 +628,17 @@</span><br><span> ast_rwlock_wrlock(&tps_alert_lock);</span><br><span> old = tps_alert_count;</span><br><span> tps_alert_count += delta;</span><br><span style="color: hsl(0, 100%, 40%);">- if (DEBUG_ATLEAST(3)</span><br><span style="color: hsl(0, 100%, 40%);">- /* and tps_alert_count becomes zero or non-zero */</span><br><span style="color: hsl(0, 100%, 40%);">- && !old != !tps_alert_count) {</span><br><span style="color: hsl(120, 100%, 40%);">+ if (/* tps_alert_count becomes zero or non-zero */</span><br><span style="color: hsl(120, 100%, 40%);">+ !old != !tps_alert_count) {</span><br><span> ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",</span><br><span> tps->name, tps_alert_count ? "triggered" : "cleared");</span><br><span style="color: hsl(120, 100%, 40%);">+ if (tps->subsystem[0] != '\0') {</span><br><span style="color: hsl(120, 100%, 40%);">+ if (tps_alert_count) {</span><br><span style="color: hsl(120, 100%, 40%);">+ add_subsystem_alert(tps->subsystem);</span><br><span style="color: hsl(120, 100%, 40%);">+ } else {</span><br><span style="color: hsl(120, 100%, 40%);">+ remove_subsystem_alert(tps->subsystem);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> }</span><br><span> ast_rwlock_unlock(&tps_alert_lock);</span><br><span> }</span><br><span>@@ -749,8 +810,15 @@</span><br><span> static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)</span><br><span> {</span><br><span> struct ast_taskprocessor *p;</span><br><span style="color: hsl(120, 100%, 40%);">+ char *subsystem_separator;</span><br><span style="color: hsl(120, 100%, 40%);">+ size_t subsystem_length = 0;</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- p = ao2_alloc(sizeof(*p) + strlen(name) + 1, 
tps_taskprocessor_dtor);</span><br><span style="color: hsl(120, 100%, 40%);">+ subsystem_separator = strchr(name, '/');</span><br><span style="color: hsl(120, 100%, 40%);">+ if (subsystem_separator) {</span><br><span style="color: hsl(120, 100%, 40%);">+ subsystem_length = subsystem_separator - name;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ p = ao2_alloc(sizeof(*p) + strlen(name) + subsystem_length + 2, tps_taskprocessor_dtor);</span><br><span> if (!p) {</span><br><span> ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);</span><br><span> return NULL;</span><br><span>@@ -760,6 +828,12 @@</span><br><span> p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;</span><br><span> p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ if (subsystem_length) {</span><br><span style="color: hsl(120, 100%, 40%);">+ strncpy(p->buffer, name, subsystem_length);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ p->buffer[subsystem_length] = '\0';</span><br><span style="color: hsl(120, 100%, 40%);">+ p->subsystem = p->buffer;</span><br><span style="color: hsl(120, 100%, 40%);">+ p->name = p->buffer + subsystem_length + 1;</span><br><span> strcpy(p->name, name); /*SAFE*/</span><br><span> </span><br><span> ao2_ref(listener, +1);</span><br><span>diff --git a/res/res_pjsip.c b/res/res_pjsip.c</span><br><span>index 812c291..cf7f47b 100644</span><br><span>--- a/res/res_pjsip.c</span><br><span>+++ b/res/res_pjsip.c</span><br><span>@@ -4267,7 +4267,7 @@</span><br><span> char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];</span><br><span> </span><br><span> /* Create name with seq number appended. 
*/</span><br><span style="color: hsl(0, 100%, 40%);">- ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip-group-serializer");</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/group-serializer");</span><br><span> </span><br><span> return ast_sip_create_serializer_group_named(tps_name, shutdown_group);</span><br><span> }</span><br><span>@@ -4282,7 +4282,7 @@</span><br><span> char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];</span><br><span> </span><br><span> /* Create name with seq number appended. */</span><br><span style="color: hsl(0, 100%, 40%);">- ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip-serializer");</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/serializer");</span><br><span> </span><br><span> return ast_sip_create_serializer_group_named(tps_name, NULL);</span><br><span> }</span><br><span>diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c</span><br><span>index d67b942..49e3d71 100644</span><br><span>--- a/res/res_pjsip/pjsip_distributor.c</span><br><span>+++ b/res/res_pjsip/pjsip_distributor.c</span><br><span>@@ -524,7 +524,7 @@</span><br><span> ao2_cleanup(dist);</span><br><span> return PJ_TRUE;</span><br><span> } else {</span><br><span style="color: hsl(0, 100%, 40%);">- if (ast_taskprocessor_alert_get()) {</span><br><span style="color: hsl(120, 100%, 40%);">+ if (ast_taskprocessor_is_subsystem_overloaded("pjsip")) {</span><br><span> /*</span><br><span> * When taskprocessors get backed up, there is a good chance that</span><br><span> * we are being overloaded and need to defer adding new work to</span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/11001">change 11001</a>. 
</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.asterisk.org/11001"/><meta itemprop="name" content="View Change"/></div></div>
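<p>The subsystem naming rule in the commit message (everything before the first '/' in the taskprocessor name, or an empty string when there is no '/') can be sketched in isolation as follows. This is only an illustration under stated assumptions: <code>split_subsystem()</code> is a hypothetical helper that is not part of Asterisk, and it copies into a caller-supplied buffer, whereas the patch stores the subsystem in the taskprocessor's own trailing buffer.</p>

```c
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not part of Asterisk): copy the part of a
 * taskprocessor name that precedes the first '/' into buf.
 * A name without a '/' yields an empty subsystem.
 *
 * Returns 0 on success, -1 if buf is too small to hold the
 * subsystem plus its terminating NUL.
 */
int split_subsystem(const char *name, char *buf, size_t buf_size)
{
	const char *sep = strchr(name, '/');
	size_t len = sep ? (size_t)(sep - name) : 0;

	if (len + 1 > buf_size) {
		return -1;
	}

	memcpy(buf, name, len);
	buf[len] = '\0';
	return 0;
}
```

<p>With this rule, "sorcery/acl-0000006a" and "sorcery/aor-00000019" both map to the subsystem "sorcery", which is why a backlog in one of them marks the whole "sorcery" group as overloaded.</p>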
<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: 13 </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>
<div style="display:none"> Gerrit-Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56 </div>
<div style="display:none"> Gerrit-Change-Number: 11001 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: George Joseph <gjoseph@digium.com> </div>