[asterisk-commits] kmoore: branch kmoore/scheduler r430273 - in /team/kmoore/scheduler: include/...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Jan 6 18:51:46 CST 2015


Author: kmoore
Date: Tue Jan  6 18:51:38 2015
New Revision: 430273

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=430273
Log:
Move sched to sched2 since the changeover can't be clean

Added:
    team/kmoore/scheduler/include/asterisk/sched2.h
      - copied unchanged from r430272, team/kmoore/scheduler/include/asterisk/sched.h
    team/kmoore/scheduler/main/sched2.c
      - copied unchanged from r430272, team/kmoore/scheduler/main/sched.c
Modified:
    team/kmoore/scheduler/include/asterisk/sched.h
    team/kmoore/scheduler/main/sched.c

Modified: team/kmoore/scheduler/include/asterisk/sched.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/scheduler/include/asterisk/sched.h?view=diff&rev=430273&r1=430272&r2=430273
==============================================================================
--- team/kmoore/scheduler/include/asterisk/sched.h (original)
+++ team/kmoore/scheduler/include/asterisk/sched.h Tue Jan  6 18:51:38 2015
@@ -28,6 +28,123 @@
 extern "C" {
 #endif
 
+/*! 
+ * \brief Remove a scheduler entry
+ *
+ * This is a loop construct to ensure that
+ * the scheduled task get deleted. The idea is that
+ * if we loop attempting to remove the scheduled task,
+ * then whatever callback had been running will complete
+ * and reinsert the task into the scheduler.
+ *
+ * Since macro expansion essentially works like pass-by-name
+ * parameter passing, this macro will still work correctly even
+ * if the id of the task to delete changes. This holds as long as 
+ * the name of the id which could change is passed to the macro 
+ * and not a copy of the value of the id.
+ */
+#define AST_SCHED_DEL(sched, id) \
+	({ \
+		int _count = 0; \
+		int _sched_res = -1; \
+		while (id > -1 && (_sched_res = ast_sched_del(sched, id)) && ++_count < 10) \
+			usleep(1); \
+		if (_count == 10) { \
+			ast_debug(3, "Unable to cancel schedule ID %d.\n", id); \
+		} \
+		id = -1; \
+		(_sched_res); \
+	})
+
+#define AST_SCHED_DEL_ACCESSOR(sched, obj, getter, setter) \
+	({ \
+		int _count = 0; \
+		int _sched_res = -1; \
+		while (getter(obj) > -1 && (_sched_res = ast_sched_del(sched, getter(obj))) && ++_count < 10) \
+			usleep(1); \
+		if (_count == 10) { \
+			ast_debug(3, "Unable to cancel schedule ID %d.\n", getter(obj)); \
+		} \
+		setter(obj, -1); \
+		(_sched_res); \
+	})
+
+/*!
+ * \brief schedule task to get deleted and call unref function
+ * \sa AST_SCHED_DEL
+ * \since 1.6.1
+ */
+#define AST_SCHED_DEL_UNREF(sched, id, refcall)			\
+	do { \
+		int _count = 0; \
+		while (id > -1 && ast_sched_del(sched, id) && ++_count < 10) { \
+			usleep(1); \
+		} \
+		if (_count == 10) \
+			ast_log(LOG_WARNING, "Unable to cancel schedule ID %d.  This is probably a bug (%s: %s, line %d).\n", id, __FILE__, __PRETTY_FUNCTION__, __LINE__); \
+		if (id > -1) \
+			refcall; \
+		id = -1; \
+	} while (0);
+
+/*!
+ * \brief schedule task to get deleted releasing the lock between attempts
+ * \since 1.6.1
+ */
+#define AST_SCHED_DEL_SPINLOCK(sched, id, lock) \
+	({ \
+		int _count = 0; \
+		int _sched_res = -1; \
+		while (id > -1 && (_sched_res = ast_sched_del(sched, id)) && ++_count < 10) { \
+			ast_mutex_unlock(lock); \
+			usleep(1); \
+			ast_mutex_lock(lock); \
+		} \
+		if (_count == 10) { \
+			ast_debug(3, "Unable to cancel schedule ID %d.\n", id); \
+		} \
+		id = -1; \
+		(_sched_res); \
+	})
+
+#define AST_SCHED_REPLACE_VARIABLE(id, sched, when, callback, data, variable) \
+	do { \
+		int _count = 0; \
+		while (id > -1 && ast_sched_del(sched, id) && ++_count < 10) { \
+			usleep(1); \
+		} \
+		if (_count == 10) \
+			ast_log(LOG_WARNING, "Unable to cancel schedule ID %d.  This is probably a bug (%s: %s, line %d).\n", id, __FILE__, __PRETTY_FUNCTION__, __LINE__); \
+		id = ast_sched_add_variable(sched, when, callback, data, variable); \
+	} while (0);
+
+#define AST_SCHED_REPLACE(id, sched, when, callback, data) \
+		AST_SCHED_REPLACE_VARIABLE(id, sched, when, callback, data, 0)
+
+/*!
+ * \note Not currently used in the source?
+ * \since 1.6.1
+ */
+#define AST_SCHED_REPLACE_VARIABLE_UNREF(id, sched, when, callback, data, variable, unrefcall, addfailcall, refcall) \
+	do { \
+		int _count = 0, _res=1;											 \
+		void *_data = (void *)ast_sched_find_data(sched, id);			\
+		while (id > -1 && (_res = ast_sched_del(sched, id) && _count++ < 10)) { \
+			usleep(1); \
+		} \
+		if (!_res && _data)							\
+			unrefcall;	/* should ref _data! */		\
+		if (_count == 10) \
+			ast_log(LOG_WARNING, "Unable to cancel schedule ID %d.  This is probably a bug (%s: %s, line %d).\n", id, __FILE__, __PRETTY_FUNCTION__, __LINE__); \
+		refcall; \
+		id = ast_sched_add_variable(sched, when, callback, data, variable); \
+		if (id == -1)  \
+			addfailcall;	\
+	} while (0);
+
+#define AST_SCHED_REPLACE_UNREF(id, sched, when, callback, data, unrefcall, addfailcall, refcall) \
+	AST_SCHED_REPLACE_VARIABLE_UNREF(id, sched, when, callback, data, 0, unrefcall, addfailcall, refcall)
+
 /*!
  * \brief Create a scheduler context
  *
@@ -47,12 +164,10 @@
  *
  * A scheduler callback takes a pointer with callback data and
  *
- * \param obj The data provided when the event was scheduled
- *
  * \retval 0 if the callback should not be rescheduled
- * \retval non-zero if the callback should be scheduled again
- */
-typedef int (*ast_sched_cb)(void *obj);
+ * \retval non-zero if the callback should be scheduled agai
+ */
+typedef int (*ast_sched_cb)(const void *data);
 #define AST_SCHED_CB(a) ((ast_sched_cb)(a))
 
 struct ast_cb_names {
@@ -63,7 +178,6 @@
 
 /*!
  * \brief Show statics on what it is in the schedule queue
- *
  * \param con Schedule context to check
  * \param buf dynamic string to store report
  * \param cbnames to check against
@@ -83,11 +197,24 @@
  * \param con Scheduler context to add
  * \param when how many milliseconds to wait for event to occur
  * \param callback function to call when the amount of time expires
- * \param obj AO2 object to pass to the callback
+ * \param data data to pass to the callback
  *
  * \return Returns a schedule item ID on success, -1 on failure
  */
-#define ast_sched_add(con, when, callback, obj) ast_sched_add_variable(con, when, callback, obj, 0)
+int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data) attribute_warn_unused_result;
+
+/*!
+ * \brief replace a scheduler entry
+ * \deprecated You should use the AST_SCHED_REPLACE() macro instead.
+ *
+ * This deletes the scheduler entry for old_id if it exists, and then
+ * calls ast_sched_add to create a new entry.  A negative old_id will
+ * be ignored.
+ *
+ * \retval -1 failure
+ * \retval otherwise, returns scheduled item ID
+ */
+int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data) attribute_warn_unused_result;
 
 /*!
  * \brief Adds a scheduled event with rescheduling support
@@ -95,7 +222,7 @@
  * \param con Scheduler context to add
  * \param when how many milliseconds to wait for event to occur
  * \param callback function to call when the amount of time expires
- * \param obj AO2 object to pass to the callback
+ * \param data data to pass to the callback
  * \param variable If true, the result value of callback function will be
  *       used for rescheduling
  *
@@ -107,27 +234,39 @@
  *
  * \return Returns a schedule item ID on success, -1 on failure
  */
-int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, void *obj, int variable) attribute_warn_unused_result;
+int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable) attribute_warn_unused_result;
+
+/*!
+ * \brief replace a scheduler entry
+ * \deprecated You should use the AST_SCHED_REPLACE_VARIABLE() macro instead.
+ *
+ * This deletes the scheduler entry for old_id if it exists, and then
+ * calls ast_sched_add to create a new entry.  A negative old_id will
+ * be ignored.
+ *
+ * \retval -1 failure
+ * \retval otherwise, returns scheduled item ID
+ */
+int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable) attribute_warn_unused_result;
 
 /*! 
  * \brief Find a sched structure and return the data field associated with it. 
- * \since 1.6.1
  *
  * \param con scheduling context in which to search fro the matching id
  * \param id ID of the scheduled item to find
- *
- * \return the AO2 object from the matching scheduled event (must be ao2_cleanup()d)
- * \return NULL if not found or object not provided
- */
-void *ast_sched_find_data(struct ast_sched_context *con, int id);
+ * \return the data field from the matching sched struct if found; else return NULL if not found.
+ *
+ * \since 1.6.1
+ */
+const void *ast_sched_find_data(struct ast_sched_context *con, int id);
 
 /*!
  * \brief Deletes a scheduled event
  *
- * Remove this event from being run. A procedure should not remove its own
- * event, but return 0 instead. If a scheduled event is deleted while executing,
- * that event will be prevented from rescheduling itself. Additional deletion
- * attempts will result in failure, even if the callback is still executing.
+ * Remove this event from being run.  A procedure should not remove its own
+ * event, but return 0 instead.  In most cases, you should not call this
+ * routine directly, but use the AST_SCHED_DEL() macro instead (especially if
+ * you don't intend to do something different when it returns failure).
  *
  * \param con scheduling context to delete item from
  * \param id ID of the scheduled item to delete
@@ -187,6 +326,24 @@
 long ast_sched_when(struct ast_sched_context *con,int id);
 
 /*!
+ * \brief Convenience macro for objects and reference (add)
+ *
+ */
+#define ast_sched_add_object(obj,con,when,callback) ast_sched_add((con),(when),(callback), ASTOBJ_REF((obj)))
+
+/*!
+ * \brief Convenience macro for objects and reference (del)
+ *
+ */
+#define ast_sched_del_object(obj,destructor,con,id) do { \
+	if ((id) > -1) { \
+		ast_sched_del((con),(id)); \
+		(id) = -1; \
+		ASTOBJ_UNREF((obj),(destructor)); \
+	} \
+} while(0)
+
+/*!
  * \brief Start a thread for processing scheduler entries
  *
  * \param con the scheduler context this thread will manage

Modified: team/kmoore/scheduler/main/sched.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/scheduler/main/sched.c?view=diff&rev=430273&r1=430272&r2=430273
==============================================================================
--- team/kmoore/scheduler/main/sched.c (original)
+++ team/kmoore/scheduler/main/sched.c Tue Jan  6 18:51:38 2015
@@ -67,10 +67,9 @@
 	int id;                       /*!< ID number of event */
 	struct timeval when;          /*!< Absolute time event should take place */
 	int resched;                  /*!< When to reschedule */
-	void *object;                 /*!< AO2 object for callback */
+	int variable;                 /*!< Use return value from callback to reschedule */
+	const void *data;             /*!< Data */
 	ast_sched_cb callback;        /*!< Callback */
-	unsigned int variable:1;      /*!< Use return value from callback to reschedule */
-	unsigned int deleted:1;       /*!< Flagged for deletion during execution */
 	ssize_t __heap_index;
 	/*!
 	 * Used to synchronize between thread running a task and thread
@@ -90,7 +89,7 @@
 struct ast_sched_context {
 	ast_mutex_t lock;
 	unsigned int eventcnt;                  /*!< Number of events processed */
-	unsigned int highwater;			/*!< highest count so far */
+	unsigned int highwater;					/*!< highest count so far */
 	struct ast_heap *sched_heap;
 	struct sched_thread *sched_thread;
 	/*! The scheduled task that is currently executing */
@@ -280,8 +279,6 @@
 	 * Add to the cache, or just free() if we
 	 * already have too many cache entries
 	 */
-	ao2_cleanup(tmp->obj);
-	tmp->obj = NULL;
 
 #ifdef SCHED_MAX_CACHE
 	if (con->schedccnt < SCHED_MAX_CACHE) {
@@ -362,7 +359,7 @@
 /*! \brief
  * Schedule callback(data) to happen when ms into the future
  */
-int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, void *obj, int variable)
+int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
 {
 	struct sched *tmp;
 	int res = -1;
@@ -373,11 +370,10 @@
 	if ((tmp = sched_alloc(con))) {
 		tmp->id = con->eventcnt++;
 		tmp->callback = callback;
-		tmp->object = ao2_bump(obj);
+		tmp->data = data;
 		tmp->resched = when;
 		tmp->variable = variable;
 		tmp->when = ast_tv(0, 0);
-		tmp->deleted = 0;
 		if (sched_settime(&tmp->when, when)) {
 			sched_release(con, tmp);
 		} else {
@@ -437,7 +433,7 @@
 
 	s = sched_find(con, id);
 	if (s) {
-		data = ao2_bump(s->object);
+		data = s->data;
 	}
 
 	ast_mutex_unlock(&con->lock);
@@ -474,7 +470,7 @@
 			ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
 		}
 		sched_release(con, s);
-	} else if (con->currently_executing && (id == con->currently_executing->id) && !con->currently_executing->deleted) {
+	} else if (con->currently_executing && (id == con->currently_executing->id)) {
 		s = con->currently_executing;
 		s->deleted = 1;
 		/* Wait for executing task to complete so that caller of ast_sched_del() does not
@@ -500,10 +496,9 @@
 		ast_assert(s != NULL);
 #else
 		{
-			char buf[100];
-
-			snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
-			_ast_assert(0, buf, file, line, function);
+		char buf[100];
+		snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
+		_ast_assert(0, buf, file, line, function);
 		}
 #endif
 		*last_id = id;
@@ -577,7 +572,7 @@
 		ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
 			q->id,
 			q->callback,
-			q->object,
+			q->data,
 			(long)delta.tv_sec,
 			(long int)delta.tv_usec);
 	}
@@ -590,6 +585,7 @@
  */
 int ast_sched_runq(struct ast_sched_context *con)
 {
+	struct sched *current;
 	struct timeval when;
 	int numevents;
 	int res;
@@ -599,17 +595,17 @@
 	ast_mutex_lock(&con->lock);
 
 	when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
-	for (numevents = 0; (con->current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
+	for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
 		/* schedule all events which are going to expire within 1ms.
 		 * We only care about millisecond accuracy anyway, so this will
 		 * help us get more than one event at one time if they are very
 		 * close together.
 		 */
-		if (ast_tvcmp(con->current->when, when) != -1) {
+		if (ast_tvcmp(current->when, when) != -1) {
 			break;
 		}
 
-		con->current = ast_heap_pop(con->sched_heap);
+		current = ast_heap_pop(con->sched_heap);
 
 		/*
 		 * At this point, the schedule queue is still intact.  We
@@ -622,7 +618,7 @@
 
 		con->currently_executing = current;
 		ast_mutex_unlock(&con->lock);
-		res = current->callback(current->object);
+		res = current->callback(current->data);
 		ast_mutex_lock(&con->lock);
 		con->currently_executing = NULL;
 		ast_cond_signal(&current->cond);
@@ -632,18 +628,17 @@
 			 * If they return non-zero, we should schedule them to be
 			 * run again.
 			 */
-			if (sched_settime(&con->current->when, con->current->variable ? res : con->current->resched)) {
-				sched_release(con, con->current);
+			if (sched_settime(&current->when, current->variable? res : current->resched)) {
+				sched_release(con, current);
 			} else {
-				schedule(con, con->current);
+				schedule(con, current);
 			}
 		} else {
 			/* No longer needed, so release it */
-			sched_release(con, con->current);
-		}
-	}
-
-	con->current = NULL;
+			sched_release(con, current);
+		}
+	}
+
 	ast_mutex_unlock(&con->lock);
 
 	return numevents;




More information about the asterisk-commits mailing list