[Asterisk-code-review] taskprocessors: Implement high/low water mark alerts. (asterisk[certified/13.8])

Joshua Colp asteriskteam at digium.com
Thu Jun 9 15:14:05 CDT 2016


Joshua Colp has submitted this change and it was merged.

Change subject: taskprocessors: Implement high/low water mark alerts.
......................................................................


taskprocessors: Implement high/low water mark alerts.

When taskprocessors get backed up, there is a good chance that we are
being overloaded and need to defer adding new work to the system.

* Implemented a high/low water alert mechanism that modules can use to check
whether the system is being overloaded and take appropriate action.  When a
taskprocessor is created it has default congestion levels set.  Those
congestion levels can later be altered for a specific taskprocessor if stress
testing shows that its queue backlog is a symptom of overloading, or if it
needs to handle bursty activity without triggering an overload alert.

* Added "Low water" and "High water" columns to the CLI
"core show taskprocessor" output.

* Fixed __allocate_taskprocessor() to not use RAII_VAR().  RAII_VAR() was
never a good fit for creating a taskprocessor: a partially created
taskprocessor needs its references released explicitly, and differently,
on each failure path.

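* Made res_pjsip's distributor check if the taskprocessor overload alert
is active before placing a message representing brand new work onto a
distributor serializer.  While the alert is active, new out-of-dialog
requests and unmatched responses are ignored, relying on the peer's
transport layer to retransmit requests.  This replaces the previous check
that answered requests with a 503 once the SIP threadpool queue exceeded
SIP_MAX_QUEUE.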

ASTERISK-26088
Reported by:  Richard Mudgett

Change-Id: I182f1be603529cd665958661c4c05ff9901825fa
---
M include/asterisk/taskprocessor.h
M main/taskprocessor.c
M res/res_pjsip/pjsip_distributor.c
3 files changed, 177 insertions(+), 58 deletions(-)

Approvals:
  George Joseph: Looks good to me, but someone else must approve
  Joshua Colp: Looks good to me, approved; Verified



diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index af3ce74..e511222 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -59,6 +59,7 @@
 /*! \brief Suggested maximum taskprocessor name length (less null terminator). */
 #define AST_TASKPROCESSOR_MAX_NAME	45
 
+/*! Default taskprocessor high water level alert trigger */
 #define AST_TASKPROCESSOR_HIGH_WATER_LEVEL 500
 
 /*!
@@ -297,4 +298,26 @@
  */
 long ast_taskprocessor_size(struct ast_taskprocessor *tps);
 
+/*!
+ * \brief Get the current taskprocessor high water alert count.
+ * \since 13.10.0
+ *
+ * \retval 0 if no taskprocessors are in high water alert.
+ * \retval non-zero if some task processors are in high water alert.
+ */
+unsigned int ast_taskprocessor_alert_get(void);
+
+/*!
+ * \brief Set the high and low alert water marks of the given taskprocessor queue.
+ * \since 13.10.0
+ *
+ * \param tps Taskprocessor to update queue water marks.
+ * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
+ * \param high_water New queue high water mark.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error (water marks not changed).
+ */
+int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water);
+
 #endif /* __AST_TASKPROCESSOR_H__ */
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 1ba0c8a..7ce3e4f 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -76,6 +76,10 @@
 	void *local_data;
 	/*! \brief Taskprocessor current queue size */
 	long tps_queue_size;
+	/*! \brief Taskprocessor low water clear alert level */
+	long tps_queue_low;
+	/*! \brief Taskprocessor high water alert trigger level */
+	long tps_queue_high;
 	/*! \brief Taskprocessor queue */
 	AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
 	struct ast_taskprocessor_listener *listener;
@@ -85,6 +89,8 @@
 	unsigned int executing:1;
 	/*! Indicates that a high water warning has been issued on this task processor */
 	unsigned int high_water_warned:1;
+	/*! Indicates that a high water alert is active on this taskprocessor */
+	unsigned int high_water_alert:1;
 };
 
 /*!
@@ -121,14 +127,8 @@
 /*! \brief The astobj2 compare callback for taskprocessors */
 static int tps_cmp_cb(void *obj, void *arg, int flags);
 
-/*! \brief Destroy the taskprocessor when its refcount reaches zero */
-static void tps_taskprocessor_destroy(void *tps);
-
 /*! \brief CLI <example>taskprocessor ping <blah></example> handler function */
 static int tps_ping_handler(void *datap);
-
-/*! \brief Remove the front task off the taskprocessor queue */
-static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
 
 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -472,8 +472,8 @@
 	struct ao2_container *sorted_tps;
 	struct ast_taskprocessor *tps;
 	struct ao2_iterator iter;
-#define FMT_HEADERS		"%-45s %10s %10s %10s\n"
-#define FMT_FIELDS		"%-45s %10lu %10lu %10lu\n"
+#define FMT_HEADERS		"%-45s %10s %10s %10s %10s %10s\n"
+#define FMT_FIELDS		"%-45s %10lu %10lu %10lu %10lu %10lu\n"
 
 	switch (cmd) {
 	case CLI_INIT:
@@ -498,7 +498,7 @@
 		return CLI_FAILURE;
 	}
 
-	ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth");
+	ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
 	tcount = 0;
 	iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
 	while ((tps = ao2_iterator_next(&iter))) {
@@ -511,7 +511,8 @@
 			maxqsize = 0;
 			processed = 0;
 		}
-		ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize);
+		ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
+			tps->tps_queue_low, tps->tps_queue_high);
 		ast_taskprocessor_unreference(tps);
 		++tcount;
 	}
@@ -539,28 +540,106 @@
 	return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
 }
 
+/*! Count of the number of taskprocessors in high water alert. */
+static unsigned int tps_alert_count;
+
+/*! Access protection for tps_alert_count */
+AST_RWLOCK_DEFINE_STATIC(tps_alert_lock);
+
+/*!
+ * \internal
+ * \brief Add a delta to tps_alert_count with protection.
+ * \since 13.10.0
+ *
+ * \param tps Taskprocessor updating queue water mark alert trigger.
+ * \param delta The amount to add to tps_alert_count.
+ *
+ * \return Nothing
+ */
+static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
+{
+	unsigned int old;
+
+	ast_rwlock_wrlock(&tps_alert_lock);
+	old = tps_alert_count;
+	tps_alert_count += delta;
+	if (DEBUG_ATLEAST(3)
+		/* and tps_alert_count becomes zero or non-zero */
+		&& !old != !tps_alert_count) {
+		ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
+			tps->name, tps_alert_count ? "triggered" : "cleared");
+	}
+	ast_rwlock_unlock(&tps_alert_lock);
+}
+
+unsigned int ast_taskprocessor_alert_get(void)
+{
+	unsigned int count;
+
+	ast_rwlock_rdlock(&tps_alert_lock);
+	count = tps_alert_count;
+	ast_rwlock_unlock(&tps_alert_lock);
+
+	return count;
+}
+
+int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
+{
+	if (!tps || high_water < 0 || high_water < low_water) {
+		return -1;
+	}
+
+	if (low_water < 0) {
+		/* Set low water level to 90% of high water level */
+		low_water = (high_water * 9) / 10;
+	}
+
+	ao2_lock(tps);
+
+	tps->tps_queue_low = low_water;
+	tps->tps_queue_high = high_water;
+
+	if (tps->high_water_alert) {
+		if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
+			/* Update water mark alert immediately */
+			tps->high_water_alert = 0;
+			tps_alert_add(tps, -1);
+		}
+	} else {
+		if (high_water <= tps->tps_queue_size) {
+			/* Update water mark alert immediately */
+			tps->high_water_alert = 1;
+			tps_alert_add(tps, +1);
+		}
+	}
+
+	ao2_unlock(tps);
+
+	return 0;
+}
+
 /* destroy the taskprocessor */
-static void tps_taskprocessor_destroy(void *tps)
+static void tps_taskprocessor_dtor(void *tps)
 {
 	struct ast_taskprocessor *t = tps;
 	struct tps_task *task;
 
-	if (!tps) {
-		ast_log(LOG_ERROR, "missing taskprocessor\n");
-		return;
-	}
-	ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
-	/* free it */
-	ast_free(t->stats);
-	t->stats = NULL;
-	ast_free((char *) t->name);
-	if (t->listener) {
-		ao2_ref(t->listener, -1);
-		t->listener = NULL;
-	}
 	while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
 		tps_task_free(task);
 	}
+	t->tps_queue_size = 0;
+
+	if (t->high_water_alert) {
+		t->high_water_alert = 0;
+		tps_alert_add(t, -1);
+	}
+
+	ast_free(t->stats);
+	t->stats = NULL;
+	ast_free((char *) t->name);
+	t->name = NULL;
+	ao2_cleanup(t->listener);
+	t->listener = NULL;
 }
 
 /* pop the front task and return it */
@@ -569,7 +648,11 @@
 	struct tps_task *task;
 
 	if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
-		tps->tps_queue_size--;
+		--tps->tps_queue_size;
+		if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
+			tps->high_water_alert = 0;
+			tps_alert_add(tps, -1);
+		}
 	}
 	return task;
 }
@@ -648,19 +731,22 @@
 
 static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
 {
-	RAII_VAR(struct ast_taskprocessor *, p,
-			ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
+	struct ast_taskprocessor *p;
 
+	p = ao2_alloc(sizeof(*p), tps_taskprocessor_dtor);
 	if (!p) {
 		ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
 		return NULL;
 	}
 
-	if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
-		ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
-		return NULL;
-	}
-	if (!(p->name = ast_strdup(name))) {
+	/* Set default congestion water level alert triggers. */
+	p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
+	p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+
+	p->stats = ast_calloc(1, sizeof(*p->stats));
+	p->name = ast_strdup(name);
+	if (!p->stats || !p->name) {
+		ao2_ref(p, -1);
 		return NULL;
 	}
 
@@ -675,22 +761,18 @@
 	if (!(ao2_link(tps_singletons, p))) {
 		ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
 		listener->tps = NULL;
-		ao2_ref(p, -1);
+		ao2_ref(p, -2);
 		return NULL;
 	}
 
 	if (p->listener->callbacks->start(p->listener)) {
-		ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
+		ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
+			p->name);
 		ast_taskprocessor_unreference(p);
 		return NULL;
 	}
 
-	/* RAII_VAR will decrement the refcount at the end of the function.
-	 * Since we want to pass back a reference to p, we bump the refcount
-	 */
-	ao2_ref(p, +1);
 	return p;
-
 }
 
 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
@@ -799,10 +881,16 @@
 	AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
 	previous_size = tps->tps_queue_size++;
 
-	if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) {
-		ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
-			tps->name, previous_size);
-		tps->high_water_warned = 1;
+	if (previous_size >= tps->tps_queue_high) {
+		if (!tps->high_water_warned) {
+			tps->high_water_warned = 1;
+			ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
+				tps->name, previous_size);
+		}
+		if (!tps->high_water_alert) {
+			tps->high_water_alert = 1;
+			tps_alert_add(tps, +1);
+		}
 	}
 
 	/* The currently executing task counts as still in queue */
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index 1291690..a033f3b 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -333,8 +333,6 @@
 	.on_rx_request = endpoint_lookup,
 };
 
-#define SIP_MAX_QUEUE (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 3)
-
 static pj_bool_t distributor(pjsip_rx_data *rdata)
 {
 	pjsip_dialog *dlg;
@@ -372,6 +370,13 @@
 				rdata->msg_info.info);
 		serializer = find_request_serializer(rdata);
 		if (!serializer) {
+			if (ast_taskprocessor_alert_get()) {
+				/* We're overloaded, ignore the unmatched response. */
+				ast_debug(3, "Taskprocessor overload alert: Ignoring unmatched '%s'.\n",
+					pjsip_rx_data_get_info(rdata));
+				return PJ_TRUE;
+			}
+
 			/*
 			 * Pick a serializer for the unmatched response.  Maybe
 			 * the stack can figure out what it is for, or we really
@@ -386,6 +391,21 @@
 			PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
 		return PJ_TRUE;
 	} else {
+		if (ast_taskprocessor_alert_get()) {
+			/*
+			 * When taskprocessors get backed up, there is a good chance that
+			 * we are being overloaded and need to defer adding new work to
+			 * the system.  To defer the work we will ignore the request and
+			 * rely on the peer's transport layer to retransmit the message.
+			 * We usually work off the overload within a few seconds.  The
+			 * alternative is to send back a 503 response to these requests
+			 * and be done with it.
+			 */
+			ast_debug(3, "Taskprocessor overload alert: Ignoring '%s'.\n",
+				pjsip_rx_data_get_info(rdata));
+			return PJ_TRUE;
+		}
+
 		/* Pick a serializer for the out-of-dialog request. */
 		serializer = ast_sip_get_distributor_serializer(rdata);
 	}
@@ -396,21 +416,9 @@
 		clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
 	}
 
-	if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) {
-		/* When the threadpool is backed up this much, there is a good chance that we have encountered
-		 * some sort of terrible condition and don't need to be adding more work to the threadpool.
-		 * It's in our best interest to send back a 503 response and be done with it.
-		 */
-		if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
-			pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
-		}
+	if (ast_sip_push_task(serializer, distribute, clone)) {
 		ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
 		pjsip_rx_data_free_cloned(clone);
-	} else {
-		if (ast_sip_push_task(serializer, distribute, clone)) {
-			ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
-			pjsip_rx_data_free_cloned(clone);
-		}
 	}
 
 	ast_taskprocessor_unreference(serializer);

-- 
To view, visit https://gerrit.asterisk.org/2998
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I182f1be603529cd665958661c4c05ff9901825fa
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: certified/13.8
Gerrit-Owner: Richard Mudgett <rmudgett at digium.com>
Gerrit-Reviewer: Anonymous Coward #1000019
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>



More information about the asterisk-code-review mailing list