[Asterisk-code-review] serializer: move/add asterisk serializer pool functionality (...asterisk[16])

George Joseph asteriskteam at digium.com
Thu Oct 10 09:10:11 CDT 2019


George Joseph has submitted this change and it was merged. ( https://gerrit.asterisk.org/c/asterisk/+/12999 )

Change subject: serializer: move/add asterisk serializer pool functionality
......................................................................

serializer: move/add asterisk serializer pool functionality

Serializer pools already existed in Asterisk, but most of the code was
duplicated across modules. This patch abstracts that code into an
'ast_serializer_pool' object and centralizes it in serializer.c/h.

In addition, serializer pools can now optionally be monitored by a shutdown
group. This prevents the pool from being destroyed until all of its
serializers have completed their tasks.

Change-Id: Ib1e906144b90ffd4d5ed9826f0b719ca9c6d2971
---
A include/asterisk/serializer.h
M include/asterisk/taskprocessor.h
A main/serializer.c
M main/taskprocessor.c
M tests/test_taskprocessor.c
5 files changed, 371 insertions(+), 1 deletion(-)
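
The naming scheme behind ast_taskprocessor_name_append() (base name, a dash,
then an 8 digit hex sequence number) can be tried outside of Asterisk. The
following is only a standalone sketch, not the code from this change:
demo_seq_num() and demo_name_append() are hypothetical names, the counter is a
plain static rather than an atomic, and a name that does not fit is reported
with -1 instead of an assertion.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define DEMO_SEQ_STR_SIZE (1 + 8 + 1) /* dash + 8 hex digits + null terminator */

/* Hypothetical, non-atomic counter standing in for ast_taskprocessor_seq_num(). */
static unsigned int demo_seq_num(void)
{
	static unsigned int seq;

	return seq++;
}

/*
 * Write "<name>-<8 hex digits>" into buf.
 * Returns 0 on success, or -1 if the result would not fit in size bytes.
 */
static int demo_name_append(char *buf, size_t size, const char *name)
{
	assert(buf != NULL && name != NULL);

	if (strlen(name) + DEMO_SEQ_STR_SIZE > size) {
		return -1;
	}

	snprintf(buf, size, "%s-%08x", name, demo_seq_num());
	return 0;
}
```

Under these assumptions, each call with the same base name yields a different
suffix, which is why a pool created from one base name ends up with distinctly
named serializers.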

Approvals:
  Joshua Colp: Looks good to me, but someone else must approve
  Benjamin Keith Ford: Looks good to me, but someone else must approve
  George Joseph: Looks good to me, approved; Approved for Submit



diff --git a/include/asterisk/serializer.h b/include/asterisk/serializer.h
new file mode 100644
index 0000000..1a1eb83
--- /dev/null
+++ b/include/asterisk/serializer.h
@@ -0,0 +1,85 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2019, Sangoma Technologies Corporation
+ *
+ * Kevin Harwell <kharwell at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _AST_SERIALIZER_H
+#define _AST_SERIALIZER_H
+
+struct ast_threadpool;
+
+/*!
+ * Maintains a named pool of thread pooled taskprocessors. Optionally, a
+ * shutdown group can be enabled to ensure that all serializers have
+ * completed any assigned tasks before destruction.
+ */
+struct ast_serializer_pool;
+
+/*!
+ * \brief Destroy the serializer pool.
+ *
+ * Attempt to destroy the serializer pool. If a shutdown group has been enabled,
+ * and times out waiting for threads to complete, then this function will return
+ * the number of remaining threads, and the pool will not be destroyed.
+ *
+ * \param pool The pool to destroy
+ */
+int ast_serializer_pool_destroy(struct ast_serializer_pool *pool);
+
+/*!
+ * \brief Create a serializer pool.
+ *
+ * Create a serializer pool with an optional shutdown group. If a timeout greater
+ * than -1 is specified then a shutdown group is enabled on the pool.
+ *
+ * \param name The base name for the pool, and used when building taskprocessor(s)
+ * \param size The size of the pool
+ * \param threadpool The backing threadpool to use
+ * \param timeout The timeout used if using a shutdown group (-1 = disabled)
+ *
+ * \retval A newly allocated serializer pool object, or NULL on error
+ */
+struct ast_serializer_pool *ast_serializer_pool_create(const char *name,
+	unsigned int size, struct ast_threadpool *threadpool, int timeout);
+
+/*!
+ * \brief Retrieve the base name of the serializer pool.
+ *
+ * \param pool The pool object
+ *
+ * \retval The base name given to the pool
+ */
+const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool);
+
+/*!
+ * \brief Retrieve a serializer from the pool.
+ *
+ * \param pool The pool object
+ *
+ * \retval A serializer/taskprocessor
+ */
+struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool);
+
+/*!
+ * \brief Set taskprocessor alert levels for the serializers in the pool.
+ *
+ * \param pool The pool whose serializers get the high/low alert levels
+ *
+ * \retval 0 on success, or -1 on error.
+ */
+int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low);
+
+#endif /* _AST_SERIALIZER_H */
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index 2f49e47..5145565 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -305,6 +305,15 @@
 unsigned int ast_taskprocessor_seq_num(void);
 
 /*!
+ * \brief Append the next sequence number to the given string, and copy into the buffer.
+ *
+ * \param buf Where to copy the appended taskprocessor name.
+ * \param size How large is buf including null terminator.
+ * \param name A name to append the sequence number to.
+ */
+void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name);
+
+/*!
  * \brief Build a taskprocessor name with a sequence number on the end.
  * \since 13.8.0
  *
diff --git a/main/serializer.c b/main/serializer.c
new file mode 100644
index 0000000..280ada0
--- /dev/null
+++ b/main/serializer.c
@@ -0,0 +1,189 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2019, Sangoma Technologies Corporation
+ *
+ * Kevin Harwell <kharwell at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#include "asterisk.h"
+
+#include "asterisk/astobj2.h"
+#include "asterisk/serializer.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
+#include "asterisk/utils.h"
+#include "asterisk/vector.h"
+
+struct ast_serializer_pool {
+	/*! Shutdown group to monitor serializers. */
+	struct ast_serializer_shutdown_group *shutdown_group;
+	/*! Time to wait if using a shutdown group. */
+	int shutdown_group_timeout;
+	/*! A pool of taskprocessor(s) */
+	AST_VECTOR_RW(, struct ast_taskprocessor *) serializers;
+	/*! Base name for the pool */
+	char name[];
+};
+
+int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
+{
+	if (!pool) {
+		return 0;
+	}
+
+	/* Clear out the serializers */
+	AST_VECTOR_RW_WRLOCK(&pool->serializers);
+	AST_VECTOR_RESET(&pool->serializers, ast_taskprocessor_unreference);
+	AST_VECTOR_RW_UNLOCK(&pool->serializers);
+
+	/* If using a shutdown group then wait for all threads to complete */
+	if (pool->shutdown_group) {
+		int remaining;
+
+		ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);
+
+		remaining = ast_serializer_shutdown_group_join(
+			pool->shutdown_group, pool->shutdown_group_timeout);
+
+		if (remaining) {
+			/* If we've timed out don't fully cleanup yet */
+			ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
+				"'%d' dependencies still processing.\n", pool->name, remaining);
+			return remaining;
+		}
+
+		ao2_ref(pool->shutdown_group, -1);
+		pool->shutdown_group = NULL;
+	}
+
+	AST_VECTOR_RW_FREE(&pool->serializers);
+	ast_free(pool);
+
+	return 0;
+}
+
+struct ast_serializer_pool *ast_serializer_pool_create(const char *name,
+	unsigned int size, struct ast_threadpool *threadpool, int timeout)
+{
+	struct ast_serializer_pool *pool;
+	char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+	size_t idx;
+
+	ast_assert(size > 0);
+
+	pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
+	if (!pool) {
+		return NULL;
+	}
+
+	strcpy(pool->name, name); /* safe */
+
+	pool->shutdown_group_timeout = timeout;
+	pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL;
+
+	AST_VECTOR_RW_INIT(&pool->serializers, size);
+
+	for (idx = 0; idx < size; ++idx) {
+		struct ast_taskprocessor *tps;
+
+		/* Create name with seq number appended. */
+		ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
+
+		tps = ast_threadpool_serializer_group(tps_name, threadpool, pool->shutdown_group);
+		if (!tps) {
+			ast_serializer_pool_destroy(pool);
+			ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
+					tps_name);
+			return NULL;
+		}
+
+		if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
+			ast_serializer_pool_destroy(pool);
+			ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
+					tps_name);
+			return NULL;
+		}
+	}
+
+	return pool;
+}
+
+const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)
+{
+	return pool->name;
+}
+
+struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool)
+{
+	struct ast_taskprocessor *res;
+	size_t idx;
+
+	if (!pool) {
+		return NULL;
+	}
+
+	AST_VECTOR_RW_RDLOCK(&pool->serializers);
+	if (AST_VECTOR_SIZE(&pool->serializers) == 0) {
+		AST_VECTOR_RW_UNLOCK(&pool->serializers);
+		return NULL;
+	}
+
+	res = AST_VECTOR_GET(&pool->serializers, 0);
+
+	/* Choose the taskprocessor with the smallest queue */
+	for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
+		struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
+		if (ast_taskprocessor_size(cur) < ast_taskprocessor_size(res)) {
+			res = cur;
+		}
+	}
+
+	AST_VECTOR_RW_UNLOCK(&pool->serializers);
+	return res;
+}
+
+int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
+{
+	size_t idx;
+	long tps_queue_high;
+	long tps_queue_low;
+
+	if (!pool) {
+		return 0;
+	}
+
+	tps_queue_high = high;
+	if (tps_queue_high <= 0) {
+		ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
+				"trigger level '%ld'\n", pool->name, tps_queue_high);
+		tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+	}
+
+	tps_queue_low = low;
+	if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
+		ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
+				"level '%ld'\n", pool->name, tps_queue_low);
+		tps_queue_low = -1;
+	}
+
+	for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
+		struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
+		if (ast_taskprocessor_alert_set_levels(cur, tps_queue_low, tps_queue_high)) {
+			ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
+					ast_taskprocessor_name(cur));
+		}
+	}
+
+	return 0;
+}
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 47d75d3..52cc5e0 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -1280,11 +1280,22 @@
 	return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
 }
 
+#define SEQ_STR_SIZE (1 + 8 + 1)	/* Dash plus 8 hex digits plus null terminator */
+
+void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
+{
+	int final_size;
+
+	ast_assert(buf != NULL && name != NULL);
+	final_size = strlen(name) + SEQ_STR_SIZE; /* only dereference name after the check */
+	ast_assert(final_size <= size);
+	snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());
+}
+
 void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
 {
 	va_list ap;
 	int user_size;
-#define SEQ_STR_SIZE (1 + 8 + 1)	/* Dash plus 8 hex digits plus null terminator */
 
 	ast_assert(buf != NULL);
 	ast_assert(SEQ_STR_SIZE <= size);
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index 70cb556..031151c 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -35,6 +35,8 @@
 #include "asterisk/taskprocessor.h"
 #include "asterisk/module.h"
 #include "asterisk/astobj2.h"
+#include "asterisk/serializer.h"
+#include "asterisk/threadpool.h"
 
 /*!
  * \brief userdata associated with baseline taskprocessor test
@@ -889,6 +891,78 @@
 	return AST_TEST_PASS;
 }
 
+/*!
+ * \brief Baseline test for a serializer pool
+ *
+ * This test ensures that a task pushed to a serializer obtained from a pool
+ * gets executed, and that pool destruction with a shutdown group times out
+ * while the task is still running and succeeds once the task has completed.
+ */
+AST_TEST_DEFINE(serializer_pool)
+{
+	RAII_VAR(struct ast_threadpool *, threadpool, NULL, ast_threadpool_shutdown);
+	RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy);
+	RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
+	struct ast_threadpool_options options = {
+		.version = AST_THREADPOOL_OPTIONS_VERSION,
+		.idle_timeout = 0,
+		.auto_increment = 0,
+		.initial_size = 1,
+		.max_size = 0,
+	};
+	/* struct ast_taskprocessor *tps; */
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = "serializer_pool";
+		info->category = "/main/taskprocessor/";
+		info->summary = "Test using a serializer pool";
+		info->description =
+			"Ensures that a queued task gets executed.";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
+	ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
+						  "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
+	ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
+	ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
+	ast_test_validate(test, task_data = task_data_create());
+
+	task_data->wait_time = 4000; /* task takes 4 seconds */
+	ast_test_validate(test, !ast_taskprocessor_push(
+						  ast_serializer_pool_get(serializer_pool), task, task_data));
+
+	if (!ast_serializer_pool_destroy(serializer_pool)) {
+		ast_test_status_update(test, "Unexpected pool destruction!\n");
+		/*
+		 * The pool should have timed out, so if destruction reports success
+		 * we need to fail.
+		 */
+		serializer_pool = NULL;
+		return AST_TEST_FAIL;
+	}
+
+	ast_test_validate(test, !task_wait(task_data));
+
+	/* The first attempt should have failed. Second try should destroy successfully */
+	if (ast_serializer_pool_destroy(serializer_pool)) {
+		ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
+		/*
+		 * If this fails we'll try again on return to hopefully avoid a memory leak.
+		 * If it times out a third time, there is not much more we can do.
+		 */
+		return AST_TEST_FAIL;
+	}
+
+	/* Test passed, so set pool to NULL to avoid "re-running" destroy */
+	serializer_pool = NULL;
+
+	return AST_TEST_PASS;
+}
+
 static int unload_module(void)
 {
 	ast_test_unregister(default_taskprocessor);
@@ -897,6 +971,7 @@
 	ast_test_unregister(taskprocessor_listener);
 	ast_test_unregister(taskprocessor_shutdown);
 	ast_test_unregister(taskprocessor_push_local);
+	ast_test_unregister(serializer_pool);
 	return 0;
 }
 
@@ -908,6 +983,7 @@
 	ast_test_register(taskprocessor_listener);
 	ast_test_register(taskprocessor_shutdown);
 	ast_test_register(taskprocessor_push_local);
+	ast_test_register(serializer_pool);
 	return AST_MODULE_LOAD_SUCCESS;
 }
 

-- 
To view, visit https://gerrit.asterisk.org/c/asterisk/+/12999

Gerrit-Project: asterisk
Gerrit-Branch: 16
Gerrit-Change-Id: Ib1e906144b90ffd4d5ed9826f0b719ca9c6d2971
Gerrit-Change-Number: 12999
Gerrit-PatchSet: 4
Gerrit-Owner: Kevin Harwell <kharwell at digium.com>
Gerrit-Reviewer: Benjamin Keith Ford <bford at digium.com>
Gerrit-Reviewer: Friendly Automation
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-MessageType: merged