[asterisk-commits] threadpool, res pjsip: Add serializer group shutdown API calls. (asterisk[13])
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Fri Jun 26 13:35:07 CDT 2015
Matt Jordan has submitted this change and it was merged.
Change subject: threadpool, res_pjsip: Add serializer group shutdown API calls.
......................................................................
threadpool, res_pjsip: Add serializer group shutdown API calls.
A module trying to unload needs to wait for all serializers it creates and
uses to complete processing before unloading.
ASTERISK-24907
Reported by: Kevin Harwell
Change-Id: I8c80b90f2f82754e8dbb02ddf3c9121e5e966059
---
M include/asterisk/res_pjsip.h
M include/asterisk/threadpool.h
M main/threadpool.c
M res/res_pjsip.c
4 files changed, 207 insertions(+), 11 deletions(-)
Approvals:
Matt Jordan: Looks good to me, approved; Verified
Joshua Colp: Looks good to me, but someone else must approve
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 24706c9..1f9276b 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -1112,6 +1112,23 @@
*/
struct ast_taskprocessor *ast_sip_create_serializer(void);
+struct ast_serializer_shutdown_group;
+
+/*!
+ * \brief Create a new serializer for SIP tasks
+ * \since 13.5.0
+ *
+ * See \ref ast_threadpool_serializer for more information on serializers.
+ * SIP creates serializers so that tasks operating on similar data will run
+ * in sequence.
+ *
+ * \param shutdown_group Group shutdown controller. (NULL if no group association)
+ *
+ * \retval NULL Failure
+ * \retval non-NULL Newly-created serializer
+ */
+struct ast_taskprocessor *ast_sip_create_serializer_group(struct ast_serializer_shutdown_group *shutdown_group);
+
/*!
* \brief Set a serializer on a SIP dialog so requests and responses are automatically serialized
*
diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h
index e1e7727..942d14f 100644
--- a/include/asterisk/threadpool.h
+++ b/include/asterisk/threadpool.h
@@ -195,6 +195,28 @@
*/
void ast_threadpool_shutdown(struct ast_threadpool *pool);
+struct ast_serializer_shutdown_group;
+
+/*!
+ * \brief Create a serializer group shutdown control object.
+ * \since 13.5.0
+ *
+ * \return ao2 object to control shutdown of a serializer group.
+ */
+struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void);
+
+/*!
+ * \brief Wait for the serializers in the group to shutdown with timeout.
+ * \since 13.5.0
+ *
+ * \param shutdown_group Group shutdown controller. (Returns 0 immediately if NULL)
+ * \param timeout Number of seconds to wait for the serializers in the group to shutdown.
+ * Zero if the timeout is disabled.
+ *
+ * \return Number of seriaizers that did not get shutdown within the timeout.
+ */
+int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout);
+
/*!
* \brief Serialized execution of tasks within a \ref ast_threadpool.
*
@@ -218,9 +240,40 @@
*
* \param name Name of the serializer. (must be unique)
* \param pool \ref ast_threadpool for execution.
+ *
* \return \ref ast_taskprocessor for enqueuing work.
* \return \c NULL on error.
*/
struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool);
+/*!
+ * \brief Serialized execution of tasks within a \ref ast_threadpool.
+ * \since 13.5.0
+ *
+ * A \ref ast_taskprocessor with the same contract as a default taskprocessor
+ * (tasks execute serially) except instead of executing out of a dedicated
+ * thread, execution occurs in a thread from a \ref ast_threadpool. Think of it
+ * as a lightweight thread.
+ *
+ * While it guarantees that each task will complete before executing the next,
+ * there is no guarantee as to which thread from the \c pool individual tasks
+ * will execute. This normally only matters if your code relys on thread
+ * specific information, such as thread locals.
+ *
+ * Use ast_taskprocessor_unreference() to dispose of the returned \ref
+ * ast_taskprocessor.
+ *
+ * Only a single taskprocessor with a given name may exist. This function will fail
+ * if a taskprocessor with the given name already exists.
+ *
+ * \param name Name of the serializer. (must be unique)
+ * \param pool \ref ast_threadpool for execution.
+ * \param shutdown_group Group shutdown controller. (NULL if no group association)
+ *
+ * \return \ref ast_taskprocessor for enqueuing work.
+ * \return \c NULL on error.
+ */
+struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
+ struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group);
+
#endif /* ASTERISK_THREADPOOL_H */
diff --git a/main/threadpool.c b/main/threadpool.c
index 597e83e..4799389 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -1126,18 +1126,126 @@
ast_cond_signal(&worker->cond);
}
+/*! Serializer group shutdown control object. */
+struct ast_serializer_shutdown_group {
+ /*! Shutdown thread waits on this conditional. */
+ ast_cond_t cond;
+ /*! Count of serializers needing to shutdown. */
+ int count;
+};
+
+static void serializer_shutdown_group_dtor(void *vdoomed)
+{
+ struct ast_serializer_shutdown_group *doomed = vdoomed;
+
+ ast_cond_destroy(&doomed->cond);
+}
+
+struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
+{
+ struct ast_serializer_shutdown_group *shutdown_group;
+
+ shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
+ if (!shutdown_group) {
+ return NULL;
+ }
+ ast_cond_init(&shutdown_group->cond, NULL);
+ return shutdown_group;
+}
+
+int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
+{
+ int remaining;
+ ast_mutex_t *lock;
+
+ if (!shutdown_group) {
+ return 0;
+ }
+
+ lock = ao2_object_get_lockaddr(shutdown_group);
+ ast_assert(lock != NULL);
+
+ ao2_lock(shutdown_group);
+ if (timeout) {
+ struct timeval start;
+ struct timespec end;
+
+ start = ast_tvnow();
+ end.tv_sec = start.tv_sec + timeout;
+ end.tv_nsec = start.tv_usec * 1000;
+ while (shutdown_group->count) {
+ if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
+ /* Error or timed out waiting for the count to reach zero. */
+ break;
+ }
+ }
+ } else {
+ while (shutdown_group->count) {
+ if (ast_cond_wait(&shutdown_group->cond, lock)) {
+ /* Error */
+ break;
+ }
+ }
+ }
+ remaining = shutdown_group->count;
+ ao2_unlock(shutdown_group);
+ return remaining;
+}
+
+/*!
+ * \internal
+ * \brief Increment the number of serializer members in the group.
+ * \since 13.5.0
+ *
+ * \param shutdown_group Group shutdown controller.
+ *
+ * \return Nothing
+ */
+static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
+{
+ ao2_lock(shutdown_group);
+ ++shutdown_group->count;
+ ao2_unlock(shutdown_group);
+}
+
+/*!
+ * \internal
+ * \brief Decrement the number of serializer members in the group.
+ * \since 13.5.0
+ *
+ * \param shutdown_group Group shutdown controller.
+ *
+ * \return Nothing
+ */
+static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
+{
+ ao2_lock(shutdown_group);
+ --shutdown_group->count;
+ if (!shutdown_group->count) {
+ ast_cond_signal(&shutdown_group->cond);
+ }
+ ao2_unlock(shutdown_group);
+}
+
struct serializer {
+ /*! Threadpool the serializer will use to process the jobs. */
struct ast_threadpool *pool;
+ /*! Which group will wait for this serializer to shutdown. */
+ struct ast_serializer_shutdown_group *shutdown_group;
};
static void serializer_dtor(void *obj)
{
struct serializer *ser = obj;
+
ao2_cleanup(ser->pool);
ser->pool = NULL;
+ ao2_cleanup(ser->shutdown_group);
+ ser->shutdown_group = NULL;
}
-static struct serializer *serializer_create(struct ast_threadpool *pool)
+static struct serializer *serializer_create(struct ast_threadpool *pool,
+ struct ast_serializer_shutdown_group *shutdown_group)
{
struct serializer *ser;
@@ -1147,6 +1255,7 @@
}
ao2_ref(pool, +1);
ser->pool = pool;
+ ser->shutdown_group = ao2_bump(shutdown_group);
return ser;
}
@@ -1183,6 +1292,10 @@
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
{
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+
+ if (ser->shutdown_group) {
+ serializer_shutdown_group_dec(ser->shutdown_group);
+ }
ao2_cleanup(ser);
}
@@ -1192,27 +1305,35 @@
.shutdown = serializer_shutdown,
};
-struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
+struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
+ struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
{
- RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
- RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
- struct ast_taskprocessor *tps = NULL;
+ struct serializer *ser;
+ struct ast_taskprocessor_listener *listener;
+ struct ast_taskprocessor *tps;
- ser = serializer_create(pool);
+ ser = serializer_create(pool, shutdown_group);
if (!ser) {
return NULL;
}
listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
if (!listener) {
+ ao2_ref(ser, -1);
return NULL;
}
- ser = NULL; /* ownership transferred to listener */
+ /* ser ref transferred to listener */
tps = ast_taskprocessor_create_with_listener(name, listener);
- if (!tps) {
- return NULL;
+ if (tps && shutdown_group) {
+ serializer_shutdown_group_inc(shutdown_group);
}
+ ao2_ref(listener, -1);
return tps;
}
+
+struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
+{
+ return ast_threadpool_serializer_group(name, pool, NULL);
+}
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 27e3f81..1702981 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -3322,20 +3322,25 @@
return 0;
}
-struct ast_taskprocessor *ast_sip_create_serializer(void)
+struct ast_taskprocessor *ast_sip_create_serializer_group(struct ast_serializer_shutdown_group *shutdown_group)
{
struct ast_taskprocessor *serializer;
char name[AST_UUID_STR_LEN];
ast_uuid_generate_str(name, sizeof(name));
- serializer = ast_threadpool_serializer(name, sip_threadpool);
+ serializer = ast_threadpool_serializer_group(name, sip_threadpool, shutdown_group);
if (!serializer) {
return NULL;
}
return serializer;
}
+struct ast_taskprocessor *ast_sip_create_serializer(void)
+{
+ return ast_sip_create_serializer_group(NULL);
+}
+
int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
if (serializer) {
--
To view, visit https://gerrit.asterisk.org/690
To unsubscribe, visit https://gerrit.asterisk.org/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I8c80b90f2f82754e8dbb02ddf3c9121e5e966059
Gerrit-PatchSet: 3
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: Richard Mudgett <rmudgett at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
Gerrit-Reviewer: Matt Jordan <mjordan at digium.com>
More information about the asterisk-commits
mailing list