[asterisk-commits] file: branch file/pjsip-subscription-persistence r415224 - in /team/file/pjsi...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Jun 5 12:31:27 CDT 2014
Author: file
Date: Thu Jun 5 12:31:23 2014
New Revision: 415224
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=415224
Log:
Add support for persisting subscriptions using sorcery and recreating them at startup.
Modified:
team/file/pjsip-subscription-persistence/include/asterisk/res_pjsip_pubsub.h
team/file/pjsip-subscription-persistence/res/res_pjsip_exten_state.c
team/file/pjsip-subscription-persistence/res/res_pjsip_mwi.c
team/file/pjsip-subscription-persistence/res/res_pjsip_pubsub.c
team/file/pjsip-subscription-persistence/res/res_pjsip_pubsub.exports.in
Modified: team/file/pjsip-subscription-persistence/include/asterisk/res_pjsip_pubsub.h
URL: http://svnview.digium.com/svn/asterisk/team/file/pjsip-subscription-persistence/include/asterisk/res_pjsip_pubsub.h?view=diff&rev=415224&r1=415223&r2=415224
==============================================================================
--- team/file/pjsip-subscription-persistence/include/asterisk/res_pjsip_pubsub.h (original)
+++ team/file/pjsip-subscription-persistence/include/asterisk/res_pjsip_pubsub.h Thu Jun 5 12:31:23 2014
@@ -456,6 +456,18 @@
pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub);
/*!
+ * \brief Accept a subscription request
+ *
+ * \param sub The subscription to be accepted
+ * \param rdata The received subscription request
+ * \param response The response code to send
+ *
+ * \retval 0 Success
+ * \retval non-zero Failure
+ */
+int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response);
+
+/*!
* \brief Send a request created via a PJSIP evsub method
*
* Callers of this function should take care to do so within a SIP servant
Modified: team/file/pjsip-subscription-persistence/res/res_pjsip_exten_state.c
URL: http://svnview.digium.com/svn/asterisk/team/file/pjsip-subscription-persistence/res/res_pjsip_exten_state.c?view=diff&rev=415224&r1=415223&r2=415224
==============================================================================
--- team/file/pjsip-subscription-persistence/res/res_pjsip_exten_state.c (original)
+++ team/file/pjsip-subscription-persistence/res/res_pjsip_exten_state.c Thu Jun 5 12:31:23 2014
@@ -445,8 +445,7 @@
return NULL;
}
- if (pjsip_evsub_accept(ast_sip_subscription_get_evsub(exten_state_sub->sip_sub),
- rdata, 200, NULL) != PJ_SUCCESS) {
+ if (ast_sip_subscription_accept(exten_state_sub->sip_sub, rdata, 200)) {
ast_log(LOG_WARNING, "Unable to accept the incoming extension state subscription.\n");
pjsip_evsub_terminate(ast_sip_subscription_get_evsub(exten_state_sub->sip_sub), PJ_FALSE);
return NULL;
Modified: team/file/pjsip-subscription-persistence/res/res_pjsip_mwi.c
URL: http://svnview.digium.com/svn/asterisk/team/file/pjsip-subscription-persistence/res/res_pjsip_mwi.c?view=diff&rev=415224&r1=415223&r2=415224
==============================================================================
--- team/file/pjsip-subscription-persistence/res/res_pjsip_mwi.c (original)
+++ team/file/pjsip-subscription-persistence/res/res_pjsip_mwi.c Thu Jun 5 12:31:23 2014
@@ -699,7 +699,6 @@
RAII_VAR(struct mwi_subscription *, sub, NULL, ao2_cleanup);
pjsip_uri *ruri = rdata->msg_info.msg->line.req.uri;
pjsip_sip_uri *sip_ruri;
- pjsip_evsub *evsub;
char aor_name[80];
if (!PJSIP_URI_SCHEME_IS_SIP(ruri) && !PJSIP_URI_SCHEME_IS_SIPS(ruri)) {
@@ -715,8 +714,7 @@
return NULL;
}
- evsub = ast_sip_subscription_get_evsub(sub->sip_sub);
- pjsip_evsub_accept(evsub, rdata, 200, NULL);
+ ast_sip_subscription_accept(sub->sip_sub, rdata, 200);
send_mwi_notify(sub, PJSIP_EVSUB_STATE_ACTIVE, NULL);
return sub->sip_sub;
Modified: team/file/pjsip-subscription-persistence/res/res_pjsip_pubsub.c
URL: http://svnview.digium.com/svn/asterisk/team/file/pjsip-subscription-persistence/res/res_pjsip_pubsub.c?view=diff&rev=415224&r1=415223&r2=415224
==============================================================================
--- team/file/pjsip-subscription-persistence/res/res_pjsip_pubsub.c (original)
+++ team/file/pjsip-subscription-persistence/res/res_pjsip_pubsub.c Thu Jun 5 12:31:23 2014
@@ -83,6 +83,7 @@
};
#define MOD_DATA_BODY_GENERATOR "sub_body_generator"
+#define MOD_DATA_PERSISTENCE "sub_persistence"
static const pj_str_t str_event_name = { "Event", 5 };
@@ -178,6 +179,33 @@
int expires;
/*! \brief Scheduled item for expiration of publication */
int sched_id;
+};
+
+
+/*!
+ * \brief Structure used for persisting an inbound subscription
+ */
+struct subscription_persistence {
+ /*! Sorcery object details */
+ SORCERY_OBJECT(details);
+ /*! The name of the endpoint involved in the subscription */
+ char *endpoint;
+ /*! SIP message that creates the subscription */
+ char packet[PJSIP_MAX_PKT_LEN];
+ /*! Source address of the message */
+ char src_name[PJ_INET6_ADDRSTRLEN];
+ /*! Source port of the message */
+ int src_port;
+ /*! Local transport key type */
+ char transport_key[32];
+ /*! Local transport address */
+ char local_name[PJ_INET6_ADDRSTRLEN];
+ /*! Local transport port */
+ int local_port;
+ /*! Next CSeq to use for message */
+ unsigned int cseq;
+ /*! Local tag of the dialog */
+ char *tag;
};
/*!
@@ -200,6 +228,8 @@
pjsip_dialog *dlg;
/*! Body generaator for NOTIFYs */
struct ast_sip_pubsub_body_generator *body_generator;
+ /*! Persistence information */
+ struct subscription_persistence *persistence;
/*! Next item in the list */
AST_LIST_ENTRY(ast_sip_subscription) next;
};
@@ -213,6 +243,198 @@
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
+
+/*! \brief Destructor for subscription persistence */
+static void subscription_persistence_destroy(void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+
+ ast_free(persistence->endpoint);
+ ast_free(persistence->tag);
+}
+
+/*! \brief Allocator for subscription persistence */
+static void *subscription_persistence_alloc(const char *name)
+{
+ return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
+}
+
+/*! \brief Function which creates initial persistence information of a subscription in sorcery */
+static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
+{
+ char tag[PJ_GUID_STRING_LENGTH + 1];
+
+ /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
+ * look it up by id at all.
+ */
+ struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
+ "subscription_persistence", NULL);
+
+ if (!persistence) {
+ return NULL;
+ }
+
+ persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
+ ast_copy_pj_str(tag, &sub->dlg->local.info->tag, sizeof(tag));
+ persistence->tag = ast_strdup(tag);
+
+ ast_sorcery_create(ast_sip_get_sorcery(), persistence);
+ return persistence;
+}
+
+/*! \brief Function which updates persistence information of a subscription in sorcery */
+static void subscription_persistence_update(struct ast_sip_subscription *sub,
+ pjsip_rx_data *rdata)
+{
+ if (!sub->persistence) {
+ return;
+ }
+
+ sub->persistence->cseq = sub->dlg->local.cseq;
+
+ if (rdata) {
+ ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
+ ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
+ sub->persistence->src_port = rdata->pkt_info.src_port;
+ ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
+ sizeof(sub->persistence->transport_key));
+ ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
+ sizeof(sub->persistence->local_name));
+ sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
+ }
+
+ ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
+}
+
+/*! \brief Function which removes persistence of a subscription from sorcery */
+static void subscription_persistence_remove(struct ast_sip_subscription *sub)
+{
+ if (!sub->persistence) {
+ return;
+ }
+
+ ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
+ ao2_ref(sub->persistence, -1);
+}
+
+static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
+static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
+ size_t num_accept);
+
+/*! \brief Callback function to perform the actual recreation of a subscription */
+static int subscription_persistence_recreate(void *obj, void *arg, int flags)
+{
+ struct subscription_persistence *persistence = obj;
+ pj_pool_t *pool = arg;
+ pjsip_transport transport = { "", };
+ pjsip_rx_data rdata = { { 0, }, };
+ pj_str_t tmp;
+ char event[32];
+ char accept[AST_SIP_MAX_ACCEPT][64];
+ pjsip_accept_hdr *accept_header;
+ pjsip_event_hdr *event_header;
+ struct ast_sip_subscription_handler *handler;
+ RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+ struct ast_sip_subscription *sub;
+ size_t num_accept_headers;
+ struct ast_sip_pubsub_body_generator *generator;
+
+ endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
+ if (!endpoint) {
+ ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
+ persistence->endpoint);
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ pj_pool_reset(pool);
+ rdata.tp_info.pool = pool;
+ rdata.tp_info.transport = &transport;
+
+ /* The following code constructs a basic rdata structure using persisted information */
+ ast_copy_string(rdata.pkt_info.packet, persistence->packet, sizeof(rdata.pkt_info.packet));
+ ast_copy_string(rdata.pkt_info.src_name, persistence->src_name, sizeof(rdata.pkt_info.src_name));
+ rdata.pkt_info.src_port = persistence->src_port;
+
+ pjsip_parse_rdata(persistence->packet, sizeof(persistence->packet), &rdata);
+ if (!rdata.msg_info.msg) {
+ return 0;
+ }
+ pj_strdup2(rdata.tp_info.pool, &rdata.msg_info.via->recvd_param, rdata.pkt_info.src_name);
+ rdata.msg_info.via->rport_param = -1;
+
+ rdata.tp_info.transport->key.type = pjsip_transport_get_type_from_name(pj_cstr(&tmp, persistence->transport_key));
+ rdata.tp_info.transport->type_name = persistence->transport_key;
+ pj_strset2(&rdata.tp_info.transport->local_name.host, persistence->local_name);
+ rdata.tp_info.transport->local_name.port = persistence->local_port;
+
+ event_header = pjsip_msg_find_hdr_by_name(rdata.msg_info.msg, &str_event_name, rdata.msg_info.msg->hdr.next);
+ ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
+
+ handler = find_sub_handler_for_event_name(event);
+ if (!handler) {
+ /* This may eventually get a handler once everything is loaded */
+ return 0;
+ }
+
+ accept_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_ACCEPT, rdata.msg_info.msg->hdr.next);
+ if (accept_header) {
+ int i;
+
+ for (i = 0; i < accept_header->count; ++i) {
+ ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
+ }
+ num_accept_headers = accept_header->count;
+ } else {
+ /* If a SUBSCRIBE contains no Accept headers, then we must assume that
+ * the default accept type for the event package is to be used.
+ */
+ ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
+ num_accept_headers = 1;
+ }
+
+ generator = find_body_generator(accept, num_accept_headers);
+ if (!generator) {
+ return 0;
+ }
+
+ ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
+ pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator);
+ ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
+ pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
+
+ sub = handler->new_subscribe(endpoint, &rdata);
+ if (sub) {
+ sub->persistence = ao2_bump(persistence);
+ subscription_persistence_update(sub, &rdata);
+ } else {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ }
+
+ return 0;
+}
+
+/*! \brief Function which loads and recreates persisted subscriptions upon startup */
+static int subscription_persistence_load(void *data)
+{
+ struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+ "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+ pj_pool_t *pool;
+
+ pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
+ PJSIP_POOL_RDATA_INC);
+ if (!pool) {
+ ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
+ return 0;
+ }
+
+ ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
+
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+
+ ao2_ref(persisted_subscriptions, -1);
+ return 0;
+}
static void add_subscription(struct ast_sip_subscription *obj)
{
@@ -335,6 +557,9 @@
struct ast_sip_subscription *sub = obj;
ast_debug(3, "Destroying SIP subscription\n");
+
+ subscription_persistence_remove(sub);
+
remove_subscription(sub);
ao2_cleanup(sub->datastores);
@@ -388,6 +613,7 @@
{
struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
pjsip_dialog *dlg;
+ struct subscription_persistence *persistence;
if (!sub) {
return NULL;
@@ -423,6 +649,17 @@
ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
ao2_ref(sub, -1);
return NULL;
+ }
+ persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
+ pubsub_module.id, MOD_DATA_PERSISTENCE);
+ if (persistence) {
+ /* Update the created dialog with the persisted information */
+ pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
+ pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
+ dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
+ pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
+ dlg->local.cseq = persistence->cseq;
+ dlg->remote.cseq = persistence->cseq;
}
sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
/* We keep a reference to the dialog until our subscription is destroyed. See
@@ -463,6 +700,16 @@
return sub->dlg;
}
+int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
+{
+ /* If this is a persistence recreation the subscription has already been accepted */
+ if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
+ return 0;
+ }
+
+ return pjsip_evsub_accept(ast_sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
+}
+
int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
{
struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
@@ -471,6 +718,8 @@
ao2_ref(sub, +1);
res = pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
tdata) == PJ_SUCCESS ? 0 : -1;
+
+ subscription_persistence_update(sub, NULL);
ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
"StateText: %s\r\n"
@@ -688,6 +937,10 @@
pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
sub_add_handler(handler);
+
+ /* This has to be here so the subscription is recreated when the body generator is available */
+ ast_sip_push_task_synchronous(NULL, subscription_persistence_load, NULL);
+
return 0;
}
@@ -839,7 +1092,11 @@
} else {
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
}
- }
+ } else {
+ sub->persistence = subscription_persistence_create(sub);
+ subscription_persistence_update(sub, rdata);
+ }
+
return PJ_TRUE;
}
@@ -1471,9 +1728,42 @@
#define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
#define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
+static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+
+ persistence->endpoint = ast_strdup(var->value);
+ return 0;
+}
+
+static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
+{
+ const struct subscription_persistence *persistence = obj;
+
+ *buf = ast_strdup(persistence->endpoint);
+ return 0;
+}
+
+static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+
+ persistence->tag = ast_strdup(var->value);
+ return 0;
+}
+
+static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
+{
+ const struct subscription_persistence *persistence = obj;
+
+ *buf = ast_strdup(persistence->tag);
+ return 0;
+}
+
static int load_module(void)
{
static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
+ struct ast_sorcery *sorcery = ast_sip_get_sorcery();
pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
@@ -1495,6 +1785,34 @@
ast_sched_context_destroy(sched);
return AST_MODULE_LOAD_FAILURE;
}
+
+
+ ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence");
+ if (ast_sorcery_internal_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc,
+ NULL, NULL)) {
+ ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
+ ast_sip_unregister_service(&pubsub_module);
+ ast_sched_context_destroy(sched);
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ ast_sorcery_object_field_register_nodoc(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, packet));
+ ast_sorcery_object_field_register_nodoc(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, src_name));
+ ast_sorcery_object_field_register_nodoc(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0,
+ FLDSET(struct subscription_persistence, src_port));
+ ast_sorcery_object_field_register_nodoc(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, transport_key));
+ ast_sorcery_object_field_register_nodoc(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, local_name));
+ ast_sorcery_object_field_register_nodoc(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0,
+ FLDSET(struct subscription_persistence, local_port));
+ ast_sorcery_object_field_register_nodoc(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0,
+ FLDSET(struct subscription_persistence, cseq));
+ ast_sorcery_object_field_register_custom_nodoc(sorcery, "subscription_persistence", "endpoint", "",
+ persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0);
+ ast_sorcery_object_field_register_custom_nodoc(sorcery, "subscription_persistence", "tag", "",
+ persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0);
ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
ami_show_subscriptions_inbound);
Modified: team/file/pjsip-subscription-persistence/res/res_pjsip_pubsub.exports.in
URL: http://svnview.digium.com/svn/asterisk/team/file/pjsip-subscription-persistence/res/res_pjsip_pubsub.exports.in?view=diff&rev=415224&r1=415223&r2=415224
==============================================================================
--- team/file/pjsip-subscription-persistence/res/res_pjsip_pubsub.exports.in (original)
+++ team/file/pjsip-subscription-persistence/res/res_pjsip_pubsub.exports.in Thu Jun 5 12:31:23 2014
@@ -5,6 +5,7 @@
LINKER_SYMBOL_PREFIXast_sip_subscription_get_serializer;
LINKER_SYMBOL_PREFIXast_sip_subscription_get_evsub;
LINKER_SYMBOL_PREFIXast_sip_subscription_get_dlg;
+ LINKER_SYMBOL_PREFIXast_sip_subscription_accept;
LINKER_SYMBOL_PREFIXast_sip_subscription_send_request;
LINKER_SYMBOL_PREFIXast_sip_subscription_alloc_datastore;
LINKER_SYMBOL_PREFIXast_sip_subscription_add_datastore;
More information about the asterisk-commits
mailing list