[Asterisk-code-review] res pjsip: improve realtime performance #2 (asterisk[13])

Joshua Colp asteriskteam at digium.com
Thu Jun 30 15:53:17 CDT 2016


Joshua Colp has submitted this change and it was merged.

Change subject: res_pjsip: improve realtime performance #2
......................................................................


res_pjsip: improve realtime performance #2

The patch removes updating all Endpoints' status on startup.
Instead, only non-qualified aors with static contact
and non-qualified non-expired contacts are retrieved from the realtime to
update the endpoint status to ONLINE.
The endpoint name was added to the contact object to simply find the endpoint
that created this contact.

The status of endpoints with qualified aors will be updated by 'qualify'
functions.

ASTERISK-26061 #close

Change-Id: Id324c1776fa55d3741e0c5457ecac0304cb1a0df
---
A contrib/ast-db-manage/config/versions/ef7efc2d3964_ps_contacts_add_endpoint_and_modify_.py
M include/asterisk/res_pjsip.h
M res/res_pjsip.c
M res/res_pjsip/location.c
M res/res_pjsip/pjsip_configuration.c
M res/res_pjsip/pjsip_options.c
6 files changed, 265 insertions(+), 46 deletions(-)

Approvals:
  George Joseph: Looks good to me, but someone else must approve
  Joshua Colp: Looks good to me, approved; Verified



diff --git a/contrib/ast-db-manage/config/versions/ef7efc2d3964_ps_contacts_add_endpoint_and_modify_.py b/contrib/ast-db-manage/config/versions/ef7efc2d3964_ps_contacts_add_endpoint_and_modify_.py
new file mode 100644
index 0000000..ff18343
--- /dev/null
+++ b/contrib/ast-db-manage/config/versions/ef7efc2d3964_ps_contacts_add_endpoint_and_modify_.py
@@ -0,0 +1,33 @@
+"""ps_contacts add endpoint and modify expiration_time to bigint
+
+Revision ID: ef7efc2d3964
+Revises: a845e4d8ade8
+Create Date: 2016-06-02 18:18:46.231920
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'ef7efc2d3964'
+down_revision = 'a845e4d8ade8'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    context = op.get_context()
+
+    op.add_column('ps_contacts', sa.Column('endpoint', sa.String(40)))
+
+    if context.bind.dialect.name != 'postgresql':
+        op.alter_column('ps_contacts', 'expiration_time', type_=sa.BigInteger)
+    else:
+        op.execute('ALTER TABLE ps_contacts ALTER COLUMN expiration_time TYPE BIGINT USING expiration_time::bigint')
+
+    op.create_index('ps_contacts_qualifyfreq_exptime', 'ps_contacts', ['qualify_frequency', 'expiration_time'])
+    op.create_index('ps_aors_qualifyfreq_contact', 'ps_aors', ['qualify_frequency', 'contact'])
+def downgrade():
+    op.drop_index('ps_aors_qualifyfreq_contact')
+    op.drop_index('ps_contacts_qualifyfreq_exptime')
+    op.drop_column('ps_contacts', 'endpoint')
+    op.alter_column('ps_contacts', 'expiration_time', type_=sa.String(40))
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index b64ad62..4319dbd 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -256,6 +256,8 @@
 	int via_port;
 	/*! Content of the Call-ID header in REGISTER request */
 	AST_STRING_FIELD_EXTENDED(call_id);
+	/*! The name of the endpoint that added the contact */
+	AST_STRING_FIELD_EXTENDED(endpoint_name);
 };
 
 #define CONTACT_STATUS "contact_status"
@@ -986,6 +988,16 @@
 void *ast_sip_endpoint_alloc(const char *name);
 
 /*!
+ * \brief Change state of a persistent endpoint.
+ *
+ * \param endpoint The SIP endpoint name to change state.
+ * \param state The new state
+ * \retval 0 Success
+ * \retval -1 Endpoint not found
+ */
+int ast_sip_persistent_endpoint_update_state(const char *endpoint_name, enum ast_endpoint_state state);
+
+/*!
  * \brief Get a pointer to the PJSIP endpoint.
  *
  * This is useful when modules have specific information they need
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index adc4d41..97ab8e0 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -1166,6 +1166,12 @@
 						REGISTER requests and is not intended to be configured manually.
 					</para></description>
 				</configOption>
+				<configOption name="endpoint">
+					<synopsis>Endpoint name</synopsis>
+					<description><para>
+						The name of the endpoint this contact belongs to
+					</para></description>
+				</configOption>
 				<configOption name="reg_server">
 					<synopsis>Asterisk Server name</synopsis>
 					<description><para>
diff --git a/res/res_pjsip/location.c b/res/res_pjsip/location.c
index f55bd0f..1b7850f 100644
--- a/res/res_pjsip/location.c
+++ b/res/res_pjsip/location.c
@@ -121,6 +121,7 @@
 		return NULL;
 	}
 
+	ast_string_field_init_extended(contact, endpoint_name);
 	ast_string_field_init_extended(contact, reg_server);
 	ast_string_field_init_extended(contact, via_addr);
 	ast_string_field_init_extended(contact, call_id);
@@ -355,6 +356,10 @@
 	}
 
 	contact->endpoint = ao2_bump(endpoint);
+
+	if (endpoint) {
+		ast_string_field_set(contact, endpoint_name, ast_sorcery_object_get_id(endpoint));
+	}
 
 	return ast_sorcery_create(ast_sip_get_sorcery(), contact);
 }
@@ -1138,6 +1143,7 @@
 	ast_sorcery_object_field_register(sorcery, "contact", "authenticate_qualify", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_contact, authenticate_qualify));
 	ast_sorcery_object_field_register(sorcery, "contact", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, outbound_proxy));
 	ast_sorcery_object_field_register(sorcery, "contact", "user_agent", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, user_agent));
+	ast_sorcery_object_field_register(sorcery, "contact", "endpoint", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, endpoint_name));
 	ast_sorcery_object_field_register(sorcery, "contact", "reg_server", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, reg_server));
 	ast_sorcery_object_field_register(sorcery, "contact", "via_addr", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, via_addr));
 	ast_sorcery_object_field_register(sorcery, "contact", "via_port", "0", OPT_UINT_T, 0, FLDSET(struct ast_sip_contact, via_port));
diff --git a/res/res_pjsip/pjsip_configuration.c b/res/res_pjsip/pjsip_configuration.c
index 368f0d8..8791816 100644
--- a/res/res_pjsip/pjsip_configuration.c
+++ b/res/res_pjsip/pjsip_configuration.c
@@ -58,6 +58,53 @@
 	return !strcmp(ast_endpoint_get_resource(persistent1->endpoint), id) ? CMP_MATCH | CMP_STOP : 0;
 }
 
+/*! \brief Internal function for changing the state of an endpoint */
+static void endpoint_update_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state)
+{
+	struct ast_json *blob;
+	char *regcontext;
+
+	/* If there was no state change, don't publish anything. */
+	if (ast_endpoint_get_state(endpoint) == state) {
+		return;
+	}
+
+	regcontext = ast_sip_get_regcontext();
+
+	if (state == AST_ENDPOINT_ONLINE) {
+		ast_endpoint_set_state(endpoint, AST_ENDPOINT_ONLINE);
+		blob = ast_json_pack("{s: s}", "peer_status", "Reachable");
+
+		if (!ast_strlen_zero(regcontext)) {
+			if (!ast_exists_extension(NULL, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL)) {
+				ast_add_extension(regcontext, 1, ast_endpoint_get_resource(endpoint), 1, NULL, NULL,
+					"Noop", ast_strdup(ast_endpoint_get_resource(endpoint)), ast_free_ptr, "SIP");
+			}
+		}
+
+		ast_verb(2, "Endpoint %s is now Reachable\n", ast_endpoint_get_resource(endpoint));
+	} else {
+		ast_endpoint_set_state(endpoint, AST_ENDPOINT_OFFLINE);
+		blob = ast_json_pack("{s: s}", "peer_status", "Unreachable");
+
+		if (!ast_strlen_zero(regcontext)) {
+			struct pbx_find_info q = { .stacklen = 0 };
+
+			if (pbx_find_extension(NULL, NULL, &q, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL, "", E_MATCH)) {
+				ast_context_remove_extension(regcontext, ast_endpoint_get_resource(endpoint), 1, NULL);
+			}
+		}
+
+		ast_verb(2, "Endpoint %s is now Unreachable\n", ast_endpoint_get_resource(endpoint));
+	}
+
+	ast_free(regcontext);
+
+	ast_endpoint_blob_publish(endpoint, ast_endpoint_state_type(), blob);
+	ast_json_unref(blob);
+	ast_devstate_changed(AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "PJSIP/%s", ast_endpoint_get_resource(endpoint));
+}
+
 /*! \brief Callback function for changing the state of an endpoint */
 static int persistent_endpoint_update_state(void *obj, void *arg, int flags)
 {
@@ -69,7 +116,6 @@
 	struct ao2_iterator i;
 	struct ast_sip_contact *contact;
 	enum ast_endpoint_state state = AST_ENDPOINT_OFFLINE;
-	char *regcontext;
 
 	if (status) {
 		char rtt[32];
@@ -113,45 +159,7 @@
 		ao2_ref(contacts, -1);
 	}
 
-	/* If there was no state change, don't publish anything. */
-	if (ast_endpoint_get_state(endpoint) == state) {
-		return 0;
-	}
-
-	regcontext = ast_sip_get_regcontext();
-
-	if (state == AST_ENDPOINT_ONLINE) {
-		ast_endpoint_set_state(endpoint, AST_ENDPOINT_ONLINE);
-		blob = ast_json_pack("{s: s}", "peer_status", "Reachable");
-
-		if (!ast_strlen_zero(regcontext)) {
-			if (!ast_exists_extension(NULL, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL)) {
-				ast_add_extension(regcontext, 1, ast_endpoint_get_resource(endpoint), 1, NULL, NULL,
-					"Noop", ast_strdup(ast_endpoint_get_resource(endpoint)), ast_free_ptr, "SIP");
-			}
-		}
-
-		ast_verb(2, "Endpoint %s is now Reachable\n", ast_endpoint_get_resource(endpoint));
-	} else {
-		ast_endpoint_set_state(endpoint, AST_ENDPOINT_OFFLINE);
-		blob = ast_json_pack("{s: s}", "peer_status", "Unreachable");
-
-		if (!ast_strlen_zero(regcontext)) {
-			struct pbx_find_info q = { .stacklen = 0 };
-
-			if (pbx_find_extension(NULL, NULL, &q, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL, "", E_MATCH)) {
-				ast_context_remove_extension(regcontext, ast_endpoint_get_resource(endpoint), 1, NULL);
-			}
-		}
-
-		ast_verb(2, "Endpoint %s is now Unreachable\n", ast_endpoint_get_resource(endpoint));
-	}
-
-	ast_free(regcontext);
-
-	ast_endpoint_blob_publish(endpoint, ast_endpoint_state_type(), blob);
-	ast_json_unref(blob);
-	ast_devstate_changed(AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "PJSIP/%s", ast_endpoint_get_resource(endpoint));
+	endpoint_update_state(endpoint,state);
 
 	return 0;
 }
@@ -1184,6 +1192,20 @@
 	ast_free(persistent->aors);
 }
 
+int ast_sip_persistent_endpoint_update_state(const char *endpoint_name, enum ast_endpoint_state state)
+{
+	RAII_VAR(struct sip_persistent_endpoint *, persistent, NULL, ao2_cleanup);
+	SCOPED_AO2LOCK(lock, persistent_endpoints);
+
+	if (!(persistent = ao2_find(persistent_endpoints, endpoint_name, OBJ_KEY | OBJ_NOLOCK))) {
+		return -1;
+	}
+
+	endpoint_update_state(persistent->endpoint, state);
+
+	return 0;
+}
+
 /*! \brief Internal function which finds (or creates) persistent endpoint information */
 static struct ast_endpoint *persistent_endpoint_find_or_create(const struct ast_sip_endpoint *endpoint)
 {
@@ -1201,11 +1223,7 @@
 
 		persistent->aors = ast_strdup(endpoint->aors);
 
-		if (ast_strlen_zero(persistent->aors)) {
-			ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_UNKNOWN);
-		} else {
-			persistent_endpoint_update_state(persistent, NULL, 0);
-		}
+		ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_UNKNOWN);
 
 		ao2_link_flags(persistent_endpoints, persistent, OBJ_NOLOCK);
 	}
@@ -1686,6 +1704,22 @@
 struct ast_sip_cli_formatter_entry *channel_formatter;
 struct ast_sip_cli_formatter_entry *endpoint_formatter;
 
+static int on_load_endpoint(void *obj, void *arg, int flags)
+{
+	return sip_endpoint_apply_handler(sip_sorcery, obj);
+}
+
+static void load_all_endpoints(void)
+{
+	struct ao2_container *endpoints;
+
+	endpoints = ast_sorcery_retrieve_by_fields(sip_sorcery, "endpoint", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+	if (endpoints) {
+		ao2_callback(endpoints, OBJ_NODATA, on_load_endpoint, NULL);
+		ao2_ref(endpoints, -1);
+	}
+}
+
 int ast_res_pjsip_initialize_configuration(const struct ast_module_info *ast_module_info)
 {
 	if (ast_manager_register_xml(AMI_SHOW_ENDPOINTS, EVENT_FLAG_SYSTEM, ami_show_endpoints) ||
@@ -1887,6 +1921,8 @@
 
 	ast_sorcery_load(sip_sorcery);
 
+	load_all_endpoints();
+
 	return 0;
 }
 
diff --git a/res/res_pjsip/pjsip_options.c b/res/res_pjsip/pjsip_options.c
index d73766c..ede0d5e 100644
--- a/res/res_pjsip/pjsip_options.c
+++ b/res/res_pjsip/pjsip_options.c
@@ -340,7 +340,12 @@
 	if (endpoint) {
 		endpoint_local = ao2_bump(endpoint);
 	} else {
-		endpoint_local = find_an_endpoint(contact);
+		if (!ast_strlen_zero(contact->endpoint_name)) {
+			endpoint_local = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", contact->endpoint_name);
+		}
+		if (!endpoint_local) {
+			endpoint_local = find_an_endpoint(contact);
+		}
 		if (!endpoint_local) {
 			ast_log(LOG_ERROR, "Unable to find an endpoint to qualify contact %s\n",
 				contact->uri);
@@ -1248,6 +1253,126 @@
 	.deleted = aor_observer_deleted
 };
 
+static int aor_update_endpoint_state(void *obj, void *arg, int flags)
+{
+	struct ast_sip_endpoint *endpoint = obj;
+	const char *endpoint_name = ast_sorcery_object_get_id(endpoint);
+	char *aor = arg;
+	char *endpoint_aor;
+	char *endpoint_aors;
+
+	if (ast_strlen_zero(aor) || ast_strlen_zero(endpoint->aors)) {
+		return 0;
+	}
+
+	endpoint_aors = ast_strdupa(endpoint->aors);
+	while ((endpoint_aor = ast_strip(strsep(&endpoint_aors, ",")))) {
+		if (!strcmp(aor, endpoint_aor)) {
+			if (ast_sip_persistent_endpoint_update_state(endpoint_name, AST_ENDPOINT_ONLINE) == -1) {
+				ast_log(LOG_WARNING, "Unable to find persistent endpoint '%s' for aor '%s'\n",
+					endpoint_name, aor);
+			}
+		}
+	}
+
+	return 0;
+}
+
+static int on_aor_update_endpoint_state(void *obj, void *arg, int flags)
+{
+	struct ast_sip_aor *aor = obj;
+	struct ao2_container *endpoints;
+	RAII_VAR(struct ast_variable *, var, NULL, ast_variables_destroy);
+	const char *aor_name = ast_sorcery_object_get_id(aor);
+	char *aor_like;
+
+	if (ast_strlen_zero(aor_name)) {
+		return -1;
+	}
+
+	if (aor->permanent_contacts && ((int)(aor->qualify_frequency * 1000)) <= 0) {
+		aor_like = ast_alloca(strlen(aor_name) + 3);
+		sprintf(aor_like, "%%%s%%", aor_name);
+		var = ast_variable_new("aors LIKE", aor_like, "");
+		if (!var) {
+			return -1;
+		}
+		endpoints = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+			"endpoint", AST_RETRIEVE_FLAG_MULTIPLE, var);
+
+		if (endpoints) {
+		    /*
+		     * Because aors are a string list, we have to use a pattern match but since a simple
+		     * pattern match could return an endpoint that has an aor of "aaabccc" when searching
+		     * for "abc", we still have to iterate over them to find an exact aor match.
+		     */
+		    ao2_callback(endpoints, 0, aor_update_endpoint_state, (char *)aor_name);
+		    ao2_ref(endpoints, -1);
+		}
+	}
+
+	return 0;
+}
+
+static int contact_update_endpoint_state(void *obj, void *arg, int flags)
+{
+	const struct ast_sip_contact *contact = obj;
+	struct timeval tv = ast_tvnow();
+
+	if (!ast_strlen_zero(contact->endpoint_name) && ((int)(contact->qualify_frequency * 1000)) <= 0 &&
+		contact->expiration_time.tv_sec > tv.tv_sec) {
+
+		if (ast_sip_persistent_endpoint_update_state(contact->endpoint_name, AST_ENDPOINT_ONLINE) == -1) {
+			ast_log(LOG_WARNING, "Unable to find persistent endpoint '%s' for contact '%s/%s'\n",
+				contact->endpoint_name, contact->aor, contact->uri);
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+static void update_all_unqualified_endpoints(void)
+{
+	struct ao2_container *aors;
+	struct ao2_container *contacts;
+	RAII_VAR(struct ast_variable *, var_aor, NULL, ast_variables_destroy);
+	RAII_VAR(struct ast_variable *, var_contact, NULL, ast_variables_destroy);
+	RAII_VAR(char *, time_now, NULL, ast_free);
+	struct timeval tv = ast_tvnow();
+
+	if (!(var_aor = ast_variable_new("contact !=", "", ""))) {
+		return;
+	}
+	if (!(var_aor->next = ast_variable_new("qualify_frequency <=", "0", ""))) {
+		return;
+	}
+
+	if (ast_asprintf(&time_now, "%ld", tv.tv_sec) == -1) {
+		return;
+	}
+	if (!(var_contact = ast_variable_new("expiration_time >", time_now, ""))) {
+		return;
+	}
+	if (!(var_contact->next = ast_variable_new("qualify_frequency <=", "0", ""))) {
+		return;
+	}
+
+	aors = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+		"aor", AST_RETRIEVE_FLAG_MULTIPLE, var_aor);
+	if (aors) {
+		ao2_callback(aors, OBJ_NODATA, on_aor_update_endpoint_state, NULL);
+		ao2_ref(aors, -1);
+	}
+
+	contacts = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+		"contact", AST_RETRIEVE_FLAG_MULTIPLE, var_contact);
+	if (contacts) {
+		ao2_callback(contacts, OBJ_NODATA, contact_update_endpoint_state, NULL);
+		ao2_ref(contacts, -1);
+	}
+}
+
 int ast_res_pjsip_init_options_handling(int reload)
 {
 	static const pj_str_t STR_OPTIONS = { "OPTIONS", 7 };
@@ -1289,6 +1414,7 @@
 	ast_manager_register2("PJSIPQualify", EVENT_FLAG_SYSTEM | EVENT_FLAG_REPORTING, ami_sip_qualify, NULL, NULL, NULL);
 	ast_cli_register_multiple(cli_options, ARRAY_LEN(cli_options));
 
+	update_all_unqualified_endpoints();
 	qualify_and_schedule_all();
 
 	return 0;

-- 
To view, visit https://gerrit.asterisk.org/2906
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Id324c1776fa55d3741e0c5457ecac0304cb1a0df
Gerrit-PatchSet: 6
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: Alexei Gradinari <alex2grad at gmail.com>
Gerrit-Reviewer: Alexei Gradinari <alex2grad at gmail.com>
Gerrit-Reviewer: Anonymous Coward #1000019
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Mark Michelson <mmichelson at digium.com>



More information about the asterisk-code-review mailing list