[svn-commits] mjordan: trunk r405314 - in /trunk: ./ apps/ funcs/ include/asterisk/ main/

SVN commits to the Digium repositories svn-commits at lists.digium.com
Sun Jan 12 16:13:14 CST 2014


Author: mjordan
Date: Sun Jan 12 16:13:12 2014
New Revision: 405314

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=405314
Log:
CDRs: Synchronize dialplan applications that manipulate CDRs with the engine

In https://reviewboard.asterisk.org/r/3057/, applications and functions that
manipulate CDRs were made to interact over Stasis. This was done to
synchronize manipulations of CDRs from the dialplan with the updates the
engine itself receives over the message bus.

This change rested on a faulty premise: that messages published to the CDR
topic or to a topic that forwards to the CDR topic are synchronized with the
messages handled by the CDR topic subscription in the CDR engine. This is not
the case. There is no ordering guarantee for two messages published to the
same topic; ordering is only guaranteed for messages published to the same
subscriber.

Stasis was modified in r405311 to allow a publisher to synchronize on the
subscriber. This patch uses that API to synchronize the CDR publishers with
the CDR engine message router, which maintains the overall topic subscription.

(closes issue ASTERISK-22884)
Reported by: Matt Jordan

Review: https://reviewboard.asterisk.org/r/3099/
........

Merged revisions 405312 from http://svn.asterisk.org/svn/asterisk/branches/12

Modified:
    trunk/   (props changed)
    trunk/apps/app_cdr.c
    trunk/apps/app_forkcdr.c
    trunk/funcs/func_cdr.c
    trunk/include/asterisk/cdr.h
    trunk/main/cdr.c

Propchange: trunk/
------------------------------------------------------------------------------
Binary property 'branch-12-merged' - no diff available.

Modified: trunk/apps/app_cdr.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_cdr.c?view=diff&rev=405314&r1=405313&r2=405314
==============================================================================
--- trunk/apps/app_cdr.c (original)
+++ trunk/apps/app_cdr.c Sun Jan 12 16:13:12 2014
@@ -37,6 +37,7 @@
 #include "asterisk/module.h"
 #include "asterisk/app.h"
 #include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
 
 /*** DOCUMENTATION
 	<application name="NoCDR" language="en_US">
@@ -167,7 +168,13 @@
 static int publish_app_cdr_message(struct ast_channel *chan, struct app_cdr_message_payload *payload)
 {
 	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+	if (!router) {
+		ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+			ast_channel_name(chan));
+		return -1;
+	}
 
 	message = stasis_message_create(appcdr_message_type(), payload);
 	if (!message) {
@@ -175,17 +182,8 @@
 			payload->channel_name);
 		return -1;
 	}
-
-	subscription = stasis_subscribe(ast_channel_topic(chan), appcdr_callback, NULL);
-	if (!subscription) {
-		ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
-			payload->channel_name);
-		return -1;
-	}
-
-	stasis_publish(ast_channel_topic(chan), message);
-
-	subscription = stasis_unsubscribe_and_join(subscription);
+	stasis_message_router_publish_sync(router, message);
+
 	return 0;
 }
 
@@ -236,6 +234,11 @@
 
 static int unload_module(void)
 {
+	RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+	if (router) {
+		stasis_message_router_remove(router, appcdr_message_type());
+	}
 	STASIS_MESSAGE_TYPE_CLEANUP(appcdr_message_type);
 	ast_unregister_application(nocdr_app);
 	ast_unregister_application(resetcdr_app);
@@ -244,11 +247,18 @@
 
 static int load_module(void)
 {
+	RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
 	int res = 0;
+
+	if (!router) {
+		return AST_MODULE_LOAD_FAILURE;
+	}
 
 	res |= STASIS_MESSAGE_TYPE_INIT(appcdr_message_type);
 	res |= ast_register_application_xml(nocdr_app, nocdr_exec);
 	res |= ast_register_application_xml(resetcdr_app, resetcdr_exec);
+	res |= stasis_message_router_add(router, appcdr_message_type(),
+	                                 appcdr_callback, NULL);
 
 	if (res) {
 		return AST_MODULE_LOAD_FAILURE;

Modified: trunk/apps/app_forkcdr.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_forkcdr.c?view=diff&rev=405314&r1=405313&r2=405314
==============================================================================
--- trunk/apps/app_forkcdr.c (original)
+++ trunk/apps/app_forkcdr.c Sun Jan 12 16:13:12 2014
@@ -41,6 +41,7 @@
 #include "asterisk/app.h"
 #include "asterisk/module.h"
 #include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
 
 /*** DOCUMENTATION
 	<application name="ForkCDR" language="en_US">
@@ -136,7 +137,7 @@
 {
 	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
 	RAII_VAR(struct fork_cdr_message_payload *, payload, ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
 
 	char *parse;
 	struct ast_flags flags = { 0, };
@@ -153,6 +154,12 @@
 	}
 
 	if (!payload) {
+		return -1;
+	}
+
+	if (!router) {
+		ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+			ast_channel_name(chan));
 		return -1;
 	}
 
@@ -164,36 +171,41 @@
 			ast_channel_name(chan));
 		return -1;
 	}
-
-	subscription = stasis_subscribe(ast_channel_topic(chan), forkcdr_callback, NULL);
-	if (!subscription) {
-		ast_log(AST_LOG_WARNING, "Failed to fork CDR for channel %s: unable to create subscription\n",
-			payload->channel_name);
-		return -1;
-	}
-
-	stasis_publish(ast_channel_topic(chan), message);
-
-	subscription = stasis_unsubscribe_and_join(subscription);
+	stasis_message_router_publish_sync(router, message);
 
 	return 0;
 }
 
 static int unload_module(void)
 {
+	RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+	if (router) {
+		stasis_message_router_remove(router, forkcdr_message_type());
+	}
 	STASIS_MESSAGE_TYPE_CLEANUP(forkcdr_message_type);
-
-	return ast_unregister_application(app);
+	ast_unregister_application(app);
+	return 0;
 }
 
 static int load_module(void)
 {
+	RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
 	int res = 0;
+
+	if (!router) {
+		return AST_MODULE_LOAD_FAILURE;
+	}
 
 	res |= STASIS_MESSAGE_TYPE_INIT(forkcdr_message_type);
 	res |= ast_register_application_xml(app, forkcdr_exec);
-
-	return res;
+	res |= stasis_message_router_add(router, forkcdr_message_type(),
+	                                 forkcdr_callback, NULL);
+
+	if (res) {
+		return AST_MODULE_LOAD_FAILURE;
+	}
+	return AST_MODULE_LOAD_SUCCESS;
 }
 
 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Fork The CDR into 2 separate entities");

Modified: trunk/funcs/func_cdr.c
URL: http://svnview.digium.com/svn/asterisk/trunk/funcs/func_cdr.c?view=diff&rev=405314&r1=405313&r2=405314
==============================================================================
--- trunk/funcs/func_cdr.c (original)
+++ trunk/funcs/func_cdr.c Sun Jan 12 16:13:12 2014
@@ -40,6 +40,7 @@
 #include "asterisk/app.h"
 #include "asterisk/cdr.h"
 #include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
 
 /*** DOCUMENTATION
 	<function name="CDR" language="en_US">
@@ -207,6 +208,7 @@
 	const char *cmd;
 	const char *arguments;
 	const char *value;
+	void *data;
 };
 
 struct cdr_func_data {
@@ -220,8 +222,8 @@
 
 static void cdr_read_callback(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
-	struct cdr_func_data *output = data;
 	struct cdr_func_payload *payload = stasis_message_data(message);
+	struct cdr_func_data *output;
 	char *info;
 	char *value = NULL;
 	struct ast_flags flags = { 0 };
@@ -235,9 +237,9 @@
 		return;
 	}
 
-	if (!payload || !output) {
-		return;
-	}
+	ast_assert(payload != NULL);
+	output = payload->data;
+	ast_assert(output != NULL);
 
 	if (ast_strlen_zero(payload->arguments)) {
 		ast_log(AST_LOG_WARNING, "%s requires a variable (%s(variable[,option]))\n)",
@@ -441,6 +443,7 @@
 	payload->chan = chan;
 	payload->cmd = cmd;
 	payload->arguments = parse;
+	payload->data = &output;
 
 	buf[0] = '\0';/* Ensure the buffer is initialized. */
 	output.buf = buf;
@@ -460,18 +463,14 @@
 	if (ast_strlen_zero(ast_channel_name(chan))) {
 		cdr_read_callback(NULL, NULL, message);
 	} else {
-		RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
-
-		subscription = stasis_subscribe(ast_channel_topic(chan), cdr_read_callback, &output);
-		if (!subscription) {
-			ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
+		RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+		if (!router) {
+			ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
 				ast_channel_name(chan));
 			return -1;
 		}
-
-		stasis_publish(ast_channel_topic(chan), message);
-
-		subscription = stasis_unsubscribe_and_join(subscription);
+		stasis_message_router_publish_sync(router, message);
 	}
 
 	return 0;
@@ -482,8 +481,15 @@
 {
 	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
 	RAII_VAR(struct cdr_func_payload *, payload,
-		ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+		     ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
+	RAII_VAR(struct stasis_message_router *, router,
+		     ast_cdr_message_router(), ao2_cleanup);
+
+	if (!router) {
+		ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+			ast_channel_name(chan));
+		return -1;
+	}
 
 	if (!payload) {
 		return -1;
@@ -499,17 +505,7 @@
 			ast_channel_name(chan));
 		return -1;
 	}
-
-	subscription = stasis_subscribe(ast_channel_topic(chan), cdr_write_callback, NULL);
-	if (!subscription) {
-		ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
-			ast_channel_name(chan));
-		return -1;
-	}
-
-	stasis_publish(ast_channel_topic(chan), message);
-
-	subscription = stasis_unsubscribe_and_join(subscription);
+	stasis_message_router_publish_sync(router, message);
 
 	return 0;
 }
@@ -520,7 +516,13 @@
 	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
 	RAII_VAR(struct cdr_func_payload *, payload,
 		ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+	if (!router) {
+		ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+			ast_channel_name(chan));
+		return -1;
+	}
 
 	if (!payload) {
 		return -1;
@@ -536,17 +538,7 @@
 			ast_channel_name(chan));
 		return -1;
 	}
-
-	subscription = stasis_subscribe(ast_channel_topic(chan), cdr_prop_write_callback, NULL);
-	if (!subscription) {
-		ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
-			ast_channel_name(chan));
-		return -1;
-	}
-
-	stasis_publish(ast_channel_topic(chan), message);
-
-	subscription = stasis_unsubscribe_and_join(subscription);
+	stasis_message_router_publish_sync(router, message);
 
 	return 0;
 }
@@ -565,8 +557,14 @@
 
 static int unload_module(void)
 {
+	RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
 	int res = 0;
 
+	if (router) {
+		stasis_message_router_remove(router, cdr_prop_write_message_type());
+		stasis_message_router_remove(router, cdr_write_message_type());
+		stasis_message_router_remove(router, cdr_read_message_type());
+	}
 	STASIS_MESSAGE_TYPE_CLEANUP(cdr_read_message_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(cdr_write_message_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(cdr_prop_write_message_type);
@@ -578,15 +576,29 @@
 
 static int load_module(void)
 {
+	RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
 	int res = 0;
+
+	if (!router) {
+		return AST_MODULE_LOAD_FAILURE;
+	}
 
 	res |= STASIS_MESSAGE_TYPE_INIT(cdr_read_message_type);
 	res |= STASIS_MESSAGE_TYPE_INIT(cdr_write_message_type);
 	res |= STASIS_MESSAGE_TYPE_INIT(cdr_prop_write_message_type);
 	res |= ast_custom_function_register(&cdr_function);
 	res |= ast_custom_function_register(&cdr_prop_function);
-
-	return res;
+	res |= stasis_message_router_add(router, cdr_prop_write_message_type(),
+	                                 cdr_prop_write_callback, NULL);
+	res |= stasis_message_router_add(router, cdr_write_message_type(),
+	                                 cdr_write_callback, NULL);
+	res |= stasis_message_router_add(router, cdr_read_message_type(),
+	                                 cdr_read_callback, NULL);
+
+	if (res) {
+		return AST_MODULE_LOAD_FAILURE;
+	}
+	return AST_MODULE_LOAD_SUCCESS;
 }
 
 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Call Detail Record (CDR) dialplan functions");

Modified: trunk/include/asterisk/cdr.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/cdr.h?view=diff&rev=405314&r1=405313&r2=405314
==============================================================================
--- trunk/include/asterisk/cdr.h (original)
+++ trunk/include/asterisk/cdr.h Sun Jan 12 16:13:12 2014
@@ -467,6 +467,20 @@
  */
 struct ast_cdr *ast_cdr_alloc(void);
 
+struct stasis_message_router;
+
+/*!
+ * \brief Return the message router for the CDR engine
+ *
+ * This returns the \ref stasis_message_router that the CDR engine
+ * uses for dispatching \ref stasis messages. The reference on the
+ * message router is bumped and must be released by the caller of
+ * this function.
+ *
+ * \retval NULL if the CDR engine is disabled or unavailable
+ * \retval the \ref stasis_message_router otherwise
+ */
+struct stasis_message_router *ast_cdr_message_router(void);
 
 /*!
  * \brief Duplicate a public CDR

Modified: trunk/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/cdr.c?view=diff&rev=405314&r1=405313&r2=405314
==============================================================================
--- trunk/main/cdr.c (original)
+++ trunk/main/cdr.c Sun Jan 12 16:13:12 2014
@@ -3915,17 +3915,21 @@
 	ast_cdr_engine_term();
 }
 
-/*!
- * \brief Destroy the active Stasis subscriptions/router/topics
+struct stasis_message_router *ast_cdr_message_router(void)
+{
+	if (!stasis_router) {
+		return NULL;
+	}
+
+	ao2_bump(stasis_router);
+	return stasis_router;
+}
+
+/*!
+ * \brief Destroy the active Stasis subscriptions
  */
 static void destroy_subscriptions(void)
 {
-	stasis_message_router_unsubscribe_and_join(stasis_router);
-	stasis_router = NULL;
-
-	ao2_cleanup(cdr_topic);
-	cdr_topic = NULL;
-
 	channel_subscription = stasis_forward_cancel(channel_subscription);
 	bridge_subscription = stasis_forward_cancel(bridge_subscription);
 	parking_subscription = stasis_forward_cancel(parking_subscription);
@@ -3936,14 +3940,12 @@
  */
 static int create_subscriptions(void)
 {
-	/* Use the CDR topic to determine if we've already created this */
-	if (cdr_topic) {
-		return 0;
-	}
-
-	cdr_topic = stasis_topic_create("cdr_engine");
 	if (!cdr_topic) {
 		return -1;
+	}
+
+	if (channel_subscription || bridge_subscription || parking_subscription) {
+		return 0;
 	}
 
 	channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
@@ -3958,16 +3960,6 @@
 	if (!parking_subscription) {
 		return -1;
 	}
-
-	stasis_router = stasis_message_router_create(cdr_topic);
-	if (!stasis_router) {
-		return -1;
-	}
-	stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
-	stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
-	stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
-	stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
-	stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
 
 	return 0;
 }
@@ -4019,6 +4011,12 @@
 
 static void cdr_engine_shutdown(void)
 {
+	stasis_message_router_unsubscribe_and_join(stasis_router);
+	stasis_router = NULL;
+
+	ao2_cleanup(cdr_topic);
+	cdr_topic = NULL;
+
 	ao2_callback(active_cdrs_by_channel, OBJ_NODATA, cdr_object_dispatch_all_cb,
 		NULL);
 	finalize_batch_mode();
@@ -4113,6 +4111,21 @@
 		return -1;
 	}
 
+	cdr_topic = stasis_topic_create("cdr_engine");
+	if (!cdr_topic) {
+		return -1;
+	}
+
+	stasis_router = stasis_message_router_create(cdr_topic);
+	if (!stasis_router) {
+		return -1;
+	}
+	stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
+	stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
+	stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
+	stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
+	stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
+
 	active_cdrs_by_channel = ao2_container_alloc(NUM_CDR_BUCKETS,
 		cdr_object_channel_hash_fn, cdr_object_channel_cmp_fn);
 	if (!active_cdrs_by_channel) {




More information about the svn-commits mailing list