<p>Richard Mudgett has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.asterisk.org/8749">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">pjsip_scheduler.c: Fix some corner cases.<br><br>* Fix the periodic interval wander because it may take significant time<br>between the sched thread queueing the task in the serializer and the<br>serializer actually executing the task.  The time it takes to actually<br>execute the task was already taken into account.<br><br>* Pass a schtd ref to the serializer when we queue a scheduled task on<br>the serializer.  We don't want it going away on us while it is in the<br>serializer queue.<br><br>* Skip the scheduled task if the task was canceled between queueing the<br>task to the serializer and the serializer actually executing the task.<br><br>* Reorder struct ast_sip_sched_task to avoid unnecessary padding.  Removed<br>task_id and added next_periodic.<br><br>* Hold a ref to the passed in serializer so the serializer cannot go away<br>on the scheduled task.<br><br>ASTERISK_26806<br><br>Change-Id: I6c8046b75f6953792c8c30e55b836a4291143f24<br>---<br>M include/asterisk/res_pjsip.h<br>M res/res_pjsip/pjsip_scheduler.c<br>2 files changed, 121 insertions(+), 46 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/49/8749/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h<br>index cb9aa2d..77b808d 100644<br>--- a/include/asterisk/res_pjsip.h<br>+++ b/include/asterisk/res_pjsip.h<br>@@ -1403,7 +1403,7 @@<br>  * the next item on the SIP socket(s) can be serviced. On incoming messages,<br>  * Asterisk automatically will push the request to a servant thread. When your<br>  * module callback is called, processing will already be in a servant. However,<br>- * for other PSJIP events, such as transaction state changes due to timer<br>+ * for other PJSIP events, such as transaction state changes due to timer<br>  * expirations, your module will be called into from a PJSIP thread. If you<br>  * are called into from a PJSIP thread, then you should push whatever processing<br>  * is needed to a servant as soon as possible. You can discern if you are currently<br>@@ -1584,13 +1584,13 @@<br> <br>   /*!<br>    * Run at a fixed interval.<br>-   * Stop scheduling if the callback returns 0.<br>+         * Stop scheduling if the callback returns <= 0.<br>    * Any other value is ignored.<br>         */<br>   AST_SIP_SCHED_TASK_FIXED = (0 << 0),<br>    /*!<br>    * Run at a variable interval.<br>-        * Stop scheduling if the callback returns 0.<br>+         * Stop scheduling if the callback returns <= 0.<br>    * Any other return value is used as the new interval.<br>         */<br>   AST_SIP_SCHED_TASK_VARIABLE = (1 << 0),<br>diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c<br>index 5b86a79..4210664 100644<br>--- a/res/res_pjsip/pjsip_scheduler.c<br>+++ b/res/res_pjsip/pjsip_scheduler.c<br>@@ -28,6 +28,7 @@<br> #include "asterisk/res_pjsip.h"<br> #include "include/res_pjsip_private.h"<br> #include "asterisk/res_pjsip_cli.h"<br>+#include "asterisk/taskprocessor.h"<br> <br> #define TASK_BUCKETS 53<br> <br>@@ -36,30 +37,30 @@<br> static int task_count;<br> <br> struct ast_sip_sched_task {<br>-  /*! ast_sip_sched task id */<br>- uint32_t task_id;<br>-    /*! ast_sched scheudler id */<br>-        int current_scheduler_id;<br>-    /*! task is currently running */<br>-     int is_running;<br>-      /*! task */<br>-  ast_sip_task task;<br>+   /*! The serializer to be used (if any) (Holds a ref) */<br>+      struct ast_taskprocessor *serializer;<br>         /*! task data */<br>      void *task_data;<br>-     /*! reschedule interval in milliseconds */<br>-   int interval;<br>-        /*! the time the task was queued */<br>+  /*! task function */<br>+ ast_sip_task task;<br>+   /*! the time the task was originally scheduled/queued */<br>      struct timeval when_queued;<br>   /*! the last time the task was started */<br>     struct timeval last_start;<br>    /*! the last time the task was ended */<br>       struct timeval last_end;<br>+     /*! When the periodic task is next expected to run */<br>+        struct timeval next_periodic;<br>+        /*! reschedule interval in milliseconds */<br>+   int interval;<br>+        /*! ast_sched scheudler id */<br>+        int current_scheduler_id;<br>+    /*! task is currently running */<br>+     int is_running;<br>       /*! times run */<br>      int run_count;<br>        /*! the task reschedule, cleanup and policy flags */<br>  enum ast_sip_scheduler_task_flags flags;<br>-     /*! the serializer to be used (if any) */<br>-    struct ast_taskprocessor *serializer;<br>         /*! A name to be associated with the task */<br>  char name[0];<br> };<br>@@ -76,14 +77,19 @@<br>  */<br> static int run_task(void *data)<br> {<br>-  RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup);<br>+   RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup);<br>      int res;<br>      int delay;<br>+<br>+        if (!schtd->interval) {<br>+           /* Task was cancelled while waiting to be executed by the serializer */<br>+              return -1;<br>+   }<br> <br>  ao2_lock(schtd);<br>      schtd->last_start = ast_tvnow();<br>   schtd->is_running = 1;<br>-    schtd->run_count++;<br>+       ++schtd->run_count;<br>        ao2_unlock(schtd);<br> <br>         res = schtd->task(schtd->task_data);<br>@@ -93,10 +99,10 @@<br>       schtd->last_end = ast_tvnow();<br> <br>  /*<br>-    * Don't restart if the task returned 0 or if the interval<br>+        * Don't restart if the task returned <= 0 or if the interval<br>   * was set to 0 while the task was running<br>     */<br>-  if (!res || !schtd->interval) {<br>+   if (res <= 0 || !schtd->interval) {<br>             schtd->interval = 0;<br>               ao2_unlock(schtd);<br>            ao2_unlink(tasks, schtd);<br>@@ -110,13 +116,22 @@<br>      if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {<br>                 delay = schtd->interval;<br>   } else {<br>-             delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval);<br>+         int64_t diff;<br>+<br>+             /* Determine next periodic interval we need to expire. */<br>+            do {<br>+                 schtd->next_periodic = ast_tvadd(schtd->next_periodic,<br>+                         ast_samp2tv(schtd->interval, 1000));<br>+                      diff = ast_tvdiff_ms(schtd->next_periodic, schtd->last_end);<br>+           } while (diff <= 0);<br>+              delay = diff;<br>         }<br> <br>  schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd);<br>  if (schtd->current_scheduler_id < 0) {<br>          schtd->interval = 0;<br>               ao2_unlock(schtd);<br>+           ast_log(LOG_ERROR, "Sched %p: Failed to reschedule task %s\n", schtd, schtd->name);<br>              ao2_unlink(tasks, schtd);<br>             return -1;<br>    }<br>@@ -133,9 +148,29 @@<br> static int push_to_serializer(const void *data)<br> {<br>         struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;<br>+        int sched_id;<br> <br>+     ao2_lock(schtd);<br>+     sched_id = schtd->current_scheduler_id;<br>+   schtd->current_scheduler_id = -1;<br>+ ao2_unlock(schtd);<br>+   if (sched_id < 0) {<br>+               /* Task was cancelled while waiting on the lock */<br>+           return 0;<br>+    }<br>+<br>+ ao2_t_ref(schtd, +1, "Give ref to run_task()");<br>     if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {<br>-              ao2_ref(schtd, -1);<br>+          /*<br>+            * Oh my.  Have to cancel the scheduled item because we<br>+               * unexpectedly cannot run it anymore.<br>+                */<br>+          ao2_unlink(tasks, schtd);<br>+            ao2_lock(schtd);<br>+             schtd->interval = 0;<br>+              ao2_unlock(schtd);<br>+<br>+                ao2_t_ref(schtd, -1, "Failed so release run_task() ref");<br>   }<br> <br>  return 0;<br>@@ -144,20 +179,22 @@<br> int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)<br> {<br>        int res;<br>+     int sched_id;<br> <br>-     if (!ao2_ref_and_lock(schtd)) {<br>-              return -1;<br>-   }<br>-<br>- if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) {<br>-           ao2_unlock_and_unref(schtd);<br>-         return 0;<br>-    }<br>-<br>+ /*<br>+    * Prevent any tasks in the serializer queue from<br>+     * running and restarting the scheduled item on us<br>+    * first.<br>+     */<br>+  ao2_lock(schtd);<br>      schtd->interval = 0;<br>-      ao2_unlock_and_unref(schtd);<br>+<br>+      sched_id = schtd->current_scheduler_id;<br>+   schtd->current_scheduler_id = -1;<br>+ ao2_unlock(schtd);<br>+   res = ast_sched_del(scheduler_context, sched_id);<br>+<br>  ao2_unlink(tasks, schtd);<br>-    res = ast_sched_del(scheduler_context, schtd->current_scheduler_id);<br> <br>    return res;<br> }<br>@@ -306,7 +343,7 @@<br>  return is_running;<br> }<br> <br>-static void schtd_destructor(void *data)<br>+static void schtd_dtor(void *data)<br> {<br>         struct ast_sip_sched_task *schtd = data;<br> <br>@@ -316,6 +353,7 @@<br>      } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {<br>             ast_free(schtd->task_data);<br>        }<br>+    ast_taskprocessor_unreference(schtd->serializer);<br> }<br> <br> struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,<br>@@ -326,38 +364,60 @@<br>   struct ast_sip_sched_task *schtd;<br>     int res;<br> <br>-  if (interval < 0) {<br>+       if (interval <= 0) {<br>               return NULL;<br>  }<br> <br>- schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor);<br>+        schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1),<br>+           schtd_dtor);<br>  if (!schtd) {<br>                 return NULL;<br>  }<br> <br>- schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1);<br>-     schtd->serializer = serializer;<br>+   schtd->serializer = ao2_bump(serializer);<br>+ schtd->task_data = task_data;<br>      schtd->task = sip_task;<br>+   schtd->interval = interval;<br>+       schtd->flags = flags;<br>      if (!ast_strlen_zero(name)) {<br>                 strcpy(schtd->name, name); /* Safe */<br>      } else {<br>-             sprintf(schtd->name, "task_%08x", schtd->task_id);<br>+           uint32_t task_id;<br>+<br>+         task_id = ast_atomic_fetchadd_int(&task_count, 1);<br>+               sprintf(schtd->name, "task_%08x", task_id);<br>      }<br>-    schtd->task_data = task_data;<br>-     schtd->flags = flags;<br>-     schtd->interval = interval;<br>        schtd->when_queued = ast_tvnow();<br>+ if (!(schtd->flags & AST_SIP_SCHED_TASK_DELAY)) {<br>+             schtd->next_periodic = ast_tvadd(schtd->when_queued,<br>+                   ast_samp2tv(schtd->interval, 1000));<br>+      }<br> <br>  if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {<br>                ao2_ref(task_data, +1);<br>       }<br>+<br>+ /*<br>+    * We must put it in the 'tasks' container before scheduling<br>+  * the task because we don't want the push_to_serializer()<br>+        * sched task to "remove" it on failure before we even put<br>+  * it in.  If this happens then nothing would remove it from<br>+  * the 'tasks' container.<br>+     */<br>+  ao2_link(tasks, schtd);<br>+<br>+   /*<br>+    * Lock so we are guaranteed to get the sched id set before<br>+   * the push_to_serializer() sched task can clear it.<br>+  */<br>+  ao2_lock(schtd);<br>      res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd);<br>+ schtd->current_scheduler_id = res;<br>+        ao2_unlock(schtd);<br>    if (res < 0) {<br>+            ao2_unlink(tasks, schtd);<br>             ao2_ref(schtd, -1);<br>           return NULL;<br>- } else {<br>-             schtd->current_scheduler_id = res;<br>-                ao2_link(tasks, schtd);<br>       }<br> <br>  return schtd;<br>@@ -457,7 +517,8 @@<br> <br> int ast_sip_initialize_scheduler(void)<br> {<br>-   if (!(scheduler_context = ast_sched_context_create())) {<br>+     scheduler_context = ast_sched_context_create();<br>+      if (!scheduler_context) {<br>             ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");<br>          return -1;<br>    }<br>@@ -487,7 +548,21 @@<br>       ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));<br> <br>        if (scheduler_context) {<br>+             if (tasks) {<br>+                 struct ao2_iterator iter;<br>+                    struct ast_sip_sched_task *schtd;<br>+<br>+                 /* Cancel all scheduled tasks */<br>+                     iter = ao2_iterator_init(tasks, 0);<br>+                  while ((schtd = ao2_iterator_next(&iter))) {<br>+                             ast_sip_sched_task_cancel(schtd);<br>+                            ao2_ref(schtd, -1);<br>+                  }<br>+                    ao2_iterator_destroy(&iter);<br>+             }<br>+<br>          ast_sched_context_destroy(scheduler_context);<br>+                scheduler_context = NULL;<br>     }<br> <br>  ao2_cleanup(tasks);<br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/8749">change 8749</a>. To unsubscribe, visit <a href="https://gerrit.asterisk.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.asterisk.org/8749"/><meta itemprop="name" content="View Change"/></div></div>

<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: 15 </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>
<div style="display:none"> Gerrit-Change-Id: I6c8046b75f6953792c8c30e55b836a4291143f24 </div>
<div style="display:none"> Gerrit-Change-Number: 8749 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: Richard Mudgett <rmudgett@digium.com> </div>