[svn-commits] file: branch file/pimp_sip_media r381523 - in /team/file/pimp_sip_media: ./ c...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Thu Feb 14 19:04:43 CST 2013


Author: file
Date: Thu Feb 14 19:04:32 2013
New Revision: 381523

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=381523
Log:
Multiple revisions 381351,381422,381424,381432,381453,381476

........
  r381351 | mmichelson | 2013-02-13 12:58:37 -0400 (Wed, 13 Feb 2013) | 10 lines
  
  Start using the threadpool for SIP work.
  
  Operations that formerly occurred on PJSIP threads or Asterisk threads
  are now farmed out to servants instead. This takes the load off of
  the other threads and also helps to reduce contention by serializing
  like tasks.
  
  Review: https://reviewboard.asterisk.org/r/2305
........
  r381422 | mmichelson | 2013-02-14 11:33:36 -0400 (Thu, 14 Feb 2013) | 3 lines
  
  Remove some subversion properties that may have been causing automerge issues.
........
  r381424 | root | 2013-02-14 12:17:33 -0400 (Thu, 14 Feb 2013) | 30 lines
  
  Merged revisions 381366,381398 via svnmerge from 
  file:///srv/subversion/repos/asterisk/trunk
  
  ................
    r381366 | mjordan | 2013-02-13 21:49:52 -0600 (Wed, 13 Feb 2013) | 19 lines
    
    Don't throw a spurious error when using DBdeltree
    
    The function call ast_db_deltree returns the number of row deleted, or a
    negative number if it failed. DBdeltree was treating any non-zero return
    as an error, causing a spurious verbose error message to be displayed.
    
    This patch handles the return code of ast_db_deltree correctly.
    
    (closes issue ASTERISK-21070)
    Reported by: ianc
    patches:
      dbdeltree.diff uploaded by ianc (License #5955)
    ........
    
    Merged revisions 381364 from http://svn.asterisk.org/svn/asterisk/branches/1.8
    ........
    
    Merged revisions 381365 from http://svn.asterisk.org/svn/asterisk/branches/11
  ................
    r381398 | seanbright | 2013-02-14 08:45:09 -0600 (Thu, 14 Feb 2013) | 2 lines
    
    Update the name of the update_tags utility in the git mirror how-to.
  ................
........
  r381432 | root | 2013-02-14 13:18:56 -0400 (Thu, 14 Feb 2013) | 21 lines
  
  Merged revisions 381427 via svnmerge from 
  file:///srv/subversion/repos/asterisk/trunk
  
  ........
    r381427 | seanbright | 2013-02-14 11:06:02 -0600 (Thu, 14 Feb 2013) | 14 lines
    
    Use a shuffling algorithm to find unused IAX2 call numbers.
    
    While adding red-black tree containers to astobj2 in r376575, Richard pointed
    out the way chan_iax2 finds unused call numbers will prevent ao2_container
    integrity checks at runtime.
    
    This patch removes the ao2_container and instead uses fixed sized arrays and a
    modified Fisher-Yates-Durstenfeld shuffle to maintain the call number list.
    
    While the locking semantics are similar to the ao2_container implementation,
    this implementation should be faster and more memory efficient.
    
    Review: https://reviewboard.asterisk.org/r/2288/
  ........
........
  r381453 | root | 2013-02-14 15:18:52 -0400 (Thu, 14 Feb 2013) | 22 lines
  
  Merged revisions 381448 via svnmerge from 
  file:///srv/subversion/repos/asterisk/trunk
  
  ........
    r381448 | kmoore | 2013-02-14 12:47:56 -0600 (Thu, 14 Feb 2013) | 15 lines
    
    Revamp of terminal color codes
    
    The core module related to coloring terminal output was old and needed
    some love.  The main thing here was an attempt to get rid of the
    obscene number of stack-local buffers that were allocated for no other
    reason than to colorize some output.  Instead, this uses a simple trick
    to allocate several buffers within threadlocal storage, then
    automatically rotates between them, so that you can make multiple calls
    to the colorization routine within one function and not need to
    allocate multiple buffers.
    
    Review: https://reviewboard.asterisk.org/r/2241/
    Patches:
        bug.patch uploaded by Tilghman Lesher
  ........
........
  r381476 | root | 2013-02-14 16:18:47 -0400 (Thu, 14 Feb 2013) | 45 lines
  
  Merged revisions 381465,381469-381471 via svnmerge from 
  file:///srv/subversion/repos/asterisk/trunk
  
  ................
    r381465 | wedhorn | 2013-02-14 13:25:52 -0600 (Thu, 14 Feb 2013) | 12 lines
    
    Respect callerid presentation in skinny.
    
    Fix chan_skinny so that it respects callerID presentation of inbound calls to 
    device and a couple of other minor fixes: 145 packet (add OCTAL_FROM amd callerid),
    and dont send dialednumber message if protocol >= 17. 
    
    (closes issue ASTERISK-21066)
    Reported by: snuffy
    Tested by: snuffy, myself
    Patches: 
        skinny-respect-clid-restrictions-v2.diff uploaded by snuffy (license 5024)
  ................
    r381469 | rmudgett | 2013-02-14 13:52:14 -0600 (Thu, 14 Feb 2013) | 13 lines
    
    End stuck DTMF if AST_SOFTHANGUP_ASYNCGOTO because it isn't a real hangup.
    
    It doesn't hurt to check AST_SOFTHANGUP_UNBRIDGE either, but it should not
    be set outside of a bridge.
    
    (issue ASTERISK-20492)
    ........
    
    Merged revisions 381466 from http://svn.asterisk.org/svn/asterisk/branches/1.8
    ........
    
    Merged revisions 381467 from http://svn.asterisk.org/svn/asterisk/branches/11
  ................
    r381470 | wedhorn | 2013-02-14 13:55:29 -0600 (Thu, 14 Feb 2013) | 5 lines
    
    Add back sending dialnumber to skinny.
    
    Don't know why it seemed to work during testing, but it really is needed 
    for protocol v17 (and probably above).
  ................
    r381471 | wedhorn | 2013-02-14 13:58:33 -0600 (Thu, 14 Feb 2013) | 2 lines
    
    Remove extraneous stuff from r381470.
  ................
........

Merged revisions 381351,381422,381424,381432,381453,381476 from http://svn.asterisk.org/svn/asterisk/team/group/pimp_my_sip

Modified:
    team/file/pimp_sip_media/   (props changed)
    team/file/pimp_sip_media/channels/chan_gulp.c
    team/file/pimp_sip_media/include/asterisk/res_sip.h
    team/file/pimp_sip_media/include/asterisk/res_sip_session.h
    team/file/pimp_sip_media/include/asterisk/threadpool.h
    team/file/pimp_sip_media/main/threadpool.c
    team/file/pimp_sip_media/res/res_sip.c
    team/file/pimp_sip_media/res/res_sip.exports.in
    team/file/pimp_sip_media/res/res_sip/config_transport.c
    team/file/pimp_sip_media/res/res_sip/sip_options.c
    team/file/pimp_sip_media/res/res_sip_session.c

Propchange: team/file/pimp_sip_media/
------------------------------------------------------------------------------
--- pimp-integrated (original)
+++ pimp-integrated Thu Feb 14 19:04:32 2013
@@ -1,1 +1,1 @@
-/team/group/pimp_my_sip:1-381346
+/team/group/pimp_my_sip:1-381522

Modified: team/file/pimp_sip_media/channels/chan_gulp.c
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_sip_media/channels/chan_gulp.c?view=diff&rev=381523&r1=381522&r2=381523
==============================================================================
--- team/file/pimp_sip_media/channels/chan_gulp.c (original)
+++ team/file/pimp_sip_media/channels/chan_gulp.c Thu Feb 14 19:04:32 2013
@@ -141,18 +141,6 @@
 	.update_peer = gulp_set_rtp_peer,
 };
 
-static void pj_thread_register_check(void)
-{
-	pj_thread_desc desc;
-	pj_thread_t *thread;
-
-	if (pj_thread_is_registered() == PJ_TRUE) {
-		return;
-	}
-
-	pj_thread_register("Asterisk Thread", desc, &thread);
-}
-
 /*! \brief Function called to create a new Gulp Asterisk channel */
 static struct ast_channel *gulp_new(struct ast_sip_session *session, int state, const char *exten, const char *title, const char *linkedid, const char *cid_name)
 {
@@ -190,12 +178,24 @@
 	return chan;
 }
 
+static int answer(void *data)
+{
+	pj_status_t status;
+	pjsip_tx_data *packet;
+	struct ast_sip_session *session = data;
+
+	if ((status = pjsip_inv_answer(session->inv_session, 200, NULL, NULL, &packet)) == PJ_SUCCESS) {
+		ast_sip_session_send_response(session, packet);
+	}
+
+	ao2_ref(session, -1);
+	return (status == PJ_SUCCESS) ? 0 : -1;
+}
+
 /*! \brief Function called by core when we should answer a Gulp session */
 static int gulp_answer(struct ast_channel *ast)
 {
 	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
-	pj_status_t status;
-	pjsip_tx_data *packet;
 
 	if (ast_channel_state(ast) == AST_STATE_UP) {
 		return 0;
@@ -203,13 +203,13 @@
 
 	ast_setstate(ast, AST_STATE_UP);
 
-	pj_thread_register_check();
-
-	if ((status = pjsip_inv_answer(session->inv_session, 200, NULL, NULL, &packet)) == PJ_SUCCESS) {
-		ast_sip_session_send_response(session, packet);
-	}
-
-	return (status == PJ_SUCCESS) ? 0 : -1;
+	ao2_ref(session, +1);
+	if (ast_sip_push_task(session->work, answer, session)) {
+		ast_log(LOG_WARNING, "Unable to push answer task to the threadpool. Cannot answer call\n");
+		ao2_cleanup(session);
+		return -1;
+	}
+	return 0;
 }
 
 /*! \brief Function called by core to read any waiting frames */
@@ -272,17 +272,86 @@
 	return res;
 }
 
+struct fixup_data {
+	struct ast_sip_session *session;
+	struct ast_channel *chan;
+};
+
+static int fixup(void *data)
+{
+	struct fixup_data *fix_data = data;
+	fix_data->session->channel = fix_data->chan;
+	return 0;
+}
+
 /*! \brief Function called by core to change the underlying owner channel */
 static int gulp_fixup(struct ast_channel *oldchan, struct ast_channel *newchan)
 {
 	struct ast_sip_session *session = ast_channel_tech_pvt(newchan);
+	struct fixup_data fix_data;
+	fix_data.session = session;
+	fix_data.chan = newchan;
 
 	if (session->channel != oldchan) {
 		return -1;
 	}
 
-	session->channel = newchan;
-
+	if (ast_sip_push_task_synchronous(session->work, fixup, &fix_data)) {
+		ast_log(LOG_WARNING, "Unable to perform channel fixup\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+struct indicate_data {
+	struct ast_sip_session *session;
+	int condition;
+	int response_code;
+	void *frame_data;
+	size_t datalen;
+};
+
+static void indicate_data_destroy(void *obj)
+{
+	struct indicate_data *ind_data = obj;
+	ast_free(ind_data->frame_data);
+	ao2_ref(ind_data->session, -1);
+}
+
+static struct indicate_data *indicate_data_alloc(struct ast_sip_session *session,
+		int condition, int response_code, const void *frame_data, size_t datalen)
+{
+	struct indicate_data *ind_data = ao2_alloc(sizeof(*ind_data), indicate_data_destroy);
+	if (!ind_data) {
+		return NULL;
+	}
+	ind_data->frame_data = ast_malloc(datalen);
+	if (!ind_data->frame_data) {
+		ao2_ref(ind_data, -1);
+		return NULL;
+	}
+	memcpy(ind_data->frame_data, frame_data, datalen);
+	ind_data->datalen = datalen;
+	ind_data->condition = condition;
+	ind_data->response_code = response_code;
+	ao2_ref(session, +1);
+	ind_data->session = session;
+	return ind_data;
+}
+
+static int indicate(void *data)
+{
+	struct indicate_data *ind_data = data;
+	struct ast_sip_session *session = ind_data->session;
+	int response_code = ind_data->response_code;
+	pjsip_tx_data *packet = NULL;
+
+	if (pjsip_inv_answer(session->inv_session, response_code, NULL, NULL, &packet) == PJ_SUCCESS) {
+		ast_sip_session_send_response(session, packet);
+	}
+
+	ao2_ref(ind_data, -1);
 	return 0;
 }
 
@@ -291,50 +360,47 @@
 {
 	int res = 0;
 	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
-	pj_status_t status = -1;
-	pjsip_tx_data *packet = NULL;
-
-	pj_thread_register_check();
+	int response_code = 0;
 
 	switch (condition) {
 	case AST_CONTROL_RINGING:
 		if (ast_channel_state(ast) == AST_STATE_RING) {
-			status = pjsip_inv_answer(session->inv_session, 180, NULL, NULL, &packet);
+			response_code = 180;
 		} else {
 			res = -1;
 		}
 		break;
 	case AST_CONTROL_BUSY:
 		if (ast_channel_state(ast) != AST_STATE_UP) {
-			status = pjsip_inv_answer(session->inv_session, 486, NULL, NULL, &packet);
+			response_code = 486;
 		} else {
 			res = -1;
 		}
 		break;
 	case AST_CONTROL_CONGESTION:
 		if (ast_channel_state(ast) != AST_STATE_UP) {
-			status = pjsip_inv_answer(session->inv_session, 503, NULL, NULL, &packet);
+			response_code = 503;
 		} else {
 			res = -1;
 		}
 		break;
 	case AST_CONTROL_INCOMPLETE:
 		if (ast_channel_state(ast) != AST_STATE_UP) {
-			status = pjsip_inv_answer(session->inv_session, 484, NULL, NULL, &packet);
+			response_code = 484;
 		} else {
 			res = -1;
 		}
 		break;
 	case AST_CONTROL_PROCEEDING:
 		if (ast_channel_state(ast) != AST_STATE_UP) {
-			status = pjsip_inv_answer(session->inv_session, 100, NULL, NULL, &packet);
+			response_code = 100;
 		} else {
 			res = -1;
 		}
 		break;
 	case AST_CONTROL_PROGRESS:
 		if (ast_channel_state(ast) != AST_STATE_UP) {
-			status = pjsip_inv_answer(session->inv_session, 183, NULL, NULL, &packet);
+			response_code = 183;
 		} else {
 			res = -1;
 		}
@@ -362,8 +428,18 @@
 		break;
 	}
 
-	if (packet && (status == PJ_SUCCESS)) {
-		ast_sip_session_send_response(session, packet);
+	if (!res && response_code) {
+		struct indicate_data *ind_data = indicate_data_alloc(session, condition, response_code, data, datalen);
+		if (ind_data) {
+			res = ast_sip_push_task(session->work, indicate, ind_data);
+			if (res) {
+				ast_log(LOG_NOTICE, "Cannot send response code %d to endpoint %s. Could queue task properly\n",
+						response_code, ast_sorcery_object_get_id(session->endpoint));
+				ao2_cleanup(ind_data);
+			}
+		} else {
+			res = -1;
+		}
 	}
 
 	return res;
@@ -416,20 +492,32 @@
 	return res;
 }
 
+static int call(void *data)
+{
+	struct ast_sip_session *session = data;
+	pjsip_tx_data *packet;
+
+	if (pjsip_inv_invite(session->inv_session, &packet) != PJ_SUCCESS) {
+		return -1;
+	}
+
+	ast_sip_session_send_request(session, packet);
+
+	ao2_ref(session, -1);
+	return 0;
+}
+
 /*! \brief Function called by core to actually start calling a remote party */
 static int gulp_call(struct ast_channel *ast, const char *dest, int timeout)
 {
 	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
-	pjsip_tx_data *packet;
-
-	pj_thread_register_check();
-
-	if (pjsip_inv_invite(session->inv_session, &packet) != PJ_SUCCESS) {
+
+	ao2_ref(session, +1);
+	if (ast_sip_push_task_synchronous(session->work, call, session)) {
+		ast_log(LOG_WARNING, "Error attempting to place outbound call to call '%s'\n", dest);
+		ao2_cleanup(session);
 		return -1;
 	}
-
-	ast_sip_session_send_request(session, packet);
-
 	return 0;
 }
 
@@ -481,15 +569,36 @@
 	return 0;
 }
 
-/*! \brief Function called by core to hang up a Gulp session */
-static int gulp_hangup(struct ast_channel *ast)
-{
-	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
+struct hangup_data {
+	int cause;
+	struct ast_channel *chan;
+};
+
+static void hangup_data_destroy(void *obj)
+{
+	struct hangup_data *h_data = obj;
+	h_data->chan = ast_channel_unref(h_data->chan);
+}
+
+static struct hangup_data *hangup_data_alloc(int cause, struct ast_channel *chan)
+{
+	struct hangup_data *h_data = ao2_alloc(sizeof(*h_data), hangup_data_destroy);
+	if (!h_data) {
+		return NULL;
+	}
+	h_data->cause = cause;
+	h_data->chan = ast_channel_ref(chan);
+	return h_data;
+}
+
+static int hangup(void *data)
+{
 	pj_status_t status;
 	pjsip_tx_data *packet = NULL;
-	int cause = hangup_cause2sip(ast_channel_hangupcause(session->channel));
-
-	pj_thread_register_check();
+	struct hangup_data *h_data = data;
+	struct ast_channel *ast = h_data->chan;
+	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
+	int cause = h_data->cause;
 
 	if (((status = pjsip_inv_end_session(session->inv_session, cause ? cause : 603, NULL, &packet)) == PJ_SUCCESS) && packet) {
 		if (packet->msg->type == PJSIP_RESPONSE_MSG) {
@@ -503,18 +612,52 @@
 	ast_channel_tech_pvt_set(ast, NULL);
 
 	ao2_cleanup(session);
-
-	return (status == PJ_SUCCESS) ? 0 : -1;
-}
-
-/*! \brief Function called by core to create a new outgoing Gulp session */
-static struct ast_channel *gulp_request(const char *type, struct ast_format_cap *cap, const struct ast_channel *requestor, const char *data, int *cause)
-{
+	ao2_cleanup(h_data);
+	return 0;
+}
+
+/*! \brief Function called by core to hang up a Gulp session */
+static int gulp_hangup(struct ast_channel *ast)
+{
+	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
+	int cause = hangup_cause2sip(ast_channel_hangupcause(session->channel));
+	struct hangup_data *h_data = hangup_data_alloc(cause, ast);
+	if (!h_data) {
+		goto failure;
+	}
+
+	if (ast_sip_push_task(session->work, hangup, h_data)) {
+		ast_log(LOG_WARNING, "Unable to push hangup task to the threadpool. Expect bad things\n");
+		goto failure;
+	}
+	return 0;
+
+failure:
+	/* Go ahead and do our cleanup of the session and channel even if we're not going
+	 * to be able to send our SIP request/response
+	 */
+	ao2_cleanup(h_data);
+	session->channel = NULL;
+	ast_channel_tech_pvt_set(ast, NULL);
+
+	ao2_cleanup(session);
+	return -1;
+}
+
+struct request_data {
+	struct ast_sip_session *session;
+	struct ast_sip_work *work;
+	const char *dest;
+};
+
+static int request(void *obj)
+{
+	struct request_data *req_data = obj;
 	struct ast_sip_endpoint *endpoint = ast_sip_endpoint_alloc("constant");
 	struct ast_sip_session *session = NULL;
 
 	if (!endpoint) {
-		return NULL;
+		return -1;
 	}
 
 	/* TODO: This needs to actually grab a proper endpoint and such */
@@ -523,11 +666,33 @@
 	endpoint->min_se = 90;
 	endpoint->sess_expires = 1800;
 
-	pj_thread_register_check();
-
-	if (!(session = ast_sip_session_create_outgoing(endpoint, data))) {
+	if (!(session = ast_sip_session_create_outgoing(endpoint, req_data->dest, req_data->work))) {
+		return -1;
+	}
+
+	req_data->session = session;
+	return 0;
+}
+
+/*! \brief Function called by core to create a new outgoing Gulp session */
+static struct ast_channel *gulp_request(const char *type, struct ast_format_cap *cap, const struct ast_channel *requestor, const char *data, int *cause)
+{
+	struct ast_sip_work *work = ast_sip_create_work();
+	struct request_data req_data;
+	struct ast_sip_session *session;
+	if (!work) {
 		return NULL;
 	}
+
+	req_data.dest = data;
+	req_data.work = work;
+
+	if (ast_sip_push_task_synchronous(work, request, &req_data)) {
+		ast_sip_destroy_work(work);
+		return NULL;
+	}
+
+	session = req_data.session;
 
 	if (!(session->channel = gulp_new(session, AST_STATE_DOWN, NULL, NULL, requestor ? ast_channel_linkedid(requestor) : NULL, NULL))) {
 		/* Session needs to be terminated prematurely */

Modified: team/file/pimp_sip_media/include/asterisk/res_sip.h
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_sip_media/include/asterisk/res_sip.h?view=diff&rev=381523&r1=381522&r2=381523
==============================================================================
--- team/file/pimp_sip_media/include/asterisk/res_sip.h (original)
+++ team/file/pimp_sip_media/include/asterisk/res_sip.h Thu Feb 14 19:04:32 2013
@@ -455,6 +455,63 @@
 int ast_sip_initialize_sorcery_transport(struct ast_sorcery *sorcery);
 
 /*!
+ * \page Threading model for SIP
+ *
+ * There are three major types of threads that SIP will have to deal with:
+ * \li Asterisk threads
+ * \li PJSIP threads
+ * \li SIP threadpool threads (a.k.a. "servants")
+ *
+ * \par Asterisk Threads
+ *
+ * Asterisk threads are those that originate from outside of SIP but within
+ * Asterisk. The most common of these threads are PBX (channel) threads and
+ * the autoservice thread. Most interaction with these threads will be through
+ * channel technology callbacks. Within these threads, it is fine to handle
+ * Asterisk data from outside of SIP, but any handling of SIP data should be
+ * left to servants, \b especially if you wish to call into PJSIP for anything.
+ * Asterisk threads are not registered with PJLIB, so attempting to call into
+ * PJSIP will cause an assertion to be triggered, thus causing the program to
+ * crash.
+ *
+ * \par PJSIP Threads
+ *
+ * PJSIP threads are those that originate from handling of PJSIP events, such
+ * as an incoming SIP request or response, or a transaction timeout. The role
+ * of these threads is to process information as quickly as possible. When your
+ * code gets called into from one of these threads, your goal should be to do
+ * as little as possible before handing the majority of processing off to a
+ * servant. Operations such as remote procedure calls or DNS lookups are never
+ * to be done in these threads since it can cause performance issues.
+ *
+ * \par Servants
+ *
+ * Servants are where the bulk of SIP work should be performed. These threads
+ * exist in order to do the work that Asterisk threads and PJSIP threads hand
+ * off to them. Servant threads register themselves with PJLIB, meaning that
+ * they are capable of calling PJSIP and PJLIB functions if they wish. 
+ *
+ * \par ast_sip_work
+ *
+ * Tasks are handed off to servant threads using the API call \ref ast_sip_push_task.
+ * The first parameter of this call is an \ref ast_sip_work pointer. If this pointer
+ * is NULL, then the work will be handed off to whatever servant can currently handle
+ * the task. If this pointer is non-NULL, then the task will not be executed until
+ * previous tasks pushed with the same \ref ast_sip_work have completed. In other words,
+ * an \ref ast_sip_work is a method of serializing tasks pushed to servants. This can
+ * have several benefits
+ * \li Tasks are executed in the same order they were pushed to servants
+ * \li Reduced contention for shared resources
+ *
+ * \note
+ *
+ * Do not make assumptions about individual threads based on corresponding \ref ast_sip_work.
+ * In other words, just because several tasks use the same \ref ast_sip_work when being pushed
+ * to servants, it does not mean that the same thread is necessarily going to execute those
+ * tasks, even though they are all guaranteed to be executed in sequence.
+ */
+
+/*!
  * \brief Initialize authentication support on a sorcery instance
  *
  * \param sorcery The sorcery instance
@@ -465,12 +522,13 @@
 int ast_sip_initialize_sorcery_auth(struct ast_sorcery *sorcery);
 
 /*!
+>>>>>>> .merge-right.r381346
  * \brief Create a new SIP work structure
  *
  * A SIP work is a means of grouping together SIP tasks. For instance, one
  * might create a SIP work so that all tasks for a given SIP dialog will
  * be grouped together. Grouping the work together ensures that the
- * threadpool will distribute the tasks in such a way so that grouped work
+ * servants will execute the tasks in such a way so that grouped work
  * will execute sequentially. Executing grouped tasks sequentially means
  * less contention for shared resources.
  *
@@ -487,17 +545,37 @@
 void ast_sip_destroy_work(struct ast_sip_work *work);
  
 /*!
- * \brief Pushes a task into the SIP threadpool
+ * \brief Pushes a task to SIP servants
  *
  * This uses the SIP work provided to determine how to push the task.
- *
- * \param work The SIP work to which the task belongs
+ * If the work param is NULL, then the task will be pushed to the
+ * servants directly. If the work is non-NULL, then the task will be
+ * queued behind other tasks associated with the work.
+ *
+ * \param work The SIP work to which the task belongs. Can be NULL
  * \param sip_task The task to execute
  * \param task_data The parameter to pass to the task when it executes
  * \retval 0 Success
  * \retval -1 Failure
  */
 int ast_sip_push_task(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data);
+
+/*!
+ * \brief Push a task to SIP servants and wait for it to complete
+ *
+ * Like \ref ast_sip_push_task except that it blocks until the task completes.
+ *
+ * \warning \b Never use this function in a SIP servant thread. This can potentially
+ * cause a deadlock. If you are in a SIP servant thread, just call your function
+ * in-line.
+ *
+ * \param work The SIP work to which the task belongs. May be NULL.
+ * \param sip_task The task to execute
+ * \param task_data The parameter to pass to the task when it executes
+ * \retval 0 Success
+ * \retval -1 Failure
+ */
+int ast_sip_push_task_synchronous(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data);
 
 /*!
  * \brief SIP body description

Modified: team/file/pimp_sip_media/include/asterisk/res_sip_session.h
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_sip_media/include/asterisk/res_sip_session.h?view=diff&rev=381523&r1=381522&r2=381523
==============================================================================
--- team/file/pimp_sip_media/include/asterisk/res_sip_session.h (original)
+++ team/file/pimp_sip_media/include/asterisk/res_sip_session.h Thu Feb 14 19:04:32 2013
@@ -92,28 +92,64 @@
  * These can be registered by any module in order to add
  * processing to incoming and outgoing SIP requests and responses
  */
-struct ast_sip_session_supplement{
-	/*! Method on which to call the callbacks. If NULL, call on all methods */
-	const char *method;
-	/*! Notification that the session has begun */
-	void (*session_begin)(struct ast_sip_session *session);
-	/*! Notification that the session has ended */
-	void (*session_end)(struct ast_sip_session *session);
-	/*! Notification that the session is being destroyed */
+struct ast_sip_session_supplement {
+    /*! Method on which to call the callbacks. If NULL, call on all methods */
+    const char *method;
+    /*!
+	 * \brief Notification that the session has begun
+	 * This method will always be called from a SIP servant thread.
+	 */
+    void (*session_begin)(struct ast_sip_session *session);
+    /*! 
+	 * \brief Notification that the session has ended
+	 *
+	 * This method may or may not be called from a SIP servant thread. Do
+	 * not make assumptions about being able to call PJSIP methods from within
+	 * this method.
+	 */
+    void (*session_end)(struct ast_sip_session *session);
+	/*!
+	 * \brief Notification that the session is being destroyed
+	 */
 	void (*session_destroy)(struct ast_sip_session *session);
-	/*!
-	 * \brief Called on incoming SIP request
-	 * This method can indicate a failure in processing in its return. If there
-	 * is a failure, it is required that this method sends a response to the request.
-
-	 */
-	int (*incoming_request)(struct ast_sip_session *session, struct pjsip_rx_data *rdata);
-	/*! Called on an incoming SIP response */
-	void (*incoming_response)(struct ast_sip_session *session, struct pjsip_rx_data *rdata);
-	/*! Called on an outgoing SIP request */
-	void (*outgoing_request)(struct ast_sip_session *session, struct pjsip_tx_data *tdata);
-	/*! Called on an outgoing SIP response */
-	void (*outgoing_response)(struct ast_sip_session *session, struct pjsip_tx_data *tdata);
+    /*!
+     * \brief Called on incoming SIP request
+     * This method can indicate a failure in processing in its return. If there
+     * is a failure, it is required that this method sends a response to the request.
+	 * This method is always called from a SIP servant thread.
+	 *
+	 * \note
+	 * The following PJSIP methods will not work properly:
+	 * pjsip_rdata_get_dlg()
+	 * pjsip_rdata_get_tsx()
+	 * The reason is that the rdata passed into this function is a cloned rdata structure,
+	 * and its module data is not copied during the cloning operation.
+	 * If you need to get the dialog, you can get it via session->inv_session->dlg.
+     */
+    int (*incoming_request)(struct ast_sip_session *session, struct pjsip_rx_data *rdata);
+    /*! 
+	 * \brief Called on an incoming SIP response
+	 * This method is always called from a SIP servant thread.
+	 *
+	 * \note
+	 * The following PJSIP methods will not work properly:
+	 * pjsip_rdata_get_dlg()
+	 * pjsip_rdata_get_tsx()
+	 * The reason is that the rdata passed into this function is a cloned rdata structure,
+	 * and its module data is not copied during the cloning operation.
+	 * If you need to get the dialog, you can get it via session->inv_session->dlg.
+	 */
+    void (*incoming_response)(struct ast_sip_session *session, struct pjsip_rx_data *rdata);
+    /*!
+	 * \brief Called on an outgoing SIP request
+	 * This method is always called from a SIP servant thread.
+	 */
+    void (*outgoing_request)(struct ast_sip_session *session, struct pjsip_tx_data *tdata);
+    /*! 
+	 * \brief Called on an outgoing SIP response
+	 * This method is always called from a SIP servant thread.
+	 */
+    void (*outgoing_response)(struct ast_sip_session *session, struct pjsip_tx_data *tdata);
 	/*! Next item in the list */
 	AST_LIST_ENTRY(ast_sip_session_supplement) next;
 };
@@ -179,16 +215,18 @@
  * as placing all registered supplements onto the session.
  * \param endpoint The endpoint that this session communicates with
  * \param inv_session The PJSIP INVITE session data
- */
-struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, pjsip_inv_session *inv);
+ * \param work SIP work queue to use for this session. May be NULL.
+ */
+struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, pjsip_inv_session *inv, struct ast_sip_work *work);
 
 /*!
  * \brief Create a new outgoing SIP session
  *
  * \param endpoint The endpoint that this session uses for settings
  * \param uri The URI to call
- */
-struct ast_sip_session *ast_sip_session_create_outgoing(struct ast_sip_endpoint *endpoint, const char *uri);
+ * \param work SIP work queue to use for this session. May be NULL.
+ */
+struct ast_sip_session *ast_sip_session_create_outgoing(struct ast_sip_endpoint *endpoint, const char *uri, struct ast_sip_work *work);
 
 /*!
  * \brief Register an SDP handler

Modified: team/file/pimp_sip_media/include/asterisk/threadpool.h
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_sip_media/include/asterisk/threadpool.h?view=diff&rev=381523&r1=381522&r2=381523
==============================================================================
--- team/file/pimp_sip_media/include/asterisk/threadpool.h (original)
+++ team/file/pimp_sip_media/include/asterisk/threadpool.h Thu Feb 14 19:04:32 2013
@@ -105,6 +105,20 @@
 	 * beyond this number of threads.
 	 */
 	int max_size;
+	/*!
+	 * \brief Function to call when a thread starts
+	 *
+	 * This is useful if there is something common that all threads
+	 * in a threadpool need to do when they start.
+	 */
+	void (*thread_start)(void);
+	/*!
+	 * \brief Function to call when a thread ends
+	 *
+	 * This is useful if there is common cleanup to execute when
+	 * a thread completes
+	 */
+	void (*thread_end)(void);
 };
 
 /*!

Modified: team/file/pimp_sip_media/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_sip_media/main/threadpool.c?view=diff&rev=381523&r1=381522&r2=381523
==============================================================================
--- team/file/pimp_sip_media/main/threadpool.c (original)
+++ team/file/pimp_sip_media/main/threadpool.c Thu Feb 14 19:04:32 2013
@@ -984,7 +984,13 @@
 {
 	struct worker_thread *worker = arg;
 
+	if (worker->options.thread_start) {
+		worker->options.thread_start();
+	}
 	worker_active(worker);
+	if (worker->options.thread_end) {
+		worker->options.thread_end();
+	}
 	return NULL;
 }
 

Modified: team/file/pimp_sip_media/res/res_sip.c
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_sip_media/res/res_sip.c?view=diff&rev=381523&r1=381522&r2=381523
==============================================================================
--- team/file/pimp_sip_media/res/res_sip.c (original)
+++ team/file/pimp_sip_media/res/res_sip.c Thu Feb 14 19:04:32 2013
@@ -46,29 +46,42 @@
 
 static struct ast_threadpool *sip_threadpool;
 
-int ast_sip_register_service(pjsip_module *module)
-{
+static int register_service(void *data)
+{
+	pjsip_module **module = data;
 	if (!ast_pjsip_endpoint) {
 		ast_log(LOG_ERROR, "There is no PJSIP endpoint. Unable to register services\n");
 		return -1;
 	}
-	if (pjsip_endpt_register_module(ast_pjsip_endpoint, module) != PJ_SUCCESS) {
-		ast_log(LOG_ERROR, "Unable to register module %.*s\n", (int) pj_strlen(&module->name), pj_strbuf(&module->name));
-		return -1;
-	}
-	ast_debug(1, "Registered SIP service %.*s\n", (int) pj_strlen(&module->name), pj_strbuf(&module->name));
+	if (pjsip_endpt_register_module(ast_pjsip_endpoint, *module) != PJ_SUCCESS) {
+		ast_log(LOG_ERROR, "Unable to register module %.*s\n", (int) pj_strlen(&(*module)->name), pj_strbuf(&(*module)->name));
+		return -1;
+	}
+	ast_debug(1, "Registered SIP service %.*s (%p)\n", (int) pj_strlen(&(*module)->name), pj_strbuf(&(*module)->name), *module);
 	ast_module_ref(ast_module_info->self);
 	return 0;
 }
 
-void ast_sip_unregister_service(pjsip_module *module)
-{
+int ast_sip_register_service(pjsip_module *module)
+{
+	return ast_sip_push_task_synchronous(NULL, register_service, &module);
+}
+
+static int unregister_service(void *data)
+{
+	pjsip_module **module = data;
 	ast_module_unref(ast_module_info->self);
 	if (!ast_pjsip_endpoint) {
-		return;
-	}
-	pjsip_endpt_unregister_module(ast_pjsip_endpoint, module);
-	ast_debug(1, "Unregistered SIP service %.*s\n", (int) pj_strlen(&module->name), pj_strbuf(&module->name));
+		return -1;
+	}
+	pjsip_endpt_unregister_module(ast_pjsip_endpoint, *module);
+	ast_debug(1, "Unregistered SIP service %.*s\n", (int) pj_strlen(&(*module)->name), pj_strbuf(&(*module)->name));
+	return 0;
+}
+
+void ast_sip_unregister_service(pjsip_module *module)
+{
+	ast_sip_push_task_synchronous(NULL, unregister_service, &module);
 }
 
 static struct ast_sip_authenticator *registered_authenticator;
@@ -337,13 +350,19 @@
 static int execute_tasks(void *data)
 {
 	struct ast_sip_work *work = data;
-	while (ast_taskprocessor_execute(work->queue));
+	while (ast_taskprocessor_execute(work->queue)) {
+		/* Empty on purpose */
+	}
+	ao2_cleanup(work);
 	return 0;
 }
 
 static void work_queue_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) {
-	struct ast_sip_work *work = ast_taskprocessor_listener_get_user_data(listener);
-	ast_threadpool_push(sip_threadpool, execute_tasks, work);
+	if (was_empty) {
+		struct ast_sip_work *work = ast_taskprocessor_listener_get_user_data(listener);
+		ao2_ref(work, +1);
+		ast_threadpool_push(sip_threadpool, execute_tasks, work);
+	}
 };
 
 static int work_queue_start(struct ast_taskprocessor_listener *listener)
@@ -363,9 +382,15 @@
 	.shutdown = work_queue_shutdown,
 };
 
+static void work_destroy(void *obj)
+{
+	struct ast_sip_work *work = obj;
+	ast_taskprocessor_unreference(work->queue);
+}
+
 struct ast_sip_work *ast_sip_create_work(void)
 {
-	struct ast_sip_work *work = ast_calloc(1, sizeof(*work));
+	struct ast_sip_work *work = ao2_alloc(sizeof(*work), work_destroy);
 	struct ast_uuid *uuid;
 	struct ast_taskprocessor_listener *listener;
 	char queue_name[AST_UUID_STR_LEN];
@@ -378,7 +403,7 @@
 	 */
 	uuid = ast_uuid_generate();
 	if (!uuid) {
-		ast_free(work);
+		ao2_cleanup(work);
 		return NULL;
 	}
 	ast_uuid_to_str(uuid, queue_name, sizeof(queue_name));
@@ -387,7 +412,7 @@
 	work->queue = ast_taskprocessor_create_with_listener(queue_name, listener);
 	ao2_cleanup(listener);
 	if (!work->queue) {
-		ast_free(work);
+		ao2_cleanup(work);
 		return NULL;
 	}
 	return work;
@@ -395,13 +420,68 @@
 
 void ast_sip_destroy_work(struct ast_sip_work *work)
 {
-	ast_taskprocessor_unreference(work->queue);
-	ast_free(work);
+	ao2_cleanup(work);
 }
 
 int ast_sip_push_task(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data)
 {
-	return ast_taskprocessor_push(work->queue, sip_task, task_data);
+	if (work) {
+		return ast_taskprocessor_push(work->queue, sip_task, task_data);
+	} else {
+		return ast_threadpool_push(sip_threadpool, sip_task, task_data);
+	}
+}
+
+struct sync_task_data {
+	ast_mutex_t lock;
+	ast_cond_t cond;
+	int complete;
+	int fail;
+	int (*task)(void *);
+	void *task_data;
+};
+
+static int sync_task(void *data)
+{
+	struct sync_task_data *std = data;
+	std->fail = std->task(std->task_data);
+
+	ast_mutex_lock(&std->lock);
+	std->complete = 1;
+	ast_cond_signal(&std->cond);
+	ast_mutex_unlock(&std->lock);
+	return std->fail;
+}
+
+int ast_sip_push_task_synchronous(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data)
+{
+	/* This method is an onion */
+	struct sync_task_data std;
+	ast_mutex_init(&std.lock);
+	ast_cond_init(&std.cond, NULL);
+	std.fail = std.complete = 0;
+	std.task = sip_task;
+	std.task_data = task_data;
+
+	if (work) {
+		if (ast_taskprocessor_push(work->queue, sync_task, &std)) {
+			return -1;
+		}
+	} else {
+		if (ast_threadpool_push(sip_threadpool, sync_task, &std)) {
+			return -1;
+		}
+	}
+
+	ast_mutex_lock(&std.lock);
+	while (!std.complete) {
+		ast_cond_wait(&std.cond, &std.lock);
+	}
+	ast_mutex_unlock(&std.lock);
+
+	ast_mutex_destroy(&std.lock);
+	ast_cond_destroy(&std.cond);
+	return std.fail;
 }
 
 void ast_copy_pj_str(char *dest, pj_str_t *src, size_t size)
@@ -445,6 +525,25 @@
 	.priority = PJSIP_MOD_PRIORITY_APPLICATION + 32,
 	.on_rx_request = unhandled_on_rx_request,
 };
+
+AST_THREADSTORAGE(pj_thread_storage);
+
+static void sip_thread_start(void)
+{
+	pj_thread_desc *desc;
+	pj_thread_t *thread;
+
+	desc = ast_threadstorage_get(&pj_thread_storage, sizeof(pj_thread_desc));
+	if (!desc) {
+		ast_log(LOG_ERROR, "Could not get thread desc from thread-local storage. Expect awful things to occur\n");
+		return;
+	}
+	pj_bzero(*desc, sizeof(*desc));
+
+	if (pj_thread_register("Asterisk Thread", *desc, &thread) != PJ_SUCCESS) {
+		ast_log(LOG_ERROR, "Couldn't register thread with PJLIB.\n");
+	}
+}
 
 static int load_module(void)
 {
@@ -465,6 +564,7 @@
 		.max_size = 0,
 		.idle_timeout = 60,
 		.initial_size = 0,
+		.thread_start = sip_thread_start,
 	};
 	sip_threadpool = ast_threadpool_create("SIP", NULL, &options);
 
@@ -517,9 +617,11 @@
 	}
 	if (memory_pool) {
 		pj_pool_release(memory_pool);
+		memory_pool = NULL;
 	}
 	if (ast_pjsip_endpoint) {
 		pjsip_endpt_destroy(ast_pjsip_endpoint);
+		ast_pjsip_endpoint = NULL;
 	}
 	pj_caching_pool_destroy(&caching_pool);
 	/* XXX Should have a way of stopping monitor thread */
@@ -535,22 +637,33 @@
 	return 0;
 }
 
+static int unload_pjsip(void *data)
+{
+	if (memory_pool) {
+		pj_pool_release(memory_pool);
+		memory_pool = NULL;
+	}
+	if (ast_pjsip_endpoint) {
+		pjsip_endpt_destroy(ast_pjsip_endpoint);
+		ast_pjsip_endpoint = NULL;
+	}
+	pj_caching_pool_destroy(&caching_pool);
+	return 0;
+}
+
 static int unload_module(void)
 {
 	ast_res_sip_destroy_configuration();
 	if (monitor_thread) {
 		stop_monitor_thread();
 	}
-	if (memory_pool) {
-		/* This will cause a crash in Asterisk in debug mode, as the thread
-		 * calling this is not a pjsip thread
-		 */
-		pj_pool_release(memory_pool);
-	}
-	if (ast_pjsip_endpoint) {
-		pjsip_endpt_destroy(ast_pjsip_endpoint);
-	}
-	pj_caching_pool_destroy(&caching_pool);
+	/* The thread this is called from cannot call PJSIP/PJLIB functions,
+	 * so we have to push the work to the threadpool to handle
+	 */
+	ast_sip_push_task_synchronous(NULL, unload_pjsip, NULL);
+
+	ast_threadpool_shutdown(sip_threadpool);
+
 	return 0;
 }
 

Modified: team/file/pimp_sip_media/res/res_sip.exports.in
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_sip_media/res/res_sip.exports.in?view=diff&rev=381523&r1=381522&r2=381523
==============================================================================
--- team/file/pimp_sip_media/res/res_sip.exports.in (original)
+++ team/file/pimp_sip_media/res/res_sip.exports.in Thu Feb 14 19:04:32 2013
@@ -9,6 +9,7 @@
 		LINKER_SYMBOL_PREFIXast_sip_create_work;
 		LINKER_SYMBOL_PREFIXast_sip_destroy_work;
 		LINKER_SYMBOL_PREFIXast_sip_push_task;
+		LINKER_SYMBOL_PREFIXast_sip_push_task_synchronous;
 		LINKER_SYMBOL_PREFIXast_sip_send_request;
 		LINKER_SYMBOL_PREFIXast_sip_requires_authentication;
 		LINKER_SYMBOL_PREFIXast_sip_authenticate_request;

Modified: team/file/pimp_sip_media/res/res_sip/config_transport.c
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_sip_media/res/res_sip/config_transport.c?view=diff&rev=381523&r1=381522&r2=381523
==============================================================================

[... 722 lines stripped ...]



More information about the svn-commits mailing list