[Asterisk-code-review] pjsip_scheduler.c: Fix some corner cases. (asterisk[15])

Richard Mudgett asteriskteam at digium.com
Mon Apr 9 18:24:06 CDT 2018


Richard Mudgett has uploaded this change for review. ( https://gerrit.asterisk.org/8749 )


Change subject: pjsip_scheduler.c: Fix some corner cases.
......................................................................

pjsip_scheduler.c: Fix some corner cases.

* Fix periodic interval wander caused by the significant time that may
elapse between the sched thread queueing the task on the serializer and
the serializer actually executing the task.  Previously only the time it
takes to actually execute the task was taken into account.

* Pass a schtd ref to the serializer when we queue a scheduled task on
the serializer.  We don't want it going away on us while it is in the
serializer queue.

* Skip the scheduled task if the task was canceled between queueing the
task to the serializer and the serializer actually executing the task.

* Reorder struct ast_sip_sched_task to avoid unnecessary padding.  Remove
task_id and add next_periodic.

* Hold a ref to the passed in serializer so the serializer cannot go away
on the scheduled task.

ASTERISK_26806

Change-Id: I6c8046b75f6953792c8c30e55b836a4291143f24
---
M include/asterisk/res_pjsip.h
M res/res_pjsip/pjsip_scheduler.c
2 files changed, 121 insertions(+), 46 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/49/8749/1

diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index cb9aa2d..77b808d 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -1403,7 +1403,7 @@
  * the next item on the SIP socket(s) can be serviced. On incoming messages,
  * Asterisk automatically will push the request to a servant thread. When your
  * module callback is called, processing will already be in a servant. However,
- * for other PSJIP events, such as transaction state changes due to timer
+ * for other PJSIP events, such as transaction state changes due to timer
  * expirations, your module will be called into from a PJSIP thread. If you
  * are called into from a PJSIP thread, then you should push whatever processing
  * is needed to a servant as soon as possible. You can discern if you are currently
@@ -1584,13 +1584,13 @@
 
 	/*!
 	 * Run at a fixed interval.
-	 * Stop scheduling if the callback returns 0.
+	 * Stop scheduling if the callback returns <= 0.
 	 * Any other value is ignored.
 	 */
 	AST_SIP_SCHED_TASK_FIXED = (0 << 0),
 	/*!
 	 * Run at a variable interval.
-	 * Stop scheduling if the callback returns 0.
+	 * Stop scheduling if the callback returns <= 0.
 	 * Any other return value is used as the new interval.
 	 */
 	AST_SIP_SCHED_TASK_VARIABLE = (1 << 0),
diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c
index 5b86a79..4210664 100644
--- a/res/res_pjsip/pjsip_scheduler.c
+++ b/res/res_pjsip/pjsip_scheduler.c
@@ -28,6 +28,7 @@
 #include "asterisk/res_pjsip.h"
 #include "include/res_pjsip_private.h"
 #include "asterisk/res_pjsip_cli.h"
+#include "asterisk/taskprocessor.h"
 
 #define TASK_BUCKETS 53
 
@@ -36,30 +37,30 @@
 static int task_count;
 
 struct ast_sip_sched_task {
-	/*! ast_sip_sched task id */
-	uint32_t task_id;
-	/*! ast_sched scheudler id */
-	int current_scheduler_id;
-	/*! task is currently running */
-	int is_running;
-	/*! task */
-	ast_sip_task task;
+	/*! The serializer to be used (if any) (Holds a ref) */
+	struct ast_taskprocessor *serializer;
 	/*! task data */
 	void *task_data;
-	/*! reschedule interval in milliseconds */
-	int interval;
-	/*! the time the task was queued */
+	/*! task function */
+	ast_sip_task task;
+	/*! the time the task was originally scheduled/queued */
 	struct timeval when_queued;
 	/*! the last time the task was started */
 	struct timeval last_start;
 	/*! the last time the task was ended */
 	struct timeval last_end;
+	/*! When the periodic task is next expected to run (not used with AST_SIP_SCHED_TASK_DELAY) */
+	struct timeval next_periodic;
+	/*! reschedule interval in milliseconds (0 once the task is cancelled or finished) */
+	int interval;
+	/*! ast_sched scheduler id (-1 when not currently scheduled) */
+	int current_scheduler_id;
+	/*! task is currently running */
+	int is_running;
 	/*! times run */
 	int run_count;
 	/*! the task reschedule, cleanup and policy flags */
 	enum ast_sip_scheduler_task_flags flags;
-	/*! the serializer to be used (if any) */
-	struct ast_taskprocessor *serializer;
 	/*! A name to be associated with the task */
 	char name[0];
 };
@@ -76,14 +77,19 @@
  */
 static int run_task(void *data)
 {
-	RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup);
+	RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup);
 	int res;
 	int delay;
+
+	if (!schtd->interval) {
+		/* Task was cancelled while waiting to be executed by the serializer */
+		return -1;
+	}
 
 	ao2_lock(schtd);
 	schtd->last_start = ast_tvnow();
 	schtd->is_running = 1;
-	schtd->run_count++;
+	++schtd->run_count;
 	ao2_unlock(schtd);
 
 	res = schtd->task(schtd->task_data);
@@ -93,10 +99,10 @@
 	schtd->last_end = ast_tvnow();
 
 	/*
-	 * Don't restart if the task returned 0 or if the interval
+	 * Don't restart if the task returned <= 0 or if the interval
 	 * was set to 0 while the task was running
 	 */
-	if (!res || !schtd->interval) {
+	if (res <= 0 || !schtd->interval) {
 		schtd->interval = 0;
 		ao2_unlock(schtd);
 		ao2_unlink(tasks, schtd);
@@ -110,13 +116,22 @@
 	if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
 		delay = schtd->interval;
 	} else {
-		delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval);
+		int64_t diff;
+
+		/* Advance next_periodic by whole intervals until it is after last_end (skips missed deadlines). */
+		do {
+			schtd->next_periodic = ast_tvadd(schtd->next_periodic,
+				ast_samp2tv(schtd->interval, 1000));
+			diff = ast_tvdiff_ms(schtd->next_periodic, schtd->last_end);
+		} while (diff <= 0);
+		delay = diff;
 	}
 
 	schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd);
 	if (schtd->current_scheduler_id < 0) {
 		schtd->interval = 0;
 		ao2_unlock(schtd);
+		ast_log(LOG_ERROR, "Sched %p: Failed to reschedule task %s\n", schtd, schtd->name);
 		ao2_unlink(tasks, schtd);
 		return -1;
 	}
@@ -133,9 +148,29 @@
 static int push_to_serializer(const void *data)
 {
 	struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
+	int sched_id;
 
+	ao2_lock(schtd);
+	sched_id = schtd->current_scheduler_id;
+	schtd->current_scheduler_id = -1;
+	ao2_unlock(schtd);
+	if (sched_id < 0) {
+		/* Task was cancelled while waiting on the lock */
+		return 0;
+	}
+
+	ao2_t_ref(schtd, +1, "Give ref to run_task()");
 	if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
-		ao2_ref(schtd, -1);
+		/*
+		 * Oh my.  Have to cancel the scheduled item because we
+		 * unexpectedly cannot run it anymore.
+		 */
+		ao2_unlink(tasks, schtd);
+		ao2_lock(schtd);
+		schtd->interval = 0;
+		ao2_unlock(schtd);
+
+		ao2_t_ref(schtd, -1, "Failed so release run_task() ref");
 	}
 
 	return 0;
@@ -144,20 +179,22 @@
 int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
 {
 	int res;
+	int sched_id;
 
-	if (!ao2_ref_and_lock(schtd)) {
-		return -1;
-	}
-
-	if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) {
-		ao2_unlock_and_unref(schtd);
-		return 0;
-	}
-
+	/*
+	 * Prevent any tasks in the serializer queue from
+	 * running and restarting the scheduled item on us
+	 * first.
+	 */
+	ao2_lock(schtd);
 	schtd->interval = 0;
-	ao2_unlock_and_unref(schtd);
+
+	sched_id = schtd->current_scheduler_id;
+	schtd->current_scheduler_id = -1;
+	ao2_unlock(schtd);
+	res = ast_sched_del(scheduler_context, sched_id);
+
 	ao2_unlink(tasks, schtd);
-	res = ast_sched_del(scheduler_context, schtd->current_scheduler_id);
 
 	return res;
 }
@@ -306,7 +343,7 @@
 	return is_running;
 }
 
-static void schtd_destructor(void *data)
+static void schtd_dtor(void *data)
 {
 	struct ast_sip_sched_task *schtd = data;
 
@@ -316,6 +353,7 @@
 	} else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {
 		ast_free(schtd->task_data);
 	}
+	ast_taskprocessor_unreference(schtd->serializer);
 }
 
 struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
@@ -326,38 +364,60 @@
 	struct ast_sip_sched_task *schtd;
 	int res;
 
-	if (interval < 0) {
+	if (interval <= 0) {
 		return NULL;
 	}
 
-	schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor);
+	schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1),
+		schtd_dtor);
 	if (!schtd) {
 		return NULL;
 	}
 
-	schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1);
-	schtd->serializer = serializer;
+	schtd->serializer = ao2_bump(serializer);
+	schtd->task_data = task_data;
 	schtd->task = sip_task;
+	schtd->interval = interval;
+	schtd->flags = flags;
 	if (!ast_strlen_zero(name)) {
 		strcpy(schtd->name, name); /* Safe */
 	} else {
-		sprintf(schtd->name, "task_%08x", schtd->task_id);
+		uint32_t task_id;
+
+		task_id = ast_atomic_fetchadd_int(&task_count, 1);
+		sprintf(schtd->name, "task_%08x", task_id);
 	}
-	schtd->task_data = task_data;
-	schtd->flags = flags;
-	schtd->interval = interval;
 	schtd->when_queued = ast_tvnow();
+	if (!(schtd->flags & AST_SIP_SCHED_TASK_DELAY)) {
+		schtd->next_periodic = ast_tvadd(schtd->when_queued,
+			ast_samp2tv(schtd->interval, 1000));
+	}
 
 	if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {
 		ao2_ref(task_data, +1);
 	}
+
+	/*
+	 * We must put it in the 'tasks' container before scheduling
+	 * the task because we don't want the push_to_serializer()
+	 * sched task to "remove" it on failure before we even put
+	 * it in.  If this happens then nothing would remove it from
+	 * the 'tasks' container.
+	 */
+	ao2_link(tasks, schtd);
+
+	/*
+	 * Lock so we are guaranteed to get the sched id set before
+	 * the push_to_serializer() sched task can clear it.
+	 */
+	ao2_lock(schtd);
 	res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd);
+	schtd->current_scheduler_id = res;
+	ao2_unlock(schtd);
 	if (res < 0) {
+		ao2_unlink(tasks, schtd);
 		ao2_ref(schtd, -1);
 		return NULL;
-	} else {
-		schtd->current_scheduler_id = res;
-		ao2_link(tasks, schtd);
 	}
 
 	return schtd;
@@ -457,7 +517,8 @@
 
 int ast_sip_initialize_scheduler(void)
 {
-	if (!(scheduler_context = ast_sched_context_create())) {
+	scheduler_context = ast_sched_context_create();
+	if (!scheduler_context) {
 		ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
 		return -1;
 	}
@@ -487,7 +548,21 @@
 	ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
 
 	if (scheduler_context) {
+		if (tasks) {
+			struct ao2_iterator iter;
+			struct ast_sip_sched_task *schtd;
+
+			/* Cancel all scheduled tasks */
+			iter = ao2_iterator_init(tasks, 0);
+			while ((schtd = ao2_iterator_next(&iter))) {
+				ast_sip_sched_task_cancel(schtd);
+				ao2_ref(schtd, -1);
+			}
+			ao2_iterator_destroy(&iter);
+		}
+
 		ast_sched_context_destroy(scheduler_context);
+		scheduler_context = NULL;
 	}
 
 	ao2_cleanup(tasks);

-- 
To view, visit https://gerrit.asterisk.org/8749
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: 15
Gerrit-MessageType: newchange
Gerrit-Change-Id: I6c8046b75f6953792c8c30e55b836a4291143f24
Gerrit-Change-Number: 8749
Gerrit-PatchSet: 1
Gerrit-Owner: Richard Mudgett <rmudgett at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20180409/108c3644/attachment-0001.html>


More information about the asterisk-code-review mailing list