[Asterisk-code-review] res pjsip: Refactor endpt send transaction (qualify timeout) (asterisk[master])

George Joseph asteriskteam at digium.com
Wed May 20 11:26:49 CDT 2015


George Joseph has uploaded a new change for review.

  https://gerrit.asterisk.org/493

Change subject: res_pjsip: Refactor endpt_send_transaction (qualify_timeout)
......................................................................

res_pjsip: Refactor endpt_send_transaction (qualify_timeout)

This patch refactors the transaction timeout processing to eliminate
calling the lower level public pjsip functions and reverts to calling
pjsip_endpt_send_request again.  This is the result of me noticing
a possible incompatibility with pjproject-2.4 which was causing
contact status flapping.

The original version of this feature used the lower level calls to
get access to the tsx structure in order to cancel the transaction
when our own timer expires. Since we no longer have that access,
if our own timer expires before the pjsip timer, we call the callbacks
and just let the pjsip transaction take its own course.  When the
transaction ends, it discovers the callbacks have already been run
and just cleans itself up.

A few messages in pjsip_configuration were also added/cleaned up.

ASTERISK-25015 #close

Change-Id: I0810f3999cf63f3a72607bbecac36af0a957f33e
Reported-by: George Joseph <george.joseph at fairview5.com>
Tested-by: George Joseph <george.joseph at fairview5.com>
---
M res/res_pjsip.c
M res/res_pjsip/pjsip_configuration.c
2 files changed, 195 insertions(+), 154 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/93/493/1

diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 3582fae..b532985 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -2842,126 +2842,6 @@
 #define TIMER_INACTIVE		0
 #define TIMEOUT_TIMER2		5
 
-struct tsx_data {
-	void *token;
-	void (*cb)(void*, pjsip_event*);
-	pjsip_transaction *tsx;
-	pj_timer_entry *timeout_timer;
-};
-
-static void send_tsx_on_tsx_state(pjsip_transaction *tsx, pjsip_event *event);
-
-pjsip_module send_tsx_module = {
-    .name = { "send_tsx_module", 23 },
-    .id = -1,
-    .priority = PJSIP_MOD_PRIORITY_APPLICATION,
-    .on_tsx_state = &send_tsx_on_tsx_state,
-};
-
-/*! \brief This is the pjsip_tsx_send_msg callback */
-static void send_tsx_on_tsx_state(pjsip_transaction *tsx, pjsip_event *event)
-{
-	struct tsx_data *tsx_data;
-
-	if (event->type != PJSIP_EVENT_TSX_STATE) {
-		return;
-	}
-
-	tsx_data = (struct tsx_data*) tsx->mod_data[send_tsx_module.id];
-	if (tsx_data == NULL) {
-		return;
-	}
-
-	if (tsx->status_code < 200) {
-		return;
-	}
-
-	if (event->body.tsx_state.type == PJSIP_EVENT_TIMER) {
-		ast_debug(1, "PJSIP tsx timer expired\n");
-	}
-
-	if (tsx_data->timeout_timer && tsx_data->timeout_timer->id != TIMER_INACTIVE) {
-		pj_mutex_lock(tsx->mutex_b);
-		pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(tsx->endpt),
-			tsx_data->timeout_timer, TIMER_INACTIVE);
-		pj_mutex_unlock(tsx->mutex_b);
-	}
-
-	/* Call the callback, if any, and prevent the callback from being called again
-	 * by clearing the transaction's module_data.
-	 */
-	tsx->mod_data[send_tsx_module.id] = NULL;
-
-	if (tsx_data->cb) {
-		(*tsx_data->cb)(tsx_data->token, event);
-	}
-}
-
-static void tsx_timer_callback(pj_timer_heap_t *theap, pj_timer_entry *entry)
-{
-	struct tsx_data *tsx_data = entry->user_data;
-
-	entry->id = TIMER_INACTIVE;
-	ast_debug(1, "Internal tsx timer expired\n");
-	pjsip_tsx_terminate(tsx_data->tsx, PJSIP_SC_TSX_TIMEOUT);
-}
-
-static pj_status_t endpt_send_transaction(pjsip_endpoint *endpt,
-	pjsip_tx_data *tdata, int timeout, void *token,
-	pjsip_endpt_send_callback cb)
-{
-	pjsip_transaction *tsx;
-	struct tsx_data *tsx_data;
-	pj_status_t status;
-	pjsip_event event;
-
-	ast_assert(endpt && tdata);
-
-	status = pjsip_tsx_create_uac(&send_tsx_module, tdata, &tsx);
-	if (status != PJ_SUCCESS) {
-		pjsip_tx_data_dec_ref(tdata);
-		ast_log(LOG_ERROR, "Unable to create pjsip uac\n");
-		return status;
-	}
-
-	tsx_data = PJ_POOL_ALLOC_T(tsx->pool, struct tsx_data);
-	tsx_data->token = token;
-	tsx_data->cb = cb;
-	tsx_data->tsx = tsx;
-	if (timeout > 0) {
-		tsx_data->timeout_timer = PJ_POOL_ALLOC_T(tsx->pool, pj_timer_entry);
-	} else {
-		tsx_data->timeout_timer = NULL;
-	}
-	tsx->mod_data[send_tsx_module.id] = tsx_data;
-
-	PJSIP_EVENT_INIT_TX_MSG(event, tdata);
-	pjsip_tx_data_set_transport(tdata, &tsx->tp_sel);
-
-	if (timeout > 0) {
-		pj_time_val timeout_timer_val = { timeout / 1000, timeout % 1000 };
-
-		pj_timer_entry_init(tsx_data->timeout_timer, TIMEOUT_TIMER2,
-			tsx_data, &tsx_timer_callback);
-		pj_mutex_lock(tsx->mutex_b);
-		pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(tsx->endpt),
-			tsx_data->timeout_timer, TIMER_INACTIVE);
-		pj_timer_heap_schedule(pjsip_endpt_get_timer_heap(tsx->endpt),
-			tsx_data->timeout_timer, &timeout_timer_val);
-		tsx_data->timeout_timer->id = TIMEOUT_TIMER2;
-		pj_mutex_unlock(tsx->mutex_b);
-	}
-
-	status = (*tsx->state_handler)(tsx, &event);
-	pjsip_tx_data_dec_ref(tdata);
-	if (status != PJ_SUCCESS) {
-		ast_log(LOG_ERROR, "Unable to send message\n");
-		return status;
-	}
-
-	return status;
-}
-
 /*! \brief Structure to hold information about an outbound request */
 struct send_request_data {
 	/*! The endpoint associated with this request */
@@ -3006,40 +2886,211 @@
 	void (*callback)(void *token, pjsip_event *e);
 	/*! Non-zero when the callback is called. */
 	unsigned int cb_called;
+	/*! Timeout timer. */
+	pj_timer_entry *timeout_timer;
+	/*! Original timeout. */
+	pj_int32_t timeout;
+	/*! Timeout/cleanup lock. */
+	pj_mutex_t *lock;
 };
 
-static void endpt_send_request_wrapper(void *token, pjsip_event *e)
+/*! \internal This function gets called by pjsip when the transaction ends,
+ * even if it timed out.  The lock prevents a race condition if both the pjsip
+ * transaction timer and our own timer expire simultaneously.
+ */
+static void endpt_send_request_cb(void *token, pjsip_event *e)
 {
 	struct send_request_wrapper *req_wrapper = token;
 
-	req_wrapper->cb_called = 1;
-	if (req_wrapper->callback) {
-		req_wrapper->callback(req_wrapper->token, e);
+	if (e->body.tsx_state.type == PJSIP_EVENT_TIMER) {
+		ast_debug(2, "%p: PJSIP tsx timer expired\n",
+			req_wrapper);
+
+		if (req_wrapper->timeout_timer
+			&& req_wrapper->timeout_timer->id != TIMEOUT_TIMER2) {
+			ast_debug(3, "%p: Timeout already handled\n", req_wrapper);
+			ao2_ref(req_wrapper, -1);
+			return;
+		}
 	}
+
+	pj_mutex_lock(req_wrapper->lock);
+
+	/* It's possible that our own timer was already processing while
+	 * we were waiting on the lock so check the timer id.  If it's
+	 * still TIMER2 then we still need to process.
+	 */
+	if (req_wrapper->timeout_timer
+		&& req_wrapper->timeout_timer->id == TIMEOUT_TIMER2) {
+		int timers_cancelled = 0;
+
+		ast_debug(2, "%p: Cancelling timer\n", req_wrapper);
+
+		timers_cancelled = pj_timer_heap_cancel_if_active(
+			pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()),
+			req_wrapper->timeout_timer, TIMER_INACTIVE);
+
+		if (timers_cancelled > 0) {
+			/* If the timer was cancelled the callback will never run so
+			 * clean up its reference to the wrapper.
+			 */
+			ast_debug(3, "%p: Timer cancelled\n", req_wrapper);
+			ao2_ref(req_wrapper, -1);
+		} else {
+			/* If it wasn't cancelled, it MAY be in the callback already
+			 * waiting on the lock so set the id to INACTIVE so
+			 * when the callback comes out of the lock, it knows to not
+			 * proceed.
+			 */
+			ast_debug(3, "%p: Timer already expired\n", req_wrapper);
+			req_wrapper->timeout_timer->id = TIMER_INACTIVE;
+		}
+	}
+
+	/* It's possible that our own timer expired and called the callbacks
+	 * so no need to call them again.
+	 */
+	if (!req_wrapper->cb_called && req_wrapper->callback) {
+		req_wrapper->callback(req_wrapper->token, e);
+		req_wrapper->cb_called = 1;
+		ast_debug(2, "%p: Callbacks executed\n", req_wrapper);
+	}
+	pj_mutex_unlock(req_wrapper->lock);
 	ao2_ref(req_wrapper, -1);
 }
 
+/*! \internal This function gets called by our own timer when it expires.
+ * If the timer is cancelled however, the function does NOT get called.
+ * The lock prevents a race condition if both the pjsip transaction timer
+ * and our own timer expire simultaneously.
+ */
+static void send_request_timer_callback(pj_timer_heap_t *theap, pj_timer_entry *entry)
+{
+	pjsip_event event;
+	struct send_request_wrapper *req_wrapper = entry->user_data;
+
+	ast_debug(2, "%p: Internal tsx timer expired after %d msec\n",
+		req_wrapper, req_wrapper->timeout);
+
+	/* If the id is not TIMEOUT_TIMER2 then the timer was cancelled above
+	 * between the time the timer expired and we grabbed the lock so just clean up.
+	 */
+	if (entry->id != TIMEOUT_TIMER2) {
+		ast_debug(3, "%p: Timeout already handled\n", req_wrapper);
+		ao2_ref(req_wrapper, -1);
+		return;
+	}
+
+	pj_mutex_lock(req_wrapper->lock);
+	/* If the id is not TIMEOUT_TIMER2 then the timer was cancelled above
+	 * while the lock was being held so just clean up.
+	 */
+	if (entry->id != TIMEOUT_TIMER2) {
+		pj_mutex_unlock(req_wrapper->lock);
+		ast_debug(3, "%p: Timeout already handled\n", req_wrapper);
+		ao2_ref(req_wrapper, -1);
+		return;
+	}
+
+	ast_debug(3, "%p: Timer handled here\n", req_wrapper);
+
+	event.body.tsx_state.type = PJSIP_EVENT_TIMER;
+	entry->id = TIMER_INACTIVE;
+
+	if (!req_wrapper->cb_called && req_wrapper->callback) {
+		req_wrapper->callback(req_wrapper->token, &event);
+		req_wrapper->cb_called = 1;
+		ast_debug(2, "%p: Callbacks executed\n", req_wrapper);
+	}
+
+	pj_mutex_unlock(req_wrapper->lock);
+	ao2_ref(req_wrapper, -1);
+}
+
+static void send_request_wrapper_destructor(void *obj)
+{
+	struct send_request_wrapper *req_wrapper = obj;
+
+	ast_debug(3, "%p: wrapper destroyed\n", req_wrapper);
+	pj_mutex_destroy(req_wrapper->lock);
+}
+
 static pj_status_t endpt_send_request(struct ast_sip_endpoint *endpoint,
-	pjsip_tx_data *tdata, int timeout, void *token, pjsip_endpt_send_callback cb)
+	pjsip_tx_data *tdata, pj_int32_t timeout, void *token, pjsip_endpt_send_callback cb)
 {
 	struct send_request_wrapper *req_wrapper;
 	pj_status_t ret_val;
+	pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
 
 	/* Create wrapper to detect if the callback was actually called on an error. */
-	req_wrapper = ao2_alloc_options(sizeof(*req_wrapper), NULL,
+	req_wrapper = ao2_alloc_options(sizeof(*req_wrapper), send_request_wrapper_destructor,
 		AO2_ALLOC_OPT_LOCK_NOLOCK);
 	if (!req_wrapper) {
 		pjsip_tx_data_dec_ref(tdata);
 		return PJ_ENOMEM;
 	}
+
+	ast_debug(2, "%p: Wrapper created\n", req_wrapper);
+
 	req_wrapper->token = token;
 	req_wrapper->callback = cb;
+	req_wrapper->timeout = timeout;
+	req_wrapper->timeout_timer = NULL;
+	req_wrapper->lock = NULL;
 
-	ao2_ref(req_wrapper, +1);
-	ret_val = endpt_send_transaction(ast_sip_get_pjsip_endpoint(), tdata, timeout,
-		req_wrapper, endpt_send_request_wrapper);
+	ret_val = pj_mutex_create_simple(tdata->pool, "tsx_timeout", &req_wrapper->lock);
 	if (ret_val != PJ_SUCCESS) {
 		char errmsg[PJ_ERR_MSG_SIZE];
+		pj_strerror(ret_val, errmsg, sizeof(errmsg));
+		ast_log(LOG_ERROR, "Error %d '%s' sending %.*s request to endpoint %s\n",
+			(int) ret_val, errmsg, (int) pj_strlen(&tdata->msg->line.req.method.name),
+			pj_strbuf(&tdata->msg->line.req.method.name),
+			endpoint ? ast_sorcery_object_get_id(endpoint) : "<unknown>");
+		pjsip_tx_data_dec_ref(tdata);
+		ao2_ref(req_wrapper, -1);
+		return PJ_ENOMEM;
+	}
+
+	pj_mutex_lock(req_wrapper->lock);
+
+	if (timeout > 0) {
+		pj_time_val timeout_timer_val = { timeout / 1000, timeout % 1000 };
+
+		req_wrapper->timeout_timer = PJ_POOL_ALLOC_T(tdata->pool, pj_timer_entry);
+
+		ast_debug(2, "%p: Set timer to %d msec\n", req_wrapper, timeout);
+
+		pj_timer_entry_init(req_wrapper->timeout_timer, TIMEOUT_TIMER2,
+			req_wrapper, &send_request_timer_callback);
+
+		pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(endpt),
+			req_wrapper->timeout_timer, TIMER_INACTIVE);
+
+		/* We need to insure that the wrapper is available if/when the
+		 * timer callback is executed.
+		 */
+		ao2_ref(req_wrapper, +1);
+		pj_timer_heap_schedule(pjsip_endpt_get_timer_heap(endpt),
+			req_wrapper->timeout_timer, &timeout_timer_val);
+
+		req_wrapper->timeout_timer->id = TIMEOUT_TIMER2;
+	} else {
+		req_wrapper->timeout_timer = NULL;
+	}
+
+	/* We need to insure that the wrapper is available when the
+	 * transaction callback is executed.
+	 */
+	ao2_ref(req_wrapper, +1);
+	ret_val = pjsip_endpt_send_request(endpt, tdata, -1, req_wrapper, endpt_send_request_cb);
+	if (ret_val != PJ_SUCCESS) {
+		char errmsg[PJ_ERR_MSG_SIZE];
+
+		if (timeout > 0) {
+			pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(endpt),
+				req_wrapper->timeout_timer, TIMER_INACTIVE);
+			ao2_ref(req_wrapper, -1);
+		}
 
 		/* Complain of failure to send the request. */
 		pj_strerror(ret_val, errmsg, sizeof(errmsg));
@@ -3061,6 +3112,7 @@
 			ao2_ref(req_wrapper, -1);
 		}
 	}
+	pj_mutex_unlock(req_wrapper->lock);
 	ao2_ref(req_wrapper, -1);
 	return ret_val;
 }
@@ -3076,10 +3128,6 @@
 	int res;
 
 	switch(e->body.tsx_state.type) {
-	case PJSIP_EVENT_USER:
-		/* Map USER (transaction cancelled by timeout) to TIMER */
-		e->body.tsx_state.type = PJSIP_EVENT_TIMER;
-		break;
 	case PJSIP_EVENT_TRANSPORT_ERROR:
 	case PJSIP_EVENT_TIMER:
 		break;
@@ -3695,25 +3743,8 @@
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
-	if (internal_sip_register_service(&send_tsx_module)) {
-		ast_log(LOG_ERROR, "Failed to initialize send request module. Aborting load\n");
-		internal_sip_unregister_service(&supplement_module);
-		ast_sip_destroy_distributor();
-		ast_res_pjsip_destroy_configuration();
-		ast_sip_destroy_global_headers();
-		stop_monitor_thread();
-		ast_sip_destroy_system();
-		pj_pool_release(memory_pool);
-		memory_pool = NULL;
-		pjsip_endpt_destroy(ast_pjsip_endpoint);
-		ast_pjsip_endpoint = NULL;
-		pj_caching_pool_destroy(&caching_pool);
-		return AST_MODULE_LOAD_DECLINE;
-	}
-
 	if (internal_sip_initialize_outbound_authentication()) {
 		ast_log(LOG_ERROR, "Failed to initialize outbound authentication. Aborting load\n");
-		internal_sip_unregister_service(&send_tsx_module);
 		internal_sip_unregister_service(&supplement_module);
 		ast_sip_destroy_distributor();
 		ast_res_pjsip_destroy_configuration();
@@ -3757,7 +3788,6 @@
 	ast_res_pjsip_destroy_configuration();
 	ast_sip_destroy_system();
 	ast_sip_destroy_global_headers();
-	internal_sip_unregister_service(&send_tsx_module);
 	internal_sip_unregister_service(&supplement_module);
 	if (monitor_thread) {
 		stop_monitor_thread();
diff --git a/res/res_pjsip/pjsip_configuration.c b/res/res_pjsip/pjsip_configuration.c
index f147b34..9fa18c7 100644
--- a/res/res_pjsip/pjsip_configuration.c
+++ b/res/res_pjsip/pjsip_configuration.c
@@ -121,9 +121,20 @@
 /*! \brief Function called when stuff relating to a contact happens (created/deleted) */
 static void persistent_endpoint_contact_created_observer(const void *object)
 {
-	char *id = ast_strdupa(ast_sorcery_object_get_id(object)), *aor = NULL;
+	char *id = ast_strdupa(ast_sorcery_object_get_id(object));
+	char *aor = NULL;
+	char *contact = NULL;
 
-	aor = strsep(&id, ";@");
+	aor = id;
+	/* Dynamic contacts are delimited with ";@" and static ones with "@@" */
+	if ((contact = strstr(id, ";@")) || (contact = strstr(id, "@@"))) {
+		*contact = '\0';
+		contact += 2;
+	} else {
+		contact = id;
+	}
+
+	ast_verb(1, "Contact %s/%s has been created\n", aor, contact);
 
 	ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, aor);
 }
@@ -144,7 +155,7 @@
 		contact = id;
 	}
 
-	ast_verb(1, "Contact %s/%s is now Unavailable\n", aor, contact);
+	ast_verb(1, "Contact %s/%s has been deleted\n", aor, contact);
 
 	ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, aor);
 }

-- 
To view, visit https://gerrit.asterisk.org/493
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0810f3999cf63f3a72607bbecac36af0a957f33e
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-Owner: George Joseph <george.joseph at fairview5.com>



More information about the asterisk-code-review mailing list