[Asterisk-code-review] pjsip distributor.c: Consistently pick a serializer for mess... (asterisk[13])

Richard Mudgett asteriskteam at digium.com
Mon Jun 6 15:05:22 CDT 2016


Richard Mudgett has uploaded a new change for review.

  https://gerrit.asterisk.org/2948

Change subject: pjsip_distributor.c: Consistently pick a serializer for messages.
......................................................................

pjsip_distributor.c: Consistently pick a serializer for messages.

Incoming messages that are not part of a dialog or a recognized response
to one of our requests need to be sent to a consistent serializer.  Under
load we may be queueing retransmissions before we can process the original
message.  We don't need to throw these messages onto random serializers
and cause reentrancy and message sequencing problems.

* Created a pool of pjsip/distributor serializers that get picked by
hashing the call-id and remote tag strings of the received messages.

* Made ast_sip_destroy_distributor() destroy items in the reverse order of
creation.

ASTERISK-26088
Reported by:  Richard Mudgett

Change-Id: I2ce769389fc060d9f379977f559026fbcb632407
---
M include/asterisk/res_pjsip.h
M res/res_pjsip/pjsip_distributor.c
2 files changed, 166 insertions(+), 5 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/48/2948/1

diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index b64ad62..e3eab7c 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -1336,6 +1336,17 @@
 struct ast_taskprocessor *ast_sip_create_serializer_group_named(const char *name, struct ast_serializer_shutdown_group *shutdown_group);
 
 /*!
+ * \brief Determine the distributor serializer for the SIP message.
+ * \since 13.10.0
+ *
+ * \param rdata The incoming message.
+ *
+ * \retval Calculated distributor serializer on success.
+ * \retval NULL on error.
+ */
+struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata);
+
+/*!
  * \brief Set a serializer on a SIP dialog so requests and responses are automatically serialized
  *
  * Passing a NULL serializer is a way to remove a serializer from a dialog.
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index 288a3e0..528ccb6 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -59,6 +59,12 @@
 	char src_name[];
 };
 
+/*! Number of serializers in the pool used when a message's serializer is not otherwise known.  (Best if a prime number) */
+#define DISTRIBUTOR_POOL_SIZE		31
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
+
 /*!
  * \internal
  * \brief Record the task's serializer name on the tdata structure.
@@ -278,6 +284,83 @@
 	return dlg;
 }
 
+/*!
+ * \internal
+ * \brief Fold a pjlib string into a running hash value
+ * \since 13.10.0
+ *
+ * \param[in] str The pjlib string to add to the hash
+ * \param[in] hash The hash value to add to
+ *
+ * \return The hash value after mixing in every byte of \a str.
+ *
+ * \details
+ * This version of the function is for when you need to compute a
+ * string hash of more than one string.
+ *
+ * This is the commonly used hash algorithm written by Dan Bernstein.
+ *
+ * \sa http://www.cse.yorku.ca/~oz/hash.html
+ */
+static int pjstr_hash_add(pj_str_t *str, int hash)
+{
+	size_t len = pj_strlen(str);
+	const char *pos = pj_strbuf(str);
+	unsigned int accum = (unsigned int) hash;
+
+	/* Unsigned math wraps; signed overflow of hash * 33 would be undefined. */
+	while (len--) {
+		accum = (accum * 33u) ^ (unsigned int) *pos++;
+	}
+	return (int) accum;
+}
+
+/*!
+ * \internal
+ * \brief Compute the hash of a single pjlib string
+ * \since 13.10.0
+ *
+ * \param[in] str The pjlib string to hash
+ *
+ * \return Result of pjstr_hash_add() on \a str starting from the seed 5381.
+ * \note Dan Bernstein's hash: http://www.cse.yorku.ca/~oz/hash.html
+ */
+static int pjstr_hash(pj_str_t *str)
+{
+	const int seed = 5381;
+
+	return pjstr_hash_add(str, seed);
+}
+
+struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
+{
+	int hash;
+	pj_str_t *remote_tag;
+	struct ast_taskprocessor *serializer;
+
+	if (!rdata->msg_info.msg) {
+		return NULL;
+	}
+
+	if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
+		remote_tag = &rdata->msg_info.from->tag;
+	} else {
+		remote_tag = &rdata->msg_info.to->tag;
+	}
+
+	/* Compute the hash from the SIP message call-id and remote-tag */
+	hash = pjstr_hash(&rdata->msg_info.cid->id);
+	hash = pjstr_hash_add(remote_tag, hash);
+
+	/* Index by the unsigned bit pattern: abs(INT_MIN) is undefined behavior. */
+	serializer = ao2_bump(distributor_pool[(unsigned int) hash % ARRAY_LEN(distributor_pool)]);
+	if (serializer) {
+		ast_debug(3, "Calculated serializer %s to use for %s\n",
+			ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
+	}
+	return serializer;
+}
+
 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
 
 static pjsip_module endpoint_mod = {
@@ -324,12 +407,23 @@
 		ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n",
 			pjsip_rx_data_get_info(rdata));
 		serializer = find_request_serializer(rdata);
+		if (!serializer) {
+			/*
+			 * Pick a serializer for the unmatched response.  Maybe
+			 * the stack can figure out what it is for, or we really
+			 * should just toss it regardless.
+			 */
+			serializer = ast_sip_get_distributor_serializer(rdata);
+		}
 	} else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
 		|| !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
 		/* We have a BYE or CANCEL request without a serializer. */
 		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
 			PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
 		return PJ_TRUE;
+	} else {
+		/* Pick a serializer for the out-of-dialog request. */
+		serializer = ast_sip_get_distributor_serializer(rdata);
 	}
 
 	pjsip_rx_data_clone(rdata, 0, &clone);
@@ -349,7 +443,10 @@
 		ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
 		pjsip_rx_data_free_cloned(clone);
 	} else {
-		ast_sip_push_task(serializer, distribute, clone);
+		if (ast_sip_push_task(serializer, distribute, clone)) {
+			ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
+			pjsip_rx_data_free_cloned(clone);
+		}
 	}
 
 	ast_taskprocessor_unreference(serializer);
@@ -796,6 +893,7 @@
 
 	return 0;
 }
+
 static int cli_unid_print_body(void *obj, void *arg, int flags)
 {
 	struct unidentified_request *unid = obj;
@@ -886,12 +984,58 @@
 	.loaded = global_loaded,
 };
 
+/*!
+ * \internal
+ * \brief Unreference each serializer in the distributor pool and clear its slot.
+ * \since 13.10.0
+ *
+ * \return Nothing
+ */
+static void distributor_pool_shutdown(void)
+{
+	int slot = 0;
+
+	while (slot < ARRAY_LEN(distributor_pool)) {
+		ast_taskprocessor_unreference(distributor_pool[slot]);
+		distributor_pool[slot++] = NULL;
+	}
+}
+
+/*!
+ * \internal
+ * \brief Fill every slot of the distributor pool with a named serializer.
+ * \since 13.10.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.  Slots filled before the failure are left in place.
+ */
+static int distributor_pool_setup(void)
+{
+	char name[AST_TASKPROCESSOR_MAX_NAME + 1];
+	int slot;
+
+	for (slot = 0; slot < ARRAY_LEN(distributor_pool); ++slot) {
+		/* The name builder appends a sequence number to the base name. */
+		ast_taskprocessor_build_name(name, sizeof(name), "pjsip/distributor");
+
+		distributor_pool[slot] = ast_sip_create_serializer_named(name);
+		if (distributor_pool[slot] == NULL) {
+			return -1;
+		}
+	}
+	return 0;
+}
 
 int ast_sip_initialize_distributor(void)
 {
 	unidentified_requests = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
 		DEFAULT_SUSPECTS_BUCKETS, suspects_hash, NULL, suspects_compare);
 	if (!unidentified_requests) {
+		return -1;
+	}
+
+	if (distributor_pool_setup()) {
+		ast_sip_destroy_distributor();
 		return -1;
 	}
 
@@ -927,8 +1071,10 @@
 		return -1;
 	}
 
-	unid_formatter = ao2_alloc(sizeof(struct ast_sip_cli_formatter_entry), NULL);
+	unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL,
+		AO2_ALLOC_OPT_LOCK_NOLOCK);
 	if (!unid_formatter) {
+		ast_sip_destroy_distributor();
 		ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
 		return -1;
 	}
@@ -940,6 +1086,7 @@
 	unid_formatter->get_id = cli_unid_get_id;
 	unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
 	ast_sip_register_cli_formatter(unid_formatter);
+
 	ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
 
 	return 0;
@@ -950,17 +1097,20 @@
 	ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
 	ast_sip_unregister_cli_formatter(unid_formatter);
 
-	internal_sip_unregister_service(&distributor_mod);
-	internal_sip_unregister_service(&endpoint_mod);
 	internal_sip_unregister_service(&auth_mod);
+	internal_sip_unregister_service(&endpoint_mod);
+	internal_sip_unregister_service(&distributor_mod);
 
 	ao2_cleanup(artificial_auth);
 	ao2_cleanup(artificial_endpoint);
-	ao2_cleanup(unidentified_requests);
 
 	ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
 
 	if (prune_context) {
 		ast_sched_context_destroy(prune_context);
 	}
+
+	distributor_pool_shutdown();
+
+	ao2_cleanup(unidentified_requests);
 }

-- 
To view, visit https://gerrit.asterisk.org/2948
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2ce769389fc060d9f379977f559026fbcb632407
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: Richard Mudgett <rmudgett at digium.com>



More information about the asterisk-code-review mailing list