[Asterisk-code-review] taskprocessor: Enable subsystems and overload by subsystem (asterisk[master])
Joshua C. Colp
asteriskteam at digium.com
Tue Feb 26 07:04:15 CST 2019
Joshua C. Colp has submitted this change and it was merged. ( https://gerrit.asterisk.org/11003 )
Change subject: taskprocessor: Enable subsystems and overload by subsystem
......................................................................
taskprocessor: Enable subsystems and overload by subsystem
To prevent one subsystem's taskprocessors from causing others
to stall, new capabilities have been added to taskprocessors.
* Any taskprocessor name that has a '/' will have the part
before the '/' saved as its "subsystem".
Examples:
"sorcery/acl-0000006a" and "sorcery/aor-00000019"
will be grouped to subsystem "sorcery".
"pjsip/distributor-00000025" and "pjsip/distributor-00000026"
will be grouped to subsystem "pjsip".
Taskprocessors with no '/' have an empty subsystem.
* When a taskprocessor enters high-water alert status and it
has a non-empty subsystem, the subsystem alert count will
be incremented.
* When a taskprocessor leaves high-water alert status and it
has a non-empty subsystem, the subsystem alert count will be
decremented.
* A new API ast_taskprocessor_get_subsystem_alert() has been
added that returns the number of taskprocessors in alert for
the subsystem.
* A new CLI command "core show taskprocessor alerted subsystems"
has been added.
* A new unit test was added.
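
The grouping and counting rules above can be sketched in standalone C. This is only an illustration under stated assumptions, not the code from the patch: the names subsystem_from_name, alert_increment, alert_decrement and alert_get are hypothetical, the table is a fixed-size array instead of the locked vector used in main/taskprocessor.c, and no locking is shown.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define MAX_SUBSYSTEMS 16      /* illustrative limit, not from the patch */
#define MAX_SUBSYSTEM_NAME 64  /* illustrative limit, not from the patch */

struct alert_entry {
	char subsystem[MAX_SUBSYSTEM_NAME];
	unsigned int alert_count; /* 0 means the slot is free */
};

static struct alert_entry alerts[MAX_SUBSYSTEMS];

/* Hypothetical helper: copy the part of name before the first '/' into out.
 * Writes an empty string when name has no '/'. Returns the copied length. */
size_t subsystem_from_name(const char *name, char *out, size_t out_size)
{
	const char *sep;
	size_t len = 0;

	assert(name != NULL && out != NULL && out_size > 0);

	sep = strchr(name, '/');
	if (sep) {
		len = (size_t)(sep - name);
	}
	if (len >= out_size) {
		len = out_size - 1; /* truncate rather than overflow */
	}
	memcpy(out, name, len);
	out[len] = '\0';
	return len;
}

/* Find the in-use slot for a subsystem, or NULL if it is not in alert. */
static struct alert_entry *find_entry(const char *subsystem)
{
	size_t i;

	for (i = 0; i < MAX_SUBSYSTEMS; i++) {
		if (alerts[i].alert_count > 0 && !strcmp(alerts[i].subsystem, subsystem)) {
			return &alerts[i];
		}
	}
	return NULL;
}

/* Called when a taskprocessor in this subsystem enters high-water alert. */
void alert_increment(const char *subsystem)
{
	struct alert_entry *entry;
	size_t i;

	/* Empty subsystems are never counted; over-long names are ignored here. */
	if (subsystem[0] == '\0' || strlen(subsystem) >= MAX_SUBSYSTEM_NAME) {
		return;
	}
	entry = find_entry(subsystem);
	if (entry) {
		entry->alert_count++;
		return;
	}
	for (i = 0; i < MAX_SUBSYSTEMS; i++) {
		if (alerts[i].alert_count == 0) {
			snprintf(alerts[i].subsystem, sizeof(alerts[i].subsystem), "%s", subsystem);
			alerts[i].alert_count = 1;
			return;
		}
	}
	/* Table full: this sketch silently drops the alert. */
}

/* Called when a taskprocessor in this subsystem leaves high-water alert. */
void alert_decrement(const char *subsystem)
{
	struct alert_entry *entry = find_entry(subsystem);

	if (entry) {
		entry->alert_count--; /* reaching 0 frees the slot */
	}
}

/* Number of taskprocessors currently in alert for the subsystem. */
unsigned int alert_get(const char *subsystem)
{
	struct alert_entry *entry = find_entry(subsystem);

	return entry ? entry->alert_count : 0;
}
```

With this sketch, "sorcery/acl-0000006a" maps to "sorcery", a name without '/' maps to the empty subsystem, and two increments followed by one decrement for "pjsip" leave alert_get("pjsip") at 1.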
REMINDER: The taskprocessor code itself doesn't take any action
based on high-water alerts or overloading. It's up to taskprocessor
users to check and take action themselves. Currently only the pjsip
distributor does this.
* A new pjsip/global option "taskprocessor_overload_trigger"
has been added that allows the user to select the trigger
mechanism the distributor uses to pause accepting new requests.
"none": Don't pause on any overload condition.
"global": Pause on ANY taskprocessor overload (the default and
current behavior)
"pjsip_only": Pause only on pjsip taskprocessor overloads.
* The core pjsip pool was renamed from "SIP" to "pjsip" so it can
be properly grouped into the "pjsip" subsystem.
* Stasis taskprocessor names were changed to use "stasis" as
the subsystem.
* Sorcery core taskprocessor names were changed to "sorcery" to
match the object taskprocessors.
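
The effect of the three "taskprocessor_overload_trigger" values can be sketched as a small decision function. This is a simplified illustration, not the distributor code: the enum and function names here are hypothetical, and the two counts stand in for what ast_taskprocessor_alert_get() and ast_taskprocessor_get_subsystem_alert("pjsip") would return.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>

/* Hypothetical names for this sketch; the patch defines its own enum. */
enum overload_trigger {
	TRIGGER_NONE = 0,
	TRIGGER_GLOBAL,
	TRIGGER_PJSIP_ONLY
};

/* Case-insensitive string equality, kept local to avoid non-standard calls. */
static int equals_ignore_case(const char *a, const char *b)
{
	while (*a && *b) {
		if (tolower((unsigned char)*a) != tolower((unsigned char)*b)) {
			return 0;
		}
		a++;
		b++;
	}
	return *a == '\0' && *b == '\0';
}

/* Parse an option value. Returns 0 on success, -1 for an unknown value. */
int parse_overload_trigger(const char *value, enum overload_trigger *out)
{
	assert(value != NULL && out != NULL);

	if (equals_ignore_case(value, "none")) {
		*out = TRIGGER_NONE;
	} else if (equals_ignore_case(value, "global")) {
		*out = TRIGGER_GLOBAL;
	} else if (equals_ignore_case(value, "pjsip_only")) {
		*out = TRIGGER_PJSIP_ONLY;
	} else {
		return -1;
	}
	return 0;
}

/* Returns 1 if new requests should be paused, 0 otherwise.
 * global_alerts: taskprocessors in alert across all subsystems.
 * pjsip_alerts:  taskprocessors in alert in the "pjsip" subsystem. */
int should_pause(enum overload_trigger trigger,
	unsigned int global_alerts, unsigned int pjsip_alerts)
{
	switch (trigger) {
	case TRIGGER_GLOBAL:
		return global_alerts > 0;
	case TRIGGER_PJSIP_ONLY:
		return pjsip_alerts > 0;
	case TRIGGER_NONE:
	default:
		return 0;
	}
}
```

For example, with one non-pjsip taskprocessor in alert, should_pause(TRIGGER_GLOBAL, 1, 0) is 1 while should_pause(TRIGGER_PJSIP_ONLY, 1, 0) is 0, which is the isolation between subsystems that this change is meant to allow.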
Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56
---
M CHANGES
M configs/samples/pjsip.conf.sample
A contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py
M include/asterisk/taskprocessor.h
M main/sorcery.c
M main/stasis.c
M main/taskprocessor.c
M main/threadpool.c
M res/res_pjsip.c
M res/res_pjsip/config_global.c
M res/res_pjsip/include/res_pjsip_private.h
M res/res_pjsip/pjsip_distributor.c
M tests/test_taskprocessor.c
13 files changed, 523 insertions(+), 10 deletions(-)
Approvals:
Richard Mudgett: Looks good to me, but someone else must approve
Kevin Harwell: Looks good to me, but someone else must approve
Joshua C. Colp: Looks good to me, approved; Approved for Submit
diff --git a/CHANGES b/CHANGES
index 78e019e..7ae6000 100644
--- a/CHANGES
+++ b/CHANGES
@@ -83,6 +83,15 @@
types defined in the "disallowed" list are not sent to the application. Note
that if a type is specified in both lists "disallowed" takes precedence.
+res_pjsip
+------------------
+ * A new configuration parameter "taskprocessor_overload_trigger" has been
+ added to the pjsip.conf "global" section. The distributor currently stops
+ accepting new requests when any taskprocessor overload is triggered. The
+ new option allows you to completely disable overload detection (NOT
+ RECOMMENDED), keep the current behavior, or trigger only on pjsip
+ taskprocessor overloads.
+
------------------------------------------------------------------------------
--- Functionality changes from Asterisk 16.1.0 to Asterisk 16.2.0 ------------
------------------------------------------------------------------------------
diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample
index 0ed01f0..29f53a5 100644
--- a/configs/samples/pjsip.conf.sample
+++ b/configs/samples/pjsip.conf.sample
@@ -1149,6 +1149,17 @@
; event when a device refreshes its registration
; (default: "no")
+;taskprocessor_overload_trigger=global
+ ; Set the trigger the distributor will use to detect
+ ; taskprocessor overloads. When triggered, the distributor
+ ; will not accept any new requests until the overload has
+ ; cleared.
+ ; "global": (default) Any taskprocessor overload will trigger.
+ ; "pjsip_only": Only pjsip taskprocessor overloads will trigger.
+ ; "none": No overload detection will be performed.
+ ; WARNING: The "none" and "pjsip_only" options should be used
+ ; with extreme caution and only to mitigate specific issues.
+ ; Under certain conditions they could make things worse.
; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl
;==========================ACL SECTION OPTIONS=========================
diff --git a/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py b/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py
new file mode 100644
index 0000000..6a5b9b2
--- /dev/null
+++ b/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py
@@ -0,0 +1,42 @@
+"""taskprocessor_overload_trigger
+
+Revision ID: f3c0b8695b66
+Revises: 0838f8db6a61
+Create Date: 2019-02-15 15:03:50.106790
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'f3c0b8695b66'
+down_revision = '0838f8db6a61'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import ENUM
+
+PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME = 'pjsip_taskprocessor_overload_trigger_values'
+PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES = ['none', 'global', 'pjsip_only']
+
+def upgrade():
+    context = op.get_context()
+
+    if context.bind.dialect.name == 'postgresql':
+        enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
+                    name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME)
+        enum.create(op.get_bind(), checkfirst=False)
+
+    op.add_column('ps_globals',
+        sa.Column('taskprocessor_overload_trigger',
+            sa.Enum(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
+                    name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME,
+                    create_type=False)))
+
+def downgrade():
+    if op.get_context().bind.dialect.name == 'mssql':
+        op.drop_constraint('ck_ps_globals_taskprocessor_overload_trigger_pjsip_taskprocessor_overload_trigger_values', 'ps_globals')
+    op.drop_column('ps_globals', 'taskprocessor_overload_trigger')
+
+    if op.get_context().bind.dialect.name == 'postgresql':
+        enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
+                    name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME)
+        enum.drop(op.get_bind(), checkfirst=False)
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index f74989a..5278595 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -341,6 +341,19 @@
*/
unsigned int ast_taskprocessor_alert_get(void);
+
+/*!
+ * \brief Get the current taskprocessor high water alert count by subsystem.
+ * \since 13.26.0
+ * \since 16.3.0
+ *
+ * \param subsystem The subsystem name
+ *
+ * \retval 0 if no taskprocessors are in high water alert.
+ * \retval non-zero if some task processors are in high water alert.
+ */
+unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem);
+
/*!
* \brief Set the high and low alert water marks of the given taskprocessor queue.
* \since 13.10.0
diff --git a/main/sorcery.c b/main/sorcery.c
index beaad21..8e14881 100644
--- a/main/sorcery.c
+++ b/main/sorcery.c
@@ -380,7 +380,7 @@
};
ast_assert(wizards == NULL);
- threadpool = ast_threadpool_create("Sorcery", NULL, &options);
+ threadpool = ast_threadpool_create("sorcery", NULL, &options);
if (!threadpool) {
return -1;
}
diff --git a/main/stasis.c b/main/stasis.c
index f05f5ff..204e7c8 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -677,7 +677,7 @@
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
/* Create name with seq number appended. */
- ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
use_thread_pool ? 'p' : 'm',
stasis_topic_name(topic));
@@ -2593,7 +2593,7 @@
threadpool_opts.auto_increment = 1;
threadpool_opts.max_size = cfg->threadpool_options->max_size;
threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
- pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+ pool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
ao2_ref(cfg, -1);
if (!pool) {
ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 30aeddb..9ebbf39 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -89,7 +89,11 @@
unsigned int high_water_alert:1;
/*! Indicates if the taskprocessor is currently suspended */
unsigned int suspended:1;
- /*! \brief Friendly name of the taskprocessor */
+ /*! \brief Anything before the first '/' in the name (if there is one) */
+ char *subsystem;
+ /*! \brief Friendly name of the taskprocessor.
+ * Subsystem is appended after the name's NULL terminator.
+ */
char name[0];
};
@@ -112,6 +116,16 @@
void *user_data;
};
+/*!
+ * Keep track of which subsystems are in alert
+ * and how many of their taskprocessors are overloaded.
+ */
+struct subsystem_alert {
+ unsigned int alert_count;
+ char subsystem[0];
+};
+static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *) overloaded_subsystems;
+
#ifdef LOW_MEMORY
#define TPS_MAX_BUCKETS 61
#else
@@ -138,10 +152,12 @@
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static struct ast_cli_entry taskprocessor_clis[] = {
AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
+ AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
};
struct default_taskprocessor_listener_pvt {
@@ -271,6 +287,8 @@
static void tps_shutdown(void)
{
ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
+ AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
+ AST_VECTOR_RW_FREE(&overloaded_subsystems);
ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
tps_singletons = NULL;
}
@@ -285,6 +303,12 @@
return -1;
}
+ if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
+ ao2_ref(tps_singletons, -1);
+ ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
+ return -1;
+ }
+
ast_cond_init(&cli_ping_cond, NULL);
ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
@@ -548,6 +572,157 @@
return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
}
+static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
+{
+ return !strcmp(alert->subsystem, subsystem);
+}
+
+static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
+{
+ return strcmp(a->subsystem, b->subsystem);
+}
+
+unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
+{
+ struct subsystem_alert *alert;
+ unsigned int count = 0;
+ int idx;
+
+ AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
+ idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
+ if (idx >= 0) {
+ alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
+ count = alert->alert_count;
+ }
+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+
+ return count;
+}
+
+static void subsystem_alert_increment(const char *subsystem)
+{
+ struct subsystem_alert *alert;
+ int idx;
+
+ if (ast_strlen_zero(subsystem)) {
+ return;
+ }
+
+ AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
+ idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
+ if (idx >= 0) {
+ alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
+ alert->alert_count++;
+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+ return;
+ }
+
+ alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
+ if (!alert) {
+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+ return;
+ }
+ alert->alert_count = 1;
+ strcpy(alert->subsystem, subsystem); /* Safe */
+
+ if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
+ ast_free(alert);
+ }
+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+}
+
+static void subsystem_alert_decrement(const char *subsystem)
+{
+ struct subsystem_alert *alert;
+ int idx;
+
+ if (ast_strlen_zero(subsystem)) {
+ return;
+ }
+
+ AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
+ idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
+ if (idx < 0) {
+ ast_log(LOG_ERROR,
+ "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+ return;
+ }
+ alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
+
+ alert->alert_count--;
+ if (alert->alert_count <= 0) {
+ AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
+ ast_free(alert);
+ }
+
+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+}
+
+static void subsystem_copy(struct subsystem_alert *alert,
+ struct subsystem_alert_vector *vector)
+{
+ struct subsystem_alert *alert_copy;
+ alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
+ if (!alert_copy) {
+ return;
+ }
+ alert_copy->alert_count = alert->alert_count;
+ strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
+ if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
+ ast_free(alert_copy);
+ }
+}
+
+static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct subsystem_alert_vector sorted_subsystems;
+ int i;
+
+#define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n"
+#define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n"
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "core show taskprocessor alerted subsystems";
+ e->usage =
+ "Usage: core show taskprocessor alerted subsystems\n"
+ " Shows a list of task processor subsystems that are currently alerted\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc != e->args) {
+ return CLI_SHOWUSAGE;
+ }
+
+ if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
+ return CLI_FAILURE;
+ }
+
+ AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
+ for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
+ subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
+ }
+ AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+
+ ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
+
+ for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
+ struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
+ ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
+ }
+
+ ast_cli(a->fd, "\n%zu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
+
+ AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
+ AST_VECTOR_FREE(&sorted_subsystems);
+
+ return CLI_SUCCESS;
+}
+
+
/*! Count of the number of taskprocessors in high water alert. */
static unsigned int tps_alert_count;
@@ -577,6 +752,15 @@
ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
tps->name, tps_alert_count ? "triggered" : "cleared");
}
+
+ if (tps->subsystem[0] != '\0') {
+ if (delta > 0) {
+ subsystem_alert_increment(tps->subsystem);
+ } else {
+ subsystem_alert_decrement(tps->subsystem);
+ }
+ }
+
ast_rwlock_unlock(&tps_alert_lock);
}
@@ -747,8 +931,17 @@
static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
{
struct ast_taskprocessor *p;
+ char *subsystem_separator;
+ size_t subsystem_length = 0;
+ size_t name_length;
- p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor);
+ name_length = strlen(name);
+ subsystem_separator = strchr(name, '/');
+ if (subsystem_separator) {
+ subsystem_length = subsystem_separator - name;
+ }
+
+ p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
if (!p) {
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
return NULL;
@@ -758,7 +951,9 @@
p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
- strcpy(p->name, name); /*SAFE*/
+ strcpy(p->name, name); /* Safe */
+ p->subsystem = p->name + name_length + 1;
+ ast_copy_string(p->subsystem, name, subsystem_length + 1);
ao2_ref(listener, +1);
p->listener = listener;
diff --git a/main/threadpool.c b/main/threadpool.c
index 2ab0936..56fbb2c 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -413,7 +413,7 @@
return NULL;
}
- ast_str_set(&control_tps_name, 0, "%s-control", name);
+ ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
ast_free(control_tps_name);
@@ -919,6 +919,7 @@
struct ast_taskprocessor *tps;
RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
+ char *fullname;
pool = threadpool_alloc(name, options);
if (!pool) {
@@ -935,7 +936,9 @@
return NULL;
}
- tps = ast_taskprocessor_create_with_listener(name, tps_listener);
+ fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
+ sprintf(fullname, "%s/pool", name); /* Safe */
+ tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
if (!tps) {
return NULL;
}
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 4f18010..24796fc 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -1908,6 +1908,26 @@
<configOption name="send_contact_status_on_update_registration" default="no">
<synopsis>Enable sending AMI ContactStatus event when a device refreshes its registration.</synopsis>
</configOption>
+ <configOption name="taskprocessor_overload_trigger">
+ <synopsis>Trigger scope for taskprocessor overloads</synopsis>
+ <description><para>
+ This option specifies the trigger the distributor will use for
+ detecting taskprocessor overloads. When it detects an overload condition,
+ the distributor will stop accepting new requests until the overload is
+ cleared.
+ </para>
+ <enumlist>
+ <enum name="global"><para>(default) Any taskprocessor overload will trigger.</para></enum>
+ <enum name="pjsip_only"><para>Only pjsip taskprocessor overloads will trigger.</para></enum>
+ <enum name="none"><para>No overload detection will be performed.</para></enum>
+ </enumlist>
+ <warning><para>
+ The "none" and "pjsip_only" options should be used
+ with extreme caution and only to mitigate specific issues.
+ Under certain conditions they could make things worse.
+ </para></warning>
+ </description>
+ </configOption>
</configObject>
</configFile>
</configInfo>
@@ -5298,7 +5318,7 @@
/* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */
sip_get_threadpool_options(&options);
options.thread_start = sip_thread_start;
- sip_threadpool = ast_threadpool_create("SIP", NULL, &options);
+ sip_threadpool = ast_threadpool_create("pjsip", NULL, &options);
if (!sip_threadpool) {
goto error;
}
diff --git a/res/res_pjsip/config_global.c b/res/res_pjsip/config_global.c
index 38383c5..8f21e50 100644
--- a/res/res_pjsip/config_global.c
+++ b/res/res_pjsip/config_global.c
@@ -51,6 +51,7 @@
#define DEFAULT_IGNORE_URI_USER_OPTIONS 0
#define DEFAULT_USE_CALLERID_CONTACT 0
#define DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION 0
+#define DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL
/*!
* \brief Cached global config object
@@ -110,6 +111,8 @@
unsigned int use_callerid_contact;
/*! Nonzero if need to send AMI ContactStatus event when a contact is updated */
unsigned int send_contact_status_on_update_registration;
+ /*! Trigger the distributor should use to pause accepting new dialogs */
+ enum ast_sip_taskprocessor_overload_trigger overload_trigger;
};
static void global_destructor(void *obj)
@@ -483,6 +486,58 @@
return send_contact_status_on_update_registration;
}
+enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void)
+{
+ enum ast_sip_taskprocessor_overload_trigger trigger;
+ struct global_config *cfg;
+
+ cfg = get_global_cfg();
+ if (!cfg) {
+ return DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER;
+ }
+
+ trigger = cfg->overload_trigger;
+ ao2_ref(cfg, -1);
+ return trigger;
+}
+
+static int overload_trigger_handler(const struct aco_option *opt,
+ struct ast_variable *var, void *obj)
+{
+ struct global_config *cfg = obj;
+ if (!strcasecmp(var->value, "none")) {
+ cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_NONE;
+ } else if (!strcasecmp(var->value, "global")) {
+ cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL;
+ } else if (!strcasecmp(var->value, "pjsip_only")) {
+ cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY;
+ } else {
+ ast_log(LOG_WARNING, "Unknown overload trigger '%s' specified for %s\n",
+ var->value, var->name);
+ return -1;
+ }
+ return 0;
+}
+
+static const char *overload_trigger_map[] = {
+ [TASKPROCESSOR_OVERLOAD_TRIGGER_NONE] = "none",
+ [TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL] = "global",
+ [TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY] = "pjsip_only"
+};
+
+const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger)
+{
+ return ARRAY_IN_BOUNDS(trigger, overload_trigger_map) ?
+ overload_trigger_map[trigger] : "";
+}
+
+static int overload_trigger_to_str(const void *obj, const intptr_t *args, char **buf)
+{
+ const struct global_config *cfg = obj;
+ *buf = ast_strdup(ast_sip_overload_trigger_to_str(cfg->overload_trigger));
+ return 0;
+}
+
/*!
* \internal
* \brief Observer to set default global object if none exist.
@@ -646,6 +701,9 @@
ast_sorcery_object_field_register(sorcery, "global", "send_contact_status_on_update_registration",
DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION ? "yes" : "no",
OPT_YESNO_T, 1, FLDSET(struct global_config, send_contact_status_on_update_registration));
+ ast_sorcery_object_field_register_custom(sorcery, "global", "taskprocessor_overload_trigger",
+ overload_trigger_map[DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER],
+ overload_trigger_handler, overload_trigger_to_str, NULL, 0, 0);
if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
return -1;
diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h
index 7af5b27..f6333bf 100644
--- a/res/res_pjsip/include/res_pjsip_private.h
+++ b/res/res_pjsip/include/res_pjsip_private.h
@@ -408,4 +408,14 @@
*/
int ast_sip_persistent_endpoint_add_to_regcontext(const char *regcontext);
+enum ast_sip_taskprocessor_overload_trigger {
+ TASKPROCESSOR_OVERLOAD_TRIGGER_NONE = 0,
+ TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL,
+ TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY
+};
+
+enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void);
+
+const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger);
+
#endif /* RES_PJSIP_PRIVATE_H_ */
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index d356e37..72ed35b 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -51,6 +51,7 @@
static unsigned int unidentified_period;
static unsigned int unidentified_prune_interval;
static int using_auth_username;
+static enum ast_sip_taskprocessor_overload_trigger overload_trigger;
struct unidentified_request{
struct timeval first_seen;
@@ -534,7 +535,10 @@
ao2_cleanup(dist);
return PJ_TRUE;
} else {
- if (ast_taskprocessor_alert_get()) {
+ if ((overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL &&
+ ast_taskprocessor_alert_get())
+ || (overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY &&
+ ast_taskprocessor_get_subsystem_alert("pjsip"))) {
/*
* When taskprocessors get backed up, there is a good chance that
* we are being overloaded and need to defer adding new work to
@@ -1196,6 +1200,8 @@
ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
+ overload_trigger = ast_sip_get_taskprocessor_overload_trigger();
+
/* Clean out the old task, if any */
ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
/* Have to do something with the return value to shut up the stupid compiler. */
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index 6428746..70cb556 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -46,6 +46,8 @@
ast_mutex_t lock;
/*! Boolean indicating that the task was run */
int task_complete;
+ /*! Milliseconds to wait before returning */
+ unsigned long wait_time;
};
static void task_data_dtor(void *obj)
@@ -69,6 +71,7 @@
ast_cond_init(&task_data->cond, NULL);
ast_mutex_init(&task_data->lock);
task_data->task_complete = 0;
+ task_data->wait_time = 0;
return task_data;
}
@@ -83,7 +86,11 @@
static int task(void *data)
{
struct task_data *task_data = data;
+
SCOPED_MUTEX(lock, &task_data->lock);
+ if (task_data->wait_time > 0) {
+ usleep(task_data->wait_time * 1000);
+ }
task_data->task_complete = 1;
ast_cond_signal(&task_data->cond);
return 0;
@@ -165,6 +172,143 @@
return AST_TEST_PASS;
}
+/*!
+ * \brief Baseline test for subsystem alert
+ */
+AST_TEST_DEFINE(subsystem_alert)
+{
+ RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+#define TEST_DATA_ARRAY_SIZE 10
+#define LOW_WATER_MARK 3
+#define HIGH_WATER_MARK 6
+ struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
+ int res = 0;
+ int i;
+ long queue_count;
+ unsigned int alert_level;
+ unsigned int subsystem_alert_level;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "subsystem_alert";
+ info->category = "/main/taskprocessor/";
+ info->summary = "Test of subsystem alerts";
+ info->description =
+ "Ensures alerts are generated properly.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
+
+ if (!tps) {
+ ast_test_status_update(test, "Unable to create test taskprocessor\n");
+ return AST_TEST_FAIL;
+ }
+
+ ast_taskprocessor_alert_set_levels(tps, LOW_WATER_MARK, HIGH_WATER_MARK);
+ ast_taskprocessor_suspend(tps);
+
+ for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
+ task_data[i] = task_data_create();
+ if (!task_data[i]) {
+ ast_test_status_update(test, "Unable to create task_data\n");
+ res = -1;
+ goto data_cleanup;
+ }
+ task_data[i]->wait_time = 500;
+
+ ast_test_status_update(test, "Pushing task %d\n", i);
+ if (ast_taskprocessor_push(tps, task, task_data[i])) {
+ ast_test_status_update(test, "Failed to queue task\n");
+ res = -1;
+ goto data_cleanup;
+ }
+
+ queue_count = ast_taskprocessor_size(tps);
+ alert_level = ast_taskprocessor_alert_get();
+ subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
+
+ if (queue_count == HIGH_WATER_MARK) {
+ if (subsystem_alert_level) {
+ ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
+ }
+ if (alert_level) {
+ ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
+ }
+ } else if (queue_count < HIGH_WATER_MARK) {
+ if (subsystem_alert_level > 0) {
+ ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
+ res = -1;
+ }
+ if (alert_level > 0) {
+ ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
+ res = -1;
+ }
+ } else {
+ if (subsystem_alert_level == 0) {
+ ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
+ res = -1;
+ }
+ if (alert_level == 0) {
+ ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
+ res = -1;
+ }
+ }
+ }
+
+ ast_taskprocessor_unsuspend(tps);
+
+ for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
+ ast_test_status_update(test, "Waiting on task %d\n", i);
+ if (task_wait(task_data[i])) {
+ ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
+ res = -1;
+ goto data_cleanup;
+ }
+
+ queue_count = ast_taskprocessor_size(tps);
+ alert_level = ast_taskprocessor_alert_get();
+ subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
+
+ if (queue_count == LOW_WATER_MARK) {
+ if (!subsystem_alert_level) {
+ ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
+ }
+ if (!alert_level) {
+ ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
+ }
+ } else if (queue_count > LOW_WATER_MARK) {
+ if (subsystem_alert_level == 0) {
+ ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
+ res = -1;
+ }
+ if (alert_level == 0) {
+ ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
+ res = -1;
+ }
+ } else {
+ if (subsystem_alert_level > 0) {
+ ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
+ res = -1;
+ }
+ if (alert_level > 0) {
+ ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
+ res = -1;
+ }
+ }
+
+ }
+
+data_cleanup:
+ for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
+ ao2_cleanup(task_data[i]);
+ }
+
+ return res ? AST_TEST_FAIL : AST_TEST_PASS;
+}
+
#define NUM_TASKS 20000
/*!
@@ -749,6 +893,7 @@
{
ast_test_unregister(default_taskprocessor);
ast_test_unregister(default_taskprocessor_load);
+ ast_test_unregister(subsystem_alert);
ast_test_unregister(taskprocessor_listener);
ast_test_unregister(taskprocessor_shutdown);
ast_test_unregister(taskprocessor_push_local);
@@ -759,6 +904,7 @@
{
ast_test_register(default_taskprocessor);
ast_test_register(default_taskprocessor_load);
+ ast_test_register(subsystem_alert);
ast_test_register(taskprocessor_listener);
ast_test_register(taskprocessor_shutdown);
ast_test_register(taskprocessor_push_local);
--
To view, visit https://gerrit.asterisk.org/11003
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings
Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56
Gerrit-Change-Number: 11003
Gerrit-PatchSet: 4
Gerrit-Owner: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Friendly Automation (1000185)
Gerrit-Reviewer: Joshua C. Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
Gerrit-Reviewer: Richard Mudgett <rmudgett at digium.com>