[svn-commits] mmichelson: branch group/rls r417762 - /team/group/rls/res/res_pjsip_pubsub.c

SVN commits to the Digium repositories svn-commits at lists.digium.com
Wed Jul 2 09:18:22 CDT 2014


Author: mmichelson
Date: Wed Jul  2 09:18:19 2014
New Revision: 417762

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=417762
Log:
Commit temporary SUBSCRIBE-handling changes to common RLS branch.

These changes are currently up for review. However, I wish to continue
with NOTIFY-handling changes and it would do me well to have the SUBSCRIBE-
handling changes present.


Modified:
    team/group/rls/res/res_pjsip_pubsub.c

Modified: team/group/rls/res/res_pjsip_pubsub.c
URL: http://svnview.digium.com/svn/asterisk/team/group/rls/res/res_pjsip_pubsub.c?view=diff&rev=417762&r1=417761&r2=417762
==============================================================================
--- team/group/rls/res/res_pjsip_pubsub.c (original)
+++ team/group/rls/res/res_pjsip_pubsub.c Wed Jul  2 09:18:19 2014
@@ -208,6 +208,12 @@
 /*! \brief Default expiration time for PUBLISH if one is not specified */
 #define DEFAULT_PUBLISH_EXPIRES 3600
 
+/*! \brief Number of buckets for subscription datastore */
+#define DATASTORE_BUCKETS 53
+
+/*! \brief Default expiration for subscriptions */
+#define DEFAULT_EXPIRES 3600
+
 /*! \brief Defined method for PUBLISH */
 const pjsip_method pjsip_publish_method =
 {
@@ -271,6 +277,11 @@
 };
 
 /*!
+ * \brief A vector of strings commonly used throughout this module
+ */
+AST_VECTOR(resources, const char *);
+
+/*!
  * \brief Resource list configuration item
  */
 struct resource_list {
@@ -278,7 +289,7 @@
 	/*! SIP event package the list uses. */
 	char event[32];
 	/*! Strings representing resources in the list. */
-	AST_VECTOR(, const char *) items;
+	struct resources items;
 	/*! Indicates if Asterisk sends full or partial state on notifications. */
 	unsigned int full_state;
 	/*! Time, in milliseconds Asterisk waits before sending a batched notification.*/
@@ -431,12 +442,20 @@
 
 static pjsip_evsub *sip_subscription_get_evsub(const struct ast_sip_subscription *sub)
 {
-	return sub->reality.real.evsub;
+	if (sub->type == SIP_SUBSCRIPTION_VIRTUAL) {
+		return sip_subscription_get_evsub(sub->reality.virtual.parent);
+	} else {
+		return sub->reality.real.evsub;
+	}
 }
 
 static pjsip_dialog *sip_subscription_get_dlg(const struct ast_sip_subscription *sub)
 {
-	return sub->reality.real.dlg;
+	if (sub->type == SIP_SUBSCRIPTION_VIRTUAL) {
+		return sip_subscription_get_dlg(sub->reality.virtual.parent);
+	} else {
+		return sub->reality.real.dlg;
+	}
 }
 
 /*! \brief Destructor for subscription persistence */
@@ -550,22 +569,60 @@
 	return handler;
 }
 
+/*!
+ * \brief Accept headers that are exceptions to the rule
+ *
+ * Typically, when a SUBSCRIBE arrives, we attempt to find a
+ * body generator that matches one of the Accept headers in
+ * the request. When subscribing to a single resource, this works
+ * great. However, when subscribing to a list, things work
+ * differently. Most Accept header values are fine, but there
+ * are a couple that are endemic to resource lists that need
+ * to be ignored when searching for a body generator to use
+ * for the individual resources of the subscription.
+ */
+const char *accept_exceptions[] =  {
+	"multipart/related",
+	"application/rlmi+xml",
+};
+
+/*!
+ * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
+ *
+ * \retval 1 This Accept header value is an exception to the rule.
+ * \retval 0 This Accept header is not an exception to the rule.
+ */
+static int exceptional_accept(const pj_str_t *accept)
+{
+	int i;
+
+	for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
+		if (!pj_strcmp2(accept, accept_exceptions[i])) {
+			return 1;
+		}
+	}
+
+	return 0;
+}
+
 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
 	const struct ast_sip_subscription_handler *handler)
 {
 	pjsip_accept_hdr *accept_header;
 	char accept[AST_SIP_MAX_ACCEPT][64];
-	size_t num_accept_headers;
+	size_t num_accept_headers = 0;
 
 	accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
 	if (accept_header) {
 		int i;
 
 		for (i = 0; i < accept_header->count; ++i) {
-			ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
+			if (!exceptional_accept(&accept_header->values[i])) {
+				ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
+				++num_accept_headers;
+			}
 		}
-		num_accept_headers = accept_header->count;
 	} else {
 		/* If a SUBSCRIBE contains no Accept headers, then we must assume that
 		 * the default accept type for the event package is to be used.
@@ -577,218 +634,252 @@
 	return find_body_generator(accept, num_accept_headers);
 }
 
-static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
+static struct ast_sip_subscription *create_real_subscription(const struct ast_sip_subscription_handler *handler,
 		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
 		struct ast_sip_pubsub_body_generator *generator);
 
-/*! \brief Callback function to perform the actual recreation of a subscription */
-static int subscription_persistence_recreate(void *obj, void *arg, int flags)
-{
-	struct subscription_persistence *persistence = obj;
-	pj_pool_t *pool = arg;
-	pjsip_rx_data rdata = { { 0, }, };
-	pjsip_expires_hdr *expires_header;
-	struct ast_sip_subscription_handler *handler;
-	RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
-	struct ast_sip_subscription *sub;
-	struct ast_sip_pubsub_body_generator *generator;
-	int resp;
-	char *resource;
-	size_t resource_size;
-	pjsip_sip_uri *request_uri;
-
-	/* If this subscription has already expired remove it */
-	if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
-	if (!endpoint) {
-		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
-			persistence->endpoint);
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	pj_pool_reset(pool);
-	rdata.tp_info.pool = pool;
-
-	if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
-		persistence->transport_key, persistence->local_name, persistence->local_port)) {
-		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
-			persistence->endpoint);
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
-	resource_size = pj_strlen(&request_uri->user) + 1;
-	resource = alloca(resource_size);
-	ast_copy_pj_str(resource, &request_uri->user, resource_size);
-
-	/* Update the expiration header with the new expiration */
-	expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
-	if (!expires_header) {
-		expires_header = pjsip_expires_hdr_create(pool, 0);
-		if (!expires_header) {
-			ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-			return 0;
+/*!
+ * \brief A node for a resource tree.
+ */
+struct tree_node {
+	AST_VECTOR(, struct tree_node *) children;
+	char resource[0];
+};
+
+/*!
+ * \brief Helper function for retrieving a resource list for a given event.
+ *
+ * This will retrieve a resource list that corresponds to the resource and event provided.
+ *
+ * \param resource The name of the resource list to retrieve
+ * \param event The expected event name on the resource list
+ */
+static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
+{
+	struct resource_list *list;
+
+	list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
+	if (!list) {
+		return NULL;
+	}
+
+	if (strcmp(list->event, event)) {
+		ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
+				resource, list->event, event);
+		ao2_cleanup(list);
+		return NULL;
+	}
+
+	return list;
+}
+
+/*!
+ * \brief Allocate a tree node
+ *
+ * In addition to allocating and initializing the tree node, the node is also added
+ * to the vector of visited resources. See \ref build_resource_tree for more information
+ * on the visited resources.
+ *
+ * \param resource The name of the resource for this tree node.
+ * \param visited The vector of resources that have been visited.
+ * \retval NULL Allocation failure.
+ * \retval non-NULL The newly-allocated tree_node
+ */
+static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited)
+{
+	struct tree_node *node;
+
+	node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
+	if (!node) {
+		return NULL;
+	}
+
+	strcpy(node->resource, resource);
+	AST_VECTOR_INIT(&node->children, 4);
+
+	if (visited) {
+		AST_VECTOR_APPEND(visited, resource);
+	}
+	return node;
+}
+
+/*!
+ * \brief Destructor for a tree node
+ *
+ * This function calls itself recursively in order to destroy
+ * all nodes lower in the tree from the given node in
+ * addition to the node itself.
+ *
+ * \param node The node to destroy.
+ */
+static void tree_node_destroy(struct tree_node *node)
+{
+	int i;
+	if (!node) {
+		return;
+	}
+
+	for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
+		tree_node_destroy(AST_VECTOR_GET(&node->children, i));
+	}
+	AST_VECTOR_FREE(&node->children);
+	ast_free(node);
+}
+
+/*!
+ * \brief Determine if this resource has been visited already
+ *
+ * See \ref build_resource_tree for more information
+ *
+ * \param resource The resource currently being visited
+ * \param visited The resources that have previously been visited
+ */
+static int have_visited(const char *resource, struct resources *visited)
+{
+	int i;
+
+	for (i = 0; i < AST_VECTOR_SIZE(visited); ++i) {
+		if (!strcmp(resource, AST_VECTOR_GET(visited, i))) {
+			return 1;
 		}
-		pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
-	}
-	expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
-
-	handler = subscription_get_handler_from_rdata(&rdata);
-	if (!handler || !handler->notifier) {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	generator = subscription_get_generator_from_rdata(&rdata, handler);
-	if (!generator) {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
-			pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
-
-	resp = handler->notifier->new_subscribe(endpoint, resource);
-	if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
-		sub = notifier_create_subscription(handler, endpoint, &rdata, resource, generator);
-		sub->persistence = ao2_bump(persistence);
-		subscription_persistence_update(sub, &rdata);
+	}
+
+	return 0;
+}
+
+/*!
+ * \brief Build child nodes for a given parent.
+ *
+ * This iterates through the items on a resource list and creates tree nodes for each one. The
+ * tree nodes created are children of the supplied parent node. If an item in the resource
+ * list is itself a list, then this function is called recursively to provide children for
+ * the new node.
+ *
+ * If an item in a resource list is not a list, then the supplied subscription handler is
+ * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
+ * is used to determine if the node can be added to the tree or not.
+ *
+ * If a parent node ends up having no child nodes added under it, then the parent node is
+ * pruned from the tree.
+ *
+ * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
+ * \param handler The subscription handler for leaf nodes in the tree.
+ * \param list The configured resource list from which the child node is being built.
+ * \param parent The parent node for these children.
+ * \param visited The resources that have already been visited.
+ */
+static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
+		struct resource_list *list, struct tree_node *parent, struct resources *visited)
+{
+	int i;
+
+	for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
+		struct tree_node *current;
+		struct resource_list *child_list;
+		const char *resource = AST_VECTOR_GET(&list->items, i);
+
+		if (have_visited(resource, visited)) {
+			continue;
+		}
+
+		child_list = retrieve_resource_list(resource, list->event);
+		if (!child_list) {
+			int resp = handler->notifier->new_subscribe(endpoint, resource);
+			if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
+				current = tree_node_alloc(resource, visited);
+				AST_VECTOR_APPEND(&parent->children, current);
+			}
+		} else {
+			current = tree_node_alloc(resource, visited);
+			build_node_children(endpoint, handler, child_list, current, visited);
+			if (AST_VECTOR_SIZE(&current->children) > 0) {
+				AST_VECTOR_APPEND(&parent->children, current);
+			} else {
+				tree_node_destroy(current);
+			}
+			ao2_cleanup(child_list);
+		}
+	}
+}
+
+/*!
+ * \brief A resource tree
+ *
+ * When an inbound SUBSCRIBE arrives, the resource being subscribed to may
+ * be a resource list. If this is the case, the resource list may contain resources
+ * that are themselves lists. The structure needed to hold the resources is
+ * a tree.
+ *
+ * Upon receipt of the SUBSCRIBE, the tree is built by determining if subscriptions
+ * to the individual resources in the tree would be successful or not. Any successful
+ * subscriptions result in a node in the tree being created. Any unsuccessful subscriptions
+ * result in no node being created.
+ *
+ * This tree can be seen as a bare-bones analog of the tree of ast_sip_subscriptions that
+ * will end up being created to actually carry out the duties of a SIP SUBSCRIBE dialog.
+ */
+struct resource_tree {
+	struct tree_node *root;
+};
+
+/*!
+ * \brief Destroy a resource tree.
+ *
+ * This function makes no assumptions about how the tree itself was
+ * allocated and does not attempt to free the tree itself. Callers
+ * of this function are responsible for freeing the tree.
+ *
+ * \param tree The tree to destroy.
+ */
+static void resource_tree_destroy(struct resource_tree *tree)
+{
+	if (tree) {
+		tree_node_destroy(tree->root);
+	}
+}
+
+/*!
+ * \brief Build a resource tree
+ *
+ * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
+ *
+ * This function also creates a container that has all resources that have been visited during
+ * creation of the tree, whether those resources resulted in a tree node being created or not.
+ * Keeping this container of visited resources allows for misconfigurations such as loops in
+ * the tree or duplicated resources to be detected.
+ *
+ * \param endpoint The endpoint that sent the SUBSCRIBE request.
+ * \param handler The subscription handler for leaf nodes in the tree.
+ * \param resource The resource requested in the SUBSCRIBE request.
+ * \param tree The tree that is to be built.
+ *
+ * \retval 200-299 Successfully subscribed to at least one resource.
+ * \retval 300-699 Failure to subscribe to requested resource.
+ */
+static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
+		const char *resource, struct resource_tree *tree)
+{
+	struct resource_list *list;
+	struct resources visited;
+
+	list = retrieve_resource_list(resource, handler->event_name);
+	if (!list) {
+		tree->root = tree_node_alloc(resource, NULL);
+		return handler->notifier->new_subscribe(endpoint, resource);
+	}
+
+	AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items));
+	tree->root = tree_node_alloc(resource, &visited);
+	build_node_children(endpoint, handler, list, tree->root, &visited);
+	AST_VECTOR_FREE(&visited);
+	ao2_cleanup(list);
+
+	if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
+		return 200;
 	} else {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-	}
-
-	return 0;
-}
-
-/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
-static int subscription_persistence_load(void *data)
-{
-	struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
-		"subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
-	pj_pool_t *pool;
-
-	pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
-		PJSIP_POOL_RDATA_INC);
-	if (!pool) {
-		ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
-		return 0;
-	}
-
-	ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
-
-	pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
-
-	ao2_ref(persisted_subscriptions, -1);
-	return 0;
-}
-
-/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
-static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
-{
-	struct ast_json_payload *payload;
-	const char *type;
-
-	if (stasis_message_type(message) != ast_manager_get_generic_type()) {
-		return;
-	}
-
-	payload = stasis_message_data(message);
-	type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
-
-	/* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
-	 * recreate SIP subscriptions.
-	 */
-	if (strcmp(type, "FullyBooted")) {
-		return;
-	}
-
-	/* This has to be here so the subscription is recreated when the body generator is available */
-	ast_sip_push_task(NULL, subscription_persistence_load, NULL);
-
-	/* Once the system is fully booted we don't care anymore */
-	stasis_unsubscribe(sub);
-}
-
-static void add_subscription(struct ast_sip_subscription *obj)
-{
-	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
-	AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
-	ast_module_ref(ast_module_info->self);
-}
-
-static void remove_subscription(struct ast_sip_subscription *obj)
-{
-	struct ast_sip_subscription *i;
-	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
-	AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
-		if (i == obj) {
-			AST_RWLIST_REMOVE_CURRENT(next);
-			ast_module_unref(ast_module_info->self);
-			break;
-		}
-	}
-	AST_RWLIST_TRAVERSE_SAFE_END;
-}
-
-typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
-
-static int for_each_subscription(on_subscription_t on_subscription, void *arg)
-{
-	int num = 0;
-	struct ast_sip_subscription *i;
-	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
-
-	if (!on_subscription) {
-		return num;
-	}
-
-	AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
-		if (on_subscription(i, arg)) {
-			break;
-		}
-		++num;
-	}
-	return num;
-}
-
-static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
-				    struct ast_str **buf)
-{
-	char str[256];
-	struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
-
-	ast_str_append(buf, 0, "Role: %s\r\n",
-		       sip_subscription_roles_map[sub->role]);
-	ast_str_append(buf, 0, "Endpoint: %s\r\n",
-		       ast_sorcery_object_get_id(sub->endpoint));
-
-	ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str));
-	ast_str_append(buf, 0, "Callid: %s\r\n", str);
-
-	ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
-			       sip_subscription_get_evsub(sub)));
-
-	ast_callerid_merge(str, sizeof(str),
-			   S_COR(id->self.name.valid, id->self.name.str, NULL),
-			   S_COR(id->self.number.valid, id->self.number.str, NULL),
-			   "Unknown");
-
-	ast_str_append(buf, 0, "Callerid: %s\r\n", str);
-
-	if (sub->handler->to_ami) {
-		sub->handler->to_ami(sub, buf);
-	}
-}
-
-#define DATASTORE_BUCKETS 53
-
-#define DEFAULT_EXPIRES 3600
+		return 500;
+	}
+}
 
 static int datastore_hash(const void *obj, int flags)
 {
@@ -831,6 +922,20 @@
 	return 0;
 }
 
+static void remove_subscription(struct ast_sip_subscription *obj)
+{
+	struct ast_sip_subscription *i;
+	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+	AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
+		if (i == obj) {
+			AST_RWLIST_REMOVE_CURRENT(next);
+			ast_module_unref(ast_module_info->self);
+			break;
+		}
+	}
+	AST_RWLIST_TRAVERSE_SAFE_END;
+}
+
 static void subscription_destructor(void *obj)
 {
 	struct ast_sip_subscription *sub = obj;
@@ -848,6 +953,306 @@
 		ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
 	}
 	ast_taskprocessor_unreference(sub->serializer);
+}
+
+static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
+		struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role,
+		enum sip_subscription_type type)
+{
+	struct ast_sip_subscription *sub;
+
+	sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
+	if (!sub) {
+		return NULL;
+	}
+	strcpy(sub->resource, resource); /* Safe */
+
+	sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
+	if (!sub->datastores) {
+		ao2_ref(sub, -1);
+		return NULL;
+	}
+	sub->serializer = ast_sip_create_serializer();
+	if (!sub->serializer) {
+		ao2_ref(sub, -1);
+		return NULL;
+	}
+	sub->role = role;
+	sub->type = type;
+	sub->endpoint = ao2_bump(endpoint);
+	sub->handler = handler;
+
+	return sub;
+}
+
+/*!
+ * \brief Create a tree of virtual subscriptions based on a resource tree node.
+ *
+ * \param handler The handler to supply to leaf subscriptions.
+ * \param endpoint The endpoint that sent the SUBSCRIBE request to Asterisk.
+ * \param rdata The SUBSCRIBE request content.
+ * \param resource The requested resource in the SUBSCRIBE request.
+ * \param generator Body generator to use for leaf subscriptions.
+ * \param parent The subscription (real or virtual) that is parent to the subscriptions created here.
+ * \param parent_resource The tree node that corresponds to the parent subscription.
+ */
+static void create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
+		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
+		struct ast_sip_pubsub_body_generator *generator, struct ast_sip_subscription *parent,
+		struct tree_node *parent_resource)
+{
+	int i;
+
+	for (i = 0; i < AST_VECTOR_SIZE(&parent_resource->children); ++i) {
+		struct ast_sip_subscription *sub;
+
+		sub = allocate_subscription(handler, endpoint, resource,
+				AST_SIP_NOTIFIER, SIP_SUBSCRIPTION_VIRTUAL);
+		if (!sub) {
+			continue;
+		}
+		/* XXX For subscriptions with children, the generator will need to be
+		 * the multipart RLMI generator instead. This will be handled in
+		 * ASTERISK-23869 or ASTERISK-23867
+		 */
+		sub->body_generator = generator;
+		sub->reality.virtual.parent = parent;
+
+		create_virtual_subscriptions(handler, endpoint, rdata, resource,
+				generator, sub, AST_VECTOR_GET(&parent_resource->children, i));
+	}
+}
+
+/*!
+ * \brief Create a subscription tree based on a resource tree.
+ *
+ * Using the previously-determined valid resources in the provided resource tree,
+ * a corresponding tree of ast_sip_subscriptions is created. The root of the
+ * subscription tree is a real subscription, and the rest in the tree are
+ * virtual subscriptions.
+ *
+ * \param handler The handler to use for leaf subscriptions
+ * \param endpoint The endpoint that sent the SUBSCRIBE request
+ * \param rdata The SUBSCRIBE content
+ * \param resource The requested resource in the SUBSCRIBE request
+ * \param generator The body generator to use in leaf subscriptions
+ * \param tree The resource tree on which the subscription tree is based
+ *
+ * \retval NULL Could not create the subscription tree
+ * \retval non-NULL The root of the created subscription tree
+ */
+static struct ast_sip_subscription *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
+		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
+		struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree)
+{
+	struct ast_sip_subscription *sub;
+
+	/* Start by creating the root subscription. It's the only real subscription.
+	 * XXX Since this is the root of a subscription tree, it should actually use the
+	 * multipart RLMI generator instead if this is a list. This will be handled in
+	 * ASTERISK-23869 or ASTERISK-23867
+	 */
+	sub = create_real_subscription(handler, endpoint, rdata, resource, generator);
+	if (!sub) {
+		return NULL;
+	}
+
+	create_virtual_subscriptions(handler, endpoint, rdata, resource, generator, sub, tree->root);
+	return sub;
+}
+
+
+/*! \brief Callback function to perform the actual recreation of a subscription */
+static int subscription_persistence_recreate(void *obj, void *arg, int flags)
+{
+	struct subscription_persistence *persistence = obj;
+	pj_pool_t *pool = arg;
+	pjsip_rx_data rdata = { { 0, }, };
+	RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+	struct ast_sip_subscription *sub;
+	struct ast_sip_pubsub_body_generator *generator;
+	int resp;
+	char *resource;
+	size_t resource_size;
+	pjsip_sip_uri *request_uri;
+	struct resource_tree tree;
+	pjsip_expires_hdr *expires_header;
+	struct ast_sip_subscription_handler *handler;
+
+	/* If this subscription has already expired remove it */
+	if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
+	if (!endpoint) {
+		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
+			persistence->endpoint);
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	pj_pool_reset(pool);
+	rdata.tp_info.pool = pool;
+
+	if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
+		persistence->transport_key, persistence->local_name, persistence->local_port)) {
+		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
+			persistence->endpoint);
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
+	resource_size = pj_strlen(&request_uri->user) + 1;
+	resource = alloca(resource_size);
+	ast_copy_pj_str(resource, &request_uri->user, resource_size);
+
+	/* Update the expiration header with the new expiration */
+	expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
+	if (!expires_header) {
+		expires_header = pjsip_expires_hdr_create(pool, 0);
+		if (!expires_header) {
+			ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+			return 0;
+		}
+		pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
+	}
+	expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
+
+	handler = subscription_get_handler_from_rdata(&rdata);
+	if (!handler || !handler->notifier) {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	generator = subscription_get_generator_from_rdata(&rdata, handler);
+	if (!generator) {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
+			pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
+
+	memset(&tree, 0, sizeof(tree));
+	resp = build_resource_tree(endpoint, handler, resource, &tree);
+	if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
+		sub = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree);
+		sub->persistence = ao2_bump(persistence);
+		subscription_persistence_update(sub, &rdata);
+	} else {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+	}
+
+	return 0;
+}
+
+/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
+static int subscription_persistence_load(void *data)
+{
+	struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+		"subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+	pj_pool_t *pool;
+
+	pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
+		PJSIP_POOL_RDATA_INC);
+	if (!pool) {
+		ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
+		return 0;
+	}
+
+	ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
+
+	pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+
+	ao2_ref(persisted_subscriptions, -1);
+	return 0;
+}
+
+/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
+static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+	struct ast_json_payload *payload;
+	const char *type;
+
+	if (stasis_message_type(message) != ast_manager_get_generic_type()) {
+		return;
+	}
+
+	payload = stasis_message_data(message);
+	type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
+
+	/* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
+	 * recreate SIP subscriptions.
+	 */
+	if (strcmp(type, "FullyBooted")) {
+		return;
+	}
+
+	/* This has to be here so the subscription is recreated when the body generator is available */
+	ast_sip_push_task(NULL, subscription_persistence_load, NULL);
+
+	/* Once the system is fully booted we don't care anymore */
+	stasis_unsubscribe(sub);
+}
+
+static void add_subscription(struct ast_sip_subscription *obj)
+{
+	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+	AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
+	ast_module_ref(ast_module_info->self);
+}
+
+typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
+
+static int for_each_subscription(on_subscription_t on_subscription, void *arg)
+{
+	int num = 0;
+	struct ast_sip_subscription *i;
+	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+	if (!on_subscription) {
+		return num;
+	}
+
+	AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
+		if (on_subscription(i, arg)) {
+			break;
+		}
+		++num;
+	}
+	return num;
+}
+
+static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
+				    struct ast_str **buf)
+{
+	char str[256];
+	struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
+
+	ast_str_append(buf, 0, "Role: %s\r\n",
+		       sip_subscription_roles_map[sub->role]);
+	ast_str_append(buf, 0, "Endpoint: %s\r\n",
+		       ast_sorcery_object_get_id(sub->endpoint));
+
+	ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str));
+	ast_str_append(buf, 0, "Callid: %s\r\n", str);
+
+	ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
+			       sip_subscription_get_evsub(sub)));
+
+	ast_callerid_merge(str, sizeof(str),
+			   S_COR(id->self.name.valid, id->self.name.str, NULL),
+			   S_COR(id->self.number.valid, id->self.number.str, NULL),
+			   "Unknown");
+
+	ast_str_append(buf, 0, "Callerid: %s\r\n", str);
+
+	if (sub->handler->to_ami) {
+		sub->handler->to_ami(sub, buf);
+	}
 }
 
 
@@ -868,34 +1273,6 @@
 	.on_server_timeout = pubsub_on_server_timeout,
 };
 
-static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
-		struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role)
-{
-	struct ast_sip_subscription *sub;
-
-	sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
-	if (!sub) {
-		return NULL;
-	}
-	strcpy(sub->resource, resource); /* Safe */
-
-	sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
-	if (!sub->datastores) {
-		ao2_ref(sub, -1);
-		return NULL;
-	}
-	sub->serializer = ast_sip_create_serializer();
-	if (!sub->serializer) {
-		ao2_ref(sub, -1);
-		return NULL;
-	}
-	sub->role = role;
-	sub->type = SIP_SUBSCRIPTION_REAL;
-	sub->endpoint = ao2_bump(endpoint);
-	sub->handler = handler;
-
-	return sub;
-}
 
 static void subscription_setup_dialog(struct ast_sip_subscription *sub, pjsip_dialog *dlg)
 {
@@ -908,7 +1285,7 @@
 	pjsip_evsub_set_mod_data(sip_subscription_get_evsub(sub), pubsub_module.id, sub);
 }
 
-static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
+static struct ast_sip_subscription *create_real_subscription(const struct ast_sip_subscription_handler *handler,
 		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
 		struct ast_sip_pubsub_body_generator *generator)
 {
@@ -916,7 +1293,7 @@
 	pjsip_dialog *dlg;
 	struct subscription_persistence *persistence;
 
-	sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER);
+	sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER, SIP_SUBSCRIPTION_REAL);
 	if (!sub) {
 		return NULL;
 	}
@@ -972,7 +1349,7 @@
 	pjsip_tx_data *tdata;
 	pjsip_evsub *evsub;
 
-	sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER);
+	sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER, SIP_SUBSCRIPTION_REAL);
 	if (!sub) {
 		return NULL;
 	}
@@ -1118,14 +1495,35 @@
 	return sub->resource;
 }
 
+/*!
+ * \brief Create a Require header whose only option tag is "eventlist"
+ *
+ * \param pool Pool used for both the header and the copy of the tag string.
+ * \return The newly created header.
+ */
+static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
+{
+	pjsip_require_hdr *hdr = pjsip_require_hdr_create(pool);
+
+	hdr->count = 1;
+	pj_strdup2(pool, &hdr->values[0], "eventlist");
+
+	return hdr;
+}
+
+/*!
+ * \brief Accept an inbound SUBSCRIBE for a real subscription
+ *
+ * \param sub The subscription being accepted. Asserted to be of type
+ *        SIP_SUBSCRIPTION_REAL.
+ * \param rdata The received SUBSCRIBE.
+ * \param response Status code handed to pjsip_evsub_accept().
+ * \retval 0 The subscription was accepted, or nothing was sent because the
+ *         request carries persistence data (a recreated subscription).
+ * \retval -1 pjsip_evsub_accept() did not return PJ_SUCCESS.
+ */
 static int sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
 {
+	pjsip_hdr res_hdr;
+
+	ast_assert(sub->type == SIP_SUBSCRIPTION_REAL);
+
 	/* If this is a persistence recreation the subscription has already been accepted */
 	if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
 		return 0;
 	}
 
-	return pjsip_evsub_accept(sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
+	/* res_hdr is the head of the list of extra headers given to pjsip_evsub_accept(); it starts out empty */
+	pj_list_init(&res_hdr);
+	if (!AST_LIST_EMPTY(&sub->children)) {
+		/* If subscribing to a list, our response has to have a Require: eventlist header in it */
+		pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
+	}
+
+	return pjsip_evsub_accept(sip_subscription_get_evsub(sub), rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
 }
 
 static void subscription_datastore_destroy(void *obj)
@@ -1411,6 +1809,7 @@
 	pjsip_sip_uri *request_uri_sip;
 	size_t resource_size;
 	int resp;
+	struct resource_tree tree;
 
 	endpoint = ast_pjsip_rdata_get_endpoint(rdata);
 	ast_assert(endpoint != NULL);
@@ -1466,24 +1865,31 @@
 		return PJ_TRUE;
 	}
 
-	resp = handler->notifier->new_subscribe(endpoint, resource);
+	memset(&tree, 0, sizeof(tree));
+	resp = build_resource_tree(endpoint, handler, resource, &tree);
 	if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
 		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
+		resource_tree_destroy(&tree);
 		return PJ_TRUE;
 	}
 
-	sub = notifier_create_subscription(handler, endpoint, rdata, resource, generator);
+	sub = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree);
 	if (!sub) {
 		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
 	} else {
 		sub->persistence = subscription_persistence_create(sub);
 		subscription_persistence_update(sub, rdata);
 		sip_subscription_accept(sub, rdata, resp);
+		/* XXX Currently this is calling directly into the event handler instead of traversing
+		 * the subscription tree. For single resource subscriptions, this is fine, but this will
+		 * malfunction for lists. This will be handled with ASTERISK-23869
+		 */
 		if (handler->notifier->notify_required(sub, AST_SIP_SUBSCRIPTION_NOTIFY_REASON_STARTED)) {
 			pjsip_evsub_terminate(sip_subscription_get_evsub(sub), PJ_TRUE);
 		}
 	}
 
+	resource_tree_destroy(&tree);
 	return PJ_TRUE;
 }
 
@@ -1953,8 +2359,16 @@
 	} else {
 		reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_RENEWED;
 	}
+	/* XXX Currently this is calling directly into the event handler instead of traversing
+	 * the subscription tree. For single resource subscriptions, this is fine, but this will
+	 * malfunction for lists. This will be handled with ASTERISK-23869
+	 */
 	if (sub->handler->notifier->notify_required(sub, reason)) {
 		*p_st_code = 500;
+	}
+
+	if (!AST_LIST_EMPTY(&sub->children)) {
+		pj_list_insert_before(res_hdr, create_require_eventlist(rdata->tp_info.pool));
 	}
 }
 
@@ -2001,6 +2415,10 @@
 {
 	struct ast_sip_subscription *sub = userdata;
 
+	/* XXX Currently this is calling directly into the event handler instead of traversing
+	 * the subscription tree. For single resource subscriptions, this is fine, but this will
+	 * malfunction for lists. This will be handled with ASTERISK-23869
+	 */
 	sub->handler->notifier->notify_required(sub,
 			AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED);
 
@@ -2309,6 +2727,580 @@
 
 	return 0;
 }
+
+#ifdef TEST_FRAMEWORK
+
+/*!
+ * \brief "bad" resources
+ *
+ * These are resources that the test handler will reject subscriptions to:
+ * test_new_subscribe() returns 400 for any resource whose name exactly
+ * matches an entry here.
+ */
+const char *bad_resources[] = {
+	"coconut",
+	"cilantro",
+	"olive",
+	"cheese",
+};
+
+/*!
+ * \brief new_subscribe callback for unit tests
+ *
+ * \param endpoint Unused.
+ * \param resource Name of the resource being subscribed to.
+ * \retval 400 The resource is listed in bad_resources.
+ * \retval 200 Any other resource.
+ */
+static int test_new_subscribe(struct ast_sip_endpoint *endpoint, const char *resource)
+{
+	int idx = 0;
+
+	while (idx < ARRAY_LEN(bad_resources)) {
+		if (strcmp(resource, bad_resources[idx]) == 0) {
+			return 400;
+		}
+		++idx;
+	}
+
+	return 200;
+}
+
+/*!
+ * \brief Subscription notifier for unit tests.
+ *
+ * Since unit tests are only concerned with building a resource tree,
+ * only the new_subscribe callback needs to be defined. Every other
+ * member is left zero-initialized.
+ */
+struct ast_sip_notifier test_notifier = {
+	.new_subscribe = test_new_subscribe,
+};
+
+/*!
+ * \brief Subscription handler for unit tests.
+ *
+ * Handles the "test" event and uses test_notifier as its notifier.
+ */
+struct ast_sip_subscription_handler test_handler = {
+	.event_name = "test",
+	.notifier = &test_notifier,
+};
+
+/*!
+ * \brief Set properties on an allocated resource list
+ *
+ * Copies the event name into the list and appends a heap-allocated
+ * duplicate of each resource name to the list's items vector. Population
+ * stops at the first failure; items appended before the failure stay in
+ * the list.
+ *
+ * \param list The list to set details on.
+ * \param event The list's event.
+ * \param resources Array of resources to add to the list.
+ * \param num_resources Number of resources in the array.
+ * \retval 0 Success
+ * \retval non-zero Failure (a resource name could not be duplicated or
+ *         could not be appended to the list)
+ */
+static int populate_list(struct resource_list *list, const char *event, const char **resources, size_t num_resources)
+{
+	size_t i;
+
+	ast_copy_string(list->event, event, sizeof(list->event));
+
+	for (i = 0; i < num_resources; ++i) {
+		char *resource = ast_strdup(resources[i]);
+
+		/*
+		 * Never store a NULL item in the list, and do not leak the
+		 * duplicate when the vector cannot take it.
+		 */
+		if (!resource || AST_VECTOR_APPEND(&list->items, resource)) {
+			ast_free(resource);
+			return -1;
+		}
+	}
+	return 0;
+}
+
+/*!
+ * \brief RAII callback to destroy a resource list
+ *
+ * Deletes the list from sorcery and then runs ao2_cleanup() on it.
+ * A NULL list is ignored.
+ *
+ * \param list The resource list to destroy, or NULL.
+ */
+static void cleanup_resource_list(struct resource_list *list)
+{
+	if (list) {
+		ast_sorcery_delete(ast_sip_get_sorcery(), list);
+		ao2_cleanup(list);
+	}
+}
+
+/*!
+ * \brief allocate a resource list, store it in sorcery, and set its details
+ *
+ * \param test The unit test. Used for logging status messages.
+ * \param list_name The name of the list to create.
+ * \param event The event the list services
+ * \param resources Array of resources to apply to the list
+ * \param num_resources The number of resources in the array
+ * \retval NULL Failed to allocate or populate list

[... 511 lines stripped ...]



More information about the svn-commits mailing list