<p>George Joseph <strong>merged</strong> this change.</p><p><a href="https://gerrit.asterisk.org/c/asterisk/+/12999">View Change</a></p><div style="white-space:pre-wrap">Approvals:
  Joshua Colp: Looks good to me, but someone else must approve
  Benjamin Keith Ford: Looks good to me, but someone else must approve
  George Joseph: Looks good to me, approved; Approved for Submit

</div><pre style="font-family: monospace,monospace; white-space: pre-wrap;">serializer: move/add asterisk serializer pool functionality<br><br>Serializer pools have previously existed in Asterisk. However, for the most<br>part the code has been duplicated across modules. This patch abstracts the<br>code into an 'ast_serializer_pool' object. As well the code is now centralized<br>in serializer.c/h.<br><br>In addition serializer pools can now optionally be monitored by a shutdown<br>group. This will prevent the pool from being destroyed until all serializers<br>have completed.<br><br>Change-Id: Ib1e906144b90ffd4d5ed9826f0b719ca9c6d2971<br>---<br>A include/asterisk/serializer.h<br>M include/asterisk/taskprocessor.h<br>A main/serializer.c<br>M main/taskprocessor.c<br>M tests/test_taskprocessor.c<br>5 files changed, 371 insertions(+), 1 deletion(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;"><span>diff --git a/include/asterisk/serializer.h b/include/asterisk/serializer.h</span><br><span>new file mode 100644</span><br><span>index 0000000..1a1eb83</span><br><span>--- /dev/null</span><br><span>+++ b/include/asterisk/serializer.h</span><br><span>@@ -0,0 +1,85 @@</span><br><span style="color: hsl(120, 100%, 40%);">+/*</span><br><span style="color: hsl(120, 100%, 40%);">+ * Asterisk -- An open source telephony toolkit.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Copyright (C) 2019, Sangoma Technologies Corporation</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Kevin Harwell <kharwell@digium.com></span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * See http://www.asterisk.org for more information about</span><br><span style="color: hsl(120, 100%, 40%);">+ * the Asterisk project. Please do not directly contact</span><br><span style="color: hsl(120, 100%, 40%);">+ * any of the maintainers of this project for assistance;</span><br><span style="color: hsl(120, 100%, 40%);">+ * the project provides a web site, mailing lists and IRC</span><br><span style="color: hsl(120, 100%, 40%);">+ * channels for your use.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This program is free software, distributed under the terms of</span><br><span style="color: hsl(120, 100%, 40%);">+ * the GNU General Public License Version 2. See the LICENSE file</span><br><span style="color: hsl(120, 100%, 40%);">+ * at the top of the source tree.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+#ifndef _AST_SERIALIZER_H</span><br><span style="color: hsl(120, 100%, 40%);">+#define _AST_SERIALIZER_H</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_threadpool;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * Maintains a named pool of thread pooled taskprocessors. Also if configured</span><br><span style="color: hsl(120, 100%, 40%);">+ * a shutdown group can be enabled that will ensure all serializers have</span><br><span style="color: hsl(120, 100%, 40%);">+ * completed any assigned task before destruction.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_serializer_pool;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Destroy the serializer pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Attempt to destroy the serializer pool. If a shutdown group has been enabled,</span><br><span style="color: hsl(120, 100%, 40%);">+ * and times out waiting for threads to complete, then this function will return</span><br><span style="color: hsl(120, 100%, 40%);">+ * the number of remaining threads, and the pool will not be destroyed.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param pool The pool to destroy</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int ast_serializer_pool_destroy(struct ast_serializer_pool *pool);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Create a serializer pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Create a serializer pool with an optional shutdown group. If a timeout greater</span><br><span style="color: hsl(120, 100%, 40%);">+ * than -1 is specified then a shutdown group is enabled on the pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param name The base name for the pool, and used when building taskprocessor(s)</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param size The size of the pool</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param threadpool The backing threadpool to use</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param timeout The timeout used if using a shutdown group (-1 = disabled)</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval A newly allocated serializer pool object, or NULL on error</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_serializer_pool *ast_serializer_pool_create(const char *name,</span><br><span style="color: hsl(120, 100%, 40%);">+     unsigned int size, struct ast_threadpool *threadpool, int timeout);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Retrieve the base name of the serializer pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param pool The pool object</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval The base name given to the pool</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Retrieve a serializer from the pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param pool The pool object</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval A serializer/taskprocessor</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Set taskprocessor alert levels for the serializers in the pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param pool The pool to destroy</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success, or -1 on error.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+#endif /* _AST_SERIALIZER_H */</span><br><span>diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h</span><br><span>index 2f49e47..5145565 100644</span><br><span>--- a/include/asterisk/taskprocessor.h</span><br><span>+++ b/include/asterisk/taskprocessor.h</span><br><span>@@ -305,6 +305,15 @@</span><br><span> unsigned int ast_taskprocessor_seq_num(void);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Append the next sequence number to the given string, and copy into the buffer.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param buf Where to copy the appended taskprocessor name.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param size How large is buf including null terminator.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param name A name to append the sequence number to.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span>  * \brief Build a taskprocessor name with a sequence number on the end.</span><br><span>  * \since 13.8.0</span><br><span>  *</span><br><span>diff --git a/main/serializer.c b/main/serializer.c</span><br><span>new file mode 100644</span><br><span>index 0000000..280ada0</span><br><span>--- /dev/null</span><br><span>+++ b/main/serializer.c</span><br><span>@@ -0,0 +1,189 @@</span><br><span style="color: hsl(120, 100%, 40%);">+/*</span><br><span style="color: hsl(120, 100%, 40%);">+ * Asterisk -- An open source telephony toolkit.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Copyright (C) 2019, Sangoma Technologies Corporation</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Kevin Harwell <kharwell@digium.com></span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * See http://www.asterisk.org for more information about</span><br><span style="color: hsl(120, 100%, 40%);">+ * the Asterisk project. Please do not directly contact</span><br><span style="color: hsl(120, 100%, 40%);">+ * any of the maintainers of this project for assistance;</span><br><span style="color: hsl(120, 100%, 40%);">+ * the project provides a web site, mailing lists and IRC</span><br><span style="color: hsl(120, 100%, 40%);">+ * channels for your use.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This program is free software, distributed under the terms of</span><br><span style="color: hsl(120, 100%, 40%);">+ * the GNU General Public License Version 2. See the LICENSE file</span><br><span style="color: hsl(120, 100%, 40%);">+ * at the top of the source tree.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk.h"</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/astobj2.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/serializer.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/taskprocessor.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/threadpool.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/utils.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/vector.h"</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_serializer_pool {</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! Shutdown group to monitor serializers. */</span><br><span style="color: hsl(120, 100%, 40%);">+ struct ast_serializer_shutdown_group *shutdown_group;</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! Time to wait if using a shutdown group. */</span><br><span style="color: hsl(120, 100%, 40%);">+        int shutdown_group_timeout;</span><br><span style="color: hsl(120, 100%, 40%);">+   /*! A pool of taskprocessor(s) */</span><br><span style="color: hsl(120, 100%, 40%);">+     AST_VECTOR_RW(, struct ast_taskprocessor *) serializers;</span><br><span style="color: hsl(120, 100%, 40%);">+      /*! Base name for the pool */</span><br><span style="color: hsl(120, 100%, 40%);">+ char name[];</span><br><span style="color: hsl(120, 100%, 40%);">+};</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+      if (!pool) {</span><br><span style="color: hsl(120, 100%, 40%);">+          return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+     }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   /* Clear out the serializers */</span><br><span style="color: hsl(120, 100%, 40%);">+       AST_VECTOR_RW_WRLOCK(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+      AST_VECTOR_RESET(&pool->serializers, ast_taskprocessor_unreference);</span><br><span style="color: hsl(120, 100%, 40%);">+   AST_VECTOR_RW_UNLOCK(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    /* If using a shutdown group then wait for all threads to complete */</span><br><span style="color: hsl(120, 100%, 40%);">+ if (pool->shutdown_group) {</span><br><span style="color: hsl(120, 100%, 40%);">+                int remaining;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+              ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+              remaining = ast_serializer_shutdown_group_join(</span><br><span style="color: hsl(120, 100%, 40%);">+                       pool->shutdown_group, pool->shutdown_group_timeout);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+          if (remaining) {</span><br><span style="color: hsl(120, 100%, 40%);">+                      /* If we've timed out don't fully cleanup yet */</span><br><span style="color: hsl(120, 100%, 40%);">+                      ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "</span><br><span style="color: hsl(120, 100%, 40%);">+                           "'%d' dependencies still processing.\n", pool->name, remaining);</span><br><span style="color: hsl(120, 100%, 40%);">+                 return remaining;</span><br><span style="color: hsl(120, 100%, 40%);">+             }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+           ao2_ref(pool->shutdown_group, -1);</span><br><span style="color: hsl(120, 100%, 40%);">+         pool->shutdown_group = NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+       }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   AST_VECTOR_RW_FREE(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+        ast_free(pool);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+     return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_serializer_pool *ast_serializer_pool_create(const char *name,</span><br><span style="color: hsl(120, 100%, 40%);">+      unsigned int size, struct ast_threadpool *threadpool, int timeout)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ struct ast_serializer_pool *pool;</span><br><span style="color: hsl(120, 100%, 40%);">+     char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];</span><br><span style="color: hsl(120, 100%, 40%);">+        size_t idx;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(size > 0);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);</span><br><span style="color: hsl(120, 100%, 40%);">+  if (!pool) {</span><br><span style="color: hsl(120, 100%, 40%);">+          return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+  }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   strcpy(pool->name, name); /* safe */</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+     pool->shutdown_group_timeout = timeout;</span><br><span style="color: hsl(120, 100%, 40%);">+    pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   AST_VECTOR_RW_INIT(&pool->serializers, size);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        for (idx = 0; idx < size; ++idx) {</span><br><span style="color: hsl(120, 100%, 40%);">+         struct ast_taskprocessor *tps;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+              /* Create name with seq number appended. */</span><br><span style="color: hsl(120, 100%, 40%);">+           ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+            tps = ast_threadpool_serializer_group(tps_name, threadpool, pool->shutdown_group);</span><br><span style="color: hsl(120, 100%, 40%);">+         if (!tps) {</span><br><span style="color: hsl(120, 100%, 40%);">+                   ast_serializer_pool_destroy(pool);</span><br><span style="color: hsl(120, 100%, 40%);">+                    ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",</span><br><span style="color: hsl(120, 100%, 40%);">+                                 tps_name);</span><br><span style="color: hsl(120, 100%, 40%);">+                    return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+          }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+           if (AST_VECTOR_APPEND(&pool->serializers, tps)) {</span><br><span style="color: hsl(120, 100%, 40%);">+                      ast_serializer_pool_destroy(pool);</span><br><span style="color: hsl(120, 100%, 40%);">+                    ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",</span><br><span style="color: hsl(120, 100%, 40%);">+                                 tps_name);</span><br><span style="color: hsl(120, 100%, 40%);">+                    return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+          }</span><br><span style="color: hsl(120, 100%, 40%);">+     }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   return pool;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+    return pool->name;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+    struct ast_taskprocessor *res;</span><br><span style="color: hsl(120, 100%, 40%);">+        size_t idx;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!pool) {</span><br><span style="color: hsl(120, 100%, 40%);">+          return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+  }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   AST_VECTOR_RW_RDLOCK(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+      if (AST_VECTOR_SIZE(&pool->serializers) == 0) {</span><br><span style="color: hsl(120, 100%, 40%);">+                AST_VECTOR_RW_UNLOCK(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+              return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+  }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   res = AST_VECTOR_GET(&pool->serializers, 0);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Choose the taskprocessor with the smallest queue */</span><br><span style="color: hsl(120, 100%, 40%);">+        for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {</span><br><span style="color: hsl(120, 100%, 40%);">+           struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);</span><br><span style="color: hsl(120, 100%, 40%);">+               if (ast_taskprocessor_size(cur) < ast_taskprocessor_size(res)) {</span><br><span style="color: hsl(120, 100%, 40%);">+                   res = cur;</span><br><span style="color: hsl(120, 100%, 40%);">+            }</span><br><span style="color: hsl(120, 100%, 40%);">+     }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   AST_VECTOR_RW_UNLOCK(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+      return res;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+        size_t idx;</span><br><span style="color: hsl(120, 100%, 40%);">+   long tps_queue_high;</span><br><span style="color: hsl(120, 100%, 40%);">+  long tps_queue_low;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!pool) {</span><br><span style="color: hsl(120, 100%, 40%);">+          return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+     }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   tps_queue_high = high;</span><br><span style="color: hsl(120, 100%, 40%);">+        if (tps_queue_high <= 0) {</span><br><span style="color: hsl(120, 100%, 40%);">+         ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "</span><br><span style="color: hsl(120, 100%, 40%);">+                           "trigger level '%ld'\n", pool->name, tps_queue_high);</span><br><span style="color: hsl(120, 100%, 40%);">+            tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;</span><br><span style="color: hsl(120, 100%, 40%);">+  }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   tps_queue_low = low;</span><br><span style="color: hsl(120, 100%, 40%);">+  if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {</span><br><span style="color: hsl(120, 100%, 40%);">+             ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "</span><br><span style="color: hsl(120, 100%, 40%);">+                              "level '%ld'\n", pool->name, tps_queue_low);</span><br><span style="color: hsl(120, 100%, 40%);">+             tps_queue_low = -1;</span><br><span style="color: hsl(120, 100%, 40%);">+   }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {</span><br><span style="color: hsl(120, 100%, 40%);">+           struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);</span><br><span style="color: hsl(120, 100%, 40%);">+               if (ast_taskprocessor_alert_set_levels(cur, tps_queue_low, tps_queue_high)) {</span><br><span style="color: hsl(120, 100%, 40%);">+                 ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",</span><br><span style="color: hsl(120, 100%, 40%);">+                                       ast_taskprocessor_name(cur));</span><br><span style="color: hsl(120, 100%, 40%);">+         }</span><br><span style="color: hsl(120, 100%, 40%);">+     }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span>diff --git a/main/taskprocessor.c b/main/taskprocessor.c</span><br><span>index 47d75d3..52cc5e0 100644</span><br><span>--- a/main/taskprocessor.c</span><br><span>+++ b/main/taskprocessor.c</span><br><span>@@ -1280,11 +1280,22 @@</span><br><span>  return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+#define SEQ_STR_SIZE (1 + 8 + 1)     /* Dash plus 8 hex digits plus null terminator */</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+    int final_size = strlen(name) + SEQ_STR_SIZE;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+       ast_assert(buf != NULL && name != NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+      ast_assert(final_size <= size);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+  snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)</span><br><span> {</span><br><span>     va_list ap;</span><br><span>  int user_size;</span><br><span style="color: hsl(0, 100%, 40%);">-#define SEQ_STR_SIZE (1 + 8 + 1)  /* Dash plus 8 hex digits plus null terminator */</span><br><span> </span><br><span>        ast_assert(buf != NULL);</span><br><span>     ast_assert(SEQ_STR_SIZE <= size);</span><br><span>diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c</span><br><span>index 70cb556..031151c 100644</span><br><span>--- a/tests/test_taskprocessor.c</span><br><span>+++ b/tests/test_taskprocessor.c</span><br><span>@@ -35,6 +35,8 @@</span><br><span> #include "asterisk/taskprocessor.h"</span><br><span> #include "asterisk/module.h"</span><br><span> #include "asterisk/astobj2.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/serializer.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/threadpool.h"</span><br><span> </span><br><span> /*!</span><br><span>  * \brief userdata associated with baseline taskprocessor test</span><br><span>@@ -889,6 +891,78 @@</span><br><span>      return AST_TEST_PASS;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Baseline test for a serializer pool</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This test ensures that when a task is added to a taskprocessor that</span><br><span style="color: hsl(120, 100%, 40%);">+ * has been allocated with a default listener that the task gets executed</span><br><span style="color: hsl(120, 100%, 40%);">+ * as expected</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+AST_TEST_DEFINE(serializer_pool)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+   RAII_VAR(struct ast_threadpool *, threadpool, NULL, ast_threadpool_shutdown);</span><br><span style="color: hsl(120, 100%, 40%);">+ RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy);</span><br><span style="color: hsl(120, 100%, 40%);">+   RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);</span><br><span style="color: hsl(120, 100%, 40%);">+   struct ast_threadpool_options options = {</span><br><span style="color: hsl(120, 100%, 40%);">+             .version = AST_THREADPOOL_OPTIONS_VERSION,</span><br><span style="color: hsl(120, 100%, 40%);">+            .idle_timeout = 0,</span><br><span style="color: hsl(120, 100%, 40%);">+            .auto_increment = 0,</span><br><span style="color: hsl(120, 100%, 40%);">+          .initial_size = 1,</span><br><span style="color: hsl(120, 100%, 40%);">+            .max_size = 0,</span><br><span style="color: hsl(120, 100%, 40%);">+        };</span><br><span style="color: hsl(120, 100%, 40%);">+    /* struct ast_taskprocessor *tps; */</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        switch (cmd) {</span><br><span style="color: hsl(120, 100%, 40%);">+        case TEST_INIT:</span><br><span style="color: hsl(120, 100%, 40%);">+               info->name = "serializer_pool";</span><br><span style="color: hsl(120, 100%, 40%);">+          info->category = "/main/taskprocessor/";</span><br><span style="color: hsl(120, 100%, 40%);">+         info->summary = "Test using a serializer pool";</span><br><span style="color: hsl(120, 100%, 40%);">+          info->description =</span><br><span style="color: hsl(120, 100%, 40%);">+                        "Ensures that a queued task gets executed.";</span><br><span style="color: hsl(120, 100%, 40%);">+                return AST_TEST_NOT_RUN;</span><br><span style="color: hsl(120, 100%, 40%);">+      case TEST_EXECUTE:</span><br><span style="color: hsl(120, 100%, 40%);">+            break;</span><br><span style="color: hsl(120, 100%, 40%);">+        }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));</span><br><span style="color: hsl(120, 100%, 40%);">+    ast_test_validate(test, serializer_pool = ast_serializer_pool_create(</span><br><span style="color: hsl(120, 100%, 40%);">+                                           "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */</span><br><span style="color: hsl(120, 100%, 40%);">+   ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));</span><br><span style="color: hsl(120, 100%, 40%);">+   ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));</span><br><span style="color: hsl(120, 100%, 40%);">+      ast_test_validate(test, task_data = task_data_create());</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    task_data->wait_time = 4000; /* task takes 4 seconds */</span><br><span style="color: hsl(120, 100%, 40%);">+    ast_test_validate(test, !ast_taskprocessor_push(</span><br><span style="color: hsl(120, 100%, 40%);">+                                                ast_serializer_pool_get(serializer_pool), task, task_data));</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+      if (!ast_serializer_pool_destroy(serializer_pool)) {</span><br><span style="color: hsl(120, 100%, 40%);">+          ast_test_status_update(test, "Unexpected pool destruction!\n");</span><br><span style="color: hsl(120, 100%, 40%);">+             /*</span><br><span style="color: hsl(120, 100%, 40%);">+             * The pool should have timed out, so if it destruction reports success</span><br><span style="color: hsl(120, 100%, 40%);">+                * we need to fail.</span><br><span style="color: hsl(120, 100%, 40%);">+            */</span><br><span style="color: hsl(120, 100%, 40%);">+           serializer_pool = NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+               return AST_TEST_FAIL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   ast_test_validate(test, !task_wait(task_data));</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+     /* The first attempt should have failed. Second try should destroy successfully */</span><br><span style="color: hsl(120, 100%, 40%);">+    if (ast_serializer_pool_destroy(serializer_pool)) {</span><br><span style="color: hsl(120, 100%, 40%);">+           ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");</span><br><span style="color: hsl(120, 100%, 40%);">+              /*</span><br><span style="color: hsl(120, 100%, 40%);">+             * If this fails we'll try again on return to hopefully avoid a memory leak.</span><br><span style="color: hsl(120, 100%, 40%);">+               * If it again times out a third time, well not much we can do.</span><br><span style="color: hsl(120, 100%, 40%);">+                */</span><br><span style="color: hsl(120, 100%, 40%);">+           return AST_TEST_FAIL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   /* Test passed, so set pool to NULL to avoid "re-running" destroy */</span><br><span style="color: hsl(120, 100%, 40%);">+        serializer_pool = NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+     return AST_TEST_PASS;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> static int unload_module(void)</span><br><span> {</span><br><span>    ast_test_unregister(default_taskprocessor);</span><br><span>@@ -897,6 +971,7 @@</span><br><span>    ast_test_unregister(taskprocessor_listener);</span><br><span>         ast_test_unregister(taskprocessor_shutdown);</span><br><span>         ast_test_unregister(taskprocessor_push_local);</span><br><span style="color: hsl(120, 100%, 40%);">+        ast_test_unregister(serializer_pool);</span><br><span>        return 0;</span><br><span> }</span><br><span> </span><br><span>@@ -908,6 +983,7 @@</span><br><span>     ast_test_register(taskprocessor_listener);</span><br><span>   ast_test_register(taskprocessor_shutdown);</span><br><span>   ast_test_register(taskprocessor_push_local);</span><br><span style="color: hsl(120, 100%, 40%);">+  ast_test_register(serializer_pool);</span><br><span>  return AST_MODULE_LOAD_SUCCESS;</span><br><span> }</span><br><span> </span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/c/asterisk/+/12999">change 12999</a>. To unsubscribe, or for help writing mail filters, visit <a href="https://gerrit.asterisk.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.asterisk.org/c/asterisk/+/12999"/><meta itemprop="name" content="View Change"/></div></div>

<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: 16 </div>
<div style="display:none"> Gerrit-Change-Id: Ib1e906144b90ffd4d5ed9826f0b719ca9c6d2971 </div>
<div style="display:none"> Gerrit-Change-Number: 12999 </div>
<div style="display:none"> Gerrit-PatchSet: 4 </div>
<div style="display:none"> Gerrit-Owner: Kevin Harwell <kharwell@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: Benjamin Keith Ford <bford@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: Friendly Automation </div>
<div style="display:none"> Gerrit-Reviewer: George Joseph <gjoseph@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: Joshua Colp <jcolp@digium.com> </div>
<div style="display:none"> Gerrit-MessageType: merged </div>