[asterisk-commits] dhubbard: branch dhubbard/named_processors r108891 - in /team/dhubbard/named_...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Mar 14 22:11:11 CDT 2008


Author: dhubbard
Date: Fri Mar 14 22:11:10 2008
New Revision: 108891

URL: http://svn.digium.com/view/asterisk?view=rev&rev=108891
Log:
get rid of task.c and task.h

Removed:
    team/dhubbard/named_processors/include/asterisk/task.h
    team/dhubbard/named_processors/main/task.c
Modified:
    team/dhubbard/named_processors/apps/app_queue.c
    team/dhubbard/named_processors/include/asterisk/taskprocessor.h
    team/dhubbard/named_processors/main/Makefile
    team/dhubbard/named_processors/main/taskprocessor.c
    team/dhubbard/named_processors/res/res_testobserver.c
    team/dhubbard/named_processors/res/sandbox/simobject.c
    team/dhubbard/named_processors/res/sandbox/taskconsumer.c

Modified: team/dhubbard/named_processors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/apps/app_queue.c?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/apps/app_queue.c (original)
+++ team/dhubbard/named_processors/apps/app_queue.c Fri Mar 14 22:11:10 2008
@@ -93,7 +93,6 @@
 #include "asterisk/strings.h"
 #include "asterisk/global_datastores.h"
 #include "asterisk/taskprocessor.h"
-#include "asterisk/task.h"
 
 /*!
  * \par Please read before modifying this file.
@@ -791,10 +790,10 @@
 	}
 	sc->state = state;
 	strcpy(sc->dev, device);
-	t = get_available_task(handle_statechange, sc, "app_queue-device_state_cb");
+	t = ast_task_alloc(handle_statechange, sc, "app_queue-device_state_cb");
 	if (tpsp->queue_task(tpsp, t) < 0) {
 		ast_log(LOG_WARNING, "queue_task failed!!\n");
-		release_task(t);
+		ast_task_free(t);
 	}
 }
 

Modified: team/dhubbard/named_processors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/include/asterisk/taskprocessor.h?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/include/asterisk/taskprocessor.h (original)
+++ team/dhubbard/named_processors/include/asterisk/taskprocessor.h Fri Mar 14 22:11:10 2008
@@ -23,6 +23,17 @@
 
 #ifndef __taskprocessor_h__
 #define __taskprocessor_h__
+
+struct a_task {
+	int (* execute)(struct a_task* t);
+	void *_datap;
+	size_t _datapsize;
+	void* _p_producer;
+	void* _p_consumer;
+	time_t _timestamp;
+	char _source[256];
+	AST_LIST_ENTRY(a_task) list;
+};
 
 struct taskprocessor_singleton_stats {
 	unsigned long _max_qsize;
@@ -56,6 +67,22 @@
 
 unsigned char _evtq_poll_thread_run;
 
+/*
+ * get an available a_task structure from the pool of available
+ * structures.  If no structures are available, allocate one
+ */
+struct a_task* ast_task_alloc(int (*task_exe)(struct a_task *task), void* datap, char* src);
+int task_pool_size(void);
+/*
+ * release a_task structure back to the pool
+ */
+int ast_task_free(struct a_task* task);
+
+int stop_taskpool(void);
+int destroy_task_pool(void);
+
+int noop_task_execute(struct a_task* t);
+
 int create_taskprocessor_singleton(const char* name, void* (*func)(void*));
 int start_taskprocessor_singleton(const char* name);
 int size_of_taskprocessor_singleton_list(void);

Modified: team/dhubbard/named_processors/main/Makefile
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/Makefile?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/main/Makefile (original)
+++ team/dhubbard/named_processors/main/Makefile Fri Mar 14 22:11:10 2008
@@ -30,7 +30,7 @@
 	cryptostub.o sha1.o http.o fixedjitterbuf.o abstract_jb.o \
 	strcompat.o threadstorage.o dial.o event.o adsistub.o audiohook.o \
 	astobj2.o hashtab.o global_datastores.o $(RESAMPLE_OBJS) version.o \
-	features.o task.o taskprocessor.o
+	features.o taskprocessor.o
 
 # we need to link in the objects statically, not as a library, because
 # otherwise modules will not have them available if none of the static

Modified: team/dhubbard/named_processors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/taskprocessor.c?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/main/taskprocessor.c (original)
+++ team/dhubbard/named_processors/main/taskprocessor.c Fri Mar 14 22:11:10 2008
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 2007, Dwayne M. Hubbard 
+ * Copyright (C) 2007-2008, Dwayne M. Hubbard 
  *
  * Dwayne M. Hubbard <dhubbard at digium.com>
  *
@@ -21,7 +21,6 @@
 #include <asterisk.h>
 #include <asterisk/astobj2.h>
 #include <asterisk/cli.h>
-#include <asterisk/task.h>
 #include <asterisk/taskprocessor.h>
 #include <signal.h>
 #include <sys/time.h>
@@ -30,24 +29,27 @@
 
 #define DEFAULT_POLL_FREQUENCY 1 
 
+static int _global_killflag = 0;
 AST_MUTEX_DEFINE_STATIC(_global_killflag_lock);
-static int _global_killflag = 0;
-
+
+static int _global_clireg = 0;
 AST_MUTEX_DEFINE_STATIC(_global_clireg_lock);
-static int _global_clireg = 0;
 
 pthread_attr_t _attribute[100];
 AST_LIST_HEAD_STATIC(_taskprocessor_singletons, taskprocessor_singleton_info);
 static int _taskprocessor_singletons_list_size = 0;
+
+static long _task_pool_size = 0;
+AST_LIST_HEAD_STATIC(_task_pool, a_task);
+
+static int _global_kill_taskpool = 0;
+AST_MUTEX_DEFINE_STATIC(_global_kill_taskpool_lock);
 
 static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t);
 static void destroy_taskprocessor_singleton(void *tps);
 static int remove_taskprocessor_singleton(const char* name);
 static int taskprocessor_ping(struct a_task* e);
 
-/*********
- *  CLI
- *********/
 static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static char *cli_taskprocessor_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 
@@ -55,6 +57,141 @@
 	AST_CLI_DEFINE(cli_taskprocessor_ping, "Ping a named task processors"),
 	AST_CLI_DEFINE(cli_taskprocessor_show_stats, "List instantiated task processors and statistics"),
 };
+
+int noop_task_execute(struct a_task* t)
+{
+	ast_log(LOG_DEBUG, "noop for \'%s\'\n", (t)?t->_source:"<task-structure-missing>");
+	return 0;
+}
+
+/*
+ * get an available a_task structure from the pool of available
+ * structures.  If no structures are available, allocate one
+ */
+struct a_task* ast_task_alloc(int (*task_exe)(struct a_task *task), void* datap, char* src)
+{
+	struct a_task *t;
+	int lock_failures = 0;
+
+	AST_LIST_LOCK(&_task_pool);
+	while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
+		lock_failures++;
+		AST_LIST_UNLOCK(&_task_pool);
+		usleep(1);
+		if (lock_failures > 10) {
+			ast_log(LOG_ERROR, "\n\tcannot lock task pool for \'%s\'.\n", src);
+			return NULL;
+		}
+		AST_LIST_LOCK(&_task_pool);
+	}
+	/* If the kill_taskpool flag is TRUE, then we should not give any more pooled tasks to requestors */
+	if (_global_kill_taskpool) {
+		ast_mutex_unlock(&_global_kill_taskpool_lock);
+		AST_LIST_UNLOCK(&_task_pool);
+		ast_log(LOG_NOTICE, "task pool is no longer available to requestors.\n");
+		return NULL;
+	}
+	if (AST_LIST_EMPTY(&_task_pool)) {
+		t = ast_calloc(1,sizeof(*t));
+		if (!t) {
+			ast_log(LOG_ERROR, "Poop.  ast_calloc failed to get a task for \'%s\'\n", src);
+			ast_mutex_unlock(&_global_kill_taskpool_lock);
+			AST_LIST_UNLOCK(&_task_pool);
+			return NULL;
+		}
+		t->execute = task_exe;
+		t->_datap = datap;
+		strncpy(t->_source, src, sizeof(t->_source));
+		ast_mutex_unlock(&_global_kill_taskpool_lock);
+		AST_LIST_UNLOCK(&_task_pool);
+		return t;
+	}
+	t = AST_LIST_REMOVE_HEAD(&_task_pool, list);
+	_task_pool_size -= 1;
+	ast_mutex_unlock(&_global_kill_taskpool_lock);
+	AST_LIST_UNLOCK(&_task_pool);
+	if (t) {
+		t->execute = task_exe;
+		t->_datap = datap;
+		strncpy(t->_source, src, sizeof(t->_source));
+	}
+	return t;
+}
+	
+/*
+ * release a_task structure back to the pool
+ */
+int ast_task_free(struct a_task* t)
+{
+	int lock_failures = 0;
+	if (t->_datap) {
+		ast_free(t->_datap);
+	}
+	memset(t, 0, sizeof(struct a_task));
+	AST_LIST_LOCK(&_task_pool);
+	while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
+		lock_failures++;
+		AST_LIST_UNLOCK(&_task_pool);
+		usleep(1);
+		if (lock_failures > 10) {
+			ast_log(LOG_ERROR, "\n\tcannot lock task pool.\n");
+			return 0;
+		}
+		AST_LIST_LOCK(&_task_pool);
+	}
+	if (_global_kill_taskpool != 0) {
+		ast_log(LOG_NOTICE, "task pool is no longer available for task releasing\n");
+		ast_mutex_unlock(&_global_kill_taskpool_lock);
+		AST_LIST_UNLOCK(&_task_pool);
+		return 0;
+	}
+	AST_LIST_INSERT_TAIL(&_task_pool, t, list);
+	_task_pool_size += 1;
+	ast_mutex_unlock(&_global_kill_taskpool_lock);
+	AST_LIST_UNLOCK(&_task_pool);
+	return 0;
+}
+
+/*
+ * stop the task pool
+ */
+int stop_taskpool()
+{
+	ast_mutex_lock(&_global_kill_taskpool_lock);
+	_global_kill_taskpool = 1;
+	ast_mutex_unlock(&_global_kill_taskpool_lock);
+	return 0;
+}
+
+int task_pool_size()
+{
+	int size = 0;
+	AST_LIST_LOCK(&_task_pool);
+	size = _task_pool_size;
+	AST_LIST_UNLOCK(&_task_pool);
+	return size;
+}
+
+/*
+ * destroy a_task pool
+ */
+int destroy_task_pool(void)
+{
+	struct a_task* t = NULL;
+
+	AST_LIST_LOCK(&_task_pool);
+	while (!AST_LIST_EMPTY(&_task_pool)) {
+		t = AST_LIST_REMOVE_HEAD(&_task_pool, list);
+		if (t) {
+			ast_free(t);
+			t = NULL;
+		}
+		_task_pool_size -= 1;
+	}
+	AST_LIST_UNLOCK(&_task_pool);
+	return 0;
+}
+
 
 static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
@@ -92,14 +229,14 @@
 		return CLI_SUCCESS;
 	}
 
-	t = get_available_task(taskprocessor_ping, 0, "cli_taskprocessor_ping");
+	t = ast_task_alloc(taskprocessor_ping, 0, "cli_taskprocessor_ping");
 	if (!t) {
 		ast_cli(a->fd, "\n%s failed: could not retrieve task from pool\n", e->command);
 		return RESULT_SUCCESS;	
 	}
 	if (push_task_to_taskprocessor_singleton(p, t) < 0) {
 		ast_cli(a->fd, "\n%s failed: could not push task to %s\n", e->command, a->argv[2]);
-		release_task(t);
+		ast_task_free(t);
 	}
 	return RESULT_SUCCESS;	
 }
@@ -146,14 +283,12 @@
 
 static void* default_taskprocessor_thread_function(void* data)
 {
-	struct taskprocessor_singleton_info* i = NULL;
-	struct a_task* t = NULL;
-	unsigned long size = 0;
-	unsigned long task_count = 0;
-	long qsize = 0;
+	struct taskprocessor_singleton_info* i;
+	struct a_task* t;
+	long size;
+	struct timeval tv;
 	int killflag = 0;
 	struct timespec ts = { 0, };
-	struct timeval tv;
 
 	i = (struct taskprocessor_singleton_info*)data;
 	if (!i) {
@@ -166,12 +301,12 @@
 			/* stuff is in the queue */
 			t = pop_task_from_taskprocessor_singleton(i);
 			if (!t) {
-				ast_log(LOG_ERROR, "Huh?? size of queue is not zero(%ld), but the queue popped a NULL!\n", qsize);
+				ast_log(LOG_ERROR, "Huh?? size of queue is not zero(%ld), but the queue popped a NULL!\n", size);
 				continue;
 			}
 			if (!t->execute) {
 				ast_log(LOG_ERROR, "task is missing its execute callback.\n");
-				release_task(t);
+				ast_task_free(t);
 				continue;
 			}
 			if (t->execute(t) < 0) {
@@ -183,10 +318,9 @@
 				if (size > i->_stats->_max_qsize) {
 					i->_stats->_max_qsize = size;
 				}
-				task_count = i->_stats->_tasks_processed_count;
 			}
 			ast_mutex_unlock(&i->_taskprocessor_lock);
-			release_task(t);
+			ast_task_free(t);
 			t = NULL;
 		
 			ast_mutex_lock(&_global_killflag_lock);
@@ -210,7 +344,7 @@
 		/* stuff is in the queue */
 		t = pop_task_from_taskprocessor_singleton(i);
 		if (t) {
-			release_task(t);
+			ast_task_free(t);
 			t = NULL;
 		}
 	}
@@ -444,7 +578,7 @@
 		lock_failures++;
 		AST_LIST_UNLOCK(&tp->_queue);
 		usleep(1);
-		if (lock_failures > 10000) {
+		if (lock_failures > 10) {
 			ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
 			return -1;
 		}
@@ -468,7 +602,7 @@
 		lock_failures++;
 		AST_LIST_UNLOCK(&tp->_queue);
 		usleep(1);
-		if (lock_failures > 10000) {
+		if (lock_failures > 10) {
 			ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
 			return t;
 		}
@@ -493,7 +627,7 @@
 		lock_failures++;
 		AST_LIST_UNLOCK(&tp->_queue);
 		usleep(1);
-		if (lock_failures > 10000) {
+		if (lock_failures > 10) {
 			ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
 			return size;
 		}

Modified: team/dhubbard/named_processors/res/res_testobserver.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/res_testobserver.c?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/res/res_testobserver.c (original)
+++ team/dhubbard/named_processors/res/res_testobserver.c Fri Mar 14 22:11:10 2008
@@ -49,7 +49,6 @@
 #include <asterisk/options.h>
 #include <asterisk/config.h>
 #include <asterisk/astobj2.h>
-#include <asterisk/task.h>
 #include <asterisk/taskprocessor.h>
 
 #include "sandbox/include/sandbox.h"
@@ -165,7 +164,7 @@
 			}
 			if (!t->execute) {
 				ast_log(LOG_ERROR, "task is missing its execute callback.\n");
-				release_task(t);
+				ast_task_free(t);
 				continue;
 			}
 			if (t->execute(t) < 0) {
@@ -180,7 +179,7 @@
 				task_count = i->_stats->_tasks_processed_count;
 			}
 			ast_mutex_unlock(&i->_taskprocessor_lock);
-			release_task(t);
+			ast_task_free(t);
 			t = NULL;
 		
 			ast_mutex_lock(&_global_killflag_lock);
@@ -206,7 +205,7 @@
 		/* stuff is in the queue */
 		t = pop_task_from_taskprocessor_singleton(i);
 		if (t) {
-			release_task(t);
+			ast_task_free(t);
 			t = NULL;
 		}
 	}

Modified: team/dhubbard/named_processors/res/sandbox/simobject.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/simobject.c?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/simobject.c (original)
+++ team/dhubbard/named_processors/res/sandbox/simobject.c Fri Mar 14 22:11:10 2008
@@ -16,7 +16,6 @@
  * at the top of the source tree.
  */
 #include <asterisk.h>
-#include <asterisk/task.h>
 #include <asterisk/taskprocessor.h>
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
@@ -125,7 +124,7 @@
 	struct a_task* t = NULL;
 	
 	if (s && s->_producer) {
-		t = get_available_task(simobject_taskhandler, 0, "start_simobject");
+		t = ast_task_alloc(simobject_taskhandler, 0, "start_simobject");
 		if (!t) {
 			ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
 			return -1;
@@ -134,7 +133,7 @@
 		t->_p_consumer = s->_consumer;
 		if (s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t) < 0)) {
 			ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task!  Releasing task back to task pool\n", s->_name);
-			release_task(t);
+			ast_task_free(t);
 			return -1;
 		}
 		ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_producer->_taskprocessor->_name);
@@ -147,7 +146,7 @@
 	struct a_task* t = NULL;
 	
 	if (s && s->_producer) {
-		t = get_available_task(simobject_taskhandler, 0, "start_simproducer");
+		t = ast_task_alloc(simobject_taskhandler, 0, "start_simproducer");
 		if (!t) {
 			ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
 			return -1;
@@ -230,7 +229,7 @@
 			ast_log(LOG_ERROR, "Huh? we have no private simobject pointer?  This is bad!\n");
 			break;
 		}
-		t = get_available_task(simobject_taskhandler, 0, "_simproducer_thread_function");
+		t = ast_task_alloc(simobject_taskhandler, 0, "_simproducer_thread_function");
 		if (!t) {
 			ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
 			break;
@@ -242,7 +241,7 @@
 		} else {
 			ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task (producer: 0x%ld, generator: 0x%ld)!  Releasing task to task pool\n"
 				, s->_name, (unsigned long)s->_producer, (unsigned long)s->_taskgenerator);
-			release_task(t);
+			ast_task_free(t);
 			break;
 		}
 		ast_mutex_lock(&i->_taskprocessor_lock);

Modified: team/dhubbard/named_processors/res/sandbox/taskconsumer.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/taskconsumer.c?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/taskconsumer.c (original)
+++ team/dhubbard/named_processors/res/sandbox/taskconsumer.c Fri Mar 14 22:11:10 2008
@@ -22,7 +22,6 @@
  *  its event.
  */
 #include <asterisk.h>
-#include <asterisk/task.h>
 #include <asterisk/taskprocessor.h>
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 




More information about the asterisk-commits mailing list