[Asterisk-code-review] taskprocessor: Enable subsystems and overload by subsystem (asterisk[16])
George Joseph
asteriskteam at digium.com
Fri Feb 15 13:10:55 CST 2019
George Joseph has uploaded this change for review. ( https://gerrit.asterisk.org/11002
Change subject: taskprocessor: Enable subsystems and overload by subsystem
......................................................................
taskprocessor: Enable subsystems and overload by subsystem
To prevent one subsystem's taskprocessors from causing others
to stall, new capabilities have been added to taskprocessors.
* Any taskprocessor name that has a '/' will have the part
before the first '/' saved as its "subsystem".
Examples:
"sorcery/acl-0000006a" and "sorcery/aor-00000019"
will be grouped into subsystem "sorcery".
"pjsip/distributor-00000025" and "pjsip/distributor-00000026"
will be grouped into subsystem "pjsip".
Taskprocessors with no '/' have an empty subsystem.
The "core show taskprocessors" CLI command now shows the subsystem.
* When a taskprocessor enters high-water alert status and it
has a non-empty subsystem, the subsystem name will be marked as
"overloaded".
* When a taskprocessor leaves high-water alert status and it
has a non-empty subsystem, the subsystem name will be unmarked as
"overloaded".
* A new API ast_taskprocessor_is_subsystem_overloaded() has been
added that checks if a specific subsystem is overloaded.
* The pjsip distributor now uses
ast_taskprocessor_is_subsystem_overloaded("pjsip") to determine
if it should pause accepting new messages.
REMINDER: The taskprocessor code doesn't take any action itself based
on high-water alerts or overloading. It's up to taskprocessor users
to check and take action themselves. Currently only the pjsip
distributor does this.
Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56
---
M include/asterisk/taskprocessor.h
M main/stasis.c
M main/taskprocessor.c
M res/res_pjsip/pjsip_distributor.c
4 files changed, 97 insertions(+), 11 deletions(-)
git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/02/11002/1
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index f74989a..a97957f 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -342,6 +342,18 @@
unsigned int ast_taskprocessor_alert_get(void);
/*!
+ * \brief Test if any taskprocessors in the subsystem are in high water alert.
+ * \since 13.26.0
+ * \since 16.3.0
+ *
+ * \param subsystem The subsystem to test: the part of a taskprocessor name before its first '/'
+ *
+ * \retval 0 if no taskprocessors for this subsystem are in high water alert (always so for an empty subsystem).
+ * \retval non-zero if some taskprocessors for this subsystem are in high water alert.
+ */
+int ast_taskprocessor_is_subsystem_overloaded(const char *subsystem);
+
+/*!
* \brief Set the high and low alert water marks of the given taskprocessor queue.
* \since 13.10.0
*
diff --git a/main/stasis.c b/main/stasis.c
index fea9ac5..4ca89b1 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -677,7 +677,7 @@
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
/* Create name with seq number appended. */
- ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c/%s", /* the '/' makes "subp"/"subm" the taskprocessor subsystem */
use_thread_pool ? 'p' : 'm',
stasis_topic_name(topic));
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 30aeddb..a0bfef4 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -89,8 +89,12 @@
unsigned int high_water_alert:1;
/*! Indicates if the taskprocessor is currently suspended */
unsigned int suspended:1;
+ /*! \brief Anything before the first '/' in the name; empty string if the name has no '/' */
+ char *subsystem;
/*! \brief Friendly name of the taskprocessor */
- char name[0];
+ char *name;
+ /*! \brief Storage for the NUL-terminated subsystem followed by the NUL-terminated full name */
+ char buffer[0];
};
/*!
@@ -122,6 +126,8 @@
/*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
static struct ao2_container *tps_singletons;
+static AST_VECTOR_RW(,char *) overloaded_subsystems; /*!< Subsystem names consulted by ast_taskprocessor_is_subsystem_overloaded() */
+
/*! \brief CLI <example>taskprocessor ping <blah></example> operation requires a ping condition */
static ast_cond_t cli_ping_cond;
@@ -271,6 +277,8 @@
static void tps_shutdown(void)
{
ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
+ AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free); /* entries come from ast_strdup(), so release them with ast_free() */
+ AST_VECTOR_RW_FREE(&overloaded_subsystems);
ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
tps_singletons = NULL;
}
@@ -285,6 +293,12 @@
return -1;
}
+ if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
+ ao2_ref(tps_singletons, -1);
+ ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
+ return -1;
+ }
+
ast_cond_init(&cli_ping_cond, NULL);
ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
@@ -478,6 +492,7 @@
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
char name[256];
+ char subsystem[256]; /* "" for taskprocessors whose name has no '/' */
int tcount;
unsigned long qsize;
unsigned long maxqsize;
@@ -485,8 +500,8 @@
struct ao2_container *sorted_tps;
struct ast_taskprocessor *tps;
struct ao2_iterator iter;
-#define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s\n"
-#define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu\n"
+#define FMT_HEADERS "%-16s %-45s %10s %10s %10s %10s %10s\n"
+#define FMT_FIELDS "%-16s %-45s %10lu %10lu %10lu %10lu %10lu\n"
switch (cmd) {
case CLI_INIT:
@@ -511,15 +526,16 @@
return CLI_FAILURE;
}
- ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
+ ast_cli(a->fd, "\n" FMT_HEADERS, "Subsystem", "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
tcount = 0;
iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
while ((tps = ao2_iterator_next(&iter))) {
+ ast_copy_string(subsystem, tps->subsystem, sizeof(subsystem));
ast_copy_string(name, tps->name, sizeof(name));
qsize = tps->tps_queue_size;
maxqsize = tps->stats.max_qsize;
processed = tps->stats._tasks_processed_count;
- ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
+ ast_cli(a->fd, FMT_FIELDS, subsystem, name, processed, qsize, maxqsize,
tps->tps_queue_low, tps->tps_queue_high);
ast_taskprocessor_unreference(tps);
++tcount;
@@ -548,6 +564,60 @@
return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
}
+int ast_taskprocessor_is_subsystem_overloaded(const char *subsystem)
+{
+ int overloaded;
+
+ if (ast_strlen_zero(subsystem)) {
+ /* Taskprocessors without a subsystem are never tracked. */
+ return 0;
+ }
+
+ AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
+ overloaded = AST_VECTOR_GET_CMP(&overloaded_subsystems, subsystem, !strcmp) != NULL;
+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+
+ return overloaded;
+}
+
+/*!
+ * \internal
+ * \brief Note that one more taskprocessor in \a subsystem entered high water alert.
+ *
+ * overloaded_subsystems holds one copy of the subsystem name for each
+ * taskprocessor currently in alert, so the subsystem stays overloaded
+ * until the last of its taskprocessors clears.
+ */
+static void add_subsystem_alert(const char *subsystem)
+{
+ char *s = ast_strdup(subsystem);
+
+ if (!s) {
+ return;
+ }
+
+ AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
+ if (AST_VECTOR_APPEND(&overloaded_subsystems, s)) {
+ /* Not stored in the vector, so nothing else will ever free it. */
+ ast_free(s);
+ }
+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+}
+
+/*!
+ * \internal
+ * \brief Note that one taskprocessor in \a subsystem left high water alert.
+ *
+ * Removes a single copy of the name; the subsystem remains overloaded
+ * while other copies are still present.
+ */
+static void remove_subsystem_alert(const char *subsystem)
+{
+ AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
+ AST_VECTOR_REMOVE_CMP_UNORDERED(&overloaded_subsystems, subsystem, !strcmp, ast_free);
+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+}
+
/*! Count of the number of taskprocessors in high water alert. */
static unsigned int tps_alert_count;
@@ -571,11 +641,28 @@
ast_rwlock_wrlock(&tps_alert_lock);
old = tps_alert_count;
tps_alert_count += delta;
if (DEBUG_ATLEAST(3)
/* and tps_alert_count becomes zero or non-zero */
&& !old != !tps_alert_count) {
ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
tps->name, tps_alert_count ? "triggered" : "cleared");
}
+
+ /*
+ * Track overload per subsystem instead of from the global count. The
+ * global count only crosses zero once no matter how many subsystems are
+ * in alert, so it cannot tell when one particular subsystem has cleared.
+ */
+ if (tps->subsystem[0] != '\0') {
+ int i;
+
+ /* overloaded_subsystems keeps one entry per alerting taskprocessor. */
+ for (i = 0; i < delta; ++i) {
+ add_subsystem_alert(tps->subsystem);
+ }
+ for (i = 0; i > delta; --i) {
+ remove_subsystem_alert(tps->subsystem);
+ }
+ }
ast_rwlock_unlock(&tps_alert_lock);
}
@@ -747,8 +834,17 @@
static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
{
struct ast_taskprocessor *p;
+ const char *subsystem_separator;
+ size_t subsystem_length = 0;
- p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor);
+ /* The subsystem is everything before the first '/' in the name, if any. */
+ subsystem_separator = strchr(name, '/');
+ if (subsystem_separator) {
+ subsystem_length = subsystem_separator - name;
+ }
+
+ /* Room for the subsystem and its NUL, then the full name and its NUL. */
+ p = ao2_alloc(sizeof(*p) + strlen(name) + subsystem_length + 2, tps_taskprocessor_dtor);
if (!p) {
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
return NULL;
@@ -758,6 +854,11 @@
p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+ /* buffer becomes "<subsystem>\0<name>\0"; the subsystem is "" when there is no '/'. */
+ memcpy(p->buffer, name, subsystem_length);
+ p->buffer[subsystem_length] = '\0';
+ p->subsystem = p->buffer;
+ p->name = p->buffer + subsystem_length + 1;
strcpy(p->name, name); /*SAFE*/
ao2_ref(listener, +1);
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index d356e37..cbd49c3 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -534,7 +534,7 @@
ao2_cleanup(dist);
return PJ_TRUE;
} else {
- if (ast_taskprocessor_alert_get()) {
+ if (ast_taskprocessor_is_subsystem_overloaded("pjsip")) { /* only taskprocessors named "pjsip/..." are considered */
/*
* When taskprocessors get backed up, there is a good chance that
* we are being overloaded and need to defer adding new work to
--
To view, visit https://gerrit.asterisk.org/11002
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings
Gerrit-Project: asterisk
Gerrit-Branch: 16
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56
Gerrit-Change-Number: 11002
Gerrit-PatchSet: 1
Gerrit-Owner: George Joseph <gjoseph at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20190215/83cdba44/attachment-0001.html>
More information about the asterisk-code-review
mailing list