[asterisk-commits] res pjsip outbound publish: Use a serializer shutdown group ... (asterisk[13])
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Aug 9 14:44:25 CDT 2016
Joshua Colp has submitted this change and it was merged.
Change subject: res_pjsip_outbound_publish: Use a serializer shutdown group for unload.
......................................................................
res_pjsip_outbound_publish: Use a serializer shutdown group for unload.
This change replaces the custom unload process for the outbound
publish module with the common serializer shutdown group.
ASTERISK-25217 #close
Change-Id: I280a0384d860c486202d87d2d674394cca77ffb6
---
M res/res_pjsip_outbound_publish.c
1 file changed, 62 insertions(+), 69 deletions(-)
Approvals:
Kevin Harwell: Looks good to me, but someone else must approve
Matt Jordan: Looks good to me, approved
Joshua Colp: Verified
diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c
index ab8a6c9..35eedf0 100644
--- a/res/res_pjsip_outbound_publish.c
+++ b/res/res_pjsip_outbound_publish.c
@@ -31,6 +31,7 @@
#include "asterisk/res_pjsip_outbound_publish.h"
#include "asterisk/module.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
#include "asterisk/datastore.h"
/*** DOCUMENTATION
@@ -161,6 +162,8 @@
struct ast_sip_outbound_publish *publish;
/*! \brief The name of the transport to be used for the publish */
char *transport_name;
+ /*! \brief Serializer for stuff and things */
+ struct ast_taskprocessor *serializer;
};
/*! \brief Outbound publish state information (persists for lifetime of a publish) */
@@ -171,13 +174,11 @@
char id[0];
};
-/*! \brief Unloading data */
-struct unloading_data {
- int is_unloading;
- int count;
- ast_mutex_t lock;
- ast_cond_t cond;
-} unloading;
+/*! Time needs to be long enough for a transaction to timeout if nothing replies. */
+#define MAX_UNLOAD_TIMEOUT_TIME 35 /* Seconds */
+
+/*! Shutdown group to monitor sip_outbound_registration_client_state serializers. */
+static struct ast_serializer_shutdown_group *shutdown_group;
/*! \brief Default number of client state container buckets */
#define DEFAULT_STATE_BUCKETS 31
@@ -393,7 +394,7 @@
/* If the publisher client has been started but it is going away stop it */
removed->stop_publishing(state->client);
state->client->started = 0;
- if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
+ if (ast_sip_push_task(state->client->serializer, cancel_refresh_timer_task, ao2_bump(state->client))) {
ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
ast_sorcery_object_get_id(publish));
ao2_ref(state->client, -1);
@@ -614,7 +615,7 @@
ast_free(message);
service:
- if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
+ if (ast_sip_push_task(client->serializer, sip_publish_client_service_queue, ao2_bump(client))) {
ao2_ref(client, -1);
}
return -1;
@@ -656,7 +657,7 @@
AST_LIST_INSERT_TAIL(&client->queue, message, entry);
- res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
+ res = ast_sip_push_task(client->serializer, sip_publish_client_service_queue, ao2_bump(client));
if (res) {
ao2_ref(client, -1);
}
@@ -679,16 +680,7 @@
ao2_cleanup(client->datastores);
ao2_cleanup(client->publish);
ast_free(client->transport_name);
-
- /* if unloading the module and all objects have been unpublished
- send the signal to finish unloading */
- if (unloading.is_unloading) {
- ast_mutex_lock(&unloading.lock);
- if (--unloading.count == 0) {
- ast_cond_signal(&unloading.cond);
- }
- ast_mutex_unlock(&unloading.lock);
- }
+ ast_taskprocessor_unreference(client->serializer);
}
static int explicit_publish_destroy(void *data)
@@ -718,7 +710,7 @@
/* If the client was never started, there's nothing to unpublish, so just
* destroy the publication and remove its reference to the client.
*/
- ast_sip_push_task(NULL, explicit_publish_destroy, client);
+ ast_sip_push_task(client->serializer, explicit_publish_destroy, client);
return 0;
}
@@ -728,7 +720,7 @@
}
client->started = 0;
- if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
+ if (ast_sip_push_task(client->serializer, cancel_refresh_timer_task, ao2_bump(client))) {
ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
ast_sorcery_object_get_id(client->publish));
ao2_ref(client, -1);
@@ -736,7 +728,7 @@
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
if (!client->sending) {
- if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+ if (ast_sip_push_task(client->serializer, send_unpublish_task, ao2_bump(client))) {
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
ast_sorcery_object_get_id(client->publish));
ao2_ref(client, -1);
@@ -897,7 +889,7 @@
if (client->sending) {
client->sending = NULL;
- if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+ if (!ast_sip_push_task(client->serializer, send_unpublish_task, ao2_bump(client))) {
return;
}
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
@@ -981,7 +973,7 @@
ast_free(message);
}
} else {
- if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
+ if (ast_sip_push_task(client->serializer, sip_publish_client_service_queue, ao2_bump(client))) {
ao2_ref(client, -1);
}
}
@@ -1053,6 +1045,7 @@
const char *id = ast_sorcery_object_get_id(publish);
struct ast_sip_outbound_publish_state *state =
ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
if (!state) {
return NULL;
@@ -1070,6 +1063,17 @@
return NULL;
}
+ /* Create name with seq number appended. */
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/outpub/%s",
+ ast_sorcery_object_get_id(publish));
+
+ state->client->serializer = ast_sip_create_serializer_group_named(tps_name,
+ shutdown_group);
+ if (!state->client->serializer) {
+ ao2_ref(state, -1);
+ return NULL;
+ }
+
state->client->timer.user_data = state->client;
state->client->timer.cb = sip_outbound_publish_timer_cb;
state->client->transport_name = ast_strdup(publish->transport);
@@ -1082,7 +1086,7 @@
static int initialize_publish_client(struct ast_sip_outbound_publish *publish,
struct ast_sip_outbound_publish_state *state)
{
- if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
+ if (ast_sip_push_task_synchronous(state->client->serializer, sip_outbound_publish_client_alloc, state->client)) {
ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
return -1;
@@ -1219,9 +1223,40 @@
return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
}
+
+static int unload_module(void)
+{
+ int remaining;
+
+ ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
+
+ ao2_global_obj_release(current_states);
+
+ /* Wait for publication serializers to get destroyed. */
+ ast_debug(2, "Waiting for publication to complete for unload.\n");
+ remaining = ast_serializer_shutdown_group_join(shutdown_group, MAX_UNLOAD_TIMEOUT_TIME);
+ if (remaining) {
+ ast_log(LOG_WARNING, "Unload incomplete. Could not stop %d outbound publications. Try again later.\n",
+ remaining);
+ return -1;
+ }
+
+ ast_debug(2, "Successful shutdown.\n");
+
+ ao2_cleanup(shutdown_group);
+ shutdown_group = NULL;
+
+ return 0;
+}
+
static int load_module(void)
{
CHECK_PJSIP_MODULE_LOADED();
+
+ shutdown_group = ast_serializer_shutdown_group_alloc();
+ if (!shutdown_group) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
@@ -1229,6 +1264,7 @@
if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
sip_outbound_publish_apply)) {
ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n");
+ unload_module();
return AST_MODULE_LOAD_DECLINE;
}
@@ -1262,49 +1298,6 @@
sip_outbound_publish_synchronize(NULL);
AST_RWLIST_UNLOCK(&publisher_handlers);
return 0;
-}
-
-static int unload_module(void)
-{
- struct timeval start = ast_tvnow();
- struct timespec end = {
- .tv_sec = start.tv_sec + 10,
- .tv_nsec = start.tv_usec * 1000
- };
- int res = 0;
- struct ao2_container *states = ao2_global_obj_ref(current_states);
-
- if (!states || !(unloading.count = ao2_container_count(states))) {
- return 0;
- }
- ao2_ref(states, -1);
-
- ast_mutex_init(&unloading.lock);
- ast_cond_init(&unloading.cond, NULL);
- ast_mutex_lock(&unloading.lock);
-
- unloading.is_unloading = 1;
- ao2_global_obj_release(current_states);
-
- /* wait for items to unpublish */
- ast_verb(5, "Waiting to complete unpublishing task(s)\n");
- while (unloading.count && !res) {
- res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
- }
- ast_mutex_unlock(&unloading.lock);
-
- ast_mutex_destroy(&unloading.lock);
- ast_cond_destroy(&unloading.cond);
-
- if (res) {
- ast_verb(5, "At least %d items were unable to unpublish "
- "in the allowed time\n", unloading.count);
- } else {
- ast_verb(5, "All items successfully unpublished\n");
- ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
- }
-
- return res;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
--
To view, visit https://gerrit.asterisk.org/3418
To unsubscribe, visit https://gerrit.asterisk.org/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I280a0384d860c486202d87d2d674394cca77ffb6
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Anonymous Coward #1000019
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
Gerrit-Reviewer: Matt Jordan <mjordan at digium.com>
More information about the asterisk-commits
mailing list