[asterisk-commits] kharwell: trunk r429176 - in /trunk: ./ res/res_pjsip_outbound_publish.c

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Dec 9 12:36:50 CST 2014


Author: kharwell
Date: Tue Dec  9 12:36:47 2014
New Revision: 429176

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=429176
Log:
res_pjsip_outbound_publish: stack overflow when using non-default sorcery wizard

When using a non-default sorcery wizard (in this instance realtime) for outbound
publishes, Asterisk will crash after a stack overflow occurs due to the code
infinitely recursing.  The fix entails removing the outbound publish state
dependency from the outbound publish sorcery object and instead keeping an
in-memory container that can be used to look up the state when needed.

ASTERISK-24514 #close
Reported by: Mark Michelson
Review: https://reviewboard.asterisk.org/r/4178/
........

Merged revisions 429175 from http://svn.asterisk.org/svn/asterisk/branches/13

Modified:
    trunk/   (props changed)
    trunk/res/res_pjsip_outbound_publish.c

Propchange: trunk/
------------------------------------------------------------------------------
Binary property 'branch-13-merged' - no diff available.

Modified: trunk/res/res_pjsip_outbound_publish.c
URL: http://svnview.digium.com/svn/asterisk/trunk/res/res_pjsip_outbound_publish.c?view=diff&rev=429176&r1=429175&r2=429176
==============================================================================
--- trunk/res/res_pjsip_outbound_publish.c (original)
+++ trunk/res/res_pjsip_outbound_publish.c Tue Dec  9 12:36:47 2014
@@ -105,26 +105,6 @@
 	char body_contents[0];
 };
 
-/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
-struct ast_sip_outbound_publish_client {
-	/*! \brief Underlying publish client */
-	pjsip_publishc *client;
-	/*! \brief Timer entry for refreshing publish */
-	pj_timer_entry timer;
-	/*! \brief Publisher datastores set up by handlers */
-	struct ao2_container *datastores;
-	/*! \brief The number of auth attempts done */
-	unsigned int auth_attempts;
-	/*! \brief Queue of outgoing publish messages to send*/
-	AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
-	/*! \brief The message currently being sent */
-	struct sip_outbound_publish_message *sending;
-	/*! \brief Publish client has been fully started and event type informed */
-	unsigned int started;
-	/*! \brief Publish client should be destroyed */
-	unsigned int destroy;
-};
-
 /*! \brief Outbound publish information */
 struct ast_sip_outbound_publish {
 	/*! \brief Sorcery object details */
@@ -148,14 +128,122 @@
 	unsigned int max_auth_attempts;
 	/*! \brief Configured authentication credentials */
 	struct ast_sip_auth_vector outbound_auths;
-	/*! \brief Outbound publish state */
-	struct ast_sip_outbound_publish_client *state;
 };
 
+/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
+struct ast_sip_outbound_publish_client {
+	/*! \brief Underlying publish client */
+	pjsip_publishc *client;
+	/*! \brief Timer entry for refreshing publish */
+	pj_timer_entry timer;
+	/*! \brief Publisher datastores set up by handlers */
+	struct ao2_container *datastores;
+	/*! \brief The number of auth attempts done */
+	unsigned int auth_attempts;
+	/*! \brief Queue of outgoing publish messages to send*/
+	AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
+	/*! \brief The message currently being sent */
+	struct sip_outbound_publish_message *sending;
+	/*! \brief Publish client has been fully started and event type informed */
+	unsigned int started;
+	/*! \brief Publish client should be destroyed */
+	unsigned int destroy;
+	/*! \brief Outbound publish information */
+	struct ast_sip_outbound_publish *publish;
+};
+
+/*! \brief Outbound publish state information (persists for lifetime of a publish) */
+struct ast_sip_outbound_publish_state {
+	/*! \brief Outbound publish client */
+	struct ast_sip_outbound_publish_client *client;
+	/* publish state id lookup key - same as publish configuration id */
+	char id[0];
+};
+
+/*! \brief Unloading data */
+struct unloading_data {
+	int is_unloading;
+	int count;
+	ast_mutex_t lock;
+	ast_cond_t cond;
+} unloading;
+
+/*! \brief Default number of client state container buckets */
+#define DEFAULT_STATE_BUCKETS 31
+static AO2_GLOBAL_OBJ_STATIC(current_states);
+/*! \brief Used on [re]loads to hold new state data */
+static struct ao2_container *new_states;
+
+/*! \brief hashing function for state objects */
+static int outbound_publish_state_hash(const void *obj, const int flags)
+{
+	const struct ast_sip_outbound_publish_state *object;
+	const char *key;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_KEY:
+		key = obj;
+		break;
+	case OBJ_SEARCH_OBJECT:
+		object = obj;
+		key = object->id;
+		break;
+	default:
+		ast_assert(0);
+		return 0;
+	}
+	return ast_str_hash(key);
+}
+
+/*! \brief comparator function for client objects */
+static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
+{
+	const struct ast_sip_outbound_publish_state *object_left = obj;
+	const struct ast_sip_outbound_publish_state *object_right = arg;
+	const char *right_key = arg;
+	int cmp;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_OBJECT:
+		right_key = object_right->id;
+		/* Fall through */
+	case OBJ_SEARCH_KEY:
+		cmp = strcmp(object_left->id, right_key);
+		break;
+	case OBJ_SEARCH_PARTIAL_KEY:
+		/* Not supported by container. */
+		ast_assert(0);
+		return 0;
+	default:
+		cmp = 0;
+		break;
+	}
+	if (cmp) {
+		return 0;
+	}
+	return CMP_MATCH;
+}
+
+static struct ao2_container *get_publishes_and_update_state(void)
+{
+	struct ao2_container *container;
+
+	container = ast_sorcery_retrieve_by_fields(
+		ast_sip_get_sorcery(), "outbound-publish",
+		AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+
+	if (!new_states) {
+		return container;
+	}
+
+	ao2_global_obj_replace_unref(current_states, new_states);
+	ao2_cleanup(new_states);
+	new_states = NULL;
+
+	return container;
+}
+
 AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
-
-/*! \brief Container of currently active publish clients */
-static AO2_GLOBAL_OBJ_STATIC(active);
 
 static void sub_add_handler(struct ast_sip_event_publisher_handler *handler)
 {
@@ -185,12 +273,13 @@
 }
 
 /*! \brief Helper function which sets up the timer to send publication */
-static void schedule_publish_refresh(struct ast_sip_outbound_publish *publish, pjsip_rx_data *rdata)
-{
+static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, pjsip_rx_data *rdata)
+{
+	struct ast_sip_outbound_publish *publish = ao2_bump(client->publish);
 	pj_time_val delay = { .sec = 0, };
 	pjsip_expires_hdr *expires;
 
-	cancel_publish_refresh(publish->state);
+	cancel_publish_refresh(client);
 
 	/* Determine when we should refresh - we favor the Expires header if possible */
 	expires = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
@@ -204,11 +293,12 @@
 		delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
 	}
 
-	ao2_ref(publish->state, +1);
-	if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publish->state->timer, &delay) != PJ_SUCCESS) {
+	ao2_ref(client, +1);
+	if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) {
 		ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
-		ao2_ref(publish->state, -1);
-	}
+		ao2_ref(client, -1);
+	}
+	ao2_ref(publish, -1);
 }
 
 /*! \brief Publish client timer callback function */
@@ -229,10 +319,10 @@
 /*! \brief Task for cancelling a refresh timer */
 static int cancel_refresh_timer_task(void *data)
 {
-	struct ast_sip_outbound_publish_client *state = data;
-
-	cancel_publish_refresh(state);
-	ao2_ref(state, -1);
+	struct ast_sip_outbound_publish_client *client = data;
+
+	cancel_publish_refresh(client);
+	ao2_ref(client, -1);
 
 	return 0;
 }
@@ -240,14 +330,14 @@
 /*! \brief Task for sending an unpublish */
 static int send_unpublish_task(void *data)
 {
-	struct ast_sip_outbound_publish_client *state = data;
+	struct ast_sip_outbound_publish_client *client = data;
 	pjsip_tx_data *tdata;
 
-	if (pjsip_publishc_unpublish(state->client, &tdata) == PJ_SUCCESS) {
-		pjsip_publishc_send(state->client, tdata);
-	}
-
-	ao2_ref(state, -1);
+	if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) {
+		pjsip_publishc_send(client->client, tdata);
+	}
+
+	ao2_ref(client, -1);
 
 	return 0;
 }
@@ -255,53 +345,70 @@
 /*! \brief Helper function which starts or stops publish clients when applicable */
 static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
 {
-	RAII_VAR(struct ao2_container *, publishes, ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "outbound-publish", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL), ao2_cleanup);
+	RAII_VAR(struct ao2_container *, publishes, get_publishes_and_update_state(), ao2_cleanup);
+	struct ao2_container *states;
 	struct ao2_iterator i;
-	struct ast_sip_outbound_publish *publish;
+	struct ast_sip_outbound_publish_state *state;
 
 	if (!publishes) {
 		return;
 	}
 
-	i = ao2_iterator_init(publishes, 0);
-	while ((publish = ao2_iterator_next(&i))) {
+	states = ao2_global_obj_ref(current_states);
+	if (!states) {
+		return;
+	}
+
+	i = ao2_iterator_init(states, 0);
+	while ((state = ao2_iterator_next(&i))) {
+		struct ast_sip_outbound_publish *publish = ao2_bump(state->client->publish);
 		struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
 
-		if (!publish->state->started) {
+		if (!state->client->started) {
 			/* If the publisher client has not yet been started try to start it */
 			if (!handler) {
 				ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n",
-					publish->event, ast_sorcery_object_get_id(publish));
-			} else if (handler->start_publishing(publish, publish->state)) {
+					  publish->event, ast_sorcery_object_get_id(publish));
+			} else if (handler->start_publishing(publish, state->client)) {
 				ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n",
 					publish->event, ast_sorcery_object_get_id(publish));
 			} else {
-				publish->state->started = 1;
+				state->client->started = 1;
 			}
-		} else if (publish->state->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
+		} else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
 			/* If the publisher client has been started but it is going away stop it */
-			removed->stop_publishing(publish->state);
-			publish->state->started = 0;
-			if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(publish->state))) {
+			removed->stop_publishing(state->client);
+			state->client->started = 0;
+			if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
 				ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
 					ast_sorcery_object_get_id(publish));
-				ao2_ref(publish->state, -1);
+				ao2_ref(state->client, -1);
 			}
 		}
 		ao2_ref(publish, -1);
+		ao2_ref(state, -1);
 	}
 	ao2_iterator_destroy(&i);
+	ao2_ref(states, -1);
 }
 
 struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
 {
-	RAII_VAR(struct ast_sip_outbound_publish *, publish, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "outbound-publish", name), ao2_cleanup);
-
-	if (!publish) {
+	RAII_VAR(struct ao2_container *, states,
+		 ao2_global_obj_ref(current_states), ao2_cleanup);
+	RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
+
+	if (!states) {
 		return NULL;
 	}
 
-	return ao2_bump(publish->state);
+	state = ao2_find(states, name, OBJ_SEARCH_KEY);
+	if (!state) {
+		return NULL;
+	}
+
+	ao2_ref(state->client, +1);
+	return state->client;
 }
 
 int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
@@ -351,16 +458,7 @@
 static void sip_outbound_publish_destroy(void *obj)
 {
 	struct ast_sip_outbound_publish *publish = obj;
-	SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
-	struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
-
-	if (handler) {
-		handler->stop_publishing(publish->state);
-	}
-	if (publish->state) {
-		cancel_publish_refresh(publish->state);
-		ao2_ref(publish->state, -1);
-	}
+
 	ast_sip_auth_vector_destroy(&publish->outbound_auths);
 
 	ast_string_field_free_memory(publish);
@@ -538,19 +636,72 @@
 	return res;
 }
 
+/*! \brief Destructor function for publish client */
+static void sip_outbound_publish_client_destroy(void *obj)
+{
+	struct ast_sip_outbound_publish_client *client = obj;
+	struct sip_outbound_publish_message *message;
+
+	/* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
+
+	while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
+		ast_free(message);
+	}
+
+	ao2_cleanup(client->datastores);
+	ao2_cleanup(client->publish);
+
+	/* if unloading the module and all objects have been unpublished
+	   send the signal to finish unloading */
+	if (unloading.is_unloading) {
+		ast_mutex_lock(&unloading.lock);
+		if (--unloading.count == 0) {
+			ast_cond_signal(&unloading.cond);
+		}
+		ast_mutex_unlock(&unloading.lock);
+	}
+}
+
+/*! \brief Helper function which cancels and un-publishes a no longer used client */
+static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
+{
+	SCOPED_AO2LOCK(lock, client);
+
+	/* If this publish client is currently publishing stop and terminate any refresh timer */
+	if (client->started) {
+		struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(client->publish->event);
+
+		if (handler) {
+			handler->stop_publishing(client);
+		}
+
+		client->started = 0;
+		if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
+			ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
+				ast_sorcery_object_get_id(client->publish));
+			ao2_ref(client, -1);
+		}
+	}
+
+	/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
+	if (!client->sending) {
+		if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+			ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
+				ast_sorcery_object_get_id(client->publish));
+			ao2_ref(client, -1);
+		}
+	}
+	client->destroy = 1;
+	return 0;
+}
+
 /*! \brief Destructor function for publish state */
-static void sip_outbound_publish_client_destroy(void *obj)
-{
-	struct ast_sip_outbound_publish_client *state = obj;
-	struct sip_outbound_publish_message *message;
-
-	/* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
-
-	while ((message = AST_LIST_REMOVE_HEAD(&state->queue, entry))) {
-		ast_free(message);
-	}
-
-	ao2_cleanup(state->datastores);
+static void sip_outbound_publish_state_destroy(void *obj)
+{
+	struct ast_sip_outbound_publish_state *state = obj;
+
+	cancel_and_unpublish(state->client);
+	ao2_cleanup(state->client);
 }
 
 /*!
@@ -588,20 +739,23 @@
 /*! \brief Helper function that allocates a pjsip publish client and configures it */
 static int sip_outbound_publish_client_alloc(void *data)
 {
-	struct ast_sip_outbound_publish *publish = data;
+	struct ast_sip_outbound_publish_client *client = data;
+	RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
 	pjsip_publishc_opt opt = {
 		.queue_request = PJ_FALSE,
 	};
 	pj_str_t event, server_uri, to_uri, from_uri;
 	pj_status_t status;
 
-	if (publish->state->client) {
+	if (client->client) {
 		return 0;
-	} else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(publish), sip_outbound_publish_callback,
-		&publish->state->client) != PJ_SUCCESS) {
-		ao2_ref(publish, -1);
+	} else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback,
+		&client->client) != PJ_SUCCESS) {
+		ao2_ref(client, -1);
 		return -1;
 	}
+
+	publish = ao2_bump(client->publish);
 
 	if (!ast_strlen_zero(publish->outbound_proxy)) {
 		pjsip_route_hdr route_set, *route;
@@ -609,14 +763,14 @@
 
 		pj_list_init(&route_set);
 
-		if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publish->state->client), &ROUTE_HNAME,
+		if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME,
 			(char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
-			pjsip_publishc_destroy(publish->state->client);
+			pjsip_publishc_destroy(client->client);
 			return -1;
 		}
 		pj_list_insert_nodes_before(&route_set, route);
 
-		pjsip_publishc_set_route_set(publish->state->client, &route_set);
+		pjsip_publishc_set_route_set(client->client, &route_set);
 	}
 
 	pj_cstr(&event, publish->event);
@@ -624,7 +778,7 @@
 	pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri));
 	pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri));
 
-	status = pjsip_publishc_init(publish->state->client, &event, &server_uri, &from_uri, &to_uri,
+	status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri,
 		publish->expiration);
 	if (status == PJSIP_EINVALIDURI) {
 		pj_pool_t *pool;
@@ -635,7 +789,7 @@
 		if (!pool) {
 			ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
 				ast_sorcery_object_get_id(publish));
-			pjsip_publishc_destroy(publish->state->client);
+			pjsip_publishc_destroy(client->client);
 			return -1;
 		}
 
@@ -665,10 +819,10 @@
 		}
 
 		pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
-		pjsip_publishc_destroy(publish->state->client);
+		pjsip_publishc_destroy(client->client);
 		return -1;
 	} else if (status != PJ_SUCCESS) {
-		pjsip_publishc_destroy(publish->state->client);
+		pjsip_publishc_destroy(client->client);
 		return -1;
 	}
 
@@ -678,51 +832,52 @@
 /*! \brief Callback function for publish client responses */
 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
 {
-	RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(param->token), ao2_cleanup);
-	SCOPED_AO2LOCK(lock, publish->state);
+	RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup);
+	RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup);
+	SCOPED_AO2LOCK(lock, client);
 	pjsip_tx_data *tdata;
 
-	if (publish->state->destroy) {
-		if (publish->state->sending) {
-			publish->state->sending = NULL;
-			if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publish->state))) {
+	if (client->destroy) {
+		if (client->sending) {
+			client->sending = NULL;
+
+			if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
 				return;
 			}
 			ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
 				ast_sorcery_object_get_id(publish));
-			ao2_ref(publish->state, -1);
-		}
-		/* Once the destroy is called this callback will not get called any longer, so drop the publish ref */
-		pjsip_publishc_destroy(publish->state->client);
-		ao2_ref(publish, -1);
+			ao2_ref(client, -1);
+		}
+		/* Once the destroy is called this callback will not get called any longer, so drop the client ref */
+		pjsip_publishc_destroy(client->client);
+		ao2_ref(client, -1);
 		return;
 	}
 
 	if (param->code == 401 || param->code == 407) {
 		if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
 				param->rdata, pjsip_rdata_get_tsx(param->rdata), &tdata)) {
-			pjsip_publishc_send(publish->state->client, tdata);
-		}
-		publish->state->auth_attempts++;
-
-		if (publish->state->auth_attempts == publish->max_auth_attempts) {
-			pjsip_publishc_destroy(publish->state->client);
-			publish->state->client = NULL;
+			pjsip_publishc_send(client->client, tdata);
+		}
+		client->auth_attempts++;
+
+		if (client->auth_attempts == publish->max_auth_attempts) {
+			pjsip_publishc_destroy(client->client);
+			client->client = NULL;
 
 			ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
 				ast_sorcery_object_get_id(publish));
 
 			goto end;
 		}
-
 		return;
 	}
 
-	publish->state->auth_attempts = 0;
+	client->auth_attempts = 0;
 
 	if (param->code == 412) {
-		pjsip_publishc_destroy(publish->state->client);
-		publish->state->client = NULL;
+		pjsip_publishc_destroy(client->client);
+		client->client = NULL;
 
 		if (sip_outbound_publish_client_alloc(publish)) {
 			ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
@@ -731,7 +886,7 @@
 		}
 
 		/* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
-		publish->state->sending = NULL;
+		client->sending = NULL;
 	} else if (param->code == 423) {
 		/* Update the expiration with the new expiration time if available */
 		pjsip_expires_hdr *expires;
@@ -740,34 +895,34 @@
 		if (!expires || !expires->ivalue) {
 			ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
 				ast_sorcery_object_get_id(publish));
-			pjsip_publishc_destroy(publish->state->client);
-			publish->state->client = NULL;
+			pjsip_publishc_destroy(client->client);
+			client->client = NULL;
 			goto end;
 		}
 
-		pjsip_publishc_update_expires(publish->state->client, expires->ivalue);
-		publish->state->sending = NULL;
-	} else if (publish->state->sending) {
+		pjsip_publishc_update_expires(client->client, expires->ivalue);
+		client->sending = NULL;
+	} else if (client->sending) {
 		/* Remove the message currently being sent so that when the queue is serviced another will get sent */
-		AST_LIST_REMOVE_HEAD(&publish->state->queue, entry);
-		ast_free(publish->state->sending);
-		publish->state->sending = NULL;
-	}
-
-	if (AST_LIST_EMPTY(&publish->state->queue)) {
-		schedule_publish_refresh(publish, param->rdata);
+		AST_LIST_REMOVE_HEAD(&client->queue, entry);
+		ast_free(client->sending);
+		client->sending = NULL;
+	}
+
+	if (AST_LIST_EMPTY(&client->queue)) {
+		schedule_publish_refresh(client, param->rdata);
 	}
 
 end:
-	if (!publish->state->client) {
+	if (!client->client) {
 		struct sip_outbound_publish_message *message;
 
-		while ((message = AST_LIST_REMOVE_HEAD(&publish->state->queue, entry))) {
+		while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
 			ast_free(message);
 		}
 	} else {
-		if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(publish->state))) {
-			ao2_ref(publish->state, -1);
+		if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
+			ao2_ref(client, -1);
 		}
 	}
 }
@@ -831,31 +986,43 @@
 	return CMP_MATCH;
 }
 
-/*! \brief Allocator function for publish client state */
-static struct ast_sip_outbound_publish_client *sip_outbound_publish_state_alloc(void)
-{
-	struct ast_sip_outbound_publish_client *state = ao2_alloc(sizeof(*state), sip_outbound_publish_client_destroy);
+/*! \brief Allocator function for publish client */
+static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
+	struct ast_sip_outbound_publish *publish)
+{
+	const char *id = ast_sorcery_object_get_id(publish);
+	struct ast_sip_outbound_publish_state *state =
+		ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
 
 	if (!state) {
 		return NULL;
 	}
 
-	state->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
-	if (!state->datastores) {
+	state->client = ao2_alloc(sizeof(*state->client), sip_outbound_publish_client_destroy);
+	if (!state->client) {
 		ao2_ref(state, -1);
 		return NULL;
 	}
 
-	state->timer.user_data = state;
-	state->timer.cb = sip_outbound_publish_timer_cb;
-
+	state->client->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
+	if (!state->client->datastores) {
+		ao2_ref(state, -1);
+		return NULL;
+	}
+
+	state->client->timer.user_data = state->client;
+	state->client->timer.cb = sip_outbound_publish_timer_cb;
+	state->client->publish = ao2_bump(publish);
+
+	strcpy(state->id, id);
 	return state;
 }
 
 /*! \brief Apply function which finds or allocates a state structure */
 static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
 {
-	RAII_VAR(struct ast_sip_outbound_publish *, existing, ast_sorcery_retrieve_by_id(sorcery, "outbound-publish", ast_sorcery_object_get_id(obj)), ao2_cleanup);
+	RAII_VAR(struct ao2_container *, states, ao2_global_obj_ref(current_states), ao2_cleanup);
+	RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
 	struct ast_sip_outbound_publish *applied = obj;
 
 	if (ast_strlen_zero(applied->server_uri)) {
@@ -868,24 +1035,47 @@
 		return -1;
 	}
 
-	if (!existing) {
-		/* If no existing publish exists we can just start fresh easily */
-		applied->state = sip_outbound_publish_state_alloc();
-	} else {
-		/* If there is an existing publish things are more complicated, we can immediately reuse this state if most stuff remains unchanged */
-		if (can_reuse_publish(existing, applied)) {
-			applied->state = existing->state;
-			ao2_ref(applied->state, +1);
-		} else {
-			applied->state = sip_outbound_publish_state_alloc();
-		}
-	}
-
-	if (!applied->state) {
+	if (!new_states) {
+		/* make sure new_states has been allocated as we will be adding to it */
+		new_states = ao2_container_alloc_options(
+			AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
+			outbound_publish_state_hash, outbound_publish_state_cmp);
+
+		if (!new_states) {
+			ast_log(LOG_ERROR, "Unable to allocate new states container\n");
+			return -1;
+		}
+	}
+
+	if (states) {
+		state = ao2_find(states, ast_sorcery_object_get_id(obj), OBJ_SEARCH_KEY);
+		if (state) {
+			if (can_reuse_publish(state->client->publish, applied)) {
+				ao2_replace(state->client->publish, applied);
+			} else {
+				ao2_ref(state, -1);
+				state = NULL;
+			}
+		}
+	}
+
+	if (!state) {
+		state = sip_outbound_publish_state_alloc(applied);
+		if (!state) {
+			ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
+				ast_sorcery_object_get_id(applied));
+			return -1;
+		};
+	}
+
+	if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
+		ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
+			ast_sorcery_object_get_id(applied));
 		return -1;
 	}
 
-	return ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, applied);
+	ao2_link(new_states, state);
+	return 0;
 }
 
 static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
@@ -895,82 +1085,15 @@
 	return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
 }
 
-/*! \brief Helper function which prunes old publish clients */
-static void prune_publish_clients(const char *object_type)
-{
-	struct ao2_container *old, *current;
-
-	old = ao2_global_obj_ref(active);
-	if (old) {
-		struct ao2_iterator i;
-		struct ast_sip_outbound_publish *existing;
-
-		i = ao2_iterator_init(old, 0);
-		for (; (existing = ao2_iterator_next(&i)); ao2_ref(existing, -1)) {
-			struct ast_sip_outbound_publish *created;
-
-			created = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "outbound-publish",
-				ast_sorcery_object_get_id(existing));
-			if (created) {
-				if (created->state == existing->state) {
-					ao2_ref(created, -1);
-					continue;
-				}
-				ao2_ref(created, -1);
-			}
-
-			ao2_lock(existing->state);
-
-			/* If this publish client is currently publishing stop and terminate any refresh timer */
-			if (existing->state->started) {
-				struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(existing->event);
-
-				if (handler) {
-					handler->stop_publishing(existing->state);
-				}
-
-				if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(existing->state))) {
-					ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
-						ast_sorcery_object_get_id(existing));
-					ao2_ref(existing->state, -1);
-				}
-			}
-
-			/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
-			if (!existing->state->sending) {
-				if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(existing->state))) {
-					ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
-						ast_sorcery_object_get_id(existing));
-					ao2_ref(existing->state, -1);
-				}
-			}
-
-			existing->state->destroy = 1;
-			ao2_unlock(existing->state);
-		}
-		ao2_iterator_destroy(&i);
-
-		ao2_ref(old, -1);
-	}
-
-	current = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "outbound-publish", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
-	ao2_global_obj_replace_unref(active, current);
-}
-
-static struct ast_sorcery_observer outbound_publish_observer = {
-	.loaded = prune_publish_clients,
-};
-
 static int load_module(void)
 {
+	ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
 	ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
 
 	if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
 		sip_outbound_publish_apply)) {
 		return AST_MODULE_LOAD_DECLINE;
 	}
-
-	ast_sorcery_observer_add(ast_sip_get_sorcery(), "outbound-publish", &outbound_publish_observer);
 
 	ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0);
 	ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri));
@@ -981,6 +1104,7 @@
 	ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
 	ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
 	ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
+
 	ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
 
 	AST_RWLIST_RDLOCK(&publisher_handlers);
@@ -1004,7 +1128,44 @@
 
 static int unload_module(void)
 {
-	return 0;
+	struct timeval start = ast_tvnow();
+	struct timespec end = {
+		.tv_sec = start.tv_sec + 10,
+		.tv_nsec = start.tv_usec * 1000
+	};
+	int res = 0;
+	struct ao2_container *states = ao2_global_obj_ref(current_states);
+
+	if (!states || !(unloading.count = ao2_container_count(states))) {
+		return 0;
+	}
+	ao2_ref(states, -1);
+
+	ast_mutex_init(&unloading.lock);
+	ast_cond_init(&unloading.cond, NULL);
+	ast_mutex_lock(&unloading.lock);
+
+	unloading.is_unloading = 1;
+	ao2_global_obj_release(current_states);
+
+	/* wait for items to unpublish */
+	ast_verb(5, "Waiting to complete unpublishing task(s)\n");
+	while (unloading.count) {
+		res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
+	}
+	ast_mutex_unlock(&unloading.lock);
+
+	ast_mutex_destroy(&unloading.lock);
+	ast_cond_destroy(&unloading.cond);
+
+	if (res) {
+		ast_verb(5, "At least %d items were unable to unpublish "
+			"in the allowed time\n", unloading.count);
+	} else {
+		ast_verb(5, "All items successfully unpublished\n");
+	}
+
+	return res;
 }
 
 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",




More information about the asterisk-commits mailing list