[asterisk-commits] mmichelson: branch group/pimp_my_sip r379494 - in /team/group/pimp_my_sip: in...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Jan 18 16:33:16 CST 2013


Author: mmichelson
Date: Fri Jan 18 16:33:12 2013
New Revision: 379494

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=379494
Log:
Add threadpool support to res_sip.

Here's how it works:

An ast_sip_work contains a taskprocessor. When a session
wishes to do work, it queues a task onto this taskprocessor.
When the taskprocessor listener is told that a task has been pushed,
it reacts by enqueueing itself onto the threadpool. The threadpool's
threads then pull tasks out of the ast_sip_work's taskprocessor
until it's empty.

What this does is make all tasks pertaining to a session get executed
sequentially by whatever thread is currently available in the threadpool.

The threadpool itself has hardcoded options in it for now. This will be
controllable via general settings for res_sip.


Modified:
    team/group/pimp_my_sip/include/asterisk/res_sip_session.h
    team/group/pimp_my_sip/main/taskprocessor.c
    team/group/pimp_my_sip/res/res_sip.c
    team/group/pimp_my_sip/res/res_sip_session.c

Modified: team/group/pimp_my_sip/include/asterisk/res_sip_session.h
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/include/asterisk/res_sip_session.h?view=diff&rev=379494&r1=379493&r2=379494
==============================================================================
--- team/group/pimp_my_sip/include/asterisk/res_sip_session.h (original)
+++ team/group/pimp_my_sip/include/asterisk/res_sip_session.h Fri Jan 18 16:33:12 2013
@@ -63,6 +63,8 @@
 	struct ao2_container *datastores;
     /* Media information */
     struct ast_sip_session_media media;
+	/* Workspace for tasks relating to this SIP session */
+	struct ast_sip_work *work;
 };
 
 /*!

Modified: team/group/pimp_my_sip/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/main/taskprocessor.c?view=diff&rev=379494&r1=379493&r2=379494
==============================================================================
--- team/group/pimp_my_sip/main/taskprocessor.c (original)
+++ team/group/pimp_my_sip/main/taskprocessor.c Fri Jan 18 16:33:12 2013
@@ -604,7 +604,6 @@
 	/* Unref listener here since the taskprocessor has gained a reference to the listener */
 	ao2_ref(listener, -1);
 	return p;
-
 }
 
 struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)

Modified: team/group/pimp_my_sip/res/res_sip.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/res_sip.c?view=diff&rev=379494&r1=379493&r2=379494
==============================================================================
--- team/group/pimp_my_sip/res/res_sip.c (original)
+++ team/group/pimp_my_sip/res/res_sip.c Fri Jan 18 16:33:12 2013
@@ -31,12 +31,17 @@
 #include "asterisk/utils.h"
 #include "asterisk/astobj2.h"
 #include "asterisk/module.h"
+#include "asterisk/threadpool.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/uuid.h"
 
 /*** MODULEINFO
 	<support_level>core</support_level>
  ***/
 
 static pjsip_endpoint *ast_pjsip_endpoint;
+
+static struct ast_threadpool *sip_threadpool;
 
 int ast_sip_register_service(pjsip_module *module)
 {
@@ -383,21 +388,78 @@
 	return 0;
 }
 
+struct ast_sip_work {
+	struct ast_taskprocessor *queue;
+};
+
+static int execute_tasks(void *data)
+{
+	struct ast_sip_work *work = data;
+	while (ast_taskprocessor_execute(work->queue));
+	return 0;
+}
+
+static void work_queue_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) {
+	struct ast_sip_work *work = ast_taskprocessor_listener_get_user_data(listener);
+	ast_threadpool_push(sip_threadpool, execute_tasks, work);
+};
+
+static int work_queue_start(struct ast_taskprocessor_listener *listener)
+{
+	/* No-op */
+	return 0;
+}
+
+static void work_queue_shutdown(struct ast_taskprocessor_listener *listener)
+{
+	/* No-op */
+}
+
+static struct ast_taskprocessor_listener_callbacks sip_tps_listener_callbacks = {
+	.task_pushed = work_queue_task_pushed,
+	.start = work_queue_start,
+	.shutdown = work_queue_shutdown,
+};
+
 struct ast_sip_work *ast_sip_create_work(void)
 {
-	/* XXX stub */
-	return NULL;
+	struct ast_sip_work *work = ast_calloc(1, sizeof(*work));
+	struct ast_uuid *uuid;
+	struct ast_taskprocessor_listener *listener;
+	char queue_name[AST_UUID_STR_LEN];
+	if (!work) {
+		return NULL;
+	}
+	/* Ugh, having to name taskprocessors really makes things annoying here.
+	 * For all intents and purposes, no one should even know this exists. What
+	 * we'll do is create a UUID and use that as the name.
+	 */
+	uuid = ast_uuid_generate();
+	if (!uuid) {
+		ast_free(work);
+		return NULL;
+	}
+	ast_uuid_to_str(uuid, queue_name, sizeof(queue_name));
+	ast_free(uuid);
+	listener = ast_taskprocessor_listener_alloc(&sip_tps_listener_callbacks, work);
+	work->queue = ast_taskprocessor_create_with_listener(queue_name, listener);
+	ao2_cleanup(listener);
+	if (!work->queue) {
+		ast_free(work);
+		return NULL;
+	}
+	return work;
 }
 
 void ast_sip_destroy_work(struct ast_sip_work *work)
 {
-	/* XXX stub */
+	ast_taskprocessor_unreference(work->queue);
+	ast_free(work);
 }
 
 int ast_sip_push_task(struct ast_sip_work *work, int (*sip_task)(void *), void *task_data)
 {
-	/* XXX stub */
-	return 0;
+	return ast_taskprocessor_push(work->queue, sip_task, task_data);
 }
 
 pj_caching_pool caching_pool;
@@ -442,14 +504,27 @@
 	 */
 	pj_status_t status;
 
-        if (pj_init() != PJ_SUCCESS) {
-                return AST_MODULE_LOAD_DECLINE;
-        }
-
-        if (pjlib_util_init() != PJ_SUCCESS) {
-                pj_shutdown();
-                return AST_MODULE_LOAD_DECLINE;
-        }
+	/* XXX For the time being, create hard-coded threadpool
+	 * options. Just bump up by five threads every time we
+	 * don't have any available threads. Idle threads time
+	 * out after a minute. No maximum size
+	 */
+	struct ast_threadpool_options options = {
+		.auto_increment = 5,
+		.max_size = 0,
+		.idle_timeout = 60,
+		.initial_size = 0,
+	};
+	sip_threadpool = ast_threadpool_create("SIP", NULL, &options);
+
+	if (pj_init() != PJ_SUCCESS) {
+		return AST_MODULE_LOAD_DECLINE;
+	}
+
+	if (pjlib_util_init() != PJ_SUCCESS) {
+		pj_shutdown();
+		return AST_MODULE_LOAD_DECLINE;
+	}
 
 	pj_caching_pool_init(&caching_pool, NULL, 1024 * 1024);
 	pjsip_endpt_create(&caching_pool.factory, "SIP", &ast_pjsip_endpoint);

Modified: team/group/pimp_my_sip/res/res_sip_session.c
URL: http://svnview.digium.com/svn/asterisk/team/group/pimp_my_sip/res/res_sip_session.c?view=diff&rev=379494&r1=379493&r2=379494
==============================================================================
--- team/group/pimp_my_sip/res/res_sip_session.c (original)
+++ team/group/pimp_my_sip/res/res_sip_session.c Fri Jan 18 16:33:12 2013
@@ -408,6 +408,7 @@
 		supplement->session_end(session);
 		ast_free(supplement);
 	}
+	ast_sip_destroy_work(session->work);
 	ao2_cleanup(session->datastores);
 	AST_LIST_HEAD_DESTROY(&session->supplements);
 }
@@ -439,6 +440,7 @@
 	if (!session->datastores) {
 		return NULL;
 	}
+	session->work = ast_sip_create_work();
 	session->endpoint = endpoint;
 	session->inv_session = inv_session;
 	if (add_supplements(session)) {




More information about the asterisk-commits mailing list