<p>Richard Mudgett has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.asterisk.org/8755">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">pjsip_scheduler.c: Fix some corner cases.<br><br>* Fix the periodic interval wander because it may take significant time<br>between the sched thread queueing the task in the serializer and the<br>serializer actually executing the task. The time it takes to actually<br>execute the task was already taken into account.<br><br>* Pass a schtd ref to the serializer when we queue a scheduled task on<br>the serializer. We don't want it going away on us while it is in the<br>serializer queue.<br><br>* Skip the scheduled task if the task was canceled between queueing the<br>task to the serializer and the serializer actually executing the task.<br><br>* Reorder struct ast_sip_sched_task to avoid unnecessary padding. Removed<br>task_id and added next_periodic.<br><br>* Hold a ref to the passed in serializer so the serializer cannot go away<br>on the scheduled task.<br><br>ASTERISK_26806<br><br>Change-Id: I6c8046b75f6953792c8c30e55b836a4291143f24<br>---<br>M include/asterisk/res_pjsip.h<br>M res/res_pjsip/pjsip_scheduler.c<br>2 files changed, 121 insertions(+), 46 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/55/8755/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h<br>index d3849ad..b01d6f5 100644<br>--- a/include/asterisk/res_pjsip.h<br>+++ b/include/asterisk/res_pjsip.h<br>@@ -1407,7 +1407,7 @@<br> * the next item on the SIP socket(s) can be serviced. On incoming messages,<br> * Asterisk automatically will push the request to a servant thread. When your<br> * module callback is called, processing will already be in a servant. However,<br>- * for other PSJIP events, such as transaction state changes due to timer<br>+ * for other PJSIP events, such as transaction state changes due to timer<br> * expirations, your module will be called into from a PJSIP thread. If you<br> * are called into from a PJSIP thread, then you should push whatever processing<br> * is needed to a servant as soon as possible. You can discern if you are currently<br>@@ -1588,13 +1588,13 @@<br> <br> /*!<br> * Run at a fixed interval.<br>- * Stop scheduling if the callback returns 0.<br>+ * Stop scheduling if the callback returns <= 0.<br> * Any other value is ignored.<br> */<br> AST_SIP_SCHED_TASK_FIXED = (0 << 0),<br> /*!<br> * Run at a variable interval.<br>- * Stop scheduling if the callback returns 0.<br>+ * Stop scheduling if the callback returns <= 0.<br> * Any other return value is used as the new interval.<br> */<br> AST_SIP_SCHED_TASK_VARIABLE = (1 << 0),<br>diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c<br>index 5b86a79..4210664 100644<br>--- a/res/res_pjsip/pjsip_scheduler.c<br>+++ b/res/res_pjsip/pjsip_scheduler.c<br>@@ -28,6 +28,7 @@<br> #include "asterisk/res_pjsip.h"<br> #include "include/res_pjsip_private.h"<br> #include "asterisk/res_pjsip_cli.h"<br>+#include "asterisk/taskprocessor.h"<br> <br> #define TASK_BUCKETS 53<br> <br>@@ -36,30 +37,30 @@<br> static int task_count;<br> <br> struct ast_sip_sched_task {<br>- /*! ast_sip_sched task id */<br>- uint32_t task_id;<br>- /*! ast_sched scheudler id */<br>- int current_scheduler_id;<br>- /*! task is currently running */<br>- int is_running;<br>- /*! task */<br>- ast_sip_task task;<br>+ /*! The serializer to be used (if any) (Holds a ref) */<br>+ struct ast_taskprocessor *serializer;<br> /*! task data */<br> void *task_data;<br>- /*! reschedule interval in milliseconds */<br>- int interval;<br>- /*! the time the task was queued */<br>+ /*! task function */<br>+ ast_sip_task task;<br>+ /*! the time the task was originally scheduled/queued */<br> struct timeval when_queued;<br> /*! the last time the task was started */<br> struct timeval last_start;<br> /*! the last time the task was ended */<br> struct timeval last_end;<br>+ /*! When the periodic task is next expected to run */<br>+ struct timeval next_periodic;<br>+ /*! reschedule interval in milliseconds */<br>+ int interval;<br>+ /*! ast_sched scheudler id */<br>+ int current_scheduler_id;<br>+ /*! task is currently running */<br>+ int is_running;<br> /*! times run */<br> int run_count;<br> /*! the task reschedule, cleanup and policy flags */<br> enum ast_sip_scheduler_task_flags flags;<br>- /*! the serializer to be used (if any) */<br>- struct ast_taskprocessor *serializer;<br> /*! A name to be associated with the task */<br> char name[0];<br> };<br>@@ -76,14 +77,19 @@<br> */<br> static int run_task(void *data)<br> {<br>- RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup);<br>+ RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup);<br> int res;<br> int delay;<br>+<br>+ if (!schtd->interval) {<br>+ /* Task was cancelled while waiting to be executed by the serializer */<br>+ return -1;<br>+ }<br> <br> ao2_lock(schtd);<br> schtd->last_start = ast_tvnow();<br> schtd->is_running = 1;<br>- schtd->run_count++;<br>+ ++schtd->run_count;<br> ao2_unlock(schtd);<br> <br> res = schtd->task(schtd->task_data);<br>@@ -93,10 +99,10 @@<br> schtd->last_end = ast_tvnow();<br> <br> /*<br>- * Don't restart if the task returned 0 or if the interval<br>+ * Don't restart if the task returned <= 0 or if the interval<br> * was set to 0 while the task was running<br> */<br>- if (!res || !schtd->interval) {<br>+ if (res <= 0 || !schtd->interval) {<br> schtd->interval = 0;<br> ao2_unlock(schtd);<br> ao2_unlink(tasks, schtd);<br>@@ -110,13 +116,22 @@<br> if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {<br> delay = schtd->interval;<br> } else {<br>- delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval);<br>+ int64_t diff;<br>+<br>+ /* Determine next periodic interval we need to expire. */<br>+ do {<br>+ schtd->next_periodic = ast_tvadd(schtd->next_periodic,<br>+ ast_samp2tv(schtd->interval, 1000));<br>+ diff = ast_tvdiff_ms(schtd->next_periodic, schtd->last_end);<br>+ } while (diff <= 0);<br>+ delay = diff;<br> }<br> <br> schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd);<br> if (schtd->current_scheduler_id < 0) {<br> schtd->interval = 0;<br> ao2_unlock(schtd);<br>+ ast_log(LOG_ERROR, "Sched %p: Failed to reschedule task %s\n", schtd, schtd->name);<br> ao2_unlink(tasks, schtd);<br> return -1;<br> }<br>@@ -133,9 +148,29 @@<br> static int push_to_serializer(const void *data)<br> {<br> struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;<br>+ int sched_id;<br> <br>+ ao2_lock(schtd);<br>+ sched_id = schtd->current_scheduler_id;<br>+ schtd->current_scheduler_id = -1;<br>+ ao2_unlock(schtd);<br>+ if (sched_id < 0) {<br>+ /* Task was cancelled while waiting on the lock */<br>+ return 0;<br>+ }<br>+<br>+ ao2_t_ref(schtd, +1, "Give ref to run_task()");<br> if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {<br>- ao2_ref(schtd, -1);<br>+ /*<br>+ * Oh my. Have to cancel the scheduled item because we<br>+ * unexpectedly cannot run it anymore.<br>+ */<br>+ ao2_unlink(tasks, schtd);<br>+ ao2_lock(schtd);<br>+ schtd->interval = 0;<br>+ ao2_unlock(schtd);<br>+<br>+ ao2_t_ref(schtd, -1, "Failed so release run_task() ref");<br> }<br> <br> return 0;<br>@@ -144,20 +179,22 @@<br> int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)<br> {<br> int res;<br>+ int sched_id;<br> <br>- if (!ao2_ref_and_lock(schtd)) {<br>- return -1;<br>- }<br>-<br>- if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) {<br>- ao2_unlock_and_unref(schtd);<br>- return 0;<br>- }<br>-<br>+ /*<br>+ * Prevent any tasks in the serializer queue from<br>+ * running and restarting the scheduled item on us<br>+ * first.<br>+ */<br>+ ao2_lock(schtd);<br> schtd->interval = 0;<br>- ao2_unlock_and_unref(schtd);<br>+<br>+ sched_id = schtd->current_scheduler_id;<br>+ schtd->current_scheduler_id = -1;<br>+ ao2_unlock(schtd);<br>+ res = ast_sched_del(scheduler_context, sched_id);<br>+<br> ao2_unlink(tasks, schtd);<br>- res = ast_sched_del(scheduler_context, schtd->current_scheduler_id);<br> <br> return res;<br> }<br>@@ -306,7 +343,7 @@<br> return is_running;<br> }<br> <br>-static void schtd_destructor(void *data)<br>+static void schtd_dtor(void *data)<br> {<br> struct ast_sip_sched_task *schtd = data;<br> <br>@@ -316,6 +353,7 @@<br> } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {<br> ast_free(schtd->task_data);<br> }<br>+ ast_taskprocessor_unreference(schtd->serializer);<br> }<br> <br> struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,<br>@@ -326,38 +364,60 @@<br> struct ast_sip_sched_task *schtd;<br> int res;<br> <br>- if (interval < 0) {<br>+ if (interval <= 0) {<br> return NULL;<br> }<br> <br>- schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor);<br>+ schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1),<br>+ schtd_dtor);<br> if (!schtd) {<br> return NULL;<br> }<br> <br>- schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1);<br>- schtd->serializer = serializer;<br>+ schtd->serializer = ao2_bump(serializer);<br>+ schtd->task_data = task_data;<br> schtd->task = sip_task;<br>+ schtd->interval = interval;<br>+ schtd->flags = flags;<br> if (!ast_strlen_zero(name)) {<br> strcpy(schtd->name, name); /* Safe */<br> } else {<br>- sprintf(schtd->name, "task_%08x", schtd->task_id);<br>+ uint32_t task_id;<br>+<br>+ task_id = ast_atomic_fetchadd_int(&task_count, 1);<br>+ sprintf(schtd->name, "task_%08x", task_id);<br> }<br>- schtd->task_data = task_data;<br>- schtd->flags = flags;<br>- schtd->interval = interval;<br> schtd->when_queued = ast_tvnow();<br>+ if (!(schtd->flags & AST_SIP_SCHED_TASK_DELAY)) {<br>+ schtd->next_periodic = ast_tvadd(schtd->when_queued,<br>+ ast_samp2tv(schtd->interval, 1000));<br>+ }<br> <br> if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {<br> ao2_ref(task_data, +1);<br> }<br>+<br>+ /*<br>+ * We must put it in the 'tasks' container before scheduling<br>+ * the task because we don't want the push_to_serializer()<br>+ * sched task to "remove" it on failure before we even put<br>+ * it in. If this happens then nothing would remove it from<br>+ * the 'tasks' container.<br>+ */<br>+ ao2_link(tasks, schtd);<br>+<br>+ /*<br>+ * Lock so we are guaranteed to get the sched id set before<br>+ * the push_to_serializer() sched task can clear it.<br>+ */<br>+ ao2_lock(schtd);<br> res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd);<br>+ schtd->current_scheduler_id = res;<br>+ ao2_unlock(schtd);<br> if (res < 0) {<br>+ ao2_unlink(tasks, schtd);<br> ao2_ref(schtd, -1);<br> return NULL;<br>- } else {<br>- schtd->current_scheduler_id = res;<br>- ao2_link(tasks, schtd);<br> }<br> <br> return schtd;<br>@@ -457,7 +517,8 @@<br> <br> int ast_sip_initialize_scheduler(void)<br> {<br>- if (!(scheduler_context = ast_sched_context_create())) {<br>+ scheduler_context = ast_sched_context_create();<br>+ if (!scheduler_context) {<br> ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");<br> return -1;<br> }<br>@@ -487,7 +548,21 @@<br> ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));<br> <br> if (scheduler_context) {<br>+ if (tasks) {<br>+ struct ao2_iterator iter;<br>+ struct ast_sip_sched_task *schtd;<br>+<br>+ /* Cancel all scheduled tasks */<br>+ iter = ao2_iterator_init(tasks, 0);<br>+ while ((schtd = ao2_iterator_next(&iter))) {<br>+ ast_sip_sched_task_cancel(schtd);<br>+ ao2_ref(schtd, -1);<br>+ }<br>+ ao2_iterator_destroy(&iter);<br>+ }<br>+<br> ast_sched_context_destroy(scheduler_context);<br>+ scheduler_context = NULL;<br> }<br> <br> ao2_cleanup(tasks);<br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/8755">change 8755</a>. To unsubscribe, visit <a href="https://gerrit.asterisk.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.asterisk.org/8755"/><meta itemprop="name" content="View Change"/></div></div>
<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: master </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>
<div style="display:none"> Gerrit-Change-Id: I6c8046b75f6953792c8c30e55b836a4291143f24 </div>
<div style="display:none"> Gerrit-Change-Number: 8755 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: Richard Mudgett <rmudgett@digium.com> </div>