[asterisk-commits] mmichelson: branch mmichelson/queue_bugbug r396943 - /team/mmichelson/queue_b...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Aug 19 18:15:38 CDT 2013
Author: mmichelson
Date: Mon Aug 19 18:15:37 2013
New Revision: 396943
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396943
Log:
Address review findings.
Modified:
team/mmichelson/queue_bugbug/apps/app_queue.c
Modified: team/mmichelson/queue_bugbug/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/queue_bugbug/apps/app_queue.c?view=diff&rev=396943&r1=396942&r2=396943
==============================================================================
--- team/mmichelson/queue_bugbug/apps/app_queue.c (original)
+++ team/mmichelson/queue_bugbug/apps/app_queue.c Mon Aug 19 18:15:37 2013
@@ -1735,7 +1735,9 @@
static inline struct call_queue *_queue_unref(struct call_queue *q, const char *tag, const char *file, int line, const char *filename)
{
- __ao2_ref_debug(q, -1, tag, file, line, filename);
+ if (q) {
+ __ao2_ref_debug(q, -1, tag, file, line, filename);
+ }
return NULL;
}
@@ -1754,7 +1756,9 @@
static inline struct call_queue *queue_unref(struct call_queue *q)
{
- ao2_ref(q, -1);
+ if (q) {
+ ao2_ref(q, -1);
+ }
return NULL;
}
#endif
@@ -5125,6 +5129,9 @@
const char *reason = NULL; /* silence dumb compilers */
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
+ ast_assert(peer != NULL);
+ ast_assert(caller != NULL);
+
switch (rsn) {
case CALLER:
reason = "caller";
@@ -5192,10 +5199,8 @@
* apply the data here into the queue_stasis_data.
*/
struct local_optimization {
- AST_DECLARE_STRING_FIELDS(
- /*! The uniqueid of the channel that will be taking the place of the caller or member */
- AST_STRING_FIELD(source_chan_uniqueid);
- );
+ /*! The uniqueid of the channel that will be taking the place of the caller or member */
+ const char *source_chan_uniqueid;
/*! Indication of whether we think there is a local channel optimization in progress */
int in_progress;
/*! The identifier for this local channel optimization */
@@ -5240,10 +5245,10 @@
int callcompletedinsl;
/*! Indicates if the stasis subscriptions are shutting down */
int dying;
- /*! The stasis subscription for bridge events */
- struct stasis_subscription *bridge_sub;
- /*! The stasis subscription for channel events */
- struct stasis_subscription *channel_sub;
+ /*! The stasis message router for bridge events */
+ struct stasis_message_router *bridge_router;
+ /*! The stasis message router for channel events */
+ struct stasis_message_router *channel_router;
/*! Local channel optimization details for the caller */
struct local_optimization caller_optimize;
/*! Local channel optimization details for the member */
@@ -5258,10 +5263,12 @@
{
struct queue_stasis_data *queue_data = obj;
+ /* This can only happen if refcounts for this object have got severely messed up */
+ ast_assert(queue_data->bridge_router == NULL);
+ ast_assert(queue_data->channel_router == NULL);
+
ao2_cleanup(queue_data->member);
- ao2_cleanup(queue_data->queue);
- ast_string_field_free_memory(&queue_data->caller_optimize);
- ast_string_field_free_memory(&queue_data->member_optimize);
+ queue_unref(queue_data->queue);
ast_string_field_free_memory(queue_data);
}
@@ -5274,8 +5281,10 @@
SCOPED_AO2LOCK(lock, queue_data);
queue_data->dying = 1;
- queue_data->bridge_sub = stasis_unsubscribe(queue_data->bridge_sub);
- queue_data->channel_sub = stasis_unsubscribe(queue_data->channel_sub);
+ stasis_message_router_unsubscribe(queue_data->bridge_router);
+ queue_data->bridge_router = NULL;
+ stasis_message_router_unsubscribe(queue_data->channel_router);
+ queue_data->channel_router = NULL;
}
/*!
@@ -5293,8 +5302,7 @@
return NULL;
}
- if (ast_string_field_init(queue_data, 64) || ast_string_field_init(&queue_data->member_optimize, 64) ||
- ast_string_field_init(&queue_data->caller_optimize, 64)) {
+ if (ast_string_field_init(queue_data, 64)) {
ao2_cleanup(queue_data);
return NULL;
}
@@ -5334,13 +5342,13 @@
switch (atxfer_msg->dest_type) {
case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
- ast_str_append(&transfer_str, 0, "BRIDGE|%s", atxfer_msg->dest.bridge);
+ ast_str_set(&transfer_str, 0, "BRIDGE|%s", atxfer_msg->dest.bridge);
break;
case AST_ATTENDED_TRANSFER_DEST_APP:
- ast_str_append(&transfer_str, 0, "APP|%s", atxfer_msg->dest.app);
+ ast_str_set(&transfer_str, 0, "APP|%s", atxfer_msg->dest.app);
break;
case AST_ATTENDED_TRANSFER_DEST_LINK:
- ast_str_append(&transfer_str, 0, "LINK|%s|%s", atxfer_msg->dest.links[0]->name,
+ ast_str_set(&transfer_str, 0, "LINK|%s|%s", atxfer_msg->dest.links[0]->name,
atxfer_msg->dest.links[1]->name);
break;
case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
@@ -5363,13 +5371,20 @@
* We track this particular event in order to learn what bridge
* was created for the queue call.
*
- * \param queue_data Data pertaining to the particular call in the queue.
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
* \param msg The stasis message for the bridge enter event
*/
-static void handle_bridge_enter(struct queue_stasis_data *queue_data,
- struct stasis_message *msg)
-{
+static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
struct ast_bridge_blob *enter_blob = stasis_message_data(msg);
+
+ if (queue_data->dying) {
+ return;
+ }
if (!ast_strlen_zero(queue_data->bridge_uniqueid)) {
return;
@@ -5389,17 +5404,34 @@
* This event is important in order to be able to log the end of the
* call to the queue log and to stasis.
*
- * \param queue_data Data pertaining to the particular call in the queue.
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
* \param msg The stasis message for the blind transfer event
*/
-static void handle_blind_transfer(struct queue_stasis_data *queue_data,
- struct stasis_message *msg)
-{
+static void handle_blind_transfer(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
struct ast_bridge_blob *blind_blob = stasis_message_data(msg);
+ struct ast_json *result_blob;
+ struct ast_json *exten_blob;
+ struct ast_json *context_blob;
+ const char *exten;
+ const char *context;
RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
- if (ast_json_integer_get(ast_json_object_get(blind_blob->blob, "result")) == AST_BRIDGE_TRANSFER_FAIL) {
+ if (queue_data->dying) {
+ return;
+ }
+
+ result_blob = ast_json_object_get(blind_blob->blob, "result");
+ if (!result_blob) {
+ return;
+ }
+
+ if (ast_json_integer_get(result_blob) == AST_BRIDGE_TRANSFER_FAIL) {
return;
}
@@ -5416,11 +5448,15 @@
ao2_unlock(queue_data);
+ exten_blob = ast_json_object_get(blind_blob->blob, "exten");
+ exten = exten_blob ? ast_json_string_get(exten_blob) : "<unknown>";
+ context_blob = ast_json_object_get(blind_blob->blob, "context");
+ context = context_blob ? ast_json_string_get(context_blob) : "<unknown>";
+
ast_debug(3, "Detected blind transfer in queue %s\n", queue_data->queue->name);
ast_queue_log(queue_data->queue->name, caller_snapshot->uniqueid, queue_data->member->membername,
"BLINDTRANSFER", "%s|%s|%ld|%ld|%d",
- ast_json_string_get(ast_json_object_get(blind_blob->blob, "exten")),
- ast_json_string_get(ast_json_object_get(blind_blob->blob, "context")),
+ exten, context,
(long) queue_data->starttime - queue_data->holdstart,
(long) time(NULL) - queue_data->starttime, queue_data->caller_pos);
@@ -5437,15 +5473,22 @@
* This event is important in order to be able to log the end of the
* call to the queue log and to stasis.
*
- * \param queue_data Data pertaining to the particular call in the queue.
- * \param msg The stasis message for the attended transfer event
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the attended transfer event.
*/
-static void handle_attended_transfer(struct queue_stasis_data *queue_data,
- struct stasis_message *msg)
-{
+static void handle_attended_transfer(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
struct ast_attended_transfer_message *atxfer_msg = stasis_message_data(msg);
RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+
+ if (queue_data->dying) {
+ return;
+ }
if (atxfer_msg->result == AST_BRIDGE_TRANSFER_FAIL ||
atxfer_msg->dest_type == AST_ATTENDED_TRANSFER_DEST_THREEWAY) {
@@ -5493,22 +5536,8 @@
static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
{
- struct queue_stasis_data *queue_data = userdata;
-
if (stasis_subscription_final_message(sub, msg)) {
- ao2_cleanup(queue_data);
- }
-
- if (queue_data->dying) {
- return;
- }
-
- if (ast_channel_entered_bridge_type() == stasis_message_type(msg)) {
- handle_bridge_enter(queue_data, msg);
- } else if (ast_blind_transfer_type() == stasis_message_type(msg)) {
- handle_blind_transfer(queue_data, msg);
- } else if (ast_attended_transfer_type() == stasis_message_type(msg)) {
- handle_attended_transfer(queue_data, msg);
+ ao2_cleanup(userdata);
}
}
@@ -5519,18 +5548,26 @@
* This method gathers data relevant to the local channel optimization and stores
* it to be used once the local optimization completes.
*
- * \param queue_data Data pertaining to the particular call in the queue
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
* \param msg The stasis message for the local optimization begin event
*/
-static void handle_local_optimization_begin(struct queue_stasis_data *queue_data,
- struct stasis_message *msg)
-{
+static void handle_local_optimization_begin(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
struct ast_channel_snapshot *local_one = ast_multi_channel_blob_get_channel(optimization_blob, "1");
struct ast_channel_snapshot *local_two = ast_multi_channel_blob_get_channel(optimization_blob, "2");
struct ast_channel_snapshot *source = ast_multi_channel_blob_get_channel(optimization_blob, "source");
struct local_optimization *optimization;
unsigned int id;
+ SCOPED_AO2LOCK(lock, queue_data);
+
+ if (queue_data->dying) {
+ return;
+ }
if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
optimization = &queue_data->member_optimize;
@@ -5543,7 +5580,11 @@
/* We only allow move-swap optimizations, so there had BETTER be a source */
ast_assert(source != NULL);
- ast_string_field_set(optimization, source_chan_uniqueid, source->uniqueid);
+ optimization->source_chan_uniqueid = ast_strdup(source->uniqueid);
+ if (!optimization->source_chan_uniqueid) {
+ ast_log(LOG_ERROR, "Unable to track local channel optimization for channel %s. Expect further errors\n", local_one->name);
+ return;
+ }
id = ast_json_integer_get(ast_json_object_get(ast_multi_channel_blob_get_json(optimization_blob), "id"));
optimization->id = id;
@@ -5559,17 +5600,26 @@
* updating the caller or member unique ID with the channel that is taking the place of
* the previous caller or member.
*
- * \param queue_data Data pertaining to the particular call in the queue
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
* \param msg The stasis message for the local optimization end event
*/
-static void handle_local_optimization_end(struct queue_stasis_data *queue_data, struct stasis_message *msg)
-{
+static void handle_local_optimization_end(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
struct ast_channel_snapshot *local_one = ast_multi_channel_blob_get_channel(optimization_blob, "1");
struct ast_channel_snapshot *local_two = ast_multi_channel_blob_get_channel(optimization_blob, "2");
struct local_optimization *optimization;
int is_caller;
unsigned int id;
+ SCOPED_AO2LOCK(lock, queue_data);
+
+ if (queue_data->dying) {
+ return;
+ }
if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
optimization = &queue_data->member_optimize;
@@ -5615,16 +5665,24 @@
* has ended. An appropriate queue log and stasis message are raised in this
* callback.
*
- * \param queue_data Data pertaining to the particular call in the queue.
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
* \param msg The stasis message for the hangup event.
*/
-static void handle_hangup(struct queue_stasis_data *queue_data, struct stasis_message *msg)
-{
+static void handle_hangup(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
struct ast_channel_blob *channel_blob = stasis_message_data(msg);
RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct ast_channel *, chan, NULL, ao2_cleanup);
enum agent_complete_reason reason;
+
+ if (queue_data->dying) {
+ return;
+ }
ao2_lock(queue_data);
@@ -5676,30 +5734,8 @@
static void queue_channel_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
{
- struct queue_stasis_data *queue_data = userdata;
-
if (stasis_subscription_final_message(sub, msg)) {
- ao2_cleanup(queue_data);
- }
-
- if (queue_data->dying) {
- return;
- }
-
- if (ast_local_optimization_begin_type() == stasis_message_type(msg)) {
- SCOPED_AO2LOCK(lock, queue_data);
- handle_local_optimization_begin(queue_data, msg);
- return;
- }
-
- if (ast_local_optimization_end_type() == stasis_message_type(msg)) {
- SCOPED_AO2LOCK(lock, queue_data);
- handle_local_optimization_end(queue_data, msg);
- return;
- }
-
- if (ast_channel_hangup_request_type() == stasis_message_type(msg)) {
- handle_hangup(queue_data, msg);
+ ao2_cleanup(userdata);
}
}
@@ -5729,9 +5765,41 @@
return -1;
}
- queue_data->bridge_sub = stasis_subscribe(ast_bridge_topic_all(), queue_bridge_cb, queue_data);
+ queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
+ if (!queue_data->bridge_router) {
+ ao2_ref(queue_data, -1);
+ return -1;
+ }
+
+ stasis_message_router_add(queue_data->bridge_router, ast_channel_entered_bridge_type(),
+ handle_bridge_enter, queue_data);
+ stasis_message_router_add(queue_data->bridge_router, ast_blind_transfer_type(),
+ handle_blind_transfer, queue_data);
+ stasis_message_router_add(queue_data->bridge_router, ast_attended_transfer_type(),
+ handle_attended_transfer, queue_data);
+ stasis_message_router_set_default(queue_data->bridge_router,
+ queue_bridge_cb, queue_data);
+
+ queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all());
+ if (!queue_data->channel_router) {
+ /* Unsubscribing from the bridge router will remove the only ref of queue_data,
+ * thus beginning the destruction process
+ */
+ stasis_message_router_unsubscribe(queue_data->bridge_router);
+ queue_data->bridge_router = NULL;
+ return -1;
+ }
+
ao2_ref(queue_data, +1);
- queue_data->channel_sub = stasis_subscribe(ast_channel_topic_all(), queue_channel_cb, queue_data);
+ stasis_message_router_add(queue_data->channel_router, ast_local_optimization_begin_type(),
+ handle_local_optimization_begin, queue_data);
+ stasis_message_router_add(queue_data->channel_router, ast_local_optimization_end_type(),
+ handle_local_optimization_end, queue_data);
+ stasis_message_router_add(queue_data->channel_router, ast_channel_hangup_request_type(),
+ handle_hangup, queue_data);
+ stasis_message_router_set_default(queue_data->channel_router,
+ queue_channel_cb, queue_data);
+
return 0;
}
More information about the asterisk-commits
mailing list