[svn-commits] dlee: branch dlee/amqp-cdr-cel r430900 - in /team/dlee/amqp-cdr-cel: ./ build...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Wed Jan 21 13:59:28 CST 2015


Author: dlee
Date: Wed Jan 21 13:59:24 2015
New Revision: 430900

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=430900
Log:
First cut at AMQP backend for CDR/CEL

Added:
    team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c   (with props)
    team/dlee/amqp-cdr-cel/cel/cel_amqp.c   (with props)
    team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample   (with props)
    team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample   (with props)
    team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample   (with props)
    team/dlee/amqp-cdr-cel/include/asterisk/amqp.h   (with props)
    team/dlee/amqp-cdr-cel/res/amqp/
    team/dlee/amqp-cdr-cel/res/amqp/cli.c   (with props)
    team/dlee/amqp-cdr-cel/res/amqp/config.c   (with props)
    team/dlee/amqp-cdr-cel/res/amqp/internal.h   (with props)
    team/dlee/amqp-cdr-cel/res/res_amqp.c   (with props)
    team/dlee/amqp-cdr-cel/res/res_amqp.exports.in   (with props)
Modified:
    team/dlee/amqp-cdr-cel/build_tools/menuselect-deps.in
    team/dlee/amqp-cdr-cel/configure
    team/dlee/amqp-cdr-cel/configure.ac
    team/dlee/amqp-cdr-cel/contrib/scripts/install_prereq
    team/dlee/amqp-cdr-cel/include/asterisk/autoconfig.h.in
    team/dlee/amqp-cdr-cel/include/asterisk/cel.h
    team/dlee/amqp-cdr-cel/makeopts.in
    team/dlee/amqp-cdr-cel/res/Makefile
    team/dlee/amqp-cdr-cel/res/res_pjsip_config_wizard.c

Modified: team/dlee/amqp-cdr-cel/build_tools/menuselect-deps.in
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/build_tools/menuselect-deps.in?view=diff&rev=430900&r1=430899&r2=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/build_tools/menuselect-deps.in (original)
+++ team/dlee/amqp-cdr-cel/build_tools/menuselect-deps.in Wed Jan 21 13:59:24 2015
@@ -51,6 +51,7 @@
 PRI=@PBX_PRI@
 OPENR2=@PBX_OPENR2@
 RESAMPLE=@PBX_RESAMPLE@
+RABBITMQ=@PBX_RABBITMQ@
 RADIUS=@PBX_RADIUS@
 LAUNCHD=@PBX_LAUNCHD@
 SPANDSP=@PBX_SPANDSP@

Added: team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c?view=auto&rev=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c (added)
+++ team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c Wed Jan 21 13:59:24 2015
@@ -1,0 +1,385 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2015, Digium, Inc.
+ *
+ * David M. Lee, II <dlee at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief AMQP CDR Backend
+ *
+ * \author David M. Lee, II <dlee at digium.com>
+ */
+
+/*** MODULEINFO
+	<depend>res_amqp</depend>
+	<support_level>core</support_level>
+ ***/
+
+/*** DOCUMENTATION
+	<configInfo name="cdr_amqp" language="en_US">
+		<synopsis>AMQP CDR Backend</synopsis>
+		<configFile name="cdr_amqp.conf">
+			<configObject name="global">
+				<synopsis>Global configuration settings</synopsis>
+				<configOption name="loguniqueid">
+					<synopsis>Determines whether to log the uniqueid for calls</synopsis>
+					<description>
+						<para>Default is no.</para>
+					</description>
+				</configOption>
+				<configOption name="loguserfield">
+					<synopsis>Determines whether to log the user field for calls</synopsis>
+					<description>
+						<para>Default is no.</para>
+					</description>
+				</configOption>
+				<configOption name="connection">
+					<synopsis>Name of the connection from amqp.conf to use</synopsis>
+					<description>
+						<para>Specifies the name of the connection from amqp.conf to use</para>
+					</description>
+				</configOption>
+				<configOption name="queue">
+					<synopsis>Name of the queue to post to</synopsis>
+					<description>
+						<para>Defaults to asterisk_cdr</para>
+					</description>
+				</configOption>
+				<configOption name="exchange">
+					<synopsis>Name of the exchange to post to</synopsis>
+					<description>
+						<para>Defaults to empty string</para>
+					</description>
+				</configOption>
+			</configObject>
+		</configFile>
+	</configInfo>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/cdr.h"
+#include "asterisk/config_options.h"
+#include "asterisk/json.h"
+#include "asterisk/module.h"
+#include "asterisk/amqp.h"
+#include "asterisk/stringfields.h"
+
+#define CDR_NAME "AMQP"
+#define CONF_FILENAME "cdr_amqp.conf"
+
+/*!
+ * \brief Settings from the [global] category of cdr_amqp.conf.
+ *
+ * Allocated as an ao2 object by conf_global_create() and released by
+ * conf_global_dtor().
+ */
+struct cdr_amqp_global_conf {
+	AST_DECLARE_STRING_FIELDS(
+		/*! \brief name of the connection (from amqp.conf) to use */
+		AST_STRING_FIELD(connection);
+		/*! \brief name of the queue to post to */
+		AST_STRING_FIELD(queue);
+		/*! \brief name of the exchange to post to */
+		AST_STRING_FIELD(exchange);
+	);
+	/*! \brief whether to log the unique id */
+	int loguniqueid;
+	/*! \brief whether to log the user field */
+	int loguserfield;
+
+	/*! \brief current connection to amqp; set by load_config() and
+	 *  released in the destructor */
+	struct ast_amqp_connection *amqp;
+};
+
+/*!
+ * \brief cdr_amqp configuration snapshot.
+ *
+ * Top-level object produced by conf_alloc(); it owns a reference to the
+ * global settings, which conf_dtor() drops.
+ */
+struct cdr_amqp_conf {
+	/*! \brief settings from the [global] category */
+	struct cdr_amqp_global_conf *global;
+};
+
+/*! \brief Locking container for safe configuration access. */
+static AO2_GLOBAL_OBJ_STATIC(confs);
+
+/*!
+ * \brief Mapping of the [global] category onto cdr_amqp_conf::global.
+ *
+ * Only a category matching the regular expression "^global$" is accepted.
+ */
+static struct aco_type global_option = {
+	.type = ACO_GLOBAL,
+	.name = "global",
+	.item_offset = offsetof(struct cdr_amqp_conf, global),
+	.category = "^global$",
+	.category_match = ACO_WHITELIST,
+};
+
+/*! \brief Type list handed to aco_option_register() for every option. */
+static struct aco_type *global_options[] = ACO_TYPES(&global_option);
+
+/*!
+ * \brief Destructor for the global settings object.
+ *
+ * Drops the AMQP connection reference, then frees the string fields.
+ *
+ * \param obj The \ref cdr_amqp_global_conf being destroyed.
+ */
+static void conf_global_dtor(void *obj)
+{
+	struct cdr_amqp_global_conf *settings = obj;
+
+	ao2_cleanup(settings->amqp);
+	ast_string_field_free_memory(settings);
+}
+
+/*!
+ * \brief Allocate the global settings object and apply option defaults.
+ *
+ * \return A new \ref cdr_amqp_global_conf holding one reference.
+ * \return \c NULL if allocation or string field setup fails.
+ */
+static struct cdr_amqp_global_conf *conf_global_create(void)
+{
+	struct cdr_amqp_global_conf *settings;
+
+	settings = ao2_alloc(sizeof(*settings), conf_global_dtor);
+	if (!settings) {
+		return NULL;
+	}
+
+	if (ast_string_field_init(settings, 64) != 0) {
+		/* Give back the only reference; the destructor tidies up. */
+		ao2_cleanup(settings);
+		return NULL;
+	}
+
+	aco_set_defaults(&global_option, "global", settings);
+
+	return settings;
+}
+
+/*!
+ * \brief The conf file that's processed for the module.
+ *
+ * Ties CONF_FILENAME to the single [global] type defined above.
+ */
+static struct aco_file conf_file = {
+	/*! The config file name. */
+	.filename = CONF_FILENAME,
+	/*! The mapping object types to be processed. */
+	.types = ACO_TYPES(&global_option),
+};
+
+/*!
+ * \brief Destructor for the configuration snapshot.
+ *
+ * \param obj The \ref cdr_amqp_conf being destroyed.
+ */
+static void conf_dtor(void *obj)
+{
+	struct cdr_amqp_conf *snapshot = obj;
+
+	/* Release the reference held on the global settings. */
+	ao2_cleanup(snapshot->global);
+}
+
+/*!
+ * \brief Allocate a new configuration snapshot.
+ *
+ * \return A new \ref cdr_amqp_conf with its global settings created.
+ * \return \c NULL on allocation failure.
+ */
+static void *conf_alloc(void)
+{
+	struct cdr_amqp_conf *snapshot;
+
+	snapshot = ao2_alloc_options(sizeof(*snapshot), conf_dtor,
+		AO2_ALLOC_OPT_LOCK_NOLOCK);
+	if (!snapshot) {
+		return NULL;
+	}
+
+	snapshot->global = conf_global_create();
+	if (!snapshot->global) {
+		/* Drop the only reference to the half-built snapshot. */
+		ao2_cleanup(snapshot);
+		return NULL;
+	}
+
+	return snapshot;
+}
+
+/*!
+ * \brief Config framework descriptor for this module.
+ *
+ * Binds the \c confs global holder, the conf_alloc() snapshot allocator and
+ * the list of config files together under the name \c cfg_info.
+ */
+CONFIG_INFO_STANDARD(cfg_info, confs, conf_alloc,
+		     .files = ACO_FILES(&conf_file));
+
+/*!
+ * \brief CDR handler for AMQP.
+ *
+ * Serializes the CDR as a JSON object and publishes it as a persistent
+ * \c application/json message, using the connection, exchange and queue
+ * taken from the current configuration.
+ *
+ * \param cdr CDR to log.
+ * \return 0 on success.
+ * \return -1 on error.
+ */
+static int amqp_cdr_log(struct ast_cdr *cdr)
+{
+	RAII_VAR(struct cdr_amqp_conf *, conf, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+	RAII_VAR(char *, str, NULL, ast_json_free);
+	int res;
+	amqp_basic_properties_t props = {
+		._flags = AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_CONTENT_TYPE_FLAG,
+		.delivery_mode = 2, /* persistent delivery mode */
+		.content_type = amqp_cstring_bytes("application/json")
+	};
+
+	conf = ao2_global_obj_ref(confs);
+	if (!conf || !conf->global) {
+		ast_log(LOG_ERROR, "Error obtaining config from cdr_amqp.conf\n");
+		return -1;
+	}
+
+	if (!conf->global->amqp) {
+		ast_log(LOG_ERROR, "No AMQP connection; discarding CDR\n");
+		return -1;
+	}
+
+	/* The "o" values below hand their reference over to the new object. */
+	json = ast_json_pack("{"
+		/* clid, src, dst, dcontext */
+		"s: s, s: s, s: s, s: s, "
+		/* channel, dstchannel, lastapp, lastdata */
+		"s: s, s: s, s: s, s: s, "
+		/* start, answer, end, durationsec */
+		"s: o, s: o, s: o, s: i, "
+		/* billsec, disposition, accountcode, amaflags */
+		"s: i, s: s, s: s, s: s, "
+		/* peeraccount, linkedid */
+		"s: s, s: s }",
+
+		"clid", cdr->clid,
+		"src", cdr->src,
+		"dst", cdr->dst,
+		"dcontext", cdr->dcontext,
+
+		"channel", cdr->channel,
+		"dstchannel", cdr->dstchannel,
+		"lastapp", cdr->lastapp,
+		"lastdata", cdr->lastdata,
+
+		"start", ast_json_timeval(cdr->start, NULL),
+		"answer", ast_json_timeval(cdr->answer, NULL),
+		"end", ast_json_timeval(cdr->end, NULL),
+		"durationsec", cdr->duration,
+
+		"billsec", cdr->billsec,
+		"disposition", ast_cdr_disp2str(cdr->disposition),
+		"accountcode", cdr->accountcode,
+		"amaflags", ast_channel_amaflags2string(cdr->amaflags),
+
+		"peeraccount", cdr->peeraccount,
+		"linkedid", cdr->linkedid);
+	if (!json) {
+		return -1;
+	}
+
+	/* Set optional fields. A failure here is reported but does not stop
+	 * the rest of the record from being published. */
+	if (conf->global->loguniqueid
+		&& ast_json_object_set(json, "uniqueid",
+			ast_json_string_create(cdr->uniqueid)) != 0) {
+		ast_log(LOG_WARNING, "Could not add uniqueid to CDR\n");
+	}
+
+	if (conf->global->loguserfield
+		&& ast_json_object_set(json, "userfield",
+			ast_json_string_create(cdr->userfield)) != 0) {
+		ast_log(LOG_WARNING, "Could not add userfield to CDR\n");
+	}
+
+	/* Dump the JSON to a string for publication */
+	str = ast_json_dump_string(json);
+	if (!str) {
+		return -1;
+	}
+
+	res = ast_amqp_basic_publish(conf->global->amqp,
+		amqp_cstring_bytes(conf->global->exchange),
+		amqp_cstring_bytes(conf->global->queue),
+		0, /* mandatory; don't return unsendable messages */
+		0, /* immediate; allow messages to be queued */
+		&props,
+		amqp_cstring_bytes(str));
+
+	if (res != 0) {
+		ast_log(LOG_ERROR, "Error publishing CDR to AMQP\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+
+/*!
+ * \brief Process cdr_amqp.conf and (re)acquire the AMQP connection.
+ *
+ * \param reload Non-zero when called for a module reload.
+ * \return 0 on success.
+ * \return -1 if the config could not be processed or the configured
+ *         connection could not be obtained.
+ */
+static int load_config(int reload)
+{
+	RAII_VAR(struct cdr_amqp_conf *, conf, NULL, ao2_cleanup);
+
+	switch (aco_process_config(&cfg_info, reload)) {
+	case ACO_PROCESS_ERROR:
+		return -1;
+	case ACO_PROCESS_OK:
+	case ACO_PROCESS_UNCHANGED:
+		break;
+	}
+
+	conf = ao2_global_obj_ref(confs);
+	if (!conf || !conf->global) {
+		ast_log(LOG_ERROR, "Error obtaining config from cdr_amqp.conf\n");
+		return -1;
+	}
+
+	/* Refresh the AMQP connection.
+	 * NOTE(review): this updates the live config object in place, so a
+	 * handler running at the same time may see the pointer mid-swap;
+	 * confirm whether additional locking is needed. */
+	ao2_cleanup(conf->global->amqp);
+	conf->global->amqp = ast_amqp_get_connection(conf->global->connection);
+
+	if (!conf->global->amqp) {
+		ast_log(LOG_ERROR, "Could not get AMQP connection %s\n",
+			conf->global->connection);
+		return -1;
+	}
+
+	return 0;
+}
+
+/*!
+ * \brief Module load hook.
+ *
+ * Registers the config options, loads cdr_amqp.conf and registers the CDR
+ * backend. Config state is released again on every failure path.
+ *
+ * \return AST_MODULE_LOAD_SUCCESS when the backend is registered.
+ * \return AST_MODULE_LOAD_DECLINE when the configuration cannot be loaded.
+ * \return AST_MODULE_LOAD_FAILURE on config framework or registration errors.
+ */
+static int load_module(void)
+{
+	if (aco_info_init(&cfg_info) != 0) {
+		ast_log(LOG_ERROR, "Failed to initialize config\n");
+		aco_info_destroy(&cfg_info);
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
+	aco_option_register(&cfg_info, "loguniqueid", ACO_EXACT,
+		global_options, "no", OPT_BOOL_T, 1,
+		FLDSET(struct cdr_amqp_global_conf, loguniqueid));
+	aco_option_register(&cfg_info, "loguserfield", ACO_EXACT,
+		global_options, "no", OPT_BOOL_T, 1,
+		FLDSET(struct cdr_amqp_global_conf, loguserfield));
+	aco_option_register(&cfg_info, "connection", ACO_EXACT,
+		global_options, "", OPT_STRINGFIELD_T, 0,
+		STRFLDSET(struct cdr_amqp_global_conf, connection));
+	aco_option_register(&cfg_info, "queue", ACO_EXACT,
+		global_options, "asterisk_cdr", OPT_STRINGFIELD_T, 0,
+		STRFLDSET(struct cdr_amqp_global_conf, queue));
+	aco_option_register(&cfg_info, "exchange", ACO_EXACT,
+		global_options, "", OPT_STRINGFIELD_T, 0,
+		STRFLDSET(struct cdr_amqp_global_conf, exchange));
+
+	if (load_config(0) != 0) {
+		ast_log(LOG_WARNING, "Configuration failed to load\n");
+		/* Nothing else will release these once loading is declined. */
+		aco_info_destroy(&cfg_info);
+		ao2_global_obj_release(confs);
+		return AST_MODULE_LOAD_DECLINE;
+	}
+
+	if (ast_cdr_register(CDR_NAME, ast_module_info->description, amqp_cdr_log) != 0) {
+		ast_log(LOG_ERROR, "Could not register CDR backend\n");
+		aco_info_destroy(&cfg_info);
+		ao2_global_obj_release(confs);
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
+	ast_log(LOG_NOTICE, "CDR AMQP logging enabled\n");
+	return AST_MODULE_LOAD_SUCCESS;
+}
+
+/*!
+ * \brief Module unload hook.
+ *
+ * The backend is unregistered before the configuration is torn down, so
+ * the handler is not left registered without its config. If unregistering
+ * fails, the configuration is left intact.
+ *
+ * \return 0 on success.
+ * \return -1 if the CDR backend could not be unregistered.
+ */
+static int unload_module(void)
+{
+	if (ast_cdr_unregister(CDR_NAME) != 0) {
+		return -1;
+	}
+
+	aco_info_destroy(&cfg_info);
+	ao2_global_obj_release(confs);
+
+	return 0;
+}
+
+/*!
+ * \brief Module reload hook; re-runs load_config() in reload mode.
+ *
+ * \return Whatever load_config() returns.
+ */
+static int reload_module(void)
+{
+	const int is_reload = 1;
+
+	return load_config(is_reload);
+}
+
+/*!
+ * \brief Module descriptor: wires up the load, unload and reload hooks and
+ * sets the load priority to that of a CDR driver.
+ */
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "AMQP CDR Backend",
+		.support_level = AST_MODULE_SUPPORT_CORE,
+		.load = load_module,
+		.unload = unload_module,
+		.reload = reload_module,
+		.load_pri = AST_MODPRI_CDR_DRIVER,
+	);

Propchange: team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/dlee/amqp-cdr-cel/cel/cel_amqp.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/cel/cel_amqp.c?view=auto&rev=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/cel/cel_amqp.c (added)
+++ team/dlee/amqp-cdr-cel/cel/cel_amqp.c Wed Jan 21 13:59:24 2015
@@ -1,0 +1,379 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2015, Digium, Inc.
+ *
+ * David M. Lee, II <dlee at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief AMQP CEL Backend
+ *
+ * \author David M. Lee, II <dlee at digium.com>
+ */
+
+/*** MODULEINFO
+	<depend>res_amqp</depend>
+	<support_level>core</support_level>
+ ***/
+
+/*** DOCUMENTATION
+	<configInfo name="cel_amqp" language="en_US">
+		<synopsis>AMQP CEL Backend</synopsis>
+		<configFile name="cel_amqp.conf">
+			<configObject name="global">
+				<synopsis>Global configuration settings</synopsis>
+				<configOption name="connection">
+					<synopsis>Name of the connection from amqp.conf to use</synopsis>
+					<description>
+						<para>Specifies the name of the connection from amqp.conf to use</para>
+					</description>
+				</configOption>
+				<configOption name="queue">
+					<synopsis>Name of the queue to post to</synopsis>
+					<description>
+						<para>Defaults to asterisk_cel</para>
+					</description>
+				</configOption>
+				<configOption name="exchange">
+					<synopsis>Name of the exchange to post to</synopsis>
+					<description>
+						<para>Defaults to empty string</para>
+					</description>
+				</configOption>
+			</configObject>
+		</configFile>
+	</configInfo>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/cel.h"
+#include "asterisk/channel.h"
+#include "asterisk/config_options.h"
+#include "asterisk/json.h"
+#include "asterisk/module.h"
+#include "asterisk/amqp.h"
+#include "asterisk/stringfields.h"
+
+#define CEL_NAME "AMQP"
+#define CONF_FILENAME "cel_amqp.conf"
+
+/*!
+ * \brief Settings from the [global] category of cel_amqp.conf.
+ *
+ * Allocated as an ao2 object by conf_global_create() and released by
+ * conf_global_dtor().
+ */
+struct cel_amqp_global_conf {
+	AST_DECLARE_STRING_FIELDS(
+		/*! \brief name of the connection (from amqp.conf) to use */
+		AST_STRING_FIELD(connection);
+		/*! \brief name of the queue to post to */
+		AST_STRING_FIELD(queue);
+		/*! \brief name of the exchange to post to */
+		AST_STRING_FIELD(exchange);
+	);
+
+	/*! \brief current connection to amqp; set by load_config() and
+	 *  released in the destructor */
+	struct ast_amqp_connection *amqp;
+};
+
+/*!
+ * \brief cel_amqp configuration snapshot.
+ *
+ * Top-level object produced by conf_alloc(); it owns a reference to the
+ * global settings, which conf_dtor() drops.
+ */
+struct cel_amqp_conf {
+	/*! \brief settings from the [global] category */
+	struct cel_amqp_global_conf *global;
+};
+
+/*! \brief Locking container for safe configuration access. */
+static AO2_GLOBAL_OBJ_STATIC(confs);
+
+/*!
+ * \brief Mapping of the [global] category onto cel_amqp_conf::global.
+ *
+ * Only a category matching the regular expression "^global$" is accepted.
+ */
+static struct aco_type global_option = {
+	.type = ACO_GLOBAL,
+	.name = "global",
+	.item_offset = offsetof(struct cel_amqp_conf, global),
+	.category = "^global$",
+	.category_match = ACO_WHITELIST,
+};
+
+/*! \brief Type list handed to aco_option_register() for every option. */
+static struct aco_type *global_options[] = ACO_TYPES(&global_option);
+
+/*!
+ * \brief Destructor for the global settings object.
+ *
+ * Drops the AMQP connection reference, then frees the string fields.
+ *
+ * \param obj The \ref cel_amqp_global_conf being destroyed.
+ */
+static void conf_global_dtor(void *obj)
+{
+	struct cel_amqp_global_conf *settings = obj;
+
+	ao2_cleanup(settings->amqp);
+	ast_string_field_free_memory(settings);
+}
+
+/*!
+ * \brief Allocate the global settings object and apply option defaults.
+ *
+ * \return A new \ref cel_amqp_global_conf holding one reference.
+ * \return \c NULL if allocation or string field setup fails.
+ */
+static struct cel_amqp_global_conf *conf_global_create(void)
+{
+	struct cel_amqp_global_conf *settings;
+
+	settings = ao2_alloc(sizeof(*settings), conf_global_dtor);
+	if (!settings) {
+		return NULL;
+	}
+
+	if (ast_string_field_init(settings, 64) != 0) {
+		/* Give back the only reference; the destructor tidies up. */
+		ao2_cleanup(settings);
+		return NULL;
+	}
+
+	aco_set_defaults(&global_option, "global", settings);
+
+	return settings;
+}
+
+/*!
+ * \brief The conf file that's processed for the module.
+ *
+ * Ties CONF_FILENAME to the single [global] type defined above.
+ */
+static struct aco_file conf_file = {
+	/*! The config file name. */
+	.filename = CONF_FILENAME,
+	/*! The mapping object types to be processed. */
+	.types = ACO_TYPES(&global_option),
+};
+
+/*!
+ * \brief Destructor for the configuration snapshot.
+ *
+ * \param obj The \ref cel_amqp_conf being destroyed.
+ */
+static void conf_dtor(void *obj)
+{
+	struct cel_amqp_conf *snapshot = obj;
+
+	/* Release the reference held on the global settings. */
+	ao2_cleanup(snapshot->global);
+}
+
+/*!
+ * \brief Allocate a new configuration snapshot.
+ *
+ * \return A new \ref cel_amqp_conf with its global settings created.
+ * \return \c NULL on allocation failure.
+ */
+static void *conf_alloc(void)
+{
+	struct cel_amqp_conf *snapshot;
+
+	snapshot = ao2_alloc_options(sizeof(*snapshot), conf_dtor,
+		AO2_ALLOC_OPT_LOCK_NOLOCK);
+	if (!snapshot) {
+		return NULL;
+	}
+
+	snapshot->global = conf_global_create();
+	if (!snapshot->global) {
+		/* Drop the only reference to the half-built snapshot. */
+		ao2_cleanup(snapshot);
+		return NULL;
+	}
+
+	return snapshot;
+}
+
+/*!
+ * \brief Config framework descriptor for this module.
+ *
+ * Binds the \c confs global holder, the conf_alloc() snapshot allocator and
+ * the list of config files together under the name \c cfg_info.
+ */
+CONFIG_INFO_STANDARD(cfg_info, confs, conf_alloc,
+		     .files = ACO_FILES(&conf_file));
+
+/*!
+ * \brief CEL handler for AMQP.
+ *
+ * Converts the event into a JSON object and publishes it as a persistent
+ * \c application/json message, using the connection, exchange and queue
+ * taken from the current configuration.
+ *
+ * \param event CEL event.
+ */
+static void amqp_cel_log(struct ast_event *event)
+{
+	RAII_VAR(struct cel_amqp_conf *, conf, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+	RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
+	RAII_VAR(char *, str, NULL, ast_json_free);
+	const char *name;
+	int res;
+	struct ast_cel_event_record record = {
+		.version = AST_CEL_EVENT_RECORD_VERSION,
+	};
+	amqp_basic_properties_t props = {
+		._flags = AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_CONTENT_TYPE_FLAG,
+		.delivery_mode = 2, /* persistent delivery mode */
+		.content_type = amqp_cstring_bytes("application/json")
+	};
+
+	conf = ao2_global_obj_ref(confs);
+	if (!conf || !conf->global) {
+		ast_log(LOG_ERROR, "Error obtaining config from cel_amqp.conf\n");
+		return;
+	}
+
+	if (!conf->global->amqp) {
+		ast_log(LOG_ERROR, "No AMQP connection; discarding CEL\n");
+		return;
+	}
+
+	/* Extract the data from the CEL */
+	if (ast_cel_fill_record(event, &record) != 0) {
+		return;
+	}
+
+	/* Handle user defined events */
+	name = record.event_name;
+	if (record.event_type == AST_CEL_USER_DEFINED) {
+		name = record.user_defined_name;
+	}
+
+	/* Handle the optional extra field, although re-parsing JSON
+	 * makes me sad :-( */
+	if (strlen(record.extra) == 0) {
+		extra = ast_json_null();
+	} else {
+		extra = ast_json_load_string(record.extra, NULL);
+		if (!extra) {
+			/* Not valid JSON; fall back to sending it as a plain string. */
+			ast_log(LOG_ERROR, "Error parsing extra field\n");
+			extra = ast_json_string_create(record.extra);
+		}
+	}
+
+	json = ast_json_pack("{"
+		/* event_name, account_code */
+		"s: s, s: s,"
+		/* num, name, ani, rdnis, dnid */
+		"s: { s: s, s: s, s: s, s: s, s: s },"
+		/* extension, context, channel, application */
+		"s: s, s: s, s: s, s: s, "
+		/* app_data, event_time, amaflags, unique_id */
+		"s: s, s: o, s: s, s: s, "
+		/* linked_id, user_field, peer, peer_account */
+		"s: s, s: s, s: s, s: s, "
+		/* extra */
+		"s: o"
+		"}",
+		"event_name", name,
+		"account_code", record.account_code,
+
+		"caller_id",
+		"num", record.caller_id_num,
+		"name", record.caller_id_name,
+		"ani", record.caller_id_ani,
+		"rdnis", record.caller_id_rdnis,
+		"dnid", record.caller_id_dnid,
+
+		"extension", record.extension,
+		"context", record.context,
+		"channel", record.channel_name,
+		"application", record.application_name,
+
+		"app_data", record.application_data,
+		"event_time", ast_json_timeval(record.event_time, NULL),
+		"amaflags", ast_channel_amaflags2string(record.amaflag),
+		"unique_id", record.unique_id,
+
+		"linked_id", record.linked_id,
+		"user_field", record.user_field,
+		"peer", record.peer,
+		"peer_account", record.peer_account,
+		/* "o" takes over a reference, and the RAII cleanup on extra drops
+		 * one as well, so hand the pack call a reference of its own. */
+		"extra", ast_json_ref(extra));
+	if (!json) {
+		return;
+	}
+
+	/* Dump the JSON to a string for publication */
+	str = ast_json_dump_string(json);
+	if (!str) {
+		return;
+	}
+
+	res = ast_amqp_basic_publish(conf->global->amqp,
+		amqp_cstring_bytes(conf->global->exchange),
+		amqp_cstring_bytes(conf->global->queue),
+		0, /* mandatory; don't return unsendable messages */
+		0, /* immediate; allow messages to be queued */
+		&props,
+		amqp_cstring_bytes(str));
+
+	if (res != 0) {
+		ast_log(LOG_ERROR, "Error publishing CEL to AMQP\n");
+	}
+}
+
+/*!
+ * \brief Process cel_amqp.conf and (re)acquire the AMQP connection.
+ *
+ * \param reload Non-zero when called for a module reload.
+ * \return 0 on success.
+ * \return -1 if the config could not be processed or the configured
+ *         connection could not be obtained.
+ */
+static int load_config(int reload)
+{
+	RAII_VAR(struct cel_amqp_conf *, conf, NULL, ao2_cleanup);
+
+	switch (aco_process_config(&cfg_info, reload)) {
+	case ACO_PROCESS_ERROR:
+		return -1;
+	case ACO_PROCESS_OK:
+	case ACO_PROCESS_UNCHANGED:
+		break;
+	}
+
+	conf = ao2_global_obj_ref(confs);
+	if (!conf || !conf->global) {
+		ast_log(LOG_ERROR, "Error obtaining config from cel_amqp.conf\n");
+		return -1;
+	}
+
+	/* Refresh the AMQP connection.
+	 * NOTE(review): this updates the live config object in place, so a
+	 * handler running at the same time may see the pointer mid-swap;
+	 * confirm whether additional locking is needed. */
+	ao2_cleanup(conf->global->amqp);
+	conf->global->amqp = ast_amqp_get_connection(conf->global->connection);
+
+	if (!conf->global->amqp) {
+		ast_log(LOG_ERROR, "Could not get AMQP connection %s\n",
+			conf->global->connection);
+		return -1;
+	}
+
+	return 0;
+}
+
+/*!
+ * \brief Module load hook.
+ *
+ * Registers the config options, loads cel_amqp.conf and registers the CEL
+ * backend. Config state is released again on every failure path.
+ *
+ * \return AST_MODULE_LOAD_SUCCESS when the backend is registered.
+ * \return AST_MODULE_LOAD_DECLINE when the configuration cannot be loaded.
+ * \return AST_MODULE_LOAD_FAILURE on config framework or registration errors.
+ */
+static int load_module(void)
+{
+	if (aco_info_init(&cfg_info) != 0) {
+		ast_log(LOG_ERROR, "Failed to initialize config\n");
+		aco_info_destroy(&cfg_info);
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
+	aco_option_register(&cfg_info, "connection", ACO_EXACT,
+		global_options, "", OPT_STRINGFIELD_T, 0,
+		STRFLDSET(struct cel_amqp_global_conf, connection));
+	aco_option_register(&cfg_info, "queue", ACO_EXACT,
+		global_options, "asterisk_cel", OPT_STRINGFIELD_T, 0,
+		STRFLDSET(struct cel_amqp_global_conf, queue));
+	aco_option_register(&cfg_info, "exchange", ACO_EXACT,
+		global_options, "", OPT_STRINGFIELD_T, 0,
+		STRFLDSET(struct cel_amqp_global_conf, exchange));
+
+	if (load_config(0) != 0) {
+		ast_log(LOG_WARNING, "Configuration failed to load\n");
+		/* Nothing else will release these once loading is declined. */
+		aco_info_destroy(&cfg_info);
+		ao2_global_obj_release(confs);
+		return AST_MODULE_LOAD_DECLINE;
+	}
+
+	if (ast_cel_backend_register(CEL_NAME, amqp_cel_log) != 0) {
+		ast_log(LOG_ERROR, "Could not register CEL backend\n");
+		aco_info_destroy(&cfg_info);
+		ao2_global_obj_release(confs);
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
+	ast_log(LOG_NOTICE, "CEL AMQP logging enabled\n");
+	return AST_MODULE_LOAD_SUCCESS;
+}
+
+/*!
+ * \brief Module unload hook.
+ *
+ * The backend is unregistered before the configuration is torn down, so
+ * the handler is not left registered without its config. If unregistering
+ * fails, the configuration is left intact.
+ *
+ * \return 0 on success.
+ * \return -1 if the CEL backend could not be unregistered.
+ */
+static int unload_module(void)
+{
+	if (ast_cel_backend_unregister(CEL_NAME) != 0) {
+		return -1;
+	}
+
+	aco_info_destroy(&cfg_info);
+	ao2_global_obj_release(confs);
+
+	return 0;
+}
+
+/*!
+ * \brief Module reload hook; re-runs load_config() in reload mode.
+ *
+ * \return Whatever load_config() returns.
+ */
+static int reload_module(void)
+{
+	const int is_reload = 1;
+
+	return load_config(is_reload);
+}
+
+/*!
+ * \brief Module descriptor: wires up the load, unload and reload hooks.
+ *
+ * NOTE(review): the load priority is AST_MODPRI_CDR_DRIVER even though this
+ * is a CEL backend; confirm that is intended.
+ */
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "AMQP CEL Backend",
+		.support_level = AST_MODULE_SUPPORT_CORE,
+		.load = load_module,
+		.unload = unload_module,
+		.reload = reload_module,
+		.load_pri = AST_MODPRI_CDR_DRIVER,
+	);

Propchange: team/dlee/amqp-cdr-cel/cel/cel_amqp.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dlee/amqp-cdr-cel/cel/cel_amqp.c
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dlee/amqp-cdr-cel/cel/cel_amqp.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample?view=auto&rev=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample (added)
+++ team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample Wed Jan 21 13:59:24 2015
@@ -1,0 +1,16 @@
+[general]
+;enabled = yes	; Set to no to disable
+
+;[bunny]
+;type = connection
+;url = amqp://localhost	; amqp://[$USERNAME@]$HOST[:$PORT][/$VHOST]
+      			;  username defaults to guest
+			;  port defaults to 5672
+			;  vhost defaults to /
+;password = 		; Password to use for login
+	  		;  defaults to guest
+;max_frame_bytes =	; Maximum frame size, in bytes; defaults to
+	   		; AMQP_DEFAULT_FRAME_SIZE (131072, or 128KB)
+;heartbeat_seconds = 	; number of seconds between heartbeat frames
+		   	; 0 disables heartbeats; defaults to
+			; AMQP_DEFAULT_HEARTBEAT (0)

Propchange: team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample?view=auto&rev=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample (added)
+++ team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample Wed Jan 21 13:59:24 2015
@@ -1,0 +1,10 @@
+;
+; cdr_amqp.conf
+;
+
+[global]
+;loguniqueid = no	; log uniqueid.  Default is "no"
+;loguserfield = no	; log user field.  Default is "no"
+;connection = bunny	; Connection name in amqp.conf
+;queue = asterisk_cdr	; Queue name to publish to; defaults to asterisk_cdr
+;exchange = 		; Exchange to publish to; defaults to empty string

Propchange: team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample?view=auto&rev=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample (added)
+++ team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample Wed Jan 21 13:59:24 2015
@@ -1,0 +1,8 @@
+;
+; cel_amqp.conf
+;
+
+[global]
+;connection = bunny	; Connection name in amqp.conf
+;queue = asterisk_cel	; Queue name to publish to; defaults to asterisk_cel
+;exchange = 		; Exchange to publish to; defaults to empty string

Propchange: team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: team/dlee/amqp-cdr-cel/configure.ac
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/configure.ac?view=diff&rev=430900&r1=430899&r2=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/configure.ac (original)
+++ team/dlee/amqp-cdr-cel/configure.ac Wed Jan 21 13:59:24 2015
@@ -485,6 +485,7 @@
 AST_EXT_LIB_SETUP_OPTIONAL([PRI_REVERSE_CHARGE], [ISDN reverse charge], [PRI], [pri])
 # ------------------------------------^
 AST_EXT_LIB_SETUP([PWLIB], [PWlib], [pwlib])
+AST_EXT_LIB_SETUP([RABBITMQ], [RabbitMQ client], [rabbitmq])
 AST_EXT_LIB_SETUP([RADIUS], [Radius Client], [radius])
 AST_EXT_LIB_SETUP([RESAMPLE], [LIBRESAMPLE], [resample])
 AST_EXT_LIB_SETUP([SDL], [Sdl], [sdl])
@@ -2195,6 +2196,8 @@
 # Some distributions (like SuSE) remove the 5.1 suffix.
 AST_EXT_LIB_CHECK([LUA], [lua], [luaL_openlib], [lua.h], [-lm])
 
+AST_EXT_LIB_CHECK([RABBITMQ], [rabbitmq], [amqp_socket_open], [amqp.h])
+
 # Accept either RADIUS client library, their APIs are fully compatible,
 # just different header filenames and different SONAMEs
 AST_EXT_LIB_CHECK([RADIUS], [freeradius-client], [rc_read_config], [freeradius-client.h])

Modified: team/dlee/amqp-cdr-cel/contrib/scripts/install_prereq
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/contrib/scripts/install_prereq?view=diff&rev=430900&r1=430899&r2=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/contrib/scripts/install_prereq (original)
+++ team/dlee/amqp-cdr-cel/contrib/scripts/install_prereq Wed Jan 21 13:59:24 2015
@@ -28,12 +28,12 @@
 PACKAGES_DEBIAN="$PACKAGES_DEBIAN libopenh323-dev libvpb-dev libgtk2.0-dev libmysqlclient-dev libbluetooth-dev libradiusclient-ng-dev freetds-dev"
 PACKAGES_DEBIAN="$PACKAGES_DEBIAN libsnmp-dev libiksemel-dev libcorosync-dev libnewt-dev libpopt-dev libical-dev libspandsp-dev libjack-dev"
 PACKAGES_DEBIAN="$PACKAGES_DEBIAN libresample-dev libc-client-dev binutils-dev libsrtp-dev libgsm1-dev libedit-dev doxygen libjansson-dev libldap-dev"
-PACKAGES_DEBIAN="$PACKAGES_DEBIAN subversion git libxslt1-dev"
+PACKAGES_DEBIAN="$PACKAGES_DEBIAN subversion git libxslt1-dev librabbitmq-dev"
 PACKAGES_RH="automake gcc gcc-c++ ncurses-devel openssl-devel libxml2-devel unixODBC-devel libcurl-devel libogg-devel libvorbis-devel speex-devel"
 PACKAGES_RH="$PACKAGES_RH spandsp-devel freetds-devel net-snmp-devel iksemel-devel corosynclib-devel newt-devel popt-devel libtool-ltdl-devel lua-devel"
 PACKAGES_RH="$PACKAGES_RH libsqlite3x-devel radiusclient-ng-devel portaudio-devel postgresql-devel libresample-devel neon-devel libical-devel"
 PACKAGES_RH="$PACKAGES_RH openldap-devel gmime22-devel sqlite2-devel mysql-devel bluez-libs-devel jack-audio-connection-kit-devel gsm-devel libedit-devel libuuid-devel"
-PACKAGES_RH="$PACKAGES_RH jansson-devel libsrtp-devel pjproject-devel subversion git libxslt-devel"
+PACKAGES_RH="$PACKAGES_RH jansson-devel libsrtp-devel pjproject-devel subversion git libxslt-devel librabbitmq-devel"
 
 PACKAGES_OBSD="popt gmake wget libxml libogg libvorbis curl iksemel spandsp speex iodbc freetds-0.63p1-msdblib mysql-client gmime sqlite sqlite3 jack libxslt"
 

Added: team/dlee/amqp-cdr-cel/include/asterisk/amqp.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/include/asterisk/amqp.h?view=auto&rev=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/include/asterisk/amqp.h (added)
+++ team/dlee/amqp-cdr-cel/include/asterisk/amqp.h Wed Jan 21 13:59:24 2015
@@ -1,0 +1,85 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2015, Digium, Inc.
+ *
+ * David M. Lee, II <dlee at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_AMQP_H
+#define _ASTERISK_AMQP_H
+
+/*! \file
+ * \brief AMQP client
+ *
+ * \author David M. Lee, II <dlee at digium.com>
+ * \since 13.x
+ *
+ * This file contains the Asterisk API for AMQP. Connections are configured
+ * in \c amqp.conf. You can get a connection by name, using \ref
+ * ast_amqp_get_connection().
+ *
+ * Only publish support is implemented, using \ref ast_amqp_basic_publish().
+ *
+ * Note that the AMQP protocol has a "channel" feature, which allows
+ * multiplexing multiple requests on a single TCP socket. Unfortunately, the
+ * underlying \c librabbitmq library is not thread safe, so we couldn't take
+ * advantage of this feature. Because of that, and the complications it adds
+ * to using the API, we've omitted that feature.
+ */
+
+#include <amqp.h>
+
+/*!
+ * Opaque handle for the AMQP connection.
+ */
+struct ast_amqp_connection;
+
+/*!
+ * \brief Gets the given AMQP connection.
+ *
+ * The returned connection is an AO2 managed object, which must be freed with
+ * \ref ao2_cleanup().
+ *
+ * \param name The name of the connection.
+ * \return The connection object.
+ * \return \c NULL if connection not found, or some other error.
+ */
+struct ast_amqp_connection *ast_amqp_get_connection(const char *name);
+
+/*!
+ * \brief Publishes a message to an AMQP connection.
+ *
+ * \param cxn The connection to publish to.
+ * \param exchange the exchange on the broker to publish to
+ * \param routing_key the routing key (queue) to use when publishing the message
+ * \param mandatory indicate to the broker that the message MUST be routed to a
+ *                  queue. If the broker cannot do this it should respond with
+ *                  a basic.return method
+ * \param immediate indicate to the broker that the message MUST be delivered
+ *                  to a consumer immediately. If the broker cannot do this it
+ *                  should respond with a basic.return method
+ * \param properties Properties of the message (content-type, delivery mode, etc.)
+ * \param body The text of the message to send.
+ * \return 0 on success.
+ * \return -1 on failure.
+ */
+int ast_amqp_basic_publish(struct ast_amqp_connection *cxn,
+	amqp_bytes_t exchange,
+	amqp_bytes_t routing_key,
+	amqp_boolean_t mandatory,
+	amqp_boolean_t immediate,
+	const amqp_basic_properties_t *properties,
+	amqp_bytes_t body);
+
+#endif /* _ASTERISK_AMQP_H */

Propchange: team/dlee/amqp-cdr-cel/include/asterisk/amqp.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dlee/amqp-cdr-cel/include/asterisk/amqp.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dlee/amqp-cdr-cel/include/asterisk/amqp.h
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: team/dlee/amqp-cdr-cel/include/asterisk/autoconfig.h.in
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/include/asterisk/autoconfig.h.in?view=diff&rev=430900&r1=430899&r2=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/include/asterisk/autoconfig.h.in (original)
+++ team/dlee/amqp-cdr-cel/include/asterisk/autoconfig.h.in Wed Jan 21 13:59:24 2015
@@ -717,6 +717,9 @@
 
 /* Define if your system has the PWLib libraries. */
 #undef HAVE_PWLIB
+
+/* Define to 1 if you have the RabbitMQ client library. */
+#undef HAVE_RABBITMQ
 
 /* Define to 1 if you have the Radius Client library. */
 #undef HAVE_RADIUS
@@ -1299,6 +1302,11 @@
 /* Define to 1 if running on Darwin. */
 #undef _DARWIN_UNLIMITED_SELECT
 
+/* Enable large inode numbers on Mac OS X 10.5.  */
+#ifndef _DARWIN_USE_64_BIT_INODE
+# define _DARWIN_USE_64_BIT_INODE 1
+#endif
+
 /* Number of bits in a file offset, on hosts where this is settable. */
 #undef _FILE_OFFSET_BITS
 

Modified: team/dlee/amqp-cdr-cel/include/asterisk/cel.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/include/asterisk/cel.h?view=diff&rev=430900&r1=430899&r2=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/include/asterisk/cel.h (original)
+++ team/dlee/amqp-cdr-cel/include/asterisk/cel.h Wed Jan 21 13:59:24 2015
@@ -34,6 +34,8 @@
 #endif
 
 #include "asterisk/event.h"
+#include "asterisk/json.h"
+#include "asterisk/stringfields.h"
 
 /*!
  * \brief CEL event types

Modified: team/dlee/amqp-cdr-cel/makeopts.in
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/makeopts.in?view=diff&rev=430900&r1=430899&r2=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/makeopts.in (original)
+++ team/dlee/amqp-cdr-cel/makeopts.in Wed Jan 21 13:59:24 2015
@@ -237,6 +237,9 @@
 PRI_INCLUDE=@PRI_INCLUDE@
 PRI_LIB=@PRI_LIB@
 
+RABBITMQ_INCLUDE=@RABBITMQ_INCLUDE@
+RABBITMQ_LIB=@RABBITMQ_LIB@
+
 RESAMPLE_INCLUDE=@RESAMPLE_INCLUDE@
 RESAMPLE_LIB=@RESAMPLE_LIB@
 

Modified: team/dlee/amqp-cdr-cel/res/Makefile
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/res/Makefile?view=diff&rev=430900&r1=430899&r2=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/res/Makefile (original)
+++ team/dlee/amqp-cdr-cel/res/Makefile Wed Jan 21 13:59:24 2015
@@ -76,6 +76,7 @@
 	rm -f snmp/*.[oi] ael/*.[oi] ais/*.[oi] ari/*.[oi]
 	rm -f res_pjsip/*.[oi] stasis/*.[oi]
 	rm -f parking/*.o parking/*.i stasis_recording/*.[oi]
+	rm -f amqp/*.[oi]
 
 $(if $(filter res_parking,$(EMBEDDED_MODS)),modules.link,res_parking.so): $(subst .c,.o,$(wildcard parking/*.c))
 $(subst .c,.o,$(wildcard parking/*.c)): _ASTCFLAGS+=$(call MOD_ASTCFLAGS,res_parking)
@@ -89,5 +90,8 @@
 res_stasis_recording.so: stasis_recording/stored.o
 stasis_recording/stored.o:  _ASTCFLAGS+=$(call MOD_ASTCFLAGS,res_stasis_recording)
 
+res_amqp.so: amqp/cli.o amqp/config.o
+amqp/cli.o amqp/config.o: _ASTCFLAGS+=$(call MOD_ASTCFLAGS,res_amqp)
+
 # Dependencies for res_ari_*.so are generated, so they're in this file
 include ari.make

Added: team/dlee/amqp-cdr-cel/res/amqp/cli.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/res/amqp/cli.c?view=auto&rev=430900
==============================================================================
--- team/dlee/amqp-cdr-cel/res/amqp/cli.c (added)
+++ team/dlee/amqp-cdr-cel/res/amqp/cli.c Wed Jan 21 13:59:24 2015
@@ -1,0 +1,243 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2015, Digium, Inc.
+ *
+ * David M. Lee, II <dlee at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Command line for AMQP.
+ * \author David M. Lee, II <dlee at digium.com>
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/cli.h"
+#include "asterisk/amqp.h"
+#include "internal.h"
+
+#define CLI_NAME_WIDTH 15
+#define CLI_URL_WIDTH 25
+

[... 1149 lines stripped ...]



More information about the svn-commits mailing list