[Asterisk-code-review] res_pjsip_pubsub.c: Recreate subscriptions using distributor... (asterisk[certified/13.8])

Richard Mudgett asteriskteam at digium.com
Thu Jun 9 10:36:49 CDT 2016


Richard Mudgett has uploaded a new change for review.

  https://gerrit.asterisk.org/2996

Change subject: res_pjsip_pubsub.c: Recreate subscriptions using distributor serializer.
......................................................................

res_pjsip_pubsub.c: Recreate subscriptions using distributor serializer.

* Resolves potential reentrancy problems if the system is restarted in the
middle of subscription message transactions.

* Fixes a memory leak when recreating persistent subscriptions if the
subscription resource tree could not be created.

ASTERISK-26088
Reported by:  Richard Mudgett

Change-Id: I71e34d7ae8ed35a694f1030e820e2548c48697be
---
M res/res_pjsip_pubsub.c
1 file changed, 139 insertions(+), 72 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/96/2996/1

diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c
index c0dd348..0761fe8 100644
--- a/res/res_pjsip_pubsub.c
+++ b/res/res_pjsip_pubsub.c
@@ -355,7 +355,7 @@
 struct subscription_persistence {
 	/*! Sorcery object details */
 	SORCERY_OBJECT(details);
-	/*! The name of the endpoint involved in the subscrption */
+	/*! The name of the endpoint involved in the subscription */
 	char *endpoint;
 	/*! SIP message that creates the subscription */
 	char packet[PJSIP_MAX_PKT_LEN];
@@ -1347,22 +1347,134 @@
 static int initial_notify_task(void *obj);
 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state);
 
+/*! \brief Data handed to sub_persistence_recreate() when continuing under the distributor serializer */
+struct persistence_recreate_data {
+	struct subscription_persistence *persistence; /*!< Stored subscription being recreated */
+	pjsip_rx_data *rdata; /*!< Request rebuilt from the stored packet; points at the caller's local rdata */
+};
+
+/*!
+ * \internal
+ * \brief subscription_persistence_recreate continuation under distributor serializer.
+ * \since 13.10.0
+ *
+ * \param obj The struct persistence_recreate_data describing what to recreate.
+ * \retval 0 always; failures are handled internally, not reported to the caller.
+ */
+static int sub_persistence_recreate(void *obj)
+{
+	struct persistence_recreate_data *recreate_data = obj;
+	struct subscription_persistence *persistence = recreate_data->persistence;
+	pjsip_rx_data *rdata = recreate_data->rdata;
+	struct ast_sip_endpoint *endpoint;
+	struct sip_subscription_tree *sub_tree;
+	struct ast_sip_pubsub_body_generator *generator;
+	struct ast_sip_subscription_handler *handler;
+	char *resource;
+	pjsip_sip_uri *request_uri;
+	size_t resource_size;
+	int resp;
+	struct resource_tree tree;
+	pjsip_expires_hdr *expires_header;
+
+	request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri);
+	resource_size = pj_strlen(&request_uri->user) + 1;
+	resource = ast_alloca(resource_size);
+	ast_copy_pj_str(resource, &request_uri->user, resource_size);
+
+	handler = subscription_get_handler_from_rdata(rdata);
+	if (!handler || !handler->notifier) {
+		ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n",
+			persistence->endpoint);
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	generator = subscription_get_generator_from_rdata(rdata, handler);
+	if (!generator) {
+		ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n",
+			persistence->endpoint);
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
+		pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
+
+	/* Getting the endpoint may take some time that can affect the expiration. */
+	endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
+		persistence->endpoint);
+	if (!endpoint) {
+		ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n",
+			persistence->endpoint);
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		/* endpoint is NULL here, so there is no reference to release. */
+		return 0;
+	}
+
+	/* Update the expiration header with the new expiration */
+	expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES,
+		rdata->msg_info.msg->hdr.next);
+	if (!expires_header) {
+		expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0);
+		if (!expires_header) {
+			ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n",
+				persistence->endpoint);
+			ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+			ao2_ref(endpoint, -1);
+			return 0;
+		}
+		pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header);
+	}
+	expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
+	if (expires_header->ivalue <= 0) {
+		/* The subscription expired since we started recreating the subscription. */
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		ao2_ref(endpoint, -1);
+		return 0;
+	}
+
+	memset(&tree, 0, sizeof(tree));
+	resp = build_resource_tree(endpoint, handler, resource, &tree,
+		ast_sip_pubsub_has_eventlist_support(rdata));
+	if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
+		pj_status_t dlg_status;
+
+		sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator,
+			&tree, &dlg_status);
+		if (!sub_tree) {
+			if (dlg_status != PJ_EEXISTS) {
+				ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n",
+					persistence->endpoint);
+				ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+			}
+		} else {
+			sub_tree->persistence = ao2_bump(persistence);
+			subscription_persistence_update(sub_tree, rdata);
+			if (ast_sip_push_task(sub_tree->serializer, initial_notify_task,
+				ao2_bump(sub_tree))) {
+				/* Could not send initial subscribe NOTIFY */
+				pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
+				ao2_ref(sub_tree, -1);
+			}
+		}
+	} else {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+	}
+	resource_tree_destroy(&tree);
+	ao2_ref(endpoint, -1);
+
+	return 0;
+}
+
 /*! \brief Callback function to perform the actual recreation of a subscription */
 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
 {
 	struct subscription_persistence *persistence = obj;
 	pj_pool_t *pool = arg;
-	pjsip_rx_data rdata = { { 0, }, };
-	RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
-	struct sip_subscription_tree *sub_tree;
-	struct ast_sip_pubsub_body_generator *generator;
-	int resp;
-	char *resource;
-	size_t resource_size;
-	pjsip_sip_uri *request_uri;
-	struct resource_tree tree;
-	pjsip_expires_hdr *expires_header;
-	struct ast_sip_subscription_handler *handler;
+	struct ast_taskprocessor *serializer;
+	pjsip_rx_data rdata;
+	struct persistence_recreate_data recreate_data;
 
 	/* If this subscription has already expired remove it */
 	if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
@@ -1370,86 +1482,41 @@
 		return 0;
 	}
 
-	endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
-	if (!endpoint) {
-		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
-			persistence->endpoint);
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
+	memset(&rdata, 0, sizeof(rdata));
 	pj_pool_reset(pool);
 	rdata.tp_info.pool = pool;
 
 	if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
 		persistence->transport_key, persistence->local_name, persistence->local_port)) {
-		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
+		ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n",
 			persistence->endpoint);
 		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
 		return 0;
 	}
 
 	if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
-		ast_log(LOG_NOTICE, "Endpoint %s persisted a SIP response instead of a subscribe request. Unable to reload subscription.\n",
-				ast_sorcery_object_get_id(endpoint));
+		ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n",
+			persistence->endpoint);
 		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
 		return 0;
 	}
 
-	request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
-	resource_size = pj_strlen(&request_uri->user) + 1;
-	resource = ast_alloca(resource_size);
-	ast_copy_pj_str(resource, &request_uri->user, resource_size);
-
-	/* Update the expiration header with the new expiration */
-	expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
-	if (!expires_header) {
-		expires_header = pjsip_expires_hdr_create(pool, 0);
-		if (!expires_header) {
-			ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-			return 0;
-		}
-		pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
-	}
-	expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
-
-	handler = subscription_get_handler_from_rdata(&rdata);
-	if (!handler || !handler->notifier) {
+	/* Continue the remainder in the distributor serializer */
+	serializer = ast_sip_get_distributor_serializer(&rdata);
+	if (!serializer) {
+		ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n",
+			persistence->endpoint);
 		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
 		return 0;
 	}
-
-	generator = subscription_get_generator_from_rdata(&rdata, handler);
-	if (!generator) {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
-			pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
-
-	memset(&tree, 0, sizeof(tree));
-	resp = build_resource_tree(endpoint, handler, resource, &tree,
-		ast_sip_pubsub_has_eventlist_support(&rdata));
-	if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
-		pj_status_t dlg_status;
-
-		sub_tree = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree, &dlg_status);
-		if (!sub_tree) {
-			ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-			ast_log(LOG_WARNING, "Failed to re-create subscription for %s\n", persistence->endpoint);
-			return 0;
-		}
-		sub_tree->persistence = ao2_bump(persistence);
-		subscription_persistence_update(sub_tree, &rdata);
-		if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) {
-			pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
-			ao2_ref(sub_tree, -1);
-		}
-	} else {
+	recreate_data.persistence = persistence;
+	recreate_data.rdata = &rdata;
+	if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) {
+		ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n",
+			persistence->endpoint);
 		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
 	}
-	resource_tree_destroy(&tree);
+	ast_taskprocessor_unreference(serializer);
 
 	return 0;
 }

-- 
To view, visit https://gerrit.asterisk.org/2996
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I71e34d7ae8ed35a694f1030e820e2548c48697be
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: certified/13.8
Gerrit-Owner: Richard Mudgett <rmudgett at digium.com>



More information about the asterisk-code-review mailing list