[asterisk-commits] mmichelson: branch group/rls r417762 - /team/group/rls/res/res_pjsip_pubsub.c
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Wed Jul 2 09:18:22 CDT 2014
Author: mmichelson
Date: Wed Jul 2 09:18:19 2014
New Revision: 417762
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=417762
Log:
Commit temporary SUBSCRIBE-handling changes to common RLS branch.
These changes are currently up for review. However, I wish to continue
with NOTIFY-handling changes and it would do me well to have the SUBSCRIBE-
handling changes present.
Modified:
team/group/rls/res/res_pjsip_pubsub.c
Modified: team/group/rls/res/res_pjsip_pubsub.c
URL: http://svnview.digium.com/svn/asterisk/team/group/rls/res/res_pjsip_pubsub.c?view=diff&rev=417762&r1=417761&r2=417762
==============================================================================
--- team/group/rls/res/res_pjsip_pubsub.c (original)
+++ team/group/rls/res/res_pjsip_pubsub.c Wed Jul 2 09:18:19 2014
@@ -208,6 +208,12 @@
/*! \brief Default expiration time for PUBLISH if one is not specified */
#define DEFAULT_PUBLISH_EXPIRES 3600
+/*! \brief Number of buckets for subscription datastore */
+#define DATASTORE_BUCKETS 53
+
+/*! \brief Default expiration for subscriptions */
+#define DEFAULT_EXPIRES 3600
+
/*! \brief Defined method for PUBLISH */
const pjsip_method pjsip_publish_method =
{
@@ -271,6 +277,11 @@
};
/*!
+ * \brief A vector of strings commonly used throughout this module
+ */
+AST_VECTOR(resources, const char *);
+
+/*!
* \brief Resource list configuration item
*/
struct resource_list {
@@ -278,7 +289,7 @@
/*! SIP event package the list uses. */
char event[32];
/*! Strings representing resources in the list. */
- AST_VECTOR(, const char *) items;
+ struct resources items;
/*! Indicates if Asterisk sends full or partial state on notifications. */
unsigned int full_state;
/*! Time, in milliseconds Asterisk waits before sending a batched notification.*/
@@ -431,12 +442,20 @@
static pjsip_evsub *sip_subscription_get_evsub(const struct ast_sip_subscription *sub)
{
- return sub->reality.real.evsub;
+ if (sub->type == SIP_SUBSCRIPTION_VIRTUAL) {
+ return sip_subscription_get_evsub(sub->reality.virtual.parent);
+ } else {
+ return sub->reality.real.evsub;
+ }
}
static pjsip_dialog *sip_subscription_get_dlg(const struct ast_sip_subscription *sub)
{
- return sub->reality.real.dlg;
+ if (sub->type == SIP_SUBSCRIPTION_VIRTUAL) {
+ return sip_subscription_get_dlg(sub->reality.virtual.parent);
+ } else {
+ return sub->reality.real.dlg;
+ }
}
/*! \brief Destructor for subscription persistence */
@@ -550,22 +569,60 @@
return handler;
}
+/*!
+ * \brief Accept headers that are exceptions to the rule
+ *
+ * Typically, when a SUBSCRIBE arrives, we attempt to find a
+ * body generator that matches one of the Accept headers in
+ * the request. When subscribing to a single resource, this works
+ * great. However, when subscribing to a list, things work
+ * differently. Most Accept header values are fine, but there
+ * are a couple that are endemic to resource lists that need
+ * to be ignored when searching for a body generator to use
+ * for the individual resources of the subscription.
+ */
+const char *accept_exceptions[] = {
+ "multipart/related",
+ "application/rlmi+xml",
+};
+
+/*!
+ * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
+ *
+ * \retval 1 This Accept header value is an exception to the rule.
+ * \retval 0 This Accept header is not an exception to the rule.
+ */
+static int exceptional_accept(const pj_str_t *accept)
+{
+ int i;
+
+ for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
+ if (!pj_strcmp2(accept, accept_exceptions[i])) {
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
/*! \brief Retrieve a body generator using the Accept header of an rdata message */
static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
const struct ast_sip_subscription_handler *handler)
{
pjsip_accept_hdr *accept_header;
char accept[AST_SIP_MAX_ACCEPT][64];
- size_t num_accept_headers;
+ size_t num_accept_headers = 0;
accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
if (accept_header) {
int i;
for (i = 0; i < accept_header->count; ++i) {
- ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
+ if (!exceptional_accept(&accept_header->values[i])) {
+ ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
+ ++num_accept_headers;
+ }
}
- num_accept_headers = accept_header->count;
} else {
/* If a SUBSCRIBE contains no Accept headers, then we must assume that
* the default accept type for the event package is to be used.
@@ -577,218 +634,252 @@
return find_body_generator(accept, num_accept_headers);
}
-static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
+static struct ast_sip_subscription *create_real_subscription(const struct ast_sip_subscription_handler *handler,
struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
struct ast_sip_pubsub_body_generator *generator);
-/*! \brief Callback function to perform the actual recreation of a subscription */
-static int subscription_persistence_recreate(void *obj, void *arg, int flags)
-{
- struct subscription_persistence *persistence = obj;
- pj_pool_t *pool = arg;
- pjsip_rx_data rdata = { { 0, }, };
- pjsip_expires_hdr *expires_header;
- struct ast_sip_subscription_handler *handler;
- RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
- struct ast_sip_subscription *sub;
- struct ast_sip_pubsub_body_generator *generator;
- int resp;
- char *resource;
- size_t resource_size;
- pjsip_sip_uri *request_uri;
-
- /* If this subscription has already expired remove it */
- if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
-
- endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
- if (!endpoint) {
- ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
- persistence->endpoint);
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
-
- pj_pool_reset(pool);
- rdata.tp_info.pool = pool;
-
- if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
- persistence->transport_key, persistence->local_name, persistence->local_port)) {
- ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
- persistence->endpoint);
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
-
- request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
- resource_size = pj_strlen(&request_uri->user) + 1;
- resource = alloca(resource_size);
- ast_copy_pj_str(resource, &request_uri->user, resource_size);
-
- /* Update the expiration header with the new expiration */
- expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
- if (!expires_header) {
- expires_header = pjsip_expires_hdr_create(pool, 0);
- if (!expires_header) {
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
+/*!
+ * \brief A node for a resource tree.
+ */
+struct tree_node {
+ AST_VECTOR(, struct tree_node *) children;
+ char resource[0];
+};
+
+/*!
+ * \brief Helper function for retrieving a resource list for a given event.
+ *
+ * This will retrieve a resource list that corresponds to the resource and event provided.
+ *
+ * \param resource The name of the resource list to retrieve
+ * \param event The expected event name on the resource list
+ */
+static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
+{
+ struct resource_list *list;
+
+ list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
+ if (!list) {
+ return NULL;
+ }
+
+ if (strcmp(list->event, event)) {
+ ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
+ resource, list->event, event);
+ ao2_cleanup(list);
+ return NULL;
+ }
+
+ return list;
+}
+
+/*!
+ * \brief Allocate a tree node
+ *
+ * In addition to allocating and initializing the tree node, the node is also added
+ * to the vector of visited resources. See \ref build_resource_tree for more information
+ * on the visited resources.
+ *
+ * \param resource The name of the resource for this tree node.
+ * \param visited The vector of resources that have been visited.
+ * \retval NULL Allocation failure.
+ * \retval non-NULL The newly-allocated tree_node
+ */
+static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited)
+{
+ struct tree_node *node;
+
+ node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
+ if (!node) {
+ return NULL;
+ }
+
+ strcpy(node->resource, resource);
+ AST_VECTOR_INIT(&node->children, 4);
+
+ if (visited) {
+ AST_VECTOR_APPEND(visited, resource);
+ }
+ return node;
+}
+
+/*!
+ * \brief Destructor for a tree node
+ *
+ * This function calls recursively in order to destroy
+ * all nodes lower in the tree from the given node in
+ * addition to the node itself.
+ *
+ * \param node The node to destroy.
+ */
+static void tree_node_destroy(struct tree_node *node)
+{
+ int i;
+ if (!node) {
+ return;
+ }
+
+ for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
+ tree_node_destroy(AST_VECTOR_GET(&node->children, i));
+ }
+ AST_VECTOR_FREE(&node->children);
+ ast_free(node);
+}
+
+/*!
+ * \brief Determine if this resource has been visited already
+ *
+ * See \ref build_resource_tree for more information
+ *
+ * \param resource The resource currently being visited
+ * \param visited The resources that have previously been visited
+ */
+static int have_visited(const char *resource, struct resources *visited)
+{
+ int i;
+
+ for (i = 0; i < AST_VECTOR_SIZE(visited); ++i) {
+ if (!strcmp(resource, AST_VECTOR_GET(visited, i))) {
+ return 1;
}
- pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
- }
- expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
-
- handler = subscription_get_handler_from_rdata(&rdata);
- if (!handler || !handler->notifier) {
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
-
- generator = subscription_get_generator_from_rdata(&rdata, handler);
- if (!generator) {
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
-
- ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
- pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
-
- resp = handler->notifier->new_subscribe(endpoint, resource);
- if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
- sub = notifier_create_subscription(handler, endpoint, &rdata, resource, generator);
- sub->persistence = ao2_bump(persistence);
- subscription_persistence_update(sub, &rdata);
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief Build child nodes for a given parent.
+ *
+ * This iterates through the items on a resource list and creates tree nodes for each one. The
+ * tree nodes created are children of the supplied parent node. If an item in the resource
+ * list is itself a list, then this function is called recursively to provide children for
+ * the the new node.
+ *
+ * If an item in a resource list is not a list, then the supplied subscription handler is
+ * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
+ * is used to determine if the node can be added to the tree or not.
+ *
+ * If a parent node ends up having no child nodes added under it, then the parent node is
+ * pruned from the tree.
+ *
+ * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
+ * \param handler The subscription handler for leaf nodes in the tree.
+ * \param list The configured resource list from which the child node is being built.
+ * \param parent The parent node for these children.
+ * \param visited The resources that have already been visited.
+ */
+static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
+ struct resource_list *list, struct tree_node *parent, struct resources *visited)
+{
+ int i;
+
+ for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
+ struct tree_node *current;
+ struct resource_list *child_list;
+ const char *resource = AST_VECTOR_GET(&list->items, i);
+
+ if (have_visited(resource, visited)) {
+ continue;
+ }
+
+ child_list = retrieve_resource_list(resource, list->event);
+ if (!child_list) {
+ int resp = handler->notifier->new_subscribe(endpoint, resource);
+ if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
+ current = tree_node_alloc(resource, visited);
+ AST_VECTOR_APPEND(&parent->children, current);
+ }
+ } else {
+ current = tree_node_alloc(resource, visited);
+ build_node_children(endpoint, handler, child_list, current, visited);
+ if (AST_VECTOR_SIZE(¤t->children) > 0) {
+ AST_VECTOR_APPEND(&parent->children, current);
+ } else {
+ tree_node_destroy(current);
+ }
+ ao2_cleanup(child_list);
+ }
+ }
+}
+
+/*!
+ * \brief A resource tree
+ *
+ * When an inbound SUBSCRIBE arrives, the resource being subscribed to may
+ * be a resource list. If this is the case, the resource list may contain resources
+ * that are themselves lists. The structure needed to hold the resources is
+ * a tree.
+ *
+ * Upon receipt of the SUBSCRIBE, the tree is built by determining if subscriptions
+ * to the individual resources in the tree would be successful or not. Any successful
+ * subscriptions result in a node in the tree being created. Any unsuccessful subscriptions
+ * result in no node being created.
+ *
+ * This tree can be seen as a bare-bones analog of the tree of ast_sip_subscriptions that
+ * will end up being created to actually carry out the duties of a SIP SUBSCRIBE dialog.
+ */
+struct resource_tree {
+ struct tree_node *root;
+};
+
+/*!
+ * \brief Destroy a resource tree.
+ *
+ * This function makes no assumptions about how the tree itself was
+ * allocated and does not attempt to free the tree itself. Callers
+ * of this function are responsible for freeing the tree.
+ *
+ * \param tree The tree to destroy.
+ */
+static void resource_tree_destroy(struct resource_tree *tree)
+{
+ if (tree) {
+ tree_node_destroy(tree->root);
+ }
+}
+
+/*!
+ * \brief Build a resource tree
+ *
+ * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
+ *
+ * This function also creates a container that has all resources that have been visited during
+ * creation of the tree, whether those resources resulted in a tree node being created or not.
+ * Keeping this container of visited resources allows for misconfigurations such as loops in
+ * the tree or duplicated resources to be detected.
+ *
+ * \param endpoint The endpoint that sent the SUBSCRIBE request.
+ * \param handler The subscription handler for leaf nodes in the tree.
+ * \param resource The resource requested in the SUBSCRIBE request.
+ * \param tree The tree that is to be built.
+ *
+ * \retval 200-299 Successfully subscribed to at least one resource.
+ * \retval 300-699 Failure to subscribe to requested resource.
+ */
+static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
+ const char *resource, struct resource_tree *tree)
+{
+ struct resource_list *list;
+ struct resources visited;
+
+ list = retrieve_resource_list(resource, handler->event_name);
+ if (!list) {
+ tree->root = tree_node_alloc(resource, NULL);
+ return handler->notifier->new_subscribe(endpoint, resource);
+ }
+
+ AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items));
+ tree->root = tree_node_alloc(resource, &visited);
+ build_node_children(endpoint, handler, list, tree->root, &visited);
+ AST_VECTOR_FREE(&visited);
+ ao2_cleanup(list);
+
+ if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
+ return 200;
} else {
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- }
-
- return 0;
-}
-
-/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
-static int subscription_persistence_load(void *data)
-{
- struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
- "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
- pj_pool_t *pool;
-
- pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
- PJSIP_POOL_RDATA_INC);
- if (!pool) {
- ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
- return 0;
- }
-
- ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
-
- pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
-
- ao2_ref(persisted_subscriptions, -1);
- return 0;
-}
-
-/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
-static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
-{
- struct ast_json_payload *payload;
- const char *type;
-
- if (stasis_message_type(message) != ast_manager_get_generic_type()) {
- return;
- }
-
- payload = stasis_message_data(message);
- type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
-
- /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
- * recreate SIP subscriptions.
- */
- if (strcmp(type, "FullyBooted")) {
- return;
- }
-
- /* This has to be here so the subscription is recreated when the body generator is available */
- ast_sip_push_task(NULL, subscription_persistence_load, NULL);
-
- /* Once the system is fully booted we don't care anymore */
- stasis_unsubscribe(sub);
-}
-
-static void add_subscription(struct ast_sip_subscription *obj)
-{
- SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
- AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
- ast_module_ref(ast_module_info->self);
-}
-
-static void remove_subscription(struct ast_sip_subscription *obj)
-{
- struct ast_sip_subscription *i;
- SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
- AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
- if (i == obj) {
- AST_RWLIST_REMOVE_CURRENT(next);
- ast_module_unref(ast_module_info->self);
- break;
- }
- }
- AST_RWLIST_TRAVERSE_SAFE_END;
-}
-
-typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
-
-static int for_each_subscription(on_subscription_t on_subscription, void *arg)
-{
- int num = 0;
- struct ast_sip_subscription *i;
- SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
-
- if (!on_subscription) {
- return num;
- }
-
- AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
- if (on_subscription(i, arg)) {
- break;
- }
- ++num;
- }
- return num;
-}
-
-static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
- struct ast_str **buf)
-{
- char str[256];
- struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
-
- ast_str_append(buf, 0, "Role: %s\r\n",
- sip_subscription_roles_map[sub->role]);
- ast_str_append(buf, 0, "Endpoint: %s\r\n",
- ast_sorcery_object_get_id(sub->endpoint));
-
- ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str));
- ast_str_append(buf, 0, "Callid: %s\r\n", str);
-
- ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
- sip_subscription_get_evsub(sub)));
-
- ast_callerid_merge(str, sizeof(str),
- S_COR(id->self.name.valid, id->self.name.str, NULL),
- S_COR(id->self.number.valid, id->self.number.str, NULL),
- "Unknown");
-
- ast_str_append(buf, 0, "Callerid: %s\r\n", str);
-
- if (sub->handler->to_ami) {
- sub->handler->to_ami(sub, buf);
- }
-}
-
-#define DATASTORE_BUCKETS 53
-
-#define DEFAULT_EXPIRES 3600
+ return 500;
+ }
+}
static int datastore_hash(const void *obj, int flags)
{
@@ -831,6 +922,20 @@
return 0;
}
+static void remove_subscription(struct ast_sip_subscription *obj)
+{
+ struct ast_sip_subscription *i;
+ SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+ AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
+ if (i == obj) {
+ AST_RWLIST_REMOVE_CURRENT(next);
+ ast_module_unref(ast_module_info->self);
+ break;
+ }
+ }
+ AST_RWLIST_TRAVERSE_SAFE_END;
+}
+
static void subscription_destructor(void *obj)
{
struct ast_sip_subscription *sub = obj;
@@ -848,6 +953,306 @@
ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
}
ast_taskprocessor_unreference(sub->serializer);
+}
+
+static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
+ struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role,
+ enum sip_subscription_type type)
+{
+ struct ast_sip_subscription *sub;
+
+ sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
+ if (!sub) {
+ return NULL;
+ }
+ strcpy(sub->resource, resource); /* Safe */
+
+ sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
+ if (!sub->datastores) {
+ ao2_ref(sub, -1);
+ return NULL;
+ }
+ sub->serializer = ast_sip_create_serializer();
+ if (!sub->serializer) {
+ ao2_ref(sub, -1);
+ return NULL;
+ }
+ sub->role = role;
+ sub->type = type;
+ sub->endpoint = ao2_bump(endpoint);
+ sub->handler = handler;
+
+ return sub;
+}
+
+/*!
+ * \brief Create a tree of virtual subscriptions based on a resource tree node.
+ *
+ * \param handler The handler to supply to leaf subscriptions.
+ * \param endpoint The endpoint that sent the SUBSCRIBE request to Asterisk.
+ * \param rdata The SUBSCRIBE request content.
+ * \param resource The requested resource in the SUBSCRIBE request.
+ * \param generator Body generator to use for leaf subscriptions.
+ * \param parent The subscription (real or virtual) that is parent to the subscriptions created here.
+ * \param parent_resource The tree node that corresponds to the parent subscription.
+ */
+static void create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
+ struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
+ struct ast_sip_pubsub_body_generator *generator, struct ast_sip_subscription *parent,
+ struct tree_node *parent_resource)
+{
+ int i;
+
+ for (i = 0; i < AST_VECTOR_SIZE(&parent_resource->children); ++i) {
+ struct ast_sip_subscription *sub;
+
+ sub = allocate_subscription(handler, endpoint, resource,
+ AST_SIP_NOTIFIER, SIP_SUBSCRIPTION_VIRTUAL);
+ if (!sub) {
+ continue;
+ }
+ /* XXX For subscriptions with children, the generator will need to be
+ * the multipart RLMI generator instead. This will be handled in
+ * ASTERISK-23869 or ASTERISK-23867
+ */
+ sub->body_generator = generator;
+ sub->reality.virtual.parent = parent;
+
+ create_virtual_subscriptions(handler, endpoint, rdata, resource,
+ generator, sub, AST_VECTOR_GET(&parent_resource->children, i));
+ }
+}
+
+/*!
+ * \brief Create a subscription tree based on a resource tree.
+ *
+ * Using the previously-determined valid resources in the provided resource tree,
+ * a corresponding tree of ast_sip_subscriptions are created. The root of the
+ * subscription tree is a real subscription, and the rest in the tree are
+ * virtual subscriptions.
+ *
+ * \param handler The handler to use for leaf subscriptions
+ * \param endpoint The endpoint that sent the SUBSCRIBE request
+ * \param rdata The SUBSCRIBE content
+ * \param resource The requested resource in the SUBSCRIBE request
+ * \param generator The body generator to use in leaf subscriptions
+ * \param tree The resource tree on which the subscription tree is based
+ *
+ * \retval NULL Could not create the subscription tree
+ * \retval non-NULL The root of the created subscription tree
+ */
+static struct ast_sip_subscription *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
+ struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
+ struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree)
+{
+ struct ast_sip_subscription *sub;
+
+ /* Start by creating the root subscription. It's the only real subscription.
+ * XXX Since this is the root of a subscription tree, it should actually use the
+ * multipart RLMI generator instead if this is a list. This will be handled in
+ * ASTERISK-23869 or ASTERISK-23867
+ */
+ sub = create_real_subscription(handler, endpoint, rdata, resource, generator);
+ if (!sub) {
+ return NULL;
+ }
+
+ create_virtual_subscriptions(handler, endpoint, rdata, resource, generator, sub, tree->root);
+ return sub;
+}
+
+
+/*! \brief Callback function to perform the actual recreation of a subscription */
+static int subscription_persistence_recreate(void *obj, void *arg, int flags)
+{
+ struct subscription_persistence *persistence = obj;
+ pj_pool_t *pool = arg;
+ pjsip_rx_data rdata = { { 0, }, };
+ RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+ struct ast_sip_subscription *sub;
+ struct ast_sip_pubsub_body_generator *generator;
+ int resp;
+ char *resource;
+ size_t resource_size;
+ pjsip_sip_uri *request_uri;
+ struct resource_tree tree;
+ pjsip_expires_hdr *expires_header;
+ struct ast_sip_subscription_handler *handler;
+
+ /* If this subscription has already expired remove it */
+ if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
+ if (!endpoint) {
+ ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
+ persistence->endpoint);
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ pj_pool_reset(pool);
+ rdata.tp_info.pool = pool;
+
+ if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
+ persistence->transport_key, persistence->local_name, persistence->local_port)) {
+ ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
+ persistence->endpoint);
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
+ resource_size = pj_strlen(&request_uri->user) + 1;
+ resource = alloca(resource_size);
+ ast_copy_pj_str(resource, &request_uri->user, resource_size);
+
+ /* Update the expiration header with the new expiration */
+ expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
+ if (!expires_header) {
+ expires_header = pjsip_expires_hdr_create(pool, 0);
+ if (!expires_header) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+ pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
+ }
+ expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
+
+ handler = subscription_get_handler_from_rdata(&rdata);
+ if (!handler || !handler->notifier) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ generator = subscription_get_generator_from_rdata(&rdata, handler);
+ if (!generator) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
+ pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
+
+ memset(&tree, 0, sizeof(tree));
+ resp = build_resource_tree(endpoint, handler, resource, &tree);
+ if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
+ sub = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree);
+ sub->persistence = ao2_bump(persistence);
+ subscription_persistence_update(sub, &rdata);
+ } else {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ }
+
+ return 0;
+}
+
+/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
+static int subscription_persistence_load(void *data)
+{
+ struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+ "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+ pj_pool_t *pool;
+
+ pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
+ PJSIP_POOL_RDATA_INC);
+ if (!pool) {
+ ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
+ return 0;
+ }
+
+ ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
+
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+
+ ao2_ref(persisted_subscriptions, -1);
+ return 0;
+}
+
+/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
+static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ struct ast_json_payload *payload;
+ const char *type;
+
+ if (stasis_message_type(message) != ast_manager_get_generic_type()) {
+ return;
+ }
+
+ payload = stasis_message_data(message);
+ type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
+
+ /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
+ * recreate SIP subscriptions.
+ */
+ if (strcmp(type, "FullyBooted")) {
+ return;
+ }
+
+ /* This has to be here so the subscription is recreated when the body generator is available */
+ ast_sip_push_task(NULL, subscription_persistence_load, NULL);
+
+ /* Once the system is fully booted we don't care anymore */
+ stasis_unsubscribe(sub);
+}
+
+static void add_subscription(struct ast_sip_subscription *obj)
+{
+ SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+ AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
+ ast_module_ref(ast_module_info->self);
+}
+
+typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
+
+static int for_each_subscription(on_subscription_t on_subscription, void *arg)
+{
+ int num = 0;
+ struct ast_sip_subscription *i;
+ SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+ if (!on_subscription) {
+ return num;
+ }
+
+ AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
+ if (on_subscription(i, arg)) {
+ break;
+ }
+ ++num;
+ }
+ return num;
+}
+
+static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
+ struct ast_str **buf)
+{
+ char str[256];
+ struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
+
+ ast_str_append(buf, 0, "Role: %s\r\n",
+ sip_subscription_roles_map[sub->role]);
+ ast_str_append(buf, 0, "Endpoint: %s\r\n",
+ ast_sorcery_object_get_id(sub->endpoint));
+
+ ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str));
+ ast_str_append(buf, 0, "Callid: %s\r\n", str);
+
+ ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
+ sip_subscription_get_evsub(sub)));
+
+ ast_callerid_merge(str, sizeof(str),
+ S_COR(id->self.name.valid, id->self.name.str, NULL),
+ S_COR(id->self.number.valid, id->self.number.str, NULL),
+ "Unknown");
+
+ ast_str_append(buf, 0, "Callerid: %s\r\n", str);
+
+ if (sub->handler->to_ami) {
+ sub->handler->to_ami(sub, buf);
+ }
}
@@ -868,34 +1273,6 @@
.on_server_timeout = pubsub_on_server_timeout,
};
-static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
- struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role)
-{
- struct ast_sip_subscription *sub;
-
- sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
- if (!sub) {
- return NULL;
- }
- strcpy(sub->resource, resource); /* Safe */
-
- sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
- if (!sub->datastores) {
- ao2_ref(sub, -1);
- return NULL;
- }
- sub->serializer = ast_sip_create_serializer();
- if (!sub->serializer) {
- ao2_ref(sub, -1);
- return NULL;
- }
- sub->role = role;
- sub->type = SIP_SUBSCRIPTION_REAL;
- sub->endpoint = ao2_bump(endpoint);
- sub->handler = handler;
-
- return sub;
-}
static void subscription_setup_dialog(struct ast_sip_subscription *sub, pjsip_dialog *dlg)
{
@@ -908,7 +1285,7 @@
pjsip_evsub_set_mod_data(sip_subscription_get_evsub(sub), pubsub_module.id, sub);
}
-static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
+static struct ast_sip_subscription *create_real_subscription(const struct ast_sip_subscription_handler *handler,
struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
struct ast_sip_pubsub_body_generator *generator)
{
@@ -916,7 +1293,7 @@
pjsip_dialog *dlg;
struct subscription_persistence *persistence;
- sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER);
+ sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER, SIP_SUBSCRIPTION_REAL);
if (!sub) {
return NULL;
}
@@ -972,7 +1349,7 @@
pjsip_tx_data *tdata;
pjsip_evsub *evsub;
- sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER);
+ sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER, SIP_SUBSCRIPTION_REAL);
if (!sub) {
return NULL;
}
@@ -1118,14 +1495,35 @@
return sub->resource;
}
+static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
+{
+ pjsip_require_hdr *require;
+
+ require = pjsip_require_hdr_create(pool);
+ pj_strdup2(pool, &require->values[0], "eventlist");
+ require->count = 1;
+
+ return require;
+}
+
static int sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
{
+ pjsip_hdr res_hdr;
+
+ ast_assert(sub->type == SIP_SUBSCRIPTION_REAL);
+
/* If this is a persistence recreation the subscription has already been accepted */
if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
return 0;
}
- return pjsip_evsub_accept(sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
+ pj_list_init(&res_hdr);
+ if (!AST_LIST_EMPTY(&sub->children)) {
+ /* If subscribing to a list, our response has to have a Require: eventlist header in it */
+ pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
+ }
+
+ return pjsip_evsub_accept(sip_subscription_get_evsub(sub), rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
}
static void subscription_datastore_destroy(void *obj)
@@ -1411,6 +1809,7 @@
pjsip_sip_uri *request_uri_sip;
size_t resource_size;
int resp;
+ struct resource_tree tree;
endpoint = ast_pjsip_rdata_get_endpoint(rdata);
ast_assert(endpoint != NULL);
@@ -1466,24 +1865,31 @@
return PJ_TRUE;
}
- resp = handler->notifier->new_subscribe(endpoint, resource);
+ memset(&tree, 0, sizeof(tree));
+ resp = build_resource_tree(endpoint, handler, resource, &tree);
if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
+ resource_tree_destroy(&tree);
return PJ_TRUE;
}
- sub = notifier_create_subscription(handler, endpoint, rdata, resource, generator);
+ sub = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree);
if (!sub) {
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
} else {
sub->persistence = subscription_persistence_create(sub);
subscription_persistence_update(sub, rdata);
sip_subscription_accept(sub, rdata, resp);
+ /* XXX Currently this is calling directly into the event handler instead of traversing
+ * the subscription tree. For single resource subscriptions, this is fine, but this will
+ * malfunction for lists. This will be handled with ASTERISK-23869
+ */
if (handler->notifier->notify_required(sub, AST_SIP_SUBSCRIPTION_NOTIFY_REASON_STARTED)) {
pjsip_evsub_terminate(sip_subscription_get_evsub(sub), PJ_TRUE);
}
}
+ resource_tree_destroy(&tree);
return PJ_TRUE;
}
@@ -1953,8 +2359,16 @@
} else {
reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_RENEWED;
}
+ /* XXX Currently this is calling directly into the event handler instead of traversing
+ * the subscription tree. For single resource subscriptions, this is fine, but this will
+ * malfunction for lists. This will be handled with ASTERISK-23869
+ */
if (sub->handler->notifier->notify_required(sub, reason)) {
*p_st_code = 500;
+ }
+
+ if (!AST_LIST_EMPTY(&sub->children)) {
+ pj_list_insert_before(res_hdr, create_require_eventlist(rdata->tp_info.pool));
}
}
@@ -2001,6 +2415,10 @@
{
struct ast_sip_subscription *sub = userdata;
+ /* XXX Currently this is calling directly into the event handler instead of traversing
+ * the subscription tree. For single resource subscriptions, this is fine, but this will
+ * malfunction for lists. This will be handled with ASTERISK-23869
+ */
sub->handler->notifier->notify_required(sub,
AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED);
@@ -2309,6 +2727,580 @@
return 0;
}
+
+#ifdef TEST_FRAMEWORK
+
+/*!
+ * \brief "bad" resources
+ *
+ * These are resources that the test handler will reject subscriptions to.
+ */
+const char *bad_resources[] = {
+ "coconut",
+ "cilantro",
+ "olive",
+ "cheese",
+};
+
+/*!
+ * \brief new_subscribe callback for unit tests
+ *
+ * Will give a 200 OK response to any resource except the "bad" ones.
+ */
+static int test_new_subscribe(struct ast_sip_endpoint *endpoint, const char *resource)
+{
+ int i;
+
+ for (i = 0; i < ARRAY_LEN(bad_resources); ++i) {
+ if (!strcmp(resource, bad_resources[i])) {
+ return 400;
+ }
+ }
+
+ return 200;
+}
+
+/*!
+ * \brief Subscription notifier for unit tests.
+ *
+ * Since unit tests are only concerned with building a resource tree,
+ * only the new_subscribe callback needs to be defined.
+ */
+struct ast_sip_notifier test_notifier = {
+ .new_subscribe = test_new_subscribe,
+};
+
+/*!
+ * \brief Subscription handler for unit tests.
+ */
+struct ast_sip_subscription_handler test_handler = {
+ .event_name = "test",
+ .notifier = &test_notifier,
+};
+
+/*!
+ * \brief Set properties on an allocated resource list
+ *
+ * \param list The list to set details on.
+ * \param event The list's event.
+ * \param resources Array of resources to add to the list.
+ * \param num_resources Number of resources in the array.
+ * \retval 0 Success
+ * \retval non-zero Failure
+ */
+static int populate_list(struct resource_list *list, const char *event, const char **resources, size_t num_resources)
+{
+ int i;
+
+ ast_copy_string(list->event, event, sizeof(list->event));
+
+ for (i = 0; i < num_resources; ++i) {
+ if (AST_VECTOR_APPEND(&list->items, ast_strdup(resources[i]))) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+/*!
+ * \brief RAII callback to destroy a resource list
+ */
+static void cleanup_resource_list(struct resource_list *list)
+{
+ if (!list) {
+ return;
+ }
+
+ ast_sorcery_delete(ast_sip_get_sorcery(), list);
+ ao2_cleanup(list);
+}
+
+/*!
+ * \brief allocate a resource list, store it in sorcery, and set its details
+ *
+ * \param test The unit test. Used for logging status messages.
+ * \param list_name The name of the list to create.
+ * \param event The event the list services
+ * \param resources Array of resources to apply to the list
+ * \param num_resources The number of resources in the array
+ * \retval NULL Failed to allocate or populate list
[... 511 lines stripped ...]
More information about the asterisk-commits
mailing list