[svn-commits] mmichelson: branch mmichelson/rls-notify r418162 - /team/mmichelson/rls-notif...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Mon Jul 7 18:36:40 CDT 2014


Author: mmichelson
Date: Mon Jul  7 18:36:35 2014
New Revision: 418162

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=418162
Log:
Refactor the ast_sip_subscription into two separate objects.

I was going through an exercise where I was attempting to separate
the "real" and "virtual" parts of ast_sip_subscriptions more and
I realized that with only an exception or two, everything in the
structure was mutually exclusive to being in either the
real union member or virtual union member. I had a mini
"god object" on my hands. I have therefore separated the
ast_sip_subscription struct into two separate structures

ast_sip_subscription: This is the opaque forward-declared structure
in res_pjsip_pubsub that subscription handlers deal with. These are
essentially "virtual" subscriptions in that they have resource-specific
and handler-specific data on them only. No details about the actual
subscription are present.

sip_subscription_tree: This is the base on which ast_sip_subscriptions
stem. This contains information about the overarching subscription.
Within res_pjsip_pubsub, this structure is used much more often than
ast_sip_subscription.

This change is a breath of fresh air, because it eliminates a lot of
the conditional checking for whether the subscription was real or
virtual. Now, since they are separate types, the usage is much less
brittle than previous.

There are still checks in place to see if ast_sip_subscriptions are
lists or leaves. I'd like to eliminate that if possible, but I'm
not sure if I'll be able to do that.


Modified:
    team/mmichelson/rls-notify/res/res_pjsip_pubsub.c

Modified: team/mmichelson/rls-notify/res/res_pjsip_pubsub.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/rls-notify/res/res_pjsip_pubsub.c?view=diff&rev=418162&r1=418161&r2=418162
==============================================================================
--- team/mmichelson/rls-notify/res/res_pjsip_pubsub.c (original)
+++ team/mmichelson/rls-notify/res/res_pjsip_pubsub.c Mon Jul  7 18:36:35 2014
@@ -348,13 +348,15 @@
 	struct timeval expires;
 };
 
-/*!
- * \brief Real subscription details
- *
- * A real subscription is one that has a direct link to a
- * PJSIP subscription and dialog.
- */
-struct ast_sip_real_subscription {
+struct sip_subscription_tree {
+	/*! The endpoint with which the subscription is communicating */
+	struct ast_sip_endpoint *endpoint;
+	/*! Serializer on which to place operations for this subscription */
+	struct ast_taskprocessor *serializer;
+	/*! The role for this subscription */
+	enum ast_sip_subscription_role role;
+	/*! Persistence information */
+	struct subscription_persistence *persistence;
 	/*! The underlying PJSIP event subscription structure */
 	pjsip_evsub *evsub;
 	/*! The underlying PJSIP dialog */
@@ -365,42 +367,12 @@
 	int notify_sched_id;
 	/*! Indicator if scheduled batched notification should be sent */
 	unsigned int send_scheduled_notify;
-};
-
-/*!
- * \brief Virtual subscription details
- *
- * A virtual subscription is one that does not have a direct
- * link to a PJSIP subscription. Instead, it is a descendent
- * of an ast_sip_subscription. Following the ancestry will
- * eventually lead to a real subscription.
- */
-struct ast_sip_virtual_subscription {
-	struct ast_sip_subscription *parent;
-};
-
-/*!
- * \brief Discriminator between real and virtual subscriptions
- */
-enum sip_subscription_type {
-	/*!
-	 * \brief a "real" subscription.
-	 *
-	 * Real subscriptions are at the root of a tree of subscriptions.
-	 * A real subscription has a corresponding SIP subscription in the
-	 * PJSIP stack.
-	 */
-	SIP_SUBSCRIPTION_REAL,
-	/*!
-	 * \brief a "virtual" subscription.
-	 *
-	 * Virtual subscriptions are the descendents of real subscriptions
-	 * in a tree of subscriptions. Virtual subscriptions do not have
-	 * a corresponding SIP subscription in the PJSIP stack. Instead,
-	 * when a state change happens on a virtual subscription, the
-	 * state change is indicated to the virtual subscription's parent.
-	 */
-	SIP_SUBSCRIPTION_VIRTUAL,
+	/*! The root of the subscription tree */
+	struct ast_sip_subscription *root;
+	/*! Is this subscription to a list? */
+	int is_list;
+	/*! Next item in the list */
+	AST_LIST_ENTRY(sip_subscription_tree) next;
 };
 
 /*!
@@ -409,29 +381,14 @@
 struct ast_sip_subscription {
 	/*! Subscription datastores set up by handlers */
 	struct ao2_container *datastores;
-	/*! The endpoint with which the subscription is communicating */
-	struct ast_sip_endpoint *endpoint;
-	/*! Serializer on which to place operations for this subscription */
-	struct ast_taskprocessor *serializer;
 	/*! The handler for this subscription */
 	const struct ast_sip_subscription_handler *handler;
-	/*! The role for this subscription */
-	enum ast_sip_subscription_role role;
-	/*! Indicator of real or virtual subscription */
-	enum sip_subscription_type type;
-	/*! Real and virtual components of the subscription */
-	union {
-		struct ast_sip_real_subscription real;
-		struct ast_sip_virtual_subscription virtual;
-	} reality;
+	/*! Pointer to the base of the tree */
+	struct sip_subscription_tree *tree;
 	/*! Body generaator for NOTIFYs */
 	struct ast_sip_pubsub_body_generator *body_generator;
-	/*! Persistence information */
-	struct subscription_persistence *persistence;
-	/*! Next item in the list */
-	AST_LIST_ENTRY(ast_sip_subscription) next;
-	/*! List of child subscriptions */
-	AST_LIST_HEAD_NOLOCK(,ast_sip_subscription) children;
+	/*! Vector of child subscriptions */
+	AST_VECTOR(, struct ast_sip_subscription *) children;
 	/*! Saved NOTIFY body text for this subscription */
 	struct ast_str *body_text;
 	/*! Indicator that the body text has changed since the last notification */
@@ -447,28 +404,27 @@
 	[AST_SIP_NOTIFIER] = "Notifier"
 };
 
-AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
+AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);
 
 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
 
-static pjsip_evsub *sip_subscription_get_evsub(const struct ast_sip_subscription *sub)
-{
-	if (sub->type == SIP_SUBSCRIPTION_VIRTUAL) {
-		return sip_subscription_get_evsub(sub->reality.virtual.parent);
-	} else {
-		return sub->reality.real.evsub;
-	}
-}
-
-static pjsip_dialog *sip_subscription_get_dlg(const struct ast_sip_subscription *sub)
-{
-	if (sub->type == SIP_SUBSCRIPTION_VIRTUAL) {
-		return sip_subscription_get_dlg(sub->reality.virtual.parent);
-	} else {
-		return sub->reality.real.dlg;
-	}
-}
+static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
+static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
+		int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
+static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
+		pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
+static void pubsub_on_client_refresh(pjsip_evsub *sub);
+static void pubsub_on_server_timeout(pjsip_evsub *sub);
+
+
+static pjsip_evsub_user pubsub_cb = {
+	.on_evsub_state = pubsub_on_evsub_state,
+	.on_rx_refresh = pubsub_on_rx_refresh,
+	.on_rx_notify = pubsub_on_rx_notify,
+	.on_client_refresh = pubsub_on_client_refresh,
+	.on_server_timeout = pubsub_on_server_timeout,
+};
 
 /*! \brief Destructor for subscription persistence */
 static void subscription_persistence_destroy(void *obj)
@@ -486,7 +442,7 @@
 }
 
 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
-static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
+static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
 {
 	char tag[PJ_GUID_STRING_LENGTH + 1];
 
@@ -496,13 +452,13 @@
 	struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
 		"subscription_persistence", NULL);
 
-	pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
+	pjsip_dialog *dlg = sub_tree->dlg;
 
 	if (!persistence) {
 		return NULL;
 	}
 
-	persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
+	persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
 	ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
 	persistence->tag = ast_strdup(tag);
 
@@ -511,47 +467,49 @@
 }
 
 /*! \brief Function which updates persistence information of a subscription in sorcery */
-static void subscription_persistence_update(struct ast_sip_subscription *sub,
+static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
 	pjsip_rx_data *rdata)
 {
 	pjsip_dialog *dlg;
 
-	if (!sub->persistence) {
+	if (!sub_tree->persistence) {
 		return;
 	}
 
-	dlg = sip_subscription_get_dlg(sub);
-	sub->persistence->cseq = dlg->local.cseq;
+	dlg = sub_tree->dlg;
+	sub_tree->persistence->cseq = dlg->local.cseq;
 
 	if (rdata) {
 		int expires;
 		pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
 
 		expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
-		sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
-
-		ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
-		ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
-		sub->persistence->src_port = rdata->pkt_info.src_port;
-		ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
-			sizeof(sub->persistence->transport_key));
-		ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
-			sizeof(sub->persistence->local_name));
-		sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
-	}
-
-	ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
+		sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
+
+		ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
+				sizeof(sub_tree->persistence->packet));
+		ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
+				sizeof(sub_tree->persistence->src_name));
+		sub_tree->persistence->src_port = rdata->pkt_info.src_port;
+		ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
+			sizeof(sub_tree->persistence->transport_key));
+		ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
+			sizeof(sub_tree->persistence->local_name));
+		sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
+	}
+
+	ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
 }
 
 /*! \brief Function which removes persistence of a subscription from sorcery */
-static void subscription_persistence_remove(struct ast_sip_subscription *sub)
-{
-	if (!sub->persistence) {
+static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
+{
+	if (!sub_tree->persistence) {
 		return;
 	}
 
-	ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
-	ao2_ref(sub->persistence, -1);
+	ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
+	ao2_ref(sub_tree->persistence, -1);
 }
 
 
@@ -647,10 +605,6 @@
 }
 
 struct resource_tree;
-
-static struct ast_sip_subscription *create_real_subscription(const struct ast_sip_subscription_handler *handler,
-		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
-		struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree);
 
 /*!
  * \brief A node for a resource tree.
@@ -944,7 +898,7 @@
 
 static int subscription_remove_serializer(void *obj)
 {
-	struct ast_sip_subscription *sub = obj;
+	struct sip_subscription_tree *sub_tree = obj;
 
 	/* This is why we keep the dialog on the subscription. When the subscription
 	 * is destroyed, there is no guarantee that the underlying dialog is ready
@@ -955,21 +909,28 @@
 	 * subscription is destroyed so that we can guarantee that our attempt to
 	 * remove the serializer will be successful.
 	 */
-	ast_sip_dialog_set_serializer(sip_subscription_get_dlg(sub), NULL);
-	pjsip_dlg_dec_session(sip_subscription_get_dlg(sub), &pubsub_module);
-
-	return 0;
-}
-
-static void remove_subscription(struct ast_sip_subscription *obj)
-{
-	struct ast_sip_subscription *i;
+	ast_sip_dialog_set_serializer(sub_tree->dlg, NULL);
+	pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
+
+	return 0;
+}
+
+static void add_subscription(struct sip_subscription_tree *obj)
+{
+	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+	AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
+	ast_module_ref(ast_module_info->self);
+}
+
+static void remove_subscription(struct sip_subscription_tree *obj)
+{
+	struct sip_subscription_tree *i;
 	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
 	AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
 		if (i == obj) {
 			AST_RWLIST_REMOVE_CURRENT(next);
 			ast_debug(1, "Removing subscription to resource %s from list of subscriptions\n",
-					ast_sip_subscription_get_resource_name(i));
+					ast_sip_subscription_get_resource_name(i->root));
 			ast_module_unref(ast_module_info->self);
 			break;
 		}
@@ -983,22 +944,11 @@
 
 	ast_debug(3, "Destroying SIP subscription\n");
 
-	subscription_persistence_remove(sub);
-
-	remove_subscription(sub);
-
 	ao2_cleanup(sub->datastores);
-	ao2_cleanup(sub->endpoint);
-
-	if (sip_subscription_get_dlg(sub)) {
-		ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
-	}
-	ast_taskprocessor_unreference(sub->serializer);
 }
 
 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
-		struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role,
-		enum sip_subscription_type type)
+		const char *resource, struct sip_subscription_tree *tree)
 {
 	struct ast_sip_subscription *sub;
 
@@ -1007,6 +957,12 @@
 		return NULL;
 	}
 	strcpy(sub->resource, resource); /* Safe */
+	
+	sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
+	if (!sub->datastores) {
+		ao2_ref(sub, -1);
+		return NULL;
+	}
 
 	sub->body_text = ast_str_create(128);
 	if (!sub->body_text) {
@@ -1014,23 +970,9 @@
 		return NULL;
 	}
 
-	sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
-	if (!sub->datastores) {
-		ao2_ref(sub, -1);
-		return NULL;
-	}
-
-	sub->serializer = ast_sip_create_serializer();
-	if (!sub->serializer) {
-		ao2_ref(sub, -1);
-		return NULL;
-	}
-
-	sub->role = role;
-	sub->type = type;
-	sub->endpoint = ao2_bump(endpoint);
 	sub->handler = handler;
 	sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
+	sub->tree = tree;
 
 	return sub;
 }
@@ -1046,27 +988,60 @@
  * \param parent The subscription (real or virtual) that is parent to the subscriptions created here.
  * \param parent_resource The tree node that corresponds to the parent subscription.
  */
-static void create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
-		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
-		struct ast_sip_pubsub_body_generator *generator, struct ast_sip_subscription *parent,
-		struct tree_node *parent_resource)
+static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
+		const char *resource, struct ast_sip_pubsub_body_generator *generator,
+		struct sip_subscription_tree *tree, struct tree_node *current)
 {
 	int i;
-
-	for (i = 0; i < AST_VECTOR_SIZE(&parent_resource->children); ++i) {
-		struct ast_sip_subscription *sub;
-
-		sub = allocate_subscription(handler, endpoint, resource,
-				AST_SIP_NOTIFIER, SIP_SUBSCRIPTION_VIRTUAL);
-		if (!sub) {
+	struct ast_sip_subscription *sub;
+
+	sub = allocate_subscription(handler, resource, tree);
+	if (!sub) {
+		return NULL;
+	}
+
+	sub->body_generator = generator;
+
+	for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
+		struct ast_sip_subscription *child;
+
+		child = create_virtual_subscriptions(handler, resource, generator,
+				tree, AST_VECTOR_GET(&current->children, i));
+
+		if (!child) {
 			continue;
 		}
-		sub->body_generator = generator;
-		sub->reality.virtual.parent = parent;
-
-		create_virtual_subscriptions(handler, endpoint, rdata, resource,
-				generator, sub, AST_VECTOR_GET(&parent_resource->children, i));
-	}
+
+		AST_VECTOR_APPEND(&sub->children, child);
+	}
+
+	return sub;
+}
+
+static void subscription_tree_destructor(void *obj)
+{
+	struct sip_subscription_tree *sub_tree = obj;
+
+	subscription_persistence_remove(sub_tree);
+	ao2_cleanup(sub_tree->endpoint);
+
+	if (sub_tree->dlg) {
+		ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub_tree);
+	}
+
+	ast_taskprocessor_unreference(sub_tree->serializer);
+	remove_subscription(sub_tree);
+}
+
+static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
+{
+	/* We keep a reference to the dialog until our subscription is destroyed. See
+	 * the subscription_destructor for more details
+	 */
+	pjsip_dlg_inc_session(dlg, &pubsub_module);
+	sub_tree->dlg = dlg;
+	ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
+	pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
 }
 
 /*!
@@ -1087,264 +1062,30 @@
  * \retval NULL Could not create the subscription tree
  * \retval non-NULL The root of the created subscription tree
  */
-static struct ast_sip_subscription *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
+
+static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
 		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
 		struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree)
 {
-	struct ast_sip_subscription *sub;
-
-	/* Start by creating the root subscription. It's the only real subscription. */
-	sub = create_real_subscription(handler, endpoint, rdata, resource, generator, tree);
-	if (!sub) {
-		return NULL;
-	}
-
-	create_virtual_subscriptions(handler, endpoint, rdata, resource, generator, sub, tree->root);
-	return sub;
-}
-
-
-/*! \brief Callback function to perform the actual recreation of a subscription */
-static int subscription_persistence_recreate(void *obj, void *arg, int flags)
-{
-	struct subscription_persistence *persistence = obj;
-	pj_pool_t *pool = arg;
-	pjsip_rx_data rdata = { { 0, }, };
-	RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
-	struct ast_sip_subscription *sub;
-	struct ast_sip_pubsub_body_generator *generator;
-	int resp;
-	char *resource;
-	size_t resource_size;
-	pjsip_sip_uri *request_uri;
-	struct resource_tree tree;
-	pjsip_expires_hdr *expires_header;
-	struct ast_sip_subscription_handler *handler;
-
-	/* If this subscription has already expired remove it */
-	if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
-	if (!endpoint) {
-		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
-			persistence->endpoint);
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	pj_pool_reset(pool);
-	rdata.tp_info.pool = pool;
-
-	if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
-		persistence->transport_key, persistence->local_name, persistence->local_port)) {
-		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
-			persistence->endpoint);
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
-	resource_size = pj_strlen(&request_uri->user) + 1;
-	resource = alloca(resource_size);
-	ast_copy_pj_str(resource, &request_uri->user, resource_size);
-
-	/* Update the expiration header with the new expiration */
-	expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
-	if (!expires_header) {
-		expires_header = pjsip_expires_hdr_create(pool, 0);
-		if (!expires_header) {
-			ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-			return 0;
-		}
-		pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
-	}
-	expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
-
-	handler = subscription_get_handler_from_rdata(&rdata);
-	if (!handler || !handler->notifier) {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	generator = subscription_get_generator_from_rdata(&rdata, handler);
-	if (!generator) {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-		return 0;
-	}
-
-	ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
-			pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
-
-	memset(&tree, 0, sizeof(tree));
-	resp = build_resource_tree(endpoint, handler, resource, &tree);
-	if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
-		sub = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree);
-		sub->persistence = ao2_bump(persistence);
-		subscription_persistence_update(sub, &rdata);
-	} else {
-		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-	}
-
-	return 0;
-}
-
-/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
-static int subscription_persistence_load(void *data)
-{
-	struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
-		"subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
-	pj_pool_t *pool;
-
-	pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
-		PJSIP_POOL_RDATA_INC);
-	if (!pool) {
-		ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
-		return 0;
-	}
-
-	ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
-
-	pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
-
-	ao2_ref(persisted_subscriptions, -1);
-	return 0;
-}
-
-/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
-static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
-{
-	struct ast_json_payload *payload;
-	const char *type;
-
-	if (stasis_message_type(message) != ast_manager_get_generic_type()) {
-		return;
-	}
-
-	payload = stasis_message_data(message);
-	type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
-
-	/* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
-	 * recreate SIP subscriptions.
-	 */
-	if (strcmp(type, "FullyBooted")) {
-		return;
-	}
-
-	/* This has to be here so the subscription is recreated when the body generator is available */
-	ast_sip_push_task(NULL, subscription_persistence_load, NULL);
-
-	/* Once the system is fully booted we don't care anymore */
-	stasis_unsubscribe(sub);
-}
-
-static void add_subscription(struct ast_sip_subscription *obj)
-{
-	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
-	AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
-	ast_module_ref(ast_module_info->self);
-}
-
-typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
-
-static int for_each_subscription(on_subscription_t on_subscription, void *arg)
-{
-	int num = 0;
-	struct ast_sip_subscription *i;
-	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
-
-	if (!on_subscription) {
-		return num;
-	}
-
-	AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
-		if (on_subscription(i, arg)) {
-			break;
-		}
-		++num;
-	}
-	return num;
-}
-
-static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
-				    struct ast_str **buf)
-{
-	char str[256];
-	struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
-
-	ast_str_append(buf, 0, "Role: %s\r\n",
-		       sip_subscription_roles_map[sub->role]);
-	ast_str_append(buf, 0, "Endpoint: %s\r\n",
-		       ast_sorcery_object_get_id(sub->endpoint));
-
-	ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str));
-	ast_str_append(buf, 0, "Callid: %s\r\n", str);
-
-	ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
-			       sip_subscription_get_evsub(sub)));
-
-	ast_callerid_merge(str, sizeof(str),
-			   S_COR(id->self.name.valid, id->self.name.str, NULL),
-			   S_COR(id->self.number.valid, id->self.number.str, NULL),
-			   "Unknown");
-
-	ast_str_append(buf, 0, "Callerid: %s\r\n", str);
-
-	if (sub->handler->to_ami) {
-		sub->handler->to_ami(sub, buf);
-	}
-}
-
-
-static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
-static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
-		int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
-static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
-		pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
-static void pubsub_on_client_refresh(pjsip_evsub *sub);
-static void pubsub_on_server_timeout(pjsip_evsub *sub);
-
-
-static pjsip_evsub_user pubsub_cb = {
-	.on_evsub_state = pubsub_on_evsub_state,
-	.on_rx_refresh = pubsub_on_rx_refresh,
-	.on_rx_notify = pubsub_on_rx_notify,
-	.on_client_refresh = pubsub_on_client_refresh,
-	.on_server_timeout = pubsub_on_server_timeout,
-};
-
-
-static void subscription_setup_dialog(struct ast_sip_subscription *sub, pjsip_dialog *dlg)
-{
-	/* We keep a reference to the dialog until our subscription is destroyed. See
-	 * the subscription_destructor for more details
-	 */
-	pjsip_dlg_inc_session(dlg, &pubsub_module);
-	sub->reality.real.dlg = dlg;
-	ast_sip_dialog_set_serializer(dlg, sub->serializer);
-	pjsip_evsub_set_mod_data(sip_subscription_get_evsub(sub), pubsub_module.id, sub);
-}
-
-static struct ast_sip_subscription *create_real_subscription(const struct ast_sip_subscription_handler *handler,
-		struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
-		struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree)
-{
-	struct ast_sip_subscription *sub;
+	struct sip_subscription_tree *sub_tree;
 	pjsip_dialog *dlg;
 	struct subscription_persistence *persistence;
 
-	sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER, SIP_SUBSCRIPTION_REAL);
-	if (!sub) {
+	sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
+	if (!sub_tree) {
 		return NULL;
 	}
 
-	sub->body_generator = generator;
+	sub_tree->serializer = ast_sip_create_serializer();
+	if (!sub_tree->serializer) {
+		ao2_ref(sub_tree, -1);
+		return NULL;
+	}
+
 	dlg = ast_sip_create_dialog_uas(endpoint, rdata);
 	if (!dlg) {
 		ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
-		ao2_ref(sub, -1);
+		ao2_ref(sub_tree, -1);
 		return NULL;
 	}
 
@@ -1360,20 +1101,215 @@
 		dlg->remote.cseq = persistence->cseq;
 	}
 
-	pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub->reality.real.evsub);
-	subscription_setup_dialog(sub, dlg);
+	pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
+	subscription_setup_dialog(sub_tree, dlg);
 
 	ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
 			pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
-	sub->reality.real.notification_batch_interval = tree->notification_batch_interval;
-
-	add_subscription(sub);
-	return sub;
-}
+
+	sub_tree->endpoint = ao2_bump(endpoint);
+	sub_tree->notification_batch_interval = tree->notification_batch_interval;
+
+	sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
+	if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
+		sub_tree->is_list = 1;
+	}
+
+	add_subscription(sub_tree);
+
+	return sub_tree;
+}
+
+/*! \brief Callback function to perform the actual recreation of a subscription */
+static int subscription_persistence_recreate(void *obj, void *arg, int flags)
+{
+	struct subscription_persistence *persistence = obj;
+	pj_pool_t *pool = arg;
+	pjsip_rx_data rdata = { { 0, }, };
+	RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+	struct sip_subscription_tree *sub_tree;
+	struct ast_sip_pubsub_body_generator *generator;
+	int resp;
+	char *resource;
+	size_t resource_size;
+	pjsip_sip_uri *request_uri;
+	struct resource_tree tree;
+	pjsip_expires_hdr *expires_header;
+	struct ast_sip_subscription_handler *handler;
+
+	/* If this subscription has already expired remove it */
+	if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
+	if (!endpoint) {
+		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
+			persistence->endpoint);
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	pj_pool_reset(pool);
+	rdata.tp_info.pool = pool;
+
+	if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
+		persistence->transport_key, persistence->local_name, persistence->local_port)) {
+		ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
+			persistence->endpoint);
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
+	resource_size = pj_strlen(&request_uri->user) + 1;
+	resource = alloca(resource_size);
+	ast_copy_pj_str(resource, &request_uri->user, resource_size);
+
+	/* Update the expiration header with the new expiration */
+	expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
+	if (!expires_header) {
+		expires_header = pjsip_expires_hdr_create(pool, 0);
+		if (!expires_header) {
+			ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+			return 0;
+		}
+		pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
+	}
+	expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
+
+	handler = subscription_get_handler_from_rdata(&rdata);
+	if (!handler || !handler->notifier) {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	generator = subscription_get_generator_from_rdata(&rdata, handler);
+	if (!generator) {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
+
+	ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
+			pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
+
+	memset(&tree, 0, sizeof(tree));
+	resp = build_resource_tree(endpoint, handler, resource, &tree);
+	if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
+		sub_tree = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree);
+		sub_tree->persistence = ao2_bump(persistence);
+		subscription_persistence_update(sub_tree, &rdata);
+	} else {
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+	}
+
+	return 0;
+}
+
+/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
+static int subscription_persistence_load(void *data)
+{
+	struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+		"subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+	pj_pool_t *pool;
+
+	pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
+		PJSIP_POOL_RDATA_INC);
+	if (!pool) {
+		ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
+		return 0;
+	}
+
+	ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
+
+	pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+
+	ao2_ref(persisted_subscriptions, -1);
+	return 0;
+}
+
+/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
+static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+	struct ast_json_payload *payload;
+	const char *type;
+
+	if (stasis_message_type(message) != ast_manager_get_generic_type()) {
+		return;
+	}
+
+	payload = stasis_message_data(message);
+	type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
+
+	/* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
+	 * recreate SIP subscriptions.
+	 */
+	if (strcmp(type, "FullyBooted")) {
+		return;
+	}
+
+	/* This has to be here so the subscription is recreated when the body generator is available */
+	ast_sip_push_task(NULL, subscription_persistence_load, NULL);
+
+	/* Once the system is fully booted we don't care anymore */
+	stasis_unsubscribe(sub);
+}
+
+typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
+
+static int for_each_subscription(on_subscription_t on_subscription, void *arg)
+{
+	int num = 0;
+	struct sip_subscription_tree *i;
+	SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+	if (!on_subscription) {
+		return num;
+	}
+
+	AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
+		if (on_subscription(i, arg)) {
+			break;
+		}
+		++num;
+	}
+	return num;
+}
+
+static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
+				    struct ast_str **buf)
+{
+	char str[256];
+	struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
+
+	ast_str_append(buf, 0, "Role: %s\r\n",
+		       sip_subscription_roles_map[sub_tree->role]);
+	ast_str_append(buf, 0, "Endpoint: %s\r\n",
+		       ast_sorcery_object_get_id(sub_tree->endpoint));
+
+	ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
+	ast_str_append(buf, 0, "Callid: %s\r\n", str);
+
+	ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
+
+	ast_callerid_merge(str, sizeof(str),
+			   S_COR(id->self.name.valid, id->self.name.str, NULL),
+			   S_COR(id->self.number.valid, id->self.number.str, NULL),
+			   "Unknown");
+
+	ast_str_append(buf, 0, "Callerid: %s\r\n", str);
+
+	/* XXX This needs to be done recursively for lists */
+	if (sub_tree->root->handler->to_ami) {
+		sub_tree->root->handler->to_ami(sub_tree->root, buf);
+	}
+}
+
 
 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
 {
-	pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
+	pjsip_dialog *dlg = sub->tree->dlg;
 	pjsip_msg *msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
 	pj_str_t name;
 
@@ -1391,8 +1327,10 @@
 	pj_str_t event;
 	pjsip_tx_data *tdata;
 	pjsip_evsub *evsub;
-
-	sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER, SIP_SUBSCRIPTION_REAL);
+	struct sip_subscription_tree *sub_tree = NULL;
+
+	/* XXX Need allocation function for the sub_tree here */
+	sub = allocate_subscription(handler, resource, sub_tree);
 	if (!sub) {
 		return NULL;
 	}
@@ -1415,12 +1353,12 @@
 	}
 
 	pj_cstr(&event, handler->event_name);
-	pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub->reality.real.evsub);
-	subscription_setup_dialog(sub, dlg);
-
-	add_subscription(sub);
-
-	evsub = sip_subscription_get_evsub(sub);
+	pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
+	subscription_setup_dialog(sub_tree, dlg);
+
+	add_subscription(sub_tree);
+
+	evsub = sub_tree->evsub;
 
 	if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
 		pjsip_evsub_send_request(evsub, tdata);
@@ -1438,35 +1376,34 @@
 
 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
 {
-	ast_assert(sub->endpoint != NULL);
-	ao2_ref(sub->endpoint, +1);
-	return sub->endpoint;
+	ast_assert(sub->tree->endpoint != NULL);
+	return ao2_bump(sub->tree->endpoint);
 }
 
 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
 {
-	ast_assert(sub->serializer != NULL);
-	return sub->serializer;
-}
-
-static int sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
-{
-	struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
+	ast_assert(sub->tree->serializer != NULL);
+	return sub->tree->serializer;
+}
+
+static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
+{
+#ifdef TEST_FRAMEWORK
+	struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
+#endif
 	int res;
 
-	ao2_ref(sub, +1);
-	res = pjsip_evsub_send_request(sip_subscription_get_evsub(sub),
-			tdata) == PJ_SUCCESS ? 0 : -1;
-
-	subscription_persistence_update(sub, NULL);
+	ao2_ref(sub_tree, +1);
+	res = pjsip_evsub_send_request(sub_tree->evsub, tdata) == PJ_SUCCESS ? 0 : -1;
+
+	subscription_persistence_update(sub_tree, NULL);
 
 	ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
 		"StateText: %s\r\n"
 		"Endpoint: %s\r\n",
-		pjsip_evsub_get_state_name(sip_subscription_get_evsub(sub)),
+		pjsip_evsub_get_state_name(sub_tree->evsub),
 		ast_sorcery_object_get_id(endpoint));
-	ao2_cleanup(sub);
-	ao2_cleanup(endpoint);
+	ao2_cleanup(sub_tree);
 
 	return res;
 }
@@ -1485,7 +1422,7 @@
 
 static int generate_notify_body(struct ast_sip_subscription *root, struct ast_str **body_text)
 {
-	if (AST_LIST_EMPTY(&root->children)) {
+	if (AST_VECTOR_SIZE(&root->children) > 0) {
 		/* Not a list. We've already generated the body and saved it on the subscription.
 		 * Use that directly.
 		 */
@@ -1497,14 +1434,13 @@
 	return generate_list_body(root, body_text);
 }
 
-static int send_notify(struct ast_sip_subscription *sub)
-{
-	pjsip_evsub *evsub = sip_subscription_get_evsub(sub);
+static int send_notify(struct sip_subscription_tree *sub_tree)
+{
+	pjsip_evsub *evsub = sub_tree->evsub;
 	pjsip_tx_data *tdata;
 	struct ast_sip_body body = {
-		.type = ast_sip_subscription_get_body_type(sub),
-		.subtype = ast_sip_subscription_get_body_subtype(sub),
-		.body_text = ast_str_buffer(sub->body_text),
+		.type = ast_sip_subscription_get_body_type(sub_tree->root),
+		.subtype = ast_sip_subscription_get_body_subtype(sub_tree->root),
 	};
 	struct ast_str *body_text;
 
@@ -1513,12 +1449,13 @@
 		return -1;
 	}
 
-	if (pjsip_evsub_notify(evsub, sub->subscription_state, NULL, NULL, &tdata) != PJ_SUCCESS) {
+	if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
+				NULL, NULL, &tdata) != PJ_SUCCESS) {
 		ast_free(body_text);
 		return -1;
 	}
 
-	if (generate_notify_body(sub, &body_text)) {
+	if (generate_notify_body(sub_tree->root, &body_text)) {
 		pjsip_tx_data_dec_ref(tdata);
 		ast_free(body_text);
 		return -1;
@@ -1532,13 +1469,13 @@
 		return -1;
 	}
 
-	if (sip_subscription_send_request(sub, tdata)) {
+	if (sip_subscription_send_request(sub_tree, tdata)) {
 		pjsip_tx_data_dec_ref(tdata);
 		ast_free(body_text);
 		return -1;
 	}
 
-	sub->reality.real.send_scheduled_notify = 0;
+	sub_tree->send_scheduled_notify = 0;
 	ast_free(body_text);
 	
 	return 0;
@@ -1546,41 +1483,44 @@
 
 static int serialized_send_notify(void *userdata)
 {
-	struct ast_sip_subscription *sub = userdata;
+	struct sip_subscription_tree *sub_tree = userdata;
 
 	/* It's possible that between when the notification was scheduled

[... 410 lines stripped ...]



More information about the svn-commits mailing list