[Asterisk-code-review] res pjsip outbound publish: Add multi-user support per confi... (asterisk[master])

Kevin Harwell asteriskteam at digium.com
Tue Apr 26 16:58:02 CDT 2016


Kevin Harwell has uploaded a new change for review.

  https://gerrit.asterisk.org/2709

Change subject: res_pjsip_outbound_publish: Add multi-user support per configuration
......................................................................

res_pjsip_outbound_publish: Add multi-user support per configuration

Added a new multi_user option that, when specified, allows a particular
configuration to be used for multiple users. It does this by replacing
the user portion of the server URI with a dynamically created one.

Two new API calls have been added in order to make use of the new
functionality:

ast_sip_publish_user_send - Sends an outgoing publish message based on the
given user. If state for the user already exists it uses that, otherwise
it dynamically creates new outbound publishing state for the user at that
time.

ast_sip_publish_user_remove - Removes all outbound publish state objects
associated with the user. This essentially stops outbound publishing for
the user.

ASTERISK-25965 #close

Change-Id: Ib88dde024cc83c916424645d4f5bb84a0fa936cc
---
M CHANGES
M include/asterisk/res_pjsip_outbound_publish.h
M res/res_pjsip_outbound_publish.c
3 files changed, 547 insertions(+), 105 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/09/2709/1

diff --git a/CHANGES b/CHANGES
index e7f0656..66e594d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -234,6 +234,11 @@
   outbound registration, registration is retried at the given interval up to
   'max_retries'.
 
+res_pjsip_outbound_publish
+------------------
+ * Added a new multi_user option that when set to 'yes' allows a given configuration
+ to be used for multiple users.
+
 CEL Backends
 ------------------
 
diff --git a/include/asterisk/res_pjsip_outbound_publish.h b/include/asterisk/res_pjsip_outbound_publish.h
index debec94..d4c246f 100644
--- a/include/asterisk/res_pjsip_outbound_publish.h
+++ b/include/asterisk/res_pjsip_outbound_publish.h
@@ -162,4 +162,22 @@
 int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
 	const struct ast_sip_body *body);
 
+/*!
+ * \brief Send an outgoing PUBLISH message based on the user
+ *
+ * \param user The user to send to
+ * \param body An optional body to add to the PUBLISH
+ *
+ * \retval -1 failure
+ * \retval 0 success
+ */
+int ast_sip_publish_user_send(const char *user, const struct ast_sip_body *body);
+
+/*!
+ * \brief Remove the given user and stop outbound publishing for it
+ *
+ * \param user The user to remove
+ */
+void ast_sip_publish_user_remove(const char *user);
+
 #endif /* RES_PJSIP_OUTBOUND_PUBLISH_H */
diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c
index 5b27bdf..81046b5 100644
--- a/res/res_pjsip_outbound_publish.c
+++ b/res/res_pjsip_outbound_publish.c
@@ -94,6 +94,10 @@
 						<literal>pjsip.conf</literal>. As with other <literal>res_pjsip</literal> modules, this will use the first available transport of the appropriate type if unconfigured.</para></note>
 					</description>
 				</configOption>
+				<configOption name="multi_user" default="no">
+					<synopsis>Enable multi-user support</synopsis>
+					<description><para>When enabled the user portion of the server uri is replaced by a dynamically created user</para></description>
+				</configOption>
 				<configOption name="type">
 					<synopsis>Must be of type 'outbound-publish'.</synopsis>
 				</configOption>
@@ -137,6 +141,8 @@
 	unsigned int max_auth_attempts;
 	/*! \brief Configured authentication credentials */
 	struct ast_sip_auth_vector outbound_auths;
+	/*! \brief The publishing client is used for multiple users when true */
+	unsigned int multi_user;
 };
 
 /*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
@@ -161,14 +167,24 @@
 	struct ast_sip_outbound_publish *publish;
 	/*! \brief The name of the transport to be used for the publish */
 	char *transport_name;
+	/*! \brief Memory pool for uri objects */
+	pj_pool_t *pool;
+	/*! \brief URI for the entity and server */
+	pjsip_sip_uri *server_uri;
+	/*! \brief URI for the To header */
+	pjsip_sip_uri *to_uri;
+	/*! \brief URI for the From header */
+	pjsip_sip_uri *from_uri;
 };
 
 /*! \brief Outbound publish state information (persists for lifetime of a publish) */
 struct ast_sip_outbound_publish_state {
+	/*! \brief Publish state id - same as publish configuration id */
+	char *id;
+	/*! \brief Multi-user identity */
+	char *user;
 	/*! \brief Outbound publish client */
 	struct ast_sip_outbound_publish_client *client;
-	/* publish state id lookup key - same as publish configuration id */
-	char id[0];
 };
 
 /*! \brief Unloading data */
@@ -179,6 +195,70 @@
 	ast_cond_t cond;
 } unloading;
 
+/*! \brief Default number of multi-user publish buckets */
+#define DEFAULT_MULTI_BUCKETS 13
+static struct ao2_container *multi_publishes;
+
+/*! \brief hashing function for multi-user publish objects */
+static int outbound_publish_multi_hash(const void *obj, const int flags)
+{
+	const char *key;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_KEY:
+		key = obj;
+		break;
+	case OBJ_SEARCH_OBJECT:
+		key = ast_sorcery_object_get_id(obj);
+		break;
+	default:
+		ast_assert(0);
+		return 0;
+	}
+	return ast_str_hash(key);
+}
+
+/*! \brief Comparator function for multi-user publish objects */
+static int outbound_publish_multi_cmp(void *obj, void *arg, int flags)
+{
+	const struct ast_sip_outbound_publish *object_left = obj;
+	const struct ast_sip_outbound_publish *object_right = arg;
+	const char *right_key = arg;
+	int cmp;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_OBJECT:
+		right_key = ast_sorcery_object_get_id(object_right);
+		/* Fall through */
+	case OBJ_SEARCH_KEY:
+		cmp = strcmp(ast_sorcery_object_get_id(object_left), right_key);
+		break;
+	case OBJ_SEARCH_PARTIAL_KEY:
+		/* Not supported by container. */
+		ast_assert(0);
+		return 0;
+	default:
+		cmp = 0;
+		break;
+	}
+	if (cmp) {
+		return 0;
+	}
+	return CMP_MATCH;
+}
+
+/*!
+ * \brief Used for locking while loading/reloading
+ *
+ * While loading or reloading, items cannot be added to or removed from
+ * the current_states container. This is because during a load/reload
+ * a separate container (new_states) is populated with the new incoming
+ * back end data. Once all new objects have been applied, this new_states
+ * container is swapped in and becomes current_states. If objects were
+ * added to or removed from current_states while new_states is being
+ * built, then some items may be missed.
+ */
+AST_RWLOCK_DEFINE_STATIC(load_lock);
 /*! \brief Default number of client state container buckets */
 #define DEFAULT_STATE_BUCKETS 31
 static AO2_GLOBAL_OBJ_STATIC(current_states);
@@ -206,7 +286,7 @@
 	return ast_str_hash(key);
 }
 
-/*! \brief comparator function for client objects */
+/*! \brief Comparator function (by id) for state objects */
 static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
 {
 	const struct ast_sip_outbound_publish_state *object_left = obj;
@@ -235,9 +315,32 @@
 	return CMP_MATCH;
 }
 
+/*! \brief Comparator function (by user) for state objects */
+static int outbound_publish_state_find_by_user(void *obj, void *arg, int flags)
+{
+	const struct ast_sip_outbound_publish_state *state = obj;
+	const char *user = arg;
+
+	return strcmp(state->user, user) == 0 ? CMP_MATCH : 0;
+}
+
+static struct ao2_container *find_states_by_user(const char *user)
+{
+	struct ao2_container *states, *res;
+
+	ast_assert((states = ao2_global_obj_ref(current_states)) != NULL);
+
+	res = ao2_callback(states, OBJ_MULTIPLE,
+			   outbound_publish_state_find_by_user, (void *)user);
+
+	ao2_ref(states, -1);
+	return res;
+}
+
 static struct ao2_container *get_publishes_and_update_state(void)
 {
 	struct ao2_container *container;
+	SCOPED_WRLOCK(lock, &load_lock);
 
 	container = ast_sorcery_retrieve_by_fields(
 		ast_sip_get_sorcery(), "outbound-publish",
@@ -656,6 +759,92 @@
 	return res;
 }
 
+static int publish_client_send(void *obj, void *arg, int flags)
+{
+	struct ast_sip_outbound_publish_state *state = obj;
+
+	return ast_sip_publish_client_send(state->client, arg);
+}
+
+static struct ast_sip_outbound_publish_state *validate_and_create_state(
+	struct ast_sip_outbound_publish *publish, const char *user);
+static int apply_initialize_state(struct ast_sip_outbound_publish *publish,
+	struct ast_sip_outbound_publish_state *state);
+
+static int publish_create_client_send(void *obj, void *arg, void *data, int flags)
+{
+	struct ast_sip_outbound_publish *publish = obj;
+	struct ast_sip_outbound_publish_state *new_state;
+	struct ao2_container *states;
+	int res;
+
+	ast_assert((states = ao2_global_obj_ref(current_states)) != NULL);
+
+	if (!(new_state = validate_and_create_state(publish, data)) ||
+	    (apply_initialize_state(publish, new_state))) {
+		ao2_cleanup(new_state);
+		ao2_ref(states, -1);
+		return -1;
+	}
+
+	/* Can't have any new items added to current_states while we are [re]loading */
+	ast_rwlock_wrlock(&load_lock);
+	if (!ao2_link(states, new_state)) {
+		ast_rwlock_unlock(&load_lock);
+		ao2_ref(new_state, -1);
+		ao2_ref(states, -1);
+		return -1;
+	}
+	ast_rwlock_unlock(&load_lock);
+
+	res = ast_sip_publish_client_send(new_state->client, arg);
+
+	ao2_ref(new_state, -1);
+	ao2_ref(states, -1);
+	return res;
+}
+
+int ast_sip_publish_user_send(const char *user, const struct ast_sip_body *body)
+{
+	struct ao2_container *states = find_states_by_user(user);
+
+	if (states && ao2_container_count(states)) {
+		/* User already has publish client(s) */
+		ao2_callback(states, OBJ_NODATA, publish_client_send, (void *)body);
+		ao2_ref(states, -1);
+		return 0;
+	}
+	ao2_cleanup(states);
+
+	/*
+	 * Since the user does not have publish clients created yet we'll
+	 * create and publish them now. We need to do this for each multi-
+	 * user configuration.
+	 */
+	ao2_callback_data(multi_publishes, OBJ_NODATA,
+		publish_create_client_send, (void *)body, (void *)user);
+	return 0;
+}
+
+void ast_sip_publish_user_remove(const char *user)
+{
+	struct ao2_container *states;
+
+	/* Can't remove items from current_states while [re]loading */
+	SCOPED_WRLOCK(lock, &load_lock);
+	ast_assert((states = ao2_global_obj_ref(current_states)) != NULL);
+
+	ao2_callback(states, OBJ_MULTIPLE | OBJ_UNLINK | OBJ_NODATA,
+		     outbound_publish_state_find_by_user, (void *)user);
+	ao2_ref(states, -1);
+}
+
+static int release_pjsip_pool(void *data)
+{
+	pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), data);
+	return 0;
+}
+
 /*! \brief Destructor function for publish client */
 static void sip_outbound_publish_client_destroy(void *obj)
 {
@@ -672,6 +861,10 @@
 	ao2_cleanup(client->publish);
 	ast_free(client->transport_name);
 
+	if (client->pool) {
+		ast_sip_push_task_synchronous(NULL, release_pjsip_pool, client->pool);
+	}
+
 	/* if unloading the module and all objects have been unpublished
 	   send the signal to finish unloading */
 	if (unloading.is_unloading) {
@@ -687,8 +880,10 @@
 {
 	struct ast_sip_outbound_publish_client *client = data;
 
-	pjsip_publishc_destroy(client->client);
-	ao2_ref(client, -1);
+	if (client->client) {
+		pjsip_publishc_destroy(client->client);
+		ao2_ref(client, -1);
+	}
 
 	return 0;
 }
@@ -701,9 +896,9 @@
 
 	if (!client->started) {
 		/* If the client was never started, there's nothing to unpublish, so just
-		 * destroy the publication and remove its reference to the client.
+		 * destroy the publication.
 		 */
-		ast_sip_push_task(NULL, explicit_publish_destroy, client);
+		ast_sip_push_task_synchronous(NULL, explicit_publish_destroy, client);
 		return 0;
 	}
 
@@ -738,31 +933,40 @@
 
 	cancel_and_unpublish(state->client);
 	ao2_cleanup(state->client);
+
+	ast_free(state->user);
+	ast_free(state->id);
 }
 
 /*!
  * \internal
  * \brief Check if a publish can be reused
  *
- * This checks if the existing outbound publish's configuration differs from a newly-applied
- * outbound publish.
+ * This checks if the existing outbound state and publish's configuration
+ * differs from a newly-applied outbound state and publish.
  *
- * \param existing The pre-existing outbound publish
- * \param applied The newly-created publish
+ * \param existing The pre-existing state
+ * \param applied The newly-created state
  */
-static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct ast_sip_outbound_publish *applied)
+static int can_reuse_publish(struct ast_sip_outbound_publish_state *existing,
+			     struct ast_sip_outbound_publish_state *applied)
 {
+	struct ast_sip_outbound_publish_client *ec = existing->client;
+	struct ast_sip_outbound_publish *ep = ec->publish;
+	struct ast_sip_outbound_publish_client *ac = applied->client;
+	struct ast_sip_outbound_publish *ap = ac->publish;
 	int i;
 
-	if (strcmp(existing->server_uri, applied->server_uri) || strcmp(existing->from_uri, applied->from_uri) ||
-		strcmp(existing->to_uri, applied->to_uri) || strcmp(existing->outbound_proxy, applied->outbound_proxy) ||
-		strcmp(existing->event, applied->event) ||
-		AST_VECTOR_SIZE(&existing->outbound_auths) != AST_VECTOR_SIZE(&applied->outbound_auths)) {
+	if (pjsip_uri_cmp(PJSIP_URI_IN_OTHER, ec->server_uri, ac->server_uri) != PJ_SUCCESS ||
+	    pjsip_uri_cmp(PJSIP_URI_IN_OTHER, ec->to_uri, ac->to_uri) != PJ_SUCCESS ||
+	    pjsip_uri_cmp(PJSIP_URI_IN_OTHER, ec->from_uri, ac->from_uri) != PJ_SUCCESS ||
+	    strcmp(ep->outbound_proxy, ap->outbound_proxy) || strcmp(ep->event, ap->event) ||
+	    AST_VECTOR_SIZE(&ep->outbound_auths) != AST_VECTOR_SIZE(&ap->outbound_auths)) {
 		return 0;
 	}
 
-	for (i = 0; i < AST_VECTOR_SIZE(&existing->outbound_auths); ++i) {
-		if (strcmp(AST_VECTOR_GET(&existing->outbound_auths, i), AST_VECTOR_GET(&applied->outbound_auths, i))) {
+	for (i = 0; i < AST_VECTOR_SIZE(&ep->outbound_auths); ++i) {
+		if (strcmp(AST_VECTOR_GET(&ep->outbound_auths, i), AST_VECTOR_GET(&ap->outbound_auths, i))) {
 			return 0;
 		}
 	}
@@ -780,8 +984,10 @@
 	pjsip_publishc_opt opt = {
 		.queue_request = PJ_FALSE,
 	};
+	char server_buf[128], to_buf[128], from_buf[128];
 	pj_str_t event, server_uri, to_uri, from_uri;
 	pj_status_t status;
+	int size;
 
 	if (client->client) {
 		return 0;
@@ -809,55 +1015,30 @@
 		pjsip_publishc_set_route_set(client->client, &route_set);
 	}
 
-	pj_cstr(&event, publish->event);
-	pj_cstr(&server_uri, publish->server_uri);
-	pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri));
-	pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri));
-
-	status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri,
-		publish->expiration);
-	if (status == PJSIP_EINVALIDURI) {
-		pj_pool_t *pool;
-		pj_str_t tmp;
-		pjsip_uri *uri;
-
-		pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256);
-		if (!pool) {
-			ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
-				ast_sorcery_object_get_id(publish));
-			pjsip_publishc_destroy(client->client);
-			return -1;
-		}
-
-		pj_strdup2_with_null(pool, &tmp, publish->server_uri);
-		uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
-		if (!uri) {
-			ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
-				publish->server_uri, ast_sorcery_object_get_id(publish));
-		}
-
-		if (!ast_strlen_zero(publish->to_uri)) {
-			pj_strdup2_with_null(pool, &tmp, publish->to_uri);
-			uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
-			if (!uri) {
-				ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
-					publish->to_uri, ast_sorcery_object_get_id(publish));
-			}
-		}
-
-		if (!ast_strlen_zero(publish->from_uri)) {
-			pj_strdup2_with_null(pool, &tmp, publish->from_uri);
-			uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
-			if (!uri) {
-				ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
-					publish->from_uri, ast_sorcery_object_get_id(publish));
-			}
-		}
-
-		pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
-		pjsip_publishc_destroy(client->client);
+	if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, client->server_uri,
+				    server_buf, sizeof(server_buf) - 1)) <= 0) {
 		return -1;
-	} else if (status != PJ_SUCCESS) {
+	}
+	pj_strset(&server_uri, server_buf, size);
+
+	if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, client->to_uri,
+				    to_buf, sizeof(to_buf) - 1)) <= 0) {
+		return -1;
+	}
+	pj_strset(&to_uri, to_buf, size);
+
+	if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, client->from_uri,
+				    from_buf, sizeof(from_buf) - 1)) <= 0) {
+		return -1;
+	}
+	pj_strset(&from_uri, from_buf, size);
+
+	pj_cstr(&event, publish->event);
+	status = pjsip_publishc_init(client->client, &event, &server_uri,
+				     &from_uri, &to_uri, publish->expiration);
+	if (status != PJ_SUCCESS) {
+		ast_log(LOG_ERROR, "Could not initialize outbound publish '%s'\n",
+			ast_sorcery_object_get_id(publish));
 		pjsip_publishc_destroy(client->client);
 		return -1;
 	}
@@ -1033,13 +1214,99 @@
 	return CMP_MATCH;
 }
 
+/*!
+ * \internal
+ * \brief Set the user, if given, on the uri
+ *
+ * \param uri the uri to set the user on
+ * \param client the client object
+ * \param user the user to set on the uri
+ */
+static pjsip_sip_uri *sip_outbound_publish_client_set_uri(const char *uri,
+	struct ast_sip_outbound_publish_client *client, const char *user)
+{
+	pj_str_t tmp;
+	pjsip_uri *parsed;
+	pjsip_sip_uri *parsed_uri;
+
+	pj_strdup2_with_null(client->pool, &tmp, uri);
+	if (!(parsed = pjsip_parse_uri(client->pool, tmp.ptr, tmp.slen, 0))) {
+		return NULL;
+	}
+
+	if (!(parsed_uri = pjsip_uri_get_uri(parsed))) {
+		return NULL;
+	}
+
+	if (user) {
+		pj_strdup2(client->pool, &parsed_uri->user, user);
+	}
+
+	return parsed_uri;
+}
+
+struct set_uris_data {
+	struct ast_sip_outbound_publish *publish;
+	struct ast_sip_outbound_publish_client *client;
+	const char *user;
+	int res;
+};
+
+static int sip_outbound_publish_client_set_uris(void *data)
+{
+	struct set_uris_data *uris_data = data;
+	struct ast_sip_outbound_publish *publish = uris_data->publish;
+	struct ast_sip_outbound_publish_client *client = uris_data->client;
+
+	uris_data->res = -1;
+	if (!client->pool) {
+		client->pool = pjsip_endpt_create_pool(
+			ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256);
+		if (!client->pool) {
+			ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
+				ast_sorcery_object_get_id(publish));
+			return -1;
+		}
+	}
+
+	client->server_uri = sip_outbound_publish_client_set_uri(
+		publish->server_uri, client, uris_data->user);
+	if (!client->server_uri) {
+		ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
+			publish->server_uri, ast_sorcery_object_get_id(publish));
+		return -1;
+	}
+
+	client->to_uri = ast_strlen_zero(publish->to_uri) ? client->server_uri :
+		sip_outbound_publish_client_set_uri(publish->to_uri, client, uris_data->user);
+
+	if (!client->to_uri) {
+		ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
+			publish->to_uri, ast_sorcery_object_get_id(publish));
+		return -1;
+	}
+
+	client->from_uri = ast_strlen_zero(publish->from_uri) ? client->server_uri :
+		sip_outbound_publish_client_set_uri(publish->from_uri, client, uris_data->user);
+
+	if (!client->from_uri) {
+		ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
+			publish->from_uri, ast_sorcery_object_get_id(publish));
+		return -1;
+	}
+
+	uris_data->res = 0;
+	return 0;
+}
+
 /*! \brief Allocator function for publish client */
 static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
-	struct ast_sip_outbound_publish *publish)
+	struct ast_sip_outbound_publish *publish, const char *user)
 {
 	const char *id = ast_sorcery_object_get_id(publish);
 	struct ast_sip_outbound_publish_state *state =
-		ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
+		ao2_alloc(sizeof(*state), sip_outbound_publish_state_destroy);
+	struct set_uris_data uris_data;
 
 	if (!state) {
 		return NULL;
@@ -1062,29 +1329,162 @@
 	state->client->transport_name = ast_strdup(publish->transport);
 	state->client->publish = ao2_bump(publish);
 
-	strcpy(state->id, id);
+	if (!(state->id = ast_strdup(id))) {
+		ao2_ref(state, -1);
+		return NULL;
+	}
+
+	if (user && !(state->user = ast_strdup(user))) {
+		ao2_ref(state, -1);
+		return NULL;
+	}
+
+	uris_data.publish = publish;
+	uris_data.client = state->client;
+	uris_data.user = user;
+	if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_set_uris,
+					  &uris_data) || uris_data.res) {
+		ao2_ref(state, -1);
+		return NULL;
+	}
+
 	return state;
+}
+
+static int apply_initialize_state(struct ast_sip_outbound_publish *publish,
+				  struct ast_sip_outbound_publish_state *state)
+{
+	if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
+		ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
+			ast_sorcery_object_get_id(publish));
+		return -1;
+	}
+
+	return 0;
+}
+
+static struct ast_sip_outbound_publish_state *validate_and_create_state(
+	struct ast_sip_outbound_publish *publish, const char *user)
+{
+	struct ast_sip_outbound_publish_state *state;
+
+	if (ast_strlen_zero(publish->server_uri)) {
+		ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
+			ast_sorcery_object_get_id(publish));
+		return NULL;
+	}
+
+	if (ast_strlen_zero(publish->event)) {
+		ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
+			ast_sorcery_object_get_id(publish));
+		return NULL;
+	}
+
+	state = sip_outbound_publish_state_alloc(publish, user);
+	if (!state) {
+		ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
+			ast_sorcery_object_get_id(publish));
+		return NULL;
+	};
+
+	return state;
+}
+
+static int apply_update_current(void *obj, void *arg, void *data, int flags)
+{
+	RAII_VAR(struct ast_sip_outbound_publish_state *, new_state, NULL, ao2_cleanup);
+	struct ast_sip_outbound_publish_state *state = obj;
+	struct ast_sip_outbound_publish *old, *publish = arg;
+	int *reuse = data;
+
+	/* Process only those objects with matching ids */
+	if (strcmp(state->id, ast_sorcery_object_get_id(publish))) {
+		return 0;
+	}
+
+	/*
+	 * Don't maintain the old state object(s) if the multi_user option changed.
+	 */
+	if ((!publish->multi_user && state->client->publish->multi_user) ||
+	    (publish->multi_user && !state->client->publish->multi_user)) {
+		/*
+		 * By returning non-zero here we'll be stopping iteration. However,
+		 * that is okay since it's safe to assume any other objects in the
+		 * container that could match fall under the above condition.
+		 */
+		return -1;
+	}
+
+	/*
+	 * If we have more than one item in the container it is safe to assume each
+	 * item pertains to a multi-user configuration state. That being the case if
+	 * one item validates and can be reused then all items will validate and can
+	 * be reused. Thus we can skip those checks for subsequent states.
+	 */
+	if (*reuse != 1) {
+		if (!(new_state = validate_and_create_state(publish, state->user))) {
+			/*
+			 * The updated configuration had an error in it, so we want to
+			 * keep the current state (i.e. add the current state object to
+			 * the new_states container).
+			 */
+			return !ao2_link(new_states, state);
+		}
+
+		/*
+		 * If we can't reuse the current state then initialize the
+		 * new state and use that.
+		 */
+		if (!can_reuse_publish(state, new_state)) {
+			*reuse = 0;
+			if (apply_initialize_state(publish, new_state)) {
+				/* If it fails to initialize keep the old */
+				return !ao2_link(new_states, state);
+			}
+			return !ao2_link(new_states, new_state);
+		}
+	}
+
+	*reuse = 1;
+
+	/*
+	 * If we can reuse the current state then keep it, but swap out
+	 * the underlying publish object with the new one.
+	 */
+	old = state->client->publish;
+	state->client->publish = publish;
+	if (apply_initialize_state(publish, state)) {
+		/*
+		 * If the state object fails to [re]initialize then
+		 * swap the old publish info back in and add keep the
+		 * current state object around.
+		 */
+		state->client->publish = publish;
+	} else {
+		/*
+		 * Since we swapped out the publish object the new one
+		 * needs a ref while the old one needs to go away.
+		 */
+		ao2_ref(state->client->publish, +1);
+		ao2_cleanup(old);
+	}
+
+	return !ao2_link(new_states, state);
 }
 
 /*! \brief Apply function which finds or allocates a state structure */
 static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
 {
-	RAII_VAR(struct ao2_container *, states, ao2_global_obj_ref(current_states), ao2_cleanup);
-	RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
+	struct ao2_container *states;
+	struct ast_sip_outbound_publish_state *new_state = NULL;
 	struct ast_sip_outbound_publish *applied = obj;
+	int existing = -1;
 
-	if (ast_strlen_zero(applied->server_uri)) {
-		ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
-			ast_sorcery_object_get_id(applied));
-		return -1;
-	} else if (ast_strlen_zero(applied->event)) {
-		ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
-			ast_sorcery_object_get_id(applied));
-		return -1;
-	}
-
+	/*
+	 * New states are being loaded or reloaded. We'll need to add the new
+	 * object if created/updated, or keep the old object if an error occurs.
+	 */
 	if (!new_states) {
-		/* make sure new_states has been allocated as we will be adding to it */
 		new_states = ao2_container_alloc_options(
 			AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
 			outbound_publish_state_hash, outbound_publish_state_cmp);
@@ -1095,34 +1495,40 @@
 		}
 	}
 
-	if (states) {
-		state = ao2_find(states, ast_sorcery_object_get_id(obj), OBJ_SEARCH_KEY);
-		if (state) {
-			if (can_reuse_publish(state->client->publish, applied)) {
-				ao2_replace(state->client->publish, applied);
-			} else {
-				ao2_ref(state, -1);
-				state = NULL;
-			}
-		}
+	if ((states = ao2_global_obj_ref(current_states))) {
+		/*
+		 * If state objects already exist that are associated with the object
+		 * being applied then we'll be updating each of those objects.
+		 */
+		ao2_callback_data(states, OBJ_NODATA, apply_update_current,
+				  applied, &existing);
+		ao2_ref(states, -1);
 	}
 
-	if (!state) {
-		state = sip_outbound_publish_state_alloc(applied);
-		if (!state) {
-			ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
-				ast_sorcery_object_get_id(applied));
-			return -1;
-		};
+	if (applied->multi_user) {
+		/*
+		 * If a multi-user configuration then drop the old one if it exists and
+		 * then add the new configuration. Note, nothing is added to the current
+		 * states at this time as states are dynamically added later for multi-
+		 * user configurations.
+		 */
+		ao2_find(multi_publishes, ast_sorcery_object_get_id(obj),
+			 OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
+		ao2_link(multi_publishes, applied);
+		return 0;
 	}
 
-	if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
-		ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
-			ast_sorcery_object_get_id(applied));
-		return -1;
+	/*
+	 * If not multi-user and if it doesn't already exist then
+	 * create and add it to new_states.
+	 */
+	if (existing <= 0 &&
+	    (new_state = validate_and_create_state(applied, NULL)) &&
+	    !apply_initialize_state(applied, new_state)) {
+		ao2_link(new_states, new_state);
 	}
 
-	ao2_link(new_states, state);
+	ao2_cleanup(new_state);
 	return 0;
 }
 
@@ -1137,11 +1543,20 @@
 {
 	CHECK_PJSIP_MODULE_LOADED();
 
+	multi_publishes = ao2_container_alloc(DEFAULT_MULTI_BUCKETS,
+		outbound_publish_multi_hash, outbound_publish_multi_cmp);
+	if (!multi_publishes) {
+		ast_log(LOG_ERROR, "Unable to allocate multi-publishes container\n");
+		return AST_MODULE_LOAD_DECLINE;
+	}
+
 	ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
 	ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
 
 	if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
 		sip_outbound_publish_apply)) {
+		ao2_ref(multi_publishes, -1);
+		ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n");
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
@@ -1155,6 +1570,7 @@
 	ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
 	ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "transport", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, transport));
 	ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
+	ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "multi_user", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_outbound_publish, multi_user));
 
 	ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
 
@@ -1190,6 +1606,7 @@
 	if (!states || !(unloading.count = ao2_container_count(states))) {
 		return 0;
 	}
+
 	ao2_ref(states, -1);
 
 	ast_mutex_init(&unloading.lock);
@@ -1197,11 +1614,12 @@
 	ast_mutex_lock(&unloading.lock);
 
 	unloading.is_unloading = 1;
+	ao2_ref(multi_publishes, -1);
 	ao2_global_obj_release(current_states);
 
 	/* wait for items to unpublish */
 	ast_verb(5, "Waiting to complete unpublishing task(s)\n");
-	while (unloading.count) {
+	while (unloading.count && !res) {
 		res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
 	}
 	ast_mutex_unlock(&unloading.lock);
@@ -1214,6 +1632,7 @@
 			"in the allowed time\n", unloading.count);
 	} else {
 		ast_verb(5, "All items successfully unpublished\n");
+		ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
 	}
 
 	return res;

-- 
To view, visit https://gerrit.asterisk.org/2709
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib88dde024cc83c916424645d4f5bb84a0fa936cc
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-Owner: Kevin Harwell <kharwell at digium.com>



More information about the asterisk-code-review mailing list