[asterisk-commits] mmichelson: branch mmichelson/queue_bugbug r396943 - /team/mmichelson/queue_bugbug/apps/app_queue.c

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Aug 19 18:15:38 CDT 2013


Author: mmichelson
Date: Mon Aug 19 18:15:37 2013
New Revision: 396943

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396943
Log:
Address review findings.


Modified:
    team/mmichelson/queue_bugbug/apps/app_queue.c

Modified: team/mmichelson/queue_bugbug/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/queue_bugbug/apps/app_queue.c?view=diff&rev=396943&r1=396942&r2=396943
==============================================================================
--- team/mmichelson/queue_bugbug/apps/app_queue.c (original)
+++ team/mmichelson/queue_bugbug/apps/app_queue.c Mon Aug 19 18:15:37 2013
@@ -1735,7 +1735,9 @@
 
 static inline struct call_queue *_queue_unref(struct call_queue *q, const char *tag, const char *file, int line, const char *filename)
 {
-	__ao2_ref_debug(q, -1, tag, file, line, filename);
+	if (q) {
+		__ao2_ref_debug(q, -1, tag, file, line, filename);
+	}
 	return NULL;
 }
 
@@ -1754,7 +1756,9 @@
 
 static inline struct call_queue *queue_unref(struct call_queue *q)
 {
-	ao2_ref(q, -1);
+	if (q) {
+		ao2_ref(q, -1);
+	}
 	return NULL;
 }
 #endif
@@ -5125,6 +5129,9 @@
 	const char *reason = NULL;	/* silence dumb compilers */
 	RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
 
+	ast_assert(peer != NULL);
+	ast_assert(caller != NULL);
+
 	switch (rsn) {
 	case CALLER:
 		reason = "caller";
@@ -5192,10 +5199,8 @@
  * apply the data here into the queue_stasis_data.
  */
 struct local_optimization {
-	AST_DECLARE_STRING_FIELDS(
-		/*! The uniqueid of the channel that will be taking the place of the caller or member */
-		AST_STRING_FIELD(source_chan_uniqueid);
-	);
+	/*! The uniqueid of the channel that will be taking the place of the caller or member */
+	const char *source_chan_uniqueid;
 	/*! Indication of whether we think there is a local channel optimization in progress */
 	int in_progress;
 	/*! The identifier for this local channel optimization */
@@ -5240,10 +5245,10 @@
 	int callcompletedinsl;
 	/*! Indicates if the stasis subscriptions are shutting down */
 	int dying;
-	/*! The stasis subscription for bridge events */
-	struct stasis_subscription *bridge_sub;
-	/*! The stasis subscription for channel events */
-	struct stasis_subscription *channel_sub;
+	/*! The stasis message router for bridge events */
+	struct stasis_message_router *bridge_router;
+	/*! The stasis message router for channel events */
+	struct stasis_message_router *channel_router;
 	/*! Local channel optimization details for the caller */
 	struct local_optimization caller_optimize;
 	/*! Local channel optimization details for the member */
@@ -5258,10 +5263,12 @@
 {
 	struct queue_stasis_data *queue_data = obj;
 
+	/* This can only happen if refcounts for this object have got severely messed up */
+	ast_assert(queue_data->bridge_router == NULL);
+	ast_assert(queue_data->channel_router == NULL);
+
 	ao2_cleanup(queue_data->member);
-	ao2_cleanup(queue_data->queue);
-	ast_string_field_free_memory(&queue_data->caller_optimize);
-	ast_string_field_free_memory(&queue_data->member_optimize);
+	queue_unref(queue_data->queue);
 	ast_string_field_free_memory(queue_data);
 }
 
@@ -5274,8 +5281,10 @@
 	SCOPED_AO2LOCK(lock, queue_data);
 
 	queue_data->dying = 1;
-	queue_data->bridge_sub = stasis_unsubscribe(queue_data->bridge_sub);
-	queue_data->channel_sub = stasis_unsubscribe(queue_data->channel_sub);
+	stasis_message_router_unsubscribe(queue_data->bridge_router);
+	queue_data->bridge_router = NULL;
+	stasis_message_router_unsubscribe(queue_data->channel_router);
+	queue_data->channel_router = NULL;
 }
 
 /*!
@@ -5293,8 +5302,7 @@
 		return NULL;
 	}
 
-	if (ast_string_field_init(queue_data, 64) || ast_string_field_init(&queue_data->member_optimize, 64) ||
-			ast_string_field_init(&queue_data->caller_optimize, 64)) {
+	if (ast_string_field_init(queue_data, 64)) {
 		ao2_cleanup(queue_data);
 		return NULL;
 	}
@@ -5334,13 +5342,13 @@
 
 	switch (atxfer_msg->dest_type) {
 	case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
-		ast_str_append(&transfer_str, 0, "BRIDGE|%s", atxfer_msg->dest.bridge);
+		ast_str_set(&transfer_str, 0, "BRIDGE|%s", atxfer_msg->dest.bridge);
 		break;
 	case AST_ATTENDED_TRANSFER_DEST_APP:
-		ast_str_append(&transfer_str, 0, "APP|%s", atxfer_msg->dest.app);
+		ast_str_set(&transfer_str, 0, "APP|%s", atxfer_msg->dest.app);
 		break;
 	case AST_ATTENDED_TRANSFER_DEST_LINK:
-		ast_str_append(&transfer_str, 0, "LINK|%s|%s", atxfer_msg->dest.links[0]->name,
+		ast_str_set(&transfer_str, 0, "LINK|%s|%s", atxfer_msg->dest.links[0]->name,
 				atxfer_msg->dest.links[1]->name);
 		break;
 	case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
@@ -5363,13 +5371,20 @@
  * We track this particular event in order to learn what bridge
  * was created for the queue call.
  *
- * \param queue_data Data pertaining to the particular call in the queue.
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
  * \param msg The stasis message for the bridge enter event
  */
-static void handle_bridge_enter(struct queue_stasis_data *queue_data,
-		struct stasis_message *msg)
-{
+static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
 	struct ast_bridge_blob *enter_blob = stasis_message_data(msg);
+
+	if (queue_data->dying) {
+		return;
+	}
 
 	if (!ast_strlen_zero(queue_data->bridge_uniqueid)) {
 		return;
@@ -5389,17 +5404,34 @@
  * This event is important in order to be able to log the end of the
  * call to the queue log and to stasis.
  *
- * \param queue_data Data pertaining to the particular call in the queue.
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
  * \param msg The stasis message for the blind transfer event
  */
-static void handle_blind_transfer(struct queue_stasis_data *queue_data,
-		struct stasis_message *msg)
-{
+static void handle_blind_transfer(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
 	struct ast_bridge_blob *blind_blob = stasis_message_data(msg);
+	struct ast_json *result_blob;
+	struct ast_json *exten_blob;
+	struct ast_json *context_blob;
+	const char *exten;
+	const char *context;
 	RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
 
-	if (ast_json_integer_get(ast_json_object_get(blind_blob->blob, "result")) == AST_BRIDGE_TRANSFER_FAIL) {
+	if (queue_data->dying) {
+		return;
+	}
+
+	result_blob = ast_json_object_get(blind_blob->blob, "result");
+	if (!result_blob) {
+		return;
+	}
+
+	if (ast_json_integer_get(result_blob) == AST_BRIDGE_TRANSFER_FAIL) {
 		return;
 	}
 
@@ -5416,11 +5448,15 @@
 
 	ao2_unlock(queue_data);
 
+	exten_blob = ast_json_object_get(blind_blob->blob, "exten");
+	exten = exten_blob ? ast_json_string_get(exten_blob) : "<unknown>";
+	context_blob = ast_json_object_get(blind_blob->blob, "context");
+	context = context_blob ? ast_json_string_get(context_blob) : "<unknown>";
+
 	ast_debug(3, "Detected blind transfer in queue %s\n", queue_data->queue->name);
 	ast_queue_log(queue_data->queue->name, caller_snapshot->uniqueid, queue_data->member->membername,
 			"BLINDTRANSFER", "%s|%s|%ld|%ld|%d",
-			ast_json_string_get(ast_json_object_get(blind_blob->blob, "exten")),
-			ast_json_string_get(ast_json_object_get(blind_blob->blob, "context")),
+			exten, context,
 			(long) queue_data->starttime - queue_data->holdstart,
 			(long) time(NULL) - queue_data->starttime, queue_data->caller_pos);
 
@@ -5437,15 +5473,22 @@
  * This event is important in order to be able to log the end of the
  * call to the queue log and to stasis.
  *
- * \param queue_data Data pertaining to the particular call in the queue.
- * \param msg The stasis message for the attended transfer event
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the attended transfer event.
  */
-static void handle_attended_transfer(struct queue_stasis_data *queue_data,
-		struct stasis_message *msg)
-{
+static void handle_attended_transfer(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
 	struct ast_attended_transfer_message *atxfer_msg = stasis_message_data(msg);
 	RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+
+	if (queue_data->dying) {
+		return;
+	}
 
 	if (atxfer_msg->result == AST_BRIDGE_TRANSFER_FAIL ||
 			atxfer_msg->dest_type == AST_ATTENDED_TRANSFER_DEST_THREEWAY) {
@@ -5493,22 +5536,8 @@
 static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
 		struct stasis_topic *topic, struct stasis_message *msg)
 {
-	struct queue_stasis_data *queue_data = userdata;
-
 	if (stasis_subscription_final_message(sub, msg)) {
-		ao2_cleanup(queue_data);
-	}
-
-	if (queue_data->dying) {
-		return;
-	}
-
-	if (ast_channel_entered_bridge_type() == stasis_message_type(msg)) {
-		handle_bridge_enter(queue_data, msg);
-	} else if (ast_blind_transfer_type() == stasis_message_type(msg)) {
-		handle_blind_transfer(queue_data, msg);
-	} else if (ast_attended_transfer_type() == stasis_message_type(msg)) {
-		handle_attended_transfer(queue_data, msg);
+		ao2_cleanup(userdata);
 	}
 }
 
@@ -5519,18 +5548,26 @@
  * This method gathers data relevant to the local channel optimization and stores
  * it to be used once the local optimization completes.
  *
- * \param queue_data Data pertaining to the particular call in the queue
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
  * \param msg The stasis message for the local optimization begin event
  */
-static void handle_local_optimization_begin(struct queue_stasis_data *queue_data,
-		struct stasis_message *msg)
-{
+static void handle_local_optimization_begin(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
 	struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
 	struct ast_channel_snapshot *local_one = ast_multi_channel_blob_get_channel(optimization_blob, "1");
 	struct ast_channel_snapshot *local_two = ast_multi_channel_blob_get_channel(optimization_blob, "2");
 	struct ast_channel_snapshot *source = ast_multi_channel_blob_get_channel(optimization_blob, "source");
 	struct local_optimization *optimization;
 	unsigned int id;
+	SCOPED_AO2LOCK(lock, queue_data);
+
+	if (queue_data->dying) {
+		return;
+	}
 
 	if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
 		optimization = &queue_data->member_optimize;
@@ -5543,7 +5580,11 @@
 	/* We only allow move-swap optimizations, so there had BETTER be a source */
 	ast_assert(source != NULL);
 
-	ast_string_field_set(optimization, source_chan_uniqueid, source->uniqueid);
+	optimization->source_chan_uniqueid = ast_strdup(source->uniqueid);
+	if (!optimization->source_chan_uniqueid) {
+		ast_log(LOG_ERROR, "Unable to track local channel optimization for channel %s. Expect further errors\n", local_one->name);
+		return;
+	}
 	id = ast_json_integer_get(ast_json_object_get(ast_multi_channel_blob_get_json(optimization_blob), "id"));
 
 	optimization->id = id;
@@ -5559,17 +5600,26 @@
  * updating the caller or member unique ID with the channel that is taking the place of
  * the previous caller or member.
  *
- * \param queue_data Data pertaining to the particular call in the queue
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
  * \param msg The stasis message for the local optimization end event
  */
-static void handle_local_optimization_end(struct queue_stasis_data *queue_data, struct stasis_message *msg)
-{
+static void handle_local_optimization_end(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
 	struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
 	struct ast_channel_snapshot *local_one = ast_multi_channel_blob_get_channel(optimization_blob, "1");
 	struct ast_channel_snapshot *local_two = ast_multi_channel_blob_get_channel(optimization_blob, "2");
 	struct local_optimization *optimization;
 	int is_caller;
 	unsigned int id;
+	SCOPED_AO2LOCK(lock, queue_data);
+
+	if (queue_data->dying) {
+		return;
+	}
 
 	if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
 		optimization = &queue_data->member_optimize;
@@ -5615,16 +5665,24 @@
  * has ended. An appropriate queue log and stasis message are raised in this
  * callback.
  *
- * \param queue_data Data pertaining to the particular call in the queue.
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
  * \param msg The stasis message for the hangup event.
  */
-static void handle_hangup(struct queue_stasis_data *queue_data, struct stasis_message *msg)
-{
+static void handle_hangup(void *userdata, struct stasis_subscription *sub,
+		struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct queue_stasis_data *queue_data = userdata;
 	struct ast_channel_blob *channel_blob = stasis_message_data(msg);
 	RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_channel *, chan, NULL, ao2_cleanup);
 	enum agent_complete_reason reason;
+
+	if (queue_data->dying) {
+		return;
+	}
 
 	ao2_lock(queue_data);
 
@@ -5676,30 +5734,8 @@
 static void queue_channel_cb(void *userdata, struct stasis_subscription *sub,
 		struct stasis_topic *topic, struct stasis_message *msg)
 {
-	struct queue_stasis_data *queue_data = userdata;
-
 	if (stasis_subscription_final_message(sub, msg)) {
-		ao2_cleanup(queue_data);
-	}
-
-	if (queue_data->dying) {
-		return;
-	}
-
-	if (ast_local_optimization_begin_type() == stasis_message_type(msg)) {
-		SCOPED_AO2LOCK(lock, queue_data);
-		handle_local_optimization_begin(queue_data, msg);
-		return;
-	}
-
-	if (ast_local_optimization_end_type() == stasis_message_type(msg)) {
-		SCOPED_AO2LOCK(lock, queue_data);
-		handle_local_optimization_end(queue_data, msg);
-		return;
-	}
-
-	if (ast_channel_hangup_request_type() == stasis_message_type(msg)) {
-		handle_hangup(queue_data, msg);
+		ao2_cleanup(userdata);
 	}
 }
 
@@ -5729,9 +5765,41 @@
 		return -1;
 	}
 
-	queue_data->bridge_sub = stasis_subscribe(ast_bridge_topic_all(), queue_bridge_cb, queue_data);
+	queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
+	if (!queue_data->bridge_router) {
+		ao2_ref(queue_data, -1);
+		return -1;
+	}
+
+	stasis_message_router_add(queue_data->bridge_router, ast_channel_entered_bridge_type(),
+			handle_bridge_enter, queue_data);
+	stasis_message_router_add(queue_data->bridge_router, ast_blind_transfer_type(),
+			handle_blind_transfer, queue_data);
+	stasis_message_router_add(queue_data->bridge_router, ast_attended_transfer_type(),
+			handle_attended_transfer, queue_data);
+	stasis_message_router_set_default(queue_data->bridge_router,
+			queue_bridge_cb, queue_data);
+
+	queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all());
+	if (!queue_data->channel_router) {
+		/* Unsubscribing from the bridge router will remove the only ref of queue_data,
+		 * thus beginning the destruction process
+		 */
+		stasis_message_router_unsubscribe(queue_data->bridge_router);
+		queue_data->bridge_router = NULL;
+		return -1;
+	}
+
 	ao2_ref(queue_data, +1);
-	queue_data->channel_sub = stasis_subscribe(ast_channel_topic_all(), queue_channel_cb, queue_data);
+	stasis_message_router_add(queue_data->channel_router, ast_local_optimization_begin_type(),
+			handle_local_optimization_begin, queue_data);
+	stasis_message_router_add(queue_data->channel_router, ast_local_optimization_end_type(),
+			handle_local_optimization_end, queue_data);
+	stasis_message_router_add(queue_data->channel_router, ast_channel_hangup_request_type(),
+			handle_hangup, queue_data);
+	stasis_message_router_set_default(queue_data->channel_router,
+			queue_channel_cb, queue_data);
+
 	return 0;
 }
 




More information about the asterisk-commits mailing list