[Asterisk-code-review] res pjsip: Refactor endpt send transaction (qualify timeout) (asterisk[13])
Joshua Colp
asteriskteam at digium.com
Fri May 22 10:40:48 CDT 2015
Joshua Colp has submitted this change and it was merged.
Change subject: res_pjsip: Refactor endpt_send_transaction (qualify_timeout)
......................................................................
res_pjsip: Refactor endpt_send_transaction (qualify_timeout)
This patch refactors the transaction timeout processing to eliminate
calling the lower level public pjsip functions and reverts to calling
pjsip_endpt_send_request again. This is the result of me noticing
a possible incompatibility with pjproject-2.4 which was causing
contact status flapping.
The original version of this feature used the lower level calls to
get access to the tsx structure in order to cancel the transaction
when our own timer expires. Since we no longer have that access,
if our own timer expires before the pjsip timer, we call the callbacks
and just let the pjsip transaction take it's own course. When the
transaction ends, it discovers the callbacks have already been run
and just cleans itself up.
A few messages in pjsip_configuration were also added/cleaned up.
ASTERISK-25105 #close
Change-Id: I0810f3999cf63f3a72607bbecac36af0a957f33e
Reported-by: George Joseph <george.joseph at fairview5.com>
Tested-by: George Joseph <george.joseph at fairview5.com>
---
M include/asterisk/res_pjsip.h
M res/res_pjsip.c
M res/res_pjsip/pjsip_configuration.c
3 files changed, 202 insertions(+), 154 deletions(-)
Approvals:
Matt Jordan: Looks good to me, but someone else must approve
Joshua Colp: Looks good to me, approved; Verified
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index cbae595..b9ece71 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -1304,6 +1304,13 @@
*
* \retval 0 Success
* \retval -1 Failure (out-of-dialog callback will not be called.)
+ *
+ * \note Timeout processing:
+ * There are 2 timers associated with this request, PJSIP timer_b which is
+ * set globally in the "system" section of pjsip.conf, and the timeout specified
+ * on this call. The timer that expires first (before normal completion) will
+ * cause the callback to be run with e->body.tsx_state.type = PJSIP_EVENT_TIMER.
+ * The timer that expires second is simply ignored and the callback is not run again.
*/
int ast_sip_send_out_of_dialog_request(pjsip_tx_data *tdata,
struct ast_sip_endpoint *endpoint, int timeout, void *token,
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 5f68c44..ff18a1d 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -2846,126 +2846,6 @@
#define TIMER_INACTIVE 0
#define TIMEOUT_TIMER2 5
-struct tsx_data {
- void *token;
- void (*cb)(void*, pjsip_event*);
- pjsip_transaction *tsx;
- pj_timer_entry *timeout_timer;
-};
-
-static void send_tsx_on_tsx_state(pjsip_transaction *tsx, pjsip_event *event);
-
-pjsip_module send_tsx_module = {
- .name = { "send_tsx_module", 23 },
- .id = -1,
- .priority = PJSIP_MOD_PRIORITY_APPLICATION,
- .on_tsx_state = &send_tsx_on_tsx_state,
-};
-
-/*! \brief This is the pjsip_tsx_send_msg callback */
-static void send_tsx_on_tsx_state(pjsip_transaction *tsx, pjsip_event *event)
-{
- struct tsx_data *tsx_data;
-
- if (event->type != PJSIP_EVENT_TSX_STATE) {
- return;
- }
-
- tsx_data = (struct tsx_data*) tsx->mod_data[send_tsx_module.id];
- if (tsx_data == NULL) {
- return;
- }
-
- if (tsx->status_code < 200) {
- return;
- }
-
- if (event->body.tsx_state.type == PJSIP_EVENT_TIMER) {
- ast_debug(1, "PJSIP tsx timer expired\n");
- }
-
- if (tsx_data->timeout_timer && tsx_data->timeout_timer->id != TIMER_INACTIVE) {
- pj_mutex_lock(tsx->mutex_b);
- pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(tsx->endpt),
- tsx_data->timeout_timer, TIMER_INACTIVE);
- pj_mutex_unlock(tsx->mutex_b);
- }
-
- /* Call the callback, if any, and prevent the callback from being called again
- * by clearing the transaction's module_data.
- */
- tsx->mod_data[send_tsx_module.id] = NULL;
-
- if (tsx_data->cb) {
- (*tsx_data->cb)(tsx_data->token, event);
- }
-}
-
-static void tsx_timer_callback(pj_timer_heap_t *theap, pj_timer_entry *entry)
-{
- struct tsx_data *tsx_data = entry->user_data;
-
- entry->id = TIMER_INACTIVE;
- ast_debug(1, "Internal tsx timer expired\n");
- pjsip_tsx_terminate(tsx_data->tsx, PJSIP_SC_TSX_TIMEOUT);
-}
-
-static pj_status_t endpt_send_transaction(pjsip_endpoint *endpt,
- pjsip_tx_data *tdata, int timeout, void *token,
- pjsip_endpt_send_callback cb)
-{
- pjsip_transaction *tsx;
- struct tsx_data *tsx_data;
- pj_status_t status;
- pjsip_event event;
-
- ast_assert(endpt && tdata);
-
- status = pjsip_tsx_create_uac(&send_tsx_module, tdata, &tsx);
- if (status != PJ_SUCCESS) {
- pjsip_tx_data_dec_ref(tdata);
- ast_log(LOG_ERROR, "Unable to create pjsip uac\n");
- return status;
- }
-
- tsx_data = PJ_POOL_ALLOC_T(tsx->pool, struct tsx_data);
- tsx_data->token = token;
- tsx_data->cb = cb;
- tsx_data->tsx = tsx;
- if (timeout > 0) {
- tsx_data->timeout_timer = PJ_POOL_ALLOC_T(tsx->pool, pj_timer_entry);
- } else {
- tsx_data->timeout_timer = NULL;
- }
- tsx->mod_data[send_tsx_module.id] = tsx_data;
-
- PJSIP_EVENT_INIT_TX_MSG(event, tdata);
- pjsip_tx_data_set_transport(tdata, &tsx->tp_sel);
-
- if (timeout > 0) {
- pj_time_val timeout_timer_val = { timeout / 1000, timeout % 1000 };
-
- pj_timer_entry_init(tsx_data->timeout_timer, TIMEOUT_TIMER2,
- tsx_data, &tsx_timer_callback);
- pj_mutex_lock(tsx->mutex_b);
- pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(tsx->endpt),
- tsx_data->timeout_timer, TIMER_INACTIVE);
- pj_timer_heap_schedule(pjsip_endpt_get_timer_heap(tsx->endpt),
- tsx_data->timeout_timer, &timeout_timer_val);
- tsx_data->timeout_timer->id = TIMEOUT_TIMER2;
- pj_mutex_unlock(tsx->mutex_b);
- }
-
- status = (*tsx->state_handler)(tsx, &event);
- pjsip_tx_data_dec_ref(tdata);
- if (status != PJ_SUCCESS) {
- ast_log(LOG_ERROR, "Unable to send message\n");
- return status;
- }
-
- return status;
-}
-
/*! \brief Structure to hold information about an outbound request */
struct send_request_data {
/*! The endpoint associated with this request */
@@ -3010,40 +2890,211 @@
void (*callback)(void *token, pjsip_event *e);
/*! Non-zero when the callback is called. */
unsigned int cb_called;
+ /*! Timeout timer. */
+ pj_timer_entry *timeout_timer;
+ /*! Original timeout. */
+ pj_int32_t timeout;
+ /*! Timeout/cleanup lock. */
+ pj_mutex_t *lock;
+ /*! The transmit data. */
+ pjsip_tx_data *tdata;
};
-static void endpt_send_request_wrapper(void *token, pjsip_event *e)
+/*! \internal This function gets called by pjsip when the transaction ends,
+ * even if it timed out. The lock prevents a race condition if both the pjsip
+ * transaction timer and our own timer expire simultaneously.
+ */
+static void endpt_send_request_cb(void *token, pjsip_event *e)
{
struct send_request_wrapper *req_wrapper = token;
- req_wrapper->cb_called = 1;
- if (req_wrapper->callback) {
- req_wrapper->callback(req_wrapper->token, e);
+ if (e->body.tsx_state.type == PJSIP_EVENT_TIMER) {
+ ast_debug(2, "%p: PJSIP tsx timer expired\n", req_wrapper);
+
+ if (req_wrapper->timeout_timer
+ && req_wrapper->timeout_timer->id != TIMEOUT_TIMER2) {
+ ast_debug(3, "%p: Timeout already handled\n", req_wrapper);
+ ao2_ref(req_wrapper, -1);
+ return;
+ }
+ } else {
+ ast_debug(2, "%p: PJSIP tsx response received\n", req_wrapper);
}
+
+ pj_mutex_lock(req_wrapper->lock);
+
+ /* It's possible that our own timer was already processing while
+ * we were waiting on the lock so check the timer id. If it's
+ * still TIMER2 then we still need to process.
+ */
+ if (req_wrapper->timeout_timer
+ && req_wrapper->timeout_timer->id == TIMEOUT_TIMER2) {
+ int timers_cancelled = 0;
+
+ ast_debug(3, "%p: Cancelling timer\n", req_wrapper);
+
+ timers_cancelled = pj_timer_heap_cancel_if_active(
+ pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()),
+ req_wrapper->timeout_timer, TIMER_INACTIVE);
+
+ if (timers_cancelled > 0) {
+ /* If the timer was cancelled the callback will never run so
+ * clean up its reference to the wrapper.
+ */
+ ast_debug(3, "%p: Timer cancelled\n", req_wrapper);
+ ao2_ref(req_wrapper, -1);
+ } else {
+ /* If it wasn't cancelled, it MAY be in the callback already
+ * waiting on the lock so set the id to INACTIVE so
+ * when the callback comes out of the lock, it knows to not
+ * proceed.
+ */
+ ast_debug(3, "%p: Timer already expired\n", req_wrapper);
+ req_wrapper->timeout_timer->id = TIMER_INACTIVE;
+ }
+ }
+
+ /* It's possible that our own timer expired and called the callbacks
+ * so no need to call them again.
+ */
+ if (!req_wrapper->cb_called && req_wrapper->callback) {
+ req_wrapper->callback(req_wrapper->token, e);
+ req_wrapper->cb_called = 1;
+ ast_debug(2, "%p: Callbacks executed\n", req_wrapper);
+ }
+ pj_mutex_unlock(req_wrapper->lock);
ao2_ref(req_wrapper, -1);
}
+/*! \internal This function gets called by our own timer when it expires.
+ * If the timer is cancelled however, the function does NOT get called.
+ * The lock prevents a race condition if both the pjsip transaction timer
+ * and our own timer expire simultaneously.
+ */
+static void send_request_timer_callback(pj_timer_heap_t *theap, pj_timer_entry *entry)
+{
+ pjsip_event event;
+ struct send_request_wrapper *req_wrapper = entry->user_data;
+
+ ast_debug(2, "%p: Internal tsx timer expired after %d msec\n",
+ req_wrapper, req_wrapper->timeout);
+
+ pj_mutex_lock(req_wrapper->lock);
+ /* If the id is not TIMEOUT_TIMER2 then the timer was cancelled above
+ * while the lock was being held so just clean up.
+ */
+ if (entry->id != TIMEOUT_TIMER2) {
+ pj_mutex_unlock(req_wrapper->lock);
+ ast_debug(3, "%p: Timeout already handled\n", req_wrapper);
+ ao2_ref(req_wrapper, -1);
+ return;
+ }
+
+ ast_debug(3, "%p: Timer handled here\n", req_wrapper);
+
+ PJSIP_EVENT_INIT_TX_MSG(event, req_wrapper->tdata);
+ event.body.tsx_state.type = PJSIP_EVENT_TIMER;
+ entry->id = TIMER_INACTIVE;
+
+ if (!req_wrapper->cb_called && req_wrapper->callback) {
+ req_wrapper->callback(req_wrapper->token, &event);
+ req_wrapper->cb_called = 1;
+ ast_debug(2, "%p: Callbacks executed\n", req_wrapper);
+ }
+
+ pj_mutex_unlock(req_wrapper->lock);
+ ao2_ref(req_wrapper, -1);
+}
+
+static void send_request_wrapper_destructor(void *obj)
+{
+ struct send_request_wrapper *req_wrapper = obj;
+
+ pj_mutex_destroy(req_wrapper->lock);
+ pjsip_tx_data_dec_ref(req_wrapper->tdata);
+ ast_debug(2, "%p: wrapper destroyed\n", req_wrapper);
+}
+
static pj_status_t endpt_send_request(struct ast_sip_endpoint *endpoint,
- pjsip_tx_data *tdata, int timeout, void *token, pjsip_endpt_send_callback cb)
+ pjsip_tx_data *tdata, pj_int32_t timeout, void *token, pjsip_endpt_send_callback cb)
{
struct send_request_wrapper *req_wrapper;
pj_status_t ret_val;
+ pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
/* Create wrapper to detect if the callback was actually called on an error. */
- req_wrapper = ao2_alloc_options(sizeof(*req_wrapper), NULL,
+ req_wrapper = ao2_alloc_options(sizeof(*req_wrapper), send_request_wrapper_destructor,
AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!req_wrapper) {
pjsip_tx_data_dec_ref(tdata);
return PJ_ENOMEM;
}
+
+ ast_debug(2, "%p: Wrapper created\n", req_wrapper);
+
req_wrapper->token = token;
req_wrapper->callback = cb;
+ req_wrapper->timeout = timeout;
+ req_wrapper->timeout_timer = NULL;
+ req_wrapper->lock = NULL;
+ req_wrapper->tdata = tdata;
+ /* Add a reference to tdata. The wrapper destructor cleans it up. */
+ pjsip_tx_data_add_ref(tdata);
- ao2_ref(req_wrapper, +1);
- ret_val = endpt_send_transaction(ast_sip_get_pjsip_endpoint(), tdata, timeout,
- req_wrapper, endpt_send_request_wrapper);
+ ret_val = pj_mutex_create_simple(tdata->pool, "tsx_timeout", &req_wrapper->lock);
if (ret_val != PJ_SUCCESS) {
char errmsg[PJ_ERR_MSG_SIZE];
+ pj_strerror(ret_val, errmsg, sizeof(errmsg));
+ ast_log(LOG_ERROR, "Error %d '%s' sending %.*s request to endpoint %s\n",
+ (int) ret_val, errmsg, (int) pj_strlen(&tdata->msg->line.req.method.name),
+ pj_strbuf(&tdata->msg->line.req.method.name),
+ endpoint ? ast_sorcery_object_get_id(endpoint) : "<unknown>");
+ pjsip_tx_data_dec_ref(tdata);
+ ao2_ref(req_wrapper, -1);
+ return PJ_ENOMEM;
+ }
+
+ pj_mutex_lock(req_wrapper->lock);
+
+ if (timeout > 0) {
+ pj_time_val timeout_timer_val = { timeout / 1000, timeout % 1000 };
+
+ req_wrapper->timeout_timer = PJ_POOL_ALLOC_T(tdata->pool, pj_timer_entry);
+
+ ast_debug(2, "%p: Set timer to %d msec\n", req_wrapper, timeout);
+
+ pj_timer_entry_init(req_wrapper->timeout_timer, TIMEOUT_TIMER2,
+ req_wrapper, &send_request_timer_callback);
+
+ pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(endpt),
+ req_wrapper->timeout_timer, TIMER_INACTIVE);
+
+ /* We need to insure that the wrapper and tdata are available if/when the
+ * timer callback is executed.
+ */
+ ao2_ref(req_wrapper, +1);
+ pj_timer_heap_schedule(pjsip_endpt_get_timer_heap(endpt),
+ req_wrapper->timeout_timer, &timeout_timer_val);
+
+ req_wrapper->timeout_timer->id = TIMEOUT_TIMER2;
+ } else {
+ req_wrapper->timeout_timer = NULL;
+ }
+
+ /* We need to insure that the wrapper and tdata are available when the
+ * transaction callback is executed.
+ */
+ ao2_ref(req_wrapper, +1);
+
+ ret_val = pjsip_endpt_send_request(endpt, tdata, -1, req_wrapper, endpt_send_request_cb);
+ if (ret_val != PJ_SUCCESS) {
+ char errmsg[PJ_ERR_MSG_SIZE];
+
+ if (timeout > 0) {
+ pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(endpt),
+ req_wrapper->timeout_timer, TIMER_INACTIVE);
+ ao2_ref(req_wrapper, -1);
+ }
/* Complain of failure to send the request. */
pj_strerror(ret_val, errmsg, sizeof(errmsg));
@@ -3065,6 +3116,7 @@
ao2_ref(req_wrapper, -1);
}
}
+ pj_mutex_unlock(req_wrapper->lock);
ao2_ref(req_wrapper, -1);
return ret_val;
}
@@ -3080,10 +3132,6 @@
int res;
switch(e->body.tsx_state.type) {
- case PJSIP_EVENT_USER:
- /* Map USER (transaction cancelled by timeout) to TIMER */
- e->body.tsx_state.type = PJSIP_EVENT_TIMER;
- break;
case PJSIP_EVENT_TRANSPORT_ERROR:
case PJSIP_EVENT_TIMER:
break;
@@ -3698,25 +3746,8 @@
return AST_MODULE_LOAD_DECLINE;
}
- if (internal_sip_register_service(&send_tsx_module)) {
- ast_log(LOG_ERROR, "Failed to initialize send request module. Aborting load\n");
- internal_sip_unregister_service(&supplement_module);
- ast_sip_destroy_distributor();
- ast_res_pjsip_destroy_configuration();
- ast_sip_destroy_global_headers();
- stop_monitor_thread();
- ast_sip_destroy_system();
- pj_pool_release(memory_pool);
- memory_pool = NULL;
- pjsip_endpt_destroy(ast_pjsip_endpoint);
- ast_pjsip_endpoint = NULL;
- pj_caching_pool_destroy(&caching_pool);
- return AST_MODULE_LOAD_DECLINE;
- }
-
if (internal_sip_initialize_outbound_authentication()) {
ast_log(LOG_ERROR, "Failed to initialize outbound authentication. Aborting load\n");
- internal_sip_unregister_service(&send_tsx_module);
internal_sip_unregister_service(&supplement_module);
ast_sip_destroy_distributor();
ast_res_pjsip_destroy_configuration();
@@ -3760,7 +3791,6 @@
ast_res_pjsip_destroy_configuration();
ast_sip_destroy_system();
ast_sip_destroy_global_headers();
- internal_sip_unregister_service(&send_tsx_module);
internal_sip_unregister_service(&supplement_module);
if (monitor_thread) {
stop_monitor_thread();
diff --git a/res/res_pjsip/pjsip_configuration.c b/res/res_pjsip/pjsip_configuration.c
index 42f16d2..fad915f 100644
--- a/res/res_pjsip/pjsip_configuration.c
+++ b/res/res_pjsip/pjsip_configuration.c
@@ -121,9 +121,20 @@
/*! \brief Function called when stuff relating to a contact happens (created/deleted) */
static void persistent_endpoint_contact_created_observer(const void *object)
{
- char *id = ast_strdupa(ast_sorcery_object_get_id(object)), *aor = NULL;
+ char *id = ast_strdupa(ast_sorcery_object_get_id(object));
+ char *aor = NULL;
+ char *contact = NULL;
- aor = strsep(&id, ";@");
+ aor = id;
+ /* Dynamic contacts are delimited with ";@" and static ones with "@@" */
+ if ((contact = strstr(id, ";@")) || (contact = strstr(id, "@@"))) {
+ *contact = '\0';
+ contact += 2;
+ } else {
+ contact = id;
+ }
+
+ ast_verb(1, "Contact %s/%s has been created\n", aor, contact);
ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, aor);
}
@@ -144,7 +155,7 @@
contact = id;
}
- ast_verb(1, "Contact %s/%s is now Unavailable\n", aor, contact);
+ ast_verb(1, "Contact %s/%s has been deleted\n", aor, contact);
ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, aor);
}
--
To view, visit https://gerrit.asterisk.org/492
To unsubscribe, visit https://gerrit.asterisk.org/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I0810f3999cf63f3a72607bbecac36af0a957f33e
Gerrit-PatchSet: 4
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: George Joseph <george.joseph at fairview5.com>
Gerrit-Reviewer: George Joseph <george.joseph at fairview5.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Matt Jordan <mjordan at digium.com>
More information about the asterisk-code-review
mailing list