[svn-commits] mmichelson: branch group/pimp_my_sip r381351 - in /team/group/pimp_my_sip: ./...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Wed Feb 13 10:58:43 CST 2013


Author: mmichelson
Date: Wed Feb 13 10:58:37 2013
New Revision: 381351

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=381351
Log:
Start using the threadpool for SIP work.

Operations that formerly occurred on PJSIP threads or Asterisk threads
are now farmed out to servants instead. This takes the load off of
the other threads and also helps to reduce contention by serializing
like tasks.

Review: https://reviewboard.asterisk.org/r/2305


Modified:
    team/group/pimp_my_sip/   (props changed)
    team/group/pimp_my_sip/channels/chan_gulp.c
    team/group/pimp_my_sip/include/asterisk/res_sip.h
    team/group/pimp_my_sip/include/asterisk/res_sip_session.h
    team/group/pimp_my_sip/include/asterisk/threadpool.h
    team/group/pimp_my_sip/main/threadpool.c
    team/group/pimp_my_sip/res/res_sip.c
    team/group/pimp_my_sip/res/res_sip.exports.in
    team/group/pimp_my_sip/res/res_sip/config_transport.c
    team/group/pimp_my_sip/res/res_sip/sip_options.c
    team/group/pimp_my_sip/res/res_sip_session.c

Propchange: team/group/pimp_my_sip/
------------------------------------------------------------------------------
    automerge-propname = pool_shark-integrated

Propchange: team/group/pimp_my_sip/
------------------------------------------------------------------------------
    pool_shark-integrated = /team/group/pimp_my_sip:1-381349

Modified: team/group/pimp_my_sip/channels/chan_gulp.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/channels/chan_gulp.c?view=diff&rev=381351&r1=381350&r2=381351
==============================================================================
--- team/group/pimp_my_sip/channels/chan_gulp.c (original)
+++ team/group/pimp_my_sip/channels/chan_gulp.c Wed Feb 13 10:58:37 2013
@@ -132,18 +132,6 @@
 	.update_peer = gulp_set_rtp_peer,
 };
 
-static void pj_thread_register_check(void)
-{
-	pj_thread_desc desc;
-	pj_thread_t *thread;
-
-	if (pj_thread_is_registered() == PJ_TRUE) {
-		return;
-	}
-
-	pj_thread_register("Asterisk Thread", desc, &thread);
-}
-
 /*! \brief Function called to create a new Gulp Asterisk channel */
 static struct ast_channel *gulp_new(struct ast_sip_session *session, int state, const char *exten, const char *title, const char *linkedid, const char *cid_name)
 {
@@ -186,12 +174,24 @@
 	return chan;
 }
 
+static int answer(void *data)
+{
+	pj_status_t status;
+	pjsip_tx_data *packet;
+	struct ast_sip_session *session = data;
+
+	if ((status = pjsip_inv_answer(session->inv_session, 200, NULL, NULL, &packet)) == PJ_SUCCESS) {
+		ast_sip_session_send_response(session, packet);
+	}
+
+	ao2_ref(session, -1);
+	return (status == PJ_SUCCESS) ? 0 : -1;
+}
+
 /*! \brief Function called by core when we should answer a Gulp session */
 static int gulp_answer(struct ast_channel *ast)
 {
 	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
-	pj_status_t status;
-	pjsip_tx_data *packet;
 
 	if (ast_channel_state(ast) == AST_STATE_UP) {
 		return 0;
@@ -199,13 +199,13 @@
 
 	ast_setstate(ast, AST_STATE_UP);
 
-	pj_thread_register_check();
-
-	if ((status = pjsip_inv_answer(session->inv_session, 200, NULL, NULL, &packet)) == PJ_SUCCESS) {
-		ast_sip_session_send_response(session, packet);
-	}
-
-	return (status == PJ_SUCCESS) ? 0 : -1;
+	ao2_ref(session, +1);
+	if (ast_sip_push_task(session->work, answer, session)) {
+		ast_log(LOG_WARNING, "Unable to push answer task to the threadpool. Cannot answer call\n");
+		ao2_cleanup(session);
+		return -1;
+	}
+	return 0;
 }
 
 /*! \brief Function called by core to read any waiting frames */
@@ -268,17 +268,86 @@
 	return res;
 }
 
+struct fixup_data {
+	struct ast_sip_session *session;
+	struct ast_channel *chan;
+};
+
+static int fixup(void *data)
+{
+	struct fixup_data *fix_data = data;
+	fix_data->session->channel = fix_data->chan;
+	return 0;
+}
+
 /*! \brief Function called by core to change the underlying owner channel */
 static int gulp_fixup(struct ast_channel *oldchan, struct ast_channel *newchan)
 {
 	struct ast_sip_session *session = ast_channel_tech_pvt(newchan);
+	struct fixup_data fix_data;
+	fix_data.session = session;
+	fix_data.chan = newchan;
 
 	if (session->channel != oldchan) {
 		return -1;
 	}
 
-	session->channel = newchan;
-
+	if (ast_sip_push_task_synchronous(session->work, fixup, &fix_data)) {
+		ast_log(LOG_WARNING, "Unable to perform channel fixup\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+struct indicate_data {
+	struct ast_sip_session *session;
+	int condition;
+	int response_code;
+	void *frame_data;
+	size_t datalen;
+};
+
+static void indicate_data_destroy(void *obj)
+{
+	struct indicate_data *ind_data = obj;
+	ast_free(ind_data->frame_data);
+	ao2_ref(ind_data->session, -1);
+}
+
+static struct indicate_data *indicate_data_alloc(struct ast_sip_session *session,
+		int condition, int response_code, const void *frame_data, size_t datalen)
+{
+	struct indicate_data *ind_data = ao2_alloc(sizeof(*ind_data), indicate_data_destroy);
+	if (!ind_data) {
+		return NULL;
+	}
+	ind_data->frame_data = ast_malloc(datalen);
+	if (!ind_data->frame_data) {
+		ao2_ref(ind_data, -1);
+		return NULL;
+	}
+	memcpy(ind_data->frame_data, frame_data, datalen);
+	ind_data->datalen = datalen;
+	ind_data->condition = condition;
+	ind_data->response_code = response_code;
+	ao2_ref(session, +1);
+	ind_data->session = session;
+	return ind_data;
+}
+
+static int indicate(void *data)
+{
+	struct indicate_data *ind_data = data;
+	struct ast_sip_session *session = ind_data->session;
+	int response_code = ind_data->response_code;
+	pjsip_tx_data *packet = NULL;
+
+	if (pjsip_inv_answer(session->inv_session, response_code, NULL, NULL, &packet) == PJ_SUCCESS) {
+		ast_sip_session_send_response(session, packet);
+	}
+
+	ao2_ref(ind_data, -1);
 	return 0;
 }
 
@@ -287,50 +356,47 @@
 {
 	int res = 0;
 	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
-	pj_status_t status = -1;
-	pjsip_tx_data *packet = NULL;
-
-	pj_thread_register_check();
+	int response_code = 0;
 
 	switch (condition) {
 	case AST_CONTROL_RINGING:
 		if (ast_channel_state(ast) == AST_STATE_RING) {
-			status = pjsip_inv_answer(session->inv_session, 180, NULL, NULL, &packet);
+			response_code = 180;
 		} else {
 			res = -1;
 		}
 		break;
 	case AST_CONTROL_BUSY:
 		if (ast_channel_state(ast) != AST_STATE_UP) {
-			status = pjsip_inv_answer(session->inv_session, 486, NULL, NULL, &packet);
+			response_code = 486;
 		} else {
 			res = -1;
 		}
 		break;
 	case AST_CONTROL_CONGESTION:
 		if (ast_channel_state(ast) != AST_STATE_UP) {
-			status = pjsip_inv_answer(session->inv_session, 503, NULL, NULL, &packet);
+			response_code = 503;
 		} else {
 			res = -1;
 		}
 		break;
 	case AST_CONTROL_INCOMPLETE:
 		if (ast_channel_state(ast) != AST_STATE_UP) {
-			status = pjsip_inv_answer(session->inv_session, 484, NULL, NULL, &packet);
+			response_code = 484;
 		} else {
 			res = -1;
 		}
 		break;
 	case AST_CONTROL_PROCEEDING:
 		if (ast_channel_state(ast) != AST_STATE_UP) {
-			status = pjsip_inv_answer(session->inv_session, 100, NULL, NULL, &packet);
+			response_code = 100;
 		} else {
 			res = -1;
 		}
 		break;
 	case AST_CONTROL_PROGRESS:
 		if (ast_channel_state(ast) != AST_STATE_UP) {
-			status = pjsip_inv_answer(session->inv_session, 183, NULL, NULL, &packet);
+			response_code = 183;
 		} else {
 			res = -1;
 		}
@@ -358,8 +424,18 @@
 		break;
 	}
 
-	if (packet && (status == PJ_SUCCESS)) {
-		ast_sip_session_send_response(session, packet);
+	if (!res && response_code) {
+		struct indicate_data *ind_data = indicate_data_alloc(session, condition, response_code, data, datalen);
+		if (ind_data) {
+			res = ast_sip_push_task(session->work, indicate, ind_data);
+			if (res) {
+				ast_log(LOG_NOTICE, "Cannot send response code %d to endpoint %s. Could queue task properly\n",
+						response_code, ast_sorcery_object_get_id(session->endpoint));
+				ao2_cleanup(ind_data);
+			}
+		} else {
+			res = -1;
+		}
 	}
 
 	return res;
@@ -408,20 +484,32 @@
 	return res;
 }
 
+static int call(void *data)
+{
+	struct ast_sip_session *session = data;
+	pjsip_tx_data *packet;
+
+	if (pjsip_inv_invite(session->inv_session, &packet) != PJ_SUCCESS) {
+		return -1;
+	}
+
+	ast_sip_session_send_request(session, packet);
+
+	ao2_ref(session, -1);
+	return 0;
+}
+
 /*! \brief Function called by core to actually start calling a remote party */
 static int gulp_call(struct ast_channel *ast, const char *dest, int timeout)
 {
 	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
-	pjsip_tx_data *packet;
-
-	pj_thread_register_check();
-
-	if (pjsip_inv_invite(session->inv_session, &packet) != PJ_SUCCESS) {
+
+	ao2_ref(session, +1);
+	if (ast_sip_push_task_synchronous(session->work, call, session)) {
+		ast_log(LOG_WARNING, "Error attempting to place outbound call to call '%s'\n", dest);
+		ao2_cleanup(session);
 		return -1;
 	}
-
-	ast_sip_session_send_request(session, packet);
-
 	return 0;
 }
 
@@ -473,15 +561,36 @@
 	return 0;
 }
 
-/*! \brief Function called by core to hang up a Gulp session */
-static int gulp_hangup(struct ast_channel *ast)
-{
-	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
+struct hangup_data {
+	int cause;
+	struct ast_channel *chan;
+};
+
+static void hangup_data_destroy(void *obj)
+{
+	struct hangup_data *h_data = obj;
+	h_data->chan = ast_channel_unref(h_data->chan);
+}
+
+static struct hangup_data *hangup_data_alloc(int cause, struct ast_channel *chan)
+{
+	struct hangup_data *h_data = ao2_alloc(sizeof(*h_data), hangup_data_destroy);
+	if (!h_data) {
+		return NULL;
+	}
+	h_data->cause = cause;
+	h_data->chan = ast_channel_ref(chan);
+	return h_data;
+}
+
+static int hangup(void *data)
+{
 	pj_status_t status;
 	pjsip_tx_data *packet = NULL;
-	int cause = hangup_cause2sip(ast_channel_hangupcause(session->channel));
-
-	pj_thread_register_check();
+	struct hangup_data *h_data = data;
+	struct ast_channel *ast = h_data->chan;
+	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
+	int cause = h_data->cause;
 
 	if (((status = pjsip_inv_end_session(session->inv_session, cause ? cause : 603, NULL, &packet)) == PJ_SUCCESS) && packet) {
 		if (packet->msg->type == PJSIP_RESPONSE_MSG) {
@@ -495,18 +604,52 @@
 	ast_channel_tech_pvt_set(ast, NULL);
 
 	ao2_cleanup(session);
-
-	return (status == PJ_SUCCESS) ? 0 : -1;
-}
-
-/*! \brief Function called by core to create a new outgoing Gulp session */
-static struct ast_channel *gulp_request(const char *type, struct ast_format_cap *cap, const struct ast_channel *requestor, const char *data, int *cause)
-{
+	ao2_cleanup(h_data);
+	return 0;
+}
+
+/*! \brief Function called by core to hang up a Gulp session */
+static int gulp_hangup(struct ast_channel *ast)
+{
+	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
+	int cause = hangup_cause2sip(ast_channel_hangupcause(session->channel));
+	struct hangup_data *h_data = hangup_data_alloc(cause, ast);
+	if (!h_data) {
+		goto failure;
+	}
+
+	if (ast_sip_push_task(session->work, hangup, h_data)) {
+		ast_log(LOG_WARNING, "Unable to push hangup task to the threadpool. Expect bad things\n");
+		goto failure;
+	}
+	return 0;
+
+failure:
+	/* Go ahead and do our cleanup of the session and channel even if we're not going
+	 * to be able to send our SIP request/response
+	 */
+	ao2_cleanup(h_data);
+	session->channel = NULL;
+	ast_channel_tech_pvt_set(ast, NULL);
+
+	ao2_cleanup(session);
+	return -1;
+}
+
+struct request_data {
+	struct ast_sip_session *session;
+	struct ast_sip_work *work;
+	const char *dest;
+};
+
+static int request(void *obj)
+{
+	struct request_data *req_data = obj;
 	struct ast_sip_endpoint *endpoint = ast_sip_endpoint_alloc("constant");
 	struct ast_sip_session *session = NULL;
 
 	if (!endpoint) {
-		return NULL;
+		return -1;
 	}
 
 	/* TODO: This needs to actually grab a proper endpoint and such */
@@ -515,11 +658,33 @@
 	endpoint->min_se = 90;
 	endpoint->sess_expires = 1800;
 
-	pj_thread_register_check();
-
-	if (!(session = ast_sip_session_create_outgoing(endpoint, data))) {
+	if (!(session = ast_sip_session_create_outgoing(endpoint, req_data->dest, req_data->work))) {
+		return -1;
+	}
+
+	req_data->session = session;
+	return 0;
+}
+
+/*! \brief Function called by core to create a new outgoing Gulp session */
+static struct ast_channel *gulp_request(const char *type, struct ast_format_cap *cap, const struct ast_channel *requestor, const char *data, int *cause)
+{
+	struct ast_sip_work *work = ast_sip_create_work();
+	struct request_data req_data;
+	struct ast_sip_session *session;
+	if (!work) {
 		return NULL;
 	}
+
+	req_data.dest = data;
+	req_data.work = work;
+
+	if (ast_sip_push_task_synchronous(work, request, &req_data)) {
+		ast_sip_destroy_work(work);
+		return NULL;
+	}
+
+	session = req_data.session;
 
 	if (!(session->channel = gulp_new(session, AST_STATE_DOWN, NULL, NULL, requestor ? ast_channel_linkedid(requestor) : NULL, NULL))) {
 		/* Session needs to be terminated prematurely */

Modified: team/group/pimp_my_sip/include/asterisk/res_sip.h
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/include/asterisk/res_sip.h?view=diff&rev=381351&r1=381350&r2=381351
==============================================================================
--- team/group/pimp_my_sip/include/asterisk/res_sip.h (original)
+++ team/group/pimp_my_sip/include/asterisk/res_sip.h Wed Feb 13 10:58:37 2013
@@ -447,6 +447,63 @@
 int ast_sip_initialize_sorcery_transport(struct ast_sorcery *sorcery);
 
 /*!
+ * \page Threading model for SIP
+ *
+ * There are three major types of threads that SIP will have to deal with:
+ * \li Asterisk threads
+ * \li PJSIP threads
+ * \li SIP threadpool threads (a.k.a. "servants")
+ *
+ * \par Asterisk Threads
+ *
+ * Asterisk threads are those that originate from outside of SIP but within
+ * Asterisk. The most common of these threads are PBX (channel) threads and
+ * the autoservice thread. Most interaction with these threads will be through
+ * channel technology callbacks. Within these threads, it is fine to handle
+ * Asterisk data from outside of SIP, but any handling of SIP data should be
+ * left to servants, \b especially if you wish to call into PJSIP for anything.
+ * Asterisk threads are not registered with PJLIB, so attempting to call into
+ * PJSIP will cause an assertion to be triggered, thus causing the program to
+ * crash.
+ *
+ * \par PJSIP Threads
+ *
+ * PJSIP threads are those that originate from handling of PJSIP events, such
+ * as an incoming SIP request or response, or a transaction timeout. The role
+ * of these threads is to process information as quickly as possible. When your
+ * code gets called into from one of these threads, your goal should be to do
+ * as little as possible before handing the majority of processing off to a
+ * servant. Operations such as remote procedure calls or DNS lookups are never
+ * to be done in these threads since it can cause performance issues.
+ *
+ * \par Servants
+ *
+ * Servants are where the bulk of SIP work should be performed. These threads
+ * exist in order to do the work that Asterisk threads and PJSIP threads hand
+ * off to them. Servant threads register themselves with PJLIB, meaning that
+ * they are capable of calling PJSIP and PJLIB functions if they wish. 
+ *
+ * \par ast_sip_work
+ *
+ * Tasks are handed off to servant threads using the API call \ref ast_sip_push_task.
+ * The first parameter of this call is an \ref ast_sip_work pointer. If this pointer
+ * is NULL, then the work will be handed off to whatever servant can currently handle
+ * the task. If this pointer is non-NULL, then the task will not be executed until
+ * previous tasks pushed with the same \ref ast_sip_work have completed. In other words,
+ * an \ref ast_sip_work is a method of serializing tasks pushed to servants. This can
+ * have several benefits:
+ * \li Tasks are executed in the same order they were pushed to servants
+ * \li Reduced contention for shared resources
+ *
+ * \note
+ *
+ * Do not make assumptions about individual threads based on corresponding \ref ast_sip_work.
+ * In other words, just because several tasks use the same \ref ast_sip_work when being pushed
+ * to servants, it does not mean that the same thread is necessarily going to execute those
+ * tasks, even though they are all guaranteed to be executed in sequence.
+ */
+
+/*!
  * \brief Initialize authentication support on a sorcery instance
  *
  * \param sorcery The sorcery instance
@@ -457,12 +514,13 @@
 int ast_sip_initialize_sorcery_auth(struct ast_sorcery *sorcery);
 
 /*!
+>>>>>>> .merge-right.r381346
  * \brief Create a new SIP work structure
  *
  * A SIP work is a means of grouping together SIP tasks. For instance, one
  * might create a SIP work so that all tasks for a given SIP dialog will
  * be grouped together. Grouping the work together ensures that the
- * threadpool will distribute the tasks in such a way so that grouped work
+ * servants will execute the tasks in such a way so that grouped work
  * will execute sequentially. Executing grouped tasks sequentially means
  * less contention for shared resources.
  *
@@ -479,17 +537,37 @@
 void ast_sip_destroy_work(struct ast_sip_work *work);
  
 /*!
- * \brief Pushes a task into the SIP threadpool
+ * \brief Pushes a task to SIP servants
  *
  * This uses the SIP work provided to determine how to push the task.
- *
- * \param work The SIP work to which the task belongs
+ * If the work param is NULL, then the task will be pushed to the
+ * servants directly. If the work is non-NULL, then the task will be
+ * queued behind other tasks associated with the work.
+ *
+ * \param work The SIP work to which the task belongs. Can be NULL
  * \param sip_task The task to execute
  * \param task_data The parameter to pass to the task when it executes
  * \retval 0 Success
  * \retval -1 Failure
  */
 int ast_sip_push_task(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data);
+
+/*!
+ * \brief Push a task to SIP servants and wait for it to complete
+ *
+ * Like \ref ast_sip_push_task except that it blocks until the task completes.
+ *
+ * \warning \b Never use this function in a SIP servant thread. This can potentially
+ * cause a deadlock. If you are in a SIP servant thread, just call your function
+ * in-line.
+ *
+ * \param work The SIP work to which the task belongs. May be NULL.
+ * \param sip_task The task to execute
+ * \param task_data The parameter to pass to the task when it executes
+ * \retval 0 Success
+ * \retval -1 Failure
+ */
+int ast_sip_push_task_synchronous(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data);
 
 /*!
  * \brief SIP body description

Modified: team/group/pimp_my_sip/include/asterisk/res_sip_session.h
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/include/asterisk/res_sip_session.h?view=diff&rev=381351&r1=381350&r2=381351
==============================================================================
--- team/group/pimp_my_sip/include/asterisk/res_sip_session.h (original)
+++ team/group/pimp_my_sip/include/asterisk/res_sip_session.h Wed Feb 13 10:58:37 2013
@@ -78,28 +78,64 @@
  * These can be registered by any module in order to add
  * processing to incoming and outgoing SIP requests and responses
  */
-struct ast_sip_session_supplement{
-	/*! Method on which to call the callbacks. If NULL, call on all methods */
-	const char *method;
-	/*! Notification that the session has begun */
-	void (*session_begin)(struct ast_sip_session *session);
-	/*! Notification that the session has ended */
-	void (*session_end)(struct ast_sip_session *session);
-	/*! Notification that the session is being destroyed */
+struct ast_sip_session_supplement {
+    /*! Method on which to call the callbacks. If NULL, call on all methods */
+    const char *method;
+    /*!
+	 * \brief Notification that the session has begun
+	 * This method will always be called from a SIP servant thread.
+	 */
+    void (*session_begin)(struct ast_sip_session *session);
+    /*! 
+	 * \brief Notification that the session has ended
+	 *
+	 * This method may or may not be called from a SIP servant thread. Do
+	 * not make assumptions about being able to call PJSIP methods from within
+	 * this method.
+	 */
+    void (*session_end)(struct ast_sip_session *session);
+	/*!
+	 * \brief Notification that the session is being destroyed
+	 */
 	void (*session_destroy)(struct ast_sip_session *session);
-	/*!
-	 * \brief Called on incoming SIP request
-	 * This method can indicate a failure in processing in its return. If there
-	 * is a failure, it is required that this method sends a response to the request.
-
-	 */
-	int (*incoming_request)(struct ast_sip_session *session, struct pjsip_rx_data *rdata);
-	/*! Called on an incoming SIP response */
-	void (*incoming_response)(struct ast_sip_session *session, struct pjsip_rx_data *rdata);
-	/*! Called on an outgoing SIP request */
-	void (*outgoing_request)(struct ast_sip_session *session, struct pjsip_tx_data *tdata);
-	/*! Called on an outgoing SIP response */
-	void (*outgoing_response)(struct ast_sip_session *session, struct pjsip_tx_data *tdata);
+    /*!
+     * \brief Called on incoming SIP request
+     * This method can indicate a failure in processing in its return. If there
+     * is a failure, it is required that this method sends a response to the request.
+	 * This method is always called from a SIP servant thread.
+	 *
+	 * \note
+	 * The following PJSIP methods will not work properly:
+	 * pjsip_rdata_get_dlg()
+	 * pjsip_rdata_get_tsx()
+	 * The reason is that the rdata passed into this function is a cloned rdata structure,
+	 * and its module data is not copied during the cloning operation.
+	 * If you need to get the dialog, you can get it via session->inv_session->dlg.
+     */
+    int (*incoming_request)(struct ast_sip_session *session, struct pjsip_rx_data *rdata);
+    /*! 
+	 * \brief Called on an incoming SIP response
+	 * This method is always called from a SIP servant thread.
+	 *
+	 * \note
+	 * The following PJSIP methods will not work properly:
+	 * pjsip_rdata_get_dlg()
+	 * pjsip_rdata_get_tsx()
+	 * The reason is that the rdata passed into this function is a cloned rdata structure,
+	 * and its module data is not copied during the cloning operation.
+	 * If you need to get the dialog, you can get it via session->inv_session->dlg.
+	 */
+    void (*incoming_response)(struct ast_sip_session *session, struct pjsip_rx_data *rdata);
+    /*!
+	 * \brief Called on an outgoing SIP request
+	 * This method is always called from a SIP servant thread.
+	 */
+    void (*outgoing_request)(struct ast_sip_session *session, struct pjsip_tx_data *tdata);
+    /*! 
+	 * \brief Called on an outgoing SIP response
+	 * This method is always called from a SIP servant thread.
+	 */
+    void (*outgoing_response)(struct ast_sip_session *session, struct pjsip_tx_data *tdata);
 	/*! Next item in the list */
 	AST_LIST_ENTRY(ast_sip_session_supplement) next;
 };
@@ -143,16 +179,18 @@
  * as placing all registered supplements onto the session.
  * \param endpoint The endpoint that this session communicates with
  * \param inv_session The PJSIP INVITE session data
- */
-struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, pjsip_inv_session *inv);
+ * \param work SIP work queue to use for this session. May be NULL.
+ */
+struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, pjsip_inv_session *inv, struct ast_sip_work *work);
 
 /*!
  * \brief Create a new outgoing SIP session
  *
  * \param endpoint The endpoint that this session uses for settings
  * \param uri The URI to call
- */
-struct ast_sip_session *ast_sip_session_create_outgoing(struct ast_sip_endpoint *endpoint, const char *uri);
+ * \param work SIP work queue to use for this session. May be NULL.
+ */
+struct ast_sip_session *ast_sip_session_create_outgoing(struct ast_sip_endpoint *endpoint, const char *uri, struct ast_sip_work *work);
 
 /*!
  * \brief Register an SDP handler

Modified: team/group/pimp_my_sip/include/asterisk/threadpool.h
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/include/asterisk/threadpool.h?view=diff&rev=381351&r1=381350&r2=381351
==============================================================================
--- team/group/pimp_my_sip/include/asterisk/threadpool.h (original)
+++ team/group/pimp_my_sip/include/asterisk/threadpool.h Wed Feb 13 10:58:37 2013
@@ -105,6 +105,20 @@
 	 * beyond this number of threads.
 	 */
 	int max_size;
+	/*!
+	 * \brief Function to call when a thread starts
+	 *
+	 * This is useful if there is something common that all threads
+	 * in a threadpool need to do when they start.
+	 */
+	void (*thread_start)(void);
+	/*!
+	 * \brief Function to call when a thread ends
+	 *
+	 * This is useful if there is common cleanup to execute when
+	 * a thread completes
+	 */
+	void (*thread_end)(void);
 };
 
 /*!

Modified: team/group/pimp_my_sip/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/main/threadpool.c?view=diff&rev=381351&r1=381350&r2=381351
==============================================================================
--- team/group/pimp_my_sip/main/threadpool.c (original)
+++ team/group/pimp_my_sip/main/threadpool.c Wed Feb 13 10:58:37 2013
@@ -984,7 +984,13 @@
 {
 	struct worker_thread *worker = arg;
 
+	if (worker->options.thread_start) {
+		worker->options.thread_start();
+	}
 	worker_active(worker);
+	if (worker->options.thread_end) {
+		worker->options.thread_end();
+	}
 	return NULL;
 }
 

Modified: team/group/pimp_my_sip/res/res_sip.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/res_sip.c?view=diff&rev=381351&r1=381350&r2=381351
==============================================================================
--- team/group/pimp_my_sip/res/res_sip.c (original)
+++ team/group/pimp_my_sip/res/res_sip.c Wed Feb 13 10:58:37 2013
@@ -46,29 +46,42 @@
 
 static struct ast_threadpool *sip_threadpool;
 
-int ast_sip_register_service(pjsip_module *module)
-{
+static int register_service(void *data)
+{
+	pjsip_module **module = data;
 	if (!ast_pjsip_endpoint) {
 		ast_log(LOG_ERROR, "There is no PJSIP endpoint. Unable to register services\n");
 		return -1;
 	}
-	if (pjsip_endpt_register_module(ast_pjsip_endpoint, module) != PJ_SUCCESS) {
-		ast_log(LOG_ERROR, "Unable to register module %.*s\n", (int) pj_strlen(&module->name), pj_strbuf(&module->name));
-		return -1;
-	}
-	ast_debug(1, "Registered SIP service %.*s\n", (int) pj_strlen(&module->name), pj_strbuf(&module->name));
+	if (pjsip_endpt_register_module(ast_pjsip_endpoint, *module) != PJ_SUCCESS) {
+		ast_log(LOG_ERROR, "Unable to register module %.*s\n", (int) pj_strlen(&(*module)->name), pj_strbuf(&(*module)->name));
+		return -1;
+	}
+	ast_debug(1, "Registered SIP service %.*s (%p)\n", (int) pj_strlen(&(*module)->name), pj_strbuf(&(*module)->name), *module);
 	ast_module_ref(ast_module_info->self);
 	return 0;
 }
 
-void ast_sip_unregister_service(pjsip_module *module)
-{
+int ast_sip_register_service(pjsip_module *module)
+{
+	return ast_sip_push_task_synchronous(NULL, register_service, &module);
+}
+
+static int unregister_service(void *data)
+{
+	pjsip_module **module = data;
 	ast_module_unref(ast_module_info->self);
 	if (!ast_pjsip_endpoint) {
-		return;
-	}
-	pjsip_endpt_unregister_module(ast_pjsip_endpoint, module);
-	ast_debug(1, "Unregistered SIP service %.*s\n", (int) pj_strlen(&module->name), pj_strbuf(&module->name));
+		return -1;
+	}
+	pjsip_endpt_unregister_module(ast_pjsip_endpoint, *module);
+	ast_debug(1, "Unregistered SIP service %.*s\n", (int) pj_strlen(&(*module)->name), pj_strbuf(&(*module)->name));
+	return 0;
+}
+
+void ast_sip_unregister_service(pjsip_module *module)
+{
+	ast_sip_push_task_synchronous(NULL, unregister_service, &module);
 }
 
 static struct ast_sip_authenticator *registered_authenticator;
@@ -337,13 +350,19 @@
 static int execute_tasks(void *data)
 {
 	struct ast_sip_work *work = data;
-	while (ast_taskprocessor_execute(work->queue));
+	while (ast_taskprocessor_execute(work->queue)) {
+		/* Empty on purpose */
+	}
+	ao2_cleanup(work);
 	return 0;
 }
 
 static void work_queue_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) {
-	struct ast_sip_work *work = ast_taskprocessor_listener_get_user_data(listener);
-	ast_threadpool_push(sip_threadpool, execute_tasks, work);
+	if (was_empty) {
+		struct ast_sip_work *work = ast_taskprocessor_listener_get_user_data(listener);
+		ao2_ref(work, +1);
+		ast_threadpool_push(sip_threadpool, execute_tasks, work);
+	}
 };
 
 static int work_queue_start(struct ast_taskprocessor_listener *listener)
@@ -363,9 +382,15 @@
 	.shutdown = work_queue_shutdown,
 };
 
+static void work_destroy(void *obj)
+{
+	struct ast_sip_work *work = obj;
+	ast_taskprocessor_unreference(work->queue);
+}
+
 struct ast_sip_work *ast_sip_create_work(void)
 {
-	struct ast_sip_work *work = ast_calloc(1, sizeof(*work));
+	struct ast_sip_work *work = ao2_alloc(sizeof(*work), work_destroy);
 	struct ast_uuid *uuid;
 	struct ast_taskprocessor_listener *listener;
 	char queue_name[AST_UUID_STR_LEN];
@@ -378,7 +403,7 @@
 	 */
 	uuid = ast_uuid_generate();
 	if (!uuid) {
-		ast_free(work);
+		ao2_cleanup(work);
 		return NULL;
 	}
 	ast_uuid_to_str(uuid, queue_name, sizeof(queue_name));
@@ -387,7 +412,7 @@
 	work->queue = ast_taskprocessor_create_with_listener(queue_name, listener);
 	ao2_cleanup(listener);
 	if (!work->queue) {
-		ast_free(work);
+		ao2_cleanup(work);
 		return NULL;
 	}
 	return work;
@@ -395,13 +420,68 @@
 
 void ast_sip_destroy_work(struct ast_sip_work *work)
 {
-	ast_taskprocessor_unreference(work->queue);
-	ast_free(work);
+	ao2_cleanup(work);
 }
 
 int ast_sip_push_task(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data)
 {
-	return ast_taskprocessor_push(work->queue, sip_task, task_data);
+	if (work) {
+		return ast_taskprocessor_push(work->queue, sip_task, task_data);
+	} else {
+		return ast_threadpool_push(sip_threadpool, sip_task, task_data);
+	}
+}
+
+struct sync_task_data {
+	ast_mutex_t lock;
+	ast_cond_t cond;
+	int complete;
+	int fail;
+	int (*task)(void *);
+	void *task_data;
+};
+
+static int sync_task(void *data)
+{
+	struct sync_task_data *std = data;
+	std->fail = std->task(std->task_data);
+
+	ast_mutex_lock(&std->lock);
+	std->complete = 1;
+	ast_cond_signal(&std->cond);
+	ast_mutex_unlock(&std->lock);
+	return std->fail;
+}
+
+int ast_sip_push_task_synchronous(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data)
+{
+	/* This method is an onion */
+	struct sync_task_data std;
+	ast_mutex_init(&std.lock);
+	ast_cond_init(&std.cond, NULL);
+	std.fail = std.complete = 0;
+	std.task = sip_task;
+	std.task_data = task_data;
+
+	if (work) {
+		if (ast_taskprocessor_push(work->queue, sync_task, &std)) {
+			return -1;
+		}
+	} else {
+		if (ast_threadpool_push(sip_threadpool, sync_task, &std)) {
+			return -1;
+		}
+	}
+
+	ast_mutex_lock(&std.lock);
+	while (!std.complete) {
+		ast_cond_wait(&std.cond, &std.lock);
+	}
+	ast_mutex_unlock(&std.lock);
+
+	ast_mutex_destroy(&std.lock);
+	ast_cond_destroy(&std.cond);
+	return std.fail;
 }
 
 void ast_copy_pj_str(char *dest, pj_str_t *src, size_t size)
@@ -445,6 +525,25 @@
 	.priority = PJSIP_MOD_PRIORITY_APPLICATION + 32,
 	.on_rx_request = unhandled_on_rx_request,
 };
+
+AST_THREADSTORAGE(pj_thread_storage);
+
+static void sip_thread_start(void)
+{
+	pj_thread_desc *desc;
+	pj_thread_t *thread;
+
+	desc = ast_threadstorage_get(&pj_thread_storage, sizeof(pj_thread_desc));
+	if (!desc) {
+		ast_log(LOG_ERROR, "Could not get thread desc from thread-local storage. Expect awful things to occur\n");
+		return;
+	}
+	pj_bzero(*desc, sizeof(*desc));
+
+	if (pj_thread_register("Asterisk Thread", *desc, &thread) != PJ_SUCCESS) {
+		ast_log(LOG_ERROR, "Couldn't register thread with PJLIB.\n");
+	}
+}
 
 static int load_module(void)
 {
@@ -465,6 +564,7 @@
 		.max_size = 0,
 		.idle_timeout = 60,
 		.initial_size = 0,
+		.thread_start = sip_thread_start,
 	};
 	sip_threadpool = ast_threadpool_create("SIP", NULL, &options);
 
@@ -517,9 +617,11 @@
 	}
 	if (memory_pool) {
 		pj_pool_release(memory_pool);
+		memory_pool = NULL;
 	}
 	if (ast_pjsip_endpoint) {
 		pjsip_endpt_destroy(ast_pjsip_endpoint);
+		ast_pjsip_endpoint = NULL;
 	}
 	pj_caching_pool_destroy(&caching_pool);
 	/* XXX Should have a way of stopping monitor thread */
@@ -535,22 +637,33 @@
 	return 0;
 }
 
+static int unload_pjsip(void *data) /* task pushed by unload_module(); 'data' is unused */
+{
+	if (memory_pool) {
+		pj_pool_release(memory_pool);
+		memory_pool = NULL; /* reset so the guard above is false from now on */
+	}
+	if (ast_pjsip_endpoint) {
+		pjsip_endpt_destroy(ast_pjsip_endpoint);
+		ast_pjsip_endpoint = NULL; /* reset so the guard above is false from now on */
+	}
+	pj_caching_pool_destroy(&caching_pool); /* unconditional, unlike the two steps above */
+	return 0; /* always reports success */
+}
+
 static int unload_module(void)
 {
 	ast_res_sip_destroy_configuration();
 	if (monitor_thread) {
 		stop_monitor_thread();
 	}
-	if (memory_pool) {
-		/* This will cause a crash in Asterisk in debug mode, as the thread
-		 * calling this is not a pjsip thread
-		 */
-		pj_pool_release(memory_pool);
-	}
-	if (ast_pjsip_endpoint) {
-		pjsip_endpt_destroy(ast_pjsip_endpoint);
-	}
-	pj_caching_pool_destroy(&caching_pool);
+	/* The thread this is called from cannot call PJSIP/PJLIB functions,
+	 * so we have to push the work to the threadpool to handle
+	 */
+	ast_sip_push_task_synchronous(NULL, unload_pjsip, NULL);
+
+	ast_threadpool_shutdown(sip_threadpool);
+
 	return 0;
 }
 

Modified: team/group/pimp_my_sip/res/res_sip.exports.in
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/res_sip.exports.in?view=diff&rev=381351&r1=381350&r2=381351
==============================================================================
--- team/group/pimp_my_sip/res/res_sip.exports.in (original)
+++ team/group/pimp_my_sip/res/res_sip.exports.in Wed Feb 13 10:58:37 2013
@@ -9,6 +9,7 @@
 		LINKER_SYMBOL_PREFIXast_sip_create_work;
 		LINKER_SYMBOL_PREFIXast_sip_destroy_work;
 		LINKER_SYMBOL_PREFIXast_sip_push_task;
+		LINKER_SYMBOL_PREFIXast_sip_push_task_synchronous;
 		LINKER_SYMBOL_PREFIXast_sip_send_request;
 		LINKER_SYMBOL_PREFIXast_sip_requires_authentication;
 		LINKER_SYMBOL_PREFIXast_sip_authenticate_request;

Modified: team/group/pimp_my_sip/res/res_sip/config_transport.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/res_sip/config_transport.c?view=diff&rev=381351&r1=381350&r2=381351
==============================================================================
--- team/group/pimp_my_sip/res/res_sip/config_transport.c (original)
+++ team/group/pimp_my_sip/res/res_sip/config_transport.c Wed Feb 13 10:58:37 2013
@@ -27,14 +27,26 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/sorcery.h"
 
+/* Task callback: 'data' is the pjsip_transport to shut down. Always returns 0. */
+static int destroy_transport_state(void *data)
+{
+	pjsip_transport_shutdown((pjsip_transport *) data);
+	return 0;
+}
+
 /*! \brief Destructor for transport state information */
 static void transport_state_destroy(void *obj)
 {
 	struct ast_sip_transport_state *state = obj;
+	struct ast_sip_work *work = ast_sip_create_work();
+	if (!work) {
+		ast_log(LOG_WARNING, "Unable to create work structure in order to shutdown transport\n");
+	}
 
 	if (state->transport) {
-		pjsip_transport_shutdown(state->transport);
-	}
+		ast_sip_push_task_synchronous(work, destroy_transport_state, state->transport);
+	}
+	ast_sip_destroy_work(work);
 }
 
 /*! \brief Destructor for transport */

Modified: team/group/pimp_my_sip/res/res_sip/sip_options.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/res_sip/sip_options.c?view=diff&rev=381351&r1=381350&r2=381351
==============================================================================
--- team/group/pimp_my_sip/res/res_sip/sip_options.c (original)
+++ team/group/pimp_my_sip/res/res_sip/sip_options.c Wed Feb 13 10:58:37 2013
@@ -72,11 +72,10 @@
 	return PJ_SUCCESS;
 }
 
-static pj_status_t send_options_response(pjsip_rx_data *rdata, int code)
+static pj_status_t send_options_response(pjsip_rx_data *rdata, pjsip_dialog *pj_dlg, int code)
 {
 	pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
 	pjsip_transaction *pj_trans = pjsip_rdata_get_tsx(rdata);
-	pjsip_dialog *pj_dlg = pjsip_rdata_get_dlg(rdata);
 	pjsip_tx_data *tdata;
 	const pjsip_hdr *hdr;
 	pjsip_response_addr res_addr;
@@ -123,27 +122,59 @@
 	return status;
 }
 
-static pj_bool_t options_module_on_rx_request(pjsip_rx_data *rdata)
-{
+struct incoming_options_data { /* payload handed to the incoming_options() task */
+	pjsip_rx_data *rdata;   /* clone made with pjsip_rx_data_clone(); freed by the destructor */
+	pjsip_dialog *dialog;   /* optional; a session count is taken on it when set */
+};
+
+static void incoming_options_data_destroy(void *obj) /* ao2 destructor for the task payload */
+{
+	struct incoming_options_data *doomed = obj;
+	if (doomed->rdata != NULL) {
+		pjsip_rx_data_free_cloned(doomed->rdata);
+	}
+	if (doomed->dialog != NULL) {
+		pjsip_dlg_dec_session(doomed->dialog, &options_module);
+	}
+}
+
+/* Allocate task data holding a clone of rdata and, if dialog is given, a session count on it. */
+static struct incoming_options_data *incoming_options_data_alloc(pjsip_rx_data *rdata, pjsip_dialog *dialog)
+{
+	struct incoming_options_data *fresh = ao2_alloc(sizeof(*fresh), incoming_options_data_destroy);
+
+	if (fresh != NULL && pjsip_rx_data_clone(rdata, 0, &fresh->rdata) != PJ_SUCCESS) {
+		ao2_cleanup(fresh);
+		fresh = NULL;
+	}
+	if (fresh != NULL && dialog != NULL) {
+		fresh->dialog = dialog;
+		pjsip_dlg_inc_session(dialog, &options_module);
+	}
+
+	return fresh;
+}
+
+static int incoming_options(void *data)
+{
+	RAII_VAR(struct incoming_options_data *, iod, data, ao2_cleanup);
 	RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
 	pjsip_uri *ruri;
 	pjsip_sip_uri *sip_ruri;
+	pjsip_rx_data *rdata = iod->rdata;
+	pjsip_dialog *dlg = iod->dialog;
 	char exten[AST_MAX_EXTENSION];
-
-	if (pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_options_method)) {
-		return PJ_FALSE;
-	}
 
 	/* If no endpoint is available for the request treat them as completely untrusted */
 	if (!(endpoint = ast_sip_identify_endpoint(rdata))) {
-		send_options_response(rdata, 404);
-		return PJ_TRUE;
+		send_options_response(rdata, dlg, 404);
+		return -1;
 	}
 
 	ruri = rdata->msg_info.msg->line.req.uri;
 	if (!PJSIP_URI_SCHEME_IS_SIP(ruri) && !PJSIP_URI_SCHEME_IS_SIPS(ruri)) {
-		send_options_response(rdata, 416);
-		return PJ_TRUE;
+		send_options_response(rdata, dlg, 416);
+		return -1;
 	}
 	
 	sip_ruri = pjsip_uri_get_uri(ruri);
@@ -152,11 +183,31 @@
 	/* XXX TODO: authenticate the user */
 
 	if (ast_shutting_down()) {
-		send_options_response(rdata, 503);
+		send_options_response(rdata, dlg, 503);
 	} else if (!ast_exists_extension(NULL, endpoint->context, exten, 1, NULL)) {
-		send_options_response(rdata, 404);
+		send_options_response(rdata, dlg, 404);
 	} else {
-		send_options_response(rdata, 200);
+		send_options_response(rdata, dlg, 200);
+	}
+	return 0;
+}
+
+static pj_bool_t options_module_on_rx_request(pjsip_rx_data *rdata)
+{
+	struct incoming_options_data *iod;
+	pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
+	if (pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_options_method)) {
+		return PJ_FALSE;
+	}
+
+	iod = incoming_options_data_alloc(rdata, dlg);
+	if (!iod) {
+		send_options_response(rdata, dlg, 500);
+		return PJ_TRUE;
+	}
+
+	if (ast_sip_push_task(NULL, incoming_options, iod)) {
+		send_options_response(rdata, dlg, 500);
 	}
 
 	return PJ_TRUE;
@@ -222,13 +273,15 @@
 	return info;
 }
 
-static void send_qualify_request(struct ast_sip_endpoint *endpoint)
-{
+static int send_qualify_request(void *data)
+{
+	struct ast_sip_endpoint *endpoint = data;
 	/* YAY! Send an OPTIONS request. */
 
 	ast_sip_send_request("OPTIONS", NULL, NULL, endpoint);
 
-	return;
+	ao2_cleanup(endpoint);
+	return 0;
 }
 
 static int qualify_endpoint_scheduler_cb(const void *data)
@@ -250,8 +303,7 @@
 		return 0;
 	}
 
-	/* Actually do the qualify */
-	send_qualify_request(endpoint);
+	ast_sip_push_task(NULL, send_qualify_request, endpoint);
 
 	return 1;
 }

Modified: team/group/pimp_my_sip/res/res_sip_session.c

[... 521 lines stripped ...]



More information about the svn-commits mailing list