[svn-commits] kmoore: branch kmoore/stasis-mwi r382290 - in /team/kmoore/stasis-mwi: channe...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Thu Feb 28 15:14:06 CST 2013


Author: kmoore
Date: Thu Feb 28 15:14:01 2013
New Revision: 382290

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=382290
Log:
Functional MWI via stasis

This is working correctly on chan_sip and is assumed to be working
correctly on other channel drivers. MWI distribution via XMPP/Jabber
may be functional, but is untested. Pollmailboxes is most definitely
broken until subscription change notices make their way into Stasis
since it relies on being able to tell what has been subscribed to.

At this point, AST_EVENT_MWI events are generated, but aren't actually
used by anything now that all direct subscriptions have been changed
over.

The various stasis subscriptions still need heavy message type checking
to be type safe.

Modified:
    team/kmoore/stasis-mwi/channels/chan_dahdi.c
    team/kmoore/stasis-mwi/channels/chan_iax2.c
    team/kmoore/stasis-mwi/channels/chan_mgcp.c
    team/kmoore/stasis-mwi/channels/chan_sip.c
    team/kmoore/stasis-mwi/channels/chan_skinny.c
    team/kmoore/stasis-mwi/channels/sig_pri.c
    team/kmoore/stasis-mwi/channels/sig_pri.h
    team/kmoore/stasis-mwi/channels/sip/include/sip.h
    team/kmoore/stasis-mwi/include/asterisk/app.h
    team/kmoore/stasis-mwi/include/asterisk/stasis.h
    team/kmoore/stasis-mwi/include/asterisk/xmpp.h
    team/kmoore/stasis-mwi/main/app.c
    team/kmoore/stasis-mwi/main/stasis_message.c
    team/kmoore/stasis-mwi/res/res_jabber.c
    team/kmoore/stasis-mwi/res/res_xmpp.c

Modified: team/kmoore/stasis-mwi/channels/chan_dahdi.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/channels/chan_dahdi.c?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/channels/chan_dahdi.c (original)
+++ team/kmoore/stasis-mwi/channels/chan_dahdi.c Thu Feb 28 15:14:01 2013
@@ -502,7 +502,7 @@
 
 static int dahdi_sendtext(struct ast_channel *c, const char *text);
 
-static void mwi_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_topic *topic, struct stasis_message *msg)
 {
 	/* This module does not handle MWI in an event-based manner.  However, it
 	 * subscribes to MWI for each mailbox that is configured so that the core
@@ -1215,7 +1215,7 @@
 	 */
 	char mailbox[AST_MAX_EXTENSION];
 	/*! \brief Opaque event subscription parameters for message waiting indication support. */
-	struct ast_event_sub *mwi_event_sub;
+	struct stasis_subscription *mwi_event_sub;
 	/*! \brief Delayed dialing for E911.  Overlap digits for ISDN. */
 	char dialdest[256];
 #ifdef HAVE_DAHDI_LINEREVERSE_VMWI
@@ -5970,7 +5970,7 @@
 	if (p->use_smdi)
 		ast_smdi_interface_unref(p->smdi_iface);
 	if (p->mwi_event_sub)
-		ast_event_unsubscribe(p->mwi_event_sub);
+		stasis_unsubscribe(p->mwi_event_sub);
 	if (p->vars) {
 		ast_variables_destroy(p->vars);
 	}
@@ -13228,15 +13228,14 @@
 		ast_copy_string(tmp->mailbox, conf->chan.mailbox, sizeof(tmp->mailbox));
 		if (channel != CHAN_PSEUDO && !ast_strlen_zero(tmp->mailbox)) {
 			char *mailbox, *context;
+			struct ast_str *uniqueid = ast_str_alloca(128);
 			mailbox = context = ast_strdupa(tmp->mailbox);
 			strsep(&context, "@");
 			if (ast_strlen_zero(context))
 				context = "default";
-			tmp->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "Dahdi MWI subscription", NULL,
-				AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-				AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
-				AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS,
-				AST_EVENT_IE_END);
+
+			ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
+			tmp->mwi_event_sub = stasis_subscribe(stasis_mwi_topic(ast_str_buffer(uniqueid)), mwi_event_cb, NULL);
 		}
 #ifdef HAVE_DAHDI_LINEREVERSE_VMWI
 		tmp->mwisend_setting = conf->chan.mwisend_setting;

Modified: team/kmoore/stasis-mwi/channels/chan_iax2.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/channels/chan_iax2.c?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/channels/chan_iax2.c (original)
+++ team/kmoore/stasis-mwi/channels/chan_iax2.c Thu Feb 28 15:14:01 2013
@@ -533,7 +533,7 @@
 
 	int expire;					/*!< Schedule entry for expiry */
 	int expiry;					/*!< How soon to expire */
-	iax2_format capability;        /*!< Capability */
+	iax2_format capability;				/*!< Capability */
 
 	/* Qualification */
 	int callno;					/*!< Call number of POKE request */
@@ -545,12 +545,12 @@
 	int pokefreqnotok;				/*!< How often to check when the host has been determined to be down */
 	int historicms;					/*!< How long recent average responses took */
 	int smoothing;					/*!< Sample over how many units to determine historic ms */
-	uint16_t maxcallno;					/*!< Max call number limit for this peer.  Set on registration */
-
-	struct ast_event_sub *mwi_event_sub;
+	uint16_t maxcallno;				/*!< Max call number limit for this peer.  Set on registration */
+
+	struct stasis_subscription *mwi_event_sub;	/*!< This subscription lets pollmailboxes know which mailboxes need to be polled */
 
 	struct ast_acl_list *acl;
-	enum calltoken_peer_enum calltoken_required;        /*!< Is calltoken validation required or not, can be YES, NO, or AUTO */
+	enum calltoken_peer_enum calltoken_required;	/*!< Is calltoken validation required or not, can be YES, NO, or AUTO */
 };
 
 #define IAX2_TRUNK_PREFACE (sizeof(struct iax_frame) + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr))
@@ -1316,7 +1316,7 @@
 	}
 }
 
-static void mwi_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_topic *topic, struct stasis_message *msg)
 {
 	/* The MWI subscriptions exist just so the core knows we care about those
 	 * mailboxes.  However, we just grab the events out of the cache when it
@@ -12392,7 +12392,7 @@
 		ast_dnsmgr_release(peer->dnsmgr);
 
 	if (peer->mwi_event_sub)
-		ast_event_unsubscribe(peer->mwi_event_sub);
+		stasis_unsubscribe(peer->mwi_event_sub);
 
 	ast_string_field_free_memory(peer);
 }
@@ -12666,14 +12666,17 @@
 
 	if (!ast_strlen_zero(peer->mailbox)) {
 		char *mailbox, *context;
+		struct ast_str *uniqueid = ast_str_alloca(128);
+
 		context = mailbox = ast_strdupa(peer->mailbox);
 		strsep(&context, "@");
-		if (ast_strlen_zero(context))
+		if (ast_strlen_zero(context)) {
 			context = "default";
-		peer->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "IAX MWI subscription", NULL,
-			AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-			AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
-			AST_EVENT_IE_END);
+		}
+
+		ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
+
+		peer->mwi_event_sub = stasis_subscribe(stasis_mwi_topic(ast_str_buffer(uniqueid)), mwi_event_cb, NULL);
 	}
 
 	if (subscribe_acl_change) {

Modified: team/kmoore/stasis-mwi/channels/chan_mgcp.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/channels/chan_mgcp.c?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/channels/chan_mgcp.c (original)
+++ team/kmoore/stasis-mwi/channels/chan_mgcp.c Thu Feb 28 15:14:01 2013
@@ -342,7 +342,7 @@
 	char curtone[80];			/*!< Current tone */
 	char mailbox[AST_MAX_EXTENSION];
 	char parkinglot[AST_MAX_CONTEXT];   /*!< Parkinglot */
-	struct ast_event_sub *mwi_event_sub;
+	struct stasis_subscription *mwi_event_sub;
 	ast_group_t callgroup;
 	ast_group_t pickupgroup;
 	int callwaiting;
@@ -3974,6 +3974,7 @@
 	struct mgcp_endpoint *e;
 	struct mgcp_subchannel *sub;
 	struct ast_variable *chanvars = NULL;
+	struct ast_str *uniqueid = ast_str_alloca(128);
 
 	/*char txident[80];*/
 	int i=0, y=0;
@@ -4175,11 +4176,9 @@
 					if (ast_strlen_zero(cntx)) {
 						cntx = "default";
 					}
-					e->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "MGCP MWI subscription", NULL,
-						AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mbox,
-						AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, cntx,
-						AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS,
-						AST_EVENT_IE_END);
+					ast_str_reset(uniqueid);
+					ast_str_set(&uniqueid, 0, "%s@%s", mbox, cntx);
+					e->mwi_event_sub = stasis_subscribe(stasis_mwi_topic(ast_str_buffer(uniqueid)), mwi_event_cb, NULL);
 				}
 				snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", ast_random());
 				e->msgstate = -1;
@@ -4518,8 +4517,9 @@
 		ast_free(s);
 	}
 
-	if (e->mwi_event_sub)
-		ast_event_unsubscribe(e->mwi_event_sub);
+	if (e->mwi_event_sub) {
+		stasis_unsunscribe(e->mwi_event_sub);
+	}
 
 	if (e->chanvars) {
 		ast_variables_destroy(e->chanvars);

Modified: team/kmoore/stasis-mwi/channels/chan_sip.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/channels/chan_sip.c?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/channels/chan_sip.c (original)
+++ team/kmoore/stasis-mwi/channels/chan_sip.c Thu Feb 28 15:14:01 2013
@@ -1274,7 +1274,7 @@
 static int sip_poke_peer(struct sip_peer *peer, int force);
 static void sip_poke_all_peers(void);
 static void sip_peer_hold(struct sip_pvt *p, int hold);
-static void mwi_event_cb(const struct ast_event *, void *);
+static void mwi_event_cb(void *, struct stasis_topic *, struct stasis_message *);
 static void network_change_event_cb(const struct ast_event *, void *);
 static void acl_change_event_cb(const struct ast_event *event, void *userdata);
 static void sip_keepalive_all_peers(void);
@@ -5203,7 +5203,7 @@
 static void destroy_mailbox(struct sip_mailbox *mailbox)
 {
 	if (mailbox->event_sub)
-		ast_event_unsubscribe(mailbox->event_sub);
+		stasis_unsubscribe(mailbox->event_sub);
 	ast_free(mailbox);
 }
 
@@ -16504,7 +16504,7 @@
 }
 
 /*! \brief Receive MWI events that we have subscribed to */
-static void mwi_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_topic *topic, struct stasis_message *msg)
 {
 	struct sip_peer *peer = userdata;
 
@@ -27444,16 +27444,17 @@
 static void add_peer_mwi_subs(struct sip_peer *peer)
 {
 	struct sip_mailbox *mailbox;
+	struct ast_str *uniqueid = ast_str_alloca(128);
 
 	AST_LIST_TRAVERSE(&peer->mailboxes, mailbox, entry) {
 		if (mailbox->event_sub) {
-			ast_event_unsubscribe(mailbox->event_sub);
-		}
-
-		mailbox->event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "SIP mbox event", peer,
-			AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox->mailbox,
-			AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, S_OR(mailbox->context, "default"),
-			AST_EVENT_IE_END);
+			stasis_unsubscribe(mailbox->event_sub);
+		}
+
+		ast_str_reset(uniqueid);
+		ast_str_set(&uniqueid, 0, "%s@%s", mailbox->mailbox, S_OR(mailbox->context, "default"));
+
+		mailbox->event_sub = stasis_subscribe(stasis_mwi_topic(ast_str_buffer(uniqueid)), mwi_event_cb, peer);
 	}
 }
 

Modified: team/kmoore/stasis-mwi/channels/chan_skinny.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/channels/chan_skinny.c?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/channels/chan_skinny.c (original)
+++ team/kmoore/stasis-mwi/channels/chan_skinny.c Thu Feb 28 15:14:01 2013
@@ -1400,7 +1400,7 @@
 	SKINNY_LINE_OPTIONS
 	ast_mutex_t lock;
 	struct skinny_container *container;
-	struct ast_event_sub *mwi_event_sub; /* Event based MWI */
+	struct stasis_subscription *mwi_event_sub; /* Event based MWI */
 	struct skinny_subchannel *activesub;
 	AST_LIST_HEAD(, skinny_subchannel) sub;
 	AST_LIST_HEAD(, skinny_subline) sublines;
@@ -1560,7 +1560,7 @@
 static int skinny_fixup(struct ast_channel *oldchan, struct ast_channel *newchan);
 static int skinny_senddigit_begin(struct ast_channel *ast, char digit);
 static int skinny_senddigit_end(struct ast_channel *ast, char digit, unsigned int duration);
-static void mwi_event_cb(const struct ast_event *event, void *userdata);
+static void mwi_event_cb(void *userdata, struct stasis_topic *topic, struct stasis_message *msg);
 static int skinny_dialer_cb(const void *data);
 static int skinny_reload(void);
 
@@ -2210,7 +2210,7 @@
 				manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: Skinny\r\nPeer: Skinny/%s@%s\r\nPeerStatus: Registered\r\n", l->name, d->name);
 				register_exten(l);
 				/* initialize MWI on line and device */
-				mwi_event_cb(0, l);
+				mwi_event_cb(l, NULL, NULL);
 				AST_LIST_TRAVERSE(&l->sublines, subline, list) {
 					ast_extension_state_add(subline->context, subline->exten, skinny_extensionstate_cb, subline->container);
 				}
@@ -3331,7 +3331,7 @@
 	send_callinfo(sub);
 }
 
-static void mwi_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_topic *topic, struct stasis_message *msg)
 {
 	struct skinny_line *l = userdata;
 	struct skinny_device *d = l->device;
@@ -3342,8 +3342,9 @@
 		return;
 	}
 
-	if (event) {
-		l->newmsgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
+	if (msg) {
+		struct stasis_mwi_state *mwi_state = stasis_message_data(msg);
+		l->newmsgs = mwi_state->new_msgs;
 	}
 
 	if (l->newmsgs) {
@@ -7892,16 +7893,18 @@
 
 	if (!ast_strlen_zero(l->mailbox)) {
 		char *cfg_mailbox, *cfg_context;
+		struct ast_str *uniqueid = ast_str_alloca(128);
+
 		cfg_context = cfg_mailbox = ast_strdupa(l->mailbox);
 		ast_verb(3, "Setting mailbox '%s' on line %s\n", cfg_mailbox, l->name);
 		strsep(&cfg_context, "@");
-		if (ast_strlen_zero(cfg_context))
-			 cfg_context = "default";
-		l->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "skinny MWI subsciption", l,
-			AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, cfg_mailbox,
-			AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, cfg_context,
-			AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS,
-			AST_EVENT_IE_END);
+		if (ast_strlen_zero(cfg_context)) {
+			cfg_context = "default";
+		}
+
+		ast_str_set(&uniqueid, 0, "%s@%s", cfg_mailbox, cfg_context);
+
+		l->mwi_event_sub = stasis_subscribe(stasis_mwi_topic(ast_str_buffer(uniqueid)), mwi_event_cb, l);
 	}
 
 	if (!ast_strlen_zero(vmexten) && ast_strlen_zero(l->vmexten)) {
@@ -8332,7 +8335,7 @@
 				ast_mutex_unlock(&sub->lock);
 			}
 			if (l->mwi_event_sub)
-				ast_event_unsubscribe(l->mwi_event_sub);
+				stasis_unsubscribe(l->mwi_event_sub);
 			ast_mutex_unlock(&l->lock);
 			manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: Skinny\r\nPeer: Skinny/%s@%s\r\nPeerStatus: Unregistered\r\n", l->name, d->name);
 			unregister_exten(l);

Modified: team/kmoore/stasis-mwi/channels/sig_pri.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/channels/sig_pri.c?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/channels/sig_pri.c (original)
+++ team/kmoore/stasis-mwi/channels/sig_pri.c Thu Feb 28 15:14:01 2013
@@ -8752,23 +8752,30 @@
  *
  * \return Nothing
  */
-static void sig_pri_mwi_event_cb(const struct ast_event *event, void *userdata)
+static void sig_pri_mwi_event_cb(void *userdata, struct stasis_topic *topic, struct stasis_message *msg)
 {
 	struct sig_pri_span *pri = userdata;
 	const char *mbox_context;
 	const char *mbox_number;
 	int num_messages;
 	int idx;
-
-	mbox_number = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX);
+	struct stasis_mwi_state *mwi_state;
+
+	if (!msg) {
+		return;
+	}
+
+	mwi_state = stasis_message_data(msg);
+
+	mbox_number = mwi_state->mailbox;
 	if (ast_strlen_zero(mbox_number)) {
 		return;
 	}
-	mbox_context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
+	mbox_context = mwi_state->context;
 	if (ast_strlen_zero(mbox_context)) {
 		return;
 	}
-	num_messages = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
+	num_messages = mwi_state->new_msgs;
 
 	for (idx = 0; idx < ARRAY_LEN(pri->mbox); ++idx) {
 		if (!pri->mbox[idx].sub) {
@@ -8842,7 +8849,8 @@
 #if defined(HAVE_PRI_MWI)
 	for (idx = 0; idx < ARRAY_LEN(pri->mbox); ++idx) {
 		if (pri->mbox[idx].sub) {
-			pri->mbox[idx].sub = ast_event_unsubscribe(pri->mbox[idx].sub);
+			stasis_unsubscribe(pri->mbox[idx].sub);
+			pri->mbox[idx].sub = NULL;
 		}
 	}
 #endif	/* defined(HAVE_PRI_MWI) */
@@ -8906,13 +8914,15 @@
 	char *saveptr;
 	char *prev_vm_number;
 	struct ast_str *mwi_description = ast_str_alloca(64);
+	struct ast_str *uniqueid = ast_str_alloca(128);
 #endif	/* defined(HAVE_PRI_MWI) */
 
 #if defined(HAVE_PRI_MWI)
 	/* Prepare the mbox[] for use. */
 	for (i = 0; i < ARRAY_LEN(pri->mbox); ++i) {
 		if (pri->mbox[i].sub) {
-			pri->mbox[i].sub = ast_event_unsubscribe(pri->mbox[i].sub);
+			stasis_unsubscribe(pri->mbox[i].sub);
+			pri->mbox[i].sub = NULL;
 		}
 	}
 #endif	/* defined(HAVE_PRI_MWI) */
@@ -8977,13 +8987,13 @@
 		/* Fill the mbox[] element. */
 		pri->mbox[i].number = mbox_number;
 		pri->mbox[i].context = mbox_context;
+
+		ast_str_reset(uniqueid);
+		ast_str_set(&uniqueid, 0, "%s@%s", mbox_number, mbox_context);
+
 		ast_str_set(&mwi_description, -1, "%s span %d[%d] MWI mailbox %s@%s",
 			sig_pri_cc_type_name, pri->span, i, mbox_number, mbox_context);
-		pri->mbox[i].sub = ast_event_subscribe(AST_EVENT_MWI, sig_pri_mwi_event_cb,
-			ast_str_buffer(mwi_description), pri,
-			AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mbox_number,
-			AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, mbox_context,
-			AST_EVENT_IE_END);
+		pri->mbox[i].sub = stasis_subscribe(stasis_mwi_topic(ast_str_buffer(uniqueid)), sig_pri_mwi_event_cb, pri);
 		if (!pri->mbox[i].sub) {
 			ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s@%s.",
 				sig_pri_cc_type_name, pri->span, mbox_number, mbox_context);

Modified: team/kmoore/stasis-mwi/channels/sig_pri.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/channels/sig_pri.h?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/channels/sig_pri.h (original)
+++ team/kmoore/stasis-mwi/channels/sig_pri.h Thu Feb 28 15:14:01 2013
@@ -405,7 +405,7 @@
 	 * \brief MWI mailbox event subscription.
 	 * \note NULL if mailbox not configured.
 	 */
-	struct ast_event_sub *sub;
+	struct stasis_subscription *sub;
 	/*! \brief Mailbox number */
 	const char *number;
 	/*! \brief Mailbox context. */

Modified: team/kmoore/stasis-mwi/channels/sip/include/sip.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/channels/sip/include/sip.h?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/channels/sip/include/sip.h (original)
+++ team/kmoore/stasis-mwi/channels/sip/include/sip.h Thu Feb 28 15:14:01 2013
@@ -1258,7 +1258,7 @@
  */
 struct sip_mailbox {
 	/*! Associated MWI subscription */
-	struct ast_event_sub *event_sub;
+	struct stasis_subscription *event_sub;
 	AST_LIST_ENTRY(sip_mailbox) entry;
 	unsigned int delme:1;
 	char *context;

Modified: team/kmoore/stasis-mwi/include/asterisk/app.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/include/asterisk/app.h?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/include/asterisk/app.h (original)
+++ team/kmoore/stasis-mwi/include/asterisk/app.h Thu Feb 28 15:14:01 2013
@@ -28,6 +28,7 @@
 #include "asterisk/threadstorage.h"
 #include "asterisk/file.h"
 #include "asterisk/linkedlists.h"
+#include "asterisk/utils.h"
 
 struct ast_flags64;
 
@@ -1097,12 +1098,28 @@
  * \retval -1 Failure
  * \since 12
  */
-int stasis_publish_mwi_state(
+#define stasis_publish_mwi_state(uniqueid, mailbox, context, new_msgs, old_msgs) \
+	stasis_publish_mwi_state_full(uniqueid, mailbox, context, new_msgs, old_msgs, NULL)
+
+/*!
+ * \brief Publish a MWI state update via stasis with EID
+ * \param[in] uniqueid A unique identifier for this mailbox (usually mailbox at context)
+ * \param[in] mailbox The number identifying this mailbox
+ * \param[in] context The context this mailbox resides in
+ * \param[in] new_msgs The number of new messages in this mailbox
+ * \param[in] old_msgs The number of old messages in this mailbox
+ * \param[in] eid The EID of the server that originally published the message
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \since 12
+ */
+int stasis_publish_mwi_state_full(
 			const char *uniqueid,
 			const char *mailbox,
 			const char *context,
 			int new_msgs,
-			int old_msgs);
+			int old_msgs,
+			struct ast_eid *eid);
 
 /*!
  * \brief The structure that contains MWI state
@@ -1124,7 +1141,16 @@
  * \retval NULL if it has not been allocated
  * \since 12
  */
-struct stasis_topic *stasis_mwi_topic(void);
+struct stasis_topic *stasis_mwi_topic_all(void);
+
+/*!
+ * \brief Get the Stasis topic for MWI messages on a unique ID
+ * \param uniqueid The unique id for which to get the topic
+ * \retval The topic structure for MWI messages for a given uniqueid
+ * \retval NULL if it failed to be found or allocated
+ * \since 12
+ */
+struct stasis_topic *stasis_mwi_topic(const char *uniqueid);
 
 /*!
  * \brief Get the Stasis caching topic for MWI messages

Modified: team/kmoore/stasis-mwi/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/include/asterisk/stasis.h?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/include/asterisk/stasis.h (original)
+++ team/kmoore/stasis-mwi/include/asterisk/stasis.h Thu Feb 28 15:14:01 2013
@@ -130,6 +130,8 @@
  * are cleaned up.
  */
 
+#include "asterisk/utils.h"
+
 /*! @{ */
 
 /*!
@@ -207,6 +209,23 @@
  * \since 12
  */
 const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
+
+/*!
+ * \brief Set the EID for the server on which this message was created.
+ * \param msg Message.
+ * \param eid EID to set on this message.
+ * \since 12
+ */
+void stasis_message_eid_set(struct stasis_message *msg, const struct ast_eid *eid);
+
+/*!
+ * \brief Get the EID for the server on which this message was created.
+ * \param msg Message.
+ * \return Pointer to the \a eid.
+ * \return \c NULL if msg is \c NULL.
+ * \since 12
+ */
+const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
 
 /*! @} */
 

Modified: team/kmoore/stasis-mwi/include/asterisk/xmpp.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/include/asterisk/xmpp.h?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/include/asterisk/xmpp.h (original)
+++ team/kmoore/stasis-mwi/include/asterisk/xmpp.h Thu Feb 28 15:14:01 2013
@@ -134,7 +134,7 @@
         pthread_t thread;
 	int timeout;
 	unsigned int reconnect:1; /*!< Reconnect this client */
-	struct ast_event_sub *mwi_sub; /*!< If distributing event information the MWI subscription */
+	struct stasis_subscription *mwi_sub; /*!< If distributing event information the MWI subscription */
 	struct ast_event_sub *device_state_sub; /*!< If distributing event information the device state subscription */
 };
 

Modified: team/kmoore/stasis-mwi/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/app.c?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/main/app.c (original)
+++ team/kmoore/stasis-mwi/main/app.c Thu Feb 28 15:14:01 2013
@@ -80,9 +80,29 @@
 
 static AST_LIST_HEAD_STATIC(zombies, zombie);
 
-static struct stasis_topic *__mwi_topic;
+static struct stasis_topic *__mwi_topic_all;
 static struct stasis_caching_topic *__mwi_topic_cached;
 static struct stasis_message_type *__mwi_message_type;
+static struct ao2_container *__mwi_topics;
+
+struct mwi_topic {
+	char *uniqueid;
+	struct stasis_subscription *forward;
+	struct stasis_topic *topic;
+};
+
+static void mwi_topic_dtor(void *obj)
+{
+	struct mwi_topic *topic = obj;
+	ast_free(topic->uniqueid);
+	ao2_cleanup(topic->forward);
+	ao2_cleanup(topic->topic);
+}
+
+static struct mwi_topic *mwi_topic_alloc(void)
+{
+	return ao2_alloc(sizeof(struct mwi_topic), mwi_topic_dtor);
+}
 
 static void *shaun_of_the_dead(void *data)
 {
@@ -2646,9 +2666,9 @@
 	ast_string_field_free_memory(mwi_state);
 }
 
-struct stasis_topic *stasis_mwi_topic(void)
-{
-	return __mwi_topic;
+struct stasis_topic *stasis_mwi_topic_all(void)
+{
+	return __mwi_topic_all;
 }
 
 struct stasis_caching_topic *stasis_mwi_topic_cached(void)
@@ -2661,12 +2681,47 @@
 	return __mwi_message_type;
 }
 
-int stasis_publish_mwi_state(
+struct stasis_topic *stasis_mwi_topic(const char *uniqueid)
+{
+	RAII_VAR(struct mwi_topic *, topic, ao2_find(__mwi_topics, uniqueid, OBJ_KEY), ao2_cleanup);
+
+	if (topic) {
+		return topic->topic;
+	}
+
+	topic = mwi_topic_alloc();
+
+	if (!topic) {
+		return NULL;
+	}
+
+	topic->topic = stasis_topic_create(uniqueid);
+	if (!topic->topic) {
+		return NULL;
+	}
+
+	topic->forward = stasis_forward_all(topic->topic, stasis_mwi_topic_all());
+	if (!topic->forward) {
+		return NULL;
+	}
+
+	topic->uniqueid = ast_strdup(uniqueid);
+	if (!topic->uniqueid) {
+		return NULL;
+	}
+	
+	ao2_link(__mwi_topics, topic);
+
+	return topic->topic;
+}
+
+int stasis_publish_mwi_state_full(
 			const char *uniqueid,
 			const char *mailbox,
 			const char *context,
 			int new_msgs,
-			int old_msgs)
+			int old_msgs,
+			struct ast_eid *eid)
 {
 	RAII_VAR(struct stasis_mwi_state *, mwi_state, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
@@ -2676,15 +2731,19 @@
 		return -1;
 	}
 
+	ast_string_field_set(mwi_state, uniqueid, uniqueid);
 	ast_string_field_set(mwi_state, mailbox, mailbox);
 	ast_string_field_set(mwi_state, context, context);
 	mwi_state->new_msgs = new_msgs;
 	mwi_state->old_msgs = old_msgs;
 
 	message = stasis_message_create(stasis_mwi_message_type(), mwi_state);
-
-	ast_assert(stasis_mwi_topic() != NULL);
-	stasis_publish(stasis_mwi_topic(), message);
+	if (eid) {
+		stasis_message_eid_set(message, eid);
+	}
+
+	ast_assert(stasis_mwi_topic(uniqueid) != NULL);
+	stasis_publish(stasis_mwi_topic(uniqueid), message);
 
 	ao2_ref(mwi_state, +1);
 	return 0;
@@ -2701,21 +2760,39 @@
 	return mwi_state->uniqueid;
 }
 
+static int mwi_topic_hash(const void *obj, const int flags)
+{
+	const char *uniqueid = (flags & OBJ_KEY) ? obj : ((struct mwi_topic*) obj)->uniqueid;
+	return ast_str_case_hash(uniqueid);
+}
+
+static int mwi_topic_cmp(void *obj, void *arg, int flags)
+{
+	struct mwi_topic *opt1 = obj, *opt2 = arg;
+	const char *uniqueid = (flags & OBJ_KEY) ? arg : opt2->uniqueid;
+	return strcasecmp(opt1->uniqueid, uniqueid) ? 0 : CMP_MATCH | CMP_STOP;
+}
+
 static void app_exit(void)
 {
-	ao2_cleanup(__mwi_topic);
-	__mwi_topic = NULL;
+	ao2_cleanup(__mwi_topic_all);
+	__mwi_topic_all = NULL;
 	ao2_cleanup(__mwi_topic_cached);
 	__mwi_topic_cached = NULL;
 	ao2_cleanup(__mwi_message_type);
 	__mwi_message_type = NULL;
-}
+	ao2_cleanup(__mwi_topics);
+	__mwi_topics = NULL;
+}
+
+#define MWI_TOPIC_BUCKETS 57
 
 int app_init(void)
 {
-	__mwi_topic = stasis_topic_create("stasis_mwi_topic");
-	__mwi_topic_cached = stasis_caching_topic_create(__mwi_topic, mwi_state_get_id);
+	__mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
+	__mwi_topic_cached = stasis_caching_topic_create(__mwi_topic_all, mwi_state_get_id);
 	__mwi_message_type = stasis_message_type_create("stasis_mwi_state");
+	__mwi_topics = ao2_container_alloc(MWI_TOPIC_BUCKETS, mwi_topic_hash, mwi_topic_cmp);
 	ast_register_atexit(app_exit);
 	return 0;
 }

Modified: team/kmoore/stasis-mwi/main/stasis_message.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/stasis_message.c?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/main/stasis_message.c (original)
+++ team/kmoore/stasis-mwi/main/stasis_message.c Thu Feb 28 15:14:01 2013
@@ -78,6 +78,8 @@
 	struct stasis_message_type *type;
 	/*! Message content */
 	void *data;
+	/*! Server EID */
+	struct ast_eid eid;
 };
 
 static void stasis_message_dtor(void *obj)
@@ -106,6 +108,8 @@
 	ao2_ref(data, +1);
 	message->data = data;
 
+	ast_set_default_eid(&message->eid);
+
 	ao2_ref(message, +1);
 	return message;
 }
@@ -133,3 +137,19 @@
 	}
 	return &msg->timestamp;
 }
+
+void stasis_message_eid_set(struct stasis_message *msg, const struct ast_eid *eid)
+{
+	if (msg == NULL) {
+		return;
+	}
+	msg->eid = *eid;
+}
+
+const struct ast_eid *stasis_message_eid(const struct stasis_message *msg)
+{
+	if (msg == NULL) {
+		return NULL;
+	}
+	return &msg->eid;
+}

Modified: team/kmoore/stasis-mwi/res/res_jabber.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/res/res_jabber.c?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/res/res_jabber.c (original)
+++ team/kmoore/stasis-mwi/res/res_jabber.c Thu Feb 28 15:14:01 2013
@@ -373,7 +373,7 @@
 static void aji_publish_mwi(struct aji_client *client, const char *mailbox,
 	const char *context, const char *oldmsgs, const char *newmsgs);
 static void aji_devstate_cb(const struct ast_event *ast_event, void *data);
-static void aji_mwi_cb(const struct ast_event *ast_event, void *data);
+static void aji_mwi_cb(void *data, struct stasis_topic *topic, struct stasis_message *msg);
 static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node,
 				       const char *event_type, unsigned int cachable);
 /* No transports in this version */
@@ -410,7 +410,7 @@
 
 static struct aji_client_container clients;
 static struct aji_capabilities *capabilities = NULL;
-static struct ast_event_sub *mwi_sub = NULL;
+static struct stasis_subscription *mwi_sub = NULL;
 static struct ast_event_sub *device_state_sub = NULL;
 static ast_cond_t message_received_condition;
 static ast_mutex_t messagelock;
@@ -3240,14 +3240,16 @@
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void aji_mwi_cb(const struct ast_event *ast_event, void *data)
+static void aji_mwi_cb(void *data, struct stasis_topic *topic, struct stasis_message *msg)
 {
 	const char *mailbox;
 	const char *context;
 	char oldmsgs[10];
 	char newmsgs[10];
 	struct aji_client *client;
-	if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID)))
+	struct stasis_mwi_state *mwi_state;
+
+	if (ast_eid_cmp(&ast_eid_default, stasis_message_eid(msg)))
 	{
 		/* If the event didn't originate from this server, don't send it back out. */
 		ast_debug(1, "Returning here\n");
@@ -3255,12 +3257,14 @@
 	}
 
 	client = ASTOBJ_REF((struct aji_client *) data);
-	mailbox = ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX);
-	context = ast_event_get_ie_str(ast_event, AST_EVENT_IE_CONTEXT);
+	mwi_state = stasis_message_data(msg);
+	
+	mailbox = mwi_state->mailbox;
+	context = mwi_state->context;
 	snprintf(oldmsgs, sizeof(oldmsgs), "%d",
-		ast_event_get_ie_uint(ast_event, AST_EVENT_IE_OLDMSGS));
+		mwi_state->old_msgs);
 	snprintf(newmsgs, sizeof(newmsgs), "%d",
-		ast_event_get_ie_uint(ast_event, AST_EVENT_IE_NEWMSGS));
+		mwi_state->new_msgs);
 	aji_publish_mwi(client, mailbox, context, oldmsgs, newmsgs);
 	ASTOBJ_UNREF(client, ast_aji_client_destroy);
 
@@ -3300,8 +3304,7 @@
 static void aji_init_event_distribution(struct aji_client *client)
 {
 	if (!mwi_sub) {
-		mwi_sub = ast_event_subscribe(AST_EVENT_MWI, aji_mwi_cb, "aji_mwi_subscription",
-			client, AST_EVENT_IE_END);
+		mwi_sub = stasis_subscribe(stasis_mwi_topic_all(), aji_mwi_cb, client);
 	}
 	if (!device_state_sub) {
 		if (ast_enable_distributed_devstate()) {
@@ -3366,7 +3369,7 @@
 		sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs);
 		sscanf(iks_find_cdata(item_content, "NEWMSGS"), "%10d", &newmsgs);
 
-		stasis_publish_mwi_state(uniqueid, item_id, context, newmsgs, oldmsgs);
+		stasis_publish_mwi_state_full(uniqueid, item_id, context, newmsgs, oldmsgs, &pubsub_eid);
 
 		if (!(event = ast_event_new(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX,
 			AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_CONTEXT,
@@ -4775,7 +4778,7 @@
 	ast_manager_unregister("JabberSend");
 	ast_custom_function_unregister(&jabberstatus_function);
 	if (mwi_sub) {
-		ast_event_unsubscribe(mwi_sub);
+		stasis_unsubscribe(mwi_sub);
 	}
 	if (device_state_sub) {
 		ast_event_unsubscribe(device_state_sub);

Modified: team/kmoore/stasis-mwi/res/res_xmpp.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/res/res_xmpp.c?view=diff&rev=382290&r1=382289&r2=382290
==============================================================================
--- team/kmoore/stasis-mwi/res/res_xmpp.c (original)
+++ team/kmoore/stasis-mwi/res/res_xmpp.c Thu Feb 28 15:14:01 2013
@@ -1319,24 +1319,27 @@
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void xmpp_pubsub_mwi_cb(const struct ast_event *ast_event, void *data)
+static void xmpp_pubsub_mwi_cb(void *data, struct stasis_topic *topic, struct stasis_message *msg)
 {
 	struct ast_xmpp_client *client = data;
 	const char *mailbox, *context;
 	char oldmsgs[10], newmsgs[10];
-
-	if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
+	struct stasis_mwi_state *mwi_state;
+
+	if (ast_eid_cmp(&ast_eid_default, stasis_message_eid(msg))) {
 		/* If the event didn't originate from this server, don't send it back out. */
 		ast_debug(1, "Returning here\n");
 		return;
 	}
 
-	mailbox = ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX);
-	context = ast_event_get_ie_str(ast_event, AST_EVENT_IE_CONTEXT);
+	mwi_state = stasis_message_data(msg);
+
+	mailbox = mwi_state->mailbox;
+	context = mwi_state->context;
 	snprintf(oldmsgs, sizeof(oldmsgs), "%d",
-		 ast_event_get_ie_uint(ast_event, AST_EVENT_IE_OLDMSGS));
+		 mwi_state->old_msgs);
 	snprintf(newmsgs, sizeof(newmsgs), "%d",
-		 ast_event_get_ie_uint(ast_event, AST_EVENT_IE_NEWMSGS));
+		 mwi_state->new_msgs);
 	xmpp_pubsub_publish_mwi(client, mailbox, context, oldmsgs, newmsgs);
 }
 
@@ -1481,7 +1484,7 @@
 		sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs);
 		sscanf(iks_find_cdata(item_content, "NEWMSGS"), "%10d", &newmsgs);
 
-		stasis_publish_mwi_state(uniqueid, item_id, context, newmsgs, oldmsgs);
+		stasis_publish_mwi_state_full(uniqueid, item_id, context, newmsgs, oldmsgs, &pubsub_eid);
 
 		if (!(event = ast_event_new(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX,
 					    AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_CONTEXT,
@@ -1591,8 +1594,7 @@
 	xmpp_pubsub_unsubscribe(client, "device_state");
 	xmpp_pubsub_unsubscribe(client, "message_waiting");
 
-	if (!(client->mwi_sub = ast_event_subscribe(AST_EVENT_MWI, xmpp_pubsub_mwi_cb, "xmpp_pubsub_mwi_subscription",
-						    client, AST_EVENT_IE_END))) {
+	if (!(client->mwi_sub = stasis_subscribe(stasis_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
 		return;
 	}
 
@@ -1603,7 +1605,7 @@
 
 	if (!(client->device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE,
 							     xmpp_pubsub_devstate_cb, "xmpp_pubsub_devstate_subscription", client, AST_EVENT_IE_END))) {
-		ast_event_unsubscribe(client->mwi_sub);
+		stasis_unsubscribe(client->mwi_sub);
 		client->mwi_sub = NULL;
 		return;
 	}
@@ -3511,7 +3513,7 @@
 	}
 
 	if (client->mwi_sub) {
-		ast_event_unsubscribe(client->mwi_sub);
+		stasis_unsubscribe(client->mwi_sub);
 		client->mwi_sub = NULL;
 		xmpp_pubsub_unsubscribe(client, "message_waiting");
 	}




More information about the svn-commits mailing list