[asterisk-commits] marquis: branch marquis/pubsub-distributed-events r213622 - in /team/marquis/...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Fri Aug 21 12:35:22 CDT 2009
Author: marquis
Date: Fri Aug 21 12:35:17 2009
New Revision: 213622
URL: http://svn.asterisk.org/svn-view/asterisk?view=rev&rev=213622
Log:
Initial commit of PubSub-based distributed events. Mostly working for device states, with some error conditions not totally taken care of yet.
Modified:
team/marquis/pubsub-distributed-events/include/asterisk/jabber.h
team/marquis/pubsub-distributed-events/res/res_jabber.c
Modified: team/marquis/pubsub-distributed-events/include/asterisk/jabber.h
URL: http://svn.asterisk.org/svn-view/asterisk/team/marquis/pubsub-distributed-events/include/asterisk/jabber.h?view=diff&rev=213622&r1=213621&r2=213622
==============================================================================
--- team/marquis/pubsub-distributed-events/include/asterisk/jabber.h (original)
+++ team/marquis/pubsub-distributed-events/include/asterisk/jabber.h Fri Aug 21 12:35:17 2009
@@ -82,7 +82,8 @@
enum {
AJI_AUTOPRUNE = (1 << 0),
- AJI_AUTOREGISTER = (1 << 1)
+ AJI_AUTOREGISTER = (1 << 1),
+ AJI_XEP0248 = ( 1 << 2)
};
enum aji_btype {
@@ -142,6 +143,7 @@
char password[160];
char user[AJI_MAX_JIDLEN];
char serverhost[AJI_MAX_RESJIDLEN];
+ char pubsub_node[AJI_MAX_RESJIDLEN];
char statusmessage[256];
char name_space[256];
char sid[10]; /* Session ID */
@@ -167,6 +169,7 @@
int timeout;
int message_timeout;
int authorized;
+ int distribute_events;
struct ast_flags flags;
int component; /* 0 client, 1 component */
struct aji_buddy_container buddies;
Modified: team/marquis/pubsub-distributed-events/res/res_jabber.c
URL: http://svn.asterisk.org/svn-view/asterisk/team/marquis/pubsub-distributed-events/res/res_jabber.c?view=diff&rev=213622&r1=213621&r2=213622
==============================================================================
--- team/marquis/pubsub-distributed-events/res/res_jabber.c (original)
+++ team/marquis/pubsub-distributed-events/res/res_jabber.c Fri Aug 21 12:35:17 2009
@@ -58,6 +58,8 @@
#include "asterisk/astobj.h"
#include "asterisk/astdb.h"
#include "asterisk/manager.h"
+#include "asterisk/event.h"
+#include "asterisk/devicestate.h"
/*** DOCUMENTATION
<application name="JabberSend" language="en_US">
@@ -241,7 +243,24 @@
static int aji_register_query_handler(void *data, ikspak *pak);
static int aji_register_approve_handler(void *data, ikspak *pak);
static int aji_reconnect(struct aji_client *client);
+static char *aji_cli_create_collection(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *aji_cli_list_pubsub_items(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static iks *jabber_make_auth(iksid * id, const char *pass, const char *sid);
+static int aji_receive_item_list(void *data, ikspak* pak);
+static void aji_init_event_distribution(struct aji_client *client);
+static iks* aji_create_pubsub_node(struct aji_client *client, const char *node_type,
+ const char *name, const char *collection_name);
+static iks* aji_build_node_config(iks *pubsub, const char *node_type, const char *collection_name);
+static void aji_create_pubsub_collection(struct aji_client *client, const char *collection_name);
+static void aji_create_pubsub_leaf(struct aji_client *client, const char *collection_name,
+ const char *leaf_name);
+static char *aji_cli_create_leafnode(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static void aji_create_affiliations(struct aji_client *client, const char *node);
+static iks* aji_pubsub_iq_create(struct aji_client *client, const char *type);
+static void aji_publish_device_state(struct aji_client *client, const char * device,
+ const char *device_state);
+static int aji_handle_publish_error(void *data, ikspak *pak);
+static int aji_handle_pubsub_event(void *data, ikspak *pak);
/* No transports in this version */
/*
static int aji_create_transport(char *label, struct aji_client *client);
@@ -255,6 +274,9 @@
AST_CLI_DEFINE(aji_show_clients, "Show state of clients and components"),
AST_CLI_DEFINE(aji_show_buddies, "Show buddy lists of our clients"),
AST_CLI_DEFINE(aji_test, "Shows roster, but is generally used for mog's debugging."),
+ AST_CLI_DEFINE(aji_cli_create_collection, "Creates a test PubSub node collection."),
+ AST_CLI_DEFINE(aji_cli_list_pubsub_items, "Lists Pubsub items"),
+ AST_CLI_DEFINE(aji_cli_create_leafnode, "Creates a test Pubsub leaf node"),
};
static char *app_ajisend = "JabberSend";
@@ -467,7 +489,7 @@
);
if (deprecation_warning++ % 10 == 0)
- ast_log(LOG_WARNING, "JabberStatus is deprecated. Please use the JABBER_STATUS dialplan function in the future.\n");
+ ast_log(LOG_WARNING,"JabberStatus is deprecated Please use the JABBER_STATUS dialplan function in the future.\n");
if (!data) {
ast_log(LOG_ERROR, "Usage: JabberStatus(<sender>,<jid>[/<resource>],<varname>\n");
@@ -496,7 +518,8 @@
if (!r && buddy->resources)
r = buddy->resources;
if (!r)
- ast_log(LOG_NOTICE, "Resource '%s' of buddy '%s' was not found\n", jid.resource, jid.screenname);
+ ast_log(LOG_NOTICE, "Resource '%s' of buddy '%s' was not found\n", jid.resource,
+jid.screenname);
else
stat = r->status;
snprintf(status, sizeof(status), "%d", stat);
@@ -504,7 +527,8 @@
return 0;
}
-static int acf_jabberstatus_read(struct ast_channel *chan, const char *name, char *data, char *buf, size_t buflen)
+static int acf_jabberstatus_read(struct ast_channel *chan, const char *name, char *data,
+char *buf, size_t buflen)
{
struct aji_client *client = NULL;
struct aji_buddy *buddy = NULL;
@@ -545,7 +569,8 @@
if (!r && buddy->resources)
r = buddy->resources;
if (!r)
- ast_log(LOG_NOTICE, "Resource %s of buddy %s was not found.\n", jid.resource, jid.screenname);
+ ast_log(LOG_NOTICE, "Resource %s of buddy %s was not found.\n", jid.resource,
+jid.screenname);
else
stat = r->status;
snprintf(buf, buflen, "%d", stat);
@@ -684,7 +709,8 @@
* \return the number of read bytes on success, 0 on timeout expiration,
* -1 on error
*/
-static int aji_io_recv(struct aji_client *client, char *buffer, size_t buf_len, int timeout)
+static int aji_io_recv(struct aji_client *client, char *buffer, size_t buf_len, int
+timeout)
{
int sock;
fd_set fds;
@@ -883,7 +909,8 @@
struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
if (!ast_strlen_zero(xmpp))
- manager_event(EVENT_FLAG_USER, "JabberEvent", "Account: %s\r\nPacket: %s\r\n", client->name, xmpp);
+ manager_event(EVENT_FLAG_USER, "JabberEvent", "Account: %s\r\nPacket: %s\r\n",
+client->name, xmpp);
if (client->debug) {
if (is_incoming)
@@ -910,7 +937,8 @@
*
* \return IKS_OK on success, IKSNET_NOTSUPP on failure.
*/
-static int aji_start_sasl(struct aji_client *client, enum ikssasltype type, char *username, char *pass)
+static int aji_start_sasl(struct aji_client *client, enum ikssasltype type, char
+*username, char *pass)
{
iks *x = NULL;
int len;
@@ -967,7 +995,8 @@
int features = 0;
if(!node) {
- ast_log(LOG_ERROR, "aji_act_hook was called with out a packet\n"); /* most likely cause type is IKS_NODE_ERROR lost connection */
+ ast_log(LOG_ERROR, "aji_act_hook was called with out a packet\n"); /* most likely
+cause type is IKS_NODE_ERROR lost connection */
ASTOBJ_UNREF(client, aji_client_destroy);
return IKS_HOOK;
}
@@ -997,8 +1026,10 @@
break;
}
if (!client->usesasl) {
- iks_filter_add_rule(client->f, aji_client_connect, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, client->mid, IKS_RULE_DONE);
- auth = jabber_make_auth(client->jid, client->password, iks_find_attrib(node, "id"));
+ iks_filter_add_rule(client->f, aji_client_connect, client, IKS_RULE_TYPE,
+IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, client->mid, IKS_RULE_DONE);
+ auth = jabber_make_auth(client->jid, client->password,
+iks_find_attrib(node, "id"));
if (auth) {
iks_insert_attrib(auth, "id", client->mid);
iks_insert_attrib(auth, "to", client->jid->server);
@@ -1025,7 +1056,8 @@
break;
if (client->authorized) {
if (features & IKS_STREAM_BIND) {
- iks_filter_add_rule(client->f, aji_client_connect, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_DONE);
+ iks_filter_add_rule(client->f, aji_client_connect, client,
+IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_DONE);
auth = iks_make_resource_bind(client->jid);
if (auth) {
iks_insert_attrib(auth, "id", client->mid);
@@ -1038,7 +1070,9 @@
}
}
if (features & IKS_STREAM_SESSION) {
- iks_filter_add_rule (client->f, aji_client_connect, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, "auth", IKS_RULE_DONE);
+ iks_filter_add_rule (client->f, aji_client_connect, client,
+IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, "auth",
+IKS_RULE_DONE);
auth = iks_make_session();
if (auth) {
iks_insert_attrib(auth, "id", "auth");
@@ -1056,7 +1090,8 @@
break;
}
- ret = aji_start_sasl(client, features, client->jid->user, client->password);
+ ret = aji_start_sasl(client, features, client->jid->user,
+client->password);
if (ret != IKS_OK) {
ASTOBJ_UNREF(client, aji_client_destroy);
return IKS_HOOK;
@@ -1097,7 +1132,8 @@
handshake = NULL;
}
client->state = AJI_CONNECTING;
- if(aji_recv(client, 1) == 2) /*XXX proper result for iksemel library on iks_recv of <handshake/> XXX*/
+ if(aji_recv(client, 1) == 2) /*XXX proper result for iksemel library on
+iks_recv of <handshake/> XXX*/
client->state = AJI_CONNECTED;
else
ast_log(LOG_WARNING, "Jabber didn't seem to handshake, failed to authenticate.\n");
@@ -1228,7 +1264,8 @@
iks_insert_attrib(query, "xmlns", "jabber:iq:register");
iks_insert_attrib(error, "code" , "406");
iks_insert_attrib(error, "type", "modify");
- iks_insert_attrib(notacceptable, "xmlns", "urn:ietf:params:xml:ns:xmpp-stanzas");
+ iks_insert_attrib(notacceptable, "xmlns",
+"urn:ietf:params:xml:ns:xmpp-stanzas");
iks_insert_node(iq, query);
iks_insert_node(iq, error);
iks_insert_node(error, notacceptable);
@@ -1381,7 +1418,8 @@
ASTOBJ_UNREF(client, aji_client_destroy);
return IKS_FILTER_EAT;
}
- if (iks_find_with_attrib(pak->query, "feature", "var", "http://www.google.com/xmpp/protocol/voice/v1")) {
+ if (iks_find_with_attrib(pak->query, "feature", "var",
+"http://www.google.com/xmpp/protocol/voice/v1")) {
resource->cap->jingle = 1;
} else
resource->cap->jingle = 0;
@@ -1402,7 +1440,8 @@
iks_insert_attrib(ident, "type", "pc");
iks_insert_attrib(ident, "name", "asterisk");
iks_insert_attrib(disco, "var", "http://jabber.org/protocol/disco#info");
- iks_insert_attrib(google, "var", "http://www.google.com/xmpp/protocol/voice/v1");
+ iks_insert_attrib(google, "var",
+"http://www.google.com/xmpp/protocol/voice/v1");
iks_insert_node(iq, query);
iks_insert_node(query, ident);
iks_insert_node(query, google);
@@ -1446,12 +1485,15 @@
ASTOBJ_UNREF(client, aji_client_destroy);
return IKS_FILTER_EAT;
}
- if (iks_find_with_attrib(pak->query, "feature", "var", "http://www.google.com/xmpp/protocol/voice/v1")) {
+ if (iks_find_with_attrib(pak->query, "feature", "var",
+"http://www.google.com/xmpp/protocol/voice/v1")) {
resource->cap->jingle = 1;
} else
resource->cap->jingle = 0;
- } else if (pak->subtype == IKS_TYPE_GET && !(node = iks_find_attrib(pak->query, "node"))) {
- iks *iq, *query, *identity, *disco, *reg, *commands, *gateway, *version, *vcard, *search;
+ } else if (pak->subtype == IKS_TYPE_GET && !(node = iks_find_attrib(pak->query,
+"node"))) {
+ iks *iq, *query, *identity, *disco, *reg, *commands, *gateway, *version, *vcard,
+*search;
iq = iks_new("iq");
query = iks_new("query");
@@ -1463,8 +1505,8 @@
version = iks_new("feature");
vcard = iks_new("feature");
search = iks_new("feature");
-
- if (iq && query && identity && disco && reg && commands && gateway && version && vcard && search && client) {
+ if (iq && query && identity && disco && reg && commands && gateway && version &&
+vcard && search && client) {
iks_insert_attrib(iq, "from", client->user);
iks_insert_attrib(iq, "to", pak->from->full);
iks_insert_attrib(iq, "id", pak->id);
@@ -1505,8 +1547,9 @@
iks_delete(version);
iks_delete(vcard);
iks_delete(search);
-
- } else if (pak->subtype == IKS_TYPE_GET && !strcasecmp(node, "http://jabber.org/protocol/commands")) {
+
+ } else if (pak->subtype == IKS_TYPE_GET && !strcasecmp(node,
+"http://jabber.org/protocol/commands")) {
iks *iq, *query, *confirm;
iq = iks_new("iq");
query = iks_new("query");
@@ -1645,7 +1688,8 @@
}
ASTOBJ_WRLOCK(buddy);
status = (pak->show) ? pak->show : 6;
- priority = atoi((iks_find_cdata(pak->x, "priority")) ? iks_find_cdata(pak->x, "priority") : "0");
+ priority = atoi((iks_find_cdata(pak->x, "priority")) ? iks_find_cdata(pak->x,
+"priority") : "0");
tmp = buddy->resources;
descrip = ast_strdup(iks_find_cdata(pak->x,"status"));
@@ -1786,7 +1830,8 @@
iks_insert_attrib(iq,"from", client->jid->full);
iks_insert_attrib(iq, "id", client->mid);
ast_aji_increment_mid(client->mid);
- iks_insert_attrib(query, "xmlns", "http://jabber.org/protocol/disco#info");
+ iks_insert_attrib(query, "xmlns",
+"http://jabber.org/protocol/disco#info");
iks_insert_node(iq, query);
ast_aji_send(client, iq);
@@ -1862,7 +1907,8 @@
iks_delete(status);
if (client->component)
- aji_set_presence(client, pak->from->full, iks_find_attrib(pak->x, "to"), client->status, client->statusmessage);
+ aji_set_presence(client, pak->from->full, iks_find_attrib(pak->x, "to"),
+client->status, client->statusmessage);
case IKS_TYPE_SUBSCRIBED:
buddy = ASTOBJ_CONTAINER_FIND(&client->buddies, pak->from->partial);
if (!buddy && pak->from->partial) {
@@ -2010,6 +2056,7 @@
res = aji_reconnect(client);
sleep(4);
}
+
do {
if (res == IKS_NET_RWERR || client->timeout == 0) {
@@ -2089,7 +2136,9 @@
ASTOBJ_UNLOCK(iterator);
});
iks_filter_remove_hook(client->f, aji_register_transport);
- iks_filter_add_rule(client->f, aji_register_transport2, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_NS, IKS_NS_REGISTER, IKS_RULE_DONE);
+ iks_filter_add_rule(client->f, aji_register_transport2, client, IKS_RULE_TYPE,
+IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_NS, IKS_NS_REGISTER,
+IKS_RULE_DONE);
iks_insert_attrib(send, "to", buddy->host);
iks_insert_attrib(send, "id", client->mid);
ast_aji_increment_mid(client->mid);
@@ -2156,7 +2205,8 @@
#endif
/*!
- * \brief goes through roster and prunes users not needed in list, or adds them accordingly.
+ * \brief goes through roster and prunes users not needed in list, or adds them
+accordingly.
* \param client the configured XMPP client we use to connect to a XMPP server
* \return void.
* \note The messages here should be configurable.
@@ -2179,7 +2229,8 @@
ASTOBJ_RDLOCK(iterator);
/* For an aji_buddy, both AUTOPRUNE and AUTOREGISTER will never
* be called at the same time */
- if (ast_test_flag(&iterator->flags, AJI_AUTOPRUNE)) { /* If autoprune is set on jabber.conf */
+ if (ast_test_flag(&iterator->flags, AJI_AUTOPRUNE)) { /* If autoprune is set on
+jabber.conf */
res = ast_aji_send(client, iks_make_s10n(IKS_TYPE_UNSUBSCRIBE, iterator->name,
"GoodBye. Your status is no longer needed by Asterisk the Open Source PBX"
" so I am no longer subscribing to your presence.\n"));
@@ -2272,7 +2323,8 @@
if(ast_test_flag(&client->flags, AJI_AUTOPRUNE)) {
ast_set_flag(&buddy->flags, AJI_AUTOPRUNE);
ASTOBJ_MARK(buddy);
- } else if (!iks_strcmp(iks_find_attrib(x, "subscription"), "none") || !iks_strcmp(iks_find_attrib(x, "subscription"), "from")) {
+ } else if (!iks_strcmp(iks_find_attrib(x, "subscription"), "none") ||
+!iks_strcmp(iks_find_attrib(x, "subscription"), "from")) {
/* subscribe to buddy's presence only
if we really need to */
ast_set_flag(&buddy->flags, AJI_AUTOREGISTER);
@@ -2327,7 +2379,8 @@
if(roster) {
iks_insert_attrib(roster, "id", "roster");
- aji_set_presence(client, NULL, client->jid->full, client->status, client->statusmessage);
+ aji_set_presence(client, NULL, client->jid->full, client->status,
+client->statusmessage);
ast_aji_send(client, roster);
}
@@ -2349,12 +2402,17 @@
if (client) {
if (client->state == AJI_DISCONNECTED) {
- iks_filter_add_rule(client->f, aji_filter_roster, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, "roster", IKS_RULE_DONE);
+ iks_filter_add_rule(client->f, aji_filter_roster, client, IKS_RULE_TYPE,
+IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, "roster", IKS_RULE_DONE);
client->state = AJI_CONNECTING;
client->jid = (iks_find_cdata(pak->query, "jid")) ? iks_id_new(client->stack, iks_find_cdata(pak->query, "jid")) : client->jid;
iks_filter_remove_hook(client->f, aji_client_connect);
- if(!client->component) /*client*/
+ if(!client->component) { /*client*/
aji_get_roster(client);
+ if (client->distribute_events) {
+ aji_init_event_distribution(client);
+ }
+ }
}
} else
ast_log(LOG_ERROR, "Out of memory.\n");
@@ -2377,13 +2435,15 @@
client->stream_flags = 0;
#endif
/* If it's a component, connect to user, otherwise, connect to server */
- connected = iks_connect_via(client->p, S_OR(client->serverhost, client->jid->server), client->port, client->component ? client->user : client->jid->server);
+ connected = iks_connect_via(client->p, S_OR(client->serverhost, client->jid->server),
+client->port, client->component ? client->user : client->jid->server);
if (connected == IKS_NET_NOCONN) {
ast_log(LOG_ERROR, "JABBER ERROR: No Connection\n");
return IKS_HOOK;
} else if (connected == IKS_NET_NODNS) {
- ast_log(LOG_ERROR, "JABBER ERROR: No DNS %s for client to %s\n", client->name, S_OR(client->serverhost, client->jid->server));
+ ast_log(LOG_ERROR, "JABBER ERROR: No DNS %s for client to %s\n", client->name,
+S_OR(client->serverhost, client->jid->server));
return IKS_HOOK;
}
@@ -2413,6 +2473,475 @@
return 1;
}
+
+
+/*!
+ * \brief Callback function for events
+ * \param ast_event
+ * \param data void pointer to aji_client structure
+ * \return void
+ */
+static void aji_event_cb(const struct ast_event *ast_event, void *data)
+{
+ const char *device;
+ const char *device_state;
+ if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID)))
+ {
+ /* If the event didn't originate from this server, don't send it back out. */
+ ast_log(LOG_DEBUG, "Returning here\n");
+ return;
+ }
+
+ struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+
+ if (ast_event_get_type(ast_event) == AST_EVENT_MWI) {
+ ast_log(LOG_ERROR, "Got an MWI event, mailbox: %s\n",
+ ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX));
+ } else if(ast_event_get_type(ast_event) == AST_EVENT_DEVICE_STATE_CHANGE) {
+ device = ast_event_get_ie_str(ast_event, AST_EVENT_IE_DEVICE);
+ device_state = ast_devstate_str(ast_event_get_ie_uint(ast_event, AST_EVENT_IE_STATE));
+ ast_log(LOG_ERROR, "Got a DEVICE_STATE_CHANGE event for %s\n", device);
+ aji_publish_device_state(client, device, device_state);
+ ast_log(LOG_ERROR, "DEVICE_STATE is %s\n", device_state);
+ }
+}
+
+/*!
+ * \brief Initialize collections for event distribution
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \return void
+ */
+static void aji_init_event_distribution(struct aji_client *client)
+{
+ struct ast_event_sub *mwi_sub;
+ struct ast_event_sub *device_state_sub;
+ if(ast_test_flag(&globalflags, AJI_XEP0248)) {
+ aji_create_pubsub_collection(client, "device_state");
+ aji_create_pubsub_collection(client, "message_waiting");
+ } else {
+ aji_create_pubsub_node(client, NULL, "device_state", NULL);
+ aji_create_pubsub_node(client, NULL, "message_waiting", NULL);
+
+ }
+ mwi_sub = ast_event_subscribe(AST_EVENT_MWI, aji_event_cb, "aji_mwi_subscription",
+ client, AST_EVENT_IE_END);
+ ast_log(LOG_DEBUG, "Subscribed to MWI\n");
+ iks_filter_add_rule(client->f, aji_handle_pubsub_event, client, IKS_RULE_TYPE,
+ IKS_PAK_MESSAGE, IKS_RULE_FROM, client->pubsub_node, IKS_RULE_DONE);
+ iks_filter_add_rule(client->f, aji_handle_publish_error, client, IKS_RULE_TYPE,
+ IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_ERROR, IKS_RULE_DONE);
+
+ if (ast_enable_distributed_devstate()) {
+ return;
+ }
+ device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, aji_event_cb,
+ "aji_devstate_subscription", client, AST_EVENT_IE_END);
+ ast_event_dump_cache(device_state_sub);
+ ast_log(LOG_DEBUG, "Subscribed to Devstate changes\n");
+}
+
+/*!
+ * \brief Callback for handling PubSub events
+ * \param data void pointer to aji_client structure
+ * \return IKS_FILTER_EAT
+ */
+static int aji_handle_pubsub_event(void *data, ikspak *pak)
+{
+ char *device, *device_state;
+ iks *item, *entry, *state_node;
+ struct ast_eid pubsub_eid;
+ struct ast_event *event;
+ item = iks_find(iks_find(iks_find(pak->x, "event"), "items"), "item");
+ if(item) {
+ device = iks_find_attrib(item, "id");
+ entry = iks_find(item, "entry");
+ state_node = iks_find(entry, "state");
+ if(state_node) {
+ device_state = iks_find_cdata(entry, "state");
+ ast_str_to_eid(&pubsub_eid, iks_find_attrib(state_node,"eid"));
+ if (!ast_eid_cmp(&ast_eid_default, &pubsub_eid)) {
+ ast_log(LOG_DEBUG, "Returning here, eid of incoming event matches ours!\n");
+ return IKS_FILTER_EAT;
+ }
+ if (!(event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE,
+ AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, AST_EVENT_IE_STATE,
+ AST_EVENT_IE_PLTYPE_UINT, ast_devstate_val(device_state), AST_EVENT_IE_EID,
+ AST_EVENT_IE_PLTYPE_RAW, &pubsub_eid, sizeof(pubsub_eid),
+ AST_EVENT_IE_END))) {
+ return IKS_FILTER_EAT;
+ }
+
+ ast_event_queue_and_cache(event);
+ }
+ }
+ return IKS_FILTER_EAT;
+}
+
+/*!
+ * \brief Add Owner affiliations for pubsub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param node the name of the node to which to add affiliations
+ * \return void
+ */
+static void aji_create_affiliations(struct aji_client *client, const char *node)
+{
+ int res = 0;
+ iks *modify_affiliates = aji_pubsub_iq_create(client, "set");
+ iks *pubsub, *affiliations, *affiliate;
+ pubsub = iks_insert(modify_affiliates, "pubsub");
+ iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub#owner");
+ affiliations = iks_insert(pubsub, "affiliations");
+ iks_insert_attrib(affiliations, "node", node);
+ ASTOBJ_CONTAINER_TRAVERSE(&client->buddies, 1, {
+ ASTOBJ_RDLOCK(iterator);
+ affiliate = iks_insert(affiliations, "affiliation");
+ iks_insert_attrib(affiliate, "jid", iterator->name);
+ iks_insert_attrib(affiliate, "affiliation", "owner");
+ ASTOBJ_UNLOCK(iterator);
+ });
+ res = ast_aji_send(client, modify_affiliates);
+ iks_delete(modify_affiliates);
+}
+
+/*!
+ * \brief Publish an item to a PubSub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param device the name of the device whose state to publish
+ * \param device_state the state to publish
+ * \return void
+ */
+static void aji_publish_device_state(struct aji_client *client, const char * device,
+ const char *device_state)
+{
+ iks *request = aji_pubsub_iq_create(client, "set");
+ iks *pubsub, *publish, *item, *entry, *state;
+ char eid_str[20];
+ ast_eid_to_str(eid_str, sizeof(eid_str), &ast_eid_default);
+ ast_log(LOG_ERROR, "eid_str is %s\n", eid_str);
+
+ pubsub = iks_insert(request, "pubsub");
+ iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub");
+ publish = iks_insert(pubsub, "publish");
+ if(ast_test_flag(&globalflags, AJI_XEP0248)) {
+ iks_insert_attrib(publish, "node", device);
+ } else {
+ iks_insert_attrib(publish, "node", "device_state");
+ }
+ item = iks_insert(publish, "item");
+ iks_insert_attrib(item, "id", device);
+ entry = iks_insert(item, "entry");
+ iks_insert_attrib(entry, "xmlns", "http://placeholder.org");
+ state = iks_insert(entry, "state");
+ iks_insert_attrib(state, "eid", eid_str);
+ iks_insert_cdata(state, device_state, strlen(device_state));
+ ast_aji_send(client, request);
+ iks_delete(request);
+}
+
+/*!
+ * \brief Create an IQ packet
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param type the type of IQ packet to create
+ * \return iks*
+ */
+static iks* aji_pubsub_iq_create(struct aji_client *client, const char *type)
+{
+ iks *request = iks_new("iq");
+
+ iks_insert_attrib(request, "to", client->pubsub_node);
+ iks_insert_attrib(request, "from", client->jid->full);
+ iks_insert_attrib(request, "type", type);
+ ast_aji_increment_mid(client->mid);
+ iks_insert_attrib(request, "id", client->mid);
+ return request;
+}
+
+static int aji_handle_publish_error(void *data, ikspak *pak)
+{
+ char *node_name;
+ iks *publish;
+ iks *pubsub = iks_find(pak->x, "pubsub");
+ struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+ if(pubsub) {
+ publish = iks_find(pubsub, "publish");
+ if(publish) {
+ node_name = iks_find_attrib(publish, "node");
+ if(iks_find(iks_find(iks_find(publish, "item"), "entry"), "state")) {
+ if(ast_test_flag(&globalflags, AJI_XEP0248)) {
+ aji_create_pubsub_leaf(client, "device_state", node_name);
+ } else {
+ aji_create_pubsub_node(client, NULL, node_name, NULL);
+ }
+ iks *pubsub;
+ iks *request = aji_pubsub_iq_create(client, "set");
+ pubsub = iks_insert(request, "pubsub");
+ iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub");
+ iks_insert_node(pubsub, publish);
+ ast_aji_send(client, request);
+ iks_delete(request);
+ return IKS_FILTER_EAT;
+ }
+ }
+ }
+ return IKS_FILTER_EAT;
+}
+
+/*!
+ * \brief Request item list from pubsub
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \return void
+ */
+static void aji_request_pubsub_items(struct aji_client *client)
+{
+ int res = 0;
+ iks *request = aji_pubsub_iq_create(client, "get");
+ iks *query;
+ query = iks_insert(request, "query");
+ iks_insert_attrib(query, "xmlns", "http://jabber.org/protocol/disco#items");
+ iks_filter_add_rule(client->f, aji_receive_item_list, client, IKS_RULE_TYPE,
+ IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, client->mid,
+ IKS_RULE_DONE);
+ res = ast_aji_send(client, request);
+ iks_delete(request);
+
+}
+
+
+/*!
+ * \brief Receive pubsub item lists
+ * \param data pointer to aji_client structure
+ * \param pak response from pubsub disco#items query
+ * \return IKS_FILTER_EAT
+ */
+static int aji_receive_item_list(void *data, ikspak* pak)
+{
+
+ struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+ iks *item;
+ if(iks_has_children(pak->query)) {
+ item = iks_first_tag(pak->query);
+ ast_log(LOG_WARNING, "Node name: %s\n", iks_find_attrib(item, "node"));
+ while((item = iks_next_tag(item))) {
+ ast_log(LOG_WARNING, "Node name: %s\n", iks_find_attrib(item, "node"));
+ }
+ }
+ iks_delete(item);
+ return IKS_FILTER_EAT;
+}
+
+
+
+/*!
+ * \brief Method to expose PubSub node list via CLI.
+ * \return char *
+ */
+static char *aji_cli_list_pubsub_items(struct ast_cli_entry *e, int cmd, struct
+ast_cli_args *a)
+{
+ struct aji_client *client;
+ const char *name = "asterisk";
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "jabber list items";
+ e->usage =
+ "Usage: jabber list items [name]\n"
+ " Lists root items on Pubsub server\n"
+ " as configured in jabber.conf.\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc > 4) {
+ return CLI_SHOWUSAGE;
+ } else if (a->argc == 4) {
+ name = a->argv[3];
+ }
+
+ if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+ ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+ return CLI_FAILURE;
+ }
+
+ ast_cli(a->fd, "Listing pubsub items.\n");
+ aji_request_pubsub_items(client);
+ return CLI_SUCCESS;
+}
+
+/*!
+ * \brief Create a PubSub collection node.
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param collection_name The name to use for this collection
+ * \return void.
+ */
+
+static void aji_create_pubsub_collection(struct aji_client *client, const char
+*collection_name)
+{
+ aji_create_pubsub_node(client, "collection", collection_name, NULL);
+}
+
+
+/*!
+ * \brief Create a PubSub leaf node.
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param leaf_name The name to use for this leaf node
+ * \return void.
+ */
+static void aji_create_pubsub_leaf(struct aji_client *client, const char *collection_name,
+const char *leaf_name)
+{
+ aji_create_pubsub_node(client, "leaf", leaf_name, collection_name);
+}
+
+/*!
+ * \brief Create a pubsub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param node_type the type of node to create
+ * \param name the name of the node to create
+ * \return always NULL (0); the request stanza is sent and freed inside this function
+ */
+static iks* aji_create_pubsub_node(struct aji_client *client, const char *node_type, const
+ char *name, const char *collection_name)
+{
+ int res = 0;
+ iks *node = aji_pubsub_iq_create(client, "set");
+ iks *pubsub, *create, *configure;
+ pubsub = iks_insert(node, "pubsub");
+ iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub");
+ create = iks_insert(pubsub, "create");
+ iks_insert_attrib(create, "node", name);
+ configure = aji_build_node_config(pubsub, node_type, collection_name);
+ res = ast_aji_send(client, node);
+ aji_create_affiliations(client, name);
+ iks_delete(node);
+ return 0;
+}
+
+
+
+static iks* aji_build_node_config(iks *pubsub, const char *node_type, const char *collection_name)
+{
+ iks *configure, *x, *field_owner, *field_node_type, *field_node_config,
+ *field_persist_items, *field_access_model, *field_pubsub_collection;
+ configure = iks_insert(pubsub, "configure");
+ x = iks_insert(configure, "x");
+ iks_insert_attrib(x, "xmlns", "jabber:x:data");
+ iks_insert_attrib(x, "type", "submit");
+ field_owner = iks_insert(x, "field");
+ iks_insert_attrib(field_owner, "var", "FORM_TYPE");
+ iks_insert_attrib(field_owner, "type", "hidden");
+ iks_insert_cdata(iks_insert(field_owner, "value"),
+ "http://jabber.org/protocol/pubsub#owner", 39);
+ if(node_type) {
+ field_node_type = iks_insert(x, "field");
+ iks_insert_attrib(field_node_type, "var", "pubsub#node_type");
+ iks_insert_cdata(iks_insert(field_node_type, "value"), node_type, strlen(node_type));
+ }
+ field_node_config = iks_insert(x, "field");
+ iks_insert_attrib(field_node_config, "var", "FORM_TYPE");
+ iks_insert_attrib(field_node_config, "type", "hidden");
+ iks_insert_cdata(iks_insert(field_node_config, "value"),
+ "http://jabber.org/protocol/pubsub#node_config", 45);
+ field_persist_items = iks_insert(x, "field");
+ iks_insert_attrib(field_persist_items, "var", "pubsub#persist_items");
+ iks_insert_cdata(iks_insert(field_persist_items, "value"), "1", 1);
+ field_access_model = iks_insert(x, "field");
+ iks_insert_attrib(field_access_model, "var", "pubsub#access_model");
+ iks_insert_cdata(iks_insert(field_access_model, "value"), "whitelist", 9);
+ if(node_type && !strcasecmp(node_type, "leaf")) {
+ field_pubsub_collection = iks_insert(x, "field");
+ iks_insert_attrib(field_pubsub_collection, "var", "pubsub#collection");
+ iks_insert_cdata(iks_insert(field_pubsub_collection, "value"), collection_name,
+ strlen(collection_name));
+ }
+ return configure;
+}
+
+
+
+/*!
+ * \brief Method to expose PubSub collection node creation via CLI.
+ * \return char *.
+ */
+static char *aji_cli_create_collection(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct aji_client *client;
+ const char *name = "asterisk";
+ const char *collection_name = "test_collection";
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "jabber create collection";
+ e->usage =
+ "Usage: jabber create collection [name] [node name]\n"
+ " Creates a PubSub collection node using the account\n"
+ " as configured in jabber.conf.\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc > 5) {
+ return CLI_SHOWUSAGE;
+ } else if (a->argc == 5) {
+ name = a->argv[3];
+ collection_name = a->argv[4];
+ }
+
+ if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+ ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+ return CLI_FAILURE;
+ }
+
+ ast_cli(a->fd, "Creating test PubSub node collection.\n");
+ aji_create_pubsub_collection(client, collection_name);
+ return CLI_SUCCESS;
+}
+
+/*!
+ * \brief Method to expose PubSub leaf node creation via CLI.
+ * \return char *.
+ */
+static char *aji_cli_create_leafnode(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ struct aji_client *client;
+ const char *name = "asterisk";
+ const char *collection_name = "test_collection";
+ const char *leaf_name = "test_leaf";
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "jabber create leaf";
+ e->usage =
+ "Usage: jabber create leaf [name] [collection_name] [node name]\n"
+ " Creates a PubSub leaf node using the account\n"
+ " as configured in jabber.conf.\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc > 6) {
+ return CLI_SHOWUSAGE;
+ } else if (a->argc == 6) {
+ name = a->argv[3];
+ collection_name = a->argv[4];
+ leaf_name = a->argv[5];
+ }
+
+ if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+ ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+ return CLI_FAILURE;
+ }
+
+ ast_cli(a->fd, "Creating test PubSub node collection.\n");
+ aji_create_pubsub_leaf(client, collection_name, leaf_name);
+ return CLI_SUCCESS;
+}
+
+
/*!
* \brief set presence of client.
@@ -2710,6 +3239,7 @@
client->keepalive = 1;
client->timeout = 50;
client->message_timeout = 100;
+ client->distribute_events = 0;
AST_LIST_HEAD_INIT(&client->messages);
client->component = 0;
ast_copy_string(client->statusmessage, "Online and Available", sizeof(client->statusmessage));
@@ -2736,9 +3266,24 @@
else if (!strcasecmp(var->name, "debug"))
client->debug = (ast_false(var->value)) ? 0 : 1;
else if (!strcasecmp(var->name, "type")) {
- if (!strcasecmp(var->value, "component"))
+ if (!strcasecmp(var->value, "component")) {
client->component = 1;
- } else if (!strcasecmp(var->name, "usetls")) {
+ if (client->distribute_events) {
+ ast_log(LOG_ERROR, "Client cannot be configure to be both a component and to distribute events! Event distribution will be disabled.\n");
+ client->distribute_events = 0;
+ }
+ }
+ } else if (!strcasecmp(var->name, "distribute_events")) {
+ if(ast_true(var->value)) {
+ if (client->component) {
+ ast_log(LOG_ERROR, "Client cannot be configure to be both a component and to distribute events! Event distribution will be disabled.\n");
+ } else {
+ client->distribute_events= 1;
+ }
+ }
+ } else if (!strcasecmp(var->name, "pubsub_node")) {
+ ast_copy_string(client->pubsub_node, var->value, sizeof(client->pubsub_node));
+ }else if (!strcasecmp(var->name, "usetls")) {
client->usetls = (ast_false(var->value)) ? 0 : 1;
} else if (!strcasecmp(var->name, "usesasl")) {
client->usesasl = (ast_false(var->value)) ? 0 : 1;
@@ -2832,11 +3377,14 @@
} else {
iks_filter_add_rule(client->f, aji_client_info_handler, client, IKS_RULE_NS, "http://jabber.org/protocol/disco#info", IKS_RULE_DONE);
}
+
iks_set_log_hook(client->p, aji_log_hook);
ASTOBJ_UNLOCK(client);
ASTOBJ_CONTAINER_LINK(&clients,client);
return 1;
}
+
+
#if 0
/*!
@@ -2949,6 +3497,8 @@
ast_set2_flag(&globalflags, ast_true(var->value), AJI_AUTOPRUNE);
} else if (!strcasecmp(var->name, "autoregister")) {
ast_set2_flag(&globalflags, ast_true(var->value), AJI_AUTOREGISTER);
+ } else if (!strcasecmp(var->name, "collection_nodes")) {
+ ast_set2_flag(&globalflags, ast_true(var->value), AJI_XEP0248);
}
}
@@ -3060,8 +3610,13 @@
if(iterator->state == AJI_DISCONNECTED) {
if (!iterator->thread)
ast_pthread_create_background(&iterator->thread, NULL, aji_recv_loop, iterator);
- } else if (iterator->state == AJI_CONNECTING)
+ } else if (iterator->state == AJI_CONNECTING) {
aji_get_roster(iterator);
+ if (iterator->distribute_events) {
+ aji_init_event_distribution(iterator);
+ }
+ }
+
ASTOBJ_UNLOCK(iterator);
});
More information about the asterisk-commits
mailing list