[asterisk-commits] file: branch file/pimp_my_publish r391751 - in /team/file/pimp_my_publish: in...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Jun 13 17:23:00 CDT 2013


Author: file
Date: Thu Jun 13 17:22:58 2013
New Revision: 391751

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=391751
Log:
Implement the PUBLISH part of res_sip_pubsub as documented on the wiki.

This doesn't have datastores yet, but it seemingly works regardless.

Modified:
    team/file/pimp_my_publish/include/asterisk/res_sip_pubsub.h
    team/file/pimp_my_publish/res/res_sip_pubsub.c
    team/file/pimp_my_publish/res/res_sip_pubsub.exports.in

Modified: team/file/pimp_my_publish/include/asterisk/res_sip_pubsub.h
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_my_publish/include/asterisk/res_sip_pubsub.h?view=diff&rev=391751&r1=391750&r2=391751
==============================================================================
--- team/file/pimp_my_publish/include/asterisk/res_sip_pubsub.h (original)
+++ team/file/pimp_my_publish/include/asterisk/res_sip_pubsub.h Thu Jun 13 17:22:58 2013
@@ -30,6 +30,114 @@
 struct ast_datastore_info;
 
 /*!
+ * \brief Opaque structure representing a publication
+ */
+struct ast_sip_publication;
+
+/*!
+ * \brief Callbacks that publication handlers will define
+ */
+struct ast_sip_publish_handler {
+	/*! \brief The name of the event package this handler deals with (must be non-empty to register) */
+	const char *event_name;
+
+	/*! \brief Container of this handler's publications; allocated when the handler is registered */
+	struct ao2_container *publications;
+
+	/*!
+	 * \brief Called when a PUBLISH to establish a new publication arrives.
+	 *
+	 * \param endpoint The endpoint from whom the PUBLISH arrived
+	 * \param rdata The PUBLISH request
+	 * \retval NULL PUBLISH was not accepted (a 503 response is then sent on the handler's behalf)
+	 * \retval non-NULL New publication
+	 *
+	 * \note The callback is expected to send a response for the PUBLISH in success cases.
+	 */
+	struct ast_sip_publication *(*new_publication)(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata);
+
+	/*!
+	 * \brief Called when a PUBLISH for an existing publication arrives.
+	 *
+	 * This PUBLISH may be intending to change state or it may be simply renewing
+	 * the publication since the publication is nearing expiration. The callback
+	 * is expected to send a response to the PUBLISH.
+	 *
+	 * \param pub The publication on which the PUBLISH arrived
+	 * \param rdata The PUBLISH request
+	 * \retval 0 Publication was accepted
+	 * \retval non-zero Publication was denied; the publication is then removed
+	 *
+	 * \note The callback is expected to send a response for the PUBLISH.
+	 */
+	int (*publish_refresh)(struct ast_sip_publication *pub, pjsip_rx_data *rdata);
+
+	/*!
+	 * \brief Called when a publication has reached its expiration, just before it is removed.
+	 */
+	void (*publish_expire)(struct ast_sip_publication *pub);
+
+	/*!
+	 * \brief Called when a PUBLISH arrives to terminate a publication.
+	 *
+	 * \param pub The publication that is terminating
+	 * \param rdata The PUBLISH request terminating the publication
+	 *
+	 * \note The callback is expected to send a response for the PUBLISH.
+	 */
+	void (*publish_termination)(struct ast_sip_publication *pub, pjsip_rx_data *rdata);
+
+	AST_LIST_ENTRY(ast_sip_publish_handler) next;
+};
+
+/*!
+ * \brief Create a new publication
+ *
+ * Publication handlers should call this when a PUBLISH arrives to establish a new publication.
+ *
+ * \param endpoint The endpoint from whom the PUBLISHes arrive
+ * \param rdata The PUBLISH that established the publication
+ * \retval NULL Failed to create a publication
+ * \retval non-NULL The newly-created publication
+ */
+struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata);
+ 
+/*!
+ * \brief Given a publication, get the associated endpoint
+ *
+ * \param pub The publication
+ * \retval NULL Failure
+ * \retval non-NULL The associated endpoint
+ */
+struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub);
+ 
+/*!
+ * \brief Create a response to an inbound PUBLISH
+ *
+ * The created response can be sent using pjsip_endpt_send_response() or pjsip_endpt_send_response2()
+ *
+ * \param pub The publication
+ * \param status_code The status code to place in the response
+ * \param rdata The request to which the response is being made
+ * \param[out] tdata The created response
+ */
+int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata,
+	pjsip_tx_data **tdata);
+
+/*!
+ * \brief Register a publish handler
+ *
+ * \retval 0 Handler was registered successfully
+ * \retval non-zero Handler was not registered successfully
+ */
+int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler);
+ 
+/*!
+ * \brief Unregister a publish handler
+ */
+void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler);
+
+/*!
  * \brief Opaque structure representing an RFC 3265 SIP subscription
  */
 struct ast_sip_subscription;
@@ -38,10 +146,10 @@
  * \brief Role for the subscription that is being created
  */
 enum ast_sip_subscription_role {
-    /* Sending SUBSCRIBEs, receiving NOTIFYs */
-    AST_SIP_SUBSCRIBER,
-    /* Sending NOTIFYs, receiving SUBSCRIBEs */
-    AST_SIP_NOTIFIER,
+	/* Sending SUBSCRIBEs, receiving NOTIFYs */
+	AST_SIP_SUBSCRIBER,
+	/* Sending NOTIFYs, receiving SUBSCRIBEs */
+	AST_SIP_NOTIFIER,
 };
 
 /*!
@@ -56,147 +164,147 @@
  * not provide it with any additional data.
  */
 struct ast_sip_subscription_response_data {
-    /*! Status code of the response */
-    int status_code;
-    /*! Optional status text */
-    const char *status_text;
-    /*! Optional additional headers to add to the response */
-    struct ast_variable *headers;
-    /*! Optional body to add to the response */
-    struct ast_sip_body *body;
+	/*! Status code of the response */
+	int status_code;
+	/*! Optional status text */
+	const char *status_text;
+	/*! Optional additional headers to add to the response */
+	struct ast_variable *headers;
+	/*! Optional body to add to the response */
+	struct ast_sip_body *body;
 };
 
 #define AST_SIP_MAX_ACCEPT 32
 
 struct ast_sip_subscription_handler {
-    /*! The name of the event this handler deals with */
-    const char *event_name;
-    /*! The types of body this handler accepts */
-    const char *accept[AST_SIP_MAX_ACCEPT];
- 
-    /*!
-     * \brief Called when a subscription is to be destroyed
-     *
-     * This is a subscriber and notifier callback.
-     *
-     * The handler is not expected to send any sort of requests or responses
-     * during this callback. The handler MUST, however, begin the destruction
+	/*! The name of the event this handler deals with */
+	const char *event_name;
+	/*! The types of body this handler accepts */
+	const char *accept[AST_SIP_MAX_ACCEPT];
+ 
+	/*!
+	 * \brief Called when a subscription is to be destroyed
+	 *
+	 * This is a subscriber and notifier callback.
+	 *
+	 * The handler is not expected to send any sort of requests or responses
+	 * during this callback. The handler MUST, however, begin the destruction
 	 * process for the subscription during this callback.
-     */
+	 */
    void (*subscription_shutdown)(struct ast_sip_subscription *subscription);
  
-    /*!
-     * \brief Called when a SUBSCRIBE arrives in order to create a new subscription
-     *
-     * This is a notifier callback.
-     *
-     * If the notifier wishes to accept the subscription, then it can create
-     * a new ast_sip_subscription to do so. 
-     *
-     * If the notifier chooses to create a new subscription, then it must accept
-     * the incoming subscription using pjsip_evsub_accept() and it must also
-     * send an initial NOTIFY with the current subscription state.
-     *
-     * \param endpoint The endpoint from which we received the SUBSCRIBE
-     * \param rdata The SUBSCRIBE request
-     * \retval NULL The SUBSCRIBE has not been accepted
-     * \retval non-NULL The newly-created subscription
-     */
-    struct ast_sip_subscription *(*new_subscribe)(struct ast_sip_endpoint *endpoint,
-            pjsip_rx_data *rdata);
- 
-    /*!
-     * \brief Called when an endpoint renews a subscription.
-     *
-     * This is a notifier callback.
-     *
-     * Because of the way that the PJSIP evsub framework works, it will automatically
-     * send a response to the SUBSCRIBE. However, the subscription handler must send
+	/*!
+	 * \brief Called when a SUBSCRIBE arrives in order to create a new subscription
+	 *
+	 * This is a notifier callback.
+	 *
+	 * If the notifier wishes to accept the subscription, then it can create
+	 * a new ast_sip_subscription to do so. 
+	 *
+	 * If the notifier chooses to create a new subscription, then it must accept
+	 * the incoming subscription using pjsip_evsub_accept() and it must also
+	 * send an initial NOTIFY with the current subscription state.
+	 *
+	 * \param endpoint The endpoint from which we received the SUBSCRIBE
+	 * \param rdata The SUBSCRIBE request
+	 * \retval NULL The SUBSCRIBE has not been accepted
+	 * \retval non-NULL The newly-created subscription
+	 */
+	struct ast_sip_subscription *(*new_subscribe)(struct ast_sip_endpoint *endpoint,
+			pjsip_rx_data *rdata);
+ 
+	/*!
+	 * \brief Called when an endpoint renews a subscription.
+	 *
+	 * This is a notifier callback.
+	 *
+	 * Because of the way that the PJSIP evsub framework works, it will automatically
+	 * send a response to the SUBSCRIBE. However, the subscription handler must send
 	 * a NOTIFY with the current subscription state when this callback is called.
 	 *
 	 * The response_data that is passed into this callback is used to craft what should
 	 * be in the response to the incoming SUBSCRIBE. It is initialized with a 200 status
 	 * code and all other parameters are empty.
-     *
-     * \param sub The subscription that is being renewed
-     * \param rdata The SUBSCRIBE request in question
+	 *
+	 * \param sub The subscription that is being renewed
+	 * \param rdata The SUBSCRIBE request in question
 	 * \param[out] response_data Data pertaining to the SIP response that should be
 	 * sent to the SUBSCRIBE
-     */
-    void (*resubscribe)(struct ast_sip_subscription *sub,
-            pjsip_rx_data *rdata, struct ast_sip_subscription_response_data *response_data);
- 
-    /*!
-     * \brief Called when a subscription times out.
-     *
-     * This is a notifier callback
-     *
-     * This indicates that the subscription has timed out. The subscription handler is
-     * expected to send a NOTIFY that terminates the subscription.
-     *
-     * \param sub The subscription that has timed out
-     */
-    void (*subscription_timeout)(struct ast_sip_subscription *sub);
- 
-    /*!
-     * \brief Called when a subscription is terminated via a SUBSCRIBE or NOTIFY request
-     *
-     * This is a notifier and subscriber callback.
-     *
-     * The PJSIP subscription framework will automatically send the response to the
-     * request. If a notifier receives this callback, then the subscription handler
+	 */
+	void (*resubscribe)(struct ast_sip_subscription *sub,
+			pjsip_rx_data *rdata, struct ast_sip_subscription_response_data *response_data);
+ 
+	/*!
+	 * \brief Called when a subscription times out.
+	 *
+	 * This is a notifier callback
+	 *
+	 * This indicates that the subscription has timed out. The subscription handler is
+	 * expected to send a NOTIFY that terminates the subscription.
+	 *
+	 * \param sub The subscription that has timed out
+	 */
+	void (*subscription_timeout)(struct ast_sip_subscription *sub);
+ 
+	/*!
+	 * \brief Called when a subscription is terminated via a SUBSCRIBE or NOTIFY request
+	 *
+	 * This is a notifier and subscriber callback.
+	 *
+	 * The PJSIP subscription framework will automatically send the response to the
+	 * request. If a notifier receives this callback, then the subscription handler
 	 * is expected to send a final NOTIFY to terminate the subscription.
-     *
-     * \param sub The subscription being terminated
-     * \param rdata The request that terminated the subscription
-     */
-    void (*subscription_terminated)(struct ast_sip_subscription *sub, pjsip_rx_data *rdata);
- 
-    /*!
-     * \brief Called when a subscription handler's outbound NOTIFY receives a response
-     *
-     * This is a notifier callback.
-     *
-     * \param sub The subscription
-     * \param rdata The NOTIFY response
-     */
-    void (*notify_response)(struct ast_sip_subscription *sub, pjsip_rx_data *rdata);
- 
-    /*!
-     * \brief Called when a subscription handler receives an inbound NOTIFY
-     *
-     * This is a subscriber callback.
-     *
-     * Because of the way that the PJSIP evsub framework works, it will automatically
-     * send a response to the NOTIFY. By default this will be a 200 OK response, but
-     * this callback can change details of the response by returning response data
-     * to use.
+	 *
+	 * \param sub The subscription being terminated
+	 * \param rdata The request that terminated the subscription
+	 */
+	void (*subscription_terminated)(struct ast_sip_subscription *sub, pjsip_rx_data *rdata);
+ 
+	/*!
+	 * \brief Called when a subscription handler's outbound NOTIFY receives a response
+	 *
+	 * This is a notifier callback.
+	 *
+	 * \param sub The subscription
+	 * \param rdata The NOTIFY response
+	 */
+	void (*notify_response)(struct ast_sip_subscription *sub, pjsip_rx_data *rdata);
+ 
+	/*!
+	 * \brief Called when a subscription handler receives an inbound NOTIFY
+	 *
+	 * This is a subscriber callback.
+	 *
+	 * Because of the way that the PJSIP evsub framework works, it will automatically
+	 * send a response to the NOTIFY. By default this will be a 200 OK response, but
+	 * this callback can change details of the response by returning response data
+	 * to use.
 	 *
 	 * The response_data that is passed into this callback is used to craft what should
 	 * be in the response to the incoming SUBSCRIBE. It is initialized with a 200 status
 	 * code and all other parameters are empty.
-     *
-     * \param sub The subscription
-     * \param rdata The NOTIFY request
+	 *
+	 * \param sub The subscription
+	 * \param rdata The NOTIFY request
 	 * \param[out] response_data Data pertaining to the SIP response that should be
 	 * sent to the SUBSCRIBE
-     */
-    void (*notify_request)(struct ast_sip_subscription *sub,
-            pjsip_rx_data *rdata, struct ast_sip_subscription_response_data *response_data);
- 
-    /*!
-     * \brief Called when it is time for a subscriber to resubscribe
-     *
-     * This is a subscriber callback.
-     *
-     * The subscriber can reresh the subscription using the pjsip_evsub_initiate()
-     * function.
-     *
-     * \param sub The subscription to refresh
-     * \retval 0 Success
-     * \retval non-zero Failure
-     */
-    int (*refresh_subscription)(struct ast_sip_subscription *sub);
+	 */
+	void (*notify_request)(struct ast_sip_subscription *sub,
+			pjsip_rx_data *rdata, struct ast_sip_subscription_response_data *response_data);
+ 
+	/*!
+	 * \brief Called when it is time for a subscriber to resubscribe
+	 *
+	 * This is a subscriber callback.
+	 *
+	 * The subscriber can refresh the subscription using the pjsip_evsub_initiate()
+	 * function.
+	 *
+	 * \param sub The subscription to refresh
+	 * \retval 0 Success
+	 * \retval non-zero Failure
+	 */
+	int (*refresh_subscription)(struct ast_sip_subscription *sub);
 	AST_LIST_ENTRY(ast_sip_subscription_handler) next;
 };
 
@@ -221,7 +329,7 @@
  * \param rdata If acting as a notifier, the SUBSCRIBE request that triggered subscription creation
  */
 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
-        enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata);
+		enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata);
  
  
 /*!

Modified: team/file/pimp_my_publish/res/res_sip_pubsub.c
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_my_publish/res/res_sip_pubsub.c?view=diff&rev=391751&r1=391750&r2=391751
==============================================================================
--- team/file/pimp_my_publish/res/res_sip_pubsub.c (original)
+++ team/file/pimp_my_publish/res/res_sip_pubsub.c Thu Jun 13 17:22:58 2013
@@ -38,14 +38,109 @@
 #include "asterisk/datastore.h"
 #include "asterisk/uuid.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/sched.h"
 #include "asterisk/res_sip.h"
 
-static pj_bool_t sub_on_rx_request(pjsip_rx_data *rdata);
-
-static struct pjsip_module sub_module = {
+static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
+
+static struct pjsip_module pubsub_module = {
 	.name = { "PubSub Module", 13 },
 	.priority = PJSIP_MOD_PRIORITY_APPLICATION,
-	.on_rx_request = sub_on_rx_request,
+	.on_rx_request = pubsub_on_rx_request,
+};
+
+static const pj_str_t str_event_name = { "Event", 5 };
+
+/*! \brief Scheduler used for automatically expiring publications */
+static struct ast_sched_context *sched;
+
+/*! \brief Number of buckets in each handler's publications container */
+#define PUBLICATIONS_BUCKETS 37
+
+/*! \brief Default expiration time for PUBLISH if one is not specified */
+#define DEFAULT_PUBLISH_EXPIRES 3600
+
+/*! \brief Method descriptor for PUBLISH; the method of each incoming request is compared against it */
+const pjsip_method pjsip_publish_method = /* NOTE(review): external linkage - confirm no clash with a PJSIP symbol */
+{
+	PJSIP_OTHER_METHOD,
+	{ "PUBLISH", 7 } /* method name and its length in characters */
+};
+
+/*!
+ * \brief The types of PUBLISH messages defined in RFC 3903
+ */
+enum sip_publish_type {
+	/*!
+	 * \brief Unknown
+	 *
+	 * \details
+	 * This is not actually defined in RFC 3903. We use this as a constant
+	 * to indicate that an incoming PUBLISH does not fit into any of the
+	 * other categories and is thus invalid.
+	 */
+	SIP_PUBLISH_UNKNOWN,
+
+	/*!
+	 * \brief Initial
+	 *
+	 * \details
+	 * The first PUBLISH sent. This will contain a non-zero Expires header
+	 * as well as a body that indicates the current state of the endpoint
+	 * that has sent the message. The initial PUBLISH is the only type
+	 * of PUBLISH that does not contain a SIP-If-Match header.
+	 */
+	SIP_PUBLISH_INITIAL,
+
+	/*!
+	 * \brief Refresh
+	 *
+	 * \details
+	 * Used to keep a published state from expiring. This will contain a
+	 * SIP-If-Match header and a non-zero Expires header but no body, since
+	 * its purpose is not to update state.
+	 */
+	SIP_PUBLISH_REFRESH,
+
+	/*!
+	 * \brief Modify
+	 *
+	 * \details
+	 * Used to change state from its previous value. This will contain a
+	 * SIP-If-Match header and a body updating the published state. It may
+	 * or may not contain an Expires header.
+	 */
+	SIP_PUBLISH_MODIFY,
+
+	/*!
+	 * \brief Remove
+	 *
+	 * \details
+	 * Used to remove published state from an ESC. This will contain
+	 * an Expires header set to 0 and likely no body.
+	 */
+	SIP_PUBLISH_REMOVE,
+};
+
+/*!
+ * Counter from which the entity tag of each newly created publication is taken.
+ */
+static int esc_etag_counter;
+
+/*!
+ * \brief Structure representing a SIP publication
+ */
+struct ast_sip_publication {
+	/*! \brief Entity tag for the publication; also the key used to hash and find it in its handler's container */
+	int entity_tag;
+	/*! \brief Handler for this publication; set when the publication is linked into the handler's container */
+	struct ast_sip_publish_handler *handler;
+	/*! \brief The endpoint with which the publication is communicating (a reference is held until destruction) */
+	struct ast_sip_endpoint *endpoint;
+	/*! \brief Expiration time of the publication, taken from the Expires header or the default */
+	int expires;
+	/*! \brief Scheduled item for expiration of publication, or -1 when nothing is scheduled */
+	int sched_id;
 };
 
 /*!
@@ -114,7 +209,7 @@
 		 * remove the serializer will be successful.
 		 */
 		ast_sip_dialog_set_serializer(sub->dlg, NULL);
-		pjsip_dlg_dec_session(sub->dlg, &sub_module);
+		pjsip_dlg_dec_session(sub->dlg, &pubsub_module);
 	}
 	ast_taskprocessor_unreference(sub->serializer);
 }
@@ -169,7 +264,7 @@
 }
 
 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
-        enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
+		enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
 {
 	struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
 	pjsip_dialog *dlg;
@@ -211,10 +306,10 @@
 	/* We keep a reference to the dialog until our subscription is destroyed. See
 	 * the subscription_destructor for more details
 	 */
-	pjsip_dlg_inc_session(dlg, &sub_module);
+	pjsip_dlg_inc_session(dlg, &pubsub_module);
 	sub->dlg = dlg;
 	ast_sip_dialog_set_serializer(dlg, sub->serializer);
-	pjsip_evsub_set_mod_data(sub->evsub, sub_module.id, sub);
+	pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub);
 	ao2_ref(endpoint, +1);
 	sub->endpoint = endpoint;
 	sub->handler = handler;
@@ -316,16 +411,78 @@
 	ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
 }
 
+AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
+
+/*! \brief ao2 hash callback: a publication hashes to its entity tag, an OBJ_KEY lookup to the int it points at */
+static int publication_hash_fn(const void *obj, const int flags)
+{
+	const int keyed = flags & OBJ_KEY;
+
+	return keyed ? *(const int *) obj : ((const struct ast_sip_publication *) obj)->entity_tag;
+}
+
+/*! \brief ao2 comparison callback: obj is a stored publication, arg is a publication or (with OBJ_KEY) an int key */
+static int publication_cmp_fn(void *obj, void *arg, int flags)
+{
+	const struct ast_sip_publication *publication1 = obj;
+	const struct ast_sip_publication *publication2 = arg;
+	const int *entity_tag = arg; /* the key arrives in arg; reading it from obj compared the object with itself */
+
+	return publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ? CMP_MATCH | CMP_STOP : 0;
+}
+
+static void publish_add_handler(struct ast_sip_publish_handler *handler)
+{
+	SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); /* write-lock the handler list */
+	AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next); /* append, so handlers stay in registration order */
+}
+
+int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
+{
+	/* Handlers are looked up by event name, so one without a name can never be found */
+	if (ast_strlen_zero(handler->event_name)) {
+		ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
+		return -1;
+	}
+
+	/* Every handler owns the container that holds the publications it has accepted */
+	handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS, publication_hash_fn, publication_cmp_fn);
+	if (!handler->publications) {
+		ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n", handler->event_name);
+		return -1;
+	}
+
+	/* List the handler and take a module reference alongside it */
+	publish_add_handler(handler);
+	ast_module_ref(ast_module_info->self);
+	return 0;
+}
+
+void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
+{
+	struct ast_sip_publish_handler *iter;
+	SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+	AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
+		if (handler == iter) { /* matched by pointer identity; an unregistered handler is silently ignored */
+			AST_RWLIST_REMOVE_CURRENT(next);
+			ao2_cleanup(handler->publications); /* drops the container reference taken at registration */
+			ast_module_unref(ast_module_info->self); /* balances the module reference taken at registration */
+			break;
+		}
+	}
+	AST_RWLIST_TRAVERSE_SAFE_END;
+}
+
 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
 
-static void add_handler(struct ast_sip_subscription_handler *handler)
+static void sub_add_handler(struct ast_sip_subscription_handler *handler)
 {
 	SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
 	AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
 	ast_module_ref(ast_module_info->self);
 }
 
-static int handler_exists_for_event_name(const char *event_name)
+static int sub_handler_exists_for_event_name(const char *event_name)
 {
 	struct ast_sip_subscription_handler *iter;
 	SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
@@ -357,7 +514,7 @@
 		pj_cstr(&accept[i], handler->accept[i]);
 	}
 
-	if (!handler_exists_for_event_name(handler->event_name)) {
+	if (!sub_handler_exists_for_event_name(handler->event_name)) {
 		pj_str_t event;
 
 		pj_cstr(&event, handler->event_name);
@@ -367,14 +524,14 @@
 		} else if (!strcmp(handler->event_name, "presence")) {
 			pjsip_pres_init_module(ast_sip_get_pjsip_endpoint(), pjsip_evsub_instance());
 		} else {
-			pjsip_evsub_register_pkg(&sub_module, &event, DEFAULT_EXPIRES, i, accept);
+			pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
 		}
 	} else {
-		pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &sub_module, PJSIP_H_ACCEPT, NULL,
+		pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module, PJSIP_H_ACCEPT, NULL,
 			i, accept);
 	}
 
-	add_handler(handler);
+	sub_add_handler(handler);
 	return 0;
 }
 
@@ -392,7 +549,7 @@
 	AST_RWLIST_TRAVERSE_SAFE_END;
 }
 
-static struct ast_sip_subscription_handler *find_handler(const char *event, char accept[AST_SIP_MAX_ACCEPT][64], size_t num_accept)
+static struct ast_sip_subscription_handler *find_sub_handler(const char *event, char accept[AST_SIP_MAX_ACCEPT][64], size_t num_accept)
 {
 	struct ast_sip_subscription_handler *iter;
 	int match = 0;
@@ -430,9 +587,8 @@
 	return iter;
 }
 
-static pj_bool_t sub_on_rx_request(pjsip_rx_data *rdata)
-{
-	static const pj_str_t event_name = { "Event", 5 };
+static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
+{
 	char event[32];
 	char accept[AST_SIP_MAX_ACCEPT][64];
 	pjsip_accept_hdr *accept_header;
@@ -442,14 +598,10 @@
 	struct ast_sip_subscription *sub;
 	int i;
 
-	if (pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
-		return PJ_FALSE;
-	}
-
 	endpoint = ast_pjsip_rdata_get_endpoint(rdata);
 	ast_assert(endpoint != NULL);
 
-	event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &event_name, rdata->msg_info.msg->hdr.next);
+	event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
 	if (!event_header) {
 		ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
 		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
@@ -468,9 +620,9 @@
 		ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
 	}
 
-	handler = find_handler(event, accept, accept_header->count);
+	handler = find_sub_handler(event, accept, accept_header->count);
 	if (!handler) {
-		ast_log(LOG_WARNING, "No registered handler for event %s\n", event);
+		ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
 		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
 		return PJ_TRUE;
 	}
@@ -481,6 +633,238 @@
 	return PJ_TRUE;
 }
 
+/*! \brief Look up the publish handler registered for an event package; NULL when there is none */
+static struct ast_sip_publish_handler *find_pub_handler(const char *event)
+{
+	struct ast_sip_publish_handler *candidate = NULL;
+	SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+	AST_RWLIST_TRAVERSE(&publish_handlers, candidate, next) {
+		if (!strcmp(event, candidate->event_name)) {
+			ast_debug(3, "Event name match: %s = %s\n", event, candidate->event_name);
+			return candidate;
+		}
+		ast_debug(3, "Event %s does not match %s\n", event, candidate->event_name);
+	}
+
+	return NULL;
+}
+
+/*! \brief Classify an incoming PUBLISH; writes *expires always and *entity_id only when SIP-If-Match is present */
+static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
+	pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
+{
+	pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
+
+	if (etag_hdr) {
+		char etag[pj_strlen(&etag_hdr->hvalue) + 1];
+		ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
+
+		if (sscanf(etag, "%30d", entity_id) != 1) {
+			return SIP_PUBLISH_UNKNOWN;
+		}
+	}
+
+	*expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
+
+	if (!(*expires)) {
+		/* A removal must name its target: without SIP-If-Match *entity_id was never written */
+		return etag_hdr ? SIP_PUBLISH_REMOVE : SIP_PUBLISH_UNKNOWN;
+	} else if (!etag_hdr && rdata->msg_info.msg->body) {
+		return SIP_PUBLISH_INITIAL;
+	} else if (etag_hdr && !rdata->msg_info.msg->body) {
+		return SIP_PUBLISH_REFRESH;
+	} else if (etag_hdr && rdata->msg_info.msg->body) {
+		return SIP_PUBLISH_MODIFY;
+	}
+	return SIP_PUBLISH_UNKNOWN;
+}
+
+/*! \brief Offer an initial PUBLISH to the handler and, if it is accepted, start tracking the new publication */
+static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
+	struct ast_sip_publish_handler *handler)
+{
+	struct ast_sip_publication *created = handler->new_publication(endpoint, rdata);
+
+	if (created) {
+		/* Record the owning handler and make the publication findable by its entity tag */
+		created->handler = handler;
+		ao2_link(handler->publications, created);
+		return created;
+	}
+	/* The handler declined, so the rejection is sent from here */
+	pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
+	return NULL;
+}
+
+/*! \brief Scheduler callback: a publication reached its expiration, so tell its handler and stop tracking it */
+static int publish_expire(const void *data)
+{
+	/* On return this drops the publication reference that was taken when the task was scheduled */
+	RAII_VAR(struct ast_sip_publication *, expired, (struct ast_sip_publication*) data, ao2_cleanup);
+
+	expired->handler->publish_expire(expired);
+	ao2_unlink(expired->handler->publications, expired);
+	expired->sched_id = -1;
+	return 0;
+}
+
+/*! \brief Handle an incoming PUBLISH: classify it, run the matching handler callback, then (re)schedule or drop the publication */
+static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
+{
+	pjsip_event_hdr *event_header;
+	struct ast_sip_publish_handler *handler;
+	RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+	char event[32];
+	static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
+	pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
+	enum sip_publish_type publish_type;
+	RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
+	int expires = 0, entity_id = 0;
+
+	endpoint = ast_pjsip_rdata_get_endpoint(rdata);
+	ast_assert(endpoint != NULL);
+
+	event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
+	if (!event_header) {
+		ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
+		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
+		return PJ_TRUE;
+	}
+	ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
+
+	handler = find_pub_handler(event);
+	if (!handler) {
+		ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
+		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
+		return PJ_TRUE;
+	}
+
+	publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
+	/* A removal without SIP-If-Match names no publication (entity_id was never parsed), so it is invalid too */
+	if (publish_type == SIP_PUBLISH_UNKNOWN || (publish_type == SIP_PUBLISH_REMOVE && !etag_hdr)) {
+		pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
+		return PJ_TRUE;
+	}
+
+	/* If this is not an initial publish ensure that a publication is present */
+	if (publish_type != SIP_PUBLISH_INITIAL) {
+		if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY))) {
+			static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
+
+			ast_log(LOG_NOTICE, "No entity '%d'\n", entity_id);
+
+			pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed, NULL, NULL);
+			return PJ_TRUE;
+		}
+
+		/* Update the expires here so that the created responses will contain the correct value */
+		publication->expires = expires;
+	}
+
+	switch (publish_type) {
+		case SIP_PUBLISH_UNKNOWN:
+			/* This will never get reached as we short circuit early */
+			break;
+		case SIP_PUBLISH_INITIAL:
+			publication = publish_request_initial(endpoint, rdata, handler);
+			break;
+		case SIP_PUBLISH_REFRESH:
+		case SIP_PUBLISH_MODIFY:
+			if (handler->publish_refresh(publication, rdata)) {
+				/* If an error occurs we want to terminate the publication */
+				expires = 0;
+			}
+			break;
+		case SIP_PUBLISH_REMOVE:
+			handler->publish_termination(publication, rdata);
+			break;
+		default:
+			pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
+			break;
+	}
+	/* The scheduled task holds its own reference; it is given back if the task is replaced, deleted or cannot be added */
+	if (publication) {
+		if (expires) {
+			AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
+				ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
+		} else {
+			AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
+			ao2_unlink(handler->publications, publication);
+		}
+	}
+
+	return PJ_TRUE;
+}
+
+/*! \brief Internal destructor for publications; releases the endpoint reference taken when the publication was created */
+static void publication_destroy_fn(void *obj)
+{
+	struct ast_sip_publication *publication = obj;
+
+	ao2_cleanup(publication->endpoint);
+}
+
+struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
+{
+	pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
+	struct ast_sip_publication *created;
+
+	ast_assert(endpoint != NULL);
+
+	created = ao2_alloc(sizeof(*created), publication_destroy_fn);
+	if (!created) {
+		return NULL;
+	}
+	/* The entity tag is the next value of the shared counter; the endpoint reference is released by the destructor */
+	created->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
+	created->endpoint = endpoint;
+	ao2_ref(endpoint, +1);
+	created->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
+	created->sched_id = -1; /* no expiration task scheduled yet */
+	return created;
+}
+
+struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
+{
+	return pub->endpoint; /* the publication's own pointer: no reference is added for the caller */
+}
+
+/*! \brief Build a response to a PUBLISH that carries the publication's entity tag and expiration as headers */
+int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata,
+	pjsip_tx_data **tdata)
+{
+	RAII_VAR(char *, etag_text, NULL, ast_free_ptr);
+	RAII_VAR(char *, expires_text, NULL, ast_free_ptr);
+
+	/* Both values are stored as integers but have to be added as header text */
+	if (ast_asprintf(&etag_text, "%d", pub->entity_tag) < 0
+		|| ast_asprintf(&expires_text, "%d", pub->expires) < 0) {
+		return -1;
+	}
+
+	if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) {
+		return -1;
+	}
+
+	/* SIP-ETag carries the tag that later requests are matched on through SIP-If-Match */
+	ast_sip_add_header(*tdata, "SIP-ETag", etag_text);
+	ast_sip_add_header(*tdata, "Expires", expires_text);
+
+	return 0;
+}
+
+static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
+{
+	const pjsip_method *method = &rdata->msg_info.msg->line.req.method;
+	if (pjsip_method_cmp(method, pjsip_get_subscribe_method()) == 0) {
+		return pubsub_on_rx_subscribe_request(rdata);
+	} else if (pjsip_method_cmp(method, &pjsip_publish_method) == 0) {
+		return pubsub_on_rx_publish_request(rdata);
+	}
+	return PJ_FALSE; /* neither SUBSCRIBE nor PUBLISH: declined by this module */
+}
+
 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
 {
 	struct ast_sip_subscription *sub;
@@ -488,7 +872,7 @@
 		return;
 	}
 
-	sub = pjsip_evsub_get_mod_data(evsub, sub_module.id);
+	sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
 	if (!sub) {
 		return;
 	}
@@ -505,12 +889,12 @@
 	if (sub->handler->subscription_shutdown) {
 		sub->handler->subscription_shutdown(sub);
 	}
-	pjsip_evsub_set_mod_data(evsub, sub_module.id, NULL);
+	pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
 }
 
 static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event)
 {
-	struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, sub_module.id);
+	struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
 
 	if (!sub) {
 		return;
@@ -573,7 +957,7 @@
 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
 		int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
 {
-	struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, sub_module.id);
+	struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
 	struct ast_sip_subscription_response_data response_data = {
 		.status_code = 200,
 	};
@@ -595,7 +979,7 @@
 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
 		pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
 {
-	struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, sub_module.id);
+	struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
 	struct ast_sip_subscription_response_data response_data = {
 		.status_code = 200,
 	};
@@ -625,7 +1009,7 @@
 
 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
 {
-	struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, sub_module.id);
+	struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
 
 	ao2_ref(sub, +1);
 	ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
@@ -642,7 +1026,7 @@
 
 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
 {
-	struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, sub_module.id);
+	struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
 
 	ao2_ref(sub, +1);
 	ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
@@ -651,14 +1035,37 @@
 static int load_module(void)
 {
 	pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
-	if (ast_sip_register_service(&sub_module)) {
-		return AST_MODULE_LOAD_DECLINE;
-	}
+
+	/* Bring up the scheduler (used for publication expiration) before registering the service. */
+	if (!(sched = ast_sched_context_create())) {
+		ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
+	if (ast_sched_start_thread(sched)) {
+		ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
+		ast_sched_context_destroy(sched);
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
+	if (ast_sip_register_service(&pubsub_module)) {
+		ast_log(LOG_ERROR, "Could not register pubsub service\n");
+		ast_sched_context_destroy(sched);
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
 	return AST_MODULE_LOAD_SUCCESS;
 }
 
 static int unload_module(void)
 {
+	/* Undo the registration done in load_module() first, so no request reaches this module once the scheduler is gone. */
+	ast_sip_unregister_service(&pubsub_module);
+
+	if (sched) {
+		ast_sched_context_destroy(sched);
+	}
+
 	return 0;
 }
 

Modified: team/file/pimp_my_publish/res/res_sip_pubsub.exports.in
URL: http://svnview.digium.com/svn/asterisk/team/file/pimp_my_publish/res/res_sip_pubsub.exports.in?view=diff&rev=391751&r1=391750&r2=391751
==============================================================================
--- team/file/pimp_my_publish/res/res_sip_pubsub.exports.in (original)
+++ team/file/pimp_my_publish/res/res_sip_pubsub.exports.in Thu Jun 13 17:22:58 2013
@@ -11,6 +11,11 @@
 		LINKER_SYMBOL_PREFIXast_sip_subscription_remove_datastore;
 		LINKER_SYMBOL_PREFIXast_sip_register_subscription_handler;
 		LINKER_SYMBOL_PREFIXast_sip_unregister_subscription_handler;
+		LINKER_SYMBOL_PREFIXast_sip_create_publication;
+		LINKER_SYMBOL_PREFIXast_sip_publication_get_endpoint;
+		LINKER_SYMBOL_PREFIXast_sip_publication_create_response;
+		LINKER_SYMBOL_PREFIXast_sip_register_publish_handler;
+		LINKER_SYMBOL_PREFIXast_sip_unregister_publish_handler;
 	local:
 		*;
 };




More information about the asterisk-commits mailing list