[asterisk-commits] mmichelson: trunk r397451 - in /trunk: ./ apps/ include/asterisk/ main/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Aug 22 13:53:01 CDT 2013


Author: mmichelson
Date: Thu Aug 22 13:52:41 2013
New Revision: 397451

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=397451
Log:
Massively clean up app_queue.

This essentially makes app_queue usable again. From reviewboard:

* Reporting of transfers and call completion is done by creating stasis 
  subscriptions and listening for specific events in order to determine
  when the call is finished (either via a transfer or hangup).
* Dial end messages have been added where they were previously missing.
* Queue stats are properly being updated again once calls have finished.
* AgentComplete stasis messages and AMI events are now occurring again.
* Mixmonitor starting has been factored into its own function and uses the
  Mixmonitor API now instead of using ast_pbx_run()

In addition to the changes in app_queue, there are several supplementary changes as well:

* Queue logging now differentiates between attended and blind transfers. A
  note about this is in the CHANGES file.
* Local channel optimization events now report more information. This
  includes which of the two local channels involved is the destination of
  the optimization, the channel that is replacing the destination local channel,
  and an identifier so that begin and end events can be matched to each other.
  The end events are now sent whether the optimization was successful or not and
  include an indicator of whether the optimization was successful.
* Changes were made to features and bridge_basic so that additional flags may
  be set on a bridge. This is necessary because the queue requires that its
  bridge only allows move-swap local channel optimizations into the bridge.

(closes issue ASTERISK-21517)
Reported by Matt Jordan

(closes issue ASTERISK-21943)
Reported by Matt Jordan

Review: https://reviewboard.asterisk.org/r/2694


Modified:
    trunk/CHANGES
    trunk/apps/app_queue.c
    trunk/include/asterisk/app.h
    trunk/include/asterisk/bridge_basic.h
    trunk/include/asterisk/core_unreal.h
    trunk/include/asterisk/features.h
    trunk/main/app.c
    trunk/main/bridge.c
    trunk/main/bridge_basic.c
    trunk/main/core_local.c
    trunk/main/features.c

Modified: trunk/CHANGES
URL: http://svnview.digium.com/svn/asterisk/trunk/CHANGES?view=diff&rev=397451&r1=397450&r2=397451
==============================================================================
--- trunk/CHANGES (original)
+++ trunk/CHANGES Thu Aug 22 13:52:41 2013
@@ -97,6 +97,14 @@
    removed.  As a result, the AMI events QueueMemberStatus, AgentCalled,
    AgentConnect, AgentComplete, AgentDump, and AgentRingNoAnswer will always be
    sent.  The "Variable" fields will also no longer exist on the Agent* events.
+
+ * The queue log now differentiates between blind and attended transfers. A
+   blind transfer will result in a BLINDTRANSFER message with the destination
+   context and extension. An attended transfer will result in an
+   ATTENDEDTRANSFER message. This message will indicate the method by which
+   the attended transfer was completed: "BRIDGE" for a bridge merge, "APP"
+   for running an application on a bridge or channel, or "LINK" for linking
+   two bridges together with local channels.
 
  * Queues now support a hint for member paused state. The hint uses the form
    'Queue:{queue_name}_pause_{member_name}', where {queue_name} and {member_name}

Modified: trunk/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_queue.c?view=diff&rev=397451&r1=397450&r2=397451
==============================================================================
--- trunk/apps/app_queue.c (original)
+++ trunk/apps/app_queue.c Thu Aug 22 13:52:41 2013
@@ -108,6 +108,11 @@
 #include "asterisk/stasis_channels.h"
 #include "asterisk/stasis_message_router.h"
 #include "asterisk/bridge_after.h"
+#include "asterisk/stasis_bridges.h"
+#include "asterisk/core_local.h"
+#include "asterisk/mixmonitor.h"
+#include "asterisk/core_unreal.h"
+#include "asterisk/bridge_basic.h"
 
 /* Define, to debug reference counts on queues, without debugging reference counts on queue members */
 /* #define REF_DEBUG_ONLY_QUEUES */
@@ -1584,10 +1589,6 @@
 static struct member *interface_exists(struct call_queue *q, const char *interface);
 static int set_member_paused(const char *queuename, const char *interface, const char *reason, int paused);
 
-#if 0	// BUGBUG
-static void queue_transfer_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan);
-#endif	// BUGBUG
-
 static struct member *find_member_by_queuename_and_interface(const char *queuename, const char *interface);
 /*! \brief sets the QUEUESTATUS channel variable */
 static void set_queue_result(struct ast_channel *chan, enum queue_result res)
@@ -1734,7 +1735,9 @@
 
 static inline struct call_queue *_queue_unref(struct call_queue *q, const char *tag, const char *file, int line, const char *filename)
 {
-	__ao2_ref_debug(q, -1, tag, file, line, filename);
+	if (q) {
+		__ao2_ref_debug(q, -1, tag, file, line, filename);
+	}
 	return NULL;
 }
 
@@ -1753,7 +1756,9 @@
 
 static inline struct call_queue *queue_unref(struct call_queue *q)
 {
-	ao2_ref(q, -1);
+	if (q) {
+		ao2_ref(q, -1);
+	}
 	return NULL;
 }
 #endif
@@ -1906,25 +1911,19 @@
 		"%s", ast_str_buffer(event_string));
 }
 
-static void queue_publish_multi_channel_blob(struct ast_channel *caller, struct ast_channel *agent, struct stasis_message_type *type, struct ast_json *blob)
+static void queue_publish_multi_channel_snapshot_blob(struct stasis_topic *topic,
+		struct ast_channel_snapshot *caller_snapshot,
+		struct ast_channel_snapshot *agent_snapshot,
+		struct stasis_message_type *type, struct ast_json *blob)
 {
 	RAII_VAR(struct ast_multi_channel_blob *, payload, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-	struct ast_channel_snapshot *caller_snapshot;
-	struct ast_channel_snapshot *agent_snapshot;
 
 	payload = ast_multi_channel_blob_create(blob);
 	if (!payload) {
 		return;
 	}
 
-	caller_snapshot = ast_channel_snapshot_create(caller);
-	agent_snapshot = ast_channel_snapshot_create(agent);
-
-	if (!caller_snapshot || !agent_snapshot) {
-		return;
-	}
-
 	ast_multi_channel_blob_add_channel(payload, "caller", caller_snapshot);
 	ast_multi_channel_blob_add_channel(payload, "agent", agent_snapshot);
 
@@ -1933,7 +1932,24 @@
 		return;
 	}
 
-	stasis_publish(ast_channel_topic(caller), msg);
+	stasis_publish(topic, msg);
+}
+
+static void queue_publish_multi_channel_blob(struct ast_channel *caller, struct ast_channel *agent,
+		struct stasis_message_type *type, struct ast_json *blob)
+{
+	RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_channel_snapshot *, agent_snapshot, NULL, ao2_cleanup);
+
+	caller_snapshot = ast_channel_snapshot_create(caller);
+	agent_snapshot = ast_channel_snapshot_create(agent);
+
+	if (!caller_snapshot || !agent_snapshot) {
+		return;
+	}
+
+	queue_publish_multi_channel_snapshot_blob(ast_channel_topic(caller), caller_snapshot,
+			agent_snapshot, type, blob);
 }
 
 static void queue_publish_member_blob(struct stasis_message_type *type, struct ast_json *blob)
@@ -3927,7 +3943,7 @@
 
 		member_call_pending_clear(tmp->member);
 
-		/* BUGBUG: Raise a BUSY dial end message here */
+		publish_dial_end_event(qe->chan, tmp, NULL, "BUSY");
 		tmp->stillgoing = 0;
 		++*busies;
 		return 0;
@@ -4350,14 +4366,8 @@
 		if (pos == 1 /* not found */) {
 			if (numlines == (numbusies + numnochan)) {
 				ast_debug(1, "Everyone is busy at this time\n");
-				/* BUGBUG: We shouldn't have to set anything here, as each
-				 * individual dial attempt should have set that CDR to busy
-				 */
 			} else {
 				ast_debug(3, "No one is answering queue '%s' (%d numlines / %d busies / %d failed channels)\n", queue, numlines, numbusies, numnochan);
-				/* BUGBUG: We shouldn't have to set anything here, as each
-				 * individual dial attempt should have set that CDR to busy
-				 */
 			}
 			*to = 0;
 			return NULL;
@@ -4983,7 +4993,6 @@
 	return res;
 }
 
-#if 0	// BUGBUG
 /*!
  * \brief update the queue status
  * \retval Always 0
@@ -5028,7 +5037,6 @@
 	ao2_unlock(q);
 	return 0;
 }
-#endif	// BUGBUG
 
 /*! \brief Calculate the metric of each member in the outgoing callattempts
  *
@@ -5117,14 +5125,16 @@
 	TRANSFER
 };
 
-#if 0	// BUGBUG
 /*! \brief Send out AMI message with member call completion status information */
-static void send_agent_complete(const struct queue_ent *qe, const char *queuename,
-	const struct ast_channel *peer, const struct member *member, time_t callstart,
-	char *vars, size_t vars_len, enum agent_complete_reason rsn)
+static void send_agent_complete(const char *queuename, struct ast_channel_snapshot *caller,
+	struct ast_channel_snapshot *peer, const struct member *member, time_t holdstart,
+	time_t callstart, enum agent_complete_reason rsn)
 {
 	const char *reason = NULL;	/* silence dumb compilers */
 	RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
+
+	ast_assert(peer != NULL);
+	ast_assert(caller != NULL);
 
 	switch (rsn) {
 	case CALLER:
@@ -5142,121 +5152,656 @@
 			     "Queue", queuename,
 			     "Interface", member->interface,
 			     "MemberName", member->membername,
-			     "HoldTime", (long)(callstart - qe->start)
-			     "TalkTime", (long)(time(NULL) - callstart)
+			     "HoldTime", (long)(callstart - holdstart),
+			     "TalkTime", (long)(time(NULL) - callstart),
 			     "Reason", reason);
-	queue_publish_multi_channel_blob(qe->chan, peer, queue_agent_complete_type(), blob);
-}
-#endif	// BUGBUG
-
-#if 0	// BUGBUG
-struct queue_transfer_ds {
-	struct queue_ent *qe;
+
+	queue_publish_multi_channel_snapshot_blob(ast_queue_topic(queuename), caller, peer,
+			queue_agent_complete_type(), blob);
+}
+
+static void queue_agent_cb(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct ast_channel_blob *agent_blob;
+
+	agent_blob = stasis_message_data(msg);
+
+	if (ast_channel_agent_login_type() == stasis_message_type(msg)) {
+		ast_queue_log("NONE", agent_blob->snapshot->uniqueid,
+			ast_json_string_get(ast_json_object_get(agent_blob->blob, "agent")),
+			"AGENTLOGIN", "%s", agent_blob->snapshot->name);
+	} else if (ast_channel_agent_logoff_type() == stasis_message_type(msg)) {
+		ast_queue_log("NONE", agent_blob->snapshot->uniqueid,
+			ast_json_string_get(ast_json_object_get(agent_blob->blob, "agent")),
+			"AGENTLOGOFF", "%s|%ld", agent_blob->snapshot->name,
+			(long) ast_json_integer_get(ast_json_object_get(agent_blob->blob, "logintime")));
+	}
+}
+
+/*!
+ * \brief Structure representing relevant data during a local channel optimization
+ *
+ * The reason we care about local channel optimizations is that we want to be able
+ * to accurately report when the caller and queue member have stopped talking to
+ * each other. A local channel optimization can cause it to appear that the conversation
+ * has stopped immediately after it has begun. By tracking that the relevant channels
+ * to monitor have changed due to a local channel optimization, we can give accurate
+ * reports.
+ *
+ * Local channel optimizations for queues are restricted from their normal operation.
+ * Bridges created by queues can only be the destination of local channel optimizations,
+ * not the source. In addition, move-swap local channel optimizations are the only
+ * permitted types of local channel optimization.
+ *
+ * This data is populated when we are told that a local channel optimization begin
+ * is occurring. When we get told the optimization has ended successfully, we then
+ * apply the data here into the queue_stasis_data.
+ */
+struct local_optimization {
+	/*! The uniqueid of the channel that will be taking the place of the caller or member */
+	const char *source_chan_uniqueid;
+	/*! Indication of whether we think there is a local channel optimization in progress */
+	int in_progress;
+	/*! The identifier for this local channel optimization */
+	unsigned int id;
+};
+
+/*!
+ * \brief User data for stasis subscriptions used for queue calls.
+ *
+ * app_queue subscribes to channel and bridge events for all bridged calls.
+ * app_queue cares about the following events:
+ *
+ * \li bridge enter: To determine the unique ID of the bridge created for the call.
+ * \li blind transfer: To send an appropriate agent complete event.
+ * \li attended transfer: To send an appropriate agent complete event.
+ * \li local optimization: To update caller and member unique IDs for the call.
+ * \li hangup: To send an appropriate agent complete event.
+ *
+ * The stasis subscriptions last until we determine that the caller and the member
+ * are no longer bridged with each other.
+ */
+struct queue_stasis_data {
+	AST_DECLARE_STRING_FIELDS(
+		/*! The unique ID of the caller's channel. */
+		AST_STRING_FIELD(caller_uniqueid);
+		/*! The unique ID of the queue member's channel */
+		AST_STRING_FIELD(member_uniqueid);
+		/*! The unique ID of the bridge created by the queue */
+		AST_STRING_FIELD(bridge_uniqueid);
+	);
+	/*! The relevant queue */
+	struct call_queue *queue;
+	/*! The queue member that has answered the call */
 	struct member *member;
+	/*! The time at which the caller entered the queue. Start of the caller's hold time */
+	time_t holdstart;
+	/*! The time at which the member answered the call. */
 	time_t starttime;
+	/*! The original position of the caller when he entered the queue */
+	int caller_pos;
+	/*! Indication if the call was answered within the configured service level of the queue */
 	int callcompletedinsl;
+	/*! Indicates if the stasis subscriptions are shutting down */
+	int dying;
+	/*! The stasis message router for bridge events */
+	struct stasis_message_router *bridge_router;
+	/*! The stasis message router for channel events */
+	struct stasis_message_router *channel_router;
+	/*! Local channel optimization details for the caller */
+	struct local_optimization caller_optimize;
+	/*! Local channel optimization details for the member */
+	struct local_optimization member_optimize;
 };
-#endif	// BUGBUG
-
-#if 0	// BUGBUG
-static void queue_transfer_destroy(void *data)
-{
-	struct queue_transfer_ds *qtds = data;
-	ast_free(qtds);
-}
-#endif	// BUGBUG
-
-#if 0	// BUGBUG
-/*! \brief a datastore used to help correctly log attended transfers of queue callers
+
+/*!
+ * \internal
+ * \brief Free memory for a queue_stasis_data
  */
-static const struct ast_datastore_info queue_transfer_info = {
-	.type = "queue_transfer",
-	.chan_fixup = queue_transfer_fixup,
-	.destroy = queue_transfer_destroy,
-};
-#endif	// BUGBUG
-
-#if 0	// BUGBUG
-/*! \brief Log an attended transfer when a queue caller channel is masqueraded
+static void queue_stasis_data_destructor(void *obj)
+{
+	struct queue_stasis_data *queue_data = obj;
+
+	/* This can only happen if refcounts for this object have got severely messed up */
+	ast_assert(queue_data->bridge_router == NULL);
+	ast_assert(queue_data->channel_router == NULL);
+
+	ao2_cleanup(queue_data->member);
+	queue_unref(queue_data->queue);
+	ast_string_field_free_memory(queue_data);
+}
+
+/*!
+ * \internal
+ * \brief End all stasis subscriptions on a queue_stasis_data
+ */
+static void remove_stasis_subscriptions(struct queue_stasis_data *queue_data)
+{
+	SCOPED_AO2LOCK(lock, queue_data);
+
+	queue_data->dying = 1;
+	stasis_message_router_unsubscribe(queue_data->bridge_router);
+	queue_data->bridge_router = NULL;
+	stasis_message_router_unsubscribe(queue_data->channel_router);
+	queue_data->channel_router = NULL;
+}
+
+/*!
+ * \internal
+ * \brief Allocate a queue_stasis_data and initialize its data.
+ */
+static struct queue_stasis_data *queue_stasis_data_alloc(struct queue_ent *qe,
+		struct ast_channel *peer, struct member *mem, time_t holdstart,
+		time_t starttime, int callcompletedinsl)
+{
+	struct queue_stasis_data *queue_data;
+
+	queue_data = ao2_alloc(sizeof(*queue_data), queue_stasis_data_destructor);
+	if (!queue_data) {
+		return NULL;
+	}
+
+	if (ast_string_field_init(queue_data, 64)) {
+		ao2_cleanup(queue_data);
+		return NULL;
+	}
+
+	ast_string_field_set(queue_data, caller_uniqueid, ast_channel_uniqueid(qe->chan));
+	ast_string_field_set(queue_data, member_uniqueid, ast_channel_uniqueid(peer));
+	queue_data->queue = queue_ref(qe->parent);
+	queue_data->starttime = starttime;
+	queue_data->holdstart = holdstart;
+	queue_data->callcompletedinsl = callcompletedinsl;
+	queue_data->caller_pos = qe->opos;
+	ao2_ref(mem, +1);
+	queue_data->member = mem;
+	return queue_data;
+}
+
+/*!
+ * \internal
+ * \brief Log an attended transfer in the queue log.
  *
- * When a caller is masqueraded, we want to log a transfer. Fixup time is the closest we can come to when
- * the actual transfer occurs. This happens during the masquerade after datastores are moved from old_chan
- * to new_chan. This is why new_chan is referenced for exten, context, and datastore information.
+ * Attended transfer queue log messages vary based on the method by which the
+ * attended transfer was completed.
  *
- * At the end of this, we want to remove the datastore so that this fixup function is not called on any
- * future masquerades of the caller during the current call.
+ * \param queue_data Data pertaining to the particular call in the queue.
+ * \param caller The channel snapshot for the caller channel in the queue.
+ * \param atxfer_msg The stasis attended transfer message data.
  */
-static void queue_transfer_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
-{
-	struct queue_transfer_ds *qtds = data;
-	struct queue_ent *qe = qtds->qe;
-	struct member *member = qtds->member;
-	time_t callstart = qtds->starttime;
-	int callcompletedinsl = qtds->callcompletedinsl;
-	struct ast_datastore *datastore;
-
-	ast_queue_log(qe->parent->name, ast_channel_uniqueid(qe->chan), member->membername, "TRANSFER", "%s|%s|%ld|%ld|%d",
-				ast_channel_exten(new_chan), ast_channel_context(new_chan), (long) (callstart - qe->start),
-				(long) (time(NULL) - callstart), qe->opos);
-
-	update_queue(qe->parent, member, callcompletedinsl, (time(NULL) - callstart));
-
-	/* No need to lock the channels because they are already locked in ast_do_masquerade */
-	if ((datastore = ast_channel_datastore_find(old_chan, &queue_transfer_info, NULL))) {
-		ast_channel_datastore_remove(old_chan, datastore);
+static void log_attended_transfer(struct queue_stasis_data *queue_data, struct ast_channel_snapshot *caller,
+		struct ast_attended_transfer_message *atxfer_msg)
+{
+	RAII_VAR(struct ast_str *, transfer_str, ast_str_create(32), ast_free);
+
+	if (!transfer_str) {
+		ast_log(LOG_WARNING, "Unable to log attended transfer to queue log\n");
+		return;
+	}
+
+	switch (atxfer_msg->dest_type) {
+	case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
+		ast_str_set(&transfer_str, 0, "BRIDGE|%s", atxfer_msg->dest.bridge);
+		break;
+	case AST_ATTENDED_TRANSFER_DEST_APP:
+		ast_str_set(&transfer_str, 0, "APP|%s", atxfer_msg->dest.app);
+		break;
+	case AST_ATTENDED_TRANSFER_DEST_LINK:
+		ast_str_set(&transfer_str, 0, "LINK|%s|%s", atxfer_msg->dest.links[0]->name,
+				atxfer_msg->dest.links[1]->name);
+		break;
+	case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
+	case AST_ATTENDED_TRANSFER_DEST_FAIL:
+		/* Threeways are headed off and should not be logged here */
+		ast_assert(0);
+		return;
+	}
+
+	ast_queue_log(queue_data->queue->name, caller->uniqueid, queue_data->member->membername, "ATTENDEDTRANSFER", "%s|%ld|%ld|%d",
+			ast_str_buffer(transfer_str),
+			(long) queue_data->starttime - queue_data->holdstart,
+			(long) time(NULL) - queue_data->starttime, queue_data->caller_pos);
+}
+
+/*!
+ * \internal
+ * \brief Handle a stasis bridge enter event.
+ *
+ * We track this particular event in order to learn what bridge
+ * was created for the queue call.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the bridge enter event
+ */
+static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
+	struct ast_bridge_blob *enter_blob = stasis_message_data(msg);
+
+	if (queue_data->dying) {
+		return;
+	}
+
+	if (!ast_strlen_zero(queue_data->bridge_uniqueid)) {
+		return;
+	}
+
+	if (!strcmp(enter_blob->channel->uniqueid, queue_data->caller_uniqueid)) {
+		ast_string_field_set(queue_data, bridge_uniqueid,
+				enter_blob->bridge->uniqueid);
+		ast_debug(3, "Detected entry of caller channel %s into bridge %s\n",
+				enter_blob->channel->name, queue_data->bridge_uniqueid);
+	}
+}
+
+/*!
+ * \brief Handle a blind transfer event
+ *
+ * This event is important in order to be able to log the end of the
+ * call to the queue log and to stasis.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the blind transfer event
+ */
+static void handle_blind_transfer(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
+	struct ast_bridge_blob *blind_blob = stasis_message_data(msg);
+	struct ast_json *result_blob;
+	struct ast_json *exten_blob;
+	struct ast_json *context_blob;
+	const char *exten;
+	const char *context;
+	RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+
+	if (queue_data->dying) {
+		return;
+	}
+
+	result_blob = ast_json_object_get(blind_blob->blob, "result");
+	if (!result_blob) {
+		return;
+	}
+
+	if (ast_json_integer_get(result_blob) == AST_BRIDGE_TRANSFER_FAIL) {
+		return;
+	}
+
+	ao2_lock(queue_data);
+
+	if (ast_strlen_zero(queue_data->bridge_uniqueid) ||
+			strcmp(queue_data->bridge_uniqueid, blind_blob->bridge->uniqueid)) {
+		ao2_unlock(queue_data);
+		return;
+	}
+
+	caller_snapshot = ast_channel_snapshot_get_latest(queue_data->caller_uniqueid);
+	member_snapshot = ast_channel_snapshot_get_latest(queue_data->member_uniqueid);
+
+	ao2_unlock(queue_data);
+
+	exten_blob = ast_json_object_get(blind_blob->blob, "exten");
+	exten = exten_blob ? ast_json_string_get(exten_blob) : "<unknown>";
+	context_blob = ast_json_object_get(blind_blob->blob, "context");
+	context = context_blob ? ast_json_string_get(context_blob) : "<unknown>";
+
+	ast_debug(3, "Detected blind transfer in queue %s\n", queue_data->queue->name);
+	ast_queue_log(queue_data->queue->name, caller_snapshot->uniqueid, queue_data->member->membername,
+			"BLINDTRANSFER", "%s|%s|%ld|%ld|%d",
+			exten, context,
+			(long) queue_data->starttime - queue_data->holdstart,
+			(long) time(NULL) - queue_data->starttime, queue_data->caller_pos);
+
+	send_agent_complete(queue_data->queue->name, caller_snapshot, member_snapshot, queue_data->member,
+			queue_data->holdstart, queue_data->starttime, TRANSFER);
+	update_queue(queue_data->queue, queue_data->member, queue_data->callcompletedinsl,
+			time(NULL) - queue_data->starttime);
+	remove_stasis_subscriptions(queue_data);
+}
+
+/*!
+ * \brief Handle an attended transfer event
+ *
+ * This event is important in order to be able to log the end of the
+ * call to the queue log and to stasis.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the attended transfer event.
+ */
+static void handle_attended_transfer(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
+	struct ast_attended_transfer_message *atxfer_msg = stasis_message_data(msg);
+	RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+
+	if (queue_data->dying) {
+		return;
+	}
+
+	if (atxfer_msg->result == AST_BRIDGE_TRANSFER_FAIL ||
+			atxfer_msg->dest_type == AST_ATTENDED_TRANSFER_DEST_THREEWAY) {
+		return;
+	}
+
+	ao2_lock(queue_data);
+
+	if (ast_strlen_zero(queue_data->bridge_uniqueid)) {
+		ao2_unlock(queue_data);
+		return;
+	}
+
+	if ((!atxfer_msg->to_transferee.bridge_snapshot || strcmp(queue_data->bridge_uniqueid,
+					atxfer_msg->to_transferee.bridge_snapshot->uniqueid)) &&
+			 (!atxfer_msg->to_transfer_target.bridge_snapshot || strcmp(queue_data->bridge_uniqueid,
+				 atxfer_msg->to_transfer_target.bridge_snapshot->uniqueid))) {
+		ao2_unlock(queue_data);
+		return;
+	}
+
+	caller_snapshot = ast_channel_snapshot_get_latest(queue_data->caller_uniqueid);
+	member_snapshot = ast_channel_snapshot_get_latest(queue_data->member_uniqueid);
+
+	ao2_unlock(queue_data);
+
+	ast_debug(3, "Detected attended transfer in queue %s\n", queue_data->queue->name);
+
+	log_attended_transfer(queue_data, caller_snapshot, atxfer_msg);
+
+	send_agent_complete(queue_data->queue->name, caller_snapshot, member_snapshot, queue_data->member,
+			queue_data->holdstart, queue_data->starttime, TRANSFER);
+	update_queue(queue_data->queue, queue_data->member, queue_data->callcompletedinsl,
+			time(NULL) - queue_data->starttime);
+	remove_stasis_subscriptions(queue_data);
+}
+
+/*!
+ * \internal
+ * \brief Callback for all stasis bridge events
+ *
+ * Based on the event and what bridge it is on, the task is farmed out to relevant
+ * subroutines for further processing.
+ */
+static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	if (stasis_subscription_final_message(sub, msg)) {
+		ao2_cleanup(userdata);
+	}
+}
+
+/*!
+ * \internal
+ * \brief Handler for the beginning of a local channel optimization
+ *
+ * This method gathers data relevant to the local channel optimization and stores
+ * it to be used once the local optimization completes.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the local optimization begin event
+ */
+static void handle_local_optimization_begin(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
+	struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
+	struct ast_channel_snapshot *local_one = ast_multi_channel_blob_get_channel(optimization_blob, "1");
+	struct ast_channel_snapshot *local_two = ast_multi_channel_blob_get_channel(optimization_blob, "2");
+	struct ast_channel_snapshot *source = ast_multi_channel_blob_get_channel(optimization_blob, "source");
+	struct local_optimization *optimization;
+	unsigned int id;
+	SCOPED_AO2LOCK(lock, queue_data);
+
+	if (queue_data->dying) {
+		return;
+	}
+
+	if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
+		optimization = &queue_data->member_optimize;
+	} else if (!strcmp(local_two->uniqueid, queue_data->caller_uniqueid)) {
+		optimization = &queue_data->caller_optimize;
 	} else {
-		ast_log(LOG_WARNING, "Can't find the queue_transfer datastore.\n");
-	}
-}
-#endif	// BUGBUG
-
-#if 0	// BUGBUG
-/*! \brief mechanism to tell if a queue caller was atxferred by a queue member.
+		return;
+	}
+
+	/* We only allow move-swap optimizations, so there had BETTER be a source */
+	ast_assert(source != NULL);
+
+	optimization->source_chan_uniqueid = ast_strdup(source->uniqueid);
+	if (!optimization->source_chan_uniqueid) {
+		ast_log(LOG_ERROR, "Unable to track local channel optimization for channel %s. Expect further errors\n", local_one->name);
+		return;
+	}
+	id = ast_json_integer_get(ast_json_object_get(ast_multi_channel_blob_get_json(optimization_blob), "id"));
+
+	optimization->id = id;
+	optimization->in_progress = 1;
+}
+
+/*!
+ * \internal
+ * \brief Handler for the end of a local channel optimization
  *
- * When a caller is atxferred, then the queue_transfer_info datastore
- * is removed from the channel. If it's still there after the bridge is
- * broken, then the caller was not atxferred.
+ * This method takes the data gathered during the local channel optimization begin
+ * event and applies it to the queue stasis data appropriately. This generally involves
+ * updating the caller or member unique ID with the channel that is taking the place of
+ * the previous caller or member.
  *
- * \note Only call this with chan locked
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the local optimization end event
  */
-static int attended_transfer_occurred(struct ast_channel *chan)
-{
-	return ast_channel_datastore_find(chan, &queue_transfer_info, NULL) ? 0 : 1;
-}
-#endif	// BUGBUG
-
-#if 0	// BUGBUG
-/*! \brief create a datastore for storing relevant info to log attended transfers in the queue_log
+static void handle_local_optimization_end(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
+	struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
+	struct ast_channel_snapshot *local_one = ast_multi_channel_blob_get_channel(optimization_blob, "1");
+	struct ast_channel_snapshot *local_two = ast_multi_channel_blob_get_channel(optimization_blob, "2");
+	struct local_optimization *optimization;
+	int is_caller;
+	unsigned int id;
+	SCOPED_AO2LOCK(lock, queue_data);
+
+	if (queue_data->dying) {
+		return;
+	}
+
+	if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
+		optimization = &queue_data->member_optimize;
+		is_caller = 0;
+	} else if (!strcmp(local_two->uniqueid, queue_data->caller_uniqueid)) {
+		optimization = &queue_data->caller_optimize;
+		is_caller = 1;
+	} else {
+		return;
+	}
+
+	id = ast_json_integer_get(ast_json_object_get(ast_multi_channel_blob_get_json(optimization_blob), "id"));
+
+	if (!optimization->in_progress) {
+		ast_log(LOG_WARNING, "Told of a local optimization end when we had no previous begin\n");
+		return;
+	}
+
+	if (id != optimization->id) {
+		ast_log(LOG_WARNING, "Local optimization end event ID does not match begin (%u != %u)\n",
+				id, optimization->id);
+		return;
+	}
+
+	if (is_caller) {
+		ast_debug(3, "Local optimization: Changing queue caller uniqueid from %s to %s\n",
+				queue_data->caller_uniqueid, optimization->source_chan_uniqueid);
+		ast_string_field_set(queue_data, caller_uniqueid, optimization->source_chan_uniqueid);
+	} else {
+		ast_debug(3, "Local optimization: Changing queue member uniqueid from %s to %s\n",
+				queue_data->member_uniqueid, optimization->source_chan_uniqueid);
+		ast_string_field_set(queue_data, member_uniqueid, optimization->source_chan_uniqueid);
+	}
+
+	optimization->in_progress = 0;
+}
+
+/*!
+ * \internal
+ * \brief Handler for hangup stasis event
+ *
+ * This is how we determine that the caller or member has hung up and the call
+ * has ended. An appropriate queue log and stasis message are raised in this
+ * callback.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the hangup event.
  */
-static struct ast_datastore *setup_transfer_datastore(struct queue_ent *qe, struct member *member, time_t starttime, int callcompletedinsl)
-{
-	struct ast_datastore *ds;
-	struct queue_transfer_ds *qtds = ast_calloc(1, sizeof(*qtds));
-
-	if (!qtds) {
-		ast_log(LOG_WARNING, "Memory allocation error!\n");
-		return NULL;
-	}
-
-	ast_channel_lock(qe->chan);
-	if (!(ds = ast_datastore_alloc(&queue_transfer_info, NULL))) {
-		ast_channel_unlock(qe->chan);
-		ast_free(qtds);
-		ast_log(LOG_WARNING, "Unable to create transfer datastore. queue_log will not show attended transfer\n");
-		return NULL;
-	}
-
-	qtds->qe = qe;
-	/* This member is refcounted in try_calling, so no need to add it here, too */
-	qtds->member = member;
-	qtds->starttime = starttime;
-	qtds->callcompletedinsl = callcompletedinsl;
-	ds->data = qtds;
-	ast_channel_datastore_add(qe->chan, ds);
-	ast_channel_unlock(qe->chan);
-	return ds;
-}
-#endif	// BUGBUG
+static void handle_hangup(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
+	struct ast_channel_blob *channel_blob = stasis_message_data(msg);
+	RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_channel *, chan, NULL, ao2_cleanup);
+	enum agent_complete_reason reason;
+	const char *caller_uniqueid;
+
+	ao2_lock(queue_data);
+
+	/* Check 'dying' only while holding the lock, matching what
+	 * handle_local_optimization_end() does, so the flag cannot change
+	 * between the check and the uniqueid comparisons below.
+	 */
+	if (queue_data->dying) {
+		ao2_unlock(queue_data);
+		return;
+	}
+
+	/* Only hangups of the tracked caller or member channel are of interest */
+	if (!strcmp(channel_blob->snapshot->uniqueid, queue_data->caller_uniqueid)) {
+		reason = CALLER;
+	} else if (!strcmp(channel_blob->snapshot->uniqueid, queue_data->member_uniqueid)) {
+		reason = AGENT;
+	} else {
+		ao2_unlock(queue_data);
+		return;
+	}
+
+	chan = ast_channel_get_by_name(channel_blob->snapshot->name);
+	if (chan && ast_channel_has_role(chan, AST_TRANSFERER_ROLE_NAME)) {
+		/* Channel that is hanging up is doing it as part of a transfer.
+		 * We'll get a transfer event later
+		 */
+		ao2_unlock(queue_data);
+		return;
+	}
+
+	/* Keep a private copy of the caller uniqueid: the string field may be
+	 * rewritten by the local optimization handler once the lock is dropped,
+	 * and the snapshot lookup below is not relied on to succeed.
+	 */
+	caller_uniqueid = ast_strdupa(queue_data->caller_uniqueid);
+
+	caller_snapshot = ast_channel_snapshot_get_latest(queue_data->caller_uniqueid);
+	member_snapshot = ast_channel_snapshot_get_latest(queue_data->member_uniqueid);
+
+	ao2_unlock(queue_data);
+
+	ast_debug(3, "Detected hangup of queue %s channel %s\n", reason == CALLER ? "caller" : "member",
+			channel_blob->snapshot->name);
+
+	/* Log with the copied uniqueid instead of dereferencing caller_snapshot,
+	 * which would crash if the snapshot lookup returned NULL.
+	 */
+	ast_queue_log(queue_data->queue->name, caller_uniqueid, queue_data->member->membername,
+			reason == CALLER ? "COMPLETECALLER" : "COMPLETEAGENT", "%ld|%ld|%d",
+		(long) (queue_data->starttime - queue_data->holdstart),
+		(long) (time(NULL) - queue_data->starttime), queue_data->caller_pos);
+
+	/* NOTE(review): the snapshots are passed through unchanged, as before;
+	 * confirm that send_agent_complete() copes with a NULL snapshot.
+	 */
+	send_agent_complete(queue_data->queue->name, caller_snapshot, member_snapshot, queue_data->member,
+			queue_data->holdstart, queue_data->starttime, reason);
+	update_queue(queue_data->queue, queue_data->member, queue_data->callcompletedinsl,
+			time(NULL) - queue_data->starttime);
+	remove_stasis_subscriptions(queue_data);
+}
+
+/*!
+ * \internal
+ * \brief Default handler for the queue's channel message router
+ *
+ * Messages that have a dedicated route are handled elsewhere; the only
+ * work done here is to drop the reference held on the queue stasis data
+ * once the subscription delivers its final message.
+ *
+ * \param userdata The queue stasis data registered with the router.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message being delivered.
+ */
+static void queue_channel_cb(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	if (!stasis_subscription_final_message(sub, msg)) {
+		/* Not the last message on this subscription: nothing to release yet */
+		return;
+	}
+
+	ao2_cleanup(userdata);
+}
+
+/*!
+ * \internal
+ * \brief Create stasis subscriptions for a particular call in the queue.
+ *
+ * These subscriptions are created once the call has been answered. The subscriptions
+ * are put in place so that call progress may be tracked. Once the call can be determined
+ * to have ended, then messages are logged to the queue log and stasis events are emitted.
+ *
+ * Two message routers are created: one on the all-bridges topic (bridge enter,
+ * blind transfer and attended transfer events) and one on the all-channels
+ * topic (local optimization begin/end and hangup request events). Both routers
+ * share the same queue stasis data as their callback userdata.
+ *
+ * \param qe The queue entry representing the caller
+ * \param peer The channel that has answered the call
+ * \param mem The queue member that answered the call
+ * \param holdstart The time at which the caller entered the queue
+ * \param starttime The time at which the call was answered
+ * \param callcompletedinsl Indicates if the call was answered within the configured service level of the queue.
+ * \retval 0 Success
+ * \retval non-zero Failure
+ */
+static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, struct member *mem,
+		time_t holdstart, time_t starttime, int callcompletedinsl)
+{
+	struct queue_stasis_data *queue_data = queue_stasis_data_alloc(qe, peer, mem, holdstart, starttime, callcompletedinsl);
+
+	if (!queue_data) {
+		return -1;
+	}
+
+	queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
+	if (!queue_data->bridge_router) {
+		/* No router took ownership yet, so drop the allocation reference here */
+		ao2_ref(queue_data, -1);
+		return -1;
+	}
+
+	/* NOTE(review): the return values of the route registrations are not checked */
+	stasis_message_router_add(queue_data->bridge_router, ast_channel_entered_bridge_type(),
+			handle_bridge_enter, queue_data);
+	stasis_message_router_add(queue_data->bridge_router, ast_blind_transfer_type(),
+			handle_blind_transfer, queue_data);
+	stasis_message_router_add(queue_data->bridge_router, ast_attended_transfer_type(),
+			handle_attended_transfer, queue_data);
+	stasis_message_router_set_default(queue_data->bridge_router,
+			queue_bridge_cb, queue_data);
+
+	queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all());
+	if (!queue_data->channel_router) {
+		/* Unsubscribing from the bridge router will remove the only ref of queue_data,
+		 * thus beginning the destruction process
+		 */
+		stasis_message_router_unsubscribe(queue_data->bridge_router);
+		queue_data->bridge_router = NULL;
+		return -1;
+	}
+
+	/* Second reference, held on behalf of the channel router; queue_channel_cb()
+	 * releases it when the channel subscription delivers its final message.
+	 */
+	ao2_ref(queue_data, +1);
+	stasis_message_router_add(queue_data->channel_router, ast_local_optimization_begin_type(),
+			handle_local_optimization_begin, queue_data);
+	stasis_message_router_add(queue_data->channel_router, ast_local_optimization_end_type(),
+			handle_local_optimization_end, queue_data);
+	stasis_message_router_add(queue_data->channel_router, ast_channel_hangup_request_type(),
+			handle_hangup, queue_data);
+	stasis_message_router_set_default(queue_data->channel_router,
+			queue_channel_cb, queue_data);
+
+	return 0;
+}
 
 struct queue_end_bridge {
 	struct call_queue *q;
@@ -5309,6 +5854,82 @@
 		ast_channel_unlock(chan);
 		ast_bridge_set_after_go_on(peer, context, extension, priority,
 			opt_args[OPT_ARG_CALLEE_GO_ON]);
+	}
+}
+
+/*!
+ * \internal
+ * \brief Escape a string for use as an application argument and substitute variables
+ *
+ * A copy of the input is built in which every "^{" becomes "${" and every
+ * comma is preceded by a backslash. The copy is then run through
+ * pbx_substitute_variables_helper() and the result is written to output.
+ * Input that does not fit is truncated; an escaped comma is never split, so
+ * the copy cannot end in a dangling backslash.
+ *
+ * \param chan The channel used for variable substitution
+ * \param input The NUL-terminated string to escape
+ * \param output The buffer that receives the substituted result
+ * \param size The size of output in bytes; must be at least 1
+ */
+static void escape_and_substitute(struct ast_channel *chan, const char *input,
+		char *output, size_t size)
+{
+	const char *m;
+	char escaped[size];
+	char *p = escaped;
+	/* Last slot of the scratch buffer, reserved for the terminator */
+	char *const end = escaped + size - 1;
+
+	for (m = input; *m != '\0'; m++) {
+		if (*m == ',') {
+			/* An escaped comma needs two bytes; stop rather than emit half of it */
+			if (end - p < 2) {
+				break;
+			}
+			*p++ = '\\';
+			*p++ = ',';
+			continue;
+		}
+
+		if (p >= end) {
+			break;
+		}
+
+		/* "^{" is shorthand for "${"; a '^' on its own is copied unchanged */
+		if (*m == '^' && *(m + 1) == '{') {
+			*p++ = '$';
+		} else {
+			*p++ = *m;
+		}
+	}
+
+	/* Always terminate, including when the input was truncated */
+	*p = '\0';
+
+	pbx_substitute_variables_helper(chan, escaped, output, size - 1);
+}
+
+static void setup_mixmonitor(struct queue_ent *qe, const char *filename)
+{
+	char escaped_filename[256];
+	char file_with_ext[256];
+	char mixmonargs[1512];
+	char escaped_monitor_exec[1024];
+	const char *monitor_options;
+	const char *monitor_exec;
+
+	if (filename) {
+		escape_and_substitute(qe->chan, filename, escaped_filename, sizeof(escaped_filename));
+	} else {
+		ast_copy_string(escaped_filename, ast_channel_uniqueid(qe->chan), sizeof(escaped_filename));
+	}
+
+	ast_channel_lock(qe->chan);
+	if ((monitor_exec = pbx_builtin_getvar_helper(qe->chan, "MONITOR_EXEC"))) {
+		monitor_exec = ast_strdupa(monitor_exec);
+	}
+	if ((monitor_options = pbx_builtin_getvar_helper(qe->chan, "MONITOR_OPTIONS"))) {
+		monitor_options = ast_strdupa(monitor_options);
+	} else {
+		monitor_options = "";
+	}
+	ast_channel_unlock(qe->chan);
+
+	if (monitor_exec) {
+		escape_and_substitute(qe->chan, monitor_exec, escaped_monitor_exec, sizeof(escaped_monitor_exec));
+	}
+

[... 996 lines stripped ...]



More information about the asterisk-commits mailing list