<p>George Joseph has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.asterisk.org/11002">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">taskprocessor: Enable subsystems and overload by subsystem<br><br>To prevent one subsystem's taskprocessors from causing others<br>to stall, new capabilities have been added to taskprocessors.<br><br>* Any taskprocessor name that has a '/' will have the part<br>  before the first '/' saved as its "subsystem".<br>  Examples:<br>  "sorcery/acl-0000006a" and "sorcery/aor-00000019"<br>  will be grouped into subsystem "sorcery".<br>  "pjsip/distributor-00000025" and "pjsip/distributor-00000026"<br>  will be grouped into subsystem "pjsip".<br>  Taskprocessors with no '/' have an empty subsystem.<br>  The "core show taskprocessors" CLI command now shows the subsystem.<br><br>* When a taskprocessor enters high-water alert status and it<br>  has a non-empty subsystem, the subsystem name will be marked as<br>  "overloaded".<br><br>* When a taskprocessor leaves high-water alert status and it<br>  has a non-empty subsystem, the subsystem name will be unmarked as<br>  "overloaded".<br><br>* A new API, ast_taskprocessor_is_subsystem_overloaded(), has been<br>  added that checks if a specific subsystem is overloaded.<br><br>* The pjsip distributor now uses<br>  ast_taskprocessor_is_subsystem_overloaded("pjsip") to determine<br>  if it should pause accepting new messages.<br><br>REMINDER: The taskprocessor code doesn't take any action itself based<br>on high-water alerts or overloading.  It's up to taskprocessor users<br>to check and take action themselves.  
Currently only the pjsip<br>distributor does this.<br><br>Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56<br>---<br>M include/asterisk/taskprocessor.h<br>M main/stasis.c<br>M main/taskprocessor.c<br>M res/res_pjsip/pjsip_distributor.c<br>4 files changed, 97 insertions(+), 11 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/02/11002/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;"><span>diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h</span><br><span>index f74989a..a97957f 100644</span><br><span>--- a/include/asterisk/taskprocessor.h</span><br><span>+++ b/include/asterisk/taskprocessor.h</span><br><span>@@ -342,6 +342,18 @@</span><br><span> unsigned int ast_taskprocessor_alert_get(void);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Test if any taskprocessors in the subsystem are in high water alert.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 13.26.0</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 16.3.0</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ *\param subsystem The subsystem to test</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 if no taskprocessors for this subsystem are in high water alert.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval non-zero if some task processors for this subsystem are in high water alert.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int ast_taskprocessor_is_subsystem_overloaded(const char *subsystem);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span>  * \brief Set the high and low 
alert water marks of the given taskprocessor queue.</span><br><span>  * \since 13.10.0</span><br><span>  *</span><br><span>diff --git a/main/stasis.c b/main/stasis.c</span><br><span>index fea9ac5..4ca89b1 100644</span><br><span>--- a/main/stasis.c</span><br><span>+++ b/main/stasis.c</span><br><span>@@ -677,7 +677,7 @@</span><br><span>            char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];</span><br><span> </span><br><span>           /* Create name with seq number appended. */</span><br><span style="color: hsl(0, 100%, 40%);">-             ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",</span><br><span style="color: hsl(120, 100%, 40%);">+                ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c/%s",</span><br><span>                       use_thread_pool ? 'p' : 'm',</span><br><span>                         stasis_topic_name(topic));</span><br><span> </span><br><span>diff --git a/main/taskprocessor.c b/main/taskprocessor.c</span><br><span>index 30aeddb..a0bfef4 100644</span><br><span>--- a/main/taskprocessor.c</span><br><span>+++ b/main/taskprocessor.c</span><br><span>@@ -89,8 +89,12 @@</span><br><span>     unsigned int high_water_alert:1;</span><br><span>     /*! Indicates if the taskprocessor is currently suspended */</span><br><span>         unsigned int suspended:1;</span><br><span style="color: hsl(120, 100%, 40%);">+     /*! \brief Anything before the first '/' in the name (if there is one) */</span><br><span style="color: hsl(120, 100%, 40%);">+    char *subsystem;</span><br><span>     /*! \brief Friendly name of the taskprocessor */</span><br><span style="color: hsl(0, 100%, 40%);">-        char name[0];</span><br><span style="color: hsl(120, 100%, 40%);">+ char *name;</span><br><span style="color: hsl(120, 100%, 40%);">+   /*! 
\brief Buffer for subsystem and name */</span><br><span style="color: hsl(120, 100%, 40%);">+   char buffer[0];</span><br><span> };</span><br><span> </span><br><span> /*!</span><br><span>@@ -122,6 +126,8 @@</span><br><span> /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */</span><br><span> static struct ao2_container *tps_singletons;</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+static AST_VECTOR_RW(,char *) overloaded_subsystems;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */</span><br><span> static ast_cond_t cli_ping_cond;</span><br><span> </span><br><span>@@ -271,6 +277,8 @@</span><br><span> static void tps_shutdown(void)</span><br><span> {</span><br><span>        ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));</span><br><span style="color: hsl(120, 100%, 40%);">+       AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, free);</span><br><span style="color: hsl(120, 100%, 40%);">+   AST_VECTOR_RW_FREE(&overloaded_subsystems);</span><br><span>      ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");</span><br><span>         tps_singletons = NULL;</span><br><span> }</span><br><span>@@ -285,6 +293,12 @@</span><br><span>           return -1;</span><br><span>   }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {</span><br><span style="color: hsl(120, 100%, 40%);">+             ao2_ref(tps_singletons, -1);</span><br><span style="color: hsl(120, 100%, 40%);">+          ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");</span><br><span style="color: hsl(120, 100%, 40%);">+              return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+    }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span>  
ast_cond_init(&cli_ping_cond, NULL);</span><br><span> </span><br><span>         ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));</span><br><span>@@ -478,6 +492,7 @@</span><br><span> static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)</span><br><span> {</span><br><span>    char name[256];</span><br><span style="color: hsl(120, 100%, 40%);">+       char subsystem[256];</span><br><span>         int tcount;</span><br><span>  unsigned long qsize;</span><br><span>         unsigned long maxqsize;</span><br><span>@@ -485,8 +500,8 @@</span><br><span>        struct ao2_container *sorted_tps;</span><br><span>    struct ast_taskprocessor *tps;</span><br><span>       struct ao2_iterator iter;</span><br><span style="color: hsl(0, 100%, 40%);">-#define FMT_HEADERS            "%-45s %10s %10s %10s %10s %10s\n"</span><br><span style="color: hsl(0, 100%, 40%);">-#define FMT_FIELDS          "%-45s %10lu %10lu %10lu %10lu %10lu\n"</span><br><span style="color: hsl(120, 100%, 40%);">+#define FMT_HEADERS          "%-16s %-45s %10s %10s %10s %10s %10s\n"</span><br><span style="color: hsl(120, 100%, 40%);">+#define FMT_FIELDS          "%-16s %-45s %10lu %10lu %10lu %10lu %10lu\n"</span><br><span> </span><br><span>  switch (cmd) {</span><br><span>       case CLI_INIT:</span><br><span>@@ -511,15 +526,16 @@</span><br><span>               return CLI_FAILURE;</span><br><span>  }</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-   ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");</span><br><span style="color: hsl(120, 100%, 40%);">+      ast_cli(a->fd, "\n" FMT_HEADERS, "Subsystem", "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");</span><br><span>      tcount = 0;</span><br><span>  iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);</span><br><span>   while ((tps = ao2_iterator_next(&iter))) {</span><br><span 
style="color: hsl(120, 100%, 40%);">+                ast_copy_string(subsystem, tps->subsystem, sizeof(subsystem));</span><br><span>            ast_copy_string(name, tps->name, sizeof(name));</span><br><span>           qsize = tps->tps_queue_size;</span><br><span>              maxqsize = tps->stats.max_qsize;</span><br><span>          processed = tps->stats._tasks_processed_count;</span><br><span style="color: hsl(0, 100%, 40%);">-               ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,</span><br><span style="color: hsl(120, 100%, 40%);">+               ast_cli(a->fd, FMT_FIELDS, subsystem, name, processed, qsize, maxqsize,</span><br><span>                   tps->tps_queue_low, tps->tps_queue_high);</span><br><span>              ast_taskprocessor_unreference(tps);</span><br><span>          ++tcount;</span><br><span>@@ -548,6 +564,45 @@</span><br><span>     return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int ast_taskprocessor_is_subsystem_overloaded(const char *subsystem)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+       char *s;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+     s = AST_VECTOR_GET_CMP(&overloaded_subsystems, subsystem, !strcmp);</span><br><span style="color: hsl(120, 100%, 40%);">+       AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   return !!s;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+static void add_subsystem_alert(const char *subsystem)</span><br><span style="color: 
hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+   char *s;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    if (ast_strlen_zero(subsystem)</span><br><span style="color: hsl(120, 100%, 40%);">+                || ast_taskprocessor_is_subsystem_overloaded(subsystem)) {</span><br><span style="color: hsl(120, 100%, 40%);">+            return;</span><br><span style="color: hsl(120, 100%, 40%);">+       }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   s = ast_strdup(subsystem);</span><br><span style="color: hsl(120, 100%, 40%);">+    if (!s) {</span><br><span style="color: hsl(120, 100%, 40%);">+             return;</span><br><span style="color: hsl(120, 100%, 40%);">+       }</span><br><span style="color: hsl(120, 100%, 40%);">+     AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+     AST_VECTOR_APPEND(&overloaded_subsystems, s);</span><br><span style="color: hsl(120, 100%, 40%);">+     AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+static void remove_subsystem_alert(const char *subsystem)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+  if (ast_strlen_zero(subsystem)) {</span><br><span style="color: hsl(120, 100%, 40%);">+             return;</span><br><span style="color: hsl(120, 100%, 40%);">+       }</span><br><span style="color: hsl(120, 100%, 40%);">+     AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+     AST_VECTOR_REMOVE_CMP_UNORDERED(&overloaded_subsystems, subsystem, !strcmp, ast_free);</span><br><span style="color: hsl(120, 100%, 40%);">+    
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*! Count of the number of taskprocessors in high water alert. */</span><br><span> static unsigned int tps_alert_count;</span><br><span> </span><br><span>@@ -571,11 +626,17 @@</span><br><span>      ast_rwlock_wrlock(&tps_alert_lock);</span><br><span>      old = tps_alert_count;</span><br><span>       tps_alert_count += delta;</span><br><span style="color: hsl(0, 100%, 40%);">-       if (DEBUG_ATLEAST(3)</span><br><span style="color: hsl(0, 100%, 40%);">-            /* and tps_alert_count becomes zero or non-zero */</span><br><span style="color: hsl(0, 100%, 40%);">-              && !old != !tps_alert_count) {</span><br><span style="color: hsl(120, 100%, 40%);">+        if (/* tps_alert_count becomes zero or non-zero */</span><br><span style="color: hsl(120, 100%, 40%);">+            !old != !tps_alert_count) {</span><br><span>          ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",</span><br><span>                        tps->name, tps_alert_count ? 
"triggered" : "cleared");</span><br><span style="color: hsl(120, 100%, 40%);">+         if (tps->subsystem[0] != '\0') {</span><br><span style="color: hsl(120, 100%, 40%);">+                   if (tps_alert_count) {</span><br><span style="color: hsl(120, 100%, 40%);">+                                add_subsystem_alert(tps->subsystem);</span><br><span style="color: hsl(120, 100%, 40%);">+                       } else {</span><br><span style="color: hsl(120, 100%, 40%);">+                              remove_subsystem_alert(tps->subsystem);</span><br><span style="color: hsl(120, 100%, 40%);">+                    }</span><br><span style="color: hsl(120, 100%, 40%);">+             }</span><br><span>    }</span><br><span>    ast_rwlock_unlock(&tps_alert_lock);</span><br><span> }</span><br><span>@@ -747,8 +808,15 @@</span><br><span> static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)</span><br><span> {</span><br><span>         struct ast_taskprocessor *p;</span><br><span style="color: hsl(120, 100%, 40%);">+  char *subsystem_separator;</span><br><span style="color: hsl(120, 100%, 40%);">+    size_t subsystem_length = 0;</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-        p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor);</span><br><span style="color: hsl(120, 100%, 40%);">+ subsystem_separator = strchr(name, '/');</span><br><span style="color: hsl(120, 100%, 40%);">+      if (subsystem_separator) {</span><br><span style="color: hsl(120, 100%, 40%);">+            subsystem_length = subsystem_separator - name;</span><br><span style="color: hsl(120, 100%, 40%);">+        }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   p = ao2_alloc(sizeof(*p) + strlen(name) + subsystem_length + 2, tps_taskprocessor_dtor);</span><br><span>     if (!p) {</span><br><span>            ast_log(LOG_WARNING, "failed to 
create taskprocessor '%s'\n", name);</span><br><span>               return NULL;</span><br><span>@@ -758,6 +826,12 @@</span><br><span>  p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;</span><br><span>         p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+        if (subsystem_length) {</span><br><span style="color: hsl(120, 100%, 40%);">+               strncpy(p->buffer, name, subsystem_length);</span><br><span style="color: hsl(120, 100%, 40%);">+        }</span><br><span style="color: hsl(120, 100%, 40%);">+     p->buffer[subsystem_length] = '\0';</span><br><span style="color: hsl(120, 100%, 40%);">+        p->subsystem = p->buffer;</span><br><span style="color: hsl(120, 100%, 40%);">+       p->name = p->buffer + subsystem_length + 1;</span><br><span>    strcpy(p->name, name); /*SAFE*/</span><br><span> </span><br><span>       ao2_ref(listener, +1);</span><br><span>diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c</span><br><span>index d356e37..cbd49c3 100644</span><br><span>--- a/res/res_pjsip/pjsip_distributor.c</span><br><span>+++ b/res/res_pjsip/pjsip_distributor.c</span><br><span>@@ -534,7 +534,7 @@</span><br><span>                ao2_cleanup(dist);</span><br><span>           return PJ_TRUE;</span><br><span>      } else {</span><br><span style="color: hsl(0, 100%, 40%);">-                if (ast_taskprocessor_alert_get()) {</span><br><span style="color: hsl(120, 100%, 40%);">+          if (ast_taskprocessor_is_subsystem_overloaded("pjsip")) {</span><br><span>                  /*</span><br><span>                    * When taskprocessors get backed up, there is a good chance that</span><br><span>                     * we are being overloaded and need to defer adding new work to</span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/11002">change 11002</a>. 
</p>
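<p>The subsystem split described in the commit message can be sketched in standalone C. This is a minimal sketch, not the patch itself: the struct tps_names and the function tps_names_alloc() are hypothetical, and plain malloc() stands in for ao2_alloc(). It mirrors the single-allocation layout used in __allocate_taskprocessor(), where one trailing buffer holds the subsystem, a NUL, then the full name, so the name field still carries the prefix (subsystem is "sorcery" while name stays "sorcery/acl-0000006a").</p>

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the taskprocessor struct: two pointers into
 * one trailing buffer laid out as "<subsystem>\0<full name>\0". */
struct tps_names {
	char *subsystem; /* text before the first '/', or "" if there is none */
	char *name;      /* the full, unmodified taskprocessor name */
	char buffer[];   /* C99 flexible array member */
};

/* Allocate a tps_names in a single block. Returns NULL on failure;
 * the caller releases it with free(). */
struct tps_names *tps_names_alloc(const char *name)
{
	struct tps_names *p;
	const char *separator;
	size_t subsystem_length = 0;

	assert(name != NULL);

	/* Only the first '/' matters: "a/b/c" yields subsystem "a". */
	separator = strchr(name, '/');
	if (separator) {
		subsystem_length = (size_t)(separator - name);
	}

	/* subsystem + '\0' + full name + '\0' */
	p = malloc(sizeof(*p) + subsystem_length + 1 + strlen(name) + 1);
	if (!p) {
		return NULL;
	}

	memcpy(p->buffer, name, subsystem_length);
	p->buffer[subsystem_length] = '\0';
	p->subsystem = p->buffer;
	p->name = p->buffer + subsystem_length + 1;
	strcpy(p->name, name); /* size accounted for in the malloc above */
	return p;
}
```

<p>Because both strings live in the same allocation as the struct, a single free() (or, in the patch, the final ao2 unref) releases everything; names without a '/' simply get an empty subsystem string rather than a NULL pointer, so callers such as the CLI report can print it unconditionally.</p>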

<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: 16 </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>
<div style="display:none"> Gerrit-Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56 </div>
<div style="display:none"> Gerrit-Change-Number: 11002 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: George Joseph <gjoseph@digium.com> </div>