[svn-commits] mmichelson: branch group/pimp_my_sip r382112 - in /team/group/pimp_my_sip: ch...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Tue Feb 26 13:55:35 CST 2013


Author: mmichelson
Date: Tue Feb 26 13:55:31 2013
New Revision: 382112

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=382112
Log:
Merge pool_shark2 branch.

Copied from the reviewboard description:

* The ast_sip_work structure and some of its methods have been removed in
favor of the generic task serializer.

* A PJSIP module, the message distributor, has been added. This module has a
high priority and gets called into after the PJSIP transport layer but before
the PJSIP transaction layer. The distributor is used as a means of moving an
incoming message from a PJSIP thread into a SIP servant thread. Then from the
SIP servant thread, the message bubbles up through the PJSIP transaction layer,
PJSIP dialog layer, and any relevant PJSIP modules that Asterisk has registered.
This means that any PJSIP module that registers itself at the transaction layer
or higher will have all of its incoming SIP messages already in a SIP servant
thread when called back. Unless they need to serialize operations, they do not
need to do any special processing to ensure the incoming message is handled in a
specific thread. A good example of this is what the SIP OPTIONS handler does in
this set of changes.

* Two new PJSIP modules have been added above the dialog layer. The endpoint
module and the authenticator module are responsible for doing endpoint lookups
and authentication for incoming requests. The placement of these modules means
that they are only ever called into for out-of-dialog requests. This means that
on an initial INVITE, or on a REGISTER, or on an OPTIONS request, or any other
out-of-dialog request, the application-layer modules do not need to worry about
endpoint lookup or authentication since that is done already for them. If they
need the endpoint, they can call ast_pjsip_rdata_get_endpoint(). Dialog-forming
application modules will want to save off the endpoint that the endpoint module
finds, since the endpoint will not be looked up again on in-dialog requests,
meaning they will not be able to call ast_pjsip_rdata_get_endpoint() for those
requests.

* Modules that were passing incoming messages off to servants no longer need to
do this, so they have been changed accordingly. The session module and options
module now handle incoming requests in-line rather than pushing them to a
servant thread.

* Since it no longer makes sense to pre-allocate a serializer before an
ast_sip_session, the functions for allocating SIP sessions have been changed to
no longer take a serializer. Instead, the allocation routine creates its own.
In addition, the behavior of endpoint references has changed from how it
previously was. Now allocating a session results in the endpoint reference count
increasing by one rather than having the session inherit the reference passed
into it. This makes endpoint reference handling much cleaner, especially in the
session module.

* To assist programmers who want to write PJSIP modules in Asterisk, there is a
helper method called ast_sip_thread_is_servant() that has been added. It can be
used to discern if the thread you currently are in is a SIP servant thread. This
means it is not as necessary to dig into the PJSIP internals to know what type
of thread you are in when called back. Instead, you can be safe and use this
function. This function is currently unused though, since there hasn't been a
need for it yet. This is because every use of SIP servants where the type of
the current thread may be in question needs to serialize its task anyway.

* Documentation of the SIP threadpool has been updated.


Added:
    team/group/pimp_my_sip/res/res_sip/sip_distributor.c   (with props)
Modified:
    team/group/pimp_my_sip/channels/chan_gulp.c
    team/group/pimp_my_sip/include/asterisk/res_sip.h
    team/group/pimp_my_sip/include/asterisk/res_sip_session.h
    team/group/pimp_my_sip/res/pjproject/pjsip/include/pjsip/sip_endpoint.h
    team/group/pimp_my_sip/res/pjproject/pjsip/src/pjsip/sip_endpoint.c
    team/group/pimp_my_sip/res/res_sip.c
    team/group/pimp_my_sip/res/res_sip.exports.in
    team/group/pimp_my_sip/res/res_sip/config_transport.c
    team/group/pimp_my_sip/res/res_sip/sip_options.c
    team/group/pimp_my_sip/res/res_sip_endpoint_identifier_ip.c
    team/group/pimp_my_sip/res/res_sip_session.c

Modified: team/group/pimp_my_sip/channels/chan_gulp.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/channels/chan_gulp.c?view=diff&rev=382112&r1=382111&r2=382112
==============================================================================
--- team/group/pimp_my_sip/channels/chan_gulp.c (original)
+++ team/group/pimp_my_sip/channels/chan_gulp.c Tue Feb 26 13:55:31 2013
@@ -53,6 +53,7 @@
 #include "asterisk/app.h"
 #include "asterisk/musiconhold.h"
 #include "asterisk/causes.h"
+#include "asterisk/taskprocessor.h"
 
 #include "asterisk/res_sip.h"
 #include "asterisk/res_sip_session.h"
@@ -204,7 +205,7 @@
 	ast_setstate(ast, AST_STATE_UP);
 
 	ao2_ref(session, +1);
-	if (ast_sip_push_task(session->work, answer, session)) {
+	if (ast_sip_push_task(session->serializer, answer, session)) {
 		ast_log(LOG_WARNING, "Unable to push answer task to the threadpool. Cannot answer call\n");
 		ao2_cleanup(session);
 		return -1;
@@ -296,7 +297,7 @@
 		return -1;
 	}
 
-	if (ast_sip_push_task_synchronous(session->work, fixup, &fix_data)) {
+	if (ast_sip_push_task_synchronous(session->serializer, fixup, &fix_data)) {
 		ast_log(LOG_WARNING, "Unable to perform channel fixup\n");
 		return -1;
 	}
@@ -431,7 +432,7 @@
 	if (!res && response_code) {
 		struct indicate_data *ind_data = indicate_data_alloc(session, condition, response_code, data, datalen);
 		if (ind_data) {
-			res = ast_sip_push_task(session->work, indicate, ind_data);
+			res = ast_sip_push_task(session->serializer, indicate, ind_data);
 			if (res) {
 				ast_log(LOG_NOTICE, "Cannot send response code %d to endpoint %s. Could queue task properly\n",
 						response_code, ast_sorcery_object_get_id(session->endpoint));
@@ -513,7 +514,7 @@
 	struct ast_sip_session *session = ast_channel_tech_pvt(ast);
 
 	ao2_ref(session, +1);
-	if (ast_sip_push_task_synchronous(session->work, call, session)) {
+	if (ast_sip_push_task_synchronous(session->serializer, call, session)) {
 		ast_log(LOG_WARNING, "Error attempting to place outbound call to call '%s'\n", dest);
 		ao2_cleanup(session);
 		return -1;
@@ -626,7 +627,7 @@
 		goto failure;
 	}
 
-	if (ast_sip_push_task(session->work, hangup, h_data)) {
+	if (ast_sip_push_task(session->serializer, hangup, h_data)) {
 		ast_log(LOG_WARNING, "Unable to push hangup task to the threadpool. Expect bad things\n");
 		goto failure;
 	}
@@ -646,14 +647,13 @@
 
 struct request_data {
 	struct ast_sip_session *session;
-	struct ast_sip_work *work;
 	const char *dest;
 };
 
 static int request(void *obj)
 {
 	struct request_data *req_data = obj;
-	struct ast_sip_endpoint *endpoint = ast_sip_endpoint_alloc("constant");
+	RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_sip_endpoint_alloc("constant"), ao2_cleanup);
 	struct ast_sip_session *session = NULL;
 
 	if (!endpoint) {
@@ -666,7 +666,7 @@
 	endpoint->min_se = 90;
 	endpoint->sess_expires = 1800;
 
-	if (!(session = ast_sip_session_create_outgoing(endpoint, req_data->dest, req_data->work))) {
+	if (!(session = ast_sip_session_create_outgoing(endpoint, req_data->dest))) {
 		return -1;
 	}
 
@@ -677,18 +677,12 @@
 /*! \brief Function called by core to create a new outgoing Gulp session */
 static struct ast_channel *gulp_request(const char *type, struct ast_format_cap *cap, const struct ast_channel *requestor, const char *data, int *cause)
 {
-	struct ast_sip_work *work = ast_sip_create_work();
 	struct request_data req_data;
 	struct ast_sip_session *session;
-	if (!work) {
-		return NULL;
-	}
 
 	req_data.dest = data;
-	req_data.work = work;
-
-	if (ast_sip_push_task_synchronous(work, request, &req_data)) {
-		ast_sip_destroy_work(work);
+
+	if (ast_sip_push_task_synchronous(NULL, request, &req_data)) {
 		return NULL;
 	}
 

Modified: team/group/pimp_my_sip/include/asterisk/res_sip.h
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/include/asterisk/res_sip.h?view=diff&rev=382112&r1=382111&r2=382112
==============================================================================
--- team/group/pimp_my_sip/include/asterisk/res_sip.h (original)
+++ team/group/pimp_my_sip/include/asterisk/res_sip.h Tue Feb 26 13:55:31 2013
@@ -39,11 +39,6 @@
 struct pjsip_transport;
 struct pjsip_tpfactory;
 struct pjsip_tls_setting;
-
-/*!
- * \brief Opaque structure representing related SIP tasks
- */
-struct ast_sip_work;
 
 /*!
  * \brief Structure for SIP transport information
@@ -455,6 +450,29 @@
 int ast_sip_initialize_sorcery_transport(struct ast_sorcery *sorcery);
 
 /*!
+ * \brief Initialize authentication support on a sorcery instance
+ *
+ * \param sorcery The sorcery instance
+ *
+ * \retval -1 failure
+ * \retval 0 success
+ */
+int ast_sip_initialize_sorcery_auth(struct ast_sorcery *sorcery);
+
+/*!
+ * \brief Initialize the distributor module
+ *
+ * The distributor module is responsible for taking an incoming
+ * SIP message and placing it into the threadpool. Once in the threadpool,
+ * the distributor will perform endpoint lookups and authentication, and
+ * then distribute the message up the stack to any further modules.
+ *
+ * \retval -1 Failure
+ * \retval 0 Success
+ */
+int ast_sip_initialize_distributor(void);
+
+/*!
  * \page Threading model for SIP
  *
  * There are three major types of threads that SIP will have to deal with:
@@ -478,11 +496,15 @@
  *
  * PJSIP threads are those that originate from handling of PJSIP events, such
  * as an incoming SIP request or response, or a transaction timeout. The role
- * of these threads is to process information as quickly as possible. When your
- * code gets called into from one of these threads, your goal should be to do
- * as little as possible before handing the majority of processing off to a
- * servant. Operations such as remote procedure calls or DNS lookups are never
- * to be done in these threads since it can cause performance issues.
+ * of these threads is to process information as quickly as possible so that
+ * the next item on the SIP socket(s) can be serviced. On incoming messages,
+ * Asterisk automatically will push the request to a servant thread. When your
+ * module callback is called, processing will already be in a servant. However,
+ * for other PSJIP events, such as transaction state changes due to timer
+ * expirations, your module will be called into from a PJSIP thread. If you
+ * are called into from a PJSIP thread, then you should push whatever processing
+ * is needed to a servant as soon as possible. You can discern if you are currently
+ * in a SIP servant thread using the \ref ast_sip_thread_is_servant function.
  *
  * \par Servants
  *
@@ -491,74 +513,50 @@
  * off to them. Servant threads register themselves with PJLIB, meaning that
  * they are capable of calling PJSIP and PJLIB functions if they wish. 
  *
- * \par ast_sip_work
+ * \par Serializer
  *
  * Tasks are handed off to servant threads using the API call \ref ast_sip_push_task.
- * The first parameter of this call is an \ref ast_sip_work pointer. If this pointer
+ * The first parameter of this call is a serializer. If this pointer
  * is NULL, then the work will be handed off to whatever servant can currently handle
  * the task. If this pointer is non-NULL, then the task will not be executed until
- * previous tasks pushed with the same \ref ast_sip_work have completed. In other words,
- * an \ref ast_sip_work is a method of serializing tasks pushed to servants. This can
- * have several benefits
- * \li Tasks are executed in the same order they were pushed to servants
- * \li Reduced contention for shared resources
+ * previous tasks pushed with the same serializer have completed. For more information
+ * on serializers and the benefits they provide, see \ref ast_threadpool_serializer
  *
  * \note
  *
- * Do not make assumptions about individual threads based on corresponding \ref ast_sip_work.
- * In other words, just because several tasks use the same \ref ast_sip_work when being pushed
+ * Do not make assumptions about individual threads based on a corresponding serializer.
+ * In other words, just because several tasks use the same serializer when being pushed
  * to servants, it does not mean that the same thread is necessarily going to execute those
  * tasks, even though they are all guaranteed to be executed in sequence.
  */
 
 /*!
- * \brief Initialize authentication support on a sorcery instance
- *
- * \param sorcery The sorcery instance
- *
- * \retval -1 failure
- * \retval 0 success
- */
-int ast_sip_initialize_sorcery_auth(struct ast_sorcery *sorcery);
-
-/*!
->>>>>>> .merge-right.r381346
- * \brief Create a new SIP work structure
- *
- * A SIP work is a means of grouping together SIP tasks. For instance, one
- * might create a SIP work so that all tasks for a given SIP dialog will
- * be grouped together. Grouping the work together ensures that the
- * servants will execute the tasks in such a way so that grouped work
- * will execute sequentially. Executing grouped tasks sequentially means
- * less contention for shared resources.
+ * \brief Create a new serializer for SIP tasks
+ *
+ * See \ref ast_threadpool_serializer for more information on serializers.
+ * SIP creates serializers so that tasks operating on similar data will run
+ * in sequence.
  *
  * \retval NULL Failure
- * \retval non-NULL Newly-created SIP work
- */
-struct ast_sip_work *ast_sip_create_work(void);
- 
-/*!
- * \brief Destroy a SIP work structure
- *
- * \param work The SIP work to destroy
- */
-void ast_sip_destroy_work(struct ast_sip_work *work);
+ * \retval non-NULL Newly-created serializer
+ */
+struct ast_taskprocessor *ast_sip_create_serializer(void);
  
 /*!
  * \brief Pushes a task to SIP servants
  *
- * This uses the SIP work provided to determine how to push the task.
- * If the work param is NULL, then the task will be pushed to the
- * servants directly. If the work is non-NULL, then the task will be
- * queued behind other tasks associated with the work.
- *
- * \param work The SIP work to which the task belongs. Can be NULL
+ * This uses the serializer provided to determine how to push the task.
+ * If the serializer is NULL, then the task will be pushed to the
+ * servants directly. If the serializer is non-NULL, then the task will be
+ * queued behind other tasks associated with the same serializer.
+ *
+ * \param serializer The serializer to which the task belongs. Can be NULL
  * \param sip_task The task to execute
  * \param task_data The parameter to pass to the task when it executes
  * \retval 0 Success
  * \retval -1 Failure
  */
-int ast_sip_push_task(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data);
+int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
 
 /*!
  * \brief Push a task to SIP servants and wait for it to complete
@@ -569,13 +567,21 @@
  * cause a deadlock. If you are in a SIP servant thread, just call your function
  * in-line.
  *
- * \param work The SIP work to which the task belongs. May be NULL.
+ * \param serializer The SIP serializer to which the task belongs. May be NULL.
  * \param sip_task The task to execute
  * \param task_data The parameter to pass to the task when it executes
  * \retval 0 Success
  * \retval -1 Failure
  */
-int ast_sip_push_task_synchronous(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data);
+int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
+
+/*!
+ * \brief Determine if the current thread is a SIP servant thread
+ *
+ * \retval 0 This is not a SIP servant thread
+ * \retval 1 This is a SIP servant thread
+ */
+int ast_sip_thread_is_servant(void);
 
 /*!
  * \brief SIP body description
@@ -726,4 +732,22 @@
  */
 void ast_copy_pj_str(char *dest, pj_str_t *src, size_t size);
 
+/*!
+ * \brief Get the looked-up endpoint on an out-of dialog request or response
+ *
+ * The function may ONLY be called on out-of-dialog requests or responses. For
+ * in-dialog requests and responses, it is required that the user of the dialog
+ * has the looked-up endpoint stored locally.
+ *
+ * This function should never return NULL if the message is out-of-dialog. It will
+ * always return NULL if the message is in-dialog.
+ *
+ * This function will increase the reference count of the returned endpoint by one.
+ * Release your reference using the ao2_ref function when finished.
+ *
+ * \param rdata Out-of-dialog request or response
+ * \return The looked up endpoint
+ */
+struct ast_sip_endpoint *ast_pjsip_rdata_get_endpoint(pjsip_rx_data *rdata);
+
 #endif /* _RES_SIP_H */

Modified: team/group/pimp_my_sip/include/asterisk/res_sip_session.h
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/include/asterisk/res_sip_session.h?view=diff&rev=382112&r1=382111&r2=382112
==============================================================================
--- team/group/pimp_my_sip/include/asterisk/res_sip_session.h (original)
+++ team/group/pimp_my_sip/include/asterisk/res_sip_session.h Tue Feb 26 13:55:31 2013
@@ -82,8 +82,8 @@
 	struct ao2_container *datastores;
 	/* Media streams */
 	struct ast_sip_session_media media[AST_SIP_MEDIA_SIZE];
-	/* Workspace for tasks relating to this SIP session */
-	struct ast_sip_work *work;
+	/* Serializer for tasks relating to this SIP session */
+	struct ast_taskprocessor *serializer;
 };
 
 /*!
@@ -213,20 +213,27 @@
  *
  * This will take care of allocating the datastores container on the session as well
  * as placing all registered supplements onto the session.
+ *
+ * The endpoint that is passed in will have its reference count increased by one since
+ * the session will be keeping a reference to the endpoint. The session will relinquish
+ * this reference when the session is destroyed.
+ *
  * \param endpoint The endpoint that this session communicates with
  * \param inv_session The PJSIP INVITE session data
- * \param work SIP work queue to use for this session. May be NULL.
- */
-struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, pjsip_inv_session *inv, struct ast_sip_work *work);
+ */
+struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint, pjsip_inv_session *inv);
 
 /*!
  * \brief Create a new outgoing SIP session
+ *
+ * The endpoint that is passed in will have its reference count increased by one since
+ * the session will be keeping a reference to the endpoint. The session will relinquish
+ * this reference when the session is destroyed.
  *
  * \param endpoint The endpoint that this session uses for settings
  * \param uri The URI to call
- * \param work SIP work queue to use for this session. May be NULL.
- */
-struct ast_sip_session *ast_sip_session_create_outgoing(struct ast_sip_endpoint *endpoint, const char *uri, struct ast_sip_work *work);
+ */
+struct ast_sip_session *ast_sip_session_create_outgoing(struct ast_sip_endpoint *endpoint, const char *uri);
 
 /*!
  * \brief Register an SDP handler

Modified: team/group/pimp_my_sip/res/pjproject/pjsip/include/pjsip/sip_endpoint.h
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/pjproject/pjsip/include/pjsip/sip_endpoint.h?view=diff&rev=382112&r1=382111&r2=382112
==============================================================================
--- team/group/pimp_my_sip/res/pjproject/pjsip/include/pjsip/sip_endpoint.h (original)
+++ team/group/pimp_my_sip/res/pjproject/pjsip/include/pjsip/sip_endpoint.h Tue Feb 26 13:55:31 2013
@@ -201,6 +201,77 @@
 PJ_DECL(pj_status_t) pjsip_endpt_unregister_module( pjsip_endpoint *endpt,
 						    pjsip_module *module );
 
+/**
+ * This describes additional parameters to pjsip_endpt_process_rx_data()
+ * function. Application MUST call pjsip_process_rdata_param_default() to
+ * initialize this structure.
+ */
+typedef struct pjsip_process_rdata_param
+{
+    /**
+     * Specify the minimum priority number of the modules that are allowed
+     * to process the message. Default is zero to allow all modules to
+     * process the message.
+     */
+    unsigned start_prio;
+
+    /**
+     * Specify the pointer of the module where processing will start.
+     * The default is NULL, meaning processing will start from the start
+     * of the module list.
+     */
+    void *start_mod;
+
+    /**
+     * Set to N, then processing will start at Nth module after start
+     * module (where start module can be an explicit module as specified
+     * by \a start_mod or the start of module list when \a start_mod is
+     * NULL). For example, if set to 1, then processing will start from
+     * the next module after start module. Default is zero.
+     */
+    unsigned idx_after_start;
+
+    /**
+     * Print nothing to log. Default is PJ_FALSE.
+     */
+    pj_bool_t silent;
+
+} pjsip_process_rdata_param;
+
+/**
+ * Initialize with default.
+ *
+ * @param p     The param.
+ */
+PJ_DECL(void) pjsip_process_rdata_param_default(pjsip_process_rdata_param *p);
+
+/**
+ * Manually distribute the specified pjsip_rx_data to registered modules.
+ * Normally application does not need to call this function because received
+ * messages will be given to endpoint automatically by transports.
+ *
+ * Application can use this function when it has postponed the processing of
+ * an incoming message, for example to perform long operations such as
+ * database operation or to consult other servers to decide what to do with
+ * the message. In this case, application clones the original rdata, return
+ * from the callback, and perform the long operation. Upon completing the
+ * long operation, it resumes pjsip's module processing by calling this
+ * function, and then free the cloned rdata.
+ *
+ * @param endpt         The endpoint instance.
+ * @param rdata         The rdata to be distributed.
+ * @param p             Optional pointer to param to specify from which module
+ *                      the processing should start.
+ * @param p_handled     Optional pointer to receive last return value of
+ *                      module's \a on_rx_request() or \a on_rx_response()
+ *                      callback.
+ *
+ * @return              PJ_SUCCESS on success.
+ */
+PJ_DECL(pj_status_t) pjsip_endpt_process_rx_data(pjsip_endpoint *endpt,
+                                                 pjsip_rx_data *rdata,
+                                                 pjsip_process_rdata_param *p,
+                                                 pj_bool_t *p_handled);
 
 /**
  * Create pool from the endpoint. All SIP components should allocate their

Modified: team/group/pimp_my_sip/res/pjproject/pjsip/src/pjsip/sip_endpoint.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/pjproject/pjsip/src/pjsip/sip_endpoint.c?view=diff&rev=382112&r1=382111&r2=382112
==============================================================================
--- team/group/pimp_my_sip/res/pjproject/pjsip/src/pjsip/sip_endpoint.c (original)
+++ team/group/pimp_my_sip/res/pjproject/pjsip/src/pjsip/sip_endpoint.c Tue Feb 26 13:55:31 2013
@@ -795,6 +795,103 @@
     return endpt->timer_heap;
 }
 
+/* Init with default */ 
+PJ_DEF(void) pjsip_process_rdata_param_default(pjsip_process_rdata_param *p) 
+{ 
+    pj_bzero(p, sizeof(*p)); 
+} 
+ 
+/* Distribute rdata */ 
+PJ_DEF(pj_status_t) pjsip_endpt_process_rx_data( pjsip_endpoint *endpt, 
+                                                 pjsip_rx_data *rdata, 
+                                                 pjsip_process_rdata_param *p, 
+                                                 pj_bool_t *p_handled) 
+{ 
+    pjsip_msg *msg; 
+    pjsip_process_rdata_param def_prm; 
+    pjsip_module *mod; 
+    pj_bool_t handled = PJ_FALSE; 
+    unsigned i; 
+    pj_status_t status; 
+ 
+    PJ_ASSERT_RETURN(endpt && rdata, PJ_EINVAL); 
+ 
+    if (p==NULL) { 
+        p = &def_prm; 
+        pjsip_process_rdata_param_default(p); 
+    } 
+ 
+    msg = rdata->msg_info.msg; 
+ 
+    if (p_handled) 
+        *p_handled = PJ_FALSE; 
+ 
+    if (!p->silent) { 
+        PJ_LOG(5, (THIS_FILE, "Distributing rdata to modules: %s", 
+                   pjsip_rx_data_get_info(rdata))); 
+        pj_log_push_indent(); 
+    } 
+ 
+    LOCK_MODULE_ACCESS(endpt); 
+ 
+    /* Find start module */ 
+    if (p->start_mod) { 
+        mod = pj_list_find_node(&endpt->module_list, p->start_mod); 
+        if (!mod) { 
+            status = PJ_ENOTFOUND; 
+            goto on_return; 
+        } 
+    } else { 
+        mod = endpt->module_list.next; 
+    } 
+ 
+    /* Start after the specified index */ 
+    for (i=0; i < p->idx_after_start && mod != &endpt->module_list; ++i) { 
+        mod = mod->next; 
+    } 
+ 
+    /* Start with the specified priority */ 
+    while (mod != &endpt->module_list && mod->priority < p->start_prio) { 
+        mod = mod->next; 
+    } 
+ 
+    if (mod == &endpt->module_list) { 
+        status = PJ_ENOTFOUND; 
+        goto on_return; 
+    } 
+ 
+    /* Distribute */ 
+    if (msg->type == PJSIP_REQUEST_MSG) { 
+        do { 
+            if (mod->on_rx_request) 
+                handled = (*mod->on_rx_request)(rdata); 
+            if (handled) 
+                break; 
+            mod = mod->next; 
+        } while (mod != &endpt->module_list); 
+    } else { 
+        do { 
+            if (mod->on_rx_response) 
+                handled = (*mod->on_rx_response)(rdata); 
+            if (handled) 
+                break; 
+            mod = mod->next; 
+        } while (mod != &endpt->module_list); 
+    } 
+ 
+    status = PJ_SUCCESS; 
+ 
+on_return: 
+    if (p_handled) 
+        *p_handled = handled; 
+ 
+    UNLOCK_MODULE_ACCESS(endpt); 
+    if (!p->silent) { 
+        pj_log_pop_indent(); 
+    } 
+    return status; 
+} 
+
 /*
  * This is the callback that is called by the transport manager when it 
  * receives a message from the network.
@@ -803,6 +900,8 @@
 				      pj_status_t status,
 				      pjsip_rx_data *rdata )
 {
+    pjsip_process_rdata_param proc_prm;
+    pj_bool_t handled = PJ_FALSE;
     pjsip_msg *msg = rdata->msg_info.msg;
 
     if (status != PJ_SUCCESS) {
@@ -910,57 +1009,19 @@
     }
 #endif
 
-
-    /* Distribute to modules, starting from modules with highest priority */
-    LOCK_MODULE_ACCESS(endpt);
-
-    if (msg->type == PJSIP_REQUEST_MSG) {
-	pjsip_module *mod;
-	pj_bool_t handled = PJ_FALSE;
-
-	mod = endpt->module_list.next;
-	while (mod != &endpt->module_list) {
-	    if (mod->on_rx_request)
-		handled = (*mod->on_rx_request)(rdata);
-	    if (handled)
-		break;
-	    mod = mod->next;
-	}
-
-	/* No module is able to handle the request. */
-	if (!handled) {
-	    PJ_TODO(ENDPT_RESPOND_UNHANDLED_REQUEST);
-	    PJ_LOG(4,(THIS_FILE, "Message %s from %s:%d was dropped/unhandled by"
-				 " any modules",
-				 pjsip_rx_data_get_info(rdata),
-				 rdata->pkt_info.src_name,
-				 rdata->pkt_info.src_port));
-	}
-
-    } else {
-	pjsip_module *mod;
-	pj_bool_t handled = PJ_FALSE;
-
-	mod = endpt->module_list.next;
-	while (mod != &endpt->module_list) {
-	    if (mod->on_rx_response)
-		handled = (*mod->on_rx_response)(rdata);
-	    if (handled)
-		break;
-	    mod = mod->next;
-	}
-
-	if (!handled) {
-	    PJ_LOG(4,(THIS_FILE, "Message %s from %s:%d was dropped/unhandled"
-				 " by any modules",
-				 pjsip_rx_data_get_info(rdata),
-				 rdata->pkt_info.src_name,
-				 rdata->pkt_info.src_port));
-	}
-    }
-
-    UNLOCK_MODULE_ACCESS(endpt);
-
+    pjsip_process_rdata_param_default(&proc_prm); 
+    proc_prm.silent = PJ_TRUE; 
+ 
+    pjsip_endpt_process_rx_data(endpt, rdata, &proc_prm, &handled); 
+ 
+    /* No module is able to handle the message */ 
+    if (!handled) { 
+	PJ_LOG(4,(THIS_FILE, "%s from %s:%d was dropped/unhandled by" 
+                             " any modules", 
+                             pjsip_rx_data_get_info(rdata), 
+                             rdata->pkt_info.src_name, 
+                             rdata->pkt_info.src_port)); 
+    }
     /* Must clear mod_data before returning rdata to transport, since
      * rdata may be reused.
      */

Modified: team/group/pimp_my_sip/res/res_sip.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/res_sip.c?view=diff&rev=382112&r1=382111&r2=382112
==============================================================================
--- team/group/pimp_my_sip/res/res_sip.c (original)
+++ team/group/pimp_my_sip/res/res_sip.c Tue Feb 26 13:55:31 2013
@@ -343,90 +343,29 @@
 	return 0;
 }
 
-struct ast_sip_work {
-	struct ast_taskprocessor *queue;
-};
-
-static int execute_tasks(void *data)
-{
-	struct ast_sip_work *work = data;
-	while (ast_taskprocessor_execute(work->queue)) {
-		/* Empty on purpose */
-	}
-	ao2_cleanup(work);
-	return 0;
-}
-
-static void work_queue_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) {
-	if (was_empty) {
-		struct ast_sip_work *work = ast_taskprocessor_listener_get_user_data(listener);
-		ao2_ref(work, +1);
-		ast_threadpool_push(sip_threadpool, execute_tasks, work);
-	}
-};
-
-static int work_queue_start(struct ast_taskprocessor_listener *listener)
-{
-	/* No-op */
-	return 0;
-}
-
-static void work_queue_shutdown(struct ast_taskprocessor_listener *listener)
-{
-	/* No-op */
-}
-
-static struct ast_taskprocessor_listener_callbacks sip_tps_listener_callbacks = {
-	.task_pushed = work_queue_task_pushed,
-	.start = work_queue_start,
-	.shutdown = work_queue_shutdown,
-};
-
-static void work_destroy(void *obj)
-{
-	struct ast_sip_work *work = obj;
-	ast_taskprocessor_unreference(work->queue);
-}
-
-struct ast_sip_work *ast_sip_create_work(void)
-{
-	struct ast_sip_work *work = ao2_alloc(sizeof(*work), work_destroy);
-	struct ast_uuid *uuid;
-	struct ast_taskprocessor_listener *listener;
-	char queue_name[AST_UUID_STR_LEN];
-	if (!work) {
+struct ast_taskprocessor *ast_sip_create_serializer(void)
+{
+	struct ast_taskprocessor *serializer;
+	RAII_VAR(struct ast_uuid *, uuid, ast_uuid_generate(), ast_free_ptr);
+	char name[AST_UUID_STR_LEN];
+
+	if (!uuid) {
 		return NULL;
 	}
-	/* Ugh, having to name taskprocessors really makes things annoying here.
-	 * For all intents and purposes, no one should even know this exists. What
-	 * we'll do is create a UUID and use that as the name.
-	 */
-	uuid = ast_uuid_generate();
-	if (!uuid) {
-		ao2_cleanup(work);
+
+	ast_uuid_to_str(uuid, name, sizeof(name));
+
+	serializer = ast_threadpool_serializer(name, sip_threadpool);
+	if (!serializer) {
 		return NULL;
 	}
-	ast_uuid_to_str(uuid, queue_name, sizeof(queue_name));
-	ast_free(uuid);
-	listener = ast_taskprocessor_listener_alloc(&sip_tps_listener_callbacks, work);
-	work->queue = ast_taskprocessor_create_with_listener(queue_name, listener);
-	ao2_cleanup(listener);
-	if (!work->queue) {
-		ao2_cleanup(work);
-		return NULL;
-	}
-	return work;
-}
-
-void ast_sip_destroy_work(struct ast_sip_work *work)
-{
-	ao2_cleanup(work);
-}
-
-int ast_sip_push_task(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data)
-{
-	if (work) {
-		return ast_taskprocessor_push(work->queue, sip_task, task_data);
+	return serializer;
+}
+
+int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
+{
+	if (serializer) {
+		return ast_taskprocessor_push(serializer, sip_task, task_data);
 	} else {
 		return ast_threadpool_push(sip_threadpool, sip_task, task_data);
 	}
@@ -453,7 +392,7 @@
 	return std->fail;
 }
 
-int ast_sip_push_task_synchronous(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data)
+int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
 {
 	/* This method is an onion */
 	struct sync_task_data std;
@@ -463,8 +402,8 @@
 	std.task = sip_task;
 	std.task_data = task_data;
 
-	if (work) {
-		if (ast_taskprocessor_push(work->queue, sync_task, &std)) {
+	if (serializer) {
+		if (ast_taskprocessor_push(serializer, sync_task, &std)) {
 			return -1;
 		}
 	} else {
@@ -511,27 +450,22 @@
 	pj_thread_join(monitor_thread);
 }
 
-static pj_bool_t unhandled_on_rx_request(pjsip_rx_data *rdata)
-{
-	if (pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_ack_method)) {
-		pjsip_endpt_respond_stateless(ast_pjsip_endpoint, rdata, 501, NULL, NULL, NULL);
-	}
-
-	return PJ_TRUE;
-}
-
-static pjsip_module unhandled_module = {
-	.name = { "Unhandled", 9 },
-	.priority = PJSIP_MOD_PRIORITY_APPLICATION + 32,
-	.on_rx_request = unhandled_on_rx_request,
-};
-
 AST_THREADSTORAGE(pj_thread_storage);
+AST_THREADSTORAGE(servant_id_storage);
+#define SIP_SERVANT_ID 0xDEFECA7E
 
 static void sip_thread_start(void)
 {
 	pj_thread_desc *desc;
 	pj_thread_t *thread;
+	uint32_t *servant_id;
+
+	/* Mark this thread with SIP_SERVANT_ID in its thread-local storage;
+	 * ast_sip_thread_is_servant() reads the same slot. A NULL return means
+	 * the storage could not be obtained, so bail out before writing to it.
+	 */
+	servant_id = ast_threadstorage_get(&servant_id_storage, sizeof(*servant_id));
+	if (!servant_id) {
+		ast_log(LOG_ERROR, "Could not set SIP servant ID in thread-local storage.\n");
+		return;
+	}
+	*servant_id = SIP_SERVANT_ID;
 
 	desc = ast_threadstorage_get(&pj_thread_storage, sizeof(pj_thread_desc));
 	if (!desc) {
@@ -543,6 +477,18 @@
 	if (pj_thread_register("Asterisk Thread", *desc, &thread) != PJ_SUCCESS) {
 		ast_log(LOG_ERROR, "Couldn't register thread with PJLIB.\n");
 	}
+}
+
+/*!
+ * \brief Report whether the calling thread carries the SIP servant marker.
+ *
+ * \retval 1 the thread-local servant slot holds SIP_SERVANT_ID.
+ * \retval 0 the slot could not be obtained or holds any other value.
+ */
+int ast_sip_thread_is_servant(void)
+{
+	const uint32_t *marker = ast_threadstorage_get(&servant_id_storage, sizeof(*marker));
+
+	/* A NULL slot is reported as "not a servant". */
+	return marker != NULL && *marker == SIP_SERVANT_ID;
 }
 
 static int load_module(void)
@@ -601,8 +547,8 @@
 		goto error;
 	}
 
-	if (pjsip_endpt_register_module(ast_pjsip_endpoint, &unhandled_module)) {
-		ast_log(LOG_ERROR, "Failed to register unhandled request module. Aborting load\n");
+	if (ast_sip_initialize_distributor()) {
+		ast_log(LOG_ERROR, "Failed to register distributor module. Aborting load\n");
 		goto error;
 	}
 

Modified: team/group/pimp_my_sip/res/res_sip.exports.in
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/res_sip.exports.in?view=diff&rev=382112&r1=382111&r2=382112
==============================================================================
--- team/group/pimp_my_sip/res/res_sip.exports.in (original)
+++ team/group/pimp_my_sip/res/res_sip.exports.in Tue Feb 26 13:55:31 2013
@@ -6,8 +6,7 @@
 		LINKER_SYMBOL_PREFIXast_sip_unregister_authenticator;
 		LINKER_SYMBOL_PREFIXast_sip_register_endpoint_identifier;
 		LINKER_SYMBOL_PREFIXast_sip_unregister_endpoint_identifier;
-		LINKER_SYMBOL_PREFIXast_sip_create_work;
-		LINKER_SYMBOL_PREFIXast_sip_destroy_work;
+		LINKER_SYMBOL_PREFIXast_sip_create_serializer;
 		LINKER_SYMBOL_PREFIXast_sip_push_task;
 		LINKER_SYMBOL_PREFIXast_sip_push_task_synchronous;
 		LINKER_SYMBOL_PREFIXast_sip_send_request;
@@ -27,6 +26,8 @@
 		LINKER_SYMBOL_PREFIXast_sip_get_sorcery;
 		LINKER_SYMBOL_PREFIXast_sip_get_endpoint_from_location;
 		LINKER_SYMBOL_PREFIXast_sip_endpoint_get_location;
+		LINKER_SYMBOL_PREFIXast_pjsip_rdata_get_endpoint;
+		LINKER_SYMBOL_PREFIXast_sip_thread_is_servant;
 		LINKER_SYMBOL_PREFIXpj_*;
 		LINKER_SYMBOL_PREFIXpjsip_*;
 		LINKER_SYMBOL_PREFIXpjmedia_*;

Modified: team/group/pimp_my_sip/res/res_sip/config_transport.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/res_sip/config_transport.c?view=diff&rev=382112&r1=382111&r2=382112
==============================================================================
--- team/group/pimp_my_sip/res/res_sip/config_transport.c (original)
+++ team/group/pimp_my_sip/res/res_sip/config_transport.c Tue Feb 26 13:55:31 2013
@@ -38,15 +38,10 @@
 static void transport_state_destroy(void *obj)
 {
 	struct ast_sip_transport_state *state = obj;
-	struct ast_sip_work *work = ast_sip_create_work();
-	if (!work) {
-		ast_log(LOG_WARNING, "Unable to create work structure in order to shutdown transport\n");
-	}
 
 	if (state->transport) {
-		ast_sip_push_task_synchronous(work, destroy_transport_state, state->transport);
-	}
-	ast_sip_destroy_work(work);
+		ast_sip_push_task_synchronous(NULL, destroy_transport_state, state->transport);
+	}
 }
 
 /*! \brief Destructor for transport */

Added: team/group/pimp_my_sip/res/res_sip/sip_distributor.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/res_sip/sip_distributor.c?view=auto&rev=382112
==============================================================================
--- team/group/pimp_my_sip/res/res_sip/sip_distributor.c (added)
+++ team/group/pimp_my_sip/res/res_sip/sip_distributor.c Tue Feb 26 13:55:31 2013
@@ -1,0 +1,149 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * Mark Michelson <mmichelson at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#include "asterisk.h"
+#undef bzero
+#define bzero bzero
+#include "pjsip.h"
+
+#include "asterisk/res_sip.h"
+
+static int distribute(void *data);
+
+/*!
+ * \brief Hand an incoming SIP message over to the SIP threadpool.
+ *
+ * The message is cloned and the clone is queued for distribute(), which
+ * receives it as its task data.
+ *
+ * \param rdata The received request or response.
+ * \return Always PJ_TRUE. If the message cannot be cloned or queued it is
+ *         dropped after logging an error.
+ */
+static pj_bool_t distributor(pjsip_rx_data *rdata)
+{
+	pjsip_rx_data *clone;
+
+	/* Without a successful clone there is nothing valid to queue. */
+	if (pjsip_rx_data_clone(rdata, 0, &clone) != PJ_SUCCESS) {
+		ast_log(LOG_ERROR, "Could not clone incoming SIP message. Dropping it.\n");
+		return PJ_TRUE;
+	}
+
+	/* If the task was never queued, distribute() will not run, so the
+	 * clone has to be released here.
+	 */
+	if (ast_sip_push_task(NULL, distribute, clone)) {
+		ast_log(LOG_ERROR, "Could not queue incoming SIP message. Dropping it.\n");
+		pjsip_rx_data_free_cloned(clone);
+	}
+
+	return PJ_TRUE;
+}
+
+/*!
+ * \brief PJSIP module that routes incoming requests and responses to
+ * distributor().
+ *
+ * Its priority is one less than PJSIP_MOD_PRIORITY_TSX_LAYER, which places
+ * it ahead of the PJSIP transaction layer.
+ */
+static pjsip_module distributor_mod = {
+	.name = {"Request Distributor", 19},
+	.priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 1,
+	.on_rx_request = distributor,
+	.on_rx_response = distributor,
+};
+
+static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
+
+/*!
+ * \brief PJSIP module that runs endpoint_lookup() on incoming requests.
+ *
+ * Its id is also used by endpoint_lookup() as the index into the rdata's
+ * mod_data array where the identified endpoint is stored.
+ */
+static pjsip_module endpoint_mod = {
+	.name = {"Endpoint Identifier", 19},
+	.priority = PJSIP_MOD_PRIORITY_APPLICATION - 2,
+	.on_rx_request = endpoint_lookup,
+};
+
+/*!
+ * \brief Identify the endpoint for an incoming request and attach it to
+ * the rdata.
+ *
+ * \param rdata The received request.
+ * \retval PJ_TRUE no endpoint was identified for a non-ACK request; a 403
+ *         response has been sent.
+ * \retval PJ_FALSE the lookup result (NULL for an unidentified ACK) was
+ *         stored in this module's mod_data slot.
+ */
+static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata)
+{
+	struct ast_sip_endpoint *found = ast_sip_identify_endpoint(rdata);
+
+	if (found || rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD) {
+		/* ACKs are never answered, so they pass through even unidentified. */
+		rdata->endpt_info.mod_data[endpoint_mod.id] = found;
+		return PJ_FALSE;
+	}
+
+	/* XXX When we do an alwaysauthreject-like option, we'll need to take that into account
+	 * for this response. Either that, or have a pseudo-endpoint to pass along so that authentication
+	 * will fail
+	 */
+	pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
+	return PJ_TRUE;
+}
+
+/*!
+ * \brief Authenticate an incoming request against its identified endpoint.
+ *
+ * A 401 response is prepared up front so the authenticator can fill it in.
+ * It is only sent when a challenge is required; on every other path the
+ * reference taken on it is dropped so it does not leak.
+ *
+ * \param rdata The received request.
+ * \retval PJ_FALSE no authentication is required, or it succeeded.
+ * \retval PJ_TRUE a response (401, 403 or 500) has been sent.
+ */
+static pj_bool_t authenticate(pjsip_rx_data *rdata)
+{
+	RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_pjsip_rdata_get_endpoint(rdata), ao2_cleanup);
+	pjsip_tx_data *tdata;
+
+	/* NOTE(review): endpoint_lookup() lets an unidentified ACK through with
+	 * a NULL endpoint stored; confirm that case cannot reach this point.
+	 */
+	ast_assert(endpoint != NULL);
+
+	if (!ast_sip_requires_authentication(endpoint, rdata)) {
+		return PJ_FALSE;
+	}
+
+	/* Without a response to fill in there is nothing to hand the authenticator. */
+	if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 401, NULL, &tdata) != PJ_SUCCESS) {
+		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
+		return PJ_TRUE;
+	}
+
+	switch (ast_sip_check_authentication(endpoint, rdata, tdata)) {
+	case AST_SIP_AUTHENTICATION_CHALLENGE:
+		/* Send the 401 we created for them */
+		pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL);
+		return PJ_TRUE;
+	case AST_SIP_AUTHENTICATION_SUCCESS:
+		pjsip_tx_data_dec_ref(tdata);
+		return PJ_FALSE;
+	case AST_SIP_AUTHENTICATION_FAILED:
+		pjsip_tx_data_dec_ref(tdata);
+		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
+		return PJ_TRUE;
+	case AST_SIP_AUTHENTICATION_ERROR:
+		pjsip_tx_data_dec_ref(tdata);
+		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
+		return PJ_TRUE;
+	}
+
+	/* Unrecognized result: keep the original pass-through, but release the unused 401. */
+	pjsip_tx_data_dec_ref(tdata);
+	return PJ_FALSE;
+}
+
+static pjsip_module auth_mod = {
+	.name = {"Request Authenticator", 21},
+	.priority = PJSIP_MOD_PRIORITY_APPLICATION - 1,

[... 510 lines stripped ...]



More information about the svn-commits mailing list