[svn-commits] mmichelson: branch mmichelson/rls-notify r420053 - in /team/mmichelson/rls-no...
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Tue Aug 5 13:02:05 CDT 2014
Author: mmichelson
Date: Tue Aug 5 13:02:02 2014
New Revision: 420053
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=420053
Log:
Commit changes that were in the old rls-notify branch.
These changes are the ones from reviewboard /r/3723.
Next comes actually addressing the findings there.
Modified:
team/mmichelson/rls-notify/include/asterisk/res_pjsip_pubsub.h
team/mmichelson/rls-notify/res/res_pjsip_exten_state.c
team/mmichelson/rls-notify/res/res_pjsip_mwi.c
team/mmichelson/rls-notify/res/res_pjsip_pubsub.c
Modified: team/mmichelson/rls-notify/include/asterisk/res_pjsip_pubsub.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/rls-notify/include/asterisk/res_pjsip_pubsub.h?view=diff&rev=420053&r1=420052&r2=420053
==============================================================================
--- team/mmichelson/rls-notify/include/asterisk/res_pjsip_pubsub.h (original)
+++ team/mmichelson/rls-notify/include/asterisk/res_pjsip_pubsub.h Tue Aug 5 13:02:02 2014
@@ -223,23 +223,29 @@
*/
int (*new_subscribe)(struct ast_sip_endpoint *endpoint, const char *resource);
/*!
- * \brief The subscription is in need of a NOTIFY request.
- *
- * A reason of AST_SIP_SUBSCRIPTION_NOTIFY_REASON_STARTED is given immediately
- * after a SUBSCRIBE is accepted. This is a good opportunity for the notifier to
- * perform setup duties such as establishing Stasis subscriptions or adding
- * datastores to the subscription.
- *
- * A reason of AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED is given when the
- * subscriber has terminated the subscription. If there are any duties that the
- *
- *
- * \param sub The subscription to send the NOTIFY on.
- * \param reason The reason why the NOTIFY is being sent.
+ * \brief Called when an inbound subscription has been accepted.
+ *
+ * This is a prime opportunity for notifiers to add any notifier-specific
+ * data to the subscription (such as datastores) that it needs to.
+ *
+ * \note There is no need to send a NOTIFY request when this callback
+ * is called
+ *
+ * \param sub The new subscription
* \retval 0 Success
* \retval -1 Failure
*/
- int (*notify_required)(struct ast_sip_subscription *sub, enum ast_sip_subscription_notify_reason reason);
+ int (*subscription_established)(struct ast_sip_subscription *sub);
+ /*!
+ * \brief Supply data needed to create a NOTIFY body.
+ *
+ * The returned data must be an ao2 object. The caller of this function
+ * will be responsible for decrementing the refcount of the returned object
+ *
+ * \param sub The subscription
+ * \return An ao2 object that can be used to create a NOTIFY body.
+ */
+ void *(*get_notify_data)(struct ast_sip_subscription *sub);
};
struct ast_sip_subscriber {
Modified: team/mmichelson/rls-notify/res/res_pjsip_exten_state.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/rls-notify/res/res_pjsip_exten_state.c?view=diff&rev=420053&r1=420052&r2=420053
==============================================================================
--- team/mmichelson/rls-notify/res/res_pjsip_exten_state.c (original)
+++ team/mmichelson/rls-notify/res/res_pjsip_exten_state.c Tue Aug 5 13:02:02 2014
@@ -70,15 +70,23 @@
static void subscription_shutdown(struct ast_sip_subscription *sub);
static int new_subscribe(struct ast_sip_endpoint *endpoint, const char *resource);
-static int notify_required(struct ast_sip_subscription *sub,
- enum ast_sip_subscription_notify_reason reason);
+static int subscription_established(struct ast_sip_subscription *sub);
+static void *get_notify_data(struct ast_sip_subscription *sub);
static void to_ami(struct ast_sip_subscription *sub,
struct ast_str **buf);
struct ast_sip_notifier presence_notifier = {
.default_accept = DEFAULT_PRESENCE_BODY,
.new_subscribe = new_subscribe,
- .notify_required = notify_required,
+ .subscription_established = subscription_established,
+ .get_notify_data = get_notify_data,
+};
+
+struct ast_sip_notifier dialog_notifier = {
+ .default_accept = DEFAULT_DIALOG_BODY,
+ .new_subscribe = new_subscribe,
+ .subscription_established = subscription_established,
+ .get_notify_data = get_notify_data,
};
struct ast_sip_subscription_handler presence_handler = {
@@ -94,7 +102,7 @@
.accept = { DEFAULT_DIALOG_BODY, },
.subscription_shutdown = subscription_shutdown,
.to_ami = to_ami,
- .notifier = &presence_notifier,
+ .notifier = &dialog_notifier,
};
static void exten_state_subscription_destructor(void *obj)
@@ -151,45 +159,6 @@
exten_state_sub->last_presence_state = AST_PRESENCE_NOT_SET;
exten_state_sub->user_agent = get_user_agent(sip_sub);
return exten_state_sub;
-}
-
-/*!
- * \internal
- * \brief Get device state information and send notification to the subscriber.
- */
-static void send_notify(struct exten_state_subscription *exten_state_sub)
-{
- RAII_VAR(struct ao2_container*, info, NULL, ao2_cleanup);
- char *subtype = NULL, *message = NULL;
- struct ast_sip_exten_state_data exten_state_data = {
- .exten = exten_state_sub->exten,
- .presence_state = ast_hint_presence_state(NULL, exten_state_sub->context,
- exten_state_sub->exten, &subtype, &message),
- .presence_subtype = subtype,
- .presence_message = message,
- .sub = exten_state_sub->sip_sub,
- .user_agent = exten_state_sub->user_agent
- };
-
- ast_sip_subscription_get_local_uri(exten_state_sub->sip_sub,
- exten_state_data.local, sizeof(exten_state_data.local));
- ast_sip_subscription_get_remote_uri(exten_state_sub->sip_sub,
- exten_state_data.remote, sizeof(exten_state_data.remote));
-
- if ((exten_state_data.exten_state = ast_extension_state_extended(
- NULL, exten_state_sub->context, exten_state_sub->exten, &info)) < 0) {
-
- ast_log(LOG_WARNING, "Unable to get device hint/info for extension %s\n",
- exten_state_sub->exten);
- return;
- }
-
- exten_state_data.pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(),
- "exten_state", 1024, 1024);
-
- exten_state_data.device_state_info = info;
- ast_sip_subscription_notify(exten_state_sub->sip_sub, &exten_state_data, 0);
- pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), exten_state_data.pool);
}
struct notify_task_data {
@@ -232,6 +201,7 @@
task_data->exten_state_data.presence_message = ast_strdup(info->presence_message);
task_data->exten_state_data.user_agent = ast_strdup(exten_state_sub->user_agent);
task_data->exten_state_data.device_state_info = info->device_state_info;
+ task_data->exten_state_data.sub = exten_state_sub->sip_sub;
if (task_data->exten_state_data.device_state_info) {
ao2_ref(task_data->exten_state_data.device_state_info, +1);
@@ -366,7 +336,7 @@
return 200;
}
-static int initial_subscribe(struct ast_sip_subscription *sip_sub)
+static int subscription_established(struct ast_sip_subscription *sip_sub)
{
struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sip_sub);
const char *resource = ast_sip_subscription_get_resource_name(sip_sub);
@@ -403,33 +373,73 @@
return -1;
}
- send_notify(exten_state_sub);
ao2_cleanup(exten_state_sub);
return 0;
}
-static int notify_required(struct ast_sip_subscription *sub,
- enum ast_sip_subscription_notify_reason reason)
+static void exten_state_data_destructor(void *obj)
+{
+ struct ast_sip_exten_state_data *exten_state_data = obj;
+
+ ao2_cleanup(exten_state_data->device_state_info);
+ ast_free(exten_state_data->presence_subtype);
+ ast_free(exten_state_data->presence_message);
+ if (exten_state_data->pool) {
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), exten_state_data->pool);
+ }
+}
+
+static struct ast_sip_exten_state_data *exten_state_data_alloc(struct ast_sip_subscription *sip_sub,
+ struct exten_state_subscription *exten_state_sub)
+{
+ struct ast_sip_exten_state_data *exten_state_data;
+ struct ao2_container *info;
+ char *subtype = NULL;
+ char *message = NULL;
+
+ exten_state_data = ao2_alloc(sizeof(*exten_state_data), exten_state_data_destructor);
+ if (!exten_state_data) {
+ return NULL;
+ }
+
+ exten_state_data->exten = exten_state_sub->exten;
+ if ((exten_state_data->presence_state = ast_hint_presence_state(NULL, exten_state_sub->context,
+ exten_state_sub->exten, &subtype, &message)) == -1) {
+ ao2_cleanup(exten_state_data);
+ return NULL;
+ }
+ exten_state_data->presence_subtype = subtype;
+ exten_state_data->presence_message = message;
+ exten_state_data->user_agent = exten_state_sub->user_agent;
+ ast_sip_subscription_get_local_uri(sip_sub, exten_state_data->local,
+ sizeof(exten_state_data->local));
+ ast_sip_subscription_get_remote_uri(sip_sub, exten_state_data->remote,
+ sizeof(exten_state_data->remote));
+ exten_state_data->sub = sip_sub;
+
+ if ((exten_state_data->exten_state = ast_extension_state_extended(
+ NULL, exten_state_sub->context, exten_state_sub->exten, &info)) < 0) {
+ ao2_cleanup(exten_state_data);
+ return NULL;
+ }
+
+ exten_state_data->pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(),
+ "exten_state", 1024, 1024);
+
+ exten_state_data->device_state_info = info;
+ return exten_state_data;
+}
+
+static void *get_notify_data(struct ast_sip_subscription *sub)
{
struct exten_state_subscription *exten_state_sub;
- switch (reason) {
- case AST_SIP_SUBSCRIPTION_NOTIFY_REASON_STARTED:
- return initial_subscribe(sub);
- case AST_SIP_SUBSCRIPTION_NOTIFY_REASON_RENEWED:
- case AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED:
- case AST_SIP_SUBSCRIPTION_NOTIFY_REASON_OTHER:
- exten_state_sub = get_exten_state_sub(sub);
-
- if (!exten_state_sub) {
- return -1;
- }
-
- send_notify(exten_state_sub);
- break;
- }
-
- return 0;
+ exten_state_sub = get_exten_state_sub(sub);
+ if (!exten_state_sub) {
+ return NULL;
+ }
+
+ return exten_state_data_alloc(sub, exten_state_sub);
}
static void to_ami(struct ast_sip_subscription *sub,
Modified: team/mmichelson/rls-notify/res/res_pjsip_mwi.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/rls-notify/res/res_pjsip_mwi.c?view=diff&rev=420053&r1=420052&r2=420053
==============================================================================
--- team/mmichelson/rls-notify/res/res_pjsip_mwi.c (original)
+++ team/mmichelson/rls-notify/res/res_pjsip_mwi.c Tue Aug 5 13:02:02 2014
@@ -52,13 +52,14 @@
static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf);
static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint,
const char *resource);
-static int mwi_notify_required(struct ast_sip_subscription *sip_sub,
- enum ast_sip_subscription_notify_reason reason);
+static int mwi_subscription_established(struct ast_sip_subscription *sub);
+static void *mwi_get_notify_data(struct ast_sip_subscription *sub);
static struct ast_sip_notifier mwi_notifier = {
.default_accept = MWI_TYPE"/"MWI_SUBTYPE,
.new_subscribe = mwi_new_subscribe,
- .notify_required = mwi_notify_required,
+ .subscription_established = mwi_subscription_established,
+ .get_notify_data = mwi_get_notify_data,
};
static struct ast_sip_subscription_handler mwi_handler = {
@@ -676,7 +677,7 @@
return 200;
}
-static int mwi_initial_subscription(struct ast_sip_subscription *sip_sub)
+static int mwi_subscription_established(struct ast_sip_subscription *sip_sub)
{
const char *resource = ast_sip_subscription_get_resource_name(sip_sub);
struct mwi_subscription *sub;
@@ -694,39 +695,33 @@
return -1;
}
- send_mwi_notify(sub);
-
ao2_cleanup(sub);
ao2_cleanup(endpoint);
return 0;
-}
-
-static int mwi_notify_required(struct ast_sip_subscription *sip_sub,
- enum ast_sip_subscription_notify_reason reason)
-{
+
+}
+
+static void *mwi_get_notify_data(struct ast_sip_subscription *sub)
+{
+ struct ast_sip_message_accumulator *counter;
struct mwi_subscription *mwi_sub;
struct ast_datastore *mwi_datastore;
- switch (reason) {
- case AST_SIP_SUBSCRIPTION_NOTIFY_REASON_STARTED:
- return mwi_initial_subscription(sip_sub);
- case AST_SIP_SUBSCRIPTION_NOTIFY_REASON_RENEWED:
- case AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED:
- case AST_SIP_SUBSCRIPTION_NOTIFY_REASON_OTHER:
- mwi_datastore = ast_sip_subscription_get_datastore(sip_sub, "MWI datastore");
-
- if (!mwi_datastore) {
- return -1;
- }
-
- mwi_sub = mwi_datastore->data;
-
- send_mwi_notify(mwi_sub);
+ mwi_datastore = ast_sip_subscription_get_datastore(sub, "MWI datastore");
+ if (!mwi_datastore) {
+ return NULL;
+ }
+ mwi_sub = mwi_datastore->data;
+
+ counter = ao2_alloc(sizeof(*counter), NULL);
+ if (!counter) {
ao2_cleanup(mwi_datastore);
- break;
- }
-
- return 0;
+ return NULL;
+ }
+
+ ao2_callback(mwi_sub->stasis_subs, OBJ_NODATA, get_message_count, counter);
+ ao2_cleanup(mwi_datastore);
+ return counter;
}
static void mwi_subscription_mailboxes_str(struct ao2_container *stasis_subs,
Modified: team/mmichelson/rls-notify/res/res_pjsip_pubsub.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/rls-notify/res/res_pjsip_pubsub.c?view=diff&rev=420053&r1=420052&r2=420053
==============================================================================
--- team/mmichelson/rls-notify/res/res_pjsip_pubsub.c (original)
+++ team/mmichelson/rls-notify/res/res_pjsip_pubsub.c Tue Aug 5 13:02:02 2014
@@ -349,83 +349,66 @@
};
/*!
- * \brief Real subscription details
- *
- * A real subscription is one that has a direct link to a
- * PJSIP subscription and dialog.
+ * \brief A tree of SIP subscriptions
+ *
+ * Because of the ability to subscribe to resource lists, a SIP
+ * subscription can result in a tree of subscriptions being created.
+ * This structure represents the information relevant to the subscription
+ * as a whole, to include the underlying PJSIP structure for the
+ * subscription.
*/
-struct ast_sip_real_subscription {
+struct sip_subscription_tree {
+ /*! The endpoint with which the subscription is communicating */
+ struct ast_sip_endpoint *endpoint;
+ /*! Serializer on which to place operations for this subscription */
+ struct ast_taskprocessor *serializer;
+ /*! The role for this subscription */
+ enum ast_sip_subscription_role role;
+ /*! Persistence information */
+ struct subscription_persistence *persistence;
/*! The underlying PJSIP event subscription structure */
pjsip_evsub *evsub;
/*! The underlying PJSIP dialog */
pjsip_dialog *dlg;
+ /*! Interval to use for batching notifications */
+ unsigned int notification_batch_interval;
+ /*! Scheduler ID for batched notification */
+ int notify_sched_id;
+ /*! Indicator if scheduled batched notification should be sent */
+ unsigned int send_scheduled_notify;
+ /*! The root of the subscription tree */
+ struct ast_sip_subscription *root;
+ /*! Is this subscription to a list? */
+ int is_list;
+ /*! Next item in the list */
+ AST_LIST_ENTRY(sip_subscription_tree) next;
};
/*!
- * \brief Virtual subscription details
- *
- * A virtual subscription is one that does not have a direct
- * link to a PJSIP subscription. Instead, it is a descendent
- * of an ast_sip_subscription. Following the ancestry will
- * eventually lead to a real subscription.
- */
-struct ast_sip_virtual_subscription {
- struct ast_sip_subscription *parent;
-};
-
-/*!
- * \brief Discriminator between real and virtual subscriptions
- */
-enum sip_subscription_type {
- /*!
- * \brief a "real" subscription.
- *
- * Real subscriptions are at the root of a tree of subscriptions.
- * A real subscription has a corresponding SIP subscription in the
- * PJSIP stack.
- */
- SIP_SUBSCRIPTION_REAL,
- /*!
- * \brief a "virtual" subscription.
- *
- * Virtual subscriptions are the descendents of real subscriptions
- * in a tree of subscriptions. Virtual subscriptions do not have
- * a corresponding SIP subscription in the PJSIP stack. Instead,
- * when a state change happens on a virtual subscription, the
- * state change is indicated to the virtual subscription's parent.
- */
- SIP_SUBSCRIPTION_VIRTUAL,
-};
-
-/*!
- * \brief Structure representing a SIP subscription
+ * \brief Structure representing a "virtual" SIP subscription.
+ *
+ * This structure serves a dual purpose. Structurally, it is
+ * the constructed tree of subscriptions based on the resources
+ * being subscribed to. API-wise, this serves as the handle that
+ * subscription handlers use in order to interact with the pubsub API.
*/
struct ast_sip_subscription {
/*! Subscription datastores set up by handlers */
struct ao2_container *datastores;
- /*! The endpoint with which the subscription is communicating */
- struct ast_sip_endpoint *endpoint;
- /*! Serializer on which to place operations for this subscription */
- struct ast_taskprocessor *serializer;
/*! The handler for this subscription */
const struct ast_sip_subscription_handler *handler;
- /*! The role for this subscription */
- enum ast_sip_subscription_role role;
- /*! Indicator of real or virtual subscription */
- enum sip_subscription_type type;
- /*! Real and virtual components of the subscription */
- union {
- struct ast_sip_real_subscription real;
- struct ast_sip_virtual_subscription virtual;
- } reality;
+ /*! Pointer to the base of the tree */
+ struct sip_subscription_tree *tree;
/*! Body generaator for NOTIFYs */
struct ast_sip_pubsub_body_generator *body_generator;
- /*! Persistence information */
- struct subscription_persistence *persistence;
- /*! Next item in the list */
- AST_LIST_ENTRY(ast_sip_subscription) next;
- /*! List of child subscriptions */
- AST_LIST_HEAD_NOLOCK(,ast_sip_subscription) children;
+ /*! Vector of child subscriptions */
+ AST_VECTOR(, struct ast_sip_subscription *) children;
+ /*! Saved NOTIFY body text for this subscription */
+ struct ast_str *body_text;
+ /*! Indicator that the body text has changed since the last notification */
+ int body_changed;
+ /*! The current state of the subscription */
+ pjsip_evsub_state subscription_state;
/*! Name of resource being subscribed to */
char resource[0];
};
@@ -435,28 +418,27 @@
[AST_SIP_NOTIFIER] = "Notifier"
};
-AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
+AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
-static pjsip_evsub *sip_subscription_get_evsub(const struct ast_sip_subscription *sub)
-{
- if (sub->type == SIP_SUBSCRIPTION_VIRTUAL) {
- return sip_subscription_get_evsub(sub->reality.virtual.parent);
- } else {
- return sub->reality.real.evsub;
- }
-}
-
-static pjsip_dialog *sip_subscription_get_dlg(const struct ast_sip_subscription *sub)
-{
- if (sub->type == SIP_SUBSCRIPTION_VIRTUAL) {
- return sip_subscription_get_dlg(sub->reality.virtual.parent);
- } else {
- return sub->reality.real.dlg;
- }
-}
+static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
+static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
+ int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
+static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
+ pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
+static void pubsub_on_client_refresh(pjsip_evsub *sub);
+static void pubsub_on_server_timeout(pjsip_evsub *sub);
+
+
+static pjsip_evsub_user pubsub_cb = {
+ .on_evsub_state = pubsub_on_evsub_state,
+ .on_rx_refresh = pubsub_on_rx_refresh,
+ .on_rx_notify = pubsub_on_rx_notify,
+ .on_client_refresh = pubsub_on_client_refresh,
+ .on_server_timeout = pubsub_on_server_timeout,
+};
/*! \brief Destructor for subscription persistence */
static void subscription_persistence_destroy(void *obj)
@@ -474,7 +456,7 @@
}
/*! \brief Function which creates initial persistence information of a subscription in sorcery */
-static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
+static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
{
char tag[PJ_GUID_STRING_LENGTH + 1];
@@ -484,13 +466,13 @@
struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
"subscription_persistence", NULL);
- pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
+ pjsip_dialog *dlg = sub_tree->dlg;
if (!persistence) {
return NULL;
}
- persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
+ persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
persistence->tag = ast_strdup(tag);
@@ -499,47 +481,49 @@
}
/*! \brief Function which updates persistence information of a subscription in sorcery */
-static void subscription_persistence_update(struct ast_sip_subscription *sub,
+static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
pjsip_rx_data *rdata)
{
pjsip_dialog *dlg;
- if (!sub->persistence) {
+ if (!sub_tree->persistence) {
return;
}
- dlg = sip_subscription_get_dlg(sub);
- sub->persistence->cseq = dlg->local.cseq;
+ dlg = sub_tree->dlg;
+ sub_tree->persistence->cseq = dlg->local.cseq;
if (rdata) {
int expires;
pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
- sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
-
- ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
- ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
- sub->persistence->src_port = rdata->pkt_info.src_port;
- ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
- sizeof(sub->persistence->transport_key));
- ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
- sizeof(sub->persistence->local_name));
- sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
- }
-
- ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
+ sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
+
+ ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
+ sizeof(sub_tree->persistence->packet));
+ ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
+ sizeof(sub_tree->persistence->src_name));
+ sub_tree->persistence->src_port = rdata->pkt_info.src_port;
+ ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
+ sizeof(sub_tree->persistence->transport_key));
+ ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
+ sizeof(sub_tree->persistence->local_name));
+ sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
+ }
+
+ ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
}
/*! \brief Function which removes persistence of a subscription from sorcery */
-static void subscription_persistence_remove(struct ast_sip_subscription *sub)
-{
- if (!sub->persistence) {
+static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
+{
+ if (!sub_tree->persistence) {
return;
}
- ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
- ao2_ref(sub->persistence, -1);
+ ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
+ ao2_ref(sub_tree->persistence, -1);
}
@@ -634,9 +618,7 @@
return find_body_generator(accept, num_accept_headers);
}
-static struct ast_sip_subscription *create_real_subscription(const struct ast_sip_subscription_handler *handler,
- struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
- struct ast_sip_pubsub_body_generator *generator);
+struct resource_tree;
/*!
* \brief A node for a resource tree.
@@ -839,6 +821,7 @@
*/
struct resource_tree {
struct tree_node *root;
+ unsigned int notification_batch_interval;
};
/*!
@@ -871,20 +854,17 @@
* \param handler The subscription handler for leaf nodes in the tree.
* \param resource The resource requested in the SUBSCRIBE request.
* \param tree The tree that is to be built.
- * \param supports_lists Indicates if we should attempt to find a resource list or not.
*
* \retval 200-299 Successfully subscribed to at least one resource.
* \retval 300-699 Failure to subscribe to requested resource.
*/
static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
- const char *resource, struct resource_tree *tree, int supports_lists)
-{
- struct resource_list *list = NULL;
+ const char *resource, struct resource_tree *tree)
+{
+ struct resource_list *list;
struct resources visited;
- if (supports_lists) {
- list = retrieve_resource_list(resource, handler->event_name);
- }
+ list = retrieve_resource_list(resource, handler->event_name);
if (!list) {
ast_debug(1, "Subscription to resource %s is not to a list\n", resource);
tree->root = tree_node_alloc(resource, NULL);
@@ -893,7 +873,10 @@
ast_debug(1, "Subscription to resource %s is a list\n", resource);
AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items));
+
tree->root = tree_node_alloc(resource, &visited);
+ tree->notification_batch_interval = list->notification_batch_interval;
+
build_node_children(endpoint, handler, list, tree->root, &visited);
AST_VECTOR_FREE(&visited);
ao2_cleanup(list);
@@ -929,7 +912,7 @@
static int subscription_remove_serializer(void *obj)
{
- struct ast_sip_subscription *sub = obj;
+ struct sip_subscription_tree *sub_tree = obj;
/* This is why we keep the dialog on the subscription. When the subscription
* is destroyed, there is no guarantee that the underlying dialog is ready
@@ -940,21 +923,28 @@
* subscription is destroyed so that we can guarantee that our attempt to
* remove the serializer will be successful.
*/
- ast_sip_dialog_set_serializer(sip_subscription_get_dlg(sub), NULL);
- pjsip_dlg_dec_session(sip_subscription_get_dlg(sub), &pubsub_module);
-
- return 0;
-}
-
-static void remove_subscription(struct ast_sip_subscription *obj)
-{
- struct ast_sip_subscription *i;
+ ast_sip_dialog_set_serializer(sub_tree->dlg, NULL);
+ pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
+
+ return 0;
+}
+
+static void add_subscription(struct sip_subscription_tree *obj)
+{
+ SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+ AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
+ ast_module_ref(ast_module_info->self);
+}
+
+static void remove_subscription(struct sip_subscription_tree *obj)
+{
+ struct sip_subscription_tree *i;
SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
if (i == obj) {
AST_RWLIST_REMOVE_CURRENT(next);
ast_debug(1, "Removing subscription to resource %s from list of subscriptions\n",
- ast_sip_subscription_get_resource_name(i));
+ ast_sip_subscription_get_resource_name(i->root));
ast_module_unref(ast_module_info->self);
break;
}
@@ -968,22 +958,11 @@
ast_debug(3, "Destroying SIP subscription\n");
- subscription_persistence_remove(sub);
-
- remove_subscription(sub);
-
ao2_cleanup(sub->datastores);
- ao2_cleanup(sub->endpoint);
-
- if (sip_subscription_get_dlg(sub)) {
- ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
- }
- ast_taskprocessor_unreference(sub->serializer);
}
static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
- struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role,
- enum sip_subscription_type type)
+ const char *resource, struct sip_subscription_tree *tree)
{
struct ast_sip_subscription *sub;
@@ -998,15 +977,16 @@
ao2_ref(sub, -1);
return NULL;
}
- sub->serializer = ast_sip_create_serializer();
- if (!sub->serializer) {
+
+ sub->body_text = ast_str_create(128);
+ if (!sub->body_text) {
ao2_ref(sub, -1);
return NULL;
}
- sub->role = role;
- sub->type = type;
- sub->endpoint = ao2_bump(endpoint);
+
sub->handler = handler;
+ sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
+ sub->tree = tree;
return sub;
}
@@ -1022,31 +1002,82 @@
* \param parent The subscription (real or virtual) that is parent to the subscriptions created here.
* \param parent_resource The tree node that corresponds to the parent subscription.
*/
-static void create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
- struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
- struct ast_sip_pubsub_body_generator *generator, struct ast_sip_subscription *parent,
- struct tree_node *parent_resource)
+static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
+ const char *resource, struct ast_sip_pubsub_body_generator *generator,
+ struct sip_subscription_tree *tree, struct tree_node *current)
{
int i;
-
- for (i = 0; i < AST_VECTOR_SIZE(&parent_resource->children); ++i) {
- struct ast_sip_subscription *sub;
-
- sub = allocate_subscription(handler, endpoint, resource,
- AST_SIP_NOTIFIER, SIP_SUBSCRIPTION_VIRTUAL);
- if (!sub) {
+ struct ast_sip_subscription *sub;
+
+ sub = allocate_subscription(handler, resource, tree);
+ if (!sub) {
+ return NULL;
+ }
+
+ sub->body_generator = generator;
+
+ for (i = 0; i < AST_VECTOR_SIZE(¤t->children); ++i) {
+ struct ast_sip_subscription *child;
+ struct tree_node *child_node = AST_VECTOR_GET(¤t->children, i);
+
+ child = create_virtual_subscriptions(handler, child_node->resource, generator,
+ tree, child_node);
+
+ if (!child) {
continue;
}
- /* XXX For subscriptions with children, the generator will need to be
- * the multipart RLMI generator instead. This will be handled in
- * ASTERISK-23869 or ASTERISK-23867
- */
- sub->body_generator = generator;
- sub->reality.virtual.parent = parent;
-
- create_virtual_subscriptions(handler, endpoint, rdata, resource,
- generator, sub, AST_VECTOR_GET(&parent_resource->children, i));
- }
+
+ AST_VECTOR_APPEND(&sub->children, child);
+ }
+
+ return sub;
+}
+
+static void subscription_tree_destructor(void *obj)
+{
+ struct sip_subscription_tree *sub_tree = obj;
+
+ subscription_persistence_remove(sub_tree);
+ ao2_cleanup(sub_tree->endpoint);
+
+ if (sub_tree->dlg) {
+ ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub_tree);
+ }
+
+ ast_taskprocessor_unreference(sub_tree->serializer);
+ remove_subscription(sub_tree);
+}
+
+static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
+{
+ /* We keep a reference to the dialog until our subscription is destroyed. See
+ * the subscription_destructor for more details
+ */
+ pjsip_dlg_inc_session(dlg, &pubsub_module);
+ sub_tree->dlg = dlg;
+ ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
+ pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
+}
+
+static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint)
+{
+ struct sip_subscription_tree *sub_tree;
+
+ sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
+ if (!sub_tree) {
+ return NULL;
+ }
+
+ sub_tree->serializer = ast_sip_create_serializer();
+ if (!sub_tree->serializer) {
+ ao2_ref(sub_tree, -1);
+ return NULL;
+ }
+
+ sub_tree->endpoint = ao2_bump(endpoint);
+
+ add_subscription(sub_tree);
+ return sub_tree;
}
/*!
@@ -1067,300 +1098,24 @@
* \retval NULL Could not create the subscription tree
* \retval non-NULL The root of the created subscription tree
*/
-static struct ast_sip_subscription *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
+
+static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree)
{
- struct ast_sip_subscription *sub;
-
- /* Start by creating the root subscription. It's the only real subscription.
- * XXX Since this is the root of a subscription tree, it should actually use the
- * multipart RLMI generator instead if this is a list. This will be handled in
- * ASTERISK-23869 or ASTERISK-23867
- */
- sub = create_real_subscription(handler, endpoint, rdata, resource, generator);
- if (!sub) {
- return NULL;
- }
-
- create_virtual_subscriptions(handler, endpoint, rdata, resource, generator, sub, tree->root);
- return sub;
-}
-
-/*!
- * \param Determine if SUBSCRIBE indicates support for RLS
- *
- * A SUBSCRIBE must have a Supported: eventlist header in order to
- * allow for resource list subscriptions. Otherwise, we must interpret
- * the inbound SUBSCRIBE as being for a single resource.
- *
- * \param rdata The inbound SUBSCRIBE request.
- * \retval 1 The endpoint supports RLS
- * \retval 0 The endpoint does not support RLS
- */
-static int supports_resource_lists(pjsip_rx_data *rdata)
-{
- pjsip_supported_hdr *supported;
-
- supported = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, NULL);
- if (supported) {
- int i;
-
- for (i = 0; i < supported->count; ++i) {
- if (pj_strcmp2(&supported->values[i], "eventlist")) {
- return 1;
- }
- }
- }
-
- return 0;
-}
-
-
-/*! \brief Callback function to perform the actual recreation of a subscription */
-static int subscription_persistence_recreate(void *obj, void *arg, int flags)
-{
- struct subscription_persistence *persistence = obj;
- pj_pool_t *pool = arg;
- pjsip_rx_data rdata = { { 0, }, };
- RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
- struct ast_sip_subscription *sub;
- struct ast_sip_pubsub_body_generator *generator;
- int resp;
- char *resource;
- size_t resource_size;
- pjsip_sip_uri *request_uri;
- struct resource_tree tree;
- pjsip_expires_hdr *expires_header;
- struct ast_sip_subscription_handler *handler;
- int supports_lists;
-
- /* If this subscription has already expired remove it */
- if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
-
- endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
- if (!endpoint) {
- ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
- persistence->endpoint);
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
-
- pj_pool_reset(pool);
- rdata.tp_info.pool = pool;
-
- if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
- persistence->transport_key, persistence->local_name, persistence->local_port)) {
- ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
- persistence->endpoint);
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
-
- request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
- resource_size = pj_strlen(&request_uri->user) + 1;
- resource = alloca(resource_size);
- ast_copy_pj_str(resource, &request_uri->user, resource_size);
-
- /* Update the expiration header with the new expiration */
- expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
- if (!expires_header) {
- expires_header = pjsip_expires_hdr_create(pool, 0);
- if (!expires_header) {
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
- pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
- }
- expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
-
- handler = subscription_get_handler_from_rdata(&rdata);
- if (!handler || !handler->notifier) {
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
-
- generator = subscription_get_generator_from_rdata(&rdata, handler);
- if (!generator) {
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- return 0;
- }
-
- supports_lists = supports_resource_lists(&rdata);
-
- ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
- pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
-
- memset(&tree, 0, sizeof(tree));
- resp = build_resource_tree(endpoint, handler, resource, &tree, supports_lists);
- if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
- sub = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree);
- sub->persistence = ao2_bump(persistence);
- subscription_persistence_update(sub, &rdata);
- } else {
- ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
- }
-
- return 0;
-}
-
-/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
-static int subscription_persistence_load(void *data)
-{
- struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
- "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
- pj_pool_t *pool;
-
- pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
- PJSIP_POOL_RDATA_INC);
- if (!pool) {
- ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
- return 0;
- }
-
- ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
-
- pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
-
- ao2_ref(persisted_subscriptions, -1);
- return 0;
-}
-
-/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
-static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
-{
- struct ast_json_payload *payload;
- const char *type;
-
- if (stasis_message_type(message) != ast_manager_get_generic_type()) {
- return;
- }
-
- payload = stasis_message_data(message);
- type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
-
- /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
- * recreate SIP subscriptions.
- */
- if (strcmp(type, "FullyBooted")) {
- return;
- }
-
- /* This has to be here so the subscription is recreated when the body generator is available */
- ast_sip_push_task(NULL, subscription_persistence_load, NULL);
-
- /* Once the system is fully booted we don't care anymore */
- stasis_unsubscribe(sub);
-}
-
-static void add_subscription(struct ast_sip_subscription *obj)
-{
- SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
- AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
- ast_module_ref(ast_module_info->self);
-}
-
-typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
-
-static int for_each_subscription(on_subscription_t on_subscription, void *arg)
-{
- int num = 0;
- struct ast_sip_subscription *i;
- SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
-
- if (!on_subscription) {
- return num;
- }
-
- AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
- if (on_subscription(i, arg)) {
- break;
- }
- ++num;
- }
- return num;
-}
-
-static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
- struct ast_str **buf)
-{
- char str[256];
- struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
-
- ast_str_append(buf, 0, "Role: %s\r\n",
- sip_subscription_roles_map[sub->role]);
- ast_str_append(buf, 0, "Endpoint: %s\r\n",
- ast_sorcery_object_get_id(sub->endpoint));
-
- ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str));
[... 1071 lines stripped ...]
More information about the svn-commits
mailing list