[svn-commits] mmichelson: branch mmichelson/rls-subscribe r417664 - /team/mmichelson/rls-su...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Mon Jun 30 14:08:21 CDT 2014


Author: mmichelson
Date: Mon Jun 30 14:08:18 2014
New Revision: 417664

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=417664
Log:
Address some odds and ends.

* Add XXX comments for items to be addressed by work on scheduled issues.
  While it's not common to cite issues in our code, I'm making an exception
  here since these add concrete places to look in the code in order to find
  where the next set of changes will be required.

* Address previously-made XXX comments with regards to creating a subscription
  tree when recreating a persistent subscription.

* Restructure code a bit so that static functions appear before they are used.

The next bit to address is handling edge cases when building the resource
tree: loops and duplicated resources.


Modified:
    team/mmichelson/rls-subscribe/res/res_pjsip_pubsub.c

Modified: team/mmichelson/rls-subscribe/res/res_pjsip_pubsub.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/rls-subscribe/res/res_pjsip_pubsub.c?view=diff&rev=417664&r1=417663&r2=417664
==============================================================================
--- team/mmichelson/rls-subscribe/res/res_pjsip_pubsub.c (original)
+++ team/mmichelson/rls-subscribe/res/res_pjsip_pubsub.c Mon Jun 30 14:08:18 2014
@@ -208,6 +208,12 @@
 /*! \brief Default expiration time for PUBLISH if one is not specified */
 #define DEFAULT_PUBLISH_EXPIRES 3600
 
+/*! \brief Number of buckets for subscription datastore */
+#define DATASTORE_BUCKETS 53
+
+/*! \brief Default expiration for subscriptions */
+#define DEFAULT_EXPIRES 3600
+
 /*! \brief Defined method for PUBLISH */
 const pjsip_method pjsip_publish_method =
 {
@@ -558,22 +564,60 @@
 	return handler;
 }
 
+/*!
+ * \brief Accept headers that are exceptions to the rule
+ *
+ * Typically, when a SUBSCRIBE arrives, we attempt to find a
+ * body generator that matches one of the Accept headers in
+ * the request. When subscribing to a single resource, this works
+ * great. However, when subscribing to a list, things work
+ * differently. Most Accept header values are fine, but there
+ * are a couple that are endemic to resource lists that need
+ * to be ignored when searching for a body generator to use
+ * for the individual resources of the subscription.
+ */
+const char *accept_exceptions[] =  {
+	"multipart/related",
+	"application/rlmi+xml",
+};
+
+/*!
+ * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
+ *
+ * \retval 1 This Accept header value is an exception to the rule.
+ * \retval 0 This Accept header is not an exception to the rule.
+ */
+static int exceptional_accept(const pj_str_t *accept)
+{
+	int i;
+	
+	for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
+		if (!pj_strcmp2(accept, accept_exceptions[i])) {
+			return 1;
+		}
+	}
+
+	return 0;
+}
+
 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
 	const struct ast_sip_subscription_handler *handler)
 {
 	pjsip_accept_hdr *accept_header;
 	char accept[AST_SIP_MAX_ACCEPT][64];
-	size_t num_accept_headers;
+	size_t num_accept_headers = 0;
 
 	accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
 	if (accept_header) {
 		int i;
 
 		for (i = 0; i < accept_header->count; ++i) {
-			ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
+			if (!exceptional_accept(&accept_header->values[i])) {
+				ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
+				++num_accept_headers;
+			}
 		}
-		num_accept_headers = accept_header->count;
 	} else {
 		/* If a SUBSCRIBE contains no Accept headers, then we must assume that
 		 * the default accept type for the event package is to be used.
@@ -588,829 +632,6 @@
 static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
 		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
 		struct ast_sip_pubsub_body_generator *generator);
-
-/*! \brief Callback function to perform the actual recreation of a subscription */
-static int subscription_persistence_recreate(void *obj, void *arg, int flags)
-{
-	struct subscription_persistence *persistence = obj;
-	pj_pool_t *pool = arg;
-	pjsip_rx_data rdata = { { 0, }, };
-	pjsip_expires_hdr *expires_header;
-	struct ast_sip_subscription_handler *handler;
-	RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
-	struct ast_sip_subscription *sub;
-	struct ast_sip_pubsub_body_generator *generator;
-	int resp;
-	char *resource;
-	size_t resource_size;
-	pjsip_sip_uri *request_uri;
-
-	/* If this subscription has already expired remove it */
-	if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
-	if (!endpoint) {
-		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
-			persistence->endpoint);
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	pj_pool_reset(pool);
-	rdata.tp_info.pool = pool;
-
-	if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
-		persistence->transport_key, persistence->local_name, persistence->local_port)) {
-		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
-			persistence->endpoint);
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
-	resource_size = pj_strlen(&request_uri->user) + 1;
-	resource = alloca(resource_size);
-	ast_copy_pj_str(resource, &request_uri->user, resource_size);
-
-	/* Update the expiration header with the new expiration */
-	expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
-	if (!expires_header) {
-		expires_header = pjsip_expires_hdr_create(pool, 0);
-		if (!expires_header) {
-			ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-			return 0;
-		}
-		pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
-	}
-	expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
-
-	handler = subscription_get_handler_from_rdata(&rdata);
-	if (!handler || !handler->notifier) {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	generator = subscription_get_generator_from_rdata(&rdata, handler);
-	if (!generator) {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
-			pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
-
-	/* XXX This section needs to use the same bit of logic as is used when
-	 * processing a new inbound subscription so that a subscription tree
-	 * is created
-	 */
-	resp = handler->notifier->new_subscribe(endpoint, resource);
-	if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
-		sub = notifier_create_subscription(handler, endpoint, &rdata, resource, generator);
-		sub->persistence = ao2_bump(persistence);
-		subscription_persistence_update(sub, &rdata);
-	} else {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-	}
-
-	return 0;
-}
-
-/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
-static int subscription_persistence_load(void *data)
-{
-	struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
-		"subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
-	pj_pool_t *pool;
-
-	pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
-		PJSIP_POOL_RDATA_INC);
-	if (!pool) {
-		ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
-		return 0;
-	}
-
-	ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
-
-	pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
-
-	ao2_ref(persisted_subscriptions, -1);
-	return 0;
-}
-
-/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
-static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
-{
-	struct ast_json_payload *payload;
-	const char *type;
-
-	if (stasis_message_type(message) != ast_manager_get_generic_type()) {
-		return;
-	}
-
-	payload = stasis_message_data(message);
-	type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
-
-	/* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
-	 * recreate SIP subscriptions.
-	 */
-	if (strcmp(type, "FullyBooted")) {
-		return;
-	}
-
-	/* This has to be here so the subscription is recreated when the body generator is available */
-	ast_sip_push_task(NULL, subscription_persistence_load, NULL);
-
-	/* Once the system is fully booted we don't care anymore */
-	stasis_unsubscribe(sub);
-}
-
-static void add_subscription(struct ast_sip_subscription *obj)
-{
-	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
-	AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
-	ast_module_ref(ast_module_info->self);
-}
-
-static void remove_subscription(struct ast_sip_subscription *obj)
-{
-	struct ast_sip_subscription *i;
-	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
-	AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
-		if (i == obj) {
-			AST_RWLIST_REMOVE_CURRENT(next);
-			ast_module_unref(ast_module_info->self);
-			break;
-		}
-	}
-	AST_RWLIST_TRAVERSE_SAFE_END;
-}
-
-typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
-
-static int for_each_subscription(on_subscription_t on_subscription, void *arg)
-{
-	int num = 0;
-	struct ast_sip_subscription *i;
-	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
-
-	if (!on_subscription) {
-		return num;
-	}
-
-	AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
-		if (on_subscription(i, arg)) {
-			break;
-		}
-		++num;
-	}
-	return num;
-}
-
-static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
-				    struct ast_str **buf)
-{
-	char str[256];
-	struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
-
-	ast_str_append(buf, 0, "Role: %s\r\n",
-		       sip_subscription_roles_map[sub->role]);
-	ast_str_append(buf, 0, "Endpoint: %s\r\n",
-		       ast_sorcery_object_get_id(sub->endpoint));
-
-	ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str));
-	ast_str_append(buf, 0, "Callid: %s\r\n", str);
-
-	ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
-			       sip_subscription_get_evsub(sub)));
-
-	ast_callerid_merge(str, sizeof(str),
-			   S_COR(id->self.name.valid, id->self.name.str, NULL),
-			   S_COR(id->self.number.valid, id->self.number.str, NULL),
-			   "Unknown");
-
-	ast_str_append(buf, 0, "Callerid: %s\r\n", str);
-
-	if (sub->handler->to_ami) {
-		sub->handler->to_ami(sub, buf);
-	}
-}
-
-#define DATASTORE_BUCKETS 53
-
-#define DEFAULT_EXPIRES 3600
-
-static int datastore_hash(const void *obj, int flags)
-{
-	const struct ast_datastore *datastore = obj;
-	const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
-
-	ast_assert(uid != NULL);
-
-	return ast_str_hash(uid);
-}
-
-static int datastore_cmp(void *obj, void *arg, int flags)
-{
-	const struct ast_datastore *datastore1 = obj;
-	const struct ast_datastore *datastore2 = arg;
-	const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
-
-	ast_assert(datastore1->uid != NULL);
-	ast_assert(uid2 != NULL);
-
-	return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
-}
-
-static int subscription_remove_serializer(void *obj)
-{
-	struct ast_sip_subscription *sub = obj;
-
-	/* This is why we keep the dialog on the subscription. When the subscription
-	 * is destroyed, there is no guarantee that the underlying dialog is ready
-	 * to be destroyed. Furthermore, there's no guarantee in the opposite direction
-	 * either. The dialog could be destroyed before our subscription is. We fix
-	 * this problem by keeping a reference to the dialog until it is time to
-	 * destroy the subscription. We need to have the dialog available when the
-	 * subscription is destroyed so that we can guarantee that our attempt to
-	 * remove the serializer will be successful.
-	 */
-	ast_sip_dialog_set_serializer(sip_subscription_get_dlg(sub), NULL);
-	pjsip_dlg_dec_session(sip_subscription_get_dlg(sub), &pubsub_module);
-
-	return 0;
-}
-
-static void subscription_destructor(void *obj)
-{
-	struct ast_sip_subscription *sub = obj;
-
-	ast_debug(3, "Destroying SIP subscription\n");
-
-	subscription_persistence_remove(sub);
-
-	remove_subscription(sub);
-
-	ao2_cleanup(sub->datastores);
-	ao2_cleanup(sub->endpoint);
-
-	if (sip_subscription_get_dlg(sub)) {
-		ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
-	}
-	ast_taskprocessor_unreference(sub->serializer);
-}
-
-
-static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
-static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
-		int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
-static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
-		pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
-static void pubsub_on_client_refresh(pjsip_evsub *sub);
-static void pubsub_on_server_timeout(pjsip_evsub *sub);
-
-
-static pjsip_evsub_user pubsub_cb = {
-	.on_evsub_state = pubsub_on_evsub_state,
-	.on_rx_refresh = pubsub_on_rx_refresh,
-	.on_rx_notify = pubsub_on_rx_notify,
-	.on_client_refresh = pubsub_on_client_refresh,
-	.on_server_timeout = pubsub_on_server_timeout,
-};
-
-static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
-		struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role,
-		enum sip_subscription_type type)
-{
-	struct ast_sip_subscription *sub;
-
-	sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
-	if (!sub) {
-		return NULL;
-	}
-	strcpy(sub->resource, resource); /* Safe */
-
-	sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
-	if (!sub->datastores) {
-		ao2_ref(sub, -1);
-		return NULL;
-	}
-	sub->serializer = ast_sip_create_serializer();
-	if (!sub->serializer) {
-		ao2_ref(sub, -1);
-		return NULL;
-	}
-	sub->role = role;
-	sub->type = type;
-	sub->endpoint = ao2_bump(endpoint);
-	sub->handler = handler;
-
-	return sub;
-}
-
-static void subscription_setup_dialog(struct ast_sip_subscription *sub, pjsip_dialog *dlg)
-{
-	/* We keep a reference to the dialog until our subscription is destroyed. See
-	 * the subscription_destructor for more details
-	 */
-	pjsip_dlg_inc_session(dlg, &pubsub_module);
-	sub->reality.real.dlg = dlg;
-	ast_sip_dialog_set_serializer(dlg, sub->serializer);
-	pjsip_evsub_set_mod_data(sip_subscription_get_evsub(sub), pubsub_module.id, sub);
-}
-
-static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
-		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
-		struct ast_sip_pubsub_body_generator *generator)
-{
-	struct ast_sip_subscription *sub;
-	pjsip_dialog *dlg;
-	struct subscription_persistence *persistence;
-
-	sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER, SIP_SUBSCRIPTION_REAL);
-	if (!sub) {
-		return NULL;
-	}
-
-	sub->body_generator = generator;
-	dlg = ast_sip_create_dialog_uas(endpoint, rdata);
-	if (!dlg) {
-		ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
-		ao2_ref(sub, -1);
-		return NULL;
-	}
-
-	persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
-			pubsub_module.id, MOD_DATA_PERSISTENCE);
-	if (persistence) {
-		/* Update the created dialog with the persisted information */
-		pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
-		pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
-		dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
-		pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
-		dlg->local.cseq = persistence->cseq;
-		dlg->remote.cseq = persistence->cseq;
-	}
-
-	pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub->reality.real.evsub);
-	subscription_setup_dialog(sub, dlg);
-
-	ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
-			pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
-
-	add_subscription(sub);
-	return sub;
-}
-
-void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
-{
-	pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
-	pjsip_msg *msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
-	pj_str_t name;
-
-	pj_cstr(&name, header);
-
-	return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
-}
-
-struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
-		struct ast_sip_endpoint *endpoint, const char *resource)
-{
-	struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
-	pjsip_dialog *dlg;
-	struct ast_sip_contact *contact;
-	pj_str_t event;
-	pjsip_tx_data *tdata;
-	pjsip_evsub *evsub;
-
-	sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER, SIP_SUBSCRIPTION_REAL);
-	if (!sub) {
-		return NULL;
-	}
-
-	contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
-	if (!contact || ast_strlen_zero(contact->uri)) {
-		ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
-				ast_sorcery_object_get_id(endpoint));
-		ao2_ref(sub, -1);
-		ao2_cleanup(contact);
-		return NULL;
-	}
-
-	dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
-	ao2_cleanup(contact);
-	if (!dlg) {
-		ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
-		ao2_ref(sub, -1);
-		return NULL;
-	}
-
-	pj_cstr(&event, handler->event_name);
-	pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub->reality.real.evsub);
-	subscription_setup_dialog(sub, dlg);
-
-	add_subscription(sub);
-
-	evsub = sip_subscription_get_evsub(sub);
-
-	if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
-		pjsip_evsub_send_request(evsub, tdata);
-	} else {
-		/* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
-		 * being called and terminating the subscription. Therefore, we don't
-		 * need to decrease the reference count of sub here.
-		 */
-		pjsip_evsub_terminate(evsub, PJ_TRUE);
-		return NULL;
-	}
-
-	return sub;
-}
-
-struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
-{
-	ast_assert(sub->endpoint != NULL);
-	ao2_ref(sub->endpoint, +1);
-	return sub->endpoint;
-}
-
-struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
-{
-	ast_assert(sub->serializer != NULL);
-	return sub->serializer;
-}
-
-static int sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
-{
-	struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
-	int res;
-
-	ao2_ref(sub, +1);
-	res = pjsip_evsub_send_request(sip_subscription_get_evsub(sub),
-			tdata) == PJ_SUCCESS ? 0 : -1;
-
-	subscription_persistence_update(sub, NULL);
-
-	ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
-		"StateText: %s\r\n"
-		"Endpoint: %s\r\n",
-		pjsip_evsub_get_state_name(sip_subscription_get_evsub(sub)),
-		ast_sorcery_object_get_id(endpoint));
-	ao2_cleanup(sub);
-	ao2_cleanup(endpoint);
-
-	return res;
-}
-
-int ast_sip_subscription_notify(struct ast_sip_subscription *sub, void *notify_data,
-		int terminate)
-{
-	struct ast_sip_body body = {
-		.type = ast_sip_subscription_get_body_type(sub),
-		.subtype = ast_sip_subscription_get_body_subtype(sub),
-	};
-	struct ast_str *body_text = ast_str_create(64);
-	pjsip_evsub *evsub = sip_subscription_get_evsub(sub);
-	pjsip_tx_data *tdata;
-	pjsip_evsub_state state;
-
-	if (!body_text) {
-		return -1;
-	}
-
-	if (ast_sip_pubsub_generate_body_content(body.type, body.subtype, notify_data, &body_text)) {
-		ast_free(body_text);
-		return -1;
-	}
-
-	body.body_text = ast_str_buffer(body_text);
-
-	if (terminate) {
-		state = PJSIP_EVSUB_STATE_TERMINATED;
-	} else {
-		state = pjsip_evsub_get_state(evsub) <= PJSIP_EVSUB_STATE_ACTIVE ?
-			PJSIP_EVSUB_STATE_ACTIVE : PJSIP_EVSUB_STATE_TERMINATED;
-	}
-
-	ast_log_backtrace();
-
-	if (pjsip_evsub_notify(evsub, state, NULL, NULL, &tdata) != PJ_SUCCESS) {
-		ast_free(body_text);
-		return -1;
-	}
-	if (ast_sip_add_body(tdata, &body)) {
-		ast_free(body_text);
-		pjsip_tx_data_dec_ref(tdata);
-		return -1;
-	}
-	if (sip_subscription_send_request(sub, tdata)) {
-		ast_free(body_text);
-		pjsip_tx_data_dec_ref(tdata);
-		return -1;
-	}
-
-	return 0;
-}
-
-void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
-{
-	pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
-	ast_copy_pj_str(buf, &dlg->local.info_str, size);
-}
-
-void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
-{
-	pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
-	ast_copy_pj_str(buf, &dlg->remote.info_str, size);
-}
-
-const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
-{
-	return sub->resource;
-}
-
-static int sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
-{
-	/* If this is a persistence recreation the subscription has already been accepted */
-	if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
-		return 0;
-	}
-
-	return pjsip_evsub_accept(sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
-}
-
-static void subscription_datastore_destroy(void *obj)
-{
-	struct ast_datastore *datastore = obj;
-
-	/* Using the destroy function (if present) destroy the data */
-	if (datastore->info->destroy != NULL && datastore->data != NULL) {
-		datastore->info->destroy(datastore->data);
-		datastore->data = NULL;
-	}
-
-	ast_free((void *) datastore->uid);
-	datastore->uid = NULL;
-}
-
-struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
-{
-	RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
-	const char *uid_ptr = uid;
-
-	if (!info) {
-		return NULL;
-	}
-
-	datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
-	if (!datastore) {
-		return NULL;
-	}
-
-	datastore->info = info;
-	if (ast_strlen_zero(uid)) {
-		/* They didn't provide an ID so we'll provide one ourself */
-		struct ast_uuid *uuid = ast_uuid_generate();
-		char uuid_buf[AST_UUID_STR_LEN];
-		if (!uuid) {
-			return NULL;
-		}
-		uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
-		ast_free(uuid);
-	}
-
-	datastore->uid = ast_strdup(uid_ptr);
-	if (!datastore->uid) {
-		return NULL;
-	}
-
-	ao2_ref(datastore, +1);
-	return datastore;
-}
-
-int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
-{
-	ast_assert(datastore != NULL);
-	ast_assert(datastore->info != NULL);
-	ast_assert(!ast_strlen_zero(datastore->uid));
-
-	if (!ao2_link(subscription->datastores, datastore)) {
-		return -1;
-	}
-	return 0;
-}
-
-struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
-{
-	return ao2_find(subscription->datastores, name, OBJ_KEY);
-}
-
-void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
-{
-	ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
-}
-
-int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
-{
-	ast_assert(datastore != NULL);
-	ast_assert(datastore->info != NULL);
-	ast_assert(!ast_strlen_zero(datastore->uid));
-
-	if (!ao2_link(publication->datastores, datastore)) {
-		return -1;
-	}
-	return 0;
-}
-
-struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
-{
-	return ao2_find(publication->datastores, name, OBJ_KEY);
-}
-
-void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
-{
-	ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
-}
-
-AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
-
-static int publication_hash_fn(const void *obj, const int flags)
-{
-	const struct ast_sip_publication *publication = obj;
-	const int *entity_tag = obj;
-
-	return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
-}
-
-static int publication_cmp_fn(void *obj, void *arg, int flags)
-{
-	const struct ast_sip_publication *publication1 = obj;
-	const struct ast_sip_publication *publication2 = arg;
-	const int *entity_tag = arg;
-
-	return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
-		CMP_MATCH | CMP_STOP : 0);
-}
-
-static void publish_add_handler(struct ast_sip_publish_handler *handler)
-{
-	SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
-	AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
-}
-
-int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
-{
-	if (ast_strlen_zero(handler->event_name)) {
-		ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
-		return -1;
-	}
-
-	if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
-		publication_hash_fn, publication_cmp_fn))) {
-		ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
-			handler->event_name);
-		return -1;
-	}
-
-	publish_add_handler(handler);
-
-	ast_module_ref(ast_module_info->self);
-
-	return 0;
-}
-
-void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
-{
-	struct ast_sip_publish_handler *iter;
-	SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
-	AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
-		if (handler == iter) {
-			AST_RWLIST_REMOVE_CURRENT(next);
-			ao2_cleanup(handler->publications);
-			ast_module_unref(ast_module_info->self);
-			break;
-		}
-	}
-	AST_RWLIST_TRAVERSE_SAFE_END;
-}
-
-AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
-
-static void sub_add_handler(struct ast_sip_subscription_handler *handler)
-{
-	SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
-	AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
-	ast_module_ref(ast_module_info->self);
-}
-
-static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
-{
-	struct ast_sip_subscription_handler *iter;
-	SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
-
-	AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
-		if (!strcmp(iter->event_name, event_name)) {
-			break;
-		}
-	}
-	return iter;
-}
-
-int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
-{
-	pj_str_t event;
-	pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
-	struct ast_sip_subscription_handler *existing;
-	int i = 0;
-
-	if (ast_strlen_zero(handler->event_name)) {
-		ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
-		return -1;
-	}
-
-	existing = find_sub_handler_for_event_name(handler->event_name);
-	if (existing) {
-		ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
-				"A handler is already registered\n", handler->event_name);
-		return -1;
-	}
-
-	for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
-		pj_cstr(&accept[i], handler->accept[i]);
-	}
-
-	pj_cstr(&event, handler->event_name);
-
-	pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
-
-	sub_add_handler(handler);
-
-	return 0;
-}
-
-void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
-{
-	struct ast_sip_subscription_handler *iter;
-	SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
-	AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
-		if (handler == iter) {
-			AST_RWLIST_REMOVE_CURRENT(next);
-			ast_module_unref(ast_module_info->self);
-			break;
-		}
-	}
-	AST_RWLIST_TRAVERSE_SAFE_END;
-}
-
-static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
-		const char *content_subtype)
-{
-	struct ast_sip_pubsub_body_generator *iter;
-	SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
-
-	AST_LIST_TRAVERSE(&body_generators, iter, list) {
-		if (!strcmp(iter->type, content_type) &&
-				!strcmp(iter->subtype, content_subtype)) {
-			break;
-		}
-	};
-
-	return iter;
-}
-
-static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
-{
-	char *accept_copy = ast_strdupa(accept);
-	char *subtype = accept_copy;
-	char *type = strsep(&subtype, "/");
-
-	if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
-		return NULL;
-	}
-
-	return find_body_generator_type_subtype(type, subtype);
-}
-
-static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
-		size_t num_accept)
-{
-	int i;
-	struct ast_sip_pubsub_body_generator *generator = NULL;
-
-	for (i = 0; i < num_accept; ++i) {
-		generator = find_body_generator_accept(accept[i]);
-		if (generator) {
-			ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
-			break;
-		} else {
-			ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
-		}
-	}
-
-	return generator;
-}
 
 struct tree_node {
 	AST_VECTOR(, struct tree_node *) children;
@@ -1478,7 +699,6 @@
 
 struct resource_tree {
 	struct tree_node *root;
-	int single_resource;
 };
 
 static void resource_tree_destroy(struct resource_tree *tree)
@@ -1486,7 +706,7 @@
 	tree_node_destroy(tree->root);
 }
 
-static int build_subscription_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
+static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
 		const char *resource, struct resource_tree *tree)
 {
 	struct resource_list *list;
@@ -1494,7 +714,6 @@
 	/* Simple shortcut in case the requested resource is not a list */
 	list = retrieve_resource_list(resource);
 	if (!list) {
-		tree->single_resource = 1;
 		return handler->notifier->new_subscribe(endpoint, resource);
 	}
 
@@ -1512,6 +731,110 @@
 	}
 }
 
+static int datastore_hash(const void *obj, int flags)
+{
+	const struct ast_datastore *datastore = obj;
+	const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
+
+	ast_assert(uid != NULL);
+
+	return ast_str_hash(uid);
+}
+
+static int datastore_cmp(void *obj, void *arg, int flags)
+{
+	const struct ast_datastore *datastore1 = obj;
+	const struct ast_datastore *datastore2 = arg;
+	const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
+
+	ast_assert(datastore1->uid != NULL);
+	ast_assert(uid2 != NULL);
+
+	return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
+}
+
+static int subscription_remove_serializer(void *obj)
+{
+	struct ast_sip_subscription *sub = obj;
+
+	/* This is why we keep the dialog on the subscription. When the subscription
+	 * is destroyed, there is no guarantee that the underlying dialog is ready
+	 * to be destroyed. Furthermore, there's no guarantee in the opposite direction
+	 * either. The dialog could be destroyed before our subscription is. We fix
+	 * this problem by keeping a reference to the dialog until it is time to
+	 * destroy the subscription. We need to have the dialog available when the
+	 * subscription is destroyed so that we can guarantee that our attempt to
+	 * remove the serializer will be successful.
+	 */
+	ast_sip_dialog_set_serializer(sip_subscription_get_dlg(sub), NULL);
+	pjsip_dlg_dec_session(sip_subscription_get_dlg(sub), &pubsub_module);
+
+	return 0;
+}
+
+static void remove_subscription(struct ast_sip_subscription *obj)
+{
+	struct ast_sip_subscription *i;
+	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+	AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
+		if (i == obj) {
+			AST_RWLIST_REMOVE_CURRENT(next);
+			ast_module_unref(ast_module_info->self);
+			break;
+		}
+	}
+	AST_RWLIST_TRAVERSE_SAFE_END;
+}
+
+static void subscription_destructor(void *obj)
+{
+	struct ast_sip_subscription *sub = obj;
+
+	ast_debug(3, "Destroying SIP subscription\n");
+
+	subscription_persistence_remove(sub);
+
+	remove_subscription(sub);
+
+	ao2_cleanup(sub->datastores);
+	ao2_cleanup(sub->endpoint);
+
+	if (sip_subscription_get_dlg(sub)) {
+		ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
+	}
+	ast_taskprocessor_unreference(sub->serializer);
+}
+
+static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
+		struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role,
+		enum sip_subscription_type type)
+{
+	struct ast_sip_subscription *sub;
+
+	sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
+	if (!sub) {
+		return NULL;
+	}
+	strcpy(sub->resource, resource); /* Safe */
+
+	sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
+	if (!sub->datastores) {
+		ao2_ref(sub, -1);
+		return NULL;
+	}
+	sub->serializer = ast_sip_create_serializer();
+	if (!sub->serializer) {
+		ao2_ref(sub, -1);
+		return NULL;
+	}
+	sub->role = role;
+	sub->type = type;
+	sub->endpoint = ao2_bump(endpoint);
+	sub->handler = handler;
+
+	return sub;
+}
+
 static void create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
 		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
 		struct ast_sip_pubsub_body_generator *generator, struct ast_sip_subscription *parent,
@@ -1524,6 +847,10 @@
 		
 		sub = allocate_subscription(handler, endpoint, resource,
 				AST_SIP_NOTIFIER, SIP_SUBSCRIPTION_VIRTUAL);
+		/* XXX For subscriptions with children, the generator will need to be
+		 * the multipart RLMI generator instead. This will be handled in
+		 * ASTERISK-23869 or ASTERISK-23867
+		 */
 		sub->body_generator = generator;
 		sub->reality.virtual.parent = parent;
 
@@ -1538,13 +865,747 @@
 {
 	struct ast_sip_subscription *sub;
 	
-	/* Start by creating the root subscription. It's the only real subscription. */
+	/* Start by creating the root subscription. It's the only real subscription. 
+	 * XXX Since this is the root of a subscription tree, it should actually use the
+	 * multipart RLMI generator instead. This will be handled in ASTERISK-23869 or
+	 * ASTERISK-23867
+	 */
 	sub = notifier_create_subscription(handler, endpoint, rdata, resource, generator);
 
 	/* Now we need to create virtual subscriptions */
 	create_virtual_subscriptions(handler, endpoint, rdata, resource, generator, sub, tree->root);
 
 	return sub;
+}
+
+
+/*! \brief Callback function to perform the actual recreation of a subscription */
+static int subscription_persistence_recreate(void *obj, void *arg, int flags)
+{
+	struct subscription_persistence *persistence = obj;
+	pj_pool_t *pool = arg;
+	pjsip_rx_data rdata = { { 0, }, };
+	RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+	struct ast_sip_subscription *sub;
+	struct ast_sip_pubsub_body_generator *generator;
+	int resp;
+	char *resource;
+	size_t resource_size;
+	pjsip_sip_uri *request_uri;
+	struct resource_tree tree;
+	pjsip_expires_hdr *expires_header;
+	struct ast_sip_subscription_handler *handler;
+
+	/* If this subscription has already expired remove it */
+	if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
+	if (!endpoint) {
+		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
+			persistence->endpoint);
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	pj_pool_reset(pool);
+	rdata.tp_info.pool = pool;
+
+	if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
+		persistence->transport_key, persistence->local_name, persistence->local_port)) {
+		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
+			persistence->endpoint);
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
+	resource_size = pj_strlen(&request_uri->user) + 1;
+	resource = alloca(resource_size);
+	ast_copy_pj_str(resource, &request_uri->user, resource_size);
+
+	/* Update the expiration header with the new expiration */
+	expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
+	if (!expires_header) {
+		expires_header = pjsip_expires_hdr_create(pool, 0);
+		if (!expires_header) {
+			ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+			return 0;
+		}
+		pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
+	}
+	expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
+
+	handler = subscription_get_handler_from_rdata(&rdata);
+	if (!handler || !handler->notifier) {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	generator = subscription_get_generator_from_rdata(&rdata, handler);
+	if (!generator) {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
+			pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
+
+	memset(&tree, 0, sizeof(tree));
+	/* XXX This section needs to use the same bit of logic as is used when
+	 * processing a new inbound subscription so that a subscription tree
+	 * is created
+	 */
+	resp = build_resource_tree(endpoint, handler, resource, &tree);
+	if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
+		sub = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree);
+		sub->persistence = ao2_bump(persistence);
+		subscription_persistence_update(sub, &rdata);
+	} else {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+	}
+
+	return 0;
+}
+
+/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
+static int subscription_persistence_load(void *data)
+{
+	struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+		"subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+	pj_pool_t *pool;
+
+	pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
+		PJSIP_POOL_RDATA_INC);
+	if (!pool) {
+		ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
+		return 0;
+	}
+
+	ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
+
+	pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+
+	ao2_ref(persisted_subscriptions, -1);

[... 685 lines stripped ...]



More information about the svn-commits mailing list