[asterisk-commits] dlee: branch dlee/stasis-forward-optimization r399655 - in /team/dlee/stasis-...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Sep 24 01:01:55 CDT 2013
Author: dlee
Date: Tue Sep 24 01:01:53 2013
New Revision: 399655
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399655
Log:
Merged revisions 399651-399654 from http://svn.asterisk.org/svn/asterisk/team/dlee/taskprocessor-optimization
Modified:
team/dlee/stasis-forward-optimization/ (props changed)
team/dlee/stasis-forward-optimization/include/asterisk/stasis_message_router.h
team/dlee/stasis-forward-optimization/include/asterisk/taskprocessor.h
team/dlee/stasis-forward-optimization/main/stasis_message_router.c
team/dlee/stasis-forward-optimization/main/taskprocessor.c
team/dlee/stasis-forward-optimization/tests/test_stasis.c
Propchange: team/dlee/stasis-forward-optimization/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Tue Sep 24 01:01:53 2013
@@ -1,1 +1,1 @@
-/team/dlee/taskprocessor-optimization:1-399638
+/team/dlee/taskprocessor-optimization:1-399654
Modified: team/dlee/stasis-forward-optimization/include/asterisk/stasis_message_router.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/include/asterisk/stasis_message_router.h?view=diff&rev=399655&r1=399654&r2=399655
==============================================================================
--- team/dlee/stasis-forward-optimization/include/asterisk/stasis_message_router.h (original)
+++ team/dlee/stasis-forward-optimization/include/asterisk/stasis_message_router.h Tue Sep 24 01:01:53 2013
@@ -100,6 +100,9 @@
* updates for types not handled by routes added with
* stasis_message_router_add_cache_update().
*
+ * Adding multiple routes for the same message type results in undefined
+ * behavior.
+ *
* \param router Router to add the route to.
* \param message_type Type of message to route.
* \param callback Callback to forard messages of \a message_type to.
@@ -120,6 +123,9 @@
* A particular \a message_type may have at most one cache route per \a router.
* These are distinct from regular routes, so one could have both a regular
* route and a cache route for the same \a message_type.
+ *
+ * Adding multiple routes for the same message type results in undefined
+ * behavior.
*
* \param router Router to add the route to.
* \param message_type Subtype of cache update to route.
Modified: team/dlee/stasis-forward-optimization/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/include/asterisk/taskprocessor.h?view=diff&rev=399655&r1=399654&r2=399655
==============================================================================
--- team/dlee/stasis-forward-optimization/include/asterisk/taskprocessor.h (original)
+++ team/dlee/stasis-forward-optimization/include/asterisk/taskprocessor.h Tue Sep 24 01:01:53 2013
@@ -109,6 +109,7 @@
* \param listener The listener
*/
void (*shutdown)(struct ast_taskprocessor_listener *listener);
+ void (*dtor)(struct ast_taskprocessor_listener *listener);
};
/*!
Modified: team/dlee/stasis-forward-optimization/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/stasis_message_router.c?view=diff&rev=399655&r1=399654&r2=399655
==============================================================================
--- team/dlee/stasis-forward-optimization/main/stasis_message_router.c (original)
+++ team/dlee/stasis-forward-optimization/main/stasis_message_router.c Tue Sep 24 01:01:53 2013
@@ -53,12 +53,28 @@
struct stasis_message_route routes[];
};
+static int table_has_route(struct route_table *table,
+ struct stasis_message_type *message_type)
+{
+ size_t idx;
+
+ for (idx = 0; idx < table->current_size; ++idx) {
+ if (table->routes[idx].message_type == message_type) {
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
static int table_add_route(struct route_table **table_ptr,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
struct route_table *table = *table_ptr;
struct stasis_message_route *route;
+
+ ast_assert(!table_has_route(table, message_type));
if (table->current_size + 1 > table->max_size) {
size_t new_max_size = table->max_size ? table->max_size * 2 : 1;
Modified: team/dlee/stasis-forward-optimization/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/taskprocessor.c?view=diff&rev=399655&r1=399654&r2=399655
==============================================================================
--- team/dlee/stasis-forward-optimization/main/taskprocessor.c (original)
+++ team/dlee/stasis-forward-optimization/main/taskprocessor.c Tue Sep 24 01:01:53 2013
@@ -147,6 +147,15 @@
ast_free(pvt);
}
+static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
+{
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+
+ default_listener_pvt_destroy(pvt);
+
+ listener->user_data = NULL;
+}
+
/*!
* \brief Function that processes tasks in the taskprocessor
* \internal
@@ -161,7 +170,7 @@
while (!pvt->dead) {
res = ast_sem_wait(&pvt->sem);
- if (res != 0 && errno != EINTR) {
+ if (res != 0 && errno != EINTR) {
ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
strerror(errno));
/* Just give up */
@@ -174,6 +183,9 @@
res = ast_sem_getvalue(&pvt->sem, &sem_value);
ast_assert(res == 0 && sem_value == 0);
+ /* Free the shutdown reference */
+ ao2_ref(listener->tps, -1);
+
return NULL;
}
@@ -208,18 +220,34 @@
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+ int res;
+
+ /* Hold a reference during shutdown */
+ ao2_ref(listener->tps, +1);
ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
- pthread_join(pvt->poll_thread, NULL);
+ if (pthread_self() == pvt->poll_thread) {
+ res = pthread_detach(pvt->poll_thread);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "pthread_detach(): %s\n",
+ strerror(errno));
+ }
+ } else {
+ res = pthread_join(pvt->poll_thread, NULL);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "pthread_join(): %s\n",
+ strerror(errno));
+ }
+ }
pvt->poll_thread = AST_PTHREADT_NULL;
- default_listener_pvt_destroy(pvt);
}
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
.start = default_listener_start,
.task_pushed = default_task_pushed,
.shutdown = default_listener_shutdown,
+ .dtor = default_listener_pvt_dtor,
};
/*!
@@ -264,9 +292,7 @@
/* release task resources */
static void *tps_task_free(struct tps_task *task)
{
- if (task) {
- ast_free(task);
- }
+ ast_free(task);
return NULL;
}
@@ -356,8 +382,8 @@
char name[256];
int tcount;
unsigned long qsize;
- unsigned long maxqsize = 0;
- unsigned long processed = 0;
+ unsigned long maxqsize;
+ unsigned long processed;
struct ast_taskprocessor *p;
struct ao2_iterator i;
@@ -378,8 +404,6 @@
ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
i = ao2_iterator_init(tps_singletons, 0);
while ((p = ao2_iterator_next(&i))) {
- ast_assert(p->stats != NULL);
-
ast_copy_string(name, p->name, sizeof(name));
qsize = p->tps_queue_size;
maxqsize = p->stats->max_qsize;
@@ -425,13 +449,8 @@
/* free it */
ast_free(t->stats);
t->stats = NULL;
-
ast_free((char *) t->name);
if (t->listener) {
- /* This code should not be reached since the listener
- * should have been destroyed before the taskprocessor could
- * be destroyed
- */
ao2_ref(t->listener, -1);
t->listener = NULL;
}
@@ -472,10 +491,21 @@
ao2_ref(listener->tps, -1);
}
+static void taskprocessor_listener_dtor(void *obj)
+{
+ struct ast_taskprocessor_listener *listener = obj;
+
+ if (listener->callbacks->dtor) {
+ listener->callbacks->dtor(listener);
+ }
+}
+
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
{
RAII_VAR(struct ast_taskprocessor_listener *, listener,
- ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
+ NULL, ao2_cleanup);
+
+ listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
if (!listener) {
return NULL;
@@ -529,7 +559,6 @@
ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
return NULL;
}
-
if (!(p->name = ast_strdup(name))) {
ao2_ref(p, -1);
return NULL;
@@ -594,7 +623,6 @@
p = __allocate_taskprocessor(name, listener);
if (!p) {
- default_listener_pvt_destroy(pvt);
ao2_ref(listener, -1);
return NULL;
}
@@ -651,7 +679,6 @@
return -1;
}
ao2_lock(tps);
- ao2_ref(tps, +1); /* Let's say the queued task has a reference */
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
/* The currently executing task counts as still in queue */
@@ -666,18 +693,13 @@
struct tps_task *t;
int size;
- ast_assert(tps->stats != NULL);
-
ao2_lock(tps);
t = tps_taskprocessor_pop(tps);
-
- /* Empty queue; return false */
if (!t) {
ao2_unlock(tps);
-
- /* Since there was no task, no need to decrement the refcount */
return 0;
}
+
tps->executing = 1;
ao2_unlock(tps);
@@ -692,9 +714,11 @@
tps->executing = 0;
size = tps_taskprocessor_depth(tps);
/* If we executed a task, bump the stats */
- tps->stats->_tasks_processed_count++;
- if (size > tps->stats->max_qsize) {
- tps->stats->max_qsize = size;
+ if (tps->stats) {
+ tps->stats->_tasks_processed_count++;
+ if (size > tps->stats->max_qsize) {
+ tps->stats->max_qsize = size;
+ }
}
ao2_unlock(tps);
@@ -702,7 +726,5 @@
if (size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
}
-
- ao2_ref(tps, -1); /* task no longer has a reference */
return size > 0;
}
Modified: team/dlee/stasis-forward-optimization/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/tests/test_stasis.c?view=diff&rev=399655&r1=399654&r2=399655
==============================================================================
--- team/dlee/stasis-forward-optimization/tests/test_stasis.c (original)
+++ team/dlee/stasis-forward-optimization/tests/test_stasis.c Tue Sep 24 01:01:53 2013
@@ -863,52 +863,6 @@
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
ast_test_validate(test, 0 == ao2_container_count(cache_dump));
-
- return AST_TEST_PASS;
-}
-
-AST_TEST_DEFINE(route_conflicts)
-{
- RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
- RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
- RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
- RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
- int ret;
-
- switch (cmd) {
- case TEST_INIT:
- info->name = __func__;
- info->category = test_category;
- info->summary =
- "Multiple routes to the same message_type should fail";
- info->description =
- "Multiple routes to the same message_type should fail";
- return AST_TEST_NOT_RUN;
- case TEST_EXECUTE:
- break;
- }
-
- topic = stasis_topic_create("TestTopic");
- ast_test_validate(test, NULL != topic);
-
- consumer1 = consumer_create(1);
- ast_test_validate(test, NULL != consumer1);
- consumer2 = consumer_create(1);
- ast_test_validate(test, NULL != consumer2);
-
- test_message_type = stasis_message_type_create("TestMessage", NULL);
- ast_test_validate(test, NULL != test_message_type);
-
- uut = stasis_message_router_create(topic);
- ast_test_validate(test, NULL != uut);
-
- ret = stasis_message_router_add(
- uut, test_message_type, consumer_exec, consumer1);
- ast_test_validate(test, 0 == ret);
- ret = stasis_message_router_add(
- uut, test_message_type, consumer_exec, consumer2);
- ast_test_validate(test, 0 != ret);
return AST_TEST_PASS;
}
@@ -1373,7 +1327,6 @@
AST_TEST_UNREGISTER(cache_filter);
AST_TEST_UNREGISTER(cache);
AST_TEST_UNREGISTER(cache_dump);
- AST_TEST_UNREGISTER(route_conflicts);
AST_TEST_UNREGISTER(router);
AST_TEST_UNREGISTER(router_cache_updates);
AST_TEST_UNREGISTER(interleaving);
@@ -1397,7 +1350,6 @@
AST_TEST_REGISTER(cache_filter);
AST_TEST_REGISTER(cache);
AST_TEST_REGISTER(cache_dump);
- AST_TEST_REGISTER(route_conflicts);
AST_TEST_REGISTER(router);
AST_TEST_REGISTER(router_cache_updates);
AST_TEST_REGISTER(interleaving);
More information about the asterisk-commits
mailing list