[Asterisk-code-review] pjsip: Rewrite OPTIONS support with new eyes. (asterisk[15])
Richard Mudgett
asteriskteam at digium.com
Mon Apr 9 18:25:51 CDT 2018
Richard Mudgett has uploaded this change for review. ( https://gerrit.asterisk.org/8752
Change subject: pjsip: Rewrite OPTIONS support with new eyes.
......................................................................
pjsip: Rewrite OPTIONS support with new eyes.
The OPTIONS support in PJSIP has organically grown, like many things in
Asterisk. It has been tweaked, changed, and adapted based on situations
run into. Unfortunately this has taken its toll. Configuration file
based objects have poor performance and even dynamic ones aren't that
great.
This change scraps the existing code and starts fresh with new eyes. It
leverages all of the APIs made available such as sorcery observers and
serializers to provide a better implementation.
1. The state of contacts, AORs, and endpoints relevant to the qualify
process is maintained. This state can be updated by external forces (such
as a device registering/unregistering) and also the reload process. This
state also includes the association between endpoints and AORs.
2. AORs are scheduled and not contacts. This reduces the amount of work
spent juggling scheduled items.
3. Manipulation of which AORs are being qualified and the endpoint states
all occur within a serializer to reduce the conflict that can occur with
multiple threads attempting to modify things.
4. Operations regarding an AOR use a serializer specific to that AOR.
5. AORs and endpoint state act as state compositors. They take input
from lower level objects (contacts feed AORs, AORs feed endpoint state)
and determine if a sufficient enough change has occurred to be fed further
up the chain.
6. Realtime is supported by using observers to know when a contact has
been registered. If state does not exist for the associated AOR then it
is retrieved and becomes active as appropriate.
The end result of all of this is best shown with a configuration file of
3000 endpoints each with an AOR that has a static contact. In the old
code it would take over a minute to load and use all 8 of my cores. This
new code takes 2-3 seconds and barely touches the CPU even while dealing
with all of the OPTIONS requests.
ASTERISK-26806
Change-Id: I6a5ebbfca9001dfe933eaeac4d3babd8d2e6f082
---
M funcs/func_pjsip_contact.c
M include/asterisk/res_pjsip.h
M res/res_pjsip/location.c
M res/res_pjsip/pjsip_configuration.c
M res/res_pjsip/pjsip_options.c
5 files changed, 2,432 insertions(+), 1,442 deletions(-)
git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/52/8752/1
diff --git a/funcs/func_pjsip_contact.c b/funcs/func_pjsip_contact.c
index c840365..b306de2 100644
--- a/funcs/func_pjsip_contact.c
+++ b/funcs/func_pjsip_contact.c
@@ -142,12 +142,12 @@
return -1;
}
- contact_status = ast_sorcery_retrieve_by_id(pjsip_sorcery, CONTACT_STATUS, ast_sorcery_object_get_id(contact_obj));
+ contact_status = ast_sip_get_contact_status(contact_obj);
if (!strcmp(args.field_name, "status")) {
- ast_str_set(buf, len, "%s", ast_sip_get_contact_status_label(contact_status->status));
+ ast_str_set(buf, len, "%s", ast_sip_get_contact_status_label(contact_status ? contact_status->status : UNKNOWN));
} else if (!strcmp(args.field_name, "rtt")) {
- if (contact_status->status == UNKNOWN) {
+ if (!contact_status || contact_status->status == UNKNOWN) {
ast_str_set(buf, len, "%s", "N/A");
} else {
ast_str_set(buf, len, "%" PRId64, contact_status->rtt);
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index d7c6347..b11e601 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -283,8 +283,6 @@
int prune_on_boot;
};
-#define CONTACT_STATUS "contact_status"
-
/*!
* \brief Status type for a contact.
*/
@@ -303,23 +301,20 @@
* if available.
*/
struct ast_sip_contact_status {
- SORCERY_OBJECT(details);
AST_DECLARE_STRING_FIELDS(
/*! The original contact's URI */
AST_STRING_FIELD(uri);
/*! The name of the aor this contact_status belongs to */
AST_STRING_FIELD(aor);
);
- /*! Current status for a contact (default - unavailable) */
- enum ast_sip_contact_status_type status;
- /*! The round trip start time set before sending a qualify request */
- struct timeval rtt_start;
/*! The round trip time in microseconds */
int64_t rtt;
+ /*! Current status for a contact (default - unavailable) */
+ enum ast_sip_contact_status_type status;
/*! Last status for a contact (default - unavailable) */
enum ast_sip_contact_status_type last_status;
- /*! TRUE if the contact was refreshed. e.g., re-registered */
- unsigned int refresh:1;
+ /*! Name of the contact */
+ char name[0];
};
/*!
@@ -1057,7 +1052,7 @@
/*!
* \brief Change state of a persistent endpoint.
*
- * \param endpoint The SIP endpoint name to change state.
+ * \param endpoint_name The SIP endpoint name to change state.
* \param state The new state
* \retval 0 Success
* \retval -1 Endpoint not found
@@ -1065,6 +1060,26 @@
int ast_sip_persistent_endpoint_update_state(const char *endpoint_name, enum ast_endpoint_state state);
/*!
+ * \brief Publish the change of state for a contact.
+ *
+ * \param endpoint_name The SIP endpoint name.
+ * \param contact_status The contact status.
+ */
+void ast_sip_persistent_endpoint_publish_contact_state(const char *endpoint_name, const struct ast_sip_contact_status *contact_status);
+
+/*!
+ * \brief Retrieve the current status for a contact.
+ *
+ * \param contact The contact.
+ *
+ * \retval non-NULL Success
+ * \retval NULL Status information not found
+ *
+ * \note The returned contact status object is immutable.
+ */
+struct ast_sip_contact_status *ast_sip_get_contact_status(const struct ast_sip_contact *contact);
+
+/*!
* \brief Get a pointer to the PJSIP endpoint.
*
* This is useful when modules have specific information they need
diff --git a/res/res_pjsip/location.c b/res/res_pjsip/location.c
index 2ce4a18..0028ebe 100644
--- a/res/res_pjsip/location.c
+++ b/res/res_pjsip/location.c
@@ -185,7 +185,7 @@
struct ast_sip_contact_status *status;
int unreachable;
- status = ast_res_pjsip_find_or_create_contact_status(contact);
+ status = ast_sip_get_contact_status(contact);
if (!status) {
return 0;
}
@@ -1070,7 +1070,7 @@
ast_assert(contact->uri != NULL);
ast_assert(context->output_buffer != NULL);
- status = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), CONTACT_STATUS, contact_id);
+ status = ast_sip_get_contact_status(contact);
indent = CLI_INDENT_TO_SPACES(context->indent_level);
flexwidth = CLI_LAST_TABSTOP - indent - 9 - strlen(contact->aor) + 1;
diff --git a/res/res_pjsip/pjsip_configuration.c b/res/res_pjsip/pjsip_configuration.c
index 3094f24..bfba830 100644
--- a/res/res_pjsip/pjsip_configuration.c
+++ b/res/res_pjsip/pjsip_configuration.c
@@ -42,8 +42,6 @@
struct sip_persistent_endpoint {
/*! \brief Asterisk endpoint itself */
struct ast_endpoint *endpoint;
- /*! \brief AORs that we should react to */
- char *aors;
};
/*! \brief Container for persistent endpoint information */
@@ -69,239 +67,6 @@
return !strcmp(ast_endpoint_get_resource(persistent1->endpoint), id) ? CMP_MATCH | CMP_STOP : 0;
}
-
-/*! \brief Internal function for changing the state of an endpoint */
-static void endpoint_update_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state)
-{
- struct ast_json *blob;
- char *regcontext;
-
- /* If there was no state change, don't publish anything. */
- if (ast_endpoint_get_state(endpoint) == state) {
- return;
- }
-
- regcontext = ast_sip_get_regcontext();
-
- if (state == AST_ENDPOINT_ONLINE) {
- ast_endpoint_set_state(endpoint, AST_ENDPOINT_ONLINE);
- blob = ast_json_pack("{s: s}", "peer_status", "Reachable");
-
- if (!ast_strlen_zero(regcontext)) {
- if (!ast_exists_extension(NULL, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL)) {
- ast_add_extension(regcontext, 1, ast_endpoint_get_resource(endpoint), 1, NULL, NULL,
- "Noop", ast_strdup(ast_endpoint_get_resource(endpoint)), ast_free_ptr, "SIP");
- }
- }
-
- ast_verb(2, "Endpoint %s is now Reachable\n", ast_endpoint_get_resource(endpoint));
- } else {
- ast_endpoint_set_state(endpoint, AST_ENDPOINT_OFFLINE);
- blob = ast_json_pack("{s: s}", "peer_status", "Unreachable");
-
- if (!ast_strlen_zero(regcontext)) {
- struct pbx_find_info q = { .stacklen = 0 };
-
- if (pbx_find_extension(NULL, NULL, &q, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL, "", E_MATCH)) {
- ast_context_remove_extension(regcontext, ast_endpoint_get_resource(endpoint), 1, NULL);
- }
- }
-
- ast_verb(2, "Endpoint %s is now Unreachable\n", ast_endpoint_get_resource(endpoint));
- }
-
- ast_free(regcontext);
-
- ast_endpoint_blob_publish(endpoint, ast_endpoint_state_type(), blob);
- ast_json_unref(blob);
- ast_devstate_changed(AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "PJSIP/%s", ast_endpoint_get_resource(endpoint));
-}
-
-static void endpoint_publish_contact_status(struct ast_endpoint *endpoint, struct ast_sip_contact_status *contact)
-{
- struct ast_json *blob;
- char rtt[32];
-
- snprintf(rtt, sizeof(rtt), "%" PRId64, contact->rtt);
- blob = ast_json_pack("{s: s, s: s, s: s, s: s, s: s}",
- "contact_status", ast_sip_get_contact_status_label(contact->status),
- "aor", contact->aor,
- "uri", contact->uri,
- "roundtrip_usec", rtt,
- "endpoint_name", ast_endpoint_get_resource(endpoint));
- if (blob) {
- ast_endpoint_blob_publish(endpoint, ast_endpoint_contact_state_type(), blob);
- ast_json_unref(blob);
- }
-}
-
-/*! \brief Callback function for publishing the status of an endpoint */
-static int persistent_endpoint_publish_status(void *obj, void *arg, int flags)
-{
- struct sip_persistent_endpoint *persistent = obj;
- struct ast_endpoint *endpoint = persistent->endpoint;
- struct ast_sip_contact_status *status = arg;
-
- /* If the status' aor isn't one of the endpoint's, we skip */
- if (!strstr(persistent->aors, status->aor)) {
- return 0;
- }
-
- endpoint_publish_contact_status(endpoint, status);
- return 0;
-}
-
-/*! \brief Callback function for changing the state of an endpoint */
-static int persistent_endpoint_update_state(void *obj, void *arg, int flags)
-{
- struct sip_persistent_endpoint *persistent = obj;
- struct ast_endpoint *endpoint = persistent->endpoint;
- struct ast_sip_contact_status *status = arg;
- struct ao2_container *contacts;
- struct ao2_iterator iter;
- struct ast_sip_contact *contact;
- enum ast_endpoint_state state = AST_ENDPOINT_OFFLINE;
-
- /* If the status' aor isn't one of the endpoint's, we skip */
- if (!strstr(persistent->aors, status->aor)) {
- return 0;
- }
-
- endpoint_publish_contact_status(endpoint, status);
-
- /* Find all the contacts for this endpoint. If ANY are available,
- * mark the endpoint as ONLINE.
- */
- contacts = ast_sip_location_retrieve_contacts_from_aor_list(persistent->aors);
- if (contacts) {
- iter = ao2_iterator_init(contacts, 0);
- while (state == AST_ENDPOINT_OFFLINE && (contact = ao2_iterator_next(&iter))) {
- struct ast_sip_contact_status *contact_status;
- const char *contact_id = ast_sorcery_object_get_id(contact);
-
- contact_status = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(),
- CONTACT_STATUS, contact_id);
- if (contact_status && contact_status->status != UNAVAILABLE) {
- state = AST_ENDPOINT_ONLINE;
- }
- ao2_cleanup(contact_status);
- ao2_ref(contact, -1);
- }
- ao2_iterator_destroy(&iter);
- ao2_ref(contacts, -1);
- }
-
- endpoint_update_state(endpoint, state);
-
- return 0;
-}
-
-/*! \brief Function called when a contact is created */
-static void persistent_endpoint_contact_created_observer(const void *object)
-{
- const struct ast_sip_contact *contact = object;
- struct ast_sip_contact_status *contact_status;
-
- contact_status = ast_sorcery_alloc(ast_sip_get_sorcery(), CONTACT_STATUS,
- ast_sorcery_object_get_id(contact));
- if (!contact_status) {
- ast_log(LOG_ERROR, "Unable to create ast_sip_contact_status for contact %s/%s\n",
- contact->aor, contact->uri);
- return;
- }
- ast_string_field_set(contact_status, uri, contact->uri);
-
- contact_status->status = CREATED;
-
- ast_verb(2, "Contact %s/%s has been created\n", contact->aor, contact->uri);
-
- ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, contact_status);
- ao2_cleanup(contact_status);
-}
-
-/*! \brief Function called when a contact is deleted */
-static void persistent_endpoint_contact_deleted_observer(const void *object)
-{
- const struct ast_sip_contact *contact = object;
- struct ast_sip_contact_status *contact_status;
-
- contact_status = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), CONTACT_STATUS, ast_sorcery_object_get_id(contact));
- if (!contact_status) {
- ast_log(LOG_ERROR, "Unable to find ast_sip_contact_status for contact %s/%s\n",
- contact->aor, contact->uri);
- return;
- }
-
- ast_verb(2, "Contact %s/%s has been deleted\n", contact->aor, contact->uri);
- ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
- "-1", 1.0, ast_sip_get_contact_status_label(contact_status->status));
- ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
- "+1", 1.0, ast_sip_get_contact_status_label(REMOVED));
-
- ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, contact_status);
- ast_sorcery_delete(ast_sip_get_sorcery(), contact_status);
- ao2_cleanup(contact_status);
-}
-
-/*! \brief Observer for contacts so state can be updated on respective endpoints */
-static const struct ast_sorcery_observer state_contact_observer = {
- .created = persistent_endpoint_contact_created_observer,
- .deleted = persistent_endpoint_contact_deleted_observer,
-};
-
-/*! \brief Function called when a contact_status is updated */
-static void persistent_endpoint_contact_status_observer(const void *object)
-{
- struct ast_sip_contact_status *contact_status = (struct ast_sip_contact_status *)object;
-
- if (contact_status->refresh) {
- /* We are only re-publishing the contact status. */
- ao2_callback(persistent_endpoints, OBJ_NODATA,
- persistent_endpoint_publish_status, contact_status);
- return;
- }
-
- /* If rtt_start is set (this is the outgoing OPTIONS), ignore. */
- if (contact_status->rtt_start.tv_sec > 0) {
- return;
- }
-
- if (contact_status->status != contact_status->last_status) {
- ast_verb(3, "Contact %s/%s is now %s. RTT: %.3f msec\n",
- contact_status->aor, contact_status->uri,
- ast_sip_get_contact_status_label(contact_status->status),
- contact_status->rtt / 1000.0);
-
- ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
- "-1", 1.0, ast_sip_get_contact_status_label(contact_status->last_status));
- ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
- "+1", 1.0, ast_sip_get_contact_status_label(contact_status->status));
-
- ast_test_suite_event_notify("AOR_CONTACT_UPDATE",
- "Contact: %s\r\n"
- "Status: %s",
- ast_sorcery_object_get_id(contact_status),
- ast_sip_get_contact_status_label(contact_status->status));
-
- ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state,
- contact_status);
- } else {
- ast_debug(3, "Contact %s/%s status didn't change: %s, RTT: %.3f msec\n",
- contact_status->aor, contact_status->uri,
- ast_sip_get_contact_status_label(contact_status->status),
- contact_status->rtt / 1000.0);
- }
-
- ast_statsd_log_full_va("PJSIP.contacts.%s.rtt", AST_STATSD_TIMER,
- contact_status->status != AVAILABLE ? -1 : contact_status->rtt / 1000,
- 1.0,
- ast_sorcery_object_get_id(contact_status));
-}
-
-/*! \brief Observer for contacts so state can be updated on respective endpoints */
-static const struct ast_sorcery_observer state_contact_status_observer = {
- .updated = persistent_endpoint_contact_status_observer,
-};
static void endpoint_deleted_observer(const void *object)
{
@@ -1352,21 +1117,89 @@
struct sip_persistent_endpoint *persistent = obj;
ast_endpoint_shutdown(persistent->endpoint);
- ast_free(persistent->aors);
}
int ast_sip_persistent_endpoint_update_state(const char *endpoint_name, enum ast_endpoint_state state)
{
struct sip_persistent_endpoint *persistent;
+ struct ast_json *blob;
+ char *regcontext;
- ao2_lock(persistent_endpoints);
- persistent = ao2_find(persistent_endpoints, endpoint_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
- if (persistent) {
- endpoint_update_state(persistent->endpoint, state);
- ao2_ref(persistent, -1);
+ persistent = ao2_find(persistent_endpoints, endpoint_name, OBJ_SEARCH_KEY);
+ if (!persistent) {
+ return -1;
}
- ao2_unlock(persistent_endpoints);
- return persistent ? 0 : -1;
+
+ /* If there was no state change, don't publish anything. */
+ if (ast_endpoint_get_state(persistent->endpoint) == state) {
+ ao2_ref(persistent, -1);
+ return 0;
+ }
+
+ regcontext = ast_sip_get_regcontext();
+
+ if (state == AST_ENDPOINT_ONLINE) {
+ ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_ONLINE);
+ blob = ast_json_pack("{s: s}", "peer_status", "Reachable");
+
+ if (!ast_strlen_zero(regcontext)) {
+ if (!ast_exists_extension(NULL, regcontext, ast_endpoint_get_resource(persistent->endpoint), 1, NULL)) {
+ ast_add_extension(regcontext, 1, ast_endpoint_get_resource(persistent->endpoint), 1, NULL, NULL,
+ "Noop", ast_strdup(ast_endpoint_get_resource(persistent->endpoint)), ast_free_ptr, "SIP");
+ }
+ }
+
+ ast_verb(2, "Endpoint %s is now Reachable\n", ast_endpoint_get_resource(persistent->endpoint));
+ } else {
+ ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_OFFLINE);
+ blob = ast_json_pack("{s: s}", "peer_status", "Unreachable");
+
+ if (!ast_strlen_zero(regcontext)) {
+ struct pbx_find_info q = { .stacklen = 0 };
+
+ if (pbx_find_extension(NULL, NULL, &q, regcontext, ast_endpoint_get_resource(persistent->endpoint), 1, NULL, "", E_MATCH)) {
+ ast_context_remove_extension(regcontext, ast_endpoint_get_resource(persistent->endpoint), 1, NULL);
+ }
+ }
+
+ ast_verb(2, "Endpoint %s is now Unreachable\n", ast_endpoint_get_resource(persistent->endpoint));
+ }
+
+ ast_free(regcontext);
+
+ ast_endpoint_blob_publish(persistent->endpoint, ast_endpoint_state_type(), blob);
+ ast_json_unref(blob);
+ ast_devstate_changed(AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "PJSIP/%s", ast_endpoint_get_resource(persistent->endpoint));
+
+ ao2_ref(persistent, -1);
+
+ return 0;
+}
+
+void ast_sip_persistent_endpoint_publish_contact_state(const char *endpoint_name, const struct ast_sip_contact_status *contact_status)
+{
+ struct sip_persistent_endpoint *persistent;
+ struct ast_json *blob;
+ char rtt[32];
+
+ persistent = ao2_find(persistent_endpoints, endpoint_name, OBJ_SEARCH_KEY);
+ if (!persistent) {
+ return;
+ }
+
+ snprintf(rtt, sizeof(rtt), "%" PRId64, contact_status->rtt);
+ blob = ast_json_pack("{s: s, s: s, s: s, s: s, s: s}",
+ "contact_status", ast_sip_get_contact_status_label(contact_status->status),
+ "aor", contact_status->aor,
+ "uri", contact_status->uri,
+ "roundtrip_usec", rtt,
+ "endpoint_name", ast_endpoint_get_resource(persistent->endpoint));
+ if (blob) {
+ ast_endpoint_blob_publish(persistent->endpoint, ast_endpoint_contact_state_type(), blob);
+ ast_json_unref(blob);
+ }
+
+ ao2_ref(persistent, -1);
}
/*! \brief Internal function which finds (or creates) persistent endpoint information */
@@ -1390,22 +1223,9 @@
return NULL;
}
- persistent->aors = ast_strdup(endpoint->aors);
- if (!persistent->aors) {
- return NULL;
- }
-
ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_OFFLINE);
ao2_link_flags(persistent_endpoints, persistent, OBJ_NOLOCK);
- } else if (strcmp(persistent->aors, endpoint->aors)) {
- char *new_aors = ast_strdup(endpoint->aors);
-
- /* make sure we don't NULL persistent->aors if allocation fails. */
- if (new_aors) {
- ast_free(persistent->aors);
- persistent->aors = new_aors;
- }
}
ao2_ref(persistent->endpoint, +1);
@@ -2097,16 +1917,7 @@
return -1;
}
- if (ast_sip_initialize_sorcery_qualify()) {
- ast_log(LOG_ERROR, "Failed to register SIP qualify support with sorcery\n");
- ast_sorcery_unref(sip_sorcery);
- sip_sorcery = NULL;
- return -1;
- }
-
ast_sorcery_observer_add(sip_sorcery, "endpoint", &endpoint_observers);
- ast_sorcery_observer_add(sip_sorcery, "contact", &state_contact_observer);
- ast_sorcery_observer_add(sip_sorcery, CONTACT_STATUS, &state_contact_status_observer);
if (ast_sip_initialize_sorcery_domain_alias()) {
ast_log(LOG_ERROR, "Failed to register SIP domain aliases support with sorcery\n");
@@ -2155,8 +1966,6 @@
return;
}
- ast_sorcery_observer_remove(sip_sorcery, CONTACT_STATUS, &state_contact_status_observer);
- ast_sorcery_observer_remove(sip_sorcery, "contact", &state_contact_observer);
ast_sip_destroy_sorcery_global();
ast_sip_destroy_sorcery_location();
ast_sip_destroy_sorcery_auth();
diff --git a/res/res_pjsip/pjsip_options.c b/res/res_pjsip/pjsip_options.c
index aaa05d1..5afe3e3 100644
--- a/res/res_pjsip/pjsip_options.c
+++ b/res/res_pjsip/pjsip_options.c
@@ -1,9 +1,10 @@
/*
* Asterisk -- An open source telephony toolkit.
*
- * Copyright (C) 2013, Digium, Inc.
+ * Copyright (C) 2018, Digium, Inc.
*
- * Matt Jordan <mjordan at digium.com>
+ * Joshua Colp <jcolp at digium.com>
+ * Richard Mudgett <rmudgett at digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
@@ -32,664 +33,184 @@
#include "asterisk/statsd.h"
#include "include/res_pjsip_private.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
+
+/*
+ * This implementation for OPTIONS support is based around the idea
+ * that realistically an AOR generally has very few contacts and is
+ * referenced by only a few endpoints. While it is perfectly fine for
+ * use in opposite scenarios it works best in the above case. It is
+ * also not shy to keeping state but it is reactive to outside changes
+ * so it can be updated.
+ *
+ * The lowest level object in here is a contact and its associated
+ * contact status. The result of an OPTIONS request to a contact is
+ * reflected in the contact status. The scheduling of these OPTIONS
+ * request is driven by the AOR. The AOR periodicially (according to
+ * configuration) sends OPTIONS requests out to any contacts
+ * associated with it. Contacts themselves are not individually
+ * scheduled. Contacts can be added or deleted as appropriate with no
+ * requirement to reschedule.
+ *
+ * The next level object up is the AOR itself. The result of a contact
+ * status change is fed into it and the result composited with all
+ * other contacts. This may result in the AOR itself changing state
+ * (it can be either AVAILABLE or UNAVAILABLE).
+ *
+ * The highest level object up is the endpoint state compositor (ESC).
+ * The result of AOR state changes is fed into it and the result
+ * composited with all other referenced AORs. This may result in the
+ * endpoint itself changing state (it can be either ONLINE or
+ * OFFLINE). If this occurs the permanent endpoint is updated to
+ * reflect it.
+ *
+ * The threading model errs on the side of a world where things are
+ * not constantly changing. That is: A world where AORs and endpoints
+ * are not being constantly added/removed. This more closely mirrors
+ * the usage of the vast majority of people. This scenario can still
+ * be done but it may not be applied immediately.
+ *
+ * Manipulation of which AORs, endpoint state compositors, and
+ * contacts exist is done within a single serializer. This ensures
+ * that no matter the source threads order is preserved and you won't
+ * get into a weird situation where things are referencing other
+ * things that should have already been destroyed.
+ *
+ * Operations which impact the state of an AOR are done within a
+ * serializer that is specific to the AOR. This includes the result of
+ * a contact status change. This change is queued and executed on the
+ * AOR serializer afterwards.
+ *
+ * Operations which impact an endpoint state compositor are protected
+ * by a lock. This is done as the endpoint state compositor usage is
+ * minimal and the overhead of using a serializer and queueing things
+ * is not warranted.
+ *
+ * AORs which do not have a qualify frequency are also kept in here
+ * but do not require the same criteria as qualified AORs to be
+ * considered available. In their case as long as at least 1 contact
+ * is configured on the AOR (or added to it by registration) it is
+ * considered available.
+ */
#define DEFAULT_LANGUAGE "en"
#define DEFAULT_ENCODING "text/plain"
-#define QUALIFIED_BUCKETS 211
-static const char *status_map [] = {
- [UNAVAILABLE] = "Unreachable",
- [AVAILABLE] = "Reachable",
- [UNKNOWN] = "Unknown",
- [CREATED] = "Created",
- [REMOVED] = "Removed",
+/*! \brief These are the number of buckets to store AORs in */
+#ifdef LOW_MEMORY
+#define AOR_BUCKETS 61
+#else
+#define AOR_BUCKETS 1567
+#endif
+
+/*! \brief These are the number of contact status buckets */
+#ifdef LOW_MEMORY
+#define CONTACT_STATUS_BUCKETS 61
+#else
+#define CONTACT_STATUS_BUCKETS 1567
+#endif
+
+/*! \brief These are the number of buckets (per AOR) to use to store contacts */
+#define CONTACT_BUCKETS 13
+
+/*! \brief These are the number of buckets to store endpoint state compositors */
+#define ENDPOINT_STATE_COMPOSITOR_BUCKETS 13
+
+/*! \brief The initial vector size for the endpoint state compositors on an AOR */
+#define ENDPOINT_STATE_COMPOSITOR_INITIAL_SIZE 1
+
+/*! \brief These are the number of buckets (per endpoint state compositor) to use to store AOR statuses */
+#define AOR_STATUS_BUCKETS 3
+
+/*! \brief Maximum wait time to join the below shutdown group */
+#define MAX_UNLOAD_TIMEOUT_TIME 10 /* Seconds */
+
+/*! \brief Shutdown group for options serializers */
+static struct ast_serializer_shutdown_group *shutdown_group;
+
+/*!
+ * \brief Structure which contains status information for an AOR feeding an endpoint state compositor
+ */
+struct sip_options_endpoint_aor_status {
+ /*! \brief The last contributed available status of the named AOR (1 if available, 0 if not available) */
+ char available;
+ /*! \brief The name of the AOR */
+ char name[0];
};
-static const char *short_status_map [] = {
- [UNAVAILABLE] = "Unavail",
- [AVAILABLE] = "Avail",
- [UNKNOWN] = "Unknown",
- [CREATED] = "Created",
- [REMOVED] = "Removed",
-};
-
-static void contact_deleted(const void *obj);
-static void qualify_and_schedule(struct ast_sip_contact *contact);
-
-const char *ast_sip_get_contact_status_label(const enum ast_sip_contact_status_type status)
-{
- return status_map[status];
-}
-
-const char *ast_sip_get_contact_short_status_label(const enum ast_sip_contact_status_type status)
-{
- return short_status_map[status];
-}
-
/*!
- * \internal
- * \brief Destroy a ast_sip_contact_status object.
+ * \brief Structure which contains composites information for endpoint state
*/
-static void contact_status_destroy(void * obj)
-{
- struct ast_sip_contact_status *status = obj;
-
- ast_string_field_free_memory(status);
-}
-
-/*!
- * \internal
- * \brief Create a ast_sip_contact_status object.
- */
-static void *contact_status_alloc(const char *name)
-{
- struct ast_sip_contact_status *status = ast_sorcery_generic_alloc(sizeof(*status), contact_status_destroy);
- char *id = ast_strdupa(name);
- char *aor = id;
- char *aor_separator = NULL;
-
- if (!status) {
- ast_log(LOG_ERROR, "Unable to allocate ast_sip_contact_status\n");
- return NULL;
- }
-
- if (ast_string_field_init(status, 256)) {
- ast_log(LOG_ERROR, "Unable to allocate ast_sip_contact_status stringfields\n");
- ao2_cleanup(status);
- return NULL;
- }
-
- /* Dynamic contacts are delimited with ";@" and static ones with "@@" */
- if ((aor_separator = strstr(id, ";@")) || (aor_separator = strstr(id, "@@"))) {
- *aor_separator = '\0';
- }
- ast_assert(aor_separator != NULL);
-
- ast_string_field_set(status, aor, aor);
- status->status = CREATED;
-
- return status;
-}
-
-static int qualify_and_schedule_aor_contact(void *obj)
-{
- struct ast_sip_contact *contact = obj;
- struct ast_sip_aor *aor;
-
- if (!contact || ast_strlen_zero(contact->aor) ||
- !(aor = ast_sip_location_retrieve_aor(contact->aor))) {
- ao2_ref(contact, -1);
- return -1;
- }
-
- contact->qualify_frequency = aor->qualify_frequency;
- contact->qualify_timeout = aor->qualify_timeout;
- contact->authenticate_qualify = aor->authenticate_qualify;
-
- ao2_ref(aor, -1);
-
- qualify_and_schedule(contact);
- ao2_ref(contact, -1);
-
- return 0;
-}
-
-AST_MUTEX_DEFINE_STATIC(creation_lock);
-
-/*!
- * \brief Retrieve a ast_sip_contact_status object from sorcery creating
- * one if not found.
- */
-struct ast_sip_contact_status *ast_res_pjsip_find_or_create_contact_status(const struct ast_sip_contact *contact)
-{
- struct ast_sip_contact_status *status;
- SCOPED_MUTEX(lock, &creation_lock);
-
- status = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), CONTACT_STATUS,
- ast_sorcery_object_get_id(contact));
- if (status) {
- return status;
- }
-
- status = ast_sorcery_alloc(ast_sip_get_sorcery(), CONTACT_STATUS,
- ast_sorcery_object_get_id(contact));
- if (!status) {
- ast_log(LOG_ERROR, "Unable to create ast_sip_contact_status for contact %s/%s\n",
- contact->aor, contact->uri);
- return NULL;
- }
-
- ast_string_field_set(status, uri, contact->uri);
- status->rtt_start = ast_tv(0, 0);
- status->rtt = 0;
-
- if (ast_sorcery_create(ast_sip_get_sorcery(), status)) {
- ast_log(LOG_ERROR, "Unable to persist ast_sip_contact_status for contact %s\n",
- contact->uri);
- ao2_ref(status, -1);
- return NULL;
- }
-
- /* The permanent contact added after asterisk start should be qualified. */
- if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED) && ast_tvzero(contact->expiration_time)) {
- /*
- * The FULLY_BOOTED to filter out contacts that already existed when asterisk started.
- * The zero expiration_time to select only permanent contacts.
- */
- ao2_ref((struct ast_sip_contact *) contact, +1);
- if (ast_sip_push_task(NULL, qualify_and_schedule_aor_contact, (struct ast_sip_contact *) contact)) {
- ao2_ref((struct ast_sip_contact *) contact, -1);
- }
- }
-
- ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
- "+1", 1.0, ast_sip_get_contact_status_label(status->status));
-
- return status;
-}
-
-/*!
- * \internal
- * \brief Update an ast_sip_contact_status's elements.
- */
-static void update_contact_status(const struct ast_sip_contact *contact,
- enum ast_sip_contact_status_type value, int is_contact_refresh)
-{
- RAII_VAR(struct ast_sip_contact_status *, status, NULL, ao2_cleanup);
- RAII_VAR(struct ast_sip_contact_status *, update, NULL, ao2_cleanup);
-
- status = ast_res_pjsip_find_or_create_contact_status(contact);
- if (!status) {
- ast_log(LOG_ERROR, "Unable to find ast_sip_contact_status for contact %s\n",
- contact->uri);
- return;
- }
-
- if (is_contact_refresh
- && status->status == CREATED) {
- /*
- * The contact status hasn't been updated since creation
- * and we don't want to re-send a created status.
- */
- if (contact->qualify_frequency
- || status->rtt_start.tv_sec > 0) {
- /* Ignore, the status will change soon. */
- return;
- }
-
- /*
- * Convert to a regular contact status update
- * because the status may never change.
- */
- is_contact_refresh = 0;
- value = UNKNOWN;
- }
-
- update = ast_sorcery_alloc(ast_sip_get_sorcery(), CONTACT_STATUS,
- ast_sorcery_object_get_id(status));
- if (!update) {
- ast_log(LOG_ERROR, "Unable to allocate ast_sip_contact_status for contact %s\n",
- contact->uri);
- return;
- }
-
- ast_string_field_set(update, uri, contact->uri);
-
- if (is_contact_refresh) {
- /* Copy everything just to set the refresh flag. */
- update->status = status->status;
- update->last_status = status->last_status;
- update->rtt = status->rtt;
- update->rtt_start = status->rtt_start;
- update->refresh = 1;
- } else {
- update->last_status = status->status;
- update->status = value;
-
- /*
- * if the contact is available calculate the rtt as
- * the diff between the last start time and "now"
- */
- update->rtt = update->status == AVAILABLE && status->rtt_start.tv_sec > 0
- ? ast_tvdiff_us(ast_tvnow(), status->rtt_start)
- : 0;
- update->rtt_start = ast_tv(0, 0);
-
- ast_test_suite_event_notify("AOR_CONTACT_QUALIFY_RESULT",
- "Contact: %s\r\n"
- "Status: %s\r\n"
- "RTT: %" PRId64,
- ast_sorcery_object_get_id(update),
- ast_sip_get_contact_status_label(update->status),
- update->rtt);
- }
-
- if (ast_sorcery_update(ast_sip_get_sorcery(), update)) {
- ast_log(LOG_ERROR, "Unable to update ast_sip_contact_status for contact %s\n",
- contact->uri);
- }
-}
-
-/*!
- * \internal
- * \brief Initialize the start time on a contact status so the round
- * trip time can be calculated upon a valid response.
- */
-static void init_start_time(const struct ast_sip_contact *contact)
-{
- RAII_VAR(struct ast_sip_contact_status *, status, NULL, ao2_cleanup);
- RAII_VAR(struct ast_sip_contact_status *, update, NULL, ao2_cleanup);
-
- status = ast_res_pjsip_find_or_create_contact_status(contact);
- if (!status) {
- ast_log(LOG_ERROR, "Unable to find ast_sip_contact_status for contact %s\n",
- contact->uri);
- return;
- }
-
- update = ast_sorcery_alloc(ast_sip_get_sorcery(), CONTACT_STATUS,
- ast_sorcery_object_get_id(status));
- if (!update) {
- ast_log(LOG_ERROR, "Unable to copy ast_sip_contact_status for contact %s\n",
- contact->uri);
- return;
- }
-
- ast_string_field_set(status, uri, contact->uri);
- update->status = status->status;
- update->last_status = status->last_status;
- update->rtt = status->rtt;
- update->rtt_start = ast_tvnow();
-
- if (ast_sorcery_update(ast_sip_get_sorcery(), update)) {
- ast_log(LOG_ERROR, "Unable to update ast_sip_contact_status for contact %s\n",
- contact->uri);
- }
-}
-
-/*!
- * \internal
- * \brief For an endpoint try to match the given contact->aor.
- */
-static int on_endpoint(void *obj, void *arg, int flags)
-{
- struct ast_sip_endpoint *endpoint = obj;
- char *contact_aor = arg;
- char *aor_name;
- char *aors;
-
- if (!arg || ast_strlen_zero(endpoint->aors)) {
- return 0;
- }
-
- aors = ast_strdupa(endpoint->aors);
- while ((aor_name = ast_strip(strsep(&aors, ",")))) {
- if (!strcmp(contact_aor, aor_name)) {
- return CMP_MATCH;
- }
- }
-
- return 0;
-}
-
-/*!
- * \internal
- * \brief Find an endpoint associated with the given contact.
- */
-static struct ast_sip_endpoint *find_an_endpoint(struct ast_sip_contact *contact)
-{
- struct ao2_container *endpoints;
- struct ast_sip_endpoint *endpoint;
- struct ast_variable *var;
- char *aor = ast_alloca(strlen(contact->aor) + 3);
-
- sprintf(aor, "%%%s%%", contact->aor);
- var = ast_variable_new("aors LIKE", aor, "");
- endpoints = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
- "endpoint", AST_RETRIEVE_FLAG_MULTIPLE, var);
-
- ast_variables_destroy(var);
-
- /*
- * Because aors are a string list, we have to use a pattern match but since a simple
- * pattern match could return an endpoint that has an aor of "aaabccc" when searching
- * for "abc", we still have to iterate over them to find an exact aor match.
+struct sip_options_endpoint_state_compositor {
+ /*! \brief The last contributed available status of the AORs feeding this compositor */
+ struct ao2_container *aor_statuses;
+ /*!
+ * \brief Non-zero if the compositor is in normal operation. i.e. Not being setup/reconfigured.
+ *
+ * \details
+ * The aor layer can only update its aor_statuses record when not active.
+ * When active the aor layer can update its aor_statuses record, calculate the new
+ * number of available aors, determine if the endpoint compositor changed state,
+ * and report it.
*/
- endpoint = ao2_callback(endpoints, 0, on_endpoint, (char *)contact->aor);
- ao2_ref(endpoints, -1);
-
- return endpoint;
-}
-
-/*!
- * \internal
- * \brief Receive a response to the qualify contact request.
- */
-static void qualify_contact_cb(void *token, pjsip_event *e)
-{
- struct ast_sip_contact *contact = token;
-
- switch(e->body.tsx_state.type) {
- default:
- ast_log(LOG_ERROR, "Unexpected PJSIP event %u\n", e->body.tsx_state.type);
- /* Fall through */
- case PJSIP_EVENT_TRANSPORT_ERROR:
- case PJSIP_EVENT_TIMER:
- update_contact_status(contact, UNAVAILABLE, 0);
- break;
- case PJSIP_EVENT_RX_MSG:
- update_contact_status(contact, AVAILABLE, 0);
- break;
- }
- ao2_cleanup(contact);
-}
-
-/*!
- * \internal
- * \brief Attempt to qualify the contact
- *
- * \details Sends a SIP OPTIONS request to the given contact in order to make
- * sure that contact is available.
- */
-static int qualify_contact(struct ast_sip_endpoint *endpoint, struct ast_sip_contact *contact)
-{
- pjsip_tx_data *tdata;
- RAII_VAR(struct ast_sip_endpoint *, endpoint_local, NULL, ao2_cleanup);
-
- if (endpoint) {
- endpoint_local = ao2_bump(endpoint);
- } else {
- if (!ast_strlen_zero(contact->endpoint_name)) {
- endpoint_local = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", contact->endpoint_name);
- }
- if (!endpoint_local) {
- endpoint_local = find_an_endpoint(contact);
- }
- if (!endpoint_local) {
- ast_log(LOG_WARNING, "Unable to find an endpoint to qualify contact %s. Deleting this contact\n",
- contact->uri);
- contact_deleted(contact);
- return -1;
- }
- }
-
- if (ast_sip_create_request("OPTIONS", NULL, endpoint_local, NULL, contact, &tdata)) {
- ast_log(LOG_ERROR, "Unable to create request to qualify contact %s\n",
- contact->uri);
- return -1;
- }
-
- /* If an outbound proxy is specified set it on this request */
- if (!ast_strlen_zero(contact->outbound_proxy) &&
- ast_sip_set_outbound_proxy(tdata, contact->outbound_proxy)) {
- pjsip_tx_data_dec_ref(tdata);
- ast_log(LOG_ERROR, "Unable to apply outbound proxy on request to qualify contact %s\n",
- contact->uri);
- return -1;
- }
-
- init_start_time(contact);
-
- ao2_ref(contact, +1);
- if (ast_sip_send_out_of_dialog_request(tdata, endpoint_local, (int)(contact->qualify_timeout * 1000), contact, qualify_contact_cb)
- != PJ_SUCCESS) {
- ast_log(LOG_ERROR, "Unable to send request to qualify contact %s\n",
- contact->uri);
- update_contact_status(contact, UNAVAILABLE, 0);
- ao2_ref(contact, -1);
- return -1;
- }
-
- return 0;
-}
-
-/*!
- * \internal
- * \brief Scheduling context for sending QUALIFY request at specified intervals.
- */
-static struct ast_sched_context *sched;
-
-/*!
- * \internal
- * \brief Container to hold all actively scheduled qualifies.
- */
-static struct ao2_container *sched_qualifies;
-
-/*!
- * \internal
- * \brief Structure to hold qualify contact scheduling information.
- */
-struct sched_data {
- /*! The scheduling id */
- int id;
- /*! The the contact being checked */
- struct ast_sip_contact *contact;
+ char active;
+ /*! \brief The name of the endpoint */
+ char name[0];
};
/*!
- * \internal
- * \brief Destroy the scheduled data and remove from scheduler.
+ * \brief Structure which contains an AOR and contacts for qualifying purposes
*/
-static void sched_data_destructor(void *obj)
-{
- struct sched_data *data = obj;
-
- ao2_cleanup(data->contact);
-}
-/*!
- * \internal
- * \brief Create the scheduling data object.
- */
-static struct sched_data *sched_data_create(struct ast_sip_contact *contact)
-{
- struct sched_data *data;
-
- data = ao2_t_alloc(sizeof(*data), sched_data_destructor, contact->uri);
- if (!data) {
- ast_log(LOG_ERROR, "Unable to create schedule qualify data for contact %s\n",
- contact->uri);
- return NULL;
- }
-
- data->contact = contact;
- ao2_ref(data->contact, +1);
-
- return data;
-}
-
-/*!
- * \internal
- * \brief Send a qualify contact request within a threaded task.
- */
-static int qualify_contact_task(void *obj)
-{
- struct ast_sip_contact *contact = obj;
- int res;
-
- res = qualify_contact(NULL, contact);
- ao2_ref(contact, -1);
- return res;
-}
-
-/*!
- * \internal
- * \brief Send a scheduled qualify contact request.
- */
-static int qualify_contact_sched(const void *obj)
-{
- struct sched_data *data = (struct sched_data *) obj;
-
- ao2_ref(data->contact, +1);
- if (ast_sip_push_task(NULL, qualify_contact_task, data->contact)) {
- ao2_ref(data->contact, -1);
- }
-
- /*
- * Always reschedule rather than have a potential race cleaning
- * up the data object ref between self deletion and an external
- * deletion.
+struct sip_options_aor {
+ /*! \brief The scheduler task for this AOR */
+ struct ast_sip_sched_task *sched_task;
+ /*! \brief The serializer for this AOR */
+ struct ast_taskprocessor *serializer;
+ /*! \brief All contacts associated with this AOR */
+ struct ao2_container *contacts;
+ /*!
+ * \brief Only dynamic contacts associated with this AOR
+ * \note Used to speed up applying AOR configuration by
+ * minimizing wild card sorcery access.
*/
- return data->contact->qualify_frequency * 1000;
-}
-
-/*!
- * \internal
- * \brief Set up a scheduled qualify contact check.
- */
-static void schedule_qualify(struct ast_sip_contact *contact, int initial_interval)
-{
- struct sched_data *data;
-
- data = sched_data_create(contact);
- if (!data) {
- return;
- }
-
- ast_assert(contact->qualify_frequency != 0);
-
- ao2_t_ref(data, +1, "Ref for qualify_contact_sched() scheduler entry");
- data->id = ast_sched_add_variable(sched, initial_interval,
- qualify_contact_sched, data, 1);
- if (data->id < 0) {
- ao2_t_ref(data, -1, "Cleanup failed scheduler add");
- ast_log(LOG_ERROR, "Unable to schedule qualify for contact %s\n",
- contact->uri);
- } else if (!ao2_link(sched_qualifies, data)) {
- AST_SCHED_DEL_UNREF(sched, data->id,
- ao2_t_ref(data, -1, "Cleanup scheduler for failed ao2_link"));
- }
- ao2_t_ref(data, -1, "Done setting up scheduler entry");
-}
-
-/*!
- * \internal
- * \brief Remove the contact from the scheduler.
- */
-static void unschedule_qualify(struct ast_sip_contact *contact)
-{
- struct sched_data *data;
-
- data = ao2_find(sched_qualifies, contact, OBJ_UNLINK | OBJ_SEARCH_KEY);
- if (!data) {
- return;
- }
-
- AST_SCHED_DEL_UNREF(sched, data->id,
- ao2_t_ref(data, -1, "Delete scheduler entry ref"));
- ao2_t_ref(data, -1, "Done with ao2_find ref");
-}
-
-/*!
- * \internal
- * \brief Qualify the given contact and set up scheduling if configured.
- */
-static void qualify_and_schedule(struct ast_sip_contact *contact)
-{
- unschedule_qualify(contact);
-
- if (contact->qualify_frequency) {
- ao2_ref(contact, +1);
- if (ast_sip_push_task(NULL, qualify_contact_task, contact)) {
- ao2_ref(contact, -1);
- }
-
- schedule_qualify(contact, contact->qualify_frequency * 1000);
- } else {
- update_contact_status(contact, UNKNOWN, 0);
- }
-}
-
-/*!
- * \internal
- * \brief A new contact has been created make sure it is available.
- */
-static void contact_created(const void *obj)
-{
- qualify_and_schedule((struct ast_sip_contact *) obj);
-}
-
-/*!
- * \internal
- * \brief A contact has been updated.
- */
-static void contact_updated(const void *obj)
-{
- update_contact_status(obj, AVAILABLE, 1);
-}
-
-/*!
- * \internal
- * \brief A contact has been deleted remove status tracking.
- */
-static void contact_deleted(const void *obj)
-{
- struct ast_sip_contact *contact = (struct ast_sip_contact *) obj;
- struct ast_sip_contact_status *status;
-
- unschedule_qualify(contact);
-
- status = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), CONTACT_STATUS,
- ast_sorcery_object_get_id(contact));
- if (!status) {
- return;
- }
-
- if (ast_sorcery_delete(ast_sip_get_sorcery(), status)) {
- ast_log(LOG_ERROR, "Unable to delete ast_sip_contact_status for contact %s\n",
- contact->uri);
- }
- ao2_ref(status, -1);
-}
-
-static const struct ast_sorcery_observer contact_observer = {
- .created = contact_created,
- .updated = contact_updated,
- .deleted = contact_deleted,
+ struct ao2_container *dynamic_contacts;
+ /*! \brief The endpoint state compositors we are feeding, a reference is held to each */
+ AST_VECTOR(, struct sip_options_endpoint_state_compositor *) compositors;
+ /*! \brief The number of available contacts on this AOR */
+ unsigned int available;
+ /*! \brief Frequency to send OPTIONS requests to AOR contacts. 0 is disabled. */
+ unsigned int qualify_frequency;
+ /*! If true authenticate the qualify challenge response if needed */
+ int authenticate_qualify;
+ /*! \brief Qualify timeout. 0 is diabled. */
+ double qualify_timeout;
+ /*! \brief The name of the AOR */
+ char name[0];
};
-static pj_bool_t options_start(void)
-{
- sched = ast_sched_context_create();
- if (!sched) {
- return -1;
- }
- if (ast_sched_start_thread(sched)) {
- ast_sched_context_destroy(sched);
- sched = NULL;
- return -1;
- }
+/*!
+ * \internal
+ * \brief Container of active SIP AORs for qualifying
+ */
+static struct ao2_container *sip_options_aors;
- if (ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &contact_observer)) {
- ast_log(LOG_WARNING, "Unable to add contact observer\n");
- ast_sched_context_destroy(sched);
- sched = NULL;
- return -1;
- }
+/*!
+ * \internal
+ * \brief Container of contact statuses
+ */
+static struct ao2_container *sip_options_contact_statuses;
- return PJ_SUCCESS;
-}
+/*!
+ * \internal
+ * \brief Container of endpoint state compositors
+ */
+static struct ao2_container *sip_options_endpoint_state_compositors;
-static int sched_qualifies_empty(void *obj, void *arg, int flags)
-{
- ao2_t_ref(obj, -1, "Release ref held by destroyed scheduler context.");
- return CMP_MATCH;
-}
-
-static pj_bool_t options_stop(void)
-{
- ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &contact_observer);
-
- if (sched) {
- ast_sched_context_destroy(sched);
- sched = NULL;
- }
-
- /* Empty the container of scheduling data refs. */
- ao2_callback(sched_qualifies, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE,
- sched_qualifies_empty, NULL);
-
- return PJ_SUCCESS;
-}
+/*!
+ * \internal
+ * \brief Serializer for AOR, endpoint state compositor, and contact existence management
+ */
+static struct ast_taskprocessor *management_serializer;
static pj_status_t send_options_response(pjsip_rx_data *rdata, int code)
{
@@ -793,102 +314,1998 @@
.name = {"Options Module", 14},
.id = -1,
.priority = PJSIP_MOD_PRIORITY_APPLICATION,
- .start = options_start,
- .stop = options_stop,
.on_rx_request = options_on_rx_request,
};
-/*!
- * \internal
- * \brief Send qualify request to the given contact.
- */
-static int cli_on_contact(void *obj, void *arg, void *data, int flags)
-{
- struct ast_sip_contact *contact = obj;
- struct ast_sip_endpoint *endpoint = data;
- int *cli_fd = arg;
-
- ast_cli(*cli_fd, " contact %s\n", contact->uri);
- qualify_contact(endpoint, contact);
- return 0;
-}
-
-/*!
- * \brief Data pushed to threadpool to qualify endpoints from the CLI
- */
-struct qualify_data {
- /*! Endpoint that is being qualified */
- struct ast_sip_endpoint *endpoint;
- /*! CLI File descriptor for printing messages */
- int cli_fd;
+static const char *status_map[] = {
+ [UNAVAILABLE] = "Unreachable",
+ [AVAILABLE] = "Reachable",
+ [UNKNOWN] = "Unknown",
+ [CREATED] = "Created",
+ [REMOVED] = "Removed",
};
-static struct qualify_data *qualify_data_alloc(struct ast_sip_endpoint *endpoint, int cli_fd)
-{
- struct qualify_data *qual_data;
+static const char *short_status_map[] = {
+ [UNAVAILABLE] = "Unavail",
+ [AVAILABLE] = "Avail",
+ [UNKNOWN] = "Unknown",
+ [CREATED] = "Created",
+ [REMOVED] = "Removed",
+};
- qual_data = ast_malloc(sizeof(*qual_data));
- if (!qual_data) {
+const char *ast_sip_get_contact_status_label(const enum ast_sip_contact_status_type status)
+{
+ return status_map[status];
+}
+
+const char *ast_sip_get_contact_short_status_label(const enum ast_sip_contact_status_type status)
+{
+ return short_status_map[status];
+}
+
+/*! \brief Destructor for contact statuses */
+static void sip_contact_status_dtor(void *obj)
+{
+ struct ast_sip_contact_status *contact_status = obj;
+
+ ast_string_field_free_memory(contact_status);
+}
+
+static struct ast_sip_contact_status *sip_contact_status_alloc(const char *name)
+{
+ struct ast_sip_contact_status *contact_status;
+ size_t size = sizeof(*contact_status) + strlen(name) + 1;
+
+ contact_status = ao2_alloc_options(size, sip_contact_status_dtor,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!contact_status) {
+ return NULL;
+ }
+ if (ast_string_field_init(contact_status, 256)) {
+ ao2_ref(contact_status, -1);
+ return NULL;
+ }
+ strcpy(contact_status->name, name); /* SAFE */
+ return contact_status;
+}
+
+static struct ast_sip_contact_status *sip_contact_status_copy(const struct ast_sip_contact_status *src)
+{
+ struct ast_sip_contact_status *dst;
+
+ dst = sip_contact_status_alloc(src->name);
+ if (!dst) {
return NULL;
}
- qual_data->endpoint = ao2_bump(endpoint);
- qual_data->cli_fd = cli_fd;
- return qual_data;
+ if (ast_string_fields_copy(dst, src)) {
+ ao2_ref(dst, -1);
+ return NULL;
+ }
+ dst->rtt = src->rtt;
+ dst->status = src->status;
+ dst->last_status = src->last_status;
+ return dst;
}
-static void qualify_data_destroy(struct qualify_data *qual_data)
+/*! \brief Hashing function for contact statuses */
+AO2_STRING_FIELD_HASH_FN(ast_sip_contact_status, name);
+
+/*! \brief Sort function for contact statuses */
+AO2_STRING_FIELD_SORT_FN(ast_sip_contact_status, name);
+
+/*! \brief Comparator function for contact statuses */
+AO2_STRING_FIELD_CMP_FN(ast_sip_contact_status, name);
+
+/*! \brief Helper function to allocate a contact statuses container */
+static struct ao2_container *sip_options_contact_statuses_alloc(void)
{
- ao2_cleanup(qual_data->endpoint);
- ast_free(qual_data);
+ /*
+ * Replace duplicate objects so we can update the immutable
+ * contact status objects by simply linking in a new object.
+ */
+ return ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX,
+ AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, CONTACT_STATUS_BUCKETS,
+ ast_sip_contact_status_hash_fn, ast_sip_contact_status_sort_fn,
+ ast_sip_contact_status_cmp_fn);
+}
+
+/*! \brief Function which publishes a contact status update to all interested endpoints */
+static void sip_options_publish_contact_state(const struct sip_options_aor *aor_options,
+ const struct ast_sip_contact_status *contact_status)
+{
+ int i;
+
+ for (i = 0; i < AST_VECTOR_SIZE(&aor_options->compositors); ++i) {
+ const struct sip_options_endpoint_state_compositor *endpoint_state_compositor;
+
+ endpoint_state_compositor = AST_VECTOR_GET(&aor_options->compositors, i);
+ ast_sip_persistent_endpoint_publish_contact_state(endpoint_state_compositor->name,
+ contact_status);
+ }
}
/*!
- * \internal
- * \brief For an endpoint iterate over and qualify all aors/contacts
+ * \brief Task to notify endpoints of a contact status change
+ * \note Run by management_serializer
*/
-static int cli_qualify_contacts(void *data)
+static int contact_status_publish_update_task(void *obj)
{
- char *aors;
- char *aor_name;
- RAII_VAR(struct qualify_data *, qual_data, data, qualify_data_destroy);
- struct ast_sip_endpoint *endpoint = qual_data->endpoint;
- int cli_fd = qual_data->cli_fd;
- const char *endpoint_name = ast_sorcery_object_get_id(endpoint);
+ struct ast_sip_contact_status *contact_status = obj;
+ struct sip_options_aor *aor_options;
- if (ast_strlen_zero(endpoint->aors)) {
- ast_cli(cli_fd, "Endpoint %s has no AoR's configured\n",
- endpoint_name);
+ aor_options = ao2_find(sip_options_aors, contact_status->aor, OBJ_SEARCH_KEY);
+ if (aor_options) {
+ sip_options_publish_contact_state(aor_options, contact_status);
+ ao2_ref(aor_options, -1);
+ }
+ ao2_ref(contact_status, -1);
+
+ return 0;
+}
+
+static void sip_options_contact_status_update(struct ast_sip_contact_status *contact_status)
+{
+ struct ast_taskprocessor *mgmt_serializer = management_serializer;
+
+ if (mgmt_serializer) {
+ ao2_ref(contact_status, +1);
+ if (ast_sip_push_task(mgmt_serializer, contact_status_publish_update_task,
+ contact_status)) {
+ ao2_ref(contact_status, -1);
+ }
+ }
+}
+
+struct ast_sip_contact_status *ast_res_pjsip_find_or_create_contact_status(const struct ast_sip_contact *contact)
+{
+ struct ast_sip_contact_status *contact_status;
+ int res;
+
+ /*
+ * At startup it is possible for contact status to be retrieved
+ * before we are ready, if this occurs then allocate the container
+ * here. Since we don't actually trigger qualify or anything as a
+ * result it is safe to do so. They'll just get back a contact
+ * status that will be updated later.
+ */
+ if (!sip_options_contact_statuses) {
+ sip_options_contact_statuses = sip_options_contact_statuses_alloc();
+ if (!sip_options_contact_statuses) {
+ return NULL;
+ }
+ }
+
+ ao2_lock(sip_options_contact_statuses);
+
+ /* If contact status for this contact already exists just return it */
+ contact_status = ao2_find(sip_options_contact_statuses,
+ ast_sorcery_object_get_id(contact), OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (contact_status) {
+ ao2_unlock(sip_options_contact_statuses);
+ return contact_status;
+ }
+
+ /* Otherwise we have to create and store a new contact status */
+ contact_status = sip_contact_status_alloc(ast_sorcery_object_get_id(contact));
+ if (!contact_status) {
+ ao2_unlock(sip_options_contact_statuses);
+ return NULL;
+ }
+
+ contact_status->rtt = 0;
+ contact_status->status = CREATED;
+ contact_status->last_status = CREATED;
+ res = ast_string_field_set(contact_status, uri, contact->uri);
+ res |= ast_string_field_set(contact_status, aor, contact->aor);
+ if (res) {
+ ao2_unlock(sip_options_contact_statuses);
+ ao2_ref(contact_status, -1);
+ return NULL;
+ }
+
+ ao2_link_flags(sip_options_contact_statuses, contact_status, OBJ_NOLOCK);
+ ao2_unlock(sip_options_contact_statuses);
+
+ ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
+ "+1", 1.0, ast_sip_get_contact_status_label(contact_status->status));
+
+ sip_options_contact_status_update(contact_status);
+
+ return contact_status;
+}
+
+struct ast_sip_contact_status *ast_sip_get_contact_status(const struct ast_sip_contact *contact)
+{
+ return ao2_find(sip_options_contact_statuses, ast_sorcery_object_get_id(contact),
+ OBJ_SEARCH_KEY);
+}
+
+/*! \brief Hashing function for OPTIONS AORs */
+AO2_STRING_FIELD_HASH_FN(sip_options_aor, name);
+
+/*! \brief Comparator function for SIP OPTIONS AORs */
+AO2_STRING_FIELD_CMP_FN(sip_options_aor, name);
+
+/*! \brief Hashing function for endpoint state compositors */
+AO2_STRING_FIELD_HASH_FN(sip_options_endpoint_state_compositor, name);
+
+/*! \brief Comparator function for endpoint state compositors */
+AO2_STRING_FIELD_CMP_FN(sip_options_endpoint_state_compositor, name);
+
+/*! \brief Structure used to contain information for an OPTIONS callback */
+struct sip_options_contact_callback_data {
+ /*! \brief The contact we qualified */
+ struct ast_sip_contact *contact;
+ /*! \brief The AOR options */
+ struct sip_options_aor *aor_options;
+ /*! \brief The time at which this OPTIONS attempt was started */
+ struct timeval rtt_start;
+ /*! \brief The new status of the contact */
+ enum ast_sip_contact_status_type status;
+};
+
+/*!
+ * \brief Return the current state of an endpoint state compositor
+ * \pre The endpoint_state_compositor lock must be held.
+ */
+static enum ast_endpoint_state sip_options_get_endpoint_state_compositor_state(
+ const struct sip_options_endpoint_state_compositor *endpoint_state_compositor)
+{
+ struct ao2_iterator it_aor_statuses;
+ struct sip_options_endpoint_aor_status *aor_status;
+ enum ast_endpoint_state state = AST_ENDPOINT_OFFLINE;
+
+ it_aor_statuses = ao2_iterator_init(endpoint_state_compositor->aor_statuses, 0);
+ for (; (aor_status = ao2_iterator_next(&it_aor_statuses)); ao2_ref(aor_status, -1)) {
+ if (aor_status->available) {
+ state = AST_ENDPOINT_ONLINE;
+ break;
+ }
+ }
+ ao2_iterator_destroy(&it_aor_statuses);
+
+ return state;
+}
+
+/*!
+ * \brief Update the AOR status on an endpoint state compositor
+ * \pre The endpoint_state_compositor lock must be held.
+ */
+static void sip_options_update_endpoint_state_compositor_aor(struct sip_options_endpoint_state_compositor *endpoint_state_compositor,
+ const char *name, enum ast_sip_contact_status_type status)
+{
+ struct sip_options_endpoint_aor_status *aor_status;
+ enum ast_endpoint_state endpoint_state;
+
+ aor_status = ao2_find(endpoint_state_compositor->aor_statuses, name,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!aor_status) {
+ /* The AOR status doesn't exist already so we don't need to go any further */
+ if (status == REMOVED) {
+ return;
+ }
+
+ aor_status = ao2_alloc_options(sizeof(*aor_status) + strlen(name) + 1, NULL,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!aor_status) {
+ return;
+ }
+
+ strcpy(aor_status->name, name); /* SAFE */
+ ao2_link(endpoint_state_compositor->aor_statuses, aor_status);
+ }
+
+ if (status == REMOVED) {
+ /*
+ * If the AOR is being removed then remove its AOR status
+ * from the endpoint compositor.
+ */
+ ao2_unlink(endpoint_state_compositor->aor_statuses, aor_status);
+ } else {
+ aor_status->available = (status == AVAILABLE ? 1 : 0);
+ }
+ ao2_ref(aor_status, -1);
+
+ if (!endpoint_state_compositor->active) {
+ return;
+ }
+
+ /* If this AOR is available then the endpoint itself has to be online */
+ if (status == AVAILABLE) {
+ ast_debug(3, "Endpoint state compositor '%s' is online as AOR '%s' is available\n",
+ endpoint_state_compositor->name, name);
+ endpoint_state = AST_ENDPOINT_ONLINE;
+ } else {
+ endpoint_state =
+ sip_options_get_endpoint_state_compositor_state(endpoint_state_compositor);
+ }
+
+ ast_sip_persistent_endpoint_update_state(endpoint_state_compositor->name,
+ endpoint_state);
+}
+
+/*! \brief Function which notifies endpoint state compositors of a state change of an AOR */
+static void sip_options_notify_endpoint_state_compositors(struct sip_options_aor *aor_options,
+ enum ast_sip_contact_status_type status)
+{
+ int i;
+
+ /* Iterate through the associated endpoint state compositors updating them */
+ for (i = 0; i < AST_VECTOR_SIZE(&aor_options->compositors); ++i) {
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor;
+
+ endpoint_state_compositor = AST_VECTOR_GET(&aor_options->compositors, i);
+
+ ao2_lock(endpoint_state_compositor);
+ sip_options_update_endpoint_state_compositor_aor(endpoint_state_compositor,
+ aor_options->name, status);
+ ao2_unlock(endpoint_state_compositor);
+ }
+
+ if (status == REMOVED) {
+ AST_VECTOR_RESET(&aor_options->compositors, ao2_cleanup);
+ }
+}
+
+/*!
+ * \brief Task to notify an AOR of a contact status change
+ * \note Run by aor_options->serializer
+ */
+static int sip_options_contact_status_notify_task(void *obj)
+{
+ struct sip_options_contact_callback_data *contact_callback_data = obj;
+ struct ast_sip_contact *contact;
+ struct ast_sip_contact_status *cs_old;
+ struct ast_sip_contact_status *cs_new;
+
+ /*
+ * Determine if this is a late arriving notification, as it is
+ * possible that we get a callback from PJSIP giving us contact
+ * status but in the mean time said contact has been removed
+ * from the controlling AOR.
+ */
+
+ if (!contact_callback_data->aor_options->qualify_frequency) {
+ /* Contact qualify response is late */
+ ao2_ref(contact_callback_data, -1);
return 0;
}
- aors = ast_strdupa(endpoint->aors);
- while ((aor_name = ast_strip(strsep(&aors, ",")))) {
- struct ast_sip_aor *aor;
+ contact = ao2_find(contact_callback_data->aor_options->contacts,
+ contact_callback_data->contact, OBJ_SEARCH_OBJECT);
+ if (!contact) {
+ /* Contact qualify response is late */
+ ao2_ref(contact_callback_data, -1);
+ return 0;
+ }
+ ao2_ref(contact, -1);
+
+ cs_old = ao2_find(sip_options_contact_statuses,
+ ast_sorcery_object_get_id(contact_callback_data->contact), OBJ_SEARCH_KEY);
+ if (!cs_old) {
+ /* Contact qualify response is late */
+ ao2_ref(contact_callback_data, -1);
+ return 0;
+ }
+
+ /* Update the contact specific status information */
+ cs_new = sip_contact_status_copy(cs_old);
+ ao2_ref(cs_old, -1);
+ if (!cs_new) {
+ ao2_ref(contact_callback_data, -1);
+ return 0;
+ }
+ cs_new->last_status = cs_new->status;
+ cs_new->status = contact_callback_data->status;
+ cs_new->rtt =
+ cs_new->status == AVAILABLE
+ ? ast_tvdiff_us(ast_tvnow(), contact_callback_data->rtt_start)
+ : 0;
+ ao2_link(sip_options_contact_statuses, cs_new);
+
+ /*
+ * If the status has changed then notify the endpoint state compositors
+ * and publish our events.
+ */
+ if (cs_new->last_status != cs_new->status) {
+ if (cs_new->status == AVAILABLE) {
+ /* If this is the first available contact then the AOR has become available */
+ ++contact_callback_data->aor_options->available;
+ if (contact_callback_data->aor_options->available == 1) {
+ sip_options_notify_endpoint_state_compositors(
+ contact_callback_data->aor_options, AVAILABLE);
+ }
+ } else if (cs_new->last_status == AVAILABLE) {
+ ast_assert(cs_new->status == UNAVAILABLE);
+
+ /* If there are no more available contacts then this AOR is unavailable */
+ --contact_callback_data->aor_options->available;
+ if (!contact_callback_data->aor_options->available) {
+ sip_options_notify_endpoint_state_compositors(
+ contact_callback_data->aor_options, UNAVAILABLE);
+ }
+ }
+
+ ast_verb(3, "Contact %s/%s is now %s. RTT: %.3f msec\n",
+ cs_new->aor,
+ cs_new->uri,
+ ast_sip_get_contact_status_label(cs_new->status),
+ cs_new->rtt / 1000.0);
+
+ ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
+ "-1", 1.0, ast_sip_get_contact_status_label(cs_new->last_status));
+ ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
+ "+1", 1.0, ast_sip_get_contact_status_label(cs_new->status));
+
+ sip_options_contact_status_update(cs_new);
+
+ ast_test_suite_event_notify("AOR_CONTACT_UPDATE",
+ "Contact: %s\r\n"
+ "Status: %s",
+ cs_new->name,
+ ast_sip_get_contact_status_label(cs_new->status));
+ } else {
+ ast_debug(3, "Contact %s/%s status didn't change: %s, RTT: %.3f msec\n",
+ cs_new->aor,
+ cs_new->uri,
+ ast_sip_get_contact_status_label(cs_new->status),
+ cs_new->rtt / 1000.0);
+ }
+
+ ast_statsd_log_full_va("PJSIP.contacts.%s.rtt", AST_STATSD_TIMER,
+ cs_new->status != AVAILABLE ? -1 : cs_new->rtt / 1000,
+ 1.0,
+ cs_new->name);
+
+ ast_test_suite_event_notify("AOR_CONTACT_QUALIFY_RESULT",
+ "Contact: %s\r\n"
+ "Status: %s\r\n"
+ "RTT: %" PRId64,
+ cs_new->name,
+ ast_sip_get_contact_status_label(cs_new->status),
+ cs_new->rtt);
+
+ ast_debug(3, "AOR '%s' now has %d available contacts\n",
+ contact_callback_data->aor_options->name,
+ contact_callback_data->aor_options->available);
+
+ ao2_ref(cs_new, -1);
+ ao2_ref(contact_callback_data, -1);
+
+ return 0;
+}
+
+/*! \brief Callback for when we get a result from a SIP OPTIONS request (a response or a timeout) */
+static void qualify_contact_cb(void *token, pjsip_event *e)
+{
+ struct sip_options_contact_callback_data *contact_callback_data = token;
+ enum ast_sip_contact_status_type status;
+
+ switch(e->body.tsx_state.type) {
+ default:
+ ast_log(LOG_ERROR, "Unexpected PJSIP event %u\n", e->body.tsx_state.type);
+ /* Fall through */
+ case PJSIP_EVENT_TRANSPORT_ERROR:
+ case PJSIP_EVENT_TIMER:
+ status = UNAVAILABLE;
+ break;
+ case PJSIP_EVENT_RX_MSG:
+ status = AVAILABLE;
+ break;
+ }
+
+ /* Update the callback data with the new status, this will get handled in the AOR serializer */
+ contact_callback_data->status = status;
+
+ if (ast_sip_push_task(contact_callback_data->aor_options->serializer,
+ sip_options_contact_status_notify_task, contact_callback_data)) {
+ ast_log(LOG_NOTICE, "Unable to queue contact status update for '%s' on AOR '%s', state will be incorrect\n",
+ ast_sorcery_object_get_id(contact_callback_data->contact),
+ contact_callback_data->aor_options->name);
+ ao2_ref(contact_callback_data, -1);
+ }
+
+ /* The task inherited our reference so we don't unreference here */
+}
+
+/*! \brief Destructor for contact callback data */
+static void sip_options_contact_callback_data_dtor(void *obj)
+{
+ struct sip_options_contact_callback_data *contact_callback_data = obj;
+
+ ao2_cleanup(contact_callback_data->contact);
+ ao2_cleanup(contact_callback_data->aor_options);
+}
+
+/*! \brief Contact callback data allocator */
+static struct sip_options_contact_callback_data *sip_options_contact_callback_data_alloc(
+ struct ast_sip_contact *contact, struct sip_options_aor *aor_options)
+{
+ struct sip_options_contact_callback_data *contact_callback_data;
+
+ contact_callback_data = ao2_alloc_options(sizeof(*contact_callback_data),
+ sip_options_contact_callback_data_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!contact_callback_data) {
+ return NULL;
+ }
+
+ contact_callback_data->contact = ao2_bump(contact);
+ contact_callback_data->aor_options = ao2_bump(aor_options);
+ contact_callback_data->rtt_start = ast_tvnow();
+
+ return contact_callback_data;
+}
+
+/*! \brief Send a SIP OPTIONS request for a contact */
+static int sip_options_qualify_contact(void *obj, void *arg, int flags)
+{
+ struct ast_sip_contact *contact = obj;
+ struct sip_options_aor *aor_options = arg;
+ RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+ pjsip_tx_data *tdata;
+ struct ast_sip_contact_status *contact_status;
+ struct sip_options_contact_callback_data *contact_callback_data;
+
+ ast_debug(3, "Qualifying contact '%s' on AOR '%s'\n",
+ ast_sorcery_object_get_id(contact), aor_options->name);
+
+ if (!ast_strlen_zero(contact->endpoint_name)) {
+ endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
+ contact->endpoint_name);
+ }
+ if (!endpoint && AST_VECTOR_SIZE(&aor_options->compositors)) {
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor;
+
+ endpoint_state_compositor = AST_VECTOR_GET(&aor_options->compositors, 0);
+ endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
+ endpoint_state_compositor->name);
+ }
+ if (!endpoint) {
+ ast_debug(3, "Could not find an endpoint to qualify contact '%s' on AOR '%s'\n",
+ ast_sorcery_object_get_id(contact), aor_options->name);
+ return 0;
+ }
+
+ if (ast_sip_create_request("OPTIONS", NULL, endpoint, NULL, contact, &tdata)) {
+ ast_log(LOG_ERROR, "Unable to create request to qualify contact %s on AOR %s\n",
+ contact->uri, aor_options->name);
+ return 0;
+ }
+
+ /* If an outbound proxy is specified set it on this request */
+ if (!ast_strlen_zero(contact->outbound_proxy) &&
+ ast_sip_set_outbound_proxy(tdata, contact->outbound_proxy)) {
+ ast_log(LOG_ERROR, "Unable to apply outbound proxy on request to qualify contact %s\n",
+ contact->uri);
+ pjsip_tx_data_dec_ref(tdata);
+ return 0;
+ }
+
+ contact_status = ast_res_pjsip_find_or_create_contact_status(contact);
+ if (!contact_status) {
+ ast_log(LOG_ERROR, "Unable to retrieve contact status information for contact %s on AOR %s\n",
+ contact->uri, aor_options->name);
+ pjsip_tx_data_dec_ref(tdata);
+ return 0;
+ }
+ ao2_ref(contact_status, -1);
+
+ contact_callback_data = sip_options_contact_callback_data_alloc(contact, aor_options);
+ if (!contact_callback_data) {
+ ast_log(LOG_ERROR, "Unable to create object to contain callback data for contact %s on AOR %s\n",
+ contact->uri, aor_options->name);
+ pjsip_tx_data_dec_ref(tdata);
+ return 0;
+ }
+
+ if (ast_sip_send_out_of_dialog_request(tdata, endpoint,
+ (int)(aor_options->qualify_timeout * 1000), contact_callback_data,
+ qualify_contact_cb)) {
+ ast_log(LOG_ERROR, "Unable to send request to qualify contact %s on AOR %s\n",
+ contact->uri, aor_options->name);
+ ao2_ref(contact_callback_data, -1);
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief Task to qualify contacts of an AOR
+ * \note Run by aor_options->serializer
+ */
+static int sip_options_qualify_aor(void *obj)
+{
+ struct sip_options_aor *aor_options = obj;
+
+ ast_debug(3, "Qualifying all contacts on AOR '%s'\n", aor_options->name);
+
+ /* Attempt to send an OPTIONS request to every contact on this AOR */
+ ao2_callback(aor_options->contacts, OBJ_NODATA, sip_options_qualify_contact,
+ (struct sip_options_aor *) aor_options);
+
+ /* Always reschedule to the frequency we should go */
+ return aor_options->qualify_frequency * 1000;
+}
+
+/*! \brief Forward declaration of this helpful function */
+static int sip_options_remove_contact(void *obj, void *arg, int flags);
+
+/*! \brief Destructor function for SIP OPTIONS AORs */
+static void sip_options_aor_dtor(void *obj)
+{
+ struct sip_options_aor *aor_options = obj;
+
+ /*
+ * Any contacts are unreachable since the AOR is being destroyed
+ * so remove their contact status
+ */
+ if (aor_options->contacts) {
+ ao2_callback(aor_options->contacts, OBJ_NODATA | OBJ_UNLINK,
+ sip_options_remove_contact, aor_options);
+ ao2_ref(aor_options->contacts, -1);
+ }
+ ao2_cleanup(aor_options->dynamic_contacts);
+
+ ast_taskprocessor_unreference(aor_options->serializer);
+
+ ast_assert(AST_VECTOR_SIZE(&aor_options->compositors) == 0);
+ AST_VECTOR_FREE(&aor_options->compositors);
+}
+
+/*! \brief Allocator for AOR OPTIONS */
+static struct sip_options_aor *sip_options_aor_alloc(struct ast_sip_aor *aor)
+{
+ struct sip_options_aor *aor_options;
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+
+ aor_options = ao2_alloc_options(sizeof(*aor_options) + strlen(ast_sorcery_object_get_id(aor)) + 1,
+ sip_options_aor_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!aor_options) {
+ return NULL;
+ }
+
+ strcpy(aor_options->name, ast_sorcery_object_get_id(aor)); /* SAFE */
+
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/options/%s",
+ ast_sorcery_object_get_id(aor));
+ aor_options->serializer = ast_sip_create_serializer_group(tps_name,
+ shutdown_group);
+ if (!aor_options->serializer) {
+ ao2_ref(aor_options, -1);
+ return NULL;
+ }
+
+ if (AST_VECTOR_INIT(&aor_options->compositors, ENDPOINT_STATE_COMPOSITOR_INITIAL_SIZE)) {
+ ao2_ref(aor_options, -1);
+ return NULL;
+ }
+
+ aor_options->contacts = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_NOLOCK,
+ AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT, CONTACT_BUCKETS, ast_sorcery_object_id_hash,
+ ast_sorcery_object_id_sort, ast_sorcery_object_id_compare);
+ if (!aor_options->contacts) {
+ ao2_ref(aor_options, -1);
+ return NULL;
+ }
+
+ aor_options->dynamic_contacts = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_NOLOCK,
+ AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT, CONTACT_BUCKETS, ast_sorcery_object_id_hash,
+ ast_sorcery_object_id_sort, ast_sorcery_object_id_compare);
+ if (!aor_options->dynamic_contacts) {
+ ao2_ref(aor_options, -1);
+ return NULL;
+ }
+
+ return aor_options;
+}
+
+/*! \brief Remove contact status for a hint */
+static void sip_options_remove_contact_status(struct sip_options_aor *aor_options,
+ struct ast_sip_contact *contact)
+{
+ struct ast_sip_contact_status *cs_new;
+ struct ast_sip_contact_status *cs_old;
+
+ cs_old = ao2_find(sip_options_contact_statuses, ast_sorcery_object_get_id(contact),
+ OBJ_SEARCH_KEY | OBJ_UNLINK);
+ if (!cs_old) {
+ ast_debug(3, "Attempted to remove contact status for '%s' but it does not exist\n",
+ ast_sorcery_object_get_id(contact));
+ return;
+ }
+
+ ast_verb(2, "Contact %s/%s has been deleted\n", contact->aor, contact->uri);
+
+ /* Update the contact status to reflect its new state */
+ cs_new = sip_contact_status_copy(cs_old);
+ if (!cs_new) {
+ /*
+ * We'll have to violate the immutable property because we
+ * couldn't create a new one to modify and we are deleting
+ * the contact status anyway.
+ */
+ cs_new = cs_old;
+ } else {
+ ao2_ref(cs_old, -1);
+ }
+ cs_new->last_status = cs_new->status;
+ cs_new->status = REMOVED;
+ cs_new->rtt = 0;
+
+ ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
+ "-1", 1.0, ast_sip_get_contact_status_label(cs_new->last_status));
+ ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
+ "+1", 1.0, ast_sip_get_contact_status_label(cs_new->status));
+
+ sip_options_contact_status_update(cs_new);
+
+ /*
+ * The only time we need to update the AOR is if this contact was
+ * available and qualify is in use, otherwise we can just stop
+ * early.
+ */
+ if (!aor_options->qualify_frequency || cs_new->last_status != AVAILABLE) {
+ ao2_ref(cs_new, -1);
+ return;
+ }
+
+ --aor_options->available;
+ if (!aor_options->available) {
+ sip_options_notify_endpoint_state_compositors(aor_options, UNAVAILABLE);
+ }
+
+ ast_debug(3, "AOR '%s' now has %d available contacts\n", aor_options->name,
+ aor_options->available);
+
+ ao2_ref(cs_new, -1);
+}
+
+/*! \brief Task data for AOR creation or updating */
+struct sip_options_synchronize_aor_task_data {
+ /*! \brief The AOR options for this AOR */
+ struct sip_options_aor *aor_options;
+ /*! \brief The AOR which contains the new configuraton */
+ struct ast_sip_aor *aor;
+ /*! \brief Optional container of existing AOR s*/
+ struct ao2_container *existing;
+ /*! \brief Whether this AOR is being added */
+ int added;
+};
+
+/*! \brief Callback function to remove a contact and its contact status from an AOR */
+static int sip_options_remove_contact(void *obj, void *arg, int flags)
+{
+ struct ast_sip_contact *contact = obj;
+ struct sip_options_aor *aor_options = arg;
+
+ sip_options_remove_contact_status(aor_options, contact);
+
+ return CMP_MATCH;
+}
+
+/*! \brief Determine an initial time for scheduling AOR qualifying */
+static int sip_options_determine_initial_qualify_time(int qualify_frequency)
+{
+ int initial_interval;
+ int max_time = ast_sip_get_max_initial_qualify_time();
+
+ if (max_time && max_time < qualify_frequency) {
+ initial_interval = max_time;
+ } else {
+ initial_interval = qualify_frequency;
+ }
+
+ initial_interval = (int)((initial_interval * 1000) * ast_random_double());
+ return 0 < initial_interval ? initial_interval : 1;
+}
+
+/*! \brief Set the contact status for a contact */
+static void sip_options_set_contact_status(struct ast_sip_contact_status *contact_status,
+ enum ast_sip_contact_status_type status)
+{
+ struct ast_sip_contact_status *cs_new;
+
+ /* Update the contact specific status information */
+ cs_new = sip_contact_status_copy(contact_status);
+ if (!cs_new) {
+ return;
+ }
+ cs_new->last_status = cs_new->status;
+ cs_new->status = status;
+
+ /*
+ * We need to always set the RTT to zero because we haven't completed
+ * an OPTIONS ping so RTT is unknown. If the OPTIONS ping were still
+ * running it will be refreshed on the next go round anyway.
+ */
+ cs_new->rtt = 0;
+
+ ao2_link(sip_options_contact_statuses, cs_new);
+
+ if (cs_new->status != cs_new->last_status) {
+ ast_verb(3, "Contact %s/%s is now %s.\n",
+ cs_new->aor, cs_new->uri,
+ ast_sip_get_contact_status_label(cs_new->status));
+
+ ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
+ "-1", 1.0, ast_sip_get_contact_status_label(cs_new->last_status));
+ ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
+ "+1", 1.0, ast_sip_get_contact_status_label(cs_new->status));
+
+ sip_options_contact_status_update(cs_new);
+
+ ast_test_suite_event_notify("AOR_CONTACT_UPDATE",
+ "Contact: %s\r\n"
+ "Status: %s",
+ cs_new->name,
+ ast_sip_get_contact_status_label(cs_new->status));
+ }
+ ao2_ref(cs_new, -1);
+}
+
+/*! \brief Transition the contact status to unqualified mode */
+static int sip_options_set_contact_status_unqualified(void *obj, void *arg, int flags)
+{
+ struct ast_sip_contact *contact = obj;
+ struct ast_sip_contact_status *contact_status;
+
+ contact_status = ast_res_pjsip_find_or_create_contact_status(contact);
+ if (!contact_status) {
+ return 0;
+ }
+
+ switch (contact_status->status) {
+ case AVAILABLE:
+ case UNAVAILABLE:
+ case CREATED:
+ sip_options_set_contact_status(contact_status, UNKNOWN);
+ break;
+ case UNKNOWN:
+ case REMOVED:
+ break;
+ }
+
+ ao2_ref(contact_status, -1);
+
+ return 0;
+}
+
+/*! \brief Transition the contact status to qualified mode */
+static int sip_options_set_contact_status_qualified(void *obj, void *arg, int flags)
+{
+ struct ast_sip_contact *contact = obj;
+ struct ast_sip_contact_status *contact_status;
+
+ contact_status = ast_res_pjsip_find_or_create_contact_status(contact);
+ if (!contact_status) {
+ return 0;
+ }
+
+ switch (contact_status->status) {
+ case AVAILABLE:
+ sip_options_set_contact_status(contact_status, UNAVAILABLE);
+ break;
+ case UNAVAILABLE:
+ case UNKNOWN:
+ case CREATED:
+ case REMOVED:
+ break;
+ }
+
+ ao2_ref(contact_status, -1);
+
+ return 0;
+}
+
+/*! \brief Count AVAILABLE qualified contacts. */
+static int sip_options_contact_status_available_count(void *obj, void *arg, int flags)
+{
+ struct ast_sip_contact *contact = obj;
+ unsigned int *available = arg;
+ struct ast_sip_contact_status *contact_status;
+
+ contact_status = ast_res_pjsip_find_or_create_contact_status(contact);
+ if (!contact_status) {
+ return 0;
+ }
+
+ /* Count qualified available contacts. */
+ switch (contact_status->status) {
+ case AVAILABLE:
+ ++*available;
+ break;
+ case UNAVAILABLE:
+ case UNKNOWN:
+ case CREATED:
+ case REMOVED:
+ break;
+ }
+
+ ao2_ref(contact_status, -1);
+
+ return 0;
+}
+
+/*!
+ * \brief Function which applies configuration to an AOR options structure
+ * \note Run by aor_options->serializer (or management_serializer on aor_options creation)
+ */
+static int sip_options_apply_aor_configuration(struct sip_options_aor *aor_options,
+ struct ast_sip_aor *aor, int is_new)
+{
+ struct ao2_container *existing_contacts;
+ struct ast_sip_contact *contact;
+ struct ao2_iterator iter;
+
+ ast_debug(3, "Configuring AOR '%s' with current state of configuration and world\n",
+ aor_options->name);
+
+ /*
+ * Permanent contacts, since we receive no notification that they
+ * are gone, follow the same approach as AORs. We create a copy
+ * of the existing container and any reused contacts are removed
+ * from it. Any contacts remaining in the container after
+ * processing no longer exist so we need to remove their state.
+ */
+ existing_contacts = ao2_container_clone(aor_options->contacts, 0);
+ if (!existing_contacts) {
+ ast_log(LOG_WARNING, "Synchronization of AOR '%s' failed for qualify, retaining existing state\n",
+ aor_options->name);
+ return -1;
+ }
+
+ ao2_callback(aor_options->contacts, OBJ_NODATA | OBJ_UNLINK | OBJ_MULTIPLE,
+ NULL, NULL);
+
+ /* Process permanent contacts */
+ if (aor->permanent_contacts) {
+ iter = ao2_iterator_init(aor->permanent_contacts, 0);
+ for (; (contact = ao2_iterator_next(&iter)); ao2_ref(contact, -1)) {
+ ao2_find(existing_contacts, ast_sorcery_object_get_id(contact),
+ OBJ_NODATA | OBJ_UNLINK | OBJ_SEARCH_KEY);
+ ao2_link(aor_options->contacts, contact);
+ }
+ ao2_iterator_destroy(&iter);
+ }
+
+ /*
+ * If this is newly added we need to see if there are any
+ * existing dynamic contacts to add. Ones that are added
+ * after creation will occur as a result of the contact
+ * observer creation callback.
+ */
+ if (is_new) {
+ size_t prefix_len = strlen(ast_sorcery_object_get_id(aor)) + sizeof(";@") - 1;
+ char prefix[prefix_len + 1];
struct ao2_container *contacts;
- aor = ast_sip_location_retrieve_aor(aor_name);
- if (!aor) {
+ sprintf(prefix, "%s;@", ast_sorcery_object_get_id(aor)); /* Safe */
+ contacts = ast_sorcery_retrieve_by_prefix(ast_sip_get_sorcery(), "contact",
+ prefix, prefix_len);
+ if (contacts) {
+ ao2_container_dup(aor_options->dynamic_contacts, contacts, 0);
+ ao2_ref(contacts, -1);
+ }
+ }
+
+ /* Process dynamic contacts */
+ iter = ao2_iterator_init(aor_options->dynamic_contacts, 0);
+ for (; (contact = ao2_iterator_next(&iter)); ao2_ref(contact, -1)) {
+ ao2_find(existing_contacts, ast_sorcery_object_get_id(contact),
+ OBJ_NODATA | OBJ_UNLINK | OBJ_SEARCH_KEY);
+ ao2_link(aor_options->contacts, contact);
+ }
+ ao2_iterator_destroy(&iter);
+
+ /* Any contacts left no longer exist, so raise events and make them disappear */
+ ao2_callback(existing_contacts, OBJ_NODATA | OBJ_UNLINK,
+ sip_options_remove_contact, aor_options);
+ ao2_ref(existing_contacts, -1);
+
+ /*
+ * Update the available count if we transition between qualified
+ * and unqualified. In the qualified case we need to start with
+ * 0 available as the qualify process will take care of it. In
+ * the unqualified case it is based on the number of contacts
+ * present.
+ */
+ if (!aor->qualify_frequency) {
+ ao2_callback(aor_options->contacts, OBJ_NODATA,
+ sip_options_set_contact_status_unqualified, NULL);
+ aor_options->available = ao2_container_count(aor_options->contacts);
+ ast_debug(3, "AOR '%s' is unqualified, number of available contacts is therefore '%d'\n",
+ aor_options->name, aor_options->available);
+ } else if (!aor_options->qualify_frequency) {
+ ao2_callback(aor_options->contacts, OBJ_NODATA,
+ sip_options_set_contact_status_qualified, NULL);
+ aor_options->available = 0;
+ ast_debug(3, "AOR '%s' has transitioned from unqualified to qualified, reset available contacts to 0\n",
+ aor_options->name);
+ } else {
+ /*
+ * Count the number of AVAILABLE qualified contacts to ensure
+ * the count is in sync with reality.
+ */
+ aor_options->available = 0;
+ ao2_callback(aor_options->contacts, OBJ_NODATA,
+ sip_options_contact_status_available_count, &aor_options->available);
+ }
+
+ aor_options->authenticate_qualify = aor->authenticate_qualify;
+ aor_options->qualify_timeout = aor->qualify_timeout;
+
+ /*
+ * If we need to stop or start the scheduled callback then do so.
+ * This occurs due to the following:
+ * 1. The qualify frequency has changed
+ * 2. Contacts were added when previously there were none
+ * 3. There are no contacts but previously there were some
+ */
+ if (aor_options->qualify_frequency != aor->qualify_frequency
+ || (!aor_options->sched_task && ao2_container_count(aor_options->contacts))
+ || (aor_options->sched_task && !ao2_container_count(aor_options->contacts))) {
+ if (aor_options->sched_task) {
+ ast_sip_sched_task_cancel(aor_options->sched_task);
+ ao2_ref(aor_options->sched_task, -1);
+ aor_options->sched_task = NULL;
+ }
+
+ /* If there is still a qualify frequency then schedule this */
+ aor_options->qualify_frequency = aor->qualify_frequency;
+ if (aor_options->qualify_frequency
+ && ao2_container_count(aor_options->contacts)) {
+ aor_options->sched_task = ast_sip_schedule_task(aor_options->serializer,
+ sip_options_determine_initial_qualify_time(aor_options->qualify_frequency),
+ sip_options_qualify_aor, ast_taskprocessor_name(aor_options->serializer),
+ aor_options, AST_SIP_SCHED_TASK_VARIABLE | AST_SIP_SCHED_TASK_DATA_AO2);
+ if (!aor_options->sched_task) {
+ ast_log(LOG_ERROR, "Unable to schedule qualify for contacts of AOR '%s'\n",
+ aor_options->name);
+ }
+ }
+ }
+
+ ast_debug(3, "AOR '%s' now has %d available contacts\n", aor_options->name,
+ aor_options->available);
+
+ return 0;
+}
+
+/*!
+ * \brief Task to synchronize an AOR with our local state
+ * \note Run by aor_options->serializer (or management_serializer on aor_options creation)
+ */
+static int sip_options_synchronize_aor_task(void *obj)
+{
+ struct sip_options_synchronize_aor_task_data *task_data = obj;
+ int i;
+
+ ast_debug(3, "Synchronizing AOR '%s' with current state of configuration and world\n",
+ task_data->aor_options->name);
+
+ sip_options_apply_aor_configuration(task_data->aor_options, task_data->aor,
+ task_data->added);
+
+ /*
+ * Endpoint state compositors are removed in this operation but not
+ * added. To reduce the amount of work done they are done later. In
+ * the mean time things can still qualify and once an endpoint state
+ * compositor is added to the AOR it will be updated with the current
+ * state.
+ */
+ for (i = 0; i < AST_VECTOR_SIZE(&task_data->aor_options->compositors); ++i) {
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor;
+
+ endpoint_state_compositor = AST_VECTOR_GET(&task_data->aor_options->compositors, i);
+
+ ao2_lock(endpoint_state_compositor);
+ endpoint_state_compositor->active = 0;
+ sip_options_update_endpoint_state_compositor_aor(endpoint_state_compositor,
+ task_data->aor_options->name, REMOVED);
+ ao2_unlock(endpoint_state_compositor);
+ }
+ AST_VECTOR_RESET(&task_data->aor_options->compositors, ao2_cleanup);
+
+ return 0;
+}
+
+/*!
+ * \brief Synchronize an AOR with our local state
+ * \note Run by management_serializer
+ */
+static int sip_options_synchronize_aor(void *obj, void *arg, int flags)
+{
+ struct sip_options_synchronize_aor_task_data task_data = {
+ .aor = obj,
+ .existing = arg,
+ };
+
+ task_data.aor_options = ao2_find(sip_options_aors,
+ ast_sorcery_object_get_id(task_data.aor), OBJ_SEARCH_KEY);
+ if (!task_data.aor_options) {
+ task_data.aor_options = sip_options_aor_alloc(task_data.aor);
+ if (!task_data.aor_options) {
+ return 0;
+ }
+
+ task_data.added = 1;
+
+ /* Nothing is aware of this AOR yet so we can just update it in this thread */
+ sip_options_synchronize_aor_task(&task_data);
+ ao2_link(sip_options_aors, task_data.aor_options);
+ } else {
+ /* This AOR already exists so we have to do manipulation in its serializer */
+ ast_sip_push_task_wait_serializer(task_data.aor_options->serializer,
+ sip_options_synchronize_aor_task, &task_data);
+ }
+
+ ao2_ref(task_data.aor_options, -1);
+
+ if (task_data.existing) {
+ ao2_find(task_data.existing, ast_sorcery_object_get_id(task_data.aor),
+ OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
+ }
+
+ return 0;
+}
+
+/*! \brief Destructor for endpoint state compositors */
+static void sip_options_endpoint_state_compositor_dtor(void *obj)
+{
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor = obj;
+
+ ao2_cleanup(endpoint_state_compositor->aor_statuses);
+}
+
+/*! \brief Hashing function for endpoint AOR status */
+AO2_STRING_FIELD_HASH_FN(sip_options_endpoint_aor_status, name);
+
+/*! \brief Comparator function for endpoint AOR status */
+AO2_STRING_FIELD_CMP_FN(sip_options_endpoint_aor_status, name);
+
+/*! \brief Find (or create) an endpoint state compositor */
+static struct sip_options_endpoint_state_compositor *sip_options_endpoint_state_compositor_find_or_alloc(const struct ast_sip_endpoint *endpoint)
+{
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor;
+
+ ao2_lock(sip_options_endpoint_state_compositors);
+ endpoint_state_compositor = ao2_find(sip_options_endpoint_state_compositors,
+ ast_sorcery_object_get_id(endpoint), OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (endpoint_state_compositor) {
+ ao2_unlock(sip_options_endpoint_state_compositors);
+ return endpoint_state_compositor;
+ }
+
+ endpoint_state_compositor = ao2_alloc(sizeof(*endpoint_state_compositor)
+ + strlen(ast_sorcery_object_get_id(endpoint)) + 1,
+ sip_options_endpoint_state_compositor_dtor);
+ if (!endpoint_state_compositor) {
+ ao2_unlock(sip_options_endpoint_state_compositors);
+ return NULL;
+ }
+
+ /*
+ * NOTE: The endpoint_state_compositor->aor_statuses container is
+ * externally protected by the endpoint_state_compositor lock.
+ */
+ endpoint_state_compositor->aor_statuses = ao2_container_alloc_hash(
+ AO2_ALLOC_OPT_LOCK_NOLOCK, 0, AOR_STATUS_BUCKETS,
+ sip_options_endpoint_aor_status_hash_fn, NULL,
+ sip_options_endpoint_aor_status_cmp_fn);
+ if (!endpoint_state_compositor->aor_statuses) {
+ ao2_unlock(sip_options_endpoint_state_compositors);
+ ao2_ref(endpoint_state_compositor, -1);
+ return NULL;
+ }
+
+ strcpy(endpoint_state_compositor->name, ast_sorcery_object_get_id(endpoint)); /* SAFE */
+
+ ao2_link_flags(sip_options_endpoint_state_compositors, endpoint_state_compositor,
+ OBJ_NOLOCK);
+ ao2_unlock(sip_options_endpoint_state_compositors);
+
+ return endpoint_state_compositor;
+}
+
+/*! \brief Task details for adding an AOR to an endpoint state compositor */
+struct sip_options_endpoint_compositor_task_data {
+ /*! \brief The AOR options that the endpoint state compositor should be added to */
+ struct sip_options_aor *aor_options;
+ /*! \brief The endpoint state compositor */
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor;
+};
+
+/*!
+ * \brief Task which adds an AOR to an endpoint state compositor
+ * \note Run by aor_options->serializer
+ */
+static int sip_options_endpoint_compositor_add_task(void *obj)
+{
+ struct sip_options_endpoint_compositor_task_data *task_data = obj;
+
+ ast_debug(3, "Adding endpoint compositor '%s' to AOR '%s'\n",
+ task_data->endpoint_state_compositor->name, task_data->aor_options->name);
+
+ AST_VECTOR_APPEND(&task_data->aor_options->compositors,
+ ao2_bump(task_data->endpoint_state_compositor));
+
+ ao2_lock(task_data->endpoint_state_compositor);
+ sip_options_update_endpoint_state_compositor_aor(task_data->endpoint_state_compositor,
+ task_data->aor_options->name,
+ task_data->aor_options->available ? AVAILABLE : UNAVAILABLE);
+ ao2_unlock(task_data->endpoint_state_compositor);
+
+ return 0;
+}
+
+/*!
+ * \brief Task which adds removes an AOR from an endpoint state compositor
+ * \note Run by aor_options->serializer
+ */
+static int sip_options_endpoint_compositor_remove_task(void *obj)
+{
+ struct sip_options_endpoint_compositor_task_data *task_data = obj;
+ int i;
+
+ ast_debug(3, "Removing endpoint compositor '%s' from AOR '%s'\n",
+ task_data->endpoint_state_compositor->name,
+ task_data->aor_options->name);
+
+ for (i = 0; i < AST_VECTOR_SIZE(&task_data->aor_options->compositors); ++i) {
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor;
+
+ endpoint_state_compositor = AST_VECTOR_GET(&task_data->aor_options->compositors, i);
+ if (endpoint_state_compositor != task_data->endpoint_state_compositor) {
continue;
}
- contacts = ast_sip_location_retrieve_aor_contacts(aor);
- if (contacts) {
- ast_cli(cli_fd, "Sending qualify to endpoint %s\n", endpoint_name);
- ao2_callback_data(contacts, OBJ_NODATA, cli_on_contact, &cli_fd, endpoint);
- ao2_ref(contacts, -1);
- }
-
- ao2_ref(aor, -1);
+ AST_VECTOR_REMOVE(&task_data->aor_options->compositors, i, 0);
+ ao2_ref(endpoint_state_compositor, -1);
+ break;
}
+
return 0;
}
+
+/*!
+ * \brief Synchronize an endpoint with our local state
+ * \note Run by management_serializer
+ */
+static int sip_options_synchronize_endpoint(void *obj, void *arg, int flags)
+{
+ struct ast_sip_endpoint *endpoint = obj;
+ struct ast_sip_aor *aor = arg;
+ char *aors;
+ char *aor_name;
+ struct sip_options_endpoint_compositor_task_data task_data = { NULL, };
+
+ if (ast_strlen_zero(endpoint->aors)) {
+ /* There are no AORs, so really... who the heck knows */
+ ast_debug(3, "Endpoint '%s' is not interested in any AORs so not creating endpoint state compositor\n",
+ ast_sorcery_object_get_id(endpoint));
+ return 0;
+ }
+
+ ast_debug(3, "Synchronizing endpoint '%s' with AORs '%s'\n",
+ ast_sorcery_object_get_id(endpoint), endpoint->aors);
+
+ aors = ast_strdupa(endpoint->aors);
+ while ((aor_name = ast_strip(strsep(&aors, ",")))) {
+ if (ast_strlen_zero(aor_name)) {
+ continue;
+ }
+ if (aor && strcasecmp(ast_sorcery_object_get_id(aor), aor_name)) {
+ ast_debug(3, "Filtered AOR '%s' on endpoint '%s' as we are looking for '%s'\n",
+ aor_name, ast_sorcery_object_get_id(endpoint),
+ ast_sorcery_object_get_id(aor));
+ continue;
+ }
+
+ task_data.aor_options = ao2_find(sip_options_aors, aor_name, OBJ_SEARCH_KEY);
+ if (!task_data.aor_options) {
+ /*
+ * They have referenced an invalid AOR. If that's all they've
+ * done we will set them to offline at the end.
+ */
+ ast_debug(3, "Endpoint '%s' referenced invalid AOR '%s'\n",
+ ast_sorcery_object_get_id(endpoint), aor_name);
+ continue;
+ }
+
+ if (!task_data.endpoint_state_compositor) {
+ /*
+ * We create an endpoint state compositor only after we know
+ * for sure we need it.
+ */
+ task_data.endpoint_state_compositor =
+ sip_options_endpoint_state_compositor_find_or_alloc(endpoint);
+ if (!task_data.endpoint_state_compositor) {
+ ast_log(LOG_WARNING,
+ "Could not create endpoint state compositor for '%s', endpoint state will be incorrect\n",
+ ast_sorcery_object_get_id(endpoint));
+ ao2_ref(task_data.aor_options, -1);
+ ast_sip_persistent_endpoint_update_state(ast_sorcery_object_get_id(endpoint),
+ AST_ENDPOINT_OFFLINE);
+ return 0;
+ }
+ }
+
+ /* We use a synchronous task so that we don't flood the system */
+ ast_sip_push_task_wait_serializer(task_data.aor_options->serializer,
+ sip_options_endpoint_compositor_add_task, &task_data);
+
+ ao2_ref(task_data.aor_options, -1);
+
+ /*
+ * If we filtered on a specific AOR name then the endpoint can
+ * only reference it once so break early.
+ */
+ if (aor) {
+ break;
+ }
+ }
+
+ if (task_data.endpoint_state_compositor) {
+ /*
+ * If an endpoint state compositor is present determine the current state
+ * of the endpoint and update it.
+ */
+ ao2_lock(task_data.endpoint_state_compositor);
+ task_data.endpoint_state_compositor->active = 1;
+ ast_sip_persistent_endpoint_update_state(ast_sorcery_object_get_id(endpoint),
+ sip_options_get_endpoint_state_compositor_state(task_data.endpoint_state_compositor));
+ ao2_unlock(task_data.endpoint_state_compositor);
+
+ ao2_ref(task_data.endpoint_state_compositor, -1);
+ } else {
+ /* If there is none then they may have referenced an invalid AOR or none at all */
+ ast_debug(3, "Endpoint '%s' has no AORs feeding it, setting it to offline state as default\n",
+ ast_sorcery_object_get_id(endpoint));
+ ast_sip_persistent_endpoint_update_state(ast_sorcery_object_get_id(endpoint),
+ AST_ENDPOINT_OFFLINE);
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief Task which removes an AOR from all of the ESCs it is reporting to
+ * \note Run by aor_options->serializer
+ */
+static int sip_options_aor_remove_task(void *obj)
+{
+ struct sip_options_aor *aor_options = obj;
+
+ sip_options_notify_endpoint_state_compositors(aor_options, REMOVED);
+
+ if (aor_options->sched_task) {
+ ast_sip_sched_task_cancel(aor_options->sched_task);
+ ao2_ref(aor_options->sched_task, -1);
+ aor_options->sched_task = NULL;
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief Callback which removes any unused AORs that remained after reloading
+ * \note Run by management_serializer
+ */
+static int sip_options_unused_aor(void *obj, void *arg, int flags)
+{
+ struct sip_options_aor *aor_options = obj;
+
+ ast_debug(3, "AOR '%s' is no longer configured, removing it\n", aor_options->name);
+
+ ast_sip_push_task_wait_serializer(aor_options->serializer, sip_options_aor_remove_task,
+ aor_options);
+ ao2_unlink(sip_options_aors, aor_options);
+
+ return CMP_MATCH;
+}
+
+/*!
+ * \brief Callback function used to unlink and remove event state compositors that have no AORs feeding them
+ * \note Run by management_serializer
+ */
+static int sip_options_unused_endpoint_state_compositor(void *obj, void *arg, int flags)
+{
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor = obj;
+
+ if (ao2_container_count(endpoint_state_compositor->aor_statuses)) {
+ return 0;
+ }
+
+ /* No AORs are feeding this endpoint state compositor */
+ ast_sip_persistent_endpoint_update_state(endpoint_state_compositor->name,
+ AST_ENDPOINT_OFFLINE);
+
+ return CMP_MATCH;
+}
+
+/*! \brief Structure which contains information required to synchronize */
+struct sip_options_synchronize_task_data {
+ /*! \brief Whether this is a reload or not */
+ int reload;
+};
+
+/*!
+ * \brief Task to synchronize our local container of AORs and endpoint state compositors with the current configuration
+ * \note Run by management_serializer
+ */
+static int sip_options_synchronize_task(void *obj)
+{
+ struct sip_options_synchronize_task_data *task_data = obj;
+ struct ao2_container *existing = NULL;
+ struct ao2_container *objects;
+
+ /*
+ * When reloading we keep track of the existing AORs so we can
+ * terminate old ones that are no longer referenced or used.
+ */
+ if (task_data->reload) {
+ existing = ao2_container_clone(sip_options_aors, 0);
+ if (!existing) {
+ return 0;
+ }
+ }
+
+ objects = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "aor",
+ AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+ if (objects) {
+ /* Go through the returned AORs and synchronize with our local state */
+ ao2_callback(objects, OBJ_NODATA, sip_options_synchronize_aor, existing);
+ ao2_ref(objects, -1);
+ }
+
+ /*
+ * Any AORs remaining in existing are no longer referenced by
+ * the current container of AORs we retrieved, so remove them.
+ */
+ if (existing) {
+ ao2_callback(existing, OBJ_NODATA | OBJ_MULTIPLE | OBJ_UNLINK,
+ sip_options_unused_aor, NULL);
+ ao2_ref(existing, -1);
+ }
+
+ objects = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "endpoint",
+ AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+ if (objects) {
+ /* Go through the provided endpoints and update AORs */
+ ao2_callback(objects, OBJ_NODATA, sip_options_synchronize_endpoint, NULL);
+ ao2_ref(objects, -1);
+ }
+
+ /*
+ * All endpoint state compositors that don't have any AORs
+ * feeding them information can be removed. If they end
+ * up getting needed later they'll just be recreated.
+ */
+ ao2_callback(sip_options_endpoint_state_compositors,
+ OBJ_NODATA | OBJ_MULTIPLE | OBJ_UNLINK,
+ sip_options_unused_endpoint_state_compositor, NULL);
+
+ return 0;
+}
+
+/*! \brief Synchronize our local container of AORs and endpoint state compositors with the current configuration */
+static void sip_options_synchronize(int reload)
+{
+ struct sip_options_synchronize_task_data task_data = {
+ .reload = reload,
+ };
+
+ ast_sip_push_task_wait_serializer(management_serializer, sip_options_synchronize_task,
+ &task_data);
+}
+
+/*!
+ * \brief Unlink AORs feeding the endpoint status compositor
+ * \note Run by management_serializer
+ */
+static void sip_options_endpoint_unlink_aor_feeders(struct ast_sip_endpoint *endpoint,
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor)
+{
+ struct ao2_iterator it_aor_statuses;
+ struct sip_options_endpoint_aor_status *aor_status;
+ struct sip_options_endpoint_compositor_task_data task_data = {
+ .endpoint_state_compositor = endpoint_state_compositor,
+ };
+
+ ao2_lock(endpoint_state_compositor);
+ endpoint_state_compositor->active = 0;
+
+ /* Unlink AOR feeders pointing to endpoint */
+ it_aor_statuses = ao2_iterator_init(endpoint_state_compositor->aor_statuses, 0);
+ for (; (aor_status = ao2_iterator_next(&it_aor_statuses)); ao2_ref(aor_status, -1)) {
+ task_data.aor_options = ao2_find(sip_options_aors, aor_status->name,
+ OBJ_SEARCH_KEY);
+ if (!task_data.aor_options) {
+ continue;
+ }
+
+ ast_debug(3, "Removing endpoint state compositor '%s' from AOR '%s'\n",
+ ast_sorcery_object_get_id(endpoint), aor_status->name);
+ ao2_unlock(endpoint_state_compositor);
+ ast_sip_push_task_wait_serializer(task_data.aor_options->serializer,
+ sip_options_endpoint_compositor_remove_task, &task_data);
+ ao2_lock(endpoint_state_compositor);
+ ao2_ref(task_data.aor_options, -1);
+ }
+ ao2_iterator_destroy(&it_aor_statuses);
+
+ /*
+ * We do not need to remove the AOR feeder status memory from the
+ * aor_statuses container. The endpoint_state_compositor is about
+ * to die and do it for us.
+ */
+
+ ao2_unlock(endpoint_state_compositor);
+}
+
+/*!
+ * \brief Task to delete an endpoint from the known universe
+ * \note Run by management_serializer
+ */
+static int sip_options_endpoint_observer_deleted_task(void *obj)
+{
+ struct ast_sip_endpoint *endpoint = obj;
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor;
+
+ endpoint_state_compositor = ao2_find(sip_options_endpoint_state_compositors,
+ ast_sorcery_object_get_id(endpoint), OBJ_SEARCH_KEY | OBJ_UNLINK);
+ if (!endpoint_state_compositor) {
+ return 0;
+ }
+
+ ast_debug(3, "Endpoint '%s' has been deleted, removing endpoint state compositor from AORs\n",
+ ast_sorcery_object_get_id(endpoint));
+ sip_options_endpoint_unlink_aor_feeders(endpoint, endpoint_state_compositor);
+ ao2_ref(endpoint_state_compositor, -1);
+
+ return 0;
+}
+
+/*! \brief Observer callback invoked on endpoint deletion */
+static void endpoint_observer_deleted(const void *obj)
+{
+ ast_sip_push_task_wait_serializer(management_serializer,
+ sip_options_endpoint_observer_deleted_task, (void *) obj);
+}
+
+/*!
+ * \brief Task to synchronize the endpoint
+ * \note Run by management_serializer
+ */
+static int sip_options_endpoint_observer_modified_task(void *obj)
+{
+ struct ast_sip_endpoint *endpoint = obj;
+ struct sip_options_endpoint_state_compositor *endpoint_state_compositor;
+
+ ast_debug(3, "Endpoint '%s' has been created or modified, updating state\n",
+ ast_sorcery_object_get_id(endpoint));
+
+ endpoint_state_compositor = ao2_find(sip_options_endpoint_state_compositors,
+ ast_sorcery_object_get_id(endpoint), OBJ_SEARCH_KEY | OBJ_UNLINK);
+ if (endpoint_state_compositor) {
+ /* Unlink the AORs currently feeding the endpoint. */
+ sip_options_endpoint_unlink_aor_feeders(endpoint, endpoint_state_compositor);
+ ao2_ref(endpoint_state_compositor, -1);
+ }
+
+ /* Connect the AORs that now feed the endpoint. */
+ sip_options_synchronize_endpoint(endpoint, NULL, 0);
+ return 0;
+}
+
+/*! \brief Observer callback invoked on endpoint creation or modification */
+static void endpoint_observer_modified(const void *obj)
+{
+ ast_sip_push_task_wait_serializer(management_serializer,
+ sip_options_endpoint_observer_modified_task, (void *)obj);
+}
+
+/*! \brief Observer callbacks for endpoints */
+static const struct ast_sorcery_observer endpoint_observer_callbacks = {
+ .created = endpoint_observer_modified,
+ .updated = endpoint_observer_modified,
+ .deleted = endpoint_observer_deleted,
+};
+
+/*!
+ * \brief Task to synchronize an AOR with our local state
+ * \note Run by aor_options->serializer
+ */
+static int sip_options_update_aor_task(void *obj)
+{
+ struct sip_options_synchronize_aor_task_data *task_data = obj;
+ int available = task_data->aor_options->available;
+
+ ast_debug(3, "Individually updating AOR '%s' with current state of configuration and world\n",
+ task_data->aor_options->name);
+
+ sip_options_apply_aor_configuration(task_data->aor_options, task_data->aor,
+ task_data->added);
+
+ if (!available && task_data->aor_options->available) {
+ ast_debug(3, "After modifying AOR '%s' it has now become available\n",
+ task_data->aor_options->name);
+ sip_options_notify_endpoint_state_compositors(task_data->aor_options, AVAILABLE);
+ } else if (available && !task_data->aor_options->available) {
+ ast_debug(3, "After modifying AOR '%s' it has become unavailable\n",
+ task_data->aor_options->name);
+ sip_options_notify_endpoint_state_compositors(task_data->aor_options, UNAVAILABLE);
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief Task to synchronize the AOR
+ * \note Run by management_serializer
+ */
+static int sip_options_aor_observer_modified_task(void *obj)
+{
+ struct ast_sip_aor *aor = obj;
+ struct sip_options_aor *aor_options;
+
+ aor_options = ao2_find(sip_options_aors, ast_sorcery_object_get_id(aor),
+ OBJ_SEARCH_KEY);
+ if (!aor_options) {
+ struct ao2_container *endpoints;
+
+ aor_options = sip_options_aor_alloc(aor);
+ if (!aor_options) {
+ return 0;
+ }
+
+ /*
+ * This is a newly added AOR and we need to establish any
+ * endpoint state compositors that may reference only the
+ * AOR. If these need to be updated later then they'll
+ * be done by modifying the endpoint or issuing a reload.
+ */
+ sip_options_apply_aor_configuration(aor_options, aor, 1);
+ ao2_link(sip_options_aors, aor_options);
+
+ /*
+ * Using LIKE doesn't seem to work very well with non-realtime so we
+ * fetch everything right now and do a filter on our side.
+ */
+ endpoints = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+ "endpoint", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+ if (endpoints) {
+ ao2_callback(endpoints, OBJ_NODATA, sip_options_synchronize_endpoint, aor);
+ ao2_ref(endpoints, -1);
+ }
+ } else {
+ struct sip_options_synchronize_aor_task_data task_data = {
+ .aor_options = aor_options,
+ .aor = aor,
+ };
+
+ /*
+ * If this AOR was modified we have to do our work in its serializer
+ * instead of this thread to ensure that things aren't modified by
+ * multiple threads.
+ */
+ ast_sip_push_task_wait_serializer(aor_options->serializer,
+ sip_options_update_aor_task, &task_data);
+ }
+
+ ao2_ref(aor_options, -1);
+
+ return 0;
+}
+
+/*! \brief Observer callback invoked on AOR creation or modification */
+static void aor_observer_modified(const void *obj)
+{
+ ast_sip_push_task_wait_serializer(management_serializer,
+ sip_options_aor_observer_modified_task, (void *) obj);
+}
+
+/*!
+ * \brief Task to delete an AOR from the known universe
+ * \note Run by management_serializer
+ */
+static int sip_options_aor_observer_deleted_task(void *obj)
+{
+ struct ast_sip_aor *aor = obj;
+ struct sip_options_aor *aor_options;
+
+ aor_options = ao2_find(sip_options_aors, ast_sorcery_object_get_id(aor),
+ OBJ_SEARCH_KEY | OBJ_UNLINK);
+ if (!aor_options) {
+ return 0;
+ }
+
+ ast_debug(3, "AOR '%s' has been deleted, removing it\n", aor_options->name);
+
+ ast_sip_push_task_wait_serializer(aor_options->serializer, sip_options_aor_remove_task,
+ aor_options);
+ ao2_ref(aor_options, -1);
+
+ return 0;
+}
+
+/*! \brief Observer callback invoked on AOR deletion */
+static void aor_observer_deleted(const void *obj)
+{
+ ast_sip_push_task_wait_serializer(management_serializer,
+ sip_options_aor_observer_deleted_task, (void *) obj);
+}
+
+/*! \brief Observer callbacks for AORs */
+static const struct ast_sorcery_observer aor_observer_callbacks = {
+ .created = aor_observer_modified,
+ .updated = aor_observer_modified,
+ .deleted = aor_observer_deleted,
+};
+
+/*! \brief Task details for adding an AOR to an endpoint state compositor */
+struct sip_options_contact_observer_task_data {
+ /*! \brief The AOR options that the contact is referring to */
+ struct sip_options_aor *aor_options;
+ /*! \brief The contact itself */
+ struct ast_sip_contact *contact;
+};
+
+/*!
+ * \brief Task which adds a dynamic contact to an AOR
+ * \note Run by aor_options->serializer
+ */
+static int sip_options_contact_add_task(void *obj)
+{
+ struct sip_options_contact_observer_task_data *task_data = obj;
+ struct ast_sip_contact_status *contact_status;
+
+ ao2_link(task_data->aor_options->dynamic_contacts, task_data->contact);
+ ao2_link(task_data->aor_options->contacts, task_data->contact);
+
+ contact_status = ast_res_pjsip_find_or_create_contact_status(task_data->contact);
+ if (contact_status) {
+ if (!task_data->aor_options->qualify_frequency
+ && contact_status->status == CREATED) {
+ sip_options_set_contact_status(contact_status, UNKNOWN);
+ }
+ ao2_ref(contact_status, -1);
+ }
+
+ if (task_data->aor_options->qualify_frequency) {
+ /* If this is the first contact we need to schedule up qualification */
+ if (ao2_container_count(task_data->aor_options->contacts) == 1) {
+ ast_debug(3, "Starting scheduled callback on AOR '%s' for qualifying as there is now a contact on it\n",
+ task_data->aor_options->name);
+ /*
+ * We immediately schedule the initial qualify so that we get
+ * reachable/unreachable as soon as possible. Realistically
+ * since they pretty much just registered they should be
+ * reachable.
+ */
+ if (task_data->aor_options->sched_task) {
+ ast_sip_sched_task_cancel(task_data->aor_options->sched_task);
+ ao2_ref(task_data->aor_options->sched_task, -1);
+ task_data->aor_options->sched_task = NULL;
+ }
+ task_data->aor_options->sched_task = ast_sip_schedule_task(
+ task_data->aor_options->serializer, 1, sip_options_qualify_aor,
+ ast_taskprocessor_name(task_data->aor_options->serializer),
+ task_data->aor_options,
+ AST_SIP_SCHED_TASK_VARIABLE | AST_SIP_SCHED_TASK_DATA_AO2);
+ if (!task_data->aor_options->sched_task) {
+ ast_log(LOG_ERROR, "Unable to schedule qualify for contacts of AOR '%s'\n",
+ task_data->aor_options->name);
+ }
+ }
+ } else {
+ /*
+ * If this was the first contact added to a non-qualified AOR then
+ * it should become available.
+ */
+ task_data->aor_options->available =
+ ao2_container_count(task_data->aor_options->contacts);
+ if (task_data->aor_options->available == 1) {
+ ast_debug(3, "An unqualified contact has been added to AOR '%s' so it is now available\n",
+ task_data->aor_options->name);
+ sip_options_notify_endpoint_state_compositors(task_data->aor_options,
+ AVAILABLE);
+ }
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief Task to add a dynamic contact to an AOR in its serializer
+ * \note Run by management_serializer
+ */
+static int sip_options_contact_add_management_task(void *obj)
+{
+ struct sip_options_contact_observer_task_data task_data;
+
+ task_data.contact = obj;
+ task_data.aor_options = ao2_find(sip_options_aors, task_data.contact->aor,
+ OBJ_SEARCH_KEY);
+ if (!task_data.aor_options) {
+ struct ast_sip_aor *aor;
+
+ /*
+ * The only reason this would occur is if the AOR was sourced
+ * after the last reload happened. To handle this we fetch the
+ * AOR and treat it as if we received notification that it had
+ * been created. This will create the needed AOR feeder
+ * compositor and will cause any associated contact statuses and
+ * endpoint state compositors to also get created if needed.
+ */
+ aor = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "aor",
+ task_data.contact->aor);
+ if (aor) {
+ sip_options_aor_observer_modified_task(aor);
+ ao2_ref(aor, -1);
+ }
+ return 0;
+ }
+
+ ast_sip_push_task_wait_serializer(task_data.aor_options->serializer,
+ sip_options_contact_add_task, &task_data);
+ ao2_ref(task_data.aor_options, -1);
+
+ return 0;
+}
+
+/*! \brief Observer callback invoked on contact creation */
+static void contact_observer_created(const void *obj)
+{
+ ast_sip_push_task_wait_serializer(management_serializer,
+ sip_options_contact_add_management_task, (void *) obj);
+}
+
+/*!
+ * \brief Task which updates a dynamic contact to an AOR
+ * \note Run by aor_options->serializer
+ */
+static int sip_options_contact_update_task(void *obj)
+{
+ struct sip_options_contact_observer_task_data *task_data = obj;
+ struct ast_sip_contact_status *contact_status;
+
+ contact_status = ast_sip_get_contact_status(task_data->contact);
+ if (contact_status) {
+ switch (contact_status->status) {
+ case CREATED:
+ sip_options_set_contact_status(contact_status, UNKNOWN);
+ break;
+ case UNAVAILABLE:
+ case AVAILABLE:
+ case UNKNOWN:
+ /* Refresh the ContactStatus AMI events. */
+ sip_options_contact_status_update(contact_status);
+ break;
+ case REMOVED:
+ break;
+ }
+ ao2_ref(contact_status, -1);
+ }
+
+ ao2_ref(task_data->contact, -1);
+ ao2_ref(task_data->aor_options, -1);
+ ast_free(task_data);
+ return 0;
+}
+
+/*! \brief Observer callback invoked on contact update */
+static void contact_observer_updated(const void *obj)
+{
+ struct sip_options_contact_observer_task_data *task_data;
+
+ task_data = ast_malloc(sizeof(*task_data));
+ if (!task_data) {
+ return;
+ }
+
+ task_data->contact = (struct ast_sip_contact *) obj;
+ task_data->aor_options = ao2_find(sip_options_aors, task_data->contact->aor,
+ OBJ_SEARCH_KEY);
+ if (!task_data->aor_options) {
+ ao2_cleanup(task_data->aor_options);
+ ast_free(task_data);
+ return;
+ }
+
+ ao2_ref(task_data->contact, +1);
+ if (ast_sip_push_task(task_data->aor_options->serializer,
+ sip_options_contact_update_task, task_data)) {
+ ao2_ref(task_data->contact, -1);
+ ao2_ref(task_data->aor_options, -1);
+ ast_free(task_data);
+ }
+}
+
+/*!
+ * \brief Task which deletes a dynamic contact from an AOR
+ * \note Run by aor_options->serializer
+ */
+static int sip_options_contact_delete_task(void *obj)
+{
+ struct sip_options_contact_observer_task_data *task_data = obj;
+
+ ao2_find(task_data->aor_options->dynamic_contacts, task_data->contact,
+ OBJ_NODATA | OBJ_UNLINK | OBJ_SEARCH_OBJECT);
+ ao2_find(task_data->aor_options->contacts, task_data->contact,
+ OBJ_NODATA | OBJ_UNLINK | OBJ_SEARCH_OBJECT);
+
+ sip_options_remove_contact_status(task_data->aor_options, task_data->contact);
+
+ if (task_data->aor_options->qualify_frequency) {
+ /* If this is the last contact then we need to stop the scheduled callback */
+ if (!ao2_container_count(task_data->aor_options->contacts)) {
+ ast_debug(3, "Terminating scheduled callback on AOR '%s' as there are no contacts to qualify\n",
+ task_data->aor_options->name);
+ if (task_data->aor_options->sched_task) {
+ ast_sip_sched_task_cancel(task_data->aor_options->sched_task);
+ ao2_ref(task_data->aor_options->sched_task, -1);
+ task_data->aor_options->sched_task = NULL;
+ }
+ }
+ } else {
+ task_data->aor_options->available =
+ ao2_container_count(task_data->aor_options->contacts);
+ if (!task_data->aor_options->available) {
+ ast_debug(3, "An unqualified contact has been removed from AOR '%s' leaving no remaining contacts\n",
+ task_data->aor_options->name);
+ sip_options_notify_endpoint_state_compositors(task_data->aor_options,
+ UNAVAILABLE);
+ }
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief Task to delete a contact from an AOR in its serializer
+ * \note Run by management_serializer
+ */
+static int sip_options_contact_delete_management_task(void *obj)
+{
+ struct sip_options_contact_observer_task_data task_data;
+
+ task_data.contact = obj;
+ task_data.aor_options = ao2_find(sip_options_aors, task_data.contact->aor,
+ OBJ_SEARCH_KEY);
+ if (!task_data.aor_options) {
+ /* For contacts that are deleted we don't really care if there is no AOR locally */
+ return 0;
+ }
+
+ ast_sip_push_task_wait_serializer(task_data.aor_options->serializer,
+ sip_options_contact_delete_task, &task_data);
+ ao2_ref(task_data.aor_options, -1);
+
+ return 0;
+}
+
+/*! \brief Observer callback invoked on contact deletion */
+static void contact_observer_deleted(const void *obj)
+{
+ ast_sip_push_task_wait_serializer(management_serializer,
+ sip_options_contact_delete_management_task, (void *) obj);
+}
+
+/*! \brief Observer callbacks for contacts */
+static const struct ast_sorcery_observer contact_observer_callbacks = {
+ .created = contact_observer_created,
+ .updated = contact_observer_updated,
+ .deleted = contact_observer_deleted,
+};
static char *cli_qualify(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
const char *endpoint_name;
- struct qualify_data *qual_data;
+ char *aors;
+ char *aor_name;
switch (cmd) {
case CLI_INIT:
@@ -907,38 +2324,34 @@
endpoint_name = a->argv[2];
- if (!(endpoint = ast_sorcery_retrieve_by_id(
- ast_sip_get_sorcery(), "endpoint", endpoint_name))) {
+ endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
+ endpoint_name);
+ if (!endpoint) {
ast_cli(a->fd, "Unable to retrieve endpoint %s\n", endpoint_name);
return CLI_FAILURE;
}
- qual_data = qualify_data_alloc(endpoint, a->fd);
- if (!qual_data) {
+ if (ast_strlen_zero(endpoint->aors)) {
+ ast_cli(a->fd, "No AORs configured for endpoint '%s'\n", endpoint_name);
return CLI_FAILURE;
}
- if (ast_sip_push_task(NULL, cli_qualify_contacts, qual_data)) {
- qualify_data_destroy(qual_data);
- return CLI_FAILURE;
+ aors = ast_strdupa(endpoint->aors);
+ while ((aor_name = ast_strip(strsep(&aors, ",")))) {
+ struct sip_options_aor *aor_options;
+
+ aor_options = ao2_find(sip_options_aors, aor_name, OBJ_SEARCH_KEY);
+ if (!aor_options) {
+ continue;
+ }
+
+ ast_cli(a->fd, "Qualifying AOR '%s' on endpoint '%s'\n", aor_name, endpoint_name);
+ ast_sip_push_task_wait_serializer(aor_options->serializer, sip_options_qualify_aor,
+ aor_options);
+ ao2_ref(aor_options, -1);
}
return CLI_SUCCESS;
-}
-
-/*!
- * \internal
- * \brief Send qualify request to the given contact.
- */
-static int ami_contact_cb(void *obj, void *arg, int flags)
-{
- struct ast_sip_contact *contact = obj;
-
- ao2_ref(contact, +1);
- if (ast_sip_push_task(NULL, qualify_contact_task, contact)) {
- ao2_ref(contact, -1);
- }
- return 0;
}
static int ami_sip_qualify(struct mansession *s, const struct message *m)
@@ -968,21 +2381,16 @@
aors = ast_strdupa(endpoint->aors);
while ((aor_name = ast_strip(strsep(&aors, ",")))) {
- struct ast_sip_aor *aor;
- struct ao2_container *contacts;
+ struct sip_options_aor *aor_options;
- aor = ast_sip_location_retrieve_aor(aor_name);
- if (!aor) {
+ aor_options = ao2_find(sip_options_aors, aor_name, OBJ_SEARCH_KEY);
+ if (!aor_options) {
continue;
}
- contacts = ast_sip_location_retrieve_aor_contacts(aor);
- if (contacts) {
- ao2_callback(contacts, OBJ_NODATA, ami_contact_cb, NULL);
- ao2_ref(contacts, -1);
- }
-
- ao2_ref(aor, -1);
+ ast_sip_push_task_wait_serializer(aor_options->serializer, sip_options_qualify_aor,
+ aor_options);
+ ao2_ref(aor_options, -1);
}
astman_send_ack(s, m, "Endpoint found, will qualify");
@@ -993,233 +2401,6 @@
AST_CLI_DEFINE(cli_qualify, "Send an OPTIONS request to a PJSIP endpoint")
};
-static int sched_qualifies_hash_fn(const void *obj, int flags)
-{
- const struct sched_data *object;
- const struct ast_sip_contact *key;
-
- switch (flags & OBJ_SEARCH_MASK) {
- case OBJ_SEARCH_KEY:
- key = obj;
- break;
- case OBJ_SEARCH_OBJECT:
- object = obj;
- key = object->contact;
- break;
- default:
- /* Hash can only work on something with a full key. */
- ast_assert(0);
- return 0;
- }
- return ast_str_hash(ast_sorcery_object_get_id(key));
-}
-
-static int sched_qualifies_cmp_fn(void *obj, void *arg, int flags)
-{
- const struct sched_data *object_left = obj;
- const struct sched_data *object_right = arg;
- struct ast_sip_contact *right_key = arg;
- int cmp;
-
- switch (flags & OBJ_SEARCH_MASK) {
- case OBJ_SEARCH_OBJECT:
- right_key = object_right->contact;
- /* Fall through */
- case OBJ_SEARCH_KEY:
- cmp = strcmp(ast_sorcery_object_get_id(object_left->contact),
- ast_sorcery_object_get_id(right_key));
- break;
- case OBJ_SEARCH_PARTIAL_KEY:
- /* Not supported by container. */
- ast_assert(0);
- return 0;
- default:
- /*
- * What arg points to is specific to this traversal callback
- * and has no special meaning to astobj2.
- */
- cmp = 0;
- break;
- }
- if (cmp) {
- return 0;
- }
- /*
- * At this point the traversal callback is identical to a sorted
- * container.
- */
- return CMP_MATCH;
-}
-
-static int rtt_start_handler(const struct aco_option *opt,
- struct ast_variable *var, void *obj)
-{
- struct ast_sip_contact_status *status = obj;
- long int sec, usec;
-
- if (sscanf(var->value, "%ld.%06ld", &sec, &usec) != 2) {
- return -1;
- }
-
- status->rtt_start = ast_tv(sec, usec);
-
- return 0;
-}
-
-static int rtt_start_to_str(const void *obj, const intptr_t *args, char **buf)
-{
- const struct ast_sip_contact_status *status = obj;
-
- if (ast_asprintf(buf, "%ld.%06ld", (long)status->rtt_start.tv_sec, (long)status->rtt_start.tv_usec) == -1) {
- return -1;
- }
-
- return 0;
-}
-
-static char status_value_unknown[2];
-static char status_value_created[2];
-
-int ast_sip_initialize_sorcery_qualify(void)
-{
- struct ast_sorcery *sorcery = ast_sip_get_sorcery();
-
- /* initialize sorcery ast_sip_contact_status resource */
- ast_sorcery_apply_default(sorcery, CONTACT_STATUS, "memory", NULL);
- ast_sorcery_object_set_congestion_levels(sorcery, CONTACT_STATUS, -1,
- 3 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
-
- if (ast_sorcery_internal_object_register(sorcery, CONTACT_STATUS,
- contact_status_alloc, NULL, NULL)) {
- ast_log(LOG_ERROR, "Unable to register ast_sip_contact_status in sorcery\n");
- return -1;
- }
-
- snprintf(status_value_unknown, sizeof(status_value_unknown), "%u", UNKNOWN);
- ast_sorcery_object_field_register_nodoc(sorcery, CONTACT_STATUS, "last_status",
- status_value_unknown, OPT_UINT_T, 0, FLDSET(struct ast_sip_contact_status, last_status));
- snprintf(status_value_created, sizeof(status_value_created), "%u", CREATED);
- ast_sorcery_object_field_register_nodoc(sorcery, CONTACT_STATUS, "status",
- status_value_created, OPT_UINT_T, 0, FLDSET(struct ast_sip_contact_status, status));
- ast_sorcery_object_field_register_custom_nodoc(sorcery, CONTACT_STATUS, "rtt_start",
- "0.0", rtt_start_handler, rtt_start_to_str, NULL, 0, 0);
- ast_sorcery_object_field_register_nodoc(sorcery, CONTACT_STATUS, "rtt",
- "0", OPT_UINT_T, 0, FLDSET(struct ast_sip_contact_status, rtt));
-
- return 0;
-}
-
-static void qualify_and_schedule_contact(struct ast_sip_contact *contact)
-{
- int initial_interval;
- int max_time = ast_sip_get_max_initial_qualify_time();
-
- /* Delay initial qualification by a random fraction of the specified interval */
- if (max_time && max_time < contact->qualify_frequency) {
- initial_interval = max_time;
- } else {
- initial_interval = contact->qualify_frequency;
- }
-
- initial_interval = (int)((initial_interval * 1000) * ast_random_double());
-
- unschedule_qualify(contact);
- if (contact->qualify_frequency) {
- schedule_qualify(contact, initial_interval);
- } else {
- update_contact_status(contact, UNKNOWN, 0);
- }
-}
-
-static int qualify_and_schedule_cb_with_aor(void *obj, void *arg, int flags)
-{
- struct ast_sip_contact *contact = obj;
- struct ast_sip_aor *aor = arg;
-
- contact->qualify_frequency = aor->qualify_frequency;
- contact->qualify_timeout = aor->qualify_timeout;
- contact->authenticate_qualify = aor->authenticate_qualify;
-
- qualify_and_schedule_contact(contact);
-
- return 0;
-}
-
-static int qualify_and_schedule_cb_without_aor(void *obj, void *arg, int flags)
-{
- /*
- * These are really dynamic contacts. We need to retrieve the aor associated
- * with the contact since it's possible some of the aor's fields were updated
- * since last load.
- */
- struct ast_sip_contact *contact = obj;
- struct ast_sip_aor *aor = ast_sip_location_retrieve_aor(contact->aor);
-
- if (aor) {
- qualify_and_schedule_cb_with_aor(obj, aor, flags);
- ao2_ref(aor, -1);
- } else {
- ast_log(LOG_WARNING, "Unable to locate AOR for contact '%s'. Keeping old "
- "associated settings: frequency=%d, timeout=%f, authenticate=%s\n",
- contact->uri, contact->qualify_frequency, contact->qualify_timeout,
- contact->authenticate_qualify ? "yes" : "no");
- qualify_and_schedule_contact(contact);
- }
-
- return 0;
-}
-
-/*!
- * \internal
- * \brief Qualify and schedule an aor's contacts
- *
- * \details For the given aor check if it has permanent contacts,
- * qualify all contacts and schedule for checks if configured.
- */
-static int qualify_and_schedule_all_cb(void *obj, void *arg, int flags)
-{
- struct ast_sip_aor *aor = obj;
- struct ao2_container *contacts;
-
- if (aor->permanent_contacts) {
- contacts = ast_sip_location_retrieve_aor_contacts(aor);
- if (contacts) {
- ao2_callback(contacts, OBJ_NODATA, qualify_and_schedule_cb_with_aor, aor);
- ao2_ref(contacts, -1);
- }
- }
-
- return 0;
-}
-
-static void qualify_and_schedule_all(void)
-{
- struct ao2_container *aors;
- struct ao2_container *contacts;
-
- /*
- * It's possible that the AOR had some of it's fields updated prior to a
- * reload. For instance qualifying could have been turned on or off by
- * setting the qualify_frequency. Due to this we have to iterate through
- * all contacts (static and dynamic), and not just ones where the frequency
- * is greater than zero, updating any contact fields with the AOR's values.
- */
-
- aors = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
- "aor", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
-
- if (aors) {
- ao2_callback(aors, OBJ_NODATA, qualify_and_schedule_all_cb, NULL);
- ao2_ref(aors, -1);
- }
-
- contacts = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
- "contact", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
- if (contacts) {
- ao2_callback(contacts, OBJ_NODATA, qualify_and_schedule_cb_without_aor, NULL);
- ao2_ref(contacts, -1);
- }
-}
int ast_sip_format_contact_ami(void *obj, void *arg, int flags)
{
@@ -1235,9 +2416,7 @@
return -1;
}
- status = ast_sorcery_retrieve_by_id(
- ast_sip_get_sorcery(), CONTACT_STATUS,
- ast_sorcery_object_get_id(contact));
+ status = ast_sip_get_contact_status(contact);
ast_str_append(&buf, 0, "AOR: %s\r\n", wrapper->aor_id);
ast_str_append(&buf, 0, "URI: %s\r\n", contact->uri);
@@ -1253,14 +2432,15 @@
if (!ast_strlen_zero(contact->call_id)) {
ast_str_append(&buf, 0, "CallID: %s\r\n", contact->call_id);
}
- ast_str_append(&buf, 0, "Status: %s\r\n", ast_sip_get_contact_status_label(status ? status->status : UNKNOWN));
+ ast_str_append(&buf, 0, "Status: %s\r\n",
+ ast_sip_get_contact_status_label(status ? status->status : UNKNOWN));
if (!status || status->status == UNKNOWN) {
ast_str_append(&buf, 0, "RoundtripUsec: N/A\r\n");
} else {
ast_str_append(&buf, 0, "RoundtripUsec: %" PRId64 "\r\n", status->rtt);
}
ast_str_append(&buf, 0, "EndpointName: %s\r\n",
- endpoint ? ast_sorcery_object_get_id(endpoint) : S_OR(contact->endpoint_name, ""));
+ endpoint ? ast_sorcery_object_get_id(endpoint) : S_OR(contact->endpoint_name, ""));
ast_str_append(&buf, 0, "ID: %s\r\n", ast_sorcery_object_get_id(contact));
ast_str_append(&buf, 0, "AuthenticateQualify: %d\r\n", contact->authenticate_qualify);
@@ -1295,218 +2475,204 @@
.format_ami = format_ami_contact_status
};
-static void aor_observer_modified(const void *obj)
+/*!
+ * \brief Management task to clean up an AOR
+ * \note Run by aor_options->serializer
+ */
+static int sip_options_cleanup_aor_task(void *obj)
{
- struct ast_sip_aor *aor = (void *)obj;
- struct ao2_container *contacts;
+ struct sip_options_aor *aor_options = obj;
- contacts = ast_sip_location_retrieve_aor_contacts(aor);
- if (contacts) {
- ao2_callback(contacts, OBJ_NODATA, qualify_and_schedule_cb_with_aor, aor);
- ao2_ref(contacts, -1);
+ ast_debug(2, "Cleaning up AOR '%s' for shutdown\n", aor_options->name);
+
+ aor_options->qualify_frequency = 0;
+ if (aor_options->sched_task) {
+ ast_sip_sched_task_cancel(aor_options->sched_task);
+ ao2_ref(aor_options->sched_task, -1);
+ aor_options->sched_task = NULL;
}
+ AST_VECTOR_RESET(&aor_options->compositors, ao2_cleanup);
+
+ return 0;
}
-static int unschedule_contact_cb(void *obj, void *arg, int flags)
+/*!
+ * \brief Management task to clean up the environment
+ * \note Run by management_serializer
+ */
+static int sip_options_cleanup_task(void *obj)
{
- unschedule_qualify(obj);
+ struct ao2_iterator it_aor;
+ struct sip_options_aor *aor_options;
- return CMP_MATCH;
-}
-
-static void aor_observer_deleted(const void *obj)
-{
- const struct ast_sip_aor *aor = obj;
- struct ao2_container *contacts;
-
- contacts = ast_sip_location_retrieve_aor_contacts(aor);
- if (contacts) {
- ao2_callback(contacts, OBJ_NODATA | OBJ_MULTIPLE, unschedule_contact_cb, NULL);
- ao2_ref(contacts, -1);
- }
-}
-
-static const struct ast_sorcery_observer observer_callbacks_options = {
- .created = aor_observer_modified,
- .updated = aor_observer_modified,
- .deleted = aor_observer_deleted
-};
-
-static int aor_update_endpoint_state(void *obj, void *arg, int flags)
-{
- struct ast_sip_endpoint *endpoint = obj;
- const char *endpoint_name = ast_sorcery_object_get_id(endpoint);
- char *aor = arg;
- char *endpoint_aor;
- char *endpoint_aors;
-
- if (ast_strlen_zero(aor) || ast_strlen_zero(endpoint->aors)) {
+ if (!sip_options_aors) {
+ /* Nothing to do */
return 0;
}
- endpoint_aors = ast_strdupa(endpoint->aors);
- while ((endpoint_aor = ast_strip(strsep(&endpoint_aors, ",")))) {
- if (!strcmp(aor, endpoint_aor)) {
- if (ast_sip_persistent_endpoint_update_state(endpoint_name, AST_ENDPOINT_ONLINE) == -1) {
- ast_log(LOG_WARNING, "Unable to find persistent endpoint '%s' for aor '%s'\n",
- endpoint_name, aor);
- }
- }
+ it_aor = ao2_iterator_init(sip_options_aors, AO2_ITERATOR_UNLINK);
+ for (; (aor_options = ao2_iterator_next(&it_aor)); ao2_ref(aor_options, -1)) {
+ ast_sip_push_task_wait_serializer(aor_options->serializer,
+ sip_options_cleanup_aor_task, aor_options);
}
-
- return 0;
-}
-
-static int on_aor_update_endpoint_state(void *obj, void *arg, int flags)
-{
- struct ast_sip_aor *aor = obj;
- struct ao2_container *endpoints;
- RAII_VAR(struct ast_variable *, var, NULL, ast_variables_destroy);
- const char *aor_name = ast_sorcery_object_get_id(aor);
- char *aor_like;
-
- if (ast_strlen_zero(aor_name)) {
- return -1;
- }
-
- if (aor->permanent_contacts && ((int)(aor->qualify_frequency * 1000)) <= 0) {
- aor_like = ast_alloca(strlen(aor_name) + 3);
- sprintf(aor_like, "%%%s%%", aor_name);
- var = ast_variable_new("aors LIKE", aor_like, "");
- if (!var) {
- return -1;
- }
- endpoints = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
- "endpoint", AST_RETRIEVE_FLAG_MULTIPLE, var);
-
- if (endpoints) {
- /*
- * Because aors are a string list, we have to use a pattern match but since a simple
- * pattern match could return an endpoint that has an aor of "aaabccc" when searching
- * for "abc", we still have to iterate over them to find an exact aor match.
- */
- ao2_callback(endpoints, 0, aor_update_endpoint_state, (char *)aor_name);
- ao2_ref(endpoints, -1);
- }
- }
-
- return 0;
-}
-
-static int contact_update_endpoint_state(void *obj, void *arg, int flags)
-{
- const struct ast_sip_contact *contact = obj;
- struct timeval tv = ast_tvnow();
-
- if (!ast_strlen_zero(contact->endpoint_name) && ((int)(contact->qualify_frequency * 1000)) <= 0 &&
- contact->expiration_time.tv_sec > tv.tv_sec) {
-
- if (ast_sip_persistent_endpoint_update_state(contact->endpoint_name, AST_ENDPOINT_ONLINE) == -1) {
- ast_log(LOG_WARNING, "Unable to find persistent endpoint '%s' for contact '%s/%s'\n",
- contact->endpoint_name, contact->aor, contact->uri);
- return -1;
- }
- }
-
- return 0;
-}
-
-static void update_all_unqualified_endpoints(void)
-{
- struct ao2_container *aors;
- struct ao2_container *contacts;
- RAII_VAR(struct ast_variable *, var_aor, NULL, ast_variables_destroy);
- RAII_VAR(struct ast_variable *, var_contact, NULL, ast_variables_destroy);
- RAII_VAR(char *, time_now, NULL, ast_free);
- struct timeval tv = ast_tvnow();
-
- if (!(var_aor = ast_variable_new("contact !=", "", ""))) {
- return;
- }
- if (!(var_aor->next = ast_variable_new("qualify_frequency <=", "0", ""))) {
- return;
- }
-
- if (ast_asprintf(&time_now, "%ld", tv.tv_sec) == -1) {
- return;
- }
- if (!(var_contact = ast_variable_new("expiration_time >", time_now, ""))) {
- return;
- }
- if (!(var_contact->next = ast_variable_new("qualify_frequency <=", "0", ""))) {
- return;
- }
-
- aors = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
- "aor", AST_RETRIEVE_FLAG_MULTIPLE, var_aor);
- if (aors) {
- ao2_callback(aors, OBJ_NODATA, on_aor_update_endpoint_state, NULL);
- ao2_ref(aors, -1);
- }
-
- contacts = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
- "contact", AST_RETRIEVE_FLAG_MULTIPLE, var_contact);
- if (contacts) {
- ao2_callback(contacts, OBJ_NODATA, contact_update_endpoint_state, NULL);
- ao2_ref(contacts, -1);
- }
-}
-
-int ast_res_pjsip_init_options_handling(int reload)
-{
- static const pj_str_t STR_OPTIONS = { "OPTIONS", 7 };
-
- if (reload) {
- qualify_and_schedule_all();
- return 0;
- }
-
- sched_qualifies = ao2_t_container_alloc(QUALIFIED_BUCKETS,
- sched_qualifies_hash_fn, sched_qualifies_cmp_fn,
- "Create container for scheduled qualifies");
- if (!sched_qualifies) {
- return -1;
- }
-
- if (pjsip_endpt_register_module(ast_sip_get_pjsip_endpoint(), &options_module) != PJ_SUCCESS) {
- ao2_cleanup(sched_qualifies);
- sched_qualifies = NULL;
- return -1;
- }
-
- if (pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW,
- NULL, 1, &STR_OPTIONS) != PJ_SUCCESS) {
- pjsip_endpt_unregister_module(ast_sip_get_pjsip_endpoint(), &options_module);
- ao2_cleanup(sched_qualifies);
- sched_qualifies = NULL;
- return -1;
- }
-
- if (ast_sorcery_observer_add(ast_sip_get_sorcery(), "aor", &observer_callbacks_options)) {
- pjsip_endpt_unregister_module(ast_sip_get_pjsip_endpoint(), &options_module);
- ao2_cleanup(sched_qualifies);
- sched_qualifies = NULL;
- return -1;
- }
-
- internal_sip_register_endpoint_formatter(&contact_status_formatter);
- ast_manager_register_xml("PJSIPQualify", EVENT_FLAG_SYSTEM | EVENT_FLAG_REPORTING, ami_sip_qualify);
- ast_cli_register_multiple(cli_options, ARRAY_LEN(cli_options));
-
- update_all_unqualified_endpoints();
- qualify_and_schedule_all();
+ ao2_iterator_destroy(&it_aor);
return 0;
}
void ast_res_pjsip_cleanup_options_handling(void)
{
+ int remaining;
+ struct ast_taskprocessor *mgmt_serializer;
+
ast_cli_unregister_multiple(cli_options, ARRAY_LEN(cli_options));
ast_manager_unregister("PJSIPQualify");
internal_sip_unregister_endpoint_formatter(&contact_status_formatter);
- ast_sorcery_observer_remove(ast_sip_get_sorcery(), "aor", &observer_callbacks_options);
+ ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact",
+ &contact_observer_callbacks);
+ ast_sorcery_observer_remove(ast_sip_get_sorcery(), "aor",
+ &aor_observer_callbacks);
+ ast_sorcery_observer_remove(ast_sip_get_sorcery(), "endpoint",
+ &endpoint_observer_callbacks);
+
+ mgmt_serializer = management_serializer;
+ management_serializer = NULL;
+ if (mgmt_serializer) {
+ ast_sip_push_task_wait_serializer(mgmt_serializer, sip_options_cleanup_task, NULL);
+ }
+
+ remaining = ast_serializer_shutdown_group_join(shutdown_group,
+ MAX_UNLOAD_TIMEOUT_TIME);
+ if (remaining) {
+ ast_log(LOG_WARNING, "Cleanup incomplete. Could not stop %d AORs.\n",
+ remaining);
+ }
+ ao2_cleanup(shutdown_group);
+ shutdown_group = NULL;
+
+ if (mgmt_serializer) {
+ ast_taskprocessor_unreference(mgmt_serializer);
+ }
+
+ ao2_cleanup(sip_options_aors);
+ sip_options_aors = NULL;
+ ao2_cleanup(sip_options_contact_statuses);
+ sip_options_contact_statuses = NULL;
+ ao2_cleanup(sip_options_endpoint_state_compositors);
+ sip_options_endpoint_state_compositors = NULL;
+
pjsip_endpt_unregister_module(ast_sip_get_pjsip_endpoint(), &options_module);
- ao2_cleanup(sched_qualifies);
- sched_qualifies = NULL;
+}
+
+/*!
+ * \brief Management task to finish setting up the environment.
+ * \note Run by management_serializer
+ */
+static int sip_options_init_task(void *mgmt_serializer)
+{
+ management_serializer = mgmt_serializer;
+
+ shutdown_group = ast_serializer_shutdown_group_alloc();
+ if (!shutdown_group) {
+ return -1;
+ }
+
+ if (ast_sorcery_observer_add(ast_sip_get_sorcery(), "endpoint",
+ &endpoint_observer_callbacks)) {
+ return -1;
+ }
+ if (ast_sorcery_observer_add(ast_sip_get_sorcery(), "aor",
+ &aor_observer_callbacks)) {
+ return -1;
+ }
+ if (ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact",
+ &contact_observer_callbacks)) {
+ return -1;
+ }
+
+ sip_options_synchronize(0);
+
+ return 0;
+}
+
+int ast_res_pjsip_init_options_handling(int reload)
+{
+ struct ast_taskprocessor *mgmt_serializer;
+
+ static const pj_str_t STR_OPTIONS = { "OPTIONS", 7 };
+
+ if (reload) {
+ sip_options_synchronize(1);
+ return 0;
+ }
+
+ if (pjsip_endpt_register_module(ast_sip_get_pjsip_endpoint(), &options_module)
+ != PJ_SUCCESS) {
+ return -1;
+ }
+
+ if (pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW,
+ NULL, 1, &STR_OPTIONS) != PJ_SUCCESS) {
+ ast_res_pjsip_cleanup_options_handling();
+ return -1;
+ }
+
+ sip_options_aors = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0, AOR_BUCKETS,
+ sip_options_aor_hash_fn, NULL, sip_options_aor_cmp_fn);
+ if (!sip_options_aors) {
+ ast_res_pjsip_cleanup_options_handling();
+ return -1;
+ }
+ if (!sip_options_contact_statuses) {
+ /* The container is not already created for us so we have to create it. */
+ sip_options_contact_statuses = sip_options_contact_statuses_alloc();
+ if (!sip_options_contact_statuses) {
+ ast_res_pjsip_cleanup_options_handling();
+ return -1;
+ }
+ }
+ sip_options_endpoint_state_compositors =
+ ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
+ ENDPOINT_STATE_COMPOSITOR_BUCKETS,
+ sip_options_endpoint_state_compositor_hash_fn, NULL,
+ sip_options_endpoint_state_compositor_cmp_fn);
+ if (!sip_options_endpoint_state_compositors) {
+ ast_res_pjsip_cleanup_options_handling();
+ return -1;
+ }
+
+ mgmt_serializer = ast_sip_create_serializer("pjsip/options/manage");
+ if (!mgmt_serializer) {
+ ast_res_pjsip_cleanup_options_handling();
+ return -1;
+ }
+
+ /*
+ * Set the water mark levels high because we can get a flood of
+ * contact status updates from sip_options_synchronize() that
+ * quickly clears on initial load or reload.
+ */
+ ast_taskprocessor_alert_set_levels(mgmt_serializer, -1,
+ 10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
+
+ /*
+ * We make sure that the environment is completely setup before we allow
+ * any other threads to post contact_status updates to the
+ * management_serializer.
+ */
+ if (ast_sip_push_task_wait_serializer(mgmt_serializer, sip_options_init_task,
+ mgmt_serializer)) {
+ /* Set management_serializer in case pushing the task actually failed. */
+ management_serializer = mgmt_serializer;
+ ast_res_pjsip_cleanup_options_handling();
+ return -1;
+ }
+
+ internal_sip_register_endpoint_formatter(&contact_status_formatter);
+ ast_manager_register_xml("PJSIPQualify", EVENT_FLAG_SYSTEM | EVENT_FLAG_REPORTING,
+ ami_sip_qualify);
+ ast_cli_register_multiple(cli_options, ARRAY_LEN(cli_options));
+
+ return 0;
}
--
To view, visit https://gerrit.asterisk.org/8752
To unsubscribe, visit https://gerrit.asterisk.org/settings
Gerrit-Project: asterisk
Gerrit-Branch: 15
Gerrit-MessageType: newchange
Gerrit-Change-Id: I6a5ebbfca9001dfe933eaeac4d3babd8d2e6f082
Gerrit-Change-Number: 8752
Gerrit-PatchSet: 1
Gerrit-Owner: Richard Mudgett <rmudgett at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20180409/167229ee/attachment-0001.html>
More information about the asterisk-code-review
mailing list