[Asterisk-code-review] taskprocessor: Warn on unused result from pushing task. (asterisk[master])
Corey Farrell
asteriskteam at digium.com
Tue Oct 16 10:42:57 CDT 2018
Corey Farrell has uploaded this change for review. ( https://gerrit.asterisk.org/10486 )
Change subject: taskprocessor: Warn on unused result from pushing task.
......................................................................
taskprocessor: Warn on unused result from pushing task.
Add attribute_warn_unused_result to ast_taskprocessor_push,
ast_taskprocessor_push_local and ast_threadpool_push. This will help
ensure we perform the necessary cleanup upon failure.
Change-Id: I7e4079bd7b21cfe52fb431ea79e41314520c3f6d
---
M apps/app_confbridge.c
M include/asterisk/taskprocessor.h
M include/asterisk/threadpool.h
M main/stasis.c
M main/taskprocessor.c
M main/threadpool.c
M tests/test_taskprocessor.c
M tests/test_threadpool.c
8 files changed, 113 insertions(+), 36 deletions(-)
git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/86/10486/1
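
For context, attribute_warn_unused_result is Asterisk's wrapper around the compiler's warn_unused_result function attribute. The standalone sketch below is illustrative only (the feature-test #if, push_task() and do_work() are assumptions, not code from this change); it shows the general shape of such a wrapper and the check-and-clean-up caller pattern the attribute enforces, which is the pattern applied throughout the diff that follows.

#include <stdlib.h>

/* Sketch of a warn_unused_result wrapper macro (illustrative; Asterisk
 * defines its own attribute_warn_unused_result in its compiler headers). */
#if defined(__GNUC__) || defined(__clang__)
#define attribute_warn_unused_result __attribute__((warn_unused_result))
#else
#define attribute_warn_unused_result
#endif

/* Hypothetical push API marked with the attribute, mirroring the shape
 * of ast_taskprocessor_push(). */
static int push_task(int (*task_exe)(void *datap), void *datap) attribute_warn_unused_result;

static int push_task(int (*task_exe)(void *datap), void *datap)
{
	/* Stand-in for queueing the task; pretend the push fails. */
	(void) task_exe;
	(void) datap;
	return -1;
}

static int do_work(void *datap)
{
	(void) datap;
	return 0;
}

int main(void)
{
	void *datap = malloc(16);

	if (!datap) {
		return 1;
	}

	/* Writing just "push_task(do_work, datap);" now draws a
	 * -Wunused-result warning, so the caller is steered toward
	 * checking the result and cleaning up on failure. */
	if (push_task(do_work, datap)) {
		free(datap); /* push failed: release the data instead of leaking it */
		return 1;
	}

	/* On success the queued task would be responsible for datap. */
	return 0;
}
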
diff --git a/apps/app_confbridge.c b/apps/app_confbridge.c
index a4e5c67..a8eef5e 100644
--- a/apps/app_confbridge.c
+++ b/apps/app_confbridge.c
@@ -1111,13 +1111,16 @@
if (conference->playback_queue) {
struct hangup_data hangup;
hangup_data_init(&hangup, conference);
- ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup);
- ast_mutex_lock(&hangup.lock);
- while (!hangup.hungup) {
- ast_cond_wait(&hangup.cond, &hangup.lock);
+ /* BUGBUG: how should we handle ast_taskprocessor_push failure here? */
+ if (!ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup)) {
+ ast_mutex_lock(&hangup.lock);
+ while (!hangup.hungup) {
+ ast_cond_wait(&hangup.cond, &hangup.lock);
+ }
+ ast_mutex_unlock(&hangup.lock);
}
- ast_mutex_unlock(&hangup.lock);
+
hangup_data_destroy(&hangup);
} else {
/* Playback queue is not yet allocated. Just hang up the channel straight */
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index 7c79036..f74989a 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -213,7 +213,8 @@
* \retval -1 failure
* \since 1.6.1
*/
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+ attribute_warn_unused_result;
/*! \brief Local data parameter */
struct ast_taskprocessor_local {
@@ -239,7 +240,8 @@
* \since 12.0.0
*/
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
- int (*task_exe)(struct ast_taskprocessor_local *local), void *datap);
+ int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+ attribute_warn_unused_result;
/*!
* \brief Indicate the taskprocessor is suspended.
diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h
index 0f360c7..77ab8a8 100644
--- a/include/asterisk/threadpool.h
+++ b/include/asterisk/threadpool.h
@@ -186,7 +186,8 @@
* \retval 0 success
* \retval -1 failure
*/
-int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data);
+int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
+ attribute_warn_unused_result;
/*!
* \brief Shut down a threadpool and destroy it
diff --git a/main/stasis.c b/main/stasis.c
index 51f01c0..ce905e8 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -561,7 +561,9 @@
/* When all that's done, remove the ref the mailbox has on the sub */
if (sub->mailbox) {
- ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
+ if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
+ /* BUGBUG: how do we handle this? */
+ }
}
/* Unsubscribing unrefs the subscription */
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 91eb7d9..c186620 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -235,7 +235,9 @@
/* Hold a reference during shutdown */
ao2_t_ref(listener->tps, +1, "tps-shutdown");
- ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
+ if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
+ /* BUGBUG: what do we do here? */
+ }
ast_assert(pvt->poll_thread != AST_PTHREADT_NULL);
diff --git a/main/threadpool.c b/main/threadpool.c
index e7abc8f..d2de69a 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -658,7 +658,9 @@
}
if (pool->listener && pool->listener->callbacks->emptied) {
- ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
+ if (ast_taskprocessor_push(pool->control_tps, queued_emptied, pool)) {
+ ast_log(LOG_ERROR, "Failed to push task, leaks may occur.\n");
+ }
}
}
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index 273e045..6428746 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -151,7 +151,10 @@
return AST_TEST_FAIL;
}
- ast_taskprocessor_push(tps, task, task_data);
+ if (ast_taskprocessor_push(tps, task, task_data)) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ return AST_TEST_FAIL;
+ }
res = task_wait(task_data);
if (res != 0) {
@@ -240,7 +243,11 @@
for (i = 0; i < NUM_TASKS; ++i) {
rand_data[i] = ast_random();
- ast_taskprocessor_push(tps, load_task, &rand_data[i]);
+ if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ res = AST_TEST_FAIL;
+ goto test_end;
+ }
}
ast_mutex_lock(&load_task_results.lock);
@@ -438,14 +445,22 @@
goto test_exit;
}
- ast_taskprocessor_push(tps, listener_test_task, NULL);
+ if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ res = AST_TEST_FAIL;
+ goto test_exit;
+ }
if (check_stats(test, pvt, 1, 0, 1) < 0) {
res = AST_TEST_FAIL;
goto test_exit;
}
- ast_taskprocessor_push(tps, listener_test_task, NULL);
+ if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ res = AST_TEST_FAIL;
+ goto test_exit;
+ }
if (check_stats(test, pvt, 2, 0, 1) < 0) {
res = AST_TEST_FAIL;
@@ -710,7 +725,10 @@
local_data = 0;
ast_taskprocessor_set_local(tps, &local_data);
- ast_taskprocessor_push_local(tps, local_task_exe, task_data);
+ if (ast_taskprocessor_push_local(tps, local_task_exe, task_data)) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ return AST_TEST_FAIL;
+ }
res = task_wait(task_data);
if (res != 0) {
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index d8acf26..3680a5b 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -319,7 +319,9 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std);
+ if (ast_threadpool_push(pool, simple_task, std)) {
+ goto end;
+ }
wait_for_task_pushed(listener);
@@ -635,9 +637,11 @@
}
ast_mutex_unlock(&tld->lock);
- ast_threadpool_push(pool, simple_task, std);
-
- res = wait_for_completion(test, std);
+ if (ast_threadpool_push(pool, simple_task, std)) {
+ res = AST_TEST_FAIL;
+ } else {
+ res = wait_for_completion(test, std);
+ }
ast_free(std);
@@ -707,7 +711,9 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std);
+ if (ast_threadpool_push(pool, simple_task, std)) {
+ goto end;
+ }
ast_threadpool_set_size(pool, 1);
@@ -796,7 +802,10 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std);
+ if (ast_threadpool_push(pool, simple_task, std)) {
+ res = AST_TEST_FAIL;
+ goto end;
+ }
res = wait_for_completion(test, std);
if (res == AST_TEST_FAIL) {
@@ -882,9 +891,18 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std1);
- ast_threadpool_push(pool, simple_task, std2);
- ast_threadpool_push(pool, simple_task, std3);
+ res = AST_TEST_FAIL;
+ if (ast_threadpool_push(pool, simple_task, std1)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, simple_task, std2)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, simple_task, std3)) {
+ goto end;
+ }
res = wait_for_completion(test, std1);
if (res == AST_TEST_FAIL) {
@@ -1011,7 +1029,9 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std1);
+ if (ast_threadpool_push(pool, simple_task, std1)) {
+ goto end;
+ }
/* Pushing the task should result in the threadpool growing
* by three threads. This will allow the task to actually execute
@@ -1034,9 +1054,19 @@
/* Now push three tasks into the pool and ensure the pool does not
* grow.
*/
- ast_threadpool_push(pool, simple_task, std2);
- ast_threadpool_push(pool, simple_task, std3);
- ast_threadpool_push(pool, simple_task, std4);
+ res = AST_TEST_FAIL;
+
+ if (ast_threadpool_push(pool, simple_task, std2)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, simple_task, std3)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, simple_task, std4)) {
+ goto end;
+ }
res = wait_for_completion(test, std2);
if (res == AST_TEST_FAIL) {
@@ -1121,7 +1151,9 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std);
+ if (ast_threadpool_push(pool, simple_task, std)) {
+ goto end;
+ }
res = wait_for_completion(test, std);
if (res == AST_TEST_FAIL) {
@@ -1193,7 +1225,9 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std1);
+ if (ast_threadpool_push(pool, simple_task, std1)) {
+ goto end;
+ }
ast_threadpool_set_size(pool, 1);
@@ -1218,7 +1252,10 @@
}
/* Now make sure the threadpool reactivates when we add a second task */
- ast_threadpool_push(pool, simple_task, std2);
+ if (ast_threadpool_push(pool, simple_task, std2)) {
+ res = AST_TEST_FAIL;
+ goto end;
+ }
res = wait_for_completion(test, std2);
if (res == AST_TEST_FAIL) {
@@ -1400,8 +1437,13 @@
goto end;
}
- ast_threadpool_push(pool, complex_task, ctd1);
- ast_threadpool_push(pool, complex_task, ctd2);
+ if (ast_threadpool_push(pool, complex_task, ctd1)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, complex_task, ctd2)) {
+ goto end;
+ }
ast_threadpool_set_size(pool, 2);
@@ -1496,8 +1538,13 @@
goto end;
}
- ast_threadpool_push(pool, complex_task, ctd1);
- ast_threadpool_push(pool, complex_task, ctd2);
+ if (ast_threadpool_push(pool, complex_task, ctd1)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, complex_task, ctd2)) {
+ goto end;
+ }
ast_threadpool_set_size(pool, 4);
--
To view, visit https://gerrit.asterisk.org/10486
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings
Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7e4079bd7b21cfe52fb431ea79e41314520c3f6d
Gerrit-Change-Number: 10486
Gerrit-PatchSet: 1
Gerrit-Owner: Corey Farrell <git at cfware.com>