[asterisk-commits] russell: branch russell/events r84131 - in /team/russell/events/res: ./ ais/
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Sat Sep 29 15:10:44 CDT 2007
Author: russell
Date: Sat Sep 29 15:10:44 2007
New Revision: 84131
URL: http://svn.digium.com/view/asterisk?view=rev&rev=84131
Log:
Restructure code to break service specific pieces into different files.
Start adding support for the distributed locking service.
(Think func_lock, but for cluster-wide locks. This will allow synchronization
amongst cluster-wide shared resources. It seems like a really nice complement
when doing things with func_odbc or DUNDi lookups in a cluster for doing call
routing logic.)
Added:
team/russell/events/res/ais/
team/russell/events/res/ais/ais.h (with props)
team/russell/events/res/ais/clm.c (with props)
team/russell/events/res/ais/evt.c (with props)
team/russell/events/res/ais/lck.c (with props)
Modified:
team/russell/events/res/Makefile
team/russell/events/res/res_ais.c
Modified: team/russell/events/res/Makefile
URL: http://svn.digium.com/view/asterisk/team/russell/events/res/Makefile?view=diff&rev=84131&r1=84130&r2=84131
==============================================================================
--- team/russell/events/res/Makefile (original)
+++ team/russell/events/res/Makefile Sat Sep 29 15:10:44 2007
@@ -31,6 +31,8 @@
ael/ael.tab.o: ael/ael.tab.c ael/ael.tab.h ../include/asterisk/ael_structs.h
ael/ael.tab.o: ASTCFLAGS+=-I. -Iael -DYYENABLE_NLS=0
+# res_ais is linked from the per-service objects under ais/; pick modules.link
+# when res_ais itself (not res_snmp) is listed in EMBEDDED_MODS.
+$(if $(filter res_ais,$(EMBEDDED_MODS)),modules.link,res_ais.so): ais/clm.o ais/evt.o ais/lck.o
+
$(if $(filter res_snmp,$(EMBEDDED_MODS)),modules.link,res_snmp.so): snmp/agent.o
$(if $(filter res_ael_share,$(EMBEDDED_MODS)),modules.link,res_ael_share.so): ael/ael_lex.o ael/ael.tab.o ael/pval.o
@@ -44,5 +46,4 @@
ael/pval.o: ael/pval.c
clean::
- rm -f snmp/*.o
- rm -f ael*.o
+ rm -f snmp/*.o ais/*.o ael*.o
Added: team/russell/events/res/ais/ais.h
URL: http://svn.digium.com/view/asterisk/team/russell/events/res/ais/ais.h?view=auto&rev=84131
==============================================================================
--- team/russell/events/res/ais/ais.h (added)
+++ team/russell/events/res/ais/ais.h Sat Sep 29 15:10:44 2007
@@ -1,0 +1,53 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Digium, Inc.
+ *
+ * Russell Bryant <russell at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ * \author Russell Bryant <russell at digium.com>
+ *
+ * \brief Usage of the SAForum AIS (Application Interface Specification)
+ *
+ * \arg http://developer.osdl.org/dev/openais/
+ */
+
+#ifndef AST_AIS_H
+#define AST_AIS_H
+
+#include <openais/saAis.h>
+#include <openais/saClm.h>
+#include <openais/saEvt.h>
+#include <openais/saLck.h>
+
+/*! AIS version structure passed to each service's sa*Initialize() call */
+extern SaVersionT ais_version;
+
+/*! Per-service library handles, filled in by the matching
+ * ast_ais_*_load_module() function */
+extern SaClmHandleT clm_handle;
+extern SaEvtHandleT evt_handle;
+extern SaLckHandleT lck_handle;
+
+/*! CLM (Cluster Membership) service setup and teardown.
+ * Load returns AST_MODULE_LOAD_SUCCESS or AST_MODULE_LOAD_DECLINE;
+ * unload returns 0 on success and -1 on failure. */
+int ast_ais_clm_load_module(void);
+int ast_ais_clm_unload_module(void);
+
+/*! EVT (Event) service setup and teardown; same return conventions as CLM */
+int ast_ais_evt_load_module(void);
+int ast_ais_evt_unload_module(void);
+
+/*! LCK (Distributed Locks) service setup and teardown; same return
+ * conventions as CLM */
+int ast_ais_lck_load_module(void);
+int ast_ais_lck_unload_module(void);
+
+/*! Convert an AIS error code into a string suitable for log messages */
+const char *ais_err2str(SaAisErrorT error);
+
+#endif /* AST_AIS_H */
Propchange: team/russell/events/res/ais/ais.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/russell/events/res/ais/ais.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/russell/events/res/ais/ais.h
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: team/russell/events/res/ais/clm.c
URL: http://svn.digium.com/view/asterisk/team/russell/events/res/ais/clm.c?view=auto&rev=84131
==============================================================================
--- team/russell/events/res/ais/clm.c (added)
+++ team/russell/events/res/ais/clm.c Sat Sep 29 15:10:44 2007
@@ -1,0 +1,164 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Digium, Inc.
+ *
+ * Russell Bryant <russell at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ * \author Russell Bryant <russell at digium.com>
+ *
+ * \brief Usage of the SAForum AIS (Application Interface Specification)
+ *
+ * \arg http://developer.osdl.org/dev/openais/
+ *
+ * This file contains the code specific to the use of the CLM
+ * (Cluster Membership) Service.
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include "ais.h"
+
+#include "asterisk/module.h"
+#include "asterisk/utils.h"
+#include "asterisk/cli.h"
+#include "asterisk/logger.h"
+
+/*! Handle for the CLM service; initialized by saClmInitialize() in
+ * ast_ais_clm_load_module() and released in ast_ais_clm_unload_module() */
+SaClmHandleT clm_handle;
+
+/* Forward declarations so the callback table below can reference them */
+static void clm_node_get_cb(SaInvocationT invocation,
+ const SaClmClusterNodeT *cluster_node, SaAisErrorT error);
+static void clm_track_cb(const SaClmClusterNotificationBufferT *notif_buffer,
+ SaUint32T num_members, SaAisErrorT error);
+
+/*! Callback table handed to saClmInitialize() */
+static const SaClmCallbacksT clm_callbacks = {
+ .saClmClusterNodeGetCallback = clm_node_get_cb,
+ .saClmClusterTrackCallback = clm_track_cb,
+};
+
+/*!
+ * \brief Registered as saClmClusterNodeGetCallback in clm_callbacks
+ *
+ * Currently an empty placeholder: all arguments are ignored.
+ */
+static void clm_node_get_cb(SaInvocationT invocation,
+ const SaClmClusterNodeT *cluster_node, SaAisErrorT error)
+{
+
+}
+
+/*!
+ * \brief Registered as saClmClusterTrackCallback in clm_callbacks
+ *
+ * Currently an empty placeholder: all arguments are ignored.
+ */
+static void clm_track_cb(const SaClmClusterNotificationBufferT *notif_buffer,
+ SaUint32T num_members, SaAisErrorT error)
+{
+
+}
+
+/*!
+ * \brief CLI handler for "ais clm show members"
+ *
+ * \param e   CLI entry; its command and usage strings are filled in on CLI_INIT
+ * \param cmd CLI_INIT, CLI_GENERATE, or a request to run the command
+ * \param a   CLI arguments; output is written to a->fd
+ *
+ * \return NULL for CLI_INIT and CLI_GENERATE, CLI_SHOWUSAGE when the argument
+ *         count does not match, CLI_FAILURE when the member list cannot be
+ *         retrieved, CLI_SUCCESS otherwise
+ *
+ * The current membership is fetched synchronously with
+ * saClmClusterTrack(SA_TRACK_CURRENT) into a fixed array of 64 entries.
+ */
+static char *ais_clm_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	/* Unsigned index: it is compared against the SaUint32T item count */
+	SaUint32T i;
+	SaAisErrorT res;
+	SaClmClusterNotificationBufferT buf;
+	SaClmClusterNotificationT notif[64];
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "ais clm show members";
+		e->usage =
+			"Usage: ais clm show members\n"
+			" List members of the cluster using the CLM (Cluster Membership) service.\n";
+		return NULL;
+
+	case CLI_GENERATE:
+		return NULL; /* no completion */
+	}
+
+	if (a->argc != e->args)
+		return CLI_SHOWUSAGE;
+
+	buf.notification = notif;
+	buf.numberOfItems = ARRAY_LEN(notif);
+
+	res = saClmClusterTrack(clm_handle, SA_TRACK_CURRENT, &buf);
+	if (res != SA_AIS_OK) {
+		ast_cli(a->fd, "Error retrieving current cluster members.\n");
+		return CLI_FAILURE;
+	}
+
+	ast_cli(a->fd, "\n"
+		"=============================================================\n"
+		"=== Cluster Members =========================================\n"
+		"=============================================================\n"
+		"===\n");
+
+	for (i = 0; i < buf.numberOfItems; i++) {
+		SaClmClusterNodeT *node = &buf.notification[i].clusterNode;
+		/* SaNameT carries an explicit length and its value is not
+		 * guaranteed to be NUL-terminated, so bound the print by the
+		 * length, clamped to the size of the value array. */
+		int name_len = node->nodeName.length;
+
+		if (name_len > (int) sizeof(node->nodeName.value))
+			name_len = (int) sizeof(node->nodeName.value);
+
+		ast_cli(a->fd, "=== ---------------------------------------------------------\n"
+			"=== Node Name: %.*s\n"
+			"=== ==> ID: 0x%x\n"
+			"=== ==> Address: %s\n"
+			"=== ==> Member: %s\n",
+			name_len, (char *) node->nodeName.value, (int) node->nodeId,
+			(char *) node->nodeAddress.value,
+			node->member ? "Yes" : "No");
+
+		ast_cli(a->fd, "=== ---------------------------------------------------------\n"
+			"===\n");
+	}
+
+	ast_cli(a->fd, "=============================================================\n"
+		"\n");
+
+	return CLI_SUCCESS;
+}
+
+/*! CLI commands provided by the CLM code; registered in
+ * ast_ais_clm_load_module() and unregistered in ast_ais_clm_unload_module() */
+static struct ast_cli_entry ais_cli[] = {
+ NEW_CLI(ais_clm_show_members, "List current members of the cluster"),
+};
+
+/*!
+ * \brief Bring up the CLM service and register its CLI commands
+ *
+ * \return AST_MODULE_LOAD_SUCCESS once saClmInitialize() succeeds,
+ *         AST_MODULE_LOAD_DECLINE (after logging the error) otherwise
+ */
+int ast_ais_clm_load_module(void)
+{
+	SaAisErrorT status = saClmInitialize(&clm_handle, &clm_callbacks, &ais_version);
+
+	if (status == SA_AIS_OK) {
+		ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
+		return AST_MODULE_LOAD_SUCCESS;
+	}
+
+	ast_log(LOG_ERROR, "Could not initialize cluster membership service: %s\n",
+		ais_err2str(status));
+
+	return AST_MODULE_LOAD_DECLINE;
+}
+
+/*!
+ * \brief Unregister the CLM CLI commands and shut the CLM service down
+ *
+ * \return 0 when saClmFinalize() succeeds, -1 (after logging) otherwise
+ */
+int ast_ais_clm_unload_module(void)
+{
+	SaAisErrorT status;
+
+	ast_cli_unregister_multiple(ais_cli, ARRAY_LEN(ais_cli));
+
+	status = saClmFinalize(clm_handle);
+	if (status == SA_AIS_OK)
+		return 0;
+
+	ast_log(LOG_ERROR, "Problem stopping cluster membership service: %s\n",
+		ais_err2str(status));
+
+	return -1;
+}
Propchange: team/russell/events/res/ais/clm.c
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/russell/events/res/ais/clm.c
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/russell/events/res/ais/clm.c
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: team/russell/events/res/ais/evt.c
URL: http://svn.digium.com/view/asterisk/team/russell/events/res/ais/evt.c?view=auto&rev=84131
==============================================================================
--- team/russell/events/res/ais/evt.c (added)
+++ team/russell/events/res/ais/evt.c Sat Sep 29 15:10:44 2007
@@ -1,0 +1,580 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Digium, Inc.
+ *
+ * Russell Bryant <russell at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ * \author Russell Bryant <russell at digium.com>
+ *
+ * \brief Usage of the SAForum AIS (Application Interface Specification)
+ *
+ * \arg http://developer.osdl.org/dev/openais/
+ *
+ * This file contains the code specific to the use of the EVT
+ * (Event) Service.
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include "ais.h"
+
+#include "asterisk/module.h"
+#include "asterisk/utils.h"
+#include "asterisk/cli.h"
+#include "asterisk/logger.h"
+#include "asterisk/event.h"
+#include "asterisk/config.h"
+#include "asterisk/linkedlists.h"
+
+SaEvtHandleT evt_handle;
+
+/* Prototypes for the EVT callbacks defined later in this file */
+void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
+ SaAisErrorT error);
+void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
+ const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);
+
+/*! Callback table handed to saEvtInitialize() */
+static const SaEvtCallbacksT evt_callbacks = {
+ .saEvtChannelOpenCallback = evt_channel_open_cb,
+ .saEvtEventDeliverCallback = evt_event_deliver_cb,
+};
+
+/*! Event types that can be published or subscribed to, keyed by the string
+ * accepted for the publish_event / subscribe_event configuration options.
+ * The same string is used as the pattern/filter on the event channel. */
+static const struct {
+ const char *str;
+ enum ast_event_type type;
+} supported_event_types[] = {
+ { "mwi", AST_EVENT_MWI },
+};
+
+/*! Used to provide unique id's to egress subscriptions */
+static int unique_id;
+
+/*! \brief This server's unique entity ID */
+static struct ast_eid g_eid;
+static char g_eid_str[32];
+
+/*! One event type that is subscribed to on an event channel */
+struct subscribe_event {
+ AST_LIST_ENTRY(subscribe_event) entry;
+ /*! This is a unique identifier to identify this subscription in the event
+ * channel through the different API calls, subscribe, unsubscribe, and
+ * the event deliver callback. */
+ SaEvtSubscriptionIdT id;
+ /*! Event type requested through the subscribe_event option */
+ enum ast_event_type type;
+};
+
+/*! One event type that is published on an event channel */
+struct publish_event {
+ AST_LIST_ENTRY(publish_event) entry;
+ /*! We subscribe to events internally so that we can publish them
+ * on this event channel. */
+ struct ast_event_sub *sub;
+ /*! Event type requested through the publish_event option */
+ enum ast_event_type type;
+};
+
+/*! A configured event channel and the event types tied to it */
+struct event_channel {
+ AST_RWLIST_ENTRY(event_channel) entry;
+ AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
+ AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
+ /*! Handle returned by saEvtChannelOpen() */
+ SaEvtChannelHandleT handle;
+ /*! Channel name; build_event_channel() allocates strlen(name) extra bytes
+ * past the end of the structure so the full string fits here */
+ char name[1];
+};
+
+static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
+
+/*!
+ * \brief Registered as saEvtChannelOpenCallback in evt_callbacks
+ *
+ * Currently an empty placeholder: all arguments are ignored.
+ */
+void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
+ SaAisErrorT error)
+{
+
+}
+
+/*!
+ * \brief Hand an event to the Asterisk event core
+ *
+ * \param ast_event the event to queue
+ *
+ * MWI events are queued and cached, keyed on mailbox and context; every
+ * other event type is queued without caching.
+ */
+static void queue_event(struct ast_event *ast_event)
+{
+	/*!
+	 * \todo This hack makes me sad. I need to come up with a better way to
+	 * figure out whether an event should be cached or not, and what
+	 * parameters to cache on.
+	 *
+	 * As long as the types of events that are supported is limited,
+	 * this isn't *terrible*, I guess. Perhaps we should just define
+	 * caching rules in the core, and make them configurable, and not
+	 * have it be the job of the event publishers.
+	 */
+
+	if (ast_event_get_type(ast_event) != AST_EVENT_MWI) {
+		ast_event_queue(ast_event);
+		return;
+	}
+
+	ast_event_queue_and_cache(ast_event,
+		AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR,
+		AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR,
+		AST_EVENT_IE_END);
+}
+
+/*!
+ * \brief EVT delivery callback: fetch an event from the event channel and
+ *        queue a copy of it into the local event system
+ *
+ * \param sub_id        subscription the event was delivered for (unused)
+ * \param event_handle  handle used to retrieve the event payload
+ * \param event_datalen size of the event payload in bytes
+ *
+ * Events that are too large for the receive buffer, that cannot be
+ * retrieved, that carry no EID, or that originated on this server are
+ * dropped.
+ */
+void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
+	const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
+{
+	/* It is important to note that this works because we *know* that this
+	 * function will only be called by a single thread, the dispatch_thread.
+	 * If this module gets changed such that this is no longer the case, this
+	 * should get changed to a thread-local buffer, instead. */
+	static unsigned char buf[4096];
+	struct ast_event *event_dup, *event = (void *) buf;
+	const char *origin_eid;
+	SaAisErrorT res;
+	SaSizeT len = sizeof(buf);
+
+	if (event_datalen > len) {
+		ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
+			"for the allocated size %u. Change the code to increase the size.\n",
+			(unsigned int) event_datalen, (unsigned int) len);
+		return;
+	}
+
+	res = saEvtEventDataGet(event_handle, event, &len);
+	if (res != SA_AIS_OK) {
+		ast_log(LOG_ERROR, "Error retrieving event payload: %s\n",
+			ais_err2str(res));
+		return;
+	}
+
+	/* The EID lookup yields NULL when the IE is absent; never hand that to
+	 * strcasecmp(). Without an EID the origin cannot be determined. */
+	origin_eid = ast_event_get_ie_str(event, AST_EVENT_IE_EID);
+	if (!origin_eid) {
+		ast_log(LOG_WARNING, "Received an event with no EID; ignoring it\n");
+		return;
+	}
+
+	if (!strcasecmp(g_eid_str, origin_eid)) {
+		/* Don't feed events back in that originated locally. */
+		return;
+	}
+
+	/* The event core gets its own heap copy, since buf is reused on the
+	 * next delivery. */
+	if (!(event_dup = ast_malloc(len)))
+		return;
+
+	memcpy(event_dup, event, len);
+
+	queue_event(event_dup);
+}
+
+/*!
+ * \brief Look up the filter/configuration string for an event type
+ *
+ * \param type event type to look up in supported_event_types
+ *
+ * \return the matching string, or NULL when the type is not supported
+ */
+static const char *type_to_filter_str(enum ast_event_type type)
+{
+	int idx;
+
+	for (idx = 0; idx < ARRAY_LEN(supported_event_types); idx++) {
+		if (supported_event_types[idx].type == type)
+			return supported_event_types[idx].str;
+	}
+
+	return NULL;
+}
+
+/*!
+ * \brief Internal event subscription callback: publish a locally generated
+ *        event on an event channel
+ *
+ * \param ast_event the event delivered by the Asterisk event core
+ * \param data      the struct event_channel to publish on
+ *
+ * Events that carry no EID, that did not originate on this server, or whose
+ * type has no filter string are not published. The AIS event allocated here
+ * is always freed before returning.
+ */
+static void ast_event_cb(const struct ast_event *ast_event, void *data)
+{
+	SaEvtEventHandleT event_handle;
+	SaAisErrorT res;
+	struct event_channel *event_channel = data;
+	SaClmClusterNodeT local_node;
+	SaEvtEventPatternArrayT pattern_array;
+	SaEvtEventPatternT pattern;
+	SaSizeT len;
+	const char *filter_str;
+	const char *origin_eid;
+	SaEvtEventIdT event_id;
+
+	ast_log(LOG_DEBUG, "Got an event to forward\n");
+
+	/* A missing EID IE yields NULL, which must not reach strcasecmp();
+	 * treat such an event as not originating from this server. */
+	origin_eid = ast_event_get_ie_str(ast_event, AST_EVENT_IE_EID);
+	if (!origin_eid || strcasecmp(g_eid_str, origin_eid)) {
+		/* If the event didn't originate from this server, don't send it back out. */
+		ast_log(LOG_DEBUG, "Returning here\n");
+		return;
+	}
+
+	/* Resolve the pattern string before allocating anything: the lookup
+	 * returns NULL for unsupported types, and strlen(NULL) would crash. */
+	filter_str = type_to_filter_str(ast_event_get_type(ast_event));
+	if (!filter_str) {
+		ast_log(LOG_ERROR, "No filter string for event type %d; not publishing\n",
+			(int) ast_event_get_type(ast_event));
+		return;
+	}
+
+	res = saEvtEventAllocate(event_channel->handle, &event_handle);
+	if (res != SA_AIS_OK) {
+		ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(res));
+		ast_log(LOG_DEBUG, "Returning here\n");
+		return;
+	}
+
+	/* The local node name is used as the publisher name of the event */
+	res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID,
+		SA_TIME_ONE_SECOND, &local_node);
+	if (res != SA_AIS_OK) {
+		ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(res));
+		goto return_event_free;
+	}
+
+	/* The pattern includes the terminating NUL, matching the filter that
+	 * set_egress_subscription() installs. */
+	len = strlen(filter_str) + 1;
+	pattern.pattern = (SaUint8T *) filter_str;
+	pattern.patternSize = len;
+	pattern.allocatedSize = len;
+
+	pattern_array.allocatedNumber = 1;
+	pattern_array.patternsNumber = 1;
+	pattern_array.patterns = &pattern;
+
+	/*!
+	 * \todo Make retention time configurable
+	 * \todo Make event priorities configurable
+	 */
+	res = saEvtEventAttributesSet(event_handle, &pattern_array,
+		SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
+	if (res != SA_AIS_OK) {
+		ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(res));
+		goto return_event_free;
+	}
+
+	res = saEvtEventPublish(event_handle,
+		ast_event, ast_event_get_size(ast_event), &event_id);
+	if (res != SA_AIS_OK) {
+		ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(res));
+		goto return_event_free;
+	}
+
+return_event_free:
+	res = saEvtEventFree(event_handle);
+	if (res != SA_AIS_OK) {
+		ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(res));
+	}
+	ast_log(LOG_DEBUG, "Returning here (event_free)\n");
+}
+
+/*!
+ * \brief CLI handler for "ais evt show event channels"
+ *
+ * \param e   CLI entry; its command and usage strings are filled in on CLI_INIT
+ * \param cmd CLI_INIT, CLI_GENERATE, or a request to run the command
+ * \param a   CLI arguments; output is written to a->fd
+ *
+ * \return NULL for CLI_INIT and CLI_GENERATE, CLI_SHOWUSAGE when the argument
+ *         count does not match, CLI_SUCCESS otherwise
+ *
+ * The event channel list is walked under its read lock.
+ */
+static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	struct event_channel *chan;
+
+	if (cmd == CLI_INIT) {
+		e->command = "ais evt show event channels";
+		e->usage =
+			"Usage: ais evt show event channels\n"
+			" List configured event channels for the (EVT) Eventing service.\n";
+		return NULL;
+	}
+
+	if (cmd == CLI_GENERATE) {
+		/* no completion */
+		return NULL;
+	}
+
+	if (a->argc != e->args)
+		return CLI_SHOWUSAGE;
+
+	ast_cli(a->fd, "\n"
+		"=============================================================\n"
+		"=== Event Channels ==========================================\n"
+		"=============================================================\n"
+		"===\n");
+
+	AST_RWLIST_RDLOCK(&event_channels);
+	AST_RWLIST_TRAVERSE(&event_channels, chan, entry) {
+		struct publish_event *pub;
+		struct subscribe_event *sub;
+
+		ast_cli(a->fd, "=== ---------------------------------------------------------\n"
+			"=== Event Channel Name: %s\n", chan->name);
+
+		/* Published types first, then subscribed types */
+		AST_LIST_TRAVERSE(&chan->publish_events, pub, entry) {
+			ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
+				type_to_filter_str(pub->type));
+		}
+
+		AST_LIST_TRAVERSE(&chan->subscribe_events, sub, entry) {
+			ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
+				type_to_filter_str(sub->type));
+		}
+
+		ast_cli(a->fd, "=== ---------------------------------------------------------\n"
+			"===\n");
+	}
+	AST_RWLIST_UNLOCK(&event_channels);
+
+	ast_cli(a->fd, "=============================================================\n"
+		"\n");
+
+	return CLI_SUCCESS;
+}
+
+/*! CLI commands provided by the EVT code; registered in
+ * ast_ais_evt_load_module() */
+static struct ast_cli_entry ais_cli[] = {
+ NEW_CLI(ais_evt_show_event_channels, "Show configured event channels"),
+};
+
+/*!
+ * \brief Handle a publish_event option: start publishing an event type on
+ *        an event channel
+ *
+ * \param event_channel channel the events will be published on
+ * \param event_type    configured type string, matched case-insensitively
+ *                      against supported_event_types
+ *
+ * An internal subscription is created with ast_event_cb() as its callback,
+ * the cache is dumped to that subscription, and the entry is appended to the
+ * channel's publish list. Unknown types, allocation failures and subscription
+ * failures are reported (where applicable) and leave the channel unchanged.
+ */
+static void add_publish_event(struct event_channel *event_channel, const char *event_type)
+{
+	int i;
+	enum ast_event_type type = -1;
+	struct publish_event *publish_event;
+
+	for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
+		if (!strcasecmp(event_type, supported_event_types[i].str)) {
+			type = supported_event_types[i].type;
+			break;
+		}
+	}
+
+	if (type == -1) {
+		ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
+		return;
+	}
+
+	if (!(publish_event = ast_calloc(1, sizeof(*publish_event))))
+		return;
+
+	publish_event->type = type;
+	ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type);
+	publish_event->sub = ast_event_subscribe(type, ast_event_cb, event_channel,
+		AST_EVENT_IE_END);
+	if (!publish_event->sub) {
+		/* Without a subscription there is nothing to dump the cache to
+		 * and nothing to unsubscribe later, so do not keep the entry. */
+		ast_log(LOG_ERROR, "Could not subscribe to event type '%s'\n", event_type);
+		free(publish_event);
+		return;
+	}
+
+	ast_event_dump_cache(publish_event->sub);
+
+	AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
+}
+
+/*!
+ * \brief Subscribe to one event type on an event channel
+ *
+ * \param event_channel   channel whose handle is subscribed on
+ * \param subscribe_event entry supplying the event type and subscription id
+ *
+ * \return the result of saEvtEventSubscribe()
+ *
+ * A single exact-match filter is installed whose pattern is the type's
+ * filter string, including its terminating NUL.
+ */
+static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
+	struct subscribe_event *subscribe_event)
+{
+	SaEvtEventFilterArrayT filters;
+	SaEvtEventFilterT exact;
+	/* We know it's going to be valid. It was checked earlier. */
+	const char *pattern_str = type_to_filter_str(subscribe_event->type);
+	SaSizeT pattern_len = strlen(pattern_str) + 1;
+
+	exact.filterType = SA_EVT_EXACT_FILTER;
+	exact.filter.pattern = (SaUint8T *) pattern_str;
+	exact.filter.patternSize = pattern_len;
+	exact.filter.allocatedSize = pattern_len;
+
+	filters.filters = &exact;
+	filters.filtersNumber = 1;
+
+	return saEvtEventSubscribe(event_channel->handle, &filters,
+		subscribe_event->id);
+}
+
+/*!
+ * \brief Handle a subscribe_event option: subscribe to an event type on an
+ *        event channel
+ *
+ * \param event_channel channel to subscribe on
+ * \param event_type    configured type string, matched case-insensitively
+ *                      against supported_event_types
+ *
+ * The entry is given a fresh id from unique_id and is only appended to the
+ * channel's subscribe list once the subscription has been set up.
+ */
+static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
+{
+	int idx;
+	enum ast_event_type type = -1;
+	struct subscribe_event *entry_new;
+	SaAisErrorT status;
+
+	for (idx = 0; idx < ARRAY_LEN(supported_event_types); idx++) {
+		if (strcasecmp(event_type, supported_event_types[idx].str))
+			continue;
+		type = supported_event_types[idx].type;
+		break;
+	}
+
+	if (type == -1) {
+		ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
+		return;
+	}
+
+	entry_new = ast_calloc(1, sizeof(*entry_new));
+	if (!entry_new)
+		return;
+
+	entry_new->type = type;
+	entry_new->id = ast_atomic_fetchadd_int(&unique_id, +1);
+
+	status = set_egress_subscription(event_channel, entry_new);
+	if (status == SA_AIS_OK) {
+		AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, entry_new, entry);
+		return;
+	}
+
+	ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
+		ais_err2str(status));
+	free(entry_new);
+}
+
+/*!
+ * \brief Create an event channel from one configuration category
+ *
+ * \param cfg loaded configuration
+ * \param cat category name, which is also used as the event channel name
+ *
+ * A category whose name (compared case-insensitively) is already in the
+ * event_channels list is ignored with a warning. Otherwise the channel is
+ * opened, its publish_event / subscribe_event options are processed, and it
+ * is appended to the event_channels list.
+ */
+static void build_event_channel(struct ast_config *cfg, const char *cat)
+{
+ struct ast_variable *var;
+ struct event_channel *event_channel;
+ SaAisErrorT res;
+ SaNameT sa_name = { 0, };
+
+ /* Look for an existing channel with the same name; event_channel is
+ * left non-NULL only when the loop breaks on a match. */
+ AST_RWLIST_WRLOCK(&event_channels);
+ AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
+ if (!strcasecmp(event_channel->name, cat))
+ break;
+ }
+ AST_RWLIST_UNLOCK(&event_channels);
+ if (event_channel) {
+ ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
+ "configuration. Second instance ignored.\n", cat);
+ return;
+ }
+
+ /* name[] already holds one byte (the terminator), so strlen(cat) extra
+ * bytes are enough for the strcpy() below. */
+ if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
+ return;
+
+ strcpy(event_channel->name, cat);
+ /* The AIS name is a bounded copy; its length is taken from the copy */
+ ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
+ sa_name.length = strlen((char *) sa_name.value);
+ res = saEvtChannelOpen(evt_handle, &sa_name,
+ SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
+ SA_TIME_MAX, &event_channel->handle);
+ if (res != SA_AIS_OK) {
+ ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(res));
+ free(event_channel);
+ return;
+ }
+
+ /* "type" was already consumed by load_config() to select this builder */
+ for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
+ if (!strcasecmp(var->name, "type")) {
+ continue;
+ } else if (!strcasecmp(var->name, "publish_event")) {
+ add_publish_event(event_channel, var->value);
+ } else if (!strcasecmp(var->name, "subscribe_event")) {
+ add_subscribe_event(event_channel, var->value);
+ } else {
+ ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
+ event_channel->name, var->name);
+ }
+ }
+
+ AST_RWLIST_WRLOCK(&event_channels);
+ AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
+ AST_RWLIST_UNLOCK(&event_channels);
+}
+
+/*!
+ * \brief Read ais.conf and build every event channel it defines
+ *
+ * The "general" category is skipped. Every other category must carry a
+ * "type" option; only "event_channel" is accepted, anything else is
+ * reported with a warning. Nothing happens if the file cannot be loaded.
+ */
+static void load_config(void)
+{
+	static const char filename[] = "ais.conf";
+	struct ast_flags config_flags = { 0 };
+	struct ast_config *cfg = ast_config_load(filename, config_flags);
+	const char *cat;
+
+	if (!cfg)
+		return;
+
+	for (cat = ast_category_browse(cfg, NULL); cat; cat = ast_category_browse(cfg, cat)) {
+		const char *type;
+
+		if (!strcasecmp(cat, "general"))
+			continue;
+
+		type = ast_variable_retrieve(cfg, cat, "type");
+		if (!type) {
+			ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
+				filename);
+			continue;
+		}
+
+		if (strcasecmp(type, "event_channel")) {
+			ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n",
+				filename, type);
+			continue;
+		}
+
+		build_event_channel(cfg, cat);
+	}
+
+	ast_config_destroy(cfg);
+}
+
+/*!
+ * \brief Release a publish_event entry
+ *
+ * Cancels the internal event subscription created by add_publish_event()
+ * and frees the entry. The caller must already have removed it from its list.
+ */
+static void publish_event_destroy(struct publish_event *publish_event)
+{
+ ast_event_unsubscribe(publish_event->sub);
+
+ free(publish_event);
+}
+
+/*!
+ * \brief Release a subscribe_event entry
+ *
+ * \param event_channel   channel the subscription was made on
+ * \param subscribe_event entry to unsubscribe and free
+ *
+ * An unsubscribe failure is logged; the entry is freed either way.
+ */
+static void subscribe_event_destroy(const struct event_channel *event_channel,
+	struct subscribe_event *subscribe_event)
+{
+	/* saEvtChannelClose() will actually do this automatically, but it just
+	 * feels cleaner to go ahead and do it manually ... */
+	SaAisErrorT status = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
+
+	if (status != SA_AIS_OK)
+		ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(status));
+
+	free(subscribe_event);
+}
+
+/*!
+ * \brief Tear down an event channel
+ *
+ * \param event_channel channel to destroy; already removed from the global list
+ *
+ * All publish entries are destroyed first, then all subscribe entries; the
+ * AIS channel is then closed (a failure is only logged) and the structure
+ * is freed.
+ */
+static void event_channel_destroy(struct event_channel *event_channel)
+{
+	SaAisErrorT status;
+
+	for (;;) {
+		struct publish_event *pub = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry);
+
+		if (!pub)
+			break;
+		publish_event_destroy(pub);
+	}
+
+	for (;;) {
+		struct subscribe_event *sub = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry);
+
+		if (!sub)
+			break;
+		subscribe_event_destroy(event_channel, sub);
+	}
+
+	status = saEvtChannelClose(event_channel->handle);
+	if (status != SA_AIS_OK) {
+		ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
+			event_channel->name, ais_err2str(status));
+	}
+
+	free(event_channel);
+}
+
+/*!
+ * \brief Destroy every configured event channel
+ *
+ * Channels are removed from the head of the event_channels list one at a
+ * time, with the list write-locked for the whole operation.
+ */
+static void destroy_event_channels(void)
+{
+	AST_RWLIST_WRLOCK(&event_channels);
+	for (;;) {
+		struct event_channel *chan = AST_RWLIST_REMOVE_HEAD(&event_channels, entry);
+
+		if (!chan)
+			break;
+		event_channel_destroy(chan);
+	}
+	AST_RWLIST_UNLOCK(&event_channels);
+}
+
+/*!
+ * \brief Bring up the EVT service
+ *
+ * Records this server's entity ID (in binary and string form), initializes
+ * the event service, loads the configuration and registers the CLI commands.
+ *
+ * \return AST_MODULE_LOAD_SUCCESS once saEvtInitialize() succeeds,
+ *         AST_MODULE_LOAD_DECLINE (after logging the error) otherwise
+ */
+int ast_ais_evt_load_module(void)
+{
+	SaAisErrorT status;
+
+	ast_set_eid(&g_eid);
+	ast_eid_to_str(g_eid_str, sizeof(g_eid_str), &g_eid);
+
+	status = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
+	if (status == SA_AIS_OK) {
+		load_config();
+		ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
+		return AST_MODULE_LOAD_SUCCESS;
+	}
+
+	ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
+		ais_err2str(status));
+
+	return AST_MODULE_LOAD_DECLINE;
+}
+
+/*!
+ * \brief Shut the EVT service down
+ *
+ * Unregisters the CLI commands registered by ast_ais_evt_load_module(),
+ * destroys all event channels and finalizes the event service.
+ *
+ * \return 0 when saEvtFinalize() succeeds, -1 (after logging) otherwise
+ */
+int ast_ais_evt_unload_module(void)
+{
+	SaAisErrorT res;
+
+	/* Remove the CLI command first: its handler walks event_channels, and
+	 * leaving it registered after unload would leave a dangling entry. */
+	ast_cli_unregister_multiple(ais_cli, ARRAY_LEN(ais_cli));
+
+	destroy_event_channels();
+
+	res = saEvtFinalize(evt_handle);
+	if (res != SA_AIS_OK) {
+		ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n",
+			ais_err2str(res));
+		return -1;
+	}
+
+	return 0;
+}
Propchange: team/russell/events/res/ais/evt.c
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/russell/events/res/ais/evt.c
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/russell/events/res/ais/evt.c
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: team/russell/events/res/ais/lck.c
URL: http://svn.digium.com/view/asterisk/team/russell/events/res/ais/lck.c?view=auto&rev=84131
==============================================================================
--- team/russell/events/res/ais/lck.c (added)
+++ team/russell/events/res/ais/lck.c Sat Sep 29 15:10:44 2007
@@ -1,0 +1,95 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Digium, Inc.
+ *
+ * Russell Bryant <russell at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ * \author Russell Bryant <russell at digium.com>
+ *
+ * \brief Usage of the SAForum AIS (Application Interface Specification)
+ *
+ * \arg http://developer.osdl.org/dev/openais/
+ *
+ * This file contains the code specific to the use of the LCK
+ * (Distributed Locks) Service.
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include "ais.h"
+
+#include "asterisk/module.h"
+#include "asterisk/utils.h"
+#include "asterisk/cli.h"
+#include "asterisk/logger.h"
+
+SaLckHandleT lck_handle;
+
+/*! Callback table handed to saLckInitialize(); no callbacks are provided yet,
+ * so every slot is NULL */
+static SaLckCallbacksT lck_callbacks = {
+ /*! Get notified when a cluster-wide lock gets created */
+ .saLckResourceOpenCallback = NULL,
+ /*! Get notified when an asynchronous lock request gets granted */
+ .saLckLockGrantCallback = NULL,
+ /*! Be informed when a currently held lock is blocking another node */
+ .saLckLockWaiterCallback = NULL,
+ /*! Get notified when an asynchronous unlock request is done */
+ .saLckResourceUnlockCallback = NULL,
+};
+
+/*! Container intended for lock resources; not referenced anywhere in this
+ * file yet, hence the attribute_unused marker */
+static struct ao2_container * attribute_unused lock_resources;
+
+/*! A cluster-wide lock resource (not used by any code in this file yet) */
+struct lock_resource {
+ SaLckResourceHandleT *handle;
+ SaNameT ais_name;
+ /* NOTE(review): presumably over-allocated to hold the full name, as is
+ * done for struct event_channel in evt.c -- confirm once this is used */
+ char name[1];
+};
+
+/*!
+ * \brief Bring up the LCK (distributed locking) service
+ *
+ * \return AST_MODULE_LOAD_SUCCESS once saLckInitialize() succeeds,
+ *         AST_MODULE_LOAD_DECLINE (after logging the error) otherwise
+ */
+int ast_ais_lck_load_module(void)
+{
+	SaAisErrorT status = saLckInitialize(&lck_handle, &lck_callbacks, &ais_version);
+
+	if (status == SA_AIS_OK)
+		return AST_MODULE_LOAD_SUCCESS;
+
+	ast_log(LOG_ERROR, "Could not initialize distributed locking service: %s\n",
+		ais_err2str(status));
+
+	return AST_MODULE_LOAD_DECLINE;
+}
+
+/*!
+ * \brief Shut the LCK (distributed locking) service down
+ *
+ * \return 0 when saLckFinalize() succeeds, -1 (after logging) otherwise
+ */
+int ast_ais_lck_unload_module(void)
+{
+	SaAisErrorT status = saLckFinalize(lck_handle);
+
+	if (status == SA_AIS_OK)
+		return 0;
+
+	ast_log(LOG_ERROR, "Problem stopping distributed locking service: %s\n",
+		ais_err2str(status));
+
+	return -1;
+}
Propchange: team/russell/events/res/ais/lck.c
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/russell/events/res/ais/lck.c
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/russell/events/res/ais/lck.c
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: team/russell/events/res/res_ais.c
URL: http://svn.digium.com/view/asterisk/team/russell/events/res/res_ais.c?view=diff&rev=84131&r1=84130&r2=84131
==============================================================================
--- team/russell/events/res/res_ais.c (original)
+++ team/russell/events/res/res_ais.c Sat Sep 29 15:10:44 2007
@@ -23,11 +23,15 @@
* \brief Usage of the SAForum AIS (Application Interface Specification)
*
* \arg http://developer.osdl.org/dev/openais/
+ *
+ * This file contains the common code between the uses of the different AIS
+ * services.
*/
/*** MODULEINFO
<depend>SaClm</depend>
<depend>SaEvt</depend>
+ <depend>SaLck</depend>
***/
#include "asterisk.h"
@@ -40,9 +44,7 @@
#include <unistd.h>
#include <errno.h>
-#include <openais/saAis.h>
-#include <openais/saClm.h>
-#include <openais/saEvt.h>
+#include "ais/ais.h"
#include "asterisk/module.h"
#include "asterisk/options.h"
@@ -50,77 +52,15 @@
#include "asterisk/channel.h"
#include "asterisk/utils.h"
#include "asterisk/cli.h"
-#include "asterisk/linkedlists.h"
-#include "asterisk/event.h"
-#include "asterisk/config.h"
-
-static const struct {
- const char *str;
- enum ast_event_type type;
-} supported_event_types[] = {
- { "mwi", AST_EVENT_MWI },
+
+static struct { /* File-local state kept for the dispatch thread. */
+ pthread_t id; /* thread identifier; initialized to AST_PTHREADT_NULL below */
+ unsigned int stop:1; /* one-bit flag; presumably asks the thread to exit -- TODO confirm against the handler code */
+} dispatch_thread = {
+ .id = AST_PTHREADT_NULL,
};
-/*! Used to provide unique id's to egress subscriptions */
-static int unique_id;
-
-/*! \brief This server's unique entity ID */
-static struct ast_eid g_eid;
-static char g_eid_str[32];
-
-struct subscribe_event {
- AST_LIST_ENTRY(subscribe_event) entry;
- /*! This is a unique identifier to identify this subscription in the event
- * channel through the different API calls, subscribe, unsubscribe, and
- * the event deliver callback. */
- SaEvtSubscriptionIdT id;
- enum ast_event_type type;
-};
-
-struct publish_event {
- AST_LIST_ENTRY(publish_event) entry;
- /*! We subscribe to events internally so that we can publish them
- * on this event channel. */
- struct ast_event_sub *sub;
- enum ast_event_type type;
-};
-
-struct event_channel {
- AST_RWLIST_ENTRY(event_channel) entry;
- AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
- AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
- SaEvtChannelHandleT handle;
- char name[1];
-};
-
-static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
-
-static SaClmHandleT clm_handle;
-static SaEvtHandleT evt_handle;
-
-static pthread_t dispatch_thread_id = AST_PTHREADT_NULL;
-
-static SaVersionT version = { 'B', 1, 1 };
-
-static void clm_node_get_cb(SaInvocationT invocation,
- const SaClmClusterNodeT *cluster_node, SaAisErrorT error);
-static void clm_track_cb(const SaClmClusterNotificationBufferT *notif_buffer,
- SaUint32T num_members, SaAisErrorT error);
-
-static const SaClmCallbacksT clm_callbacks = {
- .saClmClusterNodeGetCallback = clm_node_get_cb,
- .saClmClusterTrackCallback = clm_track_cb,
-};
-
-void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
- SaAisErrorT error);
-void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
- const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);
-
-static const SaEvtCallbacksT evt_callbacks = {
- .saEvtChannelOpenCallback = evt_channel_open_cb,
- .saEvtEventDeliverCallback = evt_event_deliver_cb,
-};
+SaVersionT ais_version = { 'B', 1, 1 };
static const struct ais_error {
SaAisErrorT error;
@@ -155,7 +95,7 @@
{ SA_AIS_ERR_NO_SECTIONS, "No More Sections to Initialize" },
};
-static const char *ais_err2str(SaAisErrorT error)
+const char *ais_err2str(SaAisErrorT error)
{
int x;
@@ -167,609 +107,98 @@
return "Unknown";
}
-static void clm_node_get_cb(SaInvocationT invocation,
- const SaClmClusterNodeT *cluster_node, SaAisErrorT error)
-{
-
-}
-
-static void clm_track_cb(const SaClmClusterNotificationBufferT *notif_buffer,
- SaUint32T num_members, SaAisErrorT error)
-{
-
-}
-
-void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
- SaAisErrorT error)
-{
-
-}
-
-static void queue_event(struct ast_event *ast_event)
-{
- /*!
- * \todo This hack macks me sad. I need to come up with a better way to
- * figure out whether an event should be cached or not, and what
- * parameters to cache on.
- *
- * As long as the types of events that are supported is limited,
- * this isn't *terrible*, I guess. Perhaps we should just define
- * caching rules in the core, and make the configurable, and not
- * have it be the job of the event publishers.
- */
-
- if (ast_event_get_type(ast_event) == AST_EVENT_MWI) {
- ast_event_queue_and_cache(ast_event,
- AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR,
- AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR,
- AST_EVENT_IE_END);
- } else {
- ast_event_queue(ast_event);
- }
-}
-
-void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
- const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
-{
- /* It is important to note that this works because we *know* that this
- * function will only be called by a single thread, the dispatch_thread.
- * If this module gets changed such that this is no longer the case, this
- * should get changed to a thread-local buffer, instead. */
- static unsigned char buf[4096];
- struct ast_event *event_dup, *event = (void *) buf;
- SaAisErrorT res;
- SaSizeT len = sizeof(buf);
-
- if (event_datalen > len) {
- ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
- "for the allocated size %u. Change the code to increase the size.\n",
- (unsigned int) event_datalen, (unsigned int) len);
- return;
- }
-
- res = saEvtEventDataGet(event_handle, event, &len);
- if (res != SA_AIS_OK) {
- ast_log(LOG_ERROR, "Error retrieving event payload: %s\n",
- ais_err2str(res));
- return;
- }
-
- if (!strcasecmp(g_eid_str, ast_event_get_ie_str(event, AST_EVENT_IE_EID))) {
- /* Don't feed events back in that originated locally. */
- return;
- }
-
- if (!(event_dup = ast_malloc(len)))
- return;
-
- memcpy(event_dup, event, len);
-
- queue_event(event_dup);
-}
-
-static const char *type_to_filter_str(enum ast_event_type type)
-{
- const char *filter_str = NULL;
- int i;
-
- for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
- if (supported_event_types[i].type == type) {
- filter_str = supported_event_types[i].str;
- break;
- }
- }
-
- return filter_str;
-}
-
-static void ast_event_cb(const struct ast_event *ast_event, void *data)
-{
- SaEvtEventHandleT event_handle;
- SaAisErrorT res;
- struct event_channel *event_channel = data;
- SaClmClusterNodeT local_node;
- SaEvtEventPatternArrayT pattern_array;
- SaEvtEventPatternT pattern;
- SaSizeT len;
- const char *filter_str;
- SaEvtEventIdT event_id;
-
- ast_log(LOG_DEBUG, "Got an event to forward\n");
-
- if (strcasecmp(g_eid_str, ast_event_get_ie_str(ast_event, AST_EVENT_IE_EID))) {
- /* If the event didn't originate from this server, don't send it back out. */
- ast_log(LOG_DEBUG, "Returning here\n");
- return;
- }
-
- res = saEvtEventAllocate(event_channel->handle, &event_handle);
- if (res != SA_AIS_OK) {
- ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(res));
- ast_log(LOG_DEBUG, "Returning here\n");
- return;
- }
-
- res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID,
- SA_TIME_ONE_SECOND, &local_node);
- if (res != SA_AIS_OK) {
- ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(res));
- goto return_event_free;
- }
-
- filter_str = type_to_filter_str(ast_event_get_type(ast_event));
- len = strlen(filter_str) + 1;
- pattern.pattern = (SaUint8T *) filter_str;
- pattern.patternSize = len;
- pattern.allocatedSize = len;
-
- pattern_array.allocatedNumber = 1;
- pattern_array.patternsNumber = 1;
- pattern_array.patterns = &pattern;
-
- /*!
- * /todo Make retention time configurable
- * /todo Make event priorities configurable
- */
- res = saEvtEventAttributesSet(event_handle, &pattern_array,
- SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
- if (res != SA_AIS_OK) {
- ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(res));
- goto return_event_free;
- }
-
- res = saEvtEventPublish(event_handle,
- ast_event, ast_event_get_size(ast_event), &event_id);
- if (res != SA_AIS_OK) {
- ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(res));
- goto return_event_free;
- }
-
-return_event_free:
- res = saEvtEventFree(event_handle);
- if (res != SA_AIS_OK) {
- ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(res));
- }
- ast_log(LOG_DEBUG, "Returning here (event_free)\n");
-}
-
-static void *dispatch_thread(void *data)
-{
- SaSelectionObjectT clm_fd, evt_fd, max_fd;
+static void *dispatch_thread_handler(void *data)
+{
+ SaSelectionObjectT clm_fd, evt_fd, lck_fd, max_fd;
int res;
fd_set read_fds;
SaAisErrorT ais_res;
ais_res = saClmSelectionObjectGet(clm_handle, &clm_fd);
if (ais_res != SA_AIS_OK) {
- ast_log(LOG_ERROR, "Failed to retrieve select fd for CLM service. This module will not operate.\n");
+ ast_log(LOG_ERROR, "Failed to retrieve select fd for CLM service. "
+ "This module will not operate.\n");
return NULL;
}
ais_res = saEvtSelectionObjectGet(evt_handle, &evt_fd);
if (ais_res != SA_AIS_OK) {
[... 476 lines stripped ...]
More information about the asterisk-commits
mailing list