[svn-commits] mmichelson: branch mmichelson/pool_shark2 r381552 - in /team/mmichelson/pool_...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Fri Feb 15 10:57:48 CST 2013


Author: mmichelson
Date: Fri Feb 15 10:57:45 2013
New Revision: 381552

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=381552
Log:
Add a distributor for inbound requests and responses.

This distributor sits underneath the PJSIP transaction layer
and moves all incoming requests and responses into the
SIP threadpool.

The purpose behind this is that we will soon be placing endpoint
identification and authentication at this same point so that
applications can grab the endpoint from the rdata and not have
to worry about authentication.

The distributor is at this part of the stack in order to ease
some complexity in processing. The distributor simply has to
be called into by the PJSIP endpoint. This prevents us from
having to also register the distributor as a transaction or
dialog user.

The reason we want to distribute the incoming messages into the
threadpool is that endpoint lookup and authentication can each
potentially take a (relatively) long time to complete if something
like a remote database lookup is part of the process. For long-running
tasks, we do not want to hold up incoming request processing by the
PJSIP endpoint, so we farm the task off to the threadpool instead.

The next step will be to add the endpoint identifier and authentication
process.


Added:
    team/mmichelson/pool_shark2/res/res_sip/sip_distributor.c   (with props)
Modified:
    team/mmichelson/pool_shark2/include/asterisk/res_sip.h
    team/mmichelson/pool_shark2/res/pjproject/pjsip/include/pjsip/sip_endpoint.h
    team/mmichelson/pool_shark2/res/pjproject/pjsip/src/pjsip/sip_endpoint.c
    team/mmichelson/pool_shark2/res/res_sip.c

Modified: team/mmichelson/pool_shark2/include/asterisk/res_sip.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/pool_shark2/include/asterisk/res_sip.h?view=diff&rev=381552&r1=381551&r2=381552
==============================================================================
--- team/mmichelson/pool_shark2/include/asterisk/res_sip.h (original)
+++ team/mmichelson/pool_shark2/include/asterisk/res_sip.h Fri Feb 15 10:57:45 2013
@@ -442,6 +442,18 @@
 int ast_sip_initialize_sorcery_transport(struct ast_sorcery *sorcery);
 
 /*!
+ * \brief Initialize authentication support on a sorcery instance
+ *
+ * \param sorcery The sorcery instance
+ *
+ * \retval -1 failure
+ * \retval 0 success
+ */
+int ast_sip_initialize_sorcery_auth(struct ast_sorcery *sorcery);
+
+int ast_sip_initialize_distributor(void);
+
+/*!
  * \page Threading model for SIP
  *
  * There are three major types of threads that SIP will have to deal with:
@@ -496,16 +508,6 @@
  */
 
 /*!
- * \brief Initialize authentication support on a sorcery instance
- *
- * \param sorcery The sorcery instance
- *
- * \retval -1 failure
- * \retval 0 success
- */
-int ast_sip_initialize_sorcery_auth(struct ast_sorcery *sorcery);
-
-/*!
  * \brief Create a new serializer for SIP tasks
  *
  * See \ref ast_threadpool_serializer for more information on serializers.

Modified: team/mmichelson/pool_shark2/res/pjproject/pjsip/include/pjsip/sip_endpoint.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/pool_shark2/res/pjproject/pjsip/include/pjsip/sip_endpoint.h?view=diff&rev=381552&r1=381551&r2=381552
==============================================================================
--- team/mmichelson/pool_shark2/res/pjproject/pjsip/include/pjsip/sip_endpoint.h (original)
+++ team/mmichelson/pool_shark2/res/pjproject/pjsip/include/pjsip/sip_endpoint.h Fri Feb 15 10:57:45 2013
@@ -201,6 +201,77 @@
 PJ_DECL(pj_status_t) pjsip_endpt_unregister_module( pjsip_endpoint *endpt,
 						    pjsip_module *module );
 
+/**
+ * This describes additional parameters to pjsip_endpt_process_rx_data()
+ * function. Application MUST call pjsip_process_rdata_param_default() to
+ * initialize this structure.
+ */
+typedef struct pjsip_process_rdata_param
+{
+    /**
+     * Specify the minimum priority number of the modules that are allowed
+     * to process the message. Default is zero to allow all modules to
+     * process the message.
+     */
+    unsigned start_prio;
+
+    /**
+     * Specify the pointer of the module where processing will start.
+     * The default is NULL, meaning processing will start from the start
+     * of the module list.
+     */
+    void *start_mod;
+
+    /**
+     * Set to N, then processing will start at Nth module after start
+     * module (where start module can be an explicit module as specified
+     * by \a start_mod or the start of module list when \a start_mod is
+     * NULL). For example, if set to 1, then processing will start from
+     * the next module after start module. Default is zero.
+     */
+    unsigned idx_after_start;
+
+    /**
+     * Print nothing to log. Default is PJ_FALSE.
+     */
+    pj_bool_t silent;
+
+} pjsip_process_rdata_param;
+
+/**
+ * Initialize with default.
+ *
+ * @param p     The param.
+ */
+PJ_DECL(void) pjsip_process_rdata_param_default(pjsip_process_rdata_param *p);
+
+/**
+ * Manually distribute the specified pjsip_rx_data to registered modules.
+ * Normally application does not need to call this function because received
+ * messages will be given to endpoint automatically by transports.
+ *
+ * Application can use this function when it has postponed the processing of
+ * an incoming message, for example to perform long operations such as
+ * database operation or to consult other servers to decide what to do with
+ * the message. In this case, application clones the original rdata, return
+ * from the callback, and perform the long operation. Upon completing the
+ * long operation, it resumes pjsip's module processing by calling this
+ * function, and then free the cloned rdata.
+ *
+ * @param endpt         The endpoint instance.
+ * @param rdata         The rdata to be distributed.
+ * @param p             Optional pointer to param to specify from which module
+ *                      the processing should start.
+ * @param p_handled     Optional pointer to receive last return value of
+ *                      module's \a on_rx_request() or \a on_rx_response()
+ *                      callback.
+ *
+ * @return              PJ_SUCCESS on success.
+ */
+PJ_DECL(pj_status_t) pjsip_endpt_process_rx_data(pjsip_endpoint *endpt,
+                                                 pjsip_rx_data *rdata,
+                                                 pjsip_process_rdata_param *p,
+                                                 pj_bool_t *p_handled);
 
 /**
  * Create pool from the endpoint. All SIP components should allocate their

Modified: team/mmichelson/pool_shark2/res/pjproject/pjsip/src/pjsip/sip_endpoint.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/pool_shark2/res/pjproject/pjsip/src/pjsip/sip_endpoint.c?view=diff&rev=381552&r1=381551&r2=381552
==============================================================================
--- team/mmichelson/pool_shark2/res/pjproject/pjsip/src/pjsip/sip_endpoint.c (original)
+++ team/mmichelson/pool_shark2/res/pjproject/pjsip/src/pjsip/sip_endpoint.c Fri Feb 15 10:57:45 2013
@@ -795,6 +795,103 @@
     return endpt->timer_heap;
 }
 
+/* Init with default */ 
+PJ_DEF(void) pjsip_process_rdata_param_default(pjsip_process_rdata_param *p) 
+{ 
+    pj_bzero(p, sizeof(*p)); 
+} 
+ 
+/* Distribute rdata */ 
+PJ_DEF(pj_status_t) pjsip_endpt_process_rx_data( pjsip_endpoint *endpt, 
+                                                 pjsip_rx_data *rdata, 
+                                                 pjsip_process_rdata_param *p, 
+                                                 pj_bool_t *p_handled) 
+{ 
+    pjsip_msg *msg; 
+    pjsip_process_rdata_param def_prm; 
+    pjsip_module *mod; 
+    pj_bool_t handled = PJ_FALSE; 
+    unsigned i; 
+    pj_status_t status; 
+ 
+    PJ_ASSERT_RETURN(endpt && rdata, PJ_EINVAL); 
+ 
+    if (p==NULL) { 
+        p = &def_prm; 
+        pjsip_process_rdata_param_default(p); 
+    } 
+ 
+    msg = rdata->msg_info.msg; 
+ 
+    if (p_handled) 
+        *p_handled = PJ_FALSE; 
+ 
+    if (!p->silent) { 
+        PJ_LOG(5, (THIS_FILE, "Distributing rdata to modules: %s", 
+                   pjsip_rx_data_get_info(rdata))); 
+        pj_log_push_indent(); 
+    } 
+ 
+    LOCK_MODULE_ACCESS(endpt); 
+ 
+    /* Find start module */ 
+    if (p->start_mod) { 
+        mod = pj_list_find_node(&endpt->module_list, p->start_mod); 
+        if (!mod) { 
+            status = PJ_ENOTFOUND; 
+            goto on_return; 
+        } 
+    } else { 
+        mod = endpt->module_list.next; 
+    } 
+ 
+    /* Start after the specified index */ 
+    for (i=0; i < p->idx_after_start && mod != &endpt->module_list; ++i) { 
+        mod = mod->next; 
+    } 
+ 
+    /* Start with the specified priority */ 
+    while (mod != &endpt->module_list && mod->priority < p->start_prio) { 
+        mod = mod->next; 
+    } 
+ 
+    if (mod == &endpt->module_list) { 
+        status = PJ_ENOTFOUND; 
+        goto on_return; 
+    } 
+ 
+    /* Distribute */ 
+    if (msg->type == PJSIP_REQUEST_MSG) { 
+        do { 
+            if (mod->on_rx_request) 
+                handled = (*mod->on_rx_request)(rdata); 
+            if (handled) 
+                break; 
+            mod = mod->next; 
+        } while (mod != &endpt->module_list); 
+    } else { 
+        do { 
+            if (mod->on_rx_response) 
+                handled = (*mod->on_rx_response)(rdata); 
+            if (handled) 
+                break; 
+            mod = mod->next; 
+        } while (mod != &endpt->module_list); 
+    } 
+ 
+    status = PJ_SUCCESS; 
+ 
+on_return: 
+    if (p_handled) 
+        *p_handled = handled; 
+ 
+    UNLOCK_MODULE_ACCESS(endpt); 
+    if (!p->silent) { 
+        pj_log_pop_indent(); 
+    } 
+    return status; 
+} 
+
 /*
  * This is the callback that is called by the transport manager when it 
  * receives a message from the network.
@@ -803,6 +900,8 @@
 				      pj_status_t status,
 				      pjsip_rx_data *rdata )
 {
+    pjsip_process_rdata_param proc_prm;
+    pj_bool_t handled = PJ_FALSE;
     pjsip_msg *msg = rdata->msg_info.msg;
 
     if (status != PJ_SUCCESS) {
@@ -910,57 +1009,19 @@
     }
 #endif
 
-
-    /* Distribute to modules, starting from modules with highest priority */
-    LOCK_MODULE_ACCESS(endpt);
-
-    if (msg->type == PJSIP_REQUEST_MSG) {
-	pjsip_module *mod;
-	pj_bool_t handled = PJ_FALSE;
-
-	mod = endpt->module_list.next;
-	while (mod != &endpt->module_list) {
-	    if (mod->on_rx_request)
-		handled = (*mod->on_rx_request)(rdata);
-	    if (handled)
-		break;
-	    mod = mod->next;
-	}
-
-	/* No module is able to handle the request. */
-	if (!handled) {
-	    PJ_TODO(ENDPT_RESPOND_UNHANDLED_REQUEST);
-	    PJ_LOG(4,(THIS_FILE, "Message %s from %s:%d was dropped/unhandled by"
-				 " any modules",
-				 pjsip_rx_data_get_info(rdata),
-				 rdata->pkt_info.src_name,
-				 rdata->pkt_info.src_port));
-	}
-
-    } else {
-	pjsip_module *mod;
-	pj_bool_t handled = PJ_FALSE;
-
-	mod = endpt->module_list.next;
-	while (mod != &endpt->module_list) {
-	    if (mod->on_rx_response)
-		handled = (*mod->on_rx_response)(rdata);
-	    if (handled)
-		break;
-	    mod = mod->next;
-	}
-
-	if (!handled) {
-	    PJ_LOG(4,(THIS_FILE, "Message %s from %s:%d was dropped/unhandled"
-				 " by any modules",
-				 pjsip_rx_data_get_info(rdata),
-				 rdata->pkt_info.src_name,
-				 rdata->pkt_info.src_port));
-	}
-    }
-
-    UNLOCK_MODULE_ACCESS(endpt);
-
+    pjsip_process_rdata_param_default(&proc_prm); 
+    proc_prm.silent = PJ_TRUE; 
+ 
+    pjsip_endpt_process_rx_data(endpt, rdata, &proc_prm, &handled); 
+ 
+    /* No module is able to handle the message */ 
+    if (!handled) { 
+	PJ_LOG(4,(THIS_FILE, "%s from %s:%d was dropped/unhandled by" 
+                             " any modules", 
+                             pjsip_rx_data_get_info(rdata), 
+                             rdata->pkt_info.src_name, 
+                             rdata->pkt_info.src_port)); 
+    }
     /* Must clear mod_data before returning rdata to transport, since
      * rdata may be reused.
      */

Modified: team/mmichelson/pool_shark2/res/res_sip.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/pool_shark2/res/res_sip.c?view=diff&rev=381552&r1=381551&r2=381552
==============================================================================
--- team/mmichelson/pool_shark2/res/res_sip.c (original)
+++ team/mmichelson/pool_shark2/res/res_sip.c Fri Feb 15 10:57:45 2013
@@ -540,6 +540,11 @@
 		goto error;
 	}
 
+	if (ast_sip_initialize_distributor()) {
+		ast_log(LOG_ERROR, "Failed to register distributor module. Aborting load\n");
+		goto error;
+	}
+
 	if (pjsip_endpt_register_module(ast_pjsip_endpoint, &unhandled_module)) {
 		ast_log(LOG_ERROR, "Failed to register unhandled request module. Aborting load\n");
 		goto error;

Added: team/mmichelson/pool_shark2/res/res_sip/sip_distributor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/pool_shark2/res/res_sip/sip_distributor.c?view=auto&rev=381552
==============================================================================
--- team/mmichelson/pool_shark2/res/res_sip/sip_distributor.c (added)
+++ team/mmichelson/pool_shark2/res/res_sip/sip_distributor.c Fri Feb 15 10:57:45 2013
@@ -1,0 +1,64 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * Mark Michelson <mmichelson at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#include "asterisk.h"
+#undef bzero
+#define bzero bzero
+#include "pjsip.h"
+
+#include "asterisk/res_sip.h"
+
+static int distribute(void *data);
+
+static pj_bool_t cloner(pjsip_rx_data *rdata)
+{
+	pjsip_rx_data *clone;
+	pjsip_rx_data_clone(rdata, 0, &clone);
+	ast_log(LOG_NOTICE, "Pushing clone task yo!\n");
+	ast_sip_push_task(NULL, distribute, clone);
+	return PJ_TRUE;
+}
+
+pjsip_module cloner_mod = {
+	.name = {"Request Distributor", 19},
+	.priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 1,
+	.on_rx_request = cloner,
+	.on_rx_response = cloner,
+};
+
+static int distribute(void *data)
+{
+	static pjsip_process_rdata_param param = {
+		.start_mod = &cloner_mod,
+		.idx_after_start = 1,
+	};
+	pj_bool_t handled;
+	pjsip_rx_data *rdata = data;
+	ast_log(LOG_NOTICE, "And now I be distroing!\n");
+	pjsip_endpt_process_rx_data(ast_sip_get_pjsip_endpoint(), rdata, &param, &handled);
+	if (!handled) {
+	    /* This will never happen right now due to the Unhandled module */
+	}
+	pjsip_rx_data_free_cloned(rdata);
+	return 0;
+}
+
+int ast_sip_initialize_distributor(void)
+{
+	return ast_sip_register_service(&cloner_mod);
+}

Propchange: team/mmichelson/pool_shark2/res/res_sip/sip_distributor.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/mmichelson/pool_shark2/res/res_sip/sip_distributor.c
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/mmichelson/pool_shark2/res/res_sip/sip_distributor.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain




More information about the svn-commits mailing list