<p>George Joseph <strong>merged</strong> this change.</p><p><a href="https://gerrit.asterisk.org/c/asterisk/+/12996">View Change</a></p><div style="white-space:pre-wrap">Approvals:
Joshua Colp: Looks good to me, but someone else must approve
Benjamin Keith Ford: Looks good to me, but someone else must approve
George Joseph: Looks good to me, approved; Approved for Submit
</div><pre style="font-family: monospace,monospace; white-space: pre-wrap;">serializer: move/add asterisk serializer pool functionality<br><br>Serializer pools have previously existed in Asterisk. However, for the most<br>part the code has been duplicated across modules. This patch abstracts the<br>code into an 'ast_serializer_pool' object. As well the code is now centralized<br>in serializer.c/h.<br><br>In addition serializer pools can now optionally be monitored by a shutdown<br>group. This will prevent the pool from being destroyed until all serializers<br>have completed.<br><br>Change-Id: Ib1e906144b90ffd4d5ed9826f0b719ca9c6d2971<br>---<br>A include/asterisk/serializer.h<br>M include/asterisk/taskprocessor.h<br>A main/serializer.c<br>M main/taskprocessor.c<br>M tests/test_taskprocessor.c<br>5 files changed, 371 insertions(+), 1 deletion(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;"><span>diff --git a/include/asterisk/serializer.h b/include/asterisk/serializer.h</span><br><span>new file mode 100644</span><br><span>index 0000000..1a1eb83</span><br><span>--- /dev/null</span><br><span>+++ b/include/asterisk/serializer.h</span><br><span>@@ -0,0 +1,85 @@</span><br><span style="color: hsl(120, 100%, 40%);">+/*</span><br><span style="color: hsl(120, 100%, 40%);">+ * Asterisk -- An open source telephony toolkit.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Copyright (C) 2019, Sangoma Technologies Corporation</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Kevin Harwell <kharwell@digium.com></span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * See http://www.asterisk.org for more information about</span><br><span style="color: hsl(120, 100%, 40%);">+ * the Asterisk project. 
Please do not directly contact</span><br><span style="color: hsl(120, 100%, 40%);">+ * any of the maintainers of this project for assistance;</span><br><span style="color: hsl(120, 100%, 40%);">+ * the project provides a web site, mailing lists and IRC</span><br><span style="color: hsl(120, 100%, 40%);">+ * channels for your use.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This program is free software, distributed under the terms of</span><br><span style="color: hsl(120, 100%, 40%);">+ * the GNU General Public License Version 2. See the LICENSE file</span><br><span style="color: hsl(120, 100%, 40%);">+ * at the top of the source tree.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+#ifndef _AST_SERIALIZER_H</span><br><span style="color: hsl(120, 100%, 40%);">+#define _AST_SERIALIZER_H</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_threadpool;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * Maintains a named pool of thread pooled taskprocessors. 
Also if configured</span><br><span style="color: hsl(120, 100%, 40%);">+ * a shutdown group can be enabled that will ensure all serializers have</span><br><span style="color: hsl(120, 100%, 40%);">+ * completed any assigned task before destruction.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_serializer_pool;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Destroy the serializer pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Attempt to destroy the serializer pool. If a shutdown group has been enabled,</span><br><span style="color: hsl(120, 100%, 40%);">+ * and times out waiting for threads to complete, then this function will return</span><br><span style="color: hsl(120, 100%, 40%);">+ * the number of remaining threads, and the pool will not be destroyed.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param pool The pool to destroy</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int ast_serializer_pool_destroy(struct ast_serializer_pool *pool);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Create a serializer pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Create a serializer pool with an optional shutdown group. 
If a timeout greater</span><br><span style="color: hsl(120, 100%, 40%);">+ * than -1 is specified then a shutdown group is enabled on the pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param name The base name for the pool, and used when building taskprocessor(s)</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param size The size of the pool</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param threadpool The backing threadpool to use</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param timeout The timeout used if using a shutdown group (-1 = disabled)</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval A newly allocated serializer pool object, or NULL on error</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_serializer_pool *ast_serializer_pool_create(const char *name,</span><br><span style="color: hsl(120, 100%, 40%);">+ unsigned int size, struct ast_threadpool *threadpool, int timeout);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Retrieve the base name of the serializer pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param pool The pool object</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval The base name given to the pool</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 
40%);">+ * \brief Retrieve a serializer from the pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param pool The pool object</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval A serializer/taskprocessor</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Set taskprocessor alert levels for the serializers in the pool.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param pool The pool whose serializers are updated</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success, or -1 on error.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+#endif /* _AST_SERIALIZER_H */</span><br><span>diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h</span><br><span>index 2f49e47..5145565 100644</span><br><span>--- a/include/asterisk/taskprocessor.h</span><br><span>+++ b/include/asterisk/taskprocessor.h</span><br><span>@@ -305,6 +305,15 @@</span><br><span> unsigned int ast_taskprocessor_seq_num(void);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Append the next sequence number to the given string, and copy into the buffer.</span><br><span style="color: hsl(120, 100%, 40%);">+ 
*</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param buf Where to copy the appended taskprocessor name.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param size How large is buf including null terminator.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param name A name to append the sequence number to.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief Build a taskprocessor name with a sequence number on the end.</span><br><span> * \since 13.8.0</span><br><span> *</span><br><span>diff --git a/main/serializer.c b/main/serializer.c</span><br><span>new file mode 100644</span><br><span>index 0000000..280ada0</span><br><span>--- /dev/null</span><br><span>+++ b/main/serializer.c</span><br><span>@@ -0,0 +1,189 @@</span><br><span style="color: hsl(120, 100%, 40%);">+/*</span><br><span style="color: hsl(120, 100%, 40%);">+ * Asterisk -- An open source telephony toolkit.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Copyright (C) 2019, Sangoma Technologies Corporation</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * Kevin Harwell <kharwell@digium.com></span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * See http://www.asterisk.org for more information about</span><br><span style="color: hsl(120, 100%, 40%);">+ * the Asterisk project. 
Please do not directly contact</span><br><span style="color: hsl(120, 100%, 40%);">+ * any of the maintainers of this project for assistance;</span><br><span style="color: hsl(120, 100%, 40%);">+ * the project provides a web site, mailing lists and IRC</span><br><span style="color: hsl(120, 100%, 40%);">+ * channels for your use.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This program is free software, distributed under the terms of</span><br><span style="color: hsl(120, 100%, 40%);">+ * the GNU General Public License Version 2. See the LICENSE file</span><br><span style="color: hsl(120, 100%, 40%);">+ * at the top of the source tree.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk.h"</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/astobj2.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/serializer.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/taskprocessor.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/threadpool.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/utils.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/vector.h"</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_serializer_pool {</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! Shutdown group to monitor serializers. */</span><br><span style="color: hsl(120, 100%, 40%);">+ struct ast_serializer_shutdown_group *shutdown_group;</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! Time to wait if using a shutdown group. 
*/</span><br><span style="color: hsl(120, 100%, 40%);">+ int shutdown_group_timeout;</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! A pool of taskprocessor(s) */</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW(, struct ast_taskprocessor *) serializers;</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! Base name for the pool */</span><br><span style="color: hsl(120, 100%, 40%);">+ char name[];</span><br><span style="color: hsl(120, 100%, 40%);">+};</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!pool) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Clear out the serializers */</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_WRLOCK(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RESET(&pool->serializers, ast_taskprocessor_unreference);</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_UNLOCK(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /* If using a shutdown group then wait for all threads to complete */</span><br><span style="color: hsl(120, 100%, 40%);">+ if (pool->shutdown_group) {</span><br><span style="color: hsl(120, 100%, 40%);">+ int remaining;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ remaining = 
ast_serializer_shutdown_group_join(</span><br><span style="color: hsl(120, 100%, 40%);">+ pool->shutdown_group, pool->shutdown_group_timeout);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (remaining) {</span><br><span style="color: hsl(120, 100%, 40%);">+ /* If we've timed out don't fully cleanup yet */</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "</span><br><span style="color: hsl(120, 100%, 40%);">+ "'%d' dependencies still processing.\n", pool->name, remaining);</span><br><span style="color: hsl(120, 100%, 40%);">+ return remaining;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_ref(pool->shutdown_group, -1);</span><br><span style="color: hsl(120, 100%, 40%);">+ pool->shutdown_group = NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_FREE(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_free(pool);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_serializer_pool *ast_serializer_pool_create(const char *name,</span><br><span style="color: hsl(120, 100%, 40%);">+ unsigned int size, struct ast_threadpool *threadpool, int timeout)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ struct ast_serializer_pool *pool;</span><br><span style="color: hsl(120, 100%, 40%);">+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];</span><br><span style="color: hsl(120, 
100%, 40%);">+ size_t idx;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(size > 0);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!pool) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ strcpy(pool->name, name); /* safe */</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ pool->shutdown_group_timeout = timeout;</span><br><span style="color: hsl(120, 100%, 40%);">+ pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_INIT(&pool->serializers, size);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ for (idx = 0; idx < size; ++idx) {</span><br><span style="color: hsl(120, 100%, 40%);">+ struct ast_taskprocessor *tps;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Create name with seq number appended. 
*/</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ tps = ast_threadpool_serializer_group(tps_name, threadpool, pool->shutdown_group);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!tps) {</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_serializer_pool_destroy(pool);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",</span><br><span style="color: hsl(120, 100%, 40%);">+ tps_name);</span><br><span style="color: hsl(120, 100%, 40%);">+ return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (AST_VECTOR_APPEND(&pool->serializers, tps)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_serializer_pool_destroy(pool);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",</span><br><span style="color: hsl(120, 100%, 40%);">+ tps_name);</span><br><span style="color: hsl(120, 100%, 40%);">+ return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return pool;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ return pool->name;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 
40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ struct ast_taskprocessor *res;</span><br><span style="color: hsl(120, 100%, 40%);">+ size_t idx;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!pool) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_RDLOCK(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (AST_VECTOR_SIZE(&pool->serializers) == 0) {</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_UNLOCK(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+ return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ res = AST_VECTOR_GET(&pool->serializers, 0);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Choose the taskprocessor with the smallest queue */</span><br><span style="color: hsl(120, 100%, 40%);">+ for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {</span><br><span style="color: hsl(120, 100%, 40%);">+ struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (ast_taskprocessor_size(cur) < ast_taskprocessor_size(res)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ res = cur;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span 
style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_RW_UNLOCK(&pool->serializers);</span><br><span style="color: hsl(120, 100%, 40%);">+ return res;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ size_t idx;</span><br><span style="color: hsl(120, 100%, 40%);">+ long tps_queue_high;</span><br><span style="color: hsl(120, 100%, 40%);">+ long tps_queue_low;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!pool) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ tps_queue_high = high;</span><br><span style="color: hsl(120, 100%, 40%);">+ if (tps_queue_high <= 0) {</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "</span><br><span style="color: hsl(120, 100%, 40%);">+ "trigger level '%ld'\n", pool->name, tps_queue_high);</span><br><span style="color: hsl(120, 100%, 40%);">+ tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ tps_queue_low = low;</span><br><span style="color: hsl(120, 100%, 40%);">+ if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "</span><br><span style="color: hsl(120, 100%, 40%);">+ "level '%ld'\n", pool->name, 
tps_queue_low);</span><br><span style="color: hsl(120, 100%, 40%);">+ tps_queue_low = -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {</span><br><span style="color: hsl(120, 100%, 40%);">+ struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (ast_taskprocessor_alert_set_levels(cur, tps_queue_low, tps_queue_high)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_taskprocessor_name(cur));</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span>diff --git a/main/taskprocessor.c b/main/taskprocessor.c</span><br><span>index 13bdd2b..6466b05 100644</span><br><span>--- a/main/taskprocessor.c</span><br><span>+++ b/main/taskprocessor.c</span><br><span>@@ -1282,11 +1282,22 @@</span><br><span> return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+#define SEQ_STR_SIZE (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ int final_size = strlen(name) + SEQ_STR_SIZE;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span 
style="color: hsl(120, 100%, 40%);">+ ast_assert(buf != NULL && name != NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(final_size <= size);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)</span><br><span> {</span><br><span> va_list ap;</span><br><span> int user_size;</span><br><span style="color: hsl(0, 100%, 40%);">-#define SEQ_STR_SIZE (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */</span><br><span> </span><br><span> ast_assert(buf != NULL);</span><br><span> ast_assert(SEQ_STR_SIZE <= size);</span><br><span>diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c</span><br><span>index 70cb556..031151c 100644</span><br><span>--- a/tests/test_taskprocessor.c</span><br><span>+++ b/tests/test_taskprocessor.c</span><br><span>@@ -35,6 +35,8 @@</span><br><span> #include "asterisk/taskprocessor.h"</span><br><span> #include "asterisk/module.h"</span><br><span> #include "asterisk/astobj2.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/serializer.h"</span><br><span style="color: hsl(120, 100%, 40%);">+#include "asterisk/threadpool.h"</span><br><span> </span><br><span> /*!</span><br><span> * \brief userdata associated with baseline taskprocessor test</span><br><span>@@ -889,6 +891,78 @@</span><br><span> return AST_TEST_PASS;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Baseline test for a serializer pool</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This test ensures that when a 
task is added to a taskprocessor that</span><br><span style="color: hsl(120, 100%, 40%);">+ * has been allocated with a default listener that the task gets executed</span><br><span style="color: hsl(120, 100%, 40%);">+ * as expected</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+AST_TEST_DEFINE(serializer_pool)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ RAII_VAR(struct ast_threadpool *, threadpool, NULL, ast_threadpool_shutdown);</span><br><span style="color: hsl(120, 100%, 40%);">+ RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy);</span><br><span style="color: hsl(120, 100%, 40%);">+ RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);</span><br><span style="color: hsl(120, 100%, 40%);">+ struct ast_threadpool_options options = {</span><br><span style="color: hsl(120, 100%, 40%);">+ .version = AST_THREADPOOL_OPTIONS_VERSION,</span><br><span style="color: hsl(120, 100%, 40%);">+ .idle_timeout = 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ .auto_increment = 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ .initial_size = 1,</span><br><span style="color: hsl(120, 100%, 40%);">+ .max_size = 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ };</span><br><span style="color: hsl(120, 100%, 40%);">+ /* struct ast_taskprocessor *tps; */</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ switch (cmd) {</span><br><span style="color: hsl(120, 100%, 40%);">+ case TEST_INIT:</span><br><span style="color: hsl(120, 100%, 40%);">+ info->name = "serializer_pool";</span><br><span style="color: hsl(120, 100%, 40%);">+ info->category = "/main/taskprocessor/";</span><br><span style="color: hsl(120, 100%, 40%);">+ info->summary = "Test using a serializer pool";</span><br><span style="color: hsl(120, 100%, 40%);">+ 
info->description =</span><br><span style="color: hsl(120, 100%, 40%);">+ "Ensures that a queued task gets executed.";</span><br><span style="color: hsl(120, 100%, 40%);">+ return AST_TEST_NOT_RUN;</span><br><span style="color: hsl(120, 100%, 40%);">+ case TEST_EXECUTE:</span><br><span style="color: hsl(120, 100%, 40%);">+ break;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_validate(test, serializer_pool = ast_serializer_pool_create(</span><br><span style="color: hsl(120, 100%, 40%);">+ "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_validate(test, task_data = task_data_create());</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ task_data->wait_time = 4000; /* task takes 4 seconds */</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_validate(test, !ast_taskprocessor_push(</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_serializer_pool_get(serializer_pool), task, task_data));</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!ast_serializer_pool_destroy(serializer_pool)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_status_update(test, "Unexpected pool destruction!\n");</span><br><span style="color: hsl(120, 100%, 40%);">+ /*</span><br><span style="color: hsl(120, 100%, 40%);">+ * The 
pool should have timed out, so if destruction reports success</span><br><span style="color: hsl(120, 100%, 40%);">+ * we need to fail.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ serializer_pool = NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+ return AST_TEST_FAIL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_validate(test, !task_wait(task_data));</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /* The first attempt should have failed. Second try should destroy successfully */</span><br><span style="color: hsl(120, 100%, 40%);">+ if (ast_serializer_pool_destroy(serializer_pool)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");</span><br><span style="color: hsl(120, 100%, 40%);">+ /*</span><br><span style="color: hsl(120, 100%, 40%);">+ * If this fails we'll try again on return to hopefully avoid a memory leak.</span><br><span style="color: hsl(120, 100%, 40%);">+ * If it again times out a third time, well not much we can do.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ return AST_TEST_FAIL;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Test passed, so set pool to NULL to avoid "re-running" destroy */</span><br><span style="color: hsl(120, 100%, 40%);">+ serializer_pool = NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return AST_TEST_PASS;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 
40%);">+</span><br><span> static int unload_module(void)</span><br><span> {</span><br><span> ast_test_unregister(default_taskprocessor);</span><br><span>@@ -897,6 +971,7 @@</span><br><span> ast_test_unregister(taskprocessor_listener);</span><br><span> ast_test_unregister(taskprocessor_shutdown);</span><br><span> ast_test_unregister(taskprocessor_push_local);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_unregister(serializer_pool);</span><br><span> return 0;</span><br><span> }</span><br><span> </span><br><span>@@ -908,6 +983,7 @@</span><br><span> ast_test_register(taskprocessor_listener);</span><br><span> ast_test_register(taskprocessor_shutdown);</span><br><span> ast_test_register(taskprocessor_push_local);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_test_register(serializer_pool);</span><br><span> return AST_MODULE_LOAD_SUCCESS;</span><br><span> }</span><br><span> </span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/c/asterisk/+/12996">change 12996</a>.</p>
<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: 13 </div>
<div style="display:none"> Gerrit-Change-Id: Ib1e906144b90ffd4d5ed9826f0b719ca9c6d2971 </div>
<div style="display:none"> Gerrit-Change-Number: 12996 </div>
<div style="display:none"> Gerrit-PatchSet: 2 </div>
<div style="display:none"> Gerrit-Owner: Kevin Harwell &lt;kharwell@digium.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Benjamin Keith Ford &lt;bford@digium.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Friendly Automation </div>
<div style="display:none"> Gerrit-Reviewer: George Joseph &lt;gjoseph@digium.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Joshua Colp &lt;jcolp@digium.com&gt; </div>
<div style="display:none"> Gerrit-MessageType: merged </div>