[Asterisk-code-review] taskprocessor: Warn on unused result from pushing task. (asterisk[13])
George Joseph
asteriskteam at digium.com
Fri Oct 19 08:40:19 CDT 2018
George Joseph has submitted this change and it was merged. ( https://gerrit.asterisk.org/10494 )
Change subject: taskprocessor: Warn on unused result from pushing task.
......................................................................
taskprocessor: Warn on unused result from pushing task.
Add attribute_warn_unused_result to ast_taskprocessor_push,
ast_taskprocessor_push_local and ast_threadpool_push. This will help
ensure we perform the necessary cleanup upon failure.
Change-Id: I7e4079bd7b21cfe52fb431ea79e41314520c3f6d
---
M apps/app_confbridge.c
M include/asterisk/taskprocessor.h
M include/asterisk/threadpool.h
M main/stasis.c
M main/taskprocessor.c
M main/threadpool.c
M tests/test_taskprocessor.c
M tests/test_threadpool.c
8 files changed, 161 insertions(+), 57 deletions(-)
Approvals:
George Joseph: Looks good to me, but someone else must approve; Approved for Submit
Richard Mudgett: Looks good to me, approved
diff --git a/apps/app_confbridge.c b/apps/app_confbridge.c
index 3a760a9..9c6b690 100644
--- a/apps/app_confbridge.c
+++ b/apps/app_confbridge.c
@@ -1094,13 +1094,15 @@
if (conference->playback_queue) {
struct hangup_data hangup;
hangup_data_init(&hangup, conference);
- ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup);
- ast_mutex_lock(&hangup.lock);
- while (!hangup.hungup) {
- ast_cond_wait(&hangup.cond, &hangup.lock);
+ if (!ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup)) {
+ ast_mutex_lock(&hangup.lock);
+ while (!hangup.hungup) {
+ ast_cond_wait(&hangup.cond, &hangup.lock);
+ }
+ ast_mutex_unlock(&hangup.lock);
}
- ast_mutex_unlock(&hangup.lock);
+
hangup_data_destroy(&hangup);
} else {
/* Playback queue is not yet allocated. Just hang up the channel straight */
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index 7c79036..f74989a 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -213,7 +213,8 @@
* \retval -1 failure
* \since 1.6.1
*/
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+ attribute_warn_unused_result;
/*! \brief Local data parameter */
struct ast_taskprocessor_local {
@@ -239,7 +240,8 @@
* \since 12.0.0
*/
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
- int (*task_exe)(struct ast_taskprocessor_local *local), void *datap);
+ int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+ attribute_warn_unused_result;
/*!
* \brief Indicate the taskprocessor is suspended.
diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h
index 0f360c7..77ab8a8 100644
--- a/include/asterisk/threadpool.h
+++ b/include/asterisk/threadpool.h
@@ -186,7 +186,8 @@
* \retval 0 success
* \retval -1 failure
*/
-int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data);
+int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
+ attribute_warn_unused_result;
/*!
* \brief Shut down a threadpool and destroy it
diff --git a/main/stasis.c b/main/stasis.c
index bfaf4fb..26e404c 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -563,7 +563,10 @@
/* When all that's done, remove the ref the mailbox has on the sub */
if (sub->mailbox) {
- ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
+ if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
+ /* Nothing we can do here, the conditional is just to keep
+ * the compiler happy that we're not ignoring the result. */
+ }
}
/* Unsubscribing unrefs the subscription */
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 7577715..baba2ff 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -237,7 +237,11 @@
/* Hold a reference during shutdown */
ao2_t_ref(listener->tps, +1, "tps-shutdown");
- ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
+ if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
+ /* This will cause the thread to exit early without completing tasks already
+ * in the queue. This is probably the least bad option in this situation. */
+ default_listener_die(pvt);
+ }
ast_assert(pvt->poll_thread != AST_PTHREADT_NULL);
diff --git a/main/threadpool.c b/main/threadpool.c
index e7abc8f..7729930 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -658,7 +658,9 @@
}
if (pool->listener && pool->listener->callbacks->emptied) {
- ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
+ if (ast_taskprocessor_push(pool->control_tps, queued_emptied, pool)) {
+ /* Nothing to do here but we need the check to keep the compiler happy. */
+ }
}
}
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index 273e045..6428746 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -151,7 +151,10 @@
return AST_TEST_FAIL;
}
- ast_taskprocessor_push(tps, task, task_data);
+ if (ast_taskprocessor_push(tps, task, task_data)) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ return AST_TEST_FAIL;
+ }
res = task_wait(task_data);
if (res != 0) {
@@ -240,7 +243,11 @@
for (i = 0; i < NUM_TASKS; ++i) {
rand_data[i] = ast_random();
- ast_taskprocessor_push(tps, load_task, &rand_data[i]);
+ if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ res = AST_TEST_FAIL;
+ goto test_end;
+ }
}
ast_mutex_lock(&load_task_results.lock);
@@ -438,14 +445,22 @@
goto test_exit;
}
- ast_taskprocessor_push(tps, listener_test_task, NULL);
+ if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ res = AST_TEST_FAIL;
+ goto test_exit;
+ }
if (check_stats(test, pvt, 1, 0, 1) < 0) {
res = AST_TEST_FAIL;
goto test_exit;
}
- ast_taskprocessor_push(tps, listener_test_task, NULL);
+ if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ res = AST_TEST_FAIL;
+ goto test_exit;
+ }
if (check_stats(test, pvt, 2, 0, 1) < 0) {
res = AST_TEST_FAIL;
@@ -710,7 +725,10 @@
local_data = 0;
ast_taskprocessor_set_local(tps, &local_data);
- ast_taskprocessor_push_local(tps, local_task_exe, task_data);
+ if (ast_taskprocessor_push_local(tps, local_task_exe, task_data)) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ return AST_TEST_FAIL;
+ }
res = task_wait(task_data);
if (res != 0) {
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index d8acf26..3fb4430 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -127,6 +127,18 @@
return std;
}
+static void simple_task_data_free(struct simple_task_data *std)
+{
+ if (!std) {
+ return;
+ }
+
+ ast_mutex_destroy(&std->lock);
+ ast_cond_destroy(&std->cond);
+
+ ast_free(std);
+}
+
static int simple_task(void *data)
{
struct simple_task_data *std = data;
@@ -319,7 +331,9 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std);
+ if (ast_threadpool_push(pool, simple_task, std)) {
+ goto end;
+ }
wait_for_task_pushed(listener);
@@ -328,7 +342,7 @@
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
- ast_free(std);
+ simple_task_data_free(std);
ast_free(tld);
return res;
}
@@ -635,11 +649,13 @@
}
ast_mutex_unlock(&tld->lock);
- ast_threadpool_push(pool, simple_task, std);
+ if (ast_threadpool_push(pool, simple_task, std)) {
+ res = AST_TEST_FAIL;
+ } else {
+ res = wait_for_completion(test, std);
+ }
- res = wait_for_completion(test, std);
-
- ast_free(std);
+ simple_task_data_free(std);
if (res == AST_TEST_FAIL) {
goto end;
@@ -707,7 +723,9 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std);
+ if (ast_threadpool_push(pool, simple_task, std)) {
+ goto end;
+ }
ast_threadpool_set_size(pool, 1);
@@ -736,7 +754,7 @@
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
- ast_free(std);
+ simple_task_data_free(std);
ast_free(tld);
return res;
@@ -796,7 +814,10 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std);
+ if (ast_threadpool_push(pool, simple_task, std)) {
+ res = AST_TEST_FAIL;
+ goto end;
+ }
res = wait_for_completion(test, std);
if (res == AST_TEST_FAIL) {
@@ -819,7 +840,7 @@
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
- ast_free(std);
+ simple_task_data_free(std);
ast_free(tld);
return res;
}
@@ -882,9 +903,18 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std1);
- ast_threadpool_push(pool, simple_task, std2);
- ast_threadpool_push(pool, simple_task, std3);
+ res = AST_TEST_FAIL;
+ if (ast_threadpool_push(pool, simple_task, std1)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, simple_task, std2)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, simple_task, std3)) {
+ goto end;
+ }
res = wait_for_completion(test, std1);
if (res == AST_TEST_FAIL) {
@@ -914,9 +944,9 @@
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
- ast_free(std1);
- ast_free(std2);
- ast_free(std3);
+ simple_task_data_free(std1);
+ simple_task_data_free(std2);
+ simple_task_data_free(std3);
ast_free(tld);
return res;
}
@@ -1011,7 +1041,9 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std1);
+ if (ast_threadpool_push(pool, simple_task, std1)) {
+ goto end;
+ }
/* Pushing the task should result in the threadpool growing
* by three threads. This will allow the task to actually execute
@@ -1034,9 +1066,19 @@
/* Now push three tasks into the pool and ensure the pool does not
* grow.
*/
- ast_threadpool_push(pool, simple_task, std2);
- ast_threadpool_push(pool, simple_task, std3);
- ast_threadpool_push(pool, simple_task, std4);
+ res = AST_TEST_FAIL;
+
+ if (ast_threadpool_push(pool, simple_task, std2)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, simple_task, std3)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, simple_task, std4)) {
+ goto end;
+ }
res = wait_for_completion(test, std2);
if (res == AST_TEST_FAIL) {
@@ -1064,10 +1106,10 @@
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
- ast_free(std1);
- ast_free(std2);
- ast_free(std3);
- ast_free(std4);
+ simple_task_data_free(std1);
+ simple_task_data_free(std2);
+ simple_task_data_free(std3);
+ simple_task_data_free(std4);
ast_free(tld);
return res;
}
@@ -1121,7 +1163,9 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std);
+ if (ast_threadpool_push(pool, simple_task, std)) {
+ goto end;
+ }
res = wait_for_completion(test, std);
if (res == AST_TEST_FAIL) {
@@ -1137,7 +1181,7 @@
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
- ast_free(std);
+ simple_task_data_free(std);
ast_free(tld);
return res;
}
@@ -1193,7 +1237,9 @@
goto end;
}
- ast_threadpool_push(pool, simple_task, std1);
+ if (ast_threadpool_push(pool, simple_task, std1)) {
+ goto end;
+ }
ast_threadpool_set_size(pool, 1);
@@ -1218,7 +1264,10 @@
}
/* Now make sure the threadpool reactivates when we add a second task */
- ast_threadpool_push(pool, simple_task, std2);
+ if (ast_threadpool_push(pool, simple_task, std2)) {
+ res = AST_TEST_FAIL;
+ goto end;
+ }
res = wait_for_completion(test, std2);
if (res == AST_TEST_FAIL) {
@@ -1240,8 +1289,8 @@
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
- ast_free(std1);
- ast_free(std2);
+ simple_task_data_free(std1);
+ simple_task_data_free(std2);
ast_free(tld);
return res;
@@ -1269,6 +1318,19 @@
return ctd;
}
+static void complex_task_data_free(struct complex_task_data *ctd)
+{
+ if (!ctd) {
+ return;
+ }
+
+ ast_mutex_destroy(&ctd->lock);
+ ast_cond_destroy(&ctd->stall_cond);
+ ast_cond_destroy(&ctd->notify_cond);
+
+ ast_free(ctd);
+}
+
static int complex_task(void *data)
{
struct complex_task_data *ctd = data;
@@ -1400,8 +1462,13 @@
goto end;
}
- ast_threadpool_push(pool, complex_task, ctd1);
- ast_threadpool_push(pool, complex_task, ctd2);
+ if (ast_threadpool_push(pool, complex_task, ctd1)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, complex_task, ctd2)) {
+ goto end;
+ }
ast_threadpool_set_size(pool, 2);
@@ -1438,8 +1505,8 @@
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
- ast_free(ctd1);
- ast_free(ctd2);
+ complex_task_data_free(ctd1);
+ complex_task_data_free(ctd2);
ast_free(tld);
return res;
}
@@ -1496,8 +1563,13 @@
goto end;
}
- ast_threadpool_push(pool, complex_task, ctd1);
- ast_threadpool_push(pool, complex_task, ctd2);
+ if (ast_threadpool_push(pool, complex_task, ctd1)) {
+ goto end;
+ }
+
+ if (ast_threadpool_push(pool, complex_task, ctd2)) {
+ goto end;
+ }
ast_threadpool_set_size(pool, 4);
@@ -1549,8 +1621,8 @@
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
- ast_free(ctd1);
- ast_free(ctd2);
+ complex_task_data_free(ctd1);
+ complex_task_data_free(ctd2);
ast_free(tld);
return res;
}
@@ -1666,9 +1738,9 @@
poke_worker(data3);
ast_taskprocessor_unreference(uut);
ast_threadpool_shutdown(pool);
- ast_free(data1);
- ast_free(data2);
- ast_free(data3);
+ complex_task_data_free(data1);
+ complex_task_data_free(data2);
+ complex_task_data_free(data3);
return res;
}
--
To view, visit https://gerrit.asterisk.org/10494
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-MessageType: merged
Gerrit-Change-Id: I7e4079bd7b21cfe52fb431ea79e41314520c3f6d
Gerrit-Change-Number: 10494
Gerrit-PatchSet: 2
Gerrit-Owner: Corey Farrell <git at cfware.com>
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Jenkins2 (1000185)
Gerrit-Reviewer: Richard Mudgett <rmudgett at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20181019/6c16653d/attachment-0001.html>
More information about the asterisk-code-review mailing list