[svn-commits] marquis: branch marquis/pubsub-distributed-events r213622 - in /team/marquis/...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Fri Aug 21 12:35:22 CDT 2009


Author: marquis
Date: Fri Aug 21 12:35:17 2009
New Revision: 213622

URL: http://svn.asterisk.org/svn-view/asterisk?view=rev&rev=213622
Log:
Initial commit of PubSub-based distributed events.  Mostly working for device states, with some error conditions not totally taken care of yet.

Modified:
    team/marquis/pubsub-distributed-events/include/asterisk/jabber.h
    team/marquis/pubsub-distributed-events/res/res_jabber.c

Modified: team/marquis/pubsub-distributed-events/include/asterisk/jabber.h
URL: http://svn.asterisk.org/svn-view/asterisk/team/marquis/pubsub-distributed-events/include/asterisk/jabber.h?view=diff&rev=213622&r1=213621&r2=213622
==============================================================================
--- team/marquis/pubsub-distributed-events/include/asterisk/jabber.h (original)
+++ team/marquis/pubsub-distributed-events/include/asterisk/jabber.h Fri Aug 21 12:35:17 2009
@@ -82,7 +82,8 @@
 
 enum {
 	AJI_AUTOPRUNE = (1 << 0),
-	AJI_AUTOREGISTER = (1 << 1)
+	AJI_AUTOREGISTER = (1 << 1),
+	AJI_XEP0248 = ( 1 << 2)
 };
 
 enum aji_btype {
@@ -142,6 +143,7 @@
 	char password[160];
 	char user[AJI_MAX_JIDLEN];
 	char serverhost[AJI_MAX_RESJIDLEN];
+	char pubsub_node[AJI_MAX_RESJIDLEN];
 	char statusmessage[256];
 	char name_space[256];
 	char sid[10]; /* Session ID */
@@ -167,6 +169,7 @@
 	int timeout;
 	int message_timeout;
 	int authorized;
+	int distribute_events;
 	struct ast_flags flags;
 	int component; /* 0 client,  1 component */
 	struct aji_buddy_container buddies;

Modified: team/marquis/pubsub-distributed-events/res/res_jabber.c
URL: http://svn.asterisk.org/svn-view/asterisk/team/marquis/pubsub-distributed-events/res/res_jabber.c?view=diff&rev=213622&r1=213621&r2=213622
==============================================================================
--- team/marquis/pubsub-distributed-events/res/res_jabber.c (original)
+++ team/marquis/pubsub-distributed-events/res/res_jabber.c Fri Aug 21 12:35:17 2009
@@ -58,6 +58,8 @@
 #include "asterisk/astobj.h"
 #include "asterisk/astdb.h"
 #include "asterisk/manager.h"
+#include "asterisk/event.h"
+#include "asterisk/devicestate.h"
 
 /*** DOCUMENTATION
 	<application name="JabberSend" language="en_US">
@@ -241,7 +243,24 @@
 static int aji_register_query_handler(void *data, ikspak *pak);
 static int aji_register_approve_handler(void *data, ikspak *pak);
 static int aji_reconnect(struct aji_client *client);
+static char *aji_cli_create_collection(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *aji_cli_list_pubsub_items(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static iks *jabber_make_auth(iksid * id, const char *pass, const char *sid);
+static int aji_receive_item_list(void *data, ikspak* pak);
+static void aji_init_event_distribution(struct aji_client *client);
+static iks* aji_create_pubsub_node(struct aji_client *client, const char *node_type,
+	const char *name, const char *collection_name);
+static iks* aji_build_node_config(iks *pubsub, const char *node_type, const char *collection_name);
+static void aji_create_pubsub_collection(struct aji_client *client, const char *collection_name);
+static void aji_create_pubsub_leaf(struct aji_client *client, const char *collection_name,
+   const char *leaf_name);
+static char *aji_cli_create_leafnode(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static void aji_create_affiliations(struct aji_client *client, const char *node);
+static iks* aji_pubsub_iq_create(struct aji_client *client, const char *type);
+static void aji_publish_device_state(struct aji_client *client, const char * device,
+	const char *device_state);
+static int aji_handle_publish_error(void *data, ikspak *pak);
+static int aji_handle_pubsub_event(void *data, ikspak *pak);
 /* No transports in this version */
 /*
 static int aji_create_transport(char *label, struct aji_client *client);
@@ -255,6 +274,9 @@
 	AST_CLI_DEFINE(aji_show_clients, "Show state of clients and components"),
 	AST_CLI_DEFINE(aji_show_buddies, "Show buddy lists of our clients"),
 	AST_CLI_DEFINE(aji_test, "Shows roster, but is generally used for mog's debugging."),
+	AST_CLI_DEFINE(aji_cli_create_collection, "Creates a test PubSub node collection."),
+	AST_CLI_DEFINE(aji_cli_list_pubsub_items, "Lists Pubsub items"),
+	AST_CLI_DEFINE(aji_cli_create_leafnode, "Creates a test Pubsub leaf node"),
 };
 
 static char *app_ajisend = "JabberSend";
@@ -467,7 +489,7 @@
 	);
 
 	if (deprecation_warning++ % 10 == 0)
-		ast_log(LOG_WARNING, "JabberStatus is deprecated.  Please use the JABBER_STATUS dialplan function in the future.\n");
+	ast_log(LOG_WARNING,"JabberStatus is deprecated Please use the JABBER_STATUS dialplan function in the future.\n");
 
 	if (!data) {
 		ast_log(LOG_ERROR, "Usage: JabberStatus(<sender>,<jid>[/<resource>],<varname>\n");
@@ -496,7 +518,8 @@
 	if (!r && buddy->resources) 
 		r = buddy->resources;
 	if (!r)
-		ast_log(LOG_NOTICE, "Resource '%s' of buddy '%s' was not found\n", jid.resource, jid.screenname);
+		ast_log(LOG_NOTICE, "Resource '%s' of buddy '%s' was not found\n", jid.resource,
+jid.screenname);
 	else
 		stat = r->status;
 	snprintf(status, sizeof(status), "%d", stat);
@@ -504,7 +527,8 @@
 	return 0;
 }
 
-static int acf_jabberstatus_read(struct ast_channel *chan, const char *name, char *data, char *buf, size_t buflen)
+static int acf_jabberstatus_read(struct ast_channel *chan, const char *name, char *data,
+char *buf, size_t buflen)
 {
 	struct aji_client *client = NULL;
 	struct aji_buddy *buddy = NULL;
@@ -545,7 +569,8 @@
 	if (!r && buddy->resources) 
 		r = buddy->resources;
 	if (!r)
-		ast_log(LOG_NOTICE, "Resource %s of buddy %s was not found.\n", jid.resource, jid.screenname);
+		ast_log(LOG_NOTICE, "Resource %s of buddy %s was not found.\n", jid.resource,
+jid.screenname);
 	else
 		stat = r->status;
 	snprintf(buf, buflen, "%d", stat);
@@ -684,7 +709,8 @@
  * \return the number of read bytes on success, 0 on timeout expiration, 
  * -1 on  error
  */
-static int aji_io_recv(struct aji_client *client, char *buffer, size_t buf_len, int timeout)
+static int aji_io_recv(struct aji_client *client, char *buffer, size_t buf_len, int
+timeout)
 {
 	int sock;
 	fd_set fds;
@@ -883,7 +909,8 @@
 	struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
 
 	if (!ast_strlen_zero(xmpp))
-		manager_event(EVENT_FLAG_USER, "JabberEvent", "Account: %s\r\nPacket: %s\r\n", client->name, xmpp);
+		manager_event(EVENT_FLAG_USER, "JabberEvent", "Account: %s\r\nPacket: %s\r\n",
+client->name, xmpp);
 
 	if (client->debug) {
 		if (is_incoming)
@@ -910,7 +937,8 @@
  *
  * \return IKS_OK on success, IKSNET_NOTSUPP on failure.
  */
-static int aji_start_sasl(struct aji_client *client, enum ikssasltype type, char *username, char *pass)
+static int aji_start_sasl(struct aji_client *client, enum ikssasltype type, char
+*username, char *pass)
 {
 	iks *x = NULL;
 	int len;
@@ -967,7 +995,8 @@
 	int features = 0;
 
 	if(!node) {
-		ast_log(LOG_ERROR, "aji_act_hook was called with out a packet\n"); /* most likely cause type is IKS_NODE_ERROR lost connection */
+		ast_log(LOG_ERROR, "aji_act_hook was called with out a packet\n"); /* most likely
+cause type is IKS_NODE_ERROR lost connection */
 		ASTOBJ_UNREF(client, aji_client_destroy);
 		return IKS_HOOK;
 	}
@@ -997,8 +1026,10 @@
 				break;
 			}
 			if (!client->usesasl) {
-				iks_filter_add_rule(client->f, aji_client_connect, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, client->mid, IKS_RULE_DONE);
-				auth = jabber_make_auth(client->jid, client->password, iks_find_attrib(node, "id"));
+				iks_filter_add_rule(client->f, aji_client_connect, client, IKS_RULE_TYPE,
+IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, client->mid, IKS_RULE_DONE);
+				auth = jabber_make_auth(client->jid, client->password,
+iks_find_attrib(node, "id"));
 				if (auth) {
 					iks_insert_attrib(auth, "id", client->mid);
 					iks_insert_attrib(auth, "to", client->jid->server);
@@ -1025,7 +1056,8 @@
 						break;
 					if (client->authorized) {
 						if (features & IKS_STREAM_BIND) {
-							iks_filter_add_rule(client->f, aji_client_connect, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_DONE);
+							iks_filter_add_rule(client->f, aji_client_connect, client,
+IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_DONE);
 							auth = iks_make_resource_bind(client->jid);
 							if (auth) {
 								iks_insert_attrib(auth, "id", client->mid);
@@ -1038,7 +1070,9 @@
 							}
 						}
 						if (features & IKS_STREAM_SESSION) {
-							iks_filter_add_rule (client->f, aji_client_connect, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, "auth", IKS_RULE_DONE);
+							iks_filter_add_rule (client->f, aji_client_connect, client,
+IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, "auth",
+IKS_RULE_DONE);
 							auth = iks_make_session();
 							if (auth) {
 								iks_insert_attrib(auth, "id", "auth");
@@ -1056,7 +1090,8 @@
 							break;
 						}
 
-						ret = aji_start_sasl(client, features, client->jid->user, client->password);
+						ret = aji_start_sasl(client, features, client->jid->user,
+client->password);
 						if (ret != IKS_OK) {
 							ASTOBJ_UNREF(client, aji_client_destroy);
 							return IKS_HOOK;
@@ -1097,7 +1132,8 @@
 					handshake = NULL;
 				}
 				client->state = AJI_CONNECTING;
-				if(aji_recv(client, 1) == 2) /*XXX proper result for iksemel library on iks_recv of <handshake/> XXX*/
+				if(aji_recv(client, 1) == 2) /*XXX proper result for iksemel library on
+iks_recv of <handshake/> XXX*/
 					client->state = AJI_CONNECTED;
 				else
 					ast_log(LOG_WARNING, "Jabber didn't seem to handshake, failed to authenticate.\n");
@@ -1228,7 +1264,8 @@
 			iks_insert_attrib(query, "xmlns", "jabber:iq:register");
 			iks_insert_attrib(error, "code" , "406");
 			iks_insert_attrib(error, "type", "modify");
-			iks_insert_attrib(notacceptable, "xmlns", "urn:ietf:params:xml:ns:xmpp-stanzas");
+			iks_insert_attrib(notacceptable, "xmlns",
+"urn:ietf:params:xml:ns:xmpp-stanzas");
 			iks_insert_node(iq, query);
 			iks_insert_node(iq, error);
 			iks_insert_node(error, notacceptable);
@@ -1381,7 +1418,8 @@
 			ASTOBJ_UNREF(client, aji_client_destroy);
 			return IKS_FILTER_EAT;
 		}
-		if (iks_find_with_attrib(pak->query, "feature", "var", "http://www.google.com/xmpp/protocol/voice/v1")) {
+		if (iks_find_with_attrib(pak->query, "feature", "var",
+"http://www.google.com/xmpp/protocol/voice/v1")) {
 			resource->cap->jingle = 1;
 		} else
 			resource->cap->jingle = 0;
@@ -1402,7 +1440,8 @@
 			iks_insert_attrib(ident, "type", "pc");
 			iks_insert_attrib(ident, "name", "asterisk");
 			iks_insert_attrib(disco, "var", "http://jabber.org/protocol/disco#info");
-			iks_insert_attrib(google, "var", "http://www.google.com/xmpp/protocol/voice/v1");
+			iks_insert_attrib(google, "var",
+"http://www.google.com/xmpp/protocol/voice/v1");
 			iks_insert_node(iq, query);
 			iks_insert_node(query, ident);
 			iks_insert_node(query, google);
@@ -1446,12 +1485,15 @@
 			ASTOBJ_UNREF(client, aji_client_destroy);
 			return IKS_FILTER_EAT;
 		}
-		if (iks_find_with_attrib(pak->query, "feature", "var", "http://www.google.com/xmpp/protocol/voice/v1")) {
+		if (iks_find_with_attrib(pak->query, "feature", "var",
+"http://www.google.com/xmpp/protocol/voice/v1")) {
 			resource->cap->jingle = 1;
 		} else
 			resource->cap->jingle = 0;
-	} else if (pak->subtype == IKS_TYPE_GET && !(node = iks_find_attrib(pak->query, "node"))) {
-		iks *iq, *query, *identity, *disco, *reg, *commands, *gateway, *version, *vcard, *search;
+	} else if (pak->subtype == IKS_TYPE_GET && !(node = iks_find_attrib(pak->query,
+"node"))) {
+		iks *iq, *query, *identity, *disco, *reg, *commands, *gateway, *version, *vcard,
+*search;
 
 		iq = iks_new("iq");
 		query = iks_new("query");
@@ -1463,8 +1505,8 @@
 		version = iks_new("feature");
 		vcard = iks_new("feature");
 		search = iks_new("feature");
-
-		if (iq && query && identity && disco && reg && commands && gateway && version && vcard && search && client) {
+		if (iq && query && identity && disco && reg && commands && gateway && version &&
+vcard && search && client) {
 			iks_insert_attrib(iq, "from", client->user);
 			iks_insert_attrib(iq, "to", pak->from->full);
 			iks_insert_attrib(iq, "id", pak->id);
@@ -1505,8 +1547,9 @@
 		iks_delete(version);
 		iks_delete(vcard);
 		iks_delete(search);
-
-	} else if (pak->subtype == IKS_TYPE_GET && !strcasecmp(node, "http://jabber.org/protocol/commands")) {
+		
+	} else if (pak->subtype == IKS_TYPE_GET && !strcasecmp(node,
+"http://jabber.org/protocol/commands")) {
 		iks *iq, *query, *confirm;
 		iq = iks_new("iq");
 		query = iks_new("query");
@@ -1645,7 +1688,8 @@
 	}
 	ASTOBJ_WRLOCK(buddy);
 	status = (pak->show) ? pak->show : 6;
-	priority = atoi((iks_find_cdata(pak->x, "priority")) ? iks_find_cdata(pak->x, "priority") : "0");
+	priority = atoi((iks_find_cdata(pak->x, "priority")) ? iks_find_cdata(pak->x,
+"priority") : "0");
 	tmp = buddy->resources;
 	descrip = ast_strdup(iks_find_cdata(pak->x,"status"));
 
@@ -1786,7 +1830,8 @@
 				iks_insert_attrib(iq,"from", client->jid->full);
 				iks_insert_attrib(iq, "id", client->mid);
 				ast_aji_increment_mid(client->mid);
-				iks_insert_attrib(query, "xmlns", "http://jabber.org/protocol/disco#info");
+				iks_insert_attrib(query, "xmlns",
+"http://jabber.org/protocol/disco#info");
 				iks_insert_node(iq, query);
 				ast_aji_send(client, iq);
 				
@@ -1862,7 +1907,8 @@
 		iks_delete(status);
 
 		if (client->component)
-			aji_set_presence(client, pak->from->full, iks_find_attrib(pak->x, "to"), client->status, client->statusmessage);
+			aji_set_presence(client, pak->from->full, iks_find_attrib(pak->x, "to"),
+client->status, client->statusmessage);
 	case IKS_TYPE_SUBSCRIBED:
 		buddy = ASTOBJ_CONTAINER_FIND(&client->buddies, pak->from->partial);
 		if (!buddy && pak->from->partial) {
@@ -2010,6 +2056,7 @@
 		res = aji_reconnect(client);
 		sleep(4);
 	}
+	
 
 	do {
 		if (res == IKS_NET_RWERR || client->timeout == 0) {
@@ -2089,7 +2136,9 @@
 			ASTOBJ_UNLOCK(iterator);
 		});
 		iks_filter_remove_hook(client->f, aji_register_transport);
-		iks_filter_add_rule(client->f, aji_register_transport2, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_NS, IKS_NS_REGISTER, IKS_RULE_DONE);
+		iks_filter_add_rule(client->f, aji_register_transport2, client, IKS_RULE_TYPE,
+IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_NS, IKS_NS_REGISTER,
+IKS_RULE_DONE);
 		iks_insert_attrib(send, "to", buddy->host);
 		iks_insert_attrib(send, "id", client->mid);
 		ast_aji_increment_mid(client->mid);
@@ -2156,7 +2205,8 @@
 #endif
 
 /*!
- * \brief goes through roster and prunes users not needed in list, or adds them accordingly.
+ * \brief goes through roster and prunes users not needed in list, or adds them
+accordingly.
  * \param client the configured XMPP client we use to connect to a XMPP server
  * \return void.
  * \note The messages here should be configurable.
@@ -2179,7 +2229,8 @@
 		ASTOBJ_RDLOCK(iterator);
 		/* For an aji_buddy, both AUTOPRUNE and AUTOREGISTER will never
 		 * be called at the same time */
-		if (ast_test_flag(&iterator->flags, AJI_AUTOPRUNE)) { /* If autoprune is set on jabber.conf */
+		if (ast_test_flag(&iterator->flags, AJI_AUTOPRUNE)) { /* If autoprune is set on
+jabber.conf */
 			res = ast_aji_send(client, iks_make_s10n(IKS_TYPE_UNSUBSCRIBE, iterator->name,
 								 "GoodBye. Your status is no longer needed by Asterisk the Open Source PBX"
 								 " so I am no longer subscribing to your presence.\n"));
@@ -2272,7 +2323,8 @@
 			if(ast_test_flag(&client->flags, AJI_AUTOPRUNE)) {
 				ast_set_flag(&buddy->flags, AJI_AUTOPRUNE);
 				ASTOBJ_MARK(buddy);
-			} else if (!iks_strcmp(iks_find_attrib(x, "subscription"), "none") || !iks_strcmp(iks_find_attrib(x, "subscription"), "from")) {
+			} else if (!iks_strcmp(iks_find_attrib(x, "subscription"), "none") ||
+!iks_strcmp(iks_find_attrib(x, "subscription"), "from")) {
 				/* subscribe to buddy's presence only 
 				   if we really need to */
 				ast_set_flag(&buddy->flags, AJI_AUTOREGISTER);
@@ -2327,7 +2379,8 @@
 
 	if(roster) {
 		iks_insert_attrib(roster, "id", "roster");
-		aji_set_presence(client, NULL, client->jid->full, client->status, client->statusmessage);
+		aji_set_presence(client, NULL, client->jid->full, client->status,
+client->statusmessage);
 		ast_aji_send(client, roster);
 	}
 
@@ -2349,12 +2402,17 @@
 
 	if (client) {
 		if (client->state == AJI_DISCONNECTED) {
-			iks_filter_add_rule(client->f, aji_filter_roster, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, "roster", IKS_RULE_DONE);
+			iks_filter_add_rule(client->f, aji_filter_roster, client, IKS_RULE_TYPE,
+IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, "roster", IKS_RULE_DONE);
 			client->state = AJI_CONNECTING;
 			client->jid = (iks_find_cdata(pak->query, "jid")) ? iks_id_new(client->stack, iks_find_cdata(pak->query, "jid")) : client->jid;
 			iks_filter_remove_hook(client->f, aji_client_connect);
-			if(!client->component) /*client*/
+			if(!client->component) { /*client*/
 				aji_get_roster(client);
+				if (client->distribute_events) {
+					aji_init_event_distribution(client);
+				}
+			}
 		}
 	} else
 		ast_log(LOG_ERROR, "Out of memory.\n");
@@ -2377,13 +2435,15 @@
 	client->stream_flags = 0;
 #endif
 	/* If it's a component, connect to user, otherwise, connect to server */
-	connected = iks_connect_via(client->p, S_OR(client->serverhost, client->jid->server), client->port, client->component ? client->user : client->jid->server);
+	connected = iks_connect_via(client->p, S_OR(client->serverhost, client->jid->server),
+client->port, client->component ? client->user : client->jid->server);
 
 	if (connected == IKS_NET_NOCONN) {
 		ast_log(LOG_ERROR, "JABBER ERROR: No Connection\n");
 		return IKS_HOOK;
 	} else 	if (connected == IKS_NET_NODNS) {
-		ast_log(LOG_ERROR, "JABBER ERROR: No DNS %s for client to  %s\n", client->name, S_OR(client->serverhost, client->jid->server));
+		ast_log(LOG_ERROR, "JABBER ERROR: No DNS %s for client to  %s\n", client->name,
+S_OR(client->serverhost, client->jid->server));
 		return IKS_HOOK;
 	}
 
@@ -2413,6 +2473,475 @@
 
 	return 1;
 }
+
+
+/*!
+ * \brief Callback function for evnts
+ * \param ast_event
+ * \param data void pointer to ast_client structure
+ * \return void
+ */
+static void aji_event_cb(const struct ast_event *ast_event, void *data)
+{
+	const char *device;
+	const char *device_state;
+	if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID)))
+	{
+		/* If the event didn't originate from this server, don't send it back out. */
+		ast_log(LOG_DEBUG, "Returning here\n");
+		return;
+	}
+	
+	struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+	
+	if (ast_event_get_type(ast_event) == AST_EVENT_MWI) {
+		ast_log(LOG_ERROR, "Got an MWI event, mailbox: %s\n",
+				ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX));
+	} else if(ast_event_get_type(ast_event) == AST_EVENT_DEVICE_STATE_CHANGE) {
+		device = ast_event_get_ie_str(ast_event, AST_EVENT_IE_DEVICE);
+		device_state = ast_devstate_str(ast_event_get_ie_uint(ast_event, AST_EVENT_IE_STATE));
+		ast_log(LOG_ERROR, "Got a DEVICE_STATE_CHANGE event for %s\n", device);
+		aji_publish_device_state(client, device, device_state);
+		ast_log(LOG_ERROR, "DEVICE_STATE is %s\n", device_state);
+	}
+}
+
+/*!
+ * \brief Initialize collections for event distribution
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \return void
+ */
+static void aji_init_event_distribution(struct aji_client *client)
+{
+	struct ast_event_sub *mwi_sub;
+	struct ast_event_sub *device_state_sub;
+	if(ast_test_flag(&globalflags, AJI_XEP0248)) {
+		aji_create_pubsub_collection(client, "device_state");
+		aji_create_pubsub_collection(client, "message_waiting");
+	} else {
+		aji_create_pubsub_node(client, NULL, "device_state", NULL);
+		aji_create_pubsub_node(client, NULL, "message_waiting", NULL);
+
+	}
+	mwi_sub = ast_event_subscribe(AST_EVENT_MWI, aji_event_cb, "aji_mwi_subscription",
+		client, AST_EVENT_IE_END);
+	ast_log(LOG_DEBUG, "Subscribed to MWI\n");
+	iks_filter_add_rule(client->f, aji_handle_pubsub_event, client, IKS_RULE_TYPE,
+		IKS_PAK_MESSAGE, IKS_RULE_FROM, client->pubsub_node, IKS_RULE_DONE);
+	iks_filter_add_rule(client->f, aji_handle_publish_error, client, IKS_RULE_TYPE,
+		IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_ERROR, IKS_RULE_DONE);
+	
+	if (ast_enable_distributed_devstate()) {
+		return;
+	}
+	device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, aji_event_cb,
+		"aji_devstate_subscription", client, AST_EVENT_IE_END);
+	ast_event_dump_cache(device_state_sub);
+	ast_log(LOG_DEBUG, "Subscribed to Devstate changes\n");
+}
+
+/*!
+ * \brief Callback for handling PubSub events
+ * \param data void pointer to aji_client structure
+ * \return IKS_FILTER_EAT
+ */
+static int aji_handle_pubsub_event(void *data, ikspak *pak)
+{
+	char *device, *device_state;
+	iks *item, *entry, *state_node;
+	struct ast_eid pubsub_eid;
+	struct ast_event *event;
+	item = iks_find(iks_find(iks_find(pak->x, "event"), "items"), "item");
+	if(item) {
+		device = iks_find_attrib(item, "id");
+		entry = iks_find(item, "entry");
+		state_node = iks_find(entry, "state");
+		if(state_node) {
+			device_state = iks_find_cdata(entry, "state");
+			ast_str_to_eid(&pubsub_eid, iks_find_attrib(state_node,"eid"));
+			if (!ast_eid_cmp(&ast_eid_default, &pubsub_eid)) {
+				ast_log(LOG_DEBUG, "Returning here, eid of incoming event matches ours!\n");
+				return IKS_FILTER_EAT;
+			}
+			if (!(event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE,
+				AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, AST_EVENT_IE_STATE,
+				AST_EVENT_IE_PLTYPE_UINT, ast_devstate_val(device_state), AST_EVENT_IE_EID,
+				AST_EVENT_IE_PLTYPE_RAW, &pubsub_eid, sizeof(pubsub_eid),
+				AST_EVENT_IE_END))) {
+				return IKS_FILTER_EAT;
+			}
+
+			ast_event_queue_and_cache(event);
+		}
+	}
+	return IKS_FILTER_EAT;
+}
+
+/*!
+ * \brief Add Owner affiliations for pubsub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param node the name of the node to which to add affiliations
+ * \return void
+ */
+static void aji_create_affiliations(struct aji_client *client, const char *node)
+{
+	int res = 0;
+	iks *modify_affiliates = aji_pubsub_iq_create(client, "set");
+	iks *pubsub, *affiliations, *affiliate;
+	pubsub = iks_insert(modify_affiliates, "pubsub");
+	iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub#owner");
+	affiliations = iks_insert(pubsub, "affiliations");
+	iks_insert_attrib(affiliations, "node", node);
+	ASTOBJ_CONTAINER_TRAVERSE(&client->buddies, 1, {
+		ASTOBJ_RDLOCK(iterator);
+		affiliate = iks_insert(affiliations, "affiliation");
+		iks_insert_attrib(affiliate, "jid", iterator->name);
+		iks_insert_attrib(affiliate, "affiliation", "owner");
+		ASTOBJ_UNLOCK(iterator);
+	});
+	res = ast_aji_send(client, modify_affiliates);
+	iks_delete(modify_affiliates);
+}
+
+/*!
+ * \brief Publish an item to a PubSub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param device the name of the device whose state to publish
+ * \param device_state the state to publish
+ * \return void
+ */
+static void aji_publish_device_state(struct aji_client *client, const char * device,
+	const char *device_state)
+{
+	iks *request = aji_pubsub_iq_create(client, "set");
+	iks *pubsub, *publish, *item, *entry, *state;
+	char eid_str[20];
+	ast_eid_to_str(eid_str, sizeof(eid_str), &ast_eid_default);
+	ast_log(LOG_ERROR, "eid_str is %s\n", eid_str);
+	
+	pubsub = iks_insert(request, "pubsub");
+	iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub");
+	publish = iks_insert(pubsub, "publish");
+	if(ast_test_flag(&globalflags, AJI_XEP0248)) {
+		iks_insert_attrib(publish, "node", device);
+	} else {
+		iks_insert_attrib(publish, "node", "device_state");
+	}
+	item = iks_insert(publish, "item");
+	iks_insert_attrib(item, "id", device);
+	entry = iks_insert(item, "entry");
+	iks_insert_attrib(entry, "xmlns", "http://placeholder.org");
+	state = iks_insert(entry, "state");
+	iks_insert_attrib(state, "eid", eid_str);
+	iks_insert_cdata(state, device_state, strlen(device_state));
+	ast_aji_send(client, request);
+	iks_delete(request);
+}
+
+/*!
+ * \brief Create an IQ packet
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param type the type of IQ packet to create
+ * \return iks*
+ */
+static iks* aji_pubsub_iq_create(struct aji_client *client, const char *type)
+{
+	iks *request = iks_new("iq");
+	
+	iks_insert_attrib(request, "to", client->pubsub_node);
+	iks_insert_attrib(request, "from", client->jid->full);
+	iks_insert_attrib(request, "type", type);
+	ast_aji_increment_mid(client->mid);
+	iks_insert_attrib(request, "id", client->mid);
+	return request;
+}
+
+static int aji_handle_publish_error(void *data, ikspak *pak)
+{
+	char *node_name;
+	iks *publish;
+	iks *pubsub = iks_find(pak->x, "pubsub");
+	struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+	if(pubsub) {
+		publish = iks_find(pubsub, "publish");
+		if(publish) {
+			node_name = iks_find_attrib(publish, "node");
+			if(iks_find(iks_find(iks_find(publish, "item"), "entry"), "state")) {
+				if(ast_test_flag(&globalflags, AJI_XEP0248)) {
+					aji_create_pubsub_leaf(client, "device_state", node_name);
+				} else {
+					aji_create_pubsub_node(client, NULL, node_name, NULL);
+				}
+				iks *pubsub;
+				iks *request = aji_pubsub_iq_create(client, "set");
+				pubsub = iks_insert(request, "pubsub");
+				iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub");
+				iks_insert_node(pubsub, publish);
+				ast_aji_send(client, request);
+				iks_delete(request);
+				return IKS_FILTER_EAT;
+			}
+		} 
+	}
+	return IKS_FILTER_EAT;
+}
+
+/*!
+ * \brief Request item list from pubsub
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \return void
+ */
+static void aji_request_pubsub_items(struct aji_client *client)
+{
+	int res = 0;
+	iks *request = aji_pubsub_iq_create(client, "get");
+	iks *query;
+	query = iks_insert(request, "query");
+	iks_insert_attrib(query, "xmlns", "http://jabber.org/protocol/disco#items");
+	iks_filter_add_rule(client->f, aji_receive_item_list, client, IKS_RULE_TYPE,
+		IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, client->mid,
+		IKS_RULE_DONE);
+	res = ast_aji_send(client, request);
+	iks_delete(request);
+
+}
+
+
+/*!
+ * \brief Receive pubsub item lists
+ * \param data pointer to aji_client structure
+ * \param pak response from pubsub diso#items query
+ * \return IKS_FILTER_EAT
+ */
+static int aji_receive_item_list(void *data, ikspak* pak)
+{
+
+	struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+	iks *item;
+	if(iks_has_children(pak->query)) {	
+		item = iks_first_tag(pak->query);
+		ast_log(LOG_WARNING, "Node name: %s\n", iks_find_attrib(item, "node"));
+		while((item = iks_next_tag(item))) {
+			ast_log(LOG_WARNING, "Node name: %s\n", iks_find_attrib(item, "node"));
+		} 
+	}
+	iks_delete(item);
+	return IKS_FILTER_EAT;
+}
+		
+	
+
+/*!
+ * \brief Method to expose PubSub node list via CLI.
+ * \return char *
+ */
+static char *aji_cli_list_pubsub_items(struct ast_cli_entry *e, int cmd, struct
+ast_cli_args *a)
+{
+		struct aji_client *client;
+		const char *name = "asterisk";
+
+		switch (cmd) {
+		case CLI_INIT:
+				e->command = "jabber list items";
+				e->usage =
+					"Usage: jabber list items [name]\n"
+					"       Lists root items on Pubsub server\n"
+					"       as configured in jabber.conf.\n";
+			return NULL;
+		case CLI_GENERATE:
+			return NULL;
+		}
+
+		if (a->argc > 4) {
+			return CLI_SHOWUSAGE;
+		} else if (a->argc == 4) {
+			name = a->argv[3];
+		}
+
+        if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+			ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+			return CLI_FAILURE;
+		}
+		
+		ast_cli(a->fd, "Listing pubsub items.\n");
+		aji_request_pubsub_items(client);
+		return CLI_SUCCESS;
+}
+
+/*!
+ * \brief Create a PubSub collection node.
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param collection_name The name to use for this collection
+ * \return void.
+ */
+
+static void aji_create_pubsub_collection(struct aji_client *client, const char
+*collection_name)
+{
+	aji_create_pubsub_node(client, "collection", collection_name, NULL);
+}
+
+
+/*!
+ * \brief Create a PubSub leaf node.
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param leaf_name The name to use for this collection
+ * \return void.
+ */
+static void aji_create_pubsub_leaf(struct aji_client *client, const char *collection_name,
+const char *leaf_name)
+{
+	aji_create_pubsub_node(client, "leaf", leaf_name, collection_name);
+}
+
+/*!
+ * \brief Create a pubsub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param node_type the type of node to create
+ * \param name the name of the node to create
+ * \return iks*
+ */
+static iks* aji_create_pubsub_node(struct aji_client *client, const char *node_type, const
+		char *name, const char *collection_name)
+{
+	int res = 0;
+	iks *node = aji_pubsub_iq_create(client, "set");
+	iks *pubsub, *create, *configure;
+	pubsub = iks_insert(node, "pubsub");
+	iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub");
+	create = iks_insert(pubsub, "create");
+	iks_insert_attrib(create, "node", name);
+	configure = aji_build_node_config(pubsub, node_type, collection_name);
+	res = ast_aji_send(client, node);
+	aji_create_affiliations(client, name);
+	iks_delete(node);
+	return 0;
+}
+
+
+
+static iks* aji_build_node_config(iks *pubsub, const char *node_type, const char *collection_name)
+{
+	iks *configure, *x, *field_owner, *field_node_type, *field_node_config,
+ 	*field_persist_items, *field_access_model, *field_pubsub_collection;
+	configure = iks_insert(pubsub, "configure");
+	x = iks_insert(configure, "x");
+	iks_insert_attrib(x, "xmlns", "jabber:x:data");
+	iks_insert_attrib(x, "type", "submit");
+	field_owner = iks_insert(x, "field");
+	iks_insert_attrib(field_owner, "var", "FORM_TYPE");
+	iks_insert_attrib(field_owner, "type", "hidden");
+	iks_insert_cdata(iks_insert(field_owner, "value"),
+		"http://jabber.org/protocol/pubsub#owner", 39);
+	if(node_type) {
+		field_node_type = iks_insert(x, "field");
+		iks_insert_attrib(field_node_type, "var", "pubsub#node_type");
+		iks_insert_cdata(iks_insert(field_node_type, "value"), node_type, strlen(node_type));
+	}
+	field_node_config = iks_insert(x, "field");
+	iks_insert_attrib(field_node_config, "var", "FORM_TYPE");
+	iks_insert_attrib(field_node_config, "type", "hidden");
+	iks_insert_cdata(iks_insert(field_node_config, "value"),
+		"http://jabber.org/protocol/pubsub#node_config", 45);
+	field_persist_items = iks_insert(x, "field");
+	iks_insert_attrib(field_persist_items, "var", "pubsub#persist_items");
+	iks_insert_cdata(iks_insert(field_persist_items, "value"), "1", 1);
+	field_access_model = iks_insert(x, "field");
+	iks_insert_attrib(field_access_model, "var", "pubsub#access_model");
+	iks_insert_cdata(iks_insert(field_access_model, "value"), "whitelist", 9);
+	if(node_type && !strcasecmp(node_type, "leaf")) {
+		field_pubsub_collection = iks_insert(x, "field");
+		iks_insert_attrib(field_pubsub_collection, "var", "pubsub#collection");
+		iks_insert_cdata(iks_insert(field_pubsub_collection, "value"), collection_name,
+			strlen(collection_name));
+	}
+	return configure;
+}
+
+
+
+/*!
+ * \brief Method to expose PubSub collection node creation via CLI.
+ * \return char *.
+ */
+static char *aji_cli_create_collection(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+		struct aji_client *client;
+		const char *name = "asterisk";
+		const char *collection_name = "test_collection";
+
+		switch (cmd) {
+		case CLI_INIT:
+				e->command = "jabber create collection";
+				e->usage =
+					"Usage: jabber create collection [name] [node name]\n"
+					"       Creates a PubSub collection node using the account\n"
+					"       as configured in jabber.conf.\n";
+			return NULL;
+		case CLI_GENERATE:
+			return NULL;
+		}
+
+		if (a->argc > 5) {
+			return CLI_SHOWUSAGE;
+		} else if (a->argc == 5) {
+			name = a->argv[3];
+			collection_name = a->argv[4];
+		}
+
+        if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+			ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+			return CLI_FAILURE;
+		}
+		
+		ast_cli(a->fd, "Creating test PubSub node collection.\n");
+		aji_create_pubsub_collection(client, collection_name);
+		return CLI_SUCCESS;
+}
+
+/*!
+ * \brief Method to expose PubSub leaf node creation via CLI.
+ * \return char *.
+ */
+static char *aji_cli_create_leafnode(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	struct aji_client *client;
+	const char *name = "asterisk";
+	const char *collection_name = "test_collection";
+	const char *leaf_name = "test_leaf";
+
+	switch (cmd) {
+		case CLI_INIT:
+			e->command = "jabber create leaf";
+			e->usage =
+					"Usage: jabber create leaf [name] [collection_name] [node name]\n"
+					"       Creates a PubSub leaf node using the account\n"
+					"       as configured in jabber.conf.\n";
+			return NULL;
+		case CLI_GENERATE:
+			return NULL;
+	}
+
+	if (a->argc > 6) {
+		return CLI_SHOWUSAGE;
+	} else if (a->argc == 6) {
+		name = a->argv[3];
+		collection_name = a->argv[4];
+		leaf_name = a->argv[5];
+	}
+
+	if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+		ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+		return CLI_FAILURE;
+	}
+		
+	ast_cli(a->fd, "Creating test PubSub node collection.\n");
+	aji_create_pubsub_leaf(client, collection_name, leaf_name);
+	return CLI_SUCCESS;
+}
+
+
 
 /*!
  * \brief set presence of client.
@@ -2710,6 +3239,7 @@
 	client->keepalive = 1;
 	client->timeout = 50;
 	client->message_timeout = 100;
+	client->distribute_events = 0;
 	AST_LIST_HEAD_INIT(&client->messages);
 	client->component = 0;
 	ast_copy_string(client->statusmessage, "Online and Available", sizeof(client->statusmessage));
@@ -2736,9 +3266,24 @@
 		else if (!strcasecmp(var->name, "debug"))
 			client->debug = (ast_false(var->value)) ? 0 : 1;
 		else if (!strcasecmp(var->name, "type")) {
-			if (!strcasecmp(var->value, "component"))
+			if (!strcasecmp(var->value, "component")) {
 				client->component = 1;
-		} else if (!strcasecmp(var->name, "usetls")) {
+				if (client->distribute_events) {
+					ast_log(LOG_ERROR, "Client cannot be configure to be both a component and to distribute events!  Event distribution will be disabled.\n");
+					client->distribute_events = 0;
+				}
+			}
+		} else if (!strcasecmp(var->name, "distribute_events")) {
+			if(ast_true(var->value)) {
+				if (client->component) {
+					ast_log(LOG_ERROR, "Client cannot be configure to be both a component and to distribute events!  Event distribution will be disabled.\n");
+				} else {
+					client->distribute_events= 1;
+				}
+			}
+		} else if (!strcasecmp(var->name, "pubsub_node")) {
+			ast_copy_string(client->pubsub_node, var->value, sizeof(client->pubsub_node));
+		}else if (!strcasecmp(var->name, "usetls")) {
 			client->usetls = (ast_false(var->value)) ? 0 : 1;
 		} else if (!strcasecmp(var->name, "usesasl")) {
 			client->usesasl = (ast_false(var->value)) ? 0 : 1;
@@ -2832,11 +3377,14 @@
 	} else {
 		iks_filter_add_rule(client->f, aji_client_info_handler, client, IKS_RULE_NS, "http://jabber.org/protocol/disco#info", IKS_RULE_DONE);
 	}
+
 	iks_set_log_hook(client->p, aji_log_hook);
 	ASTOBJ_UNLOCK(client);
 	ASTOBJ_CONTAINER_LINK(&clients,client);
 	return 1;
 }
+
+
 
 #if 0
 /*!
@@ -2949,6 +3497,8 @@
 			ast_set2_flag(&globalflags, ast_true(var->value), AJI_AUTOPRUNE);
 		} else if (!strcasecmp(var->name, "autoregister")) {
 			ast_set2_flag(&globalflags, ast_true(var->value), AJI_AUTOREGISTER);
+		} else if (!strcasecmp(var->name, "collection_nodes")) {
+			ast_set2_flag(&globalflags, ast_true(var->value), AJI_XEP0248);
 		}
 	}
 
@@ -3060,8 +3610,13 @@
 		if(iterator->state == AJI_DISCONNECTED) {
 			if (!iterator->thread)
 				ast_pthread_create_background(&iterator->thread, NULL, aji_recv_loop, iterator);
-		} else if (iterator->state == AJI_CONNECTING)
+		} else if (iterator->state == AJI_CONNECTING) {
 			aji_get_roster(iterator);
+			if (iterator->distribute_events) {
+				aji_init_event_distribution(iterator);
+			}
+		}
+	
 		ASTOBJ_UNLOCK(iterator);
 	});
 	




More information about the svn-commits mailing list