[asterisk-commits] mjordan: branch mjordan/cdrs-of-doom r386588 - in /team/mjordan/cdrs-of-doom:...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Apr 25 16:55:35 CDT 2013
Author: mjordan
Date: Thu Apr 25 16:55:31 2013
New Revision: 386588
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=386588
Log:
Commit initial bridging support in CDRs
This is only the beginning, but it's better to get it in before
I change my mind, delete it all, and start over again.
Modified:
team/mjordan/cdrs-of-doom/include/asterisk/cdr.h
team/mjordan/cdrs-of-doom/main/bridging.c
team/mjordan/cdrs-of-doom/main/cdr.c
team/mjordan/cdrs-of-doom/tests/test_cdr.c
Modified: team/mjordan/cdrs-of-doom/include/asterisk/cdr.h
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/cdrs-of-doom/include/asterisk/cdr.h?view=diff&rev=386588&r1=386587&r2=386588
==============================================================================
--- team/mjordan/cdrs-of-doom/include/asterisk/cdr.h (original)
+++ team/mjordan/cdrs-of-doom/include/asterisk/cdr.h Thu Apr 25 16:55:31 2013
@@ -40,7 +40,7 @@
CDR_CONGESTION = 1 << 3, /*< Treat congestion as if it were a failed call */
CDR_END_BEFORE_H_EXTEN = 1 << 4, /*< End the CDR before the 'h' extension runs */
CDR_INITIATED_SECONDS = 1 << 5, /*< Include microseconds into the billing time */
- CDR_DEBUG = 1 << 6,
+ CDR_DEBUG = 1 << 6, /*< Enables extra debug statements */
};
/*! \brief CDR Batch Mode settings */
@@ -61,6 +61,7 @@
AST_CDR_FLAG_FINALIZE = (1 << 4), /*< Finalize the current CDRs */
AST_CDR_FLAG_SET_ANSWER = (1 << 5), /*< If the channel is answered, set the answer time to now */
AST_CDR_FLAG_RESET = (1 << 6), /*< If set, set the start and answer time to now */
+ AST_CDR_FLAG_LAST = AST_CDR_FLAG_RESET, /*< Marker. Denotes the last flag in the set. */
};
/*!
Modified: team/mjordan/cdrs-of-doom/main/bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/cdrs-of-doom/main/bridging.c?view=diff&rev=386588&r1=386587&r2=386588
==============================================================================
--- team/mjordan/cdrs-of-doom/main/bridging.c (original)
+++ team/mjordan/cdrs-of-doom/main/bridging.c Thu Apr 25 16:55:31 2013
@@ -513,6 +513,10 @@
bridge->technology->leave(bridge, bridge_channel);
}
}
+
+ /* BUGBUG If this was an outbound channel and it is not hung up, clear the
+ * outbound flag
+ */
/* Remove channel from the bridge */
if (!bridge_channel->suspended) {
Modified: team/mjordan/cdrs-of-doom/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/cdrs-of-doom/main/cdr.c?view=diff&rev=386588&r1=386587&r2=386588
==============================================================================
--- team/mjordan/cdrs-of-doom/main/cdr.c (original)
+++ team/mjordan/cdrs-of-doom/main/cdr.c Thu Apr 25 16:55:31 2013
@@ -66,6 +66,7 @@
#include "asterisk/json.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_bridging.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/astobj2.h"
@@ -197,6 +198,10 @@
ast_verb(1, (fmt), ##__VA_ARGS__); \
} } while (0)
+enum cdr_flag_internal {
+ INTERNAL_CDR_FLAG_SKIP = AST_CDR_FLAG_LAST, /*< Set when we need to temporarily skip a CDR for some reason */
+ INTERNAL_CDR_FLAG_FORKED = (INTERNAL_CDR_FLAG_SKIP << 1),
+};
/*! \brief The configuration settings for this module */
struct module_config {
@@ -296,11 +301,17 @@
static ast_cond_t cdr_pending_cond;
-
-
+/*! \brief A container of the active CDRs indexed by Party A channel name */
static struct ao2_container *active_cdrs_by_channel;
-static struct stasis_message_router *stasis_router;
+/*! \brief A container of the active CDRs indexed by the bridge ID */
+static struct ao2_container *active_cdrs_by_bridge;
+
+/*! \brief Message router for stasis messages regarding channel state */
+static struct stasis_message_router *stasis_channel_router;
+
+/*! \brief Message router for stasis messages regarding bridge state */
+static struct stasis_message_router *stasis_bridge_router;
struct cdr_object;
@@ -310,14 +321,18 @@
void (* const process_party_a_update)(struct cdr_object *cdr, struct ast_channel_snapshot *snapshot);
void (* const process_party_b_update)(struct cdr_object *cdr, struct ast_channel_snapshot *snapshot);
int (* const process_dial_message)(struct cdr_object *cdr, struct ast_channel_snapshot *caller, struct ast_channel_snapshot *peer, const char *dial_status);
+ int (* const process_bridge_enter)(struct cdr_object *cdr, struct ast_bridge_snapshot *bridge, struct ast_channel_snapshot *channel);
+ int (* const process_bridge_leave)(struct cdr_object *cdr, struct ast_bridge_snapshot *bridge, struct ast_channel_snapshot *channel);
};
AST_MUTEX_DEFINE_STATIC(cdr_sched_lock);
static void base_process_party_a_update(struct cdr_object *cdr, struct ast_channel_snapshot *snapshot);
+static int base_process_bridge_leave_message(struct cdr_object *cdr, struct ast_bridge_snapshot *bridge, struct ast_channel_snapshot *channel);
static void single_state_init_function(struct cdr_object *cdr);
static void single_state_process_party_b_update(struct cdr_object *cdr, struct ast_channel_snapshot *snapshot);
static int single_state_process_dial_message(struct cdr_object *cdr, struct ast_channel_snapshot *caller, struct ast_channel_snapshot *peer, const char *dial_status);
+static int single_state_process_bridge_enter(struct cdr_object *cdr, struct ast_bridge_snapshot *bridge, struct ast_channel_snapshot *channel);
struct cdr_object_fn_table single_state_fn_table = {
.name = "Single",
@@ -325,6 +340,8 @@
.process_party_a_update = base_process_party_a_update,
.process_party_b_update = single_state_process_party_b_update,
.process_dial_message = single_state_process_dial_message,
+ .process_bridge_enter = single_state_process_bridge_enter,
+ .process_bridge_leave = base_process_bridge_leave_message,
};
static void dial_state_process_party_b_update(struct cdr_object *cdr, struct ast_channel_snapshot *snapshot);
@@ -336,6 +353,19 @@
.process_party_a_update = base_process_party_a_update,
.process_party_b_update = dial_state_process_party_b_update,
.process_dial_message = dial_state_process_dial_message,
+ .process_bridge_leave = base_process_bridge_leave_message,
+};
+
+static void bridge_state_init_function(struct cdr_object *cdr);
+
+struct cdr_object_fn_table bridge_state_fn_table = {
+ .name = "Bridged",
+ .init_function = bridge_state_init_function,
+ .process_party_a_update = base_process_party_a_update,
+ .process_party_b_update = NULL,
+ .process_dial_message = NULL,
+ .process_bridge_enter = NULL,
+ .process_bridge_leave = NULL,
};
static void finalized_state_init_function(struct cdr_object *cdr);
@@ -347,14 +377,8 @@
.process_party_a_update = finalized_state_process_party_a_update,
.process_party_b_update = NULL,
.process_dial_message = NULL,
-};
-
-struct cdr_object_fn_table hangup_state_fn_table = {
- .name = "Hangup",
- .init_function = NULL,
- .process_party_a_update = NULL,
- .process_party_b_update = NULL,
- .process_dial_message = NULL,
+ .process_bridge_enter = NULL,
+ .process_bridge_leave = NULL,
};
struct cdr_object_snapshot {
@@ -381,83 +405,9 @@
AST_STRING_FIELD(bridge); /*< The bridge the party A happens to be in. */
);
struct cdr_object *next; /*< The next CDR object in the chain */
+ struct cdr_object *prev; /*< The previous CDR object in the chain */
struct cdr_object *last; /*< The last CDR object in the chain */
};
-
-
-
-/* NON-VIRTUAL FUNCTIONS */
-
-static void cdr_object_transition_state(struct cdr_object *cdr, struct cdr_object_fn_table *fn_table);
-
-static int cdr_object_channel_hash_fn(const void *obj, const int flags)
-{
- const struct cdr_object *cdr = obj;
- const char *name = (flags & OBJ_KEY) ? obj : cdr->name;
- return ast_str_case_hash(name);
-}
-
-static int cdr_object_channel_cmp_fn(void *obj, void *arg, int flags)
-{
- struct cdr_object *left = obj;
- struct cdr_object *right = arg;
- const char *match = (flags & OBJ_KEY) ? arg : right->name;
- return strcasecmp(left->name, match) ? 0 : (CMP_MATCH | CMP_STOP);
-}
-
-static void cdr_object_dtor(void *obj)
-{
- struct cdr_object *cdr = obj;
- struct ast_var_t *it_var;
-
- if (!cdr) {
- return;
- }
-
- ao2_cleanup(cdr->party_a.snapshot);
- ao2_cleanup(cdr->party_b.snapshot);
- while ((it_var = AST_LIST_REMOVE_HEAD(&cdr->party_a.variables, entries))) {
- ast_var_delete(it_var);
- }
- while ((it_var = AST_LIST_REMOVE_HEAD(&cdr->party_b.variables, entries))) {
- ast_var_delete(it_var);
- }
- ast_string_field_free_memory(cdr);
-
- if (cdr->next) {
- ao2_cleanup(cdr->next);
- }
-}
-
-static struct cdr_object *cdr_object_alloc(struct ast_channel_snapshot *chan)
-{
- RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
- struct cdr_object *cdr;
-
- ast_assert(chan != NULL);
-
- cdr = ao2_alloc(sizeof(*cdr), cdr_object_dtor);
- if (!cdr) {
- return NULL;
- }
- cdr->last = cdr;
- if (ast_string_field_init(cdr, 64)) {
- return NULL;
- }
- ast_string_field_set(cdr, name, chan->name);
- ast_string_field_set(cdr, linkedid, chan->linkedid);
- cdr->disposition = AST_CDR_NULL;
- cdr->sequence = ast_atomic_fetchadd_int(&cdr_sequence, +1);
-
- cdr->party_a.snapshot = chan;
- ao2_ref(cdr->party_a.snapshot, +1);
-
- CDR_DEBUG(mod_cfg, "%p - created CDR for channel %s\n", cdr, chan->name);
-
- cdr_object_transition_state(cdr, &single_state_fn_table);
-
- return cdr;
-}
static int copy_vars(struct varshead *to_list, struct varshead *from_list)
{
@@ -478,6 +428,119 @@
return x;
}
+static void cdr_object_snapshot_copy(struct cdr_object_snapshot *dst, struct cdr_object_snapshot *src)
+{
+ if (dst->snapshot) {
+ ao2_ref(dst->snapshot, -1);
+ }
+ dst->snapshot = src->snapshot;
+ ao2_ref(dst->snapshot, +1);
+ strcpy(dst->userfield, src->userfield);
+ dst->flags = src->flags;
+ copy_vars(&dst->variables, &src->variables);
+}
+
+/* NON-VIRTUAL FUNCTIONS */
+
+static void cdr_object_transition_state(struct cdr_object *cdr, struct cdr_object_fn_table *fn_table);
+
+/*! \internal
+ * \brief Hash function for containers of CDRs indexing by Party A name */
+static int cdr_object_channel_hash_fn(const void *obj, const int flags)
+{
+ const struct cdr_object *cdr = obj;
+ const char *name = (flags & OBJ_KEY) ? obj : cdr->name;
+ return ast_str_case_hash(name);
+}
+
+/*! \internal
+ * \brief Comparison function for containers of CDRs indexing by Party A name
+ */
+static int cdr_object_channel_cmp_fn(void *obj, void *arg, int flags)
+{
+ struct cdr_object *left = obj;
+ struct cdr_object *right = arg;
+ const char *match = (flags & OBJ_KEY) ? arg : right->name;
+ return strcasecmp(left->name, match) ? 0 : (CMP_MATCH | CMP_STOP);
+}
+
+/*! \internal
+ * \brief Hash function for containers of CDRs indexing by bridge ID
+ */
+static int cdr_object_bridge_hash_fn(const void *obj, const int flags)
+{
+ const struct cdr_object *cdr = obj;
+ const char *id = (flags & OBJ_KEY) ? obj : cdr->bridge;
+ return ast_str_case_hash(id);
+}
+
+/*! \internal
+ * \brief Comparison function for containers of CDRs indexing by bridge. Note
+ * that we expect there to be collisions, as a single bridge may have multiple
+ * CDRs active at one point in time
+ */
+static int cdr_object_bridge_cmp_fn(void *obj, void *arg, int flags)
+{
+ struct cdr_object *left = obj;
+ struct cdr_object *right = arg;
+ const char *match = (flags & OBJ_KEY) ? arg : right->bridge;
+ return strcasecmp(left->bridge, match) ? 0 : (CMP_MATCH);
+}
+
+static void cdr_object_dtor(void *obj)
+{
+ struct cdr_object *cdr = obj;
+ struct ast_var_t *it_var;
+
+ if (!cdr) {
+ return;
+ }
+
+ ao2_cleanup(cdr->party_a.snapshot);
+ ao2_cleanup(cdr->party_b.snapshot);
+ while ((it_var = AST_LIST_REMOVE_HEAD(&cdr->party_a.variables, entries))) {
+ ast_var_delete(it_var);
+ }
+ while ((it_var = AST_LIST_REMOVE_HEAD(&cdr->party_b.variables, entries))) {
+ ast_var_delete(it_var);
+ }
+ ast_string_field_free_memory(cdr);
+
+ if (cdr->next) {
+ ao2_cleanup(cdr->next);
+ }
+}
+
+static struct cdr_object *cdr_object_alloc(struct ast_channel_snapshot *chan)
+{
+ RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
+ struct cdr_object *cdr;
+
+ ast_assert(chan != NULL);
+
+ cdr = ao2_alloc(sizeof(*cdr), cdr_object_dtor);
+ if (!cdr) {
+ return NULL;
+ }
+ cdr->last = cdr;
+ if (ast_string_field_init(cdr, 64)) {
+ return NULL;
+ }
+ ast_string_field_set(cdr, name, chan->name);
+ ast_string_field_set(cdr, linkedid, chan->linkedid);
+ cdr->disposition = AST_CDR_NULL;
+ cdr->sequence = ast_atomic_fetchadd_int(&cdr_sequence, +1);
+
+ cdr->party_a.snapshot = chan;
+ ao2_ref(cdr->party_a.snapshot, +1);
+
+ CDR_DEBUG(mod_cfg, "%p - created CDR for channel %s\n", cdr, chan->name);
+
+ cdr_object_transition_state(cdr, &single_state_fn_table);
+
+ return cdr;
+}
+
static struct cdr_object *cdr_object_create_and_append_cdr(struct cdr_object *cdr)
{
struct cdr_object *new_cdr;
@@ -492,9 +555,7 @@
/* Copy over the linkedid, as it may have changed */
ast_string_field_set(new_cdr, linkedid, cdr_last->linkedid);
/* Copy over other Party A information */
- strcpy(new_cdr->party_a.userfield, cdr_last->party_a.userfield);
- new_cdr->party_a.flags = cdr_last->party_a.flags;
- copy_vars(&new_cdr->party_a.variables, &cdr_last->party_a.variables);
+ cdr_object_snapshot_copy(&new_cdr->party_a, &cdr_last->party_a);
/* Append the CDR to the end of the list */
for (it_cdr = cdr; it_cdr->next; it_cdr = it_cdr->next) {
@@ -502,8 +563,61 @@
}
it_cdr->last = new_cdr;
it_cdr->next = new_cdr;
+ new_cdr->prev = it_cdr;
return new_cdr;
+}
+
+/*static void cdr_object_remove_cdr(struct cdr_object *master, struct cdr_object *record)
+{
+ struct cdr_object *it_cdr;
+ struct cdr_object *prev = NULL;
+ struct cdr_object *next = NULL;
+
+ for (it_cdr = master; it_cdr; prev = it_cdr, it_cdr = it_cdr->next) {
+ if (it_cdr != record) {
+ continue;
+ }
+ if (it_cdr->next) {
+ next = it_cdr->next;
+ }
+ if (it_cdr == master) {*/
+ /* Remove root */
+/* ao2_unlink(active_cdrs_by_channel, master);
+ if (!ast_strlen_zero(master->bridge)) {
+ ao2_unlink(active_cdrs_by_bridge, master);
+ }
+ }
+ it_cdr->next = NULL;
+ it_cdr->last = NULL;
+ if (next && prev) {
+ prev->next = next;
+ prev->last = next->last;
+ }
+ if (next && it_cdr == master) {
+ ao2_link(active_cdrs_by_channel, next);
+ if (!ast_strlen_zero(next->bridge)) {
+ ao2_link(active_cdrs_by_bridge, next);
+ }
+ }
+ ao2_ref(it_cdr, -1);
+ }
+}*/
+
+static struct ast_channel_snapshot *cdr_object_determine_party_a(struct ast_channel_snapshot *left, struct ast_channel_snapshot *right)
+{
+ /* TODO: check party a flag */
+
+ if (left->creationtime.tv_sec < right->creationtime.tv_sec) {
+ return left;
+ } else if (left->creationtime.tv_sec > right->creationtime.tv_sec) {
+ return right;
+ } else {
+ if (left->creationtime.tv_usec > right->creationtime.tv_usec) {
+ return right;
+ }
+ return left;
+ }
}
static long cdr_object_get_duration(struct cdr_object *cdr)
@@ -543,6 +657,15 @@
while (cdr) {
struct ast_cdr *cdr_copy;
+
+ /* Don't create records for CDRs where the party A is an outgoing,
+ * non-originated channel.
+ */
+ if (ast_test_flag(&cdr->party_a.snapshot->flags, AST_FLAG_OUTGOING)
+ && !ast_test_flag(&cdr->party_a.snapshot->flags, AST_FLAG_ORIGINATED)) {
+ continue;
+ }
+
cdr_copy = ast_calloc(1, sizeof(*cdr_copy));
if (!cdr_copy) {
ast_free(pub_cdr);
@@ -687,10 +810,6 @@
cdr->end.tv_sec,
cdr->end.tv_usec,
cdr->disposition);
-
- if (ast_test_flag(&cdr->party_a.snapshot->flags, AST_FLAG_ZOMBIE)) {
- cdr_object_transition_state(cdr, &hangup_state_fn_table);
- }
}
static void cdr_object_transition_state(struct cdr_object *cdr, struct cdr_object_fn_table *fn_table)
@@ -739,8 +858,6 @@
{
RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
- ast_assert(cdr != NULL);
- ast_assert(snapshot != NULL);
ast_assert(strcmp(snapshot->name, cdr->party_a.snapshot->name) == 0);
cdr_object_swap_snapshot(&cdr->party_a, snapshot);
@@ -749,6 +866,14 @@
cdr_object_check_party_a_answer(cdr);
cdr_object_check_party_a_hangup(cdr);
}
+
+static int base_process_bridge_leave_message(struct cdr_object *cdr, struct ast_bridge_snapshot *bridge, struct ast_channel_snapshot *channel)
+{
+ /* Assume we shouldn't get a bridge leave message for ourselves */
+ ast_assert(strcmp(channel->name, cdr->party_a.snapshot->name) != 0);
+ return 0;
+}
+
/* SINGLE STATE */
@@ -791,6 +916,87 @@
return 0;
}
+static int single_state_bridge_enter_comparison(struct cdr_object *cdr,
+ struct cdr_object *cand_cdr)
+{
+ struct ast_channel_snapshot *party_a;
+
+ party_a = cdr_object_determine_party_a(cdr->party_a.snapshot, cand_cdr->party_a.snapshot);
+ if (party_a == cdr->party_a.snapshot) {
+ cdr_object_snapshot_copy(&cdr->party_b, &cand_cdr->party_a);
+ if (!cand_cdr->party_b.snapshot) {
+ /* We just stole them - finalize the CDR. Note that this won't
+ * transition their state, it just sets the end time and the
+ * disposition - if we need to re-activate them later, we can.
+ */
+ cdr_object_finalize(cand_cdr);
+ }
+ return 0;
+ }
+
+ if (!cand_cdr->party_b.snapshot) {
+ return 1;
+ }
+
+ party_a = cdr_object_determine_party_a(cdr->party_a.snapshot, cand_cdr->party_b.snapshot);
+ if (party_a == cdr->party_a.snapshot) {
+ cdr_object_snapshot_copy(&cdr->party_b, &cand_cdr->party_b);
+ return 0;
+ }
+
+ return 1;
+}
+
+static int single_state_process_bridge_enter(struct cdr_object *cdr, struct ast_bridge_snapshot *bridge, struct ast_channel_snapshot *channel)
+{
+ struct ao2_iterator *it_cdrs;
+ struct cdr_object *cand_cdr_master;
+ char *bridge_id = ast_strdupa(bridge->uniqueid);
+ int success = 1;
+
+ ast_string_field_set(cdr, bridge, bridge->uniqueid);
+
+ /* Get parties in the bridge */
+ it_cdrs = ao2_callback(active_cdrs_by_bridge, OBJ_MULTIPLE | OBJ_KEY,
+ cdr_object_bridge_cmp_fn, bridge_id);
+ if (!it_cdrs) {
+ /* No one in the bridge yet! */
+ cdr_object_transition_state(cdr, &bridge_state_fn_table);
+ return 0;
+ }
+
+ while ((cand_cdr_master = ao2_iterator_next(it_cdrs))) {
+ struct cdr_object *cand_cdr;
+
+ ao2_lock(cand_cdr_master);
+ for (cand_cdr = cand_cdr_master; cand_cdr; cand_cdr = cand_cdr->next) {
+ /* Skip any records that are not in a bridge or in this bridge.
+ * I'm not sure how that would happen, but it pays to be careful. */
+ if (cand_cdr->fn_table != &bridge_state_fn_table || strcmp(cdr->bridge, cand_cdr->bridge)) {
+ continue;
+ }
+
+ if (single_state_bridge_enter_comparison(cdr, cand_cdr)) {
+ /* Keep looking */
+ continue;
+ }
+ /* We successfully got a party B - break out */
+ success = 0;
+ break;
+ }
+ ao2_unlock(cand_cdr_master);
+ ao2_ref(cand_cdr_master, -1);
+ }
+ ao2_iterator_destroy(it_cdrs);
+
+ /* We always transition state, even if we didn't get a peer */
+ cdr_object_transition_state(cdr, &bridge_state_fn_table);
+
+ /* Success implies that we have a Party B */
+ return success;
+}
+
+
/* FINALIZED STATE */
static void finalized_state_init_function(struct cdr_object *cdr)
@@ -807,7 +1013,7 @@
static void finalized_state_process_party_a_update(struct cdr_object *cdr, struct ast_channel_snapshot *snapshot)
{
if (ast_test_flag(&cdr->party_a.snapshot->flags, AST_FLAG_ZOMBIE)) {
- cdr_object_transition_state(cdr, &hangup_state_fn_table);
+ cdr_object_finalize(cdr);
}
}
@@ -872,6 +1078,15 @@
}
+/* BRIDGE STATE */
+
+static void bridge_state_init_function(struct cdr_object *cdr)
+{
+
+}
+
+
+
static struct cdr_object *cdr_object_find_or_create_cdr_by_channel(struct ast_channel_snapshot *snapshot)
@@ -890,22 +1105,6 @@
ao2_link(active_cdrs_by_channel, cdr);
return cdr;
-}
-
-static struct ast_channel_snapshot *cdr_object_determine_party_a(struct ast_channel_snapshot *left, struct ast_channel_snapshot *right)
-{
- /* TODO: check party a flag */
-
- if (left->creationtime.tv_sec < right->creationtime.tv_sec) {
- return left;
- } else if (left->creationtime.tv_sec > right->creationtime.tv_sec) {
- return right;
- } else {
- if (left->creationtime.tv_usec > right->creationtime.tv_usec) {
- return right;
- }
- return left;
- }
}
/* TOPIC ROUTER CALLBACKS */
@@ -952,32 +1151,24 @@
return;
}
- if (ast_test_flag(&party_a->flags, AST_FLAG_OUTGOING)
- && !ast_test_flag(&party_a->flags, AST_FLAG_ORIGINATED)) {
- CDR_DEBUG(mod_cfg, "%p - removing CDR for outbound channel %s\n", cdr, party_a->name);
- ao2_unlink(active_cdrs_by_channel, cdr);
- return;
- }
-
CDR_DEBUG(mod_cfg, "%p - processing dial message for channel %s, peer %s\n",
cdr, party_a->name, party_b ? party_b->name : "(none)");
- {
- SCOPED_AO2LOCK(lock, cdr);
- for (it_cdr = cdr; res && it_cdr; it_cdr = it_cdr->next) {
- if (it_cdr->fn_table->process_dial_message) {
- res = it_cdr->fn_table->process_dial_message(it_cdr, party_a, party_b, dial_status);
- }
- }
- /* If no CDR handled the dial message, make a new one */
- if (res) {
- struct cdr_object *new_cdr;
-
- new_cdr = cdr_object_create_and_append_cdr(cdr);
- if (!new_cdr) {
- return;
- }
- new_cdr->fn_table->process_dial_message(new_cdr, party_a, party_b, dial_status);
- }
+ ao2_lock(cdr);
+ for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
+ if (it_cdr->fn_table->process_dial_message) {
+ res &= it_cdr->fn_table->process_dial_message(it_cdr, party_a, party_b, dial_status);
+ }
+ }
+
+ /* If no CDR handled the dial message, make a new one */
+ if (res) {
+ struct cdr_object *new_cdr;
+
+ new_cdr = cdr_object_create_and_append_cdr(cdr);
+ if (!new_cdr) {
+ return;
+ }
+ new_cdr->fn_table->process_dial_message(new_cdr, party_a, party_b, dial_status);
}
}
@@ -1010,7 +1201,7 @@
return 0;
}
-static void handle_cache_update_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
{
RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
@@ -1036,18 +1227,6 @@
return;
}
ao2_link(active_cdrs_by_channel, cdr);
- }
-
- /* We won't know this is a dialed channel until after its creation. */
- if (ast_test_flag(&new_snapshot->flags, AST_FLAG_OUTGOING)
- && !ast_test_flag(&new_snapshot->flags, AST_FLAG_ORIGINATED)) {
- cdr = ao2_find(active_cdrs_by_channel, name, OBJ_KEY);
- if (cdr) {
- CDR_DEBUG(mod_cfg, "%p - removing CDR for outbound channel %s\n", cdr, name);
- ao2_unlink(active_cdrs_by_channel, cdr);
- }
- ao2_cleanup(cdr);
- cdr = NULL;
}
}
@@ -1085,7 +1264,294 @@
}
-
+static void handle_bridge_leave_message(struct ast_bridge_snapshot *bridge, struct ast_channel_snapshot *channel)
+{
+
+}
+
+struct bridge_candidate {
+ struct cdr_object *cdr; /*!< The actual CDR this candidate belongs to, either as A or B */
+ struct ast_channel_snapshot *candidate; /*!< The candidate for a new pairing */
+};
+
+/*! \internal
+ * \brief Comparison function for \ref bridge_candidate objects
+ */
+static int bridge_candidate_cmp_fn(void *obj, void *arg, int flags)
+{
+ struct bridge_candidate *left = obj;
+ struct bridge_candidate *right = arg;
+ const char *match = (flags & OBJ_KEY) ? arg : right->candidate->name;
+ return strcasecmp(left->candidate->name, match) ? 0 : (CMP_MATCH | CMP_STOP);
+}
+
+/*! \internal
+ * \brief Hash function for \ref bridge_candidate objects
+ */
+static int bridge_candidate_hash_fn(const void *obj, const int flags)
+{
+ const struct bridge_candidate *bc = obj;
+ const char *id = (flags & OBJ_KEY) ? obj : bc->candidate->name;
+ return ast_str_case_hash(id);
+}
+
+static void bridge_candidate_dtor(void *obj)
+{
+ struct bridge_candidate *bcand = obj;
+ ao2_cleanup(bcand->cdr);
+ ao2_cleanup(bcand->candidate);
+}
+
+static struct bridge_candidate *bridge_candidate_alloc(struct cdr_object *cdr, struct ast_channel_snapshot *snapshot)
+{
+ struct bridge_candidate *bcand;
+
+ bcand = ao2_alloc(sizeof(*bcand), bridge_candidate_dtor);
+ if (!bcand) {
+ return NULL;
+ }
+ bcand->cdr = cdr;
+ bcand->candidate = snapshot;
+ ao2_ref(bcand->cdr, +1);
+ ao2_ref(bcand->candidate, +1);
+
+ return bcand;
+}
+
+/*!
+ * Note that we use two passes here instead of one so that we only create a
+ * candidate for a party B if they are never a party A in the bridge.
+ */
+static struct ao2_container *handle_bridge_create_candidates(struct ast_bridge_snapshot *bridge)
+{
+ struct ao2_container *candidates = ao2_container_alloc(51, bridge_candidate_hash_fn, bridge_candidate_cmp_fn);
+ char *bridge_id = ast_strdupa(bridge->uniqueid);
+ struct ao2_iterator *it_cdrs;
+ struct cdr_object *cand_cdr_master;
+
+ if (!candidates) {
+ return NULL;
+ }
+
+ /* For each CDR that has a record in the bridge, get their Party A and
+ * make them a candidate. Don't add a party twice.
+ */
+ it_cdrs = ao2_callback(active_cdrs_by_bridge, OBJ_MULTIPLE | OBJ_KEY,
+ cdr_object_bridge_cmp_fn, bridge_id);
+ if (!it_cdrs) {
+ /* No one in the bridge yet! */
+ ao2_cleanup(candidates);
+ return NULL;
+ }
+ while ((cand_cdr_master = ao2_iterator_next(it_cdrs))) {
+ struct cdr_object *it_cdr;
+
+ ao2_lock(cand_cdr_master);
+ for (it_cdr = cand_cdr_master; it_cdr; it_cdr = it_cdr->next) {
+ struct bridge_candidate *bcand = NULL;
+
+ if (it_cdr->fn_table != &bridge_state_fn_table || strcmp(bridge->uniqueid, it_cdr->bridge)) {
+ continue;
+ }
+ bcand = ao2_find(candidates, it_cdr->party_a.snapshot->name, OBJ_KEY);
+ if (!bcand) {
+ bcand = bridge_candidate_alloc(it_cdr, it_cdr->party_a.snapshot);
+ if (bcand) {
+ ao2_link(candidates, bcand);
+ }
+ }
+ ao2_cleanup(bcand);
+ }
+ }
+ ao2_iterator_destroy(it_cdrs);
+
+ /* For each CDR that has a record in the bridge, get their Party B and
+ * make them a candidate. Don't add a party twice.
+ */
+ it_cdrs = ao2_callback(active_cdrs_by_bridge, OBJ_MULTIPLE | OBJ_KEY,
+ cdr_object_bridge_cmp_fn, bridge_id);
+ if (!it_cdrs) {
+ /* No one in the bridge yet! */
+ ao2_cleanup(candidates);
+ return NULL;
+ }
+ while ((cand_cdr_master = ao2_iterator_next(it_cdrs))) {
+ struct cdr_object *it_cdr;
+
+ ao2_lock(cand_cdr_master);
+ for (it_cdr = cand_cdr_master; it_cdr; it_cdr = it_cdr->next) {
+ struct bridge_candidate *bcand = NULL;
+
+ if (it_cdr->fn_table != &bridge_state_fn_table || strcmp(bridge->uniqueid, it_cdr->bridge)) {
+ continue;
+ }
+ if (!it_cdr->party_b.snapshot) {
+ continue;
+ }
+ bcand = ao2_find(candidates, it_cdr->party_b.snapshot->name, OBJ_KEY);
+ if (!bcand) {
+ bcand = bridge_candidate_alloc(it_cdr, it_cdr->party_b.snapshot);
+ if (bcand) {
+ ao2_link(candidates, bcand);
+ }
+ }
+ ao2_cleanup(bcand);
+ }
+ ao2_unlock(cand_cdr_master);
+ }
+ ao2_iterator_destroy(it_cdrs);
+
+ return candidates;
+}
+
+static int bridge_candidate_process(void *obj, void *arg, int flags)
+{
+ struct bridge_candidate *bcand = obj;
+ struct cdr_object *cdr = arg;
+ struct cdr_object *new_cdr;
+ struct ast_channel_snapshot *party_a;
+
+ /* If the candidate is us or someone we've taken on, pass on by */
+ if (!strcmp(cdr->party_a.snapshot->name, bcand->candidate->name)
+ || (cdr->party_b.snapshot && !(strcmp(cdr->party_b.snapshot->name, bcand->candidate->name)))) {
+ return 0;
+ }
+
+ party_a = cdr_object_determine_party_a(cdr->party_a.snapshot, bcand->candidate);
+ if (party_a == cdr->party_a.snapshot) {
+ new_cdr = cdr_object_create_and_append_cdr(cdr);
+ if (bcand->candidate == bcand->cdr->party_a.snapshot) {
+ cdr_object_snapshot_copy(&new_cdr->party_b, &bcand->cdr->party_a);
+ } else if (bcand->candidate == bcand->cdr->party_b.snapshot) {
+ cdr_object_snapshot_copy(&new_cdr->party_b, &bcand->cdr->party_b);
+ }
+ ast_string_field_set(new_cdr, bridge, cdr->bridge);
+ cdr_object_transition_state(new_cdr, &bridge_state_fn_table);
+ } else {
+ /* The candidate may be the CDR's Party A. If so, find out if we can
+ * add ourselves directly as the Party B, or if we need a new CDR.
+ */
+ if (bcand->cdr->party_a.snapshot == bcand->candidate) {
+ if (bcand->cdr->party_b.snapshot) {
+ new_cdr = cdr_object_create_and_append_cdr(bcand->cdr);
+ cdr_object_snapshot_copy(&new_cdr->party_b, &cdr->party_a);
+ ast_string_field_set(new_cdr, bridge, cdr->bridge);
+ cdr_object_transition_state(new_cdr, &bridge_state_fn_table);
+ } else {
+ cdr_object_snapshot_copy(&bcand->cdr->party_b, &cdr->party_a);
+ /* It's possible that this joined at one point and was never chosen
+ * as party A. Clear their end time, as it would be set in such a
+ * case.
+ */
+ memset(&bcand->cdr->end, 0, sizeof(bcand->cdr->end));
+ }
+ } else {
+ struct cdr_object *b_party = ao2_find(active_cdrs_by_channel, bcand->candidate->name, OBJ_KEY);
+ if (!b_party) {
+ b_party = cdr_object_alloc(bcand->candidate);
+ cdr_object_transition_state(b_party, &bridge_state_fn_table);
+ cdr_object_snapshot_copy(&b_party->party_b, &cdr->party_a);
+ ast_string_field_set(b_party, bridge, cdr->bridge);
+ ao2_link(active_cdrs_by_channel, b_party);
+ ao2_link(active_cdrs_by_bridge, b_party);
+ ao2_ref(b_party, -1);
+ return 0;
+ }
+ new_cdr = cdr_object_create_and_append_cdr(b_party);
+ cdr_object_transition_state(b_party, &bridge_state_fn_table);
+ cdr_object_snapshot_copy(&b_party->party_b, &cdr->party_a);
+ ast_string_field_set(b_party, bridge, cdr->bridge);
+ ao2_link(active_cdrs_by_bridge, b_party);
+ }
+ }
+
+ return 0;
+}
+
+static void handle_bridge_pairings(struct cdr_object *cdr, struct ast_bridge_snapshot *bridge)
+{
+ RAII_VAR(struct ao2_container *, candidates,
+ handle_bridge_create_candidates(bridge),
+ ao2_cleanup);
+
+ if (!candidates) {
+ return;
+ }
+
+ ao2_callback(candidates, OBJ_NODATA,
+ bridge_candidate_process,
+ cdr);
+
+ return;
+}
+
+
+static void handle_bridge_enter_message(struct ast_bridge_snapshot *bridge, struct ast_channel_snapshot *channel)
+{
+ RAII_VAR(struct cdr_object *, cdr,
+ ao2_find(active_cdrs_by_channel, channel->name, OBJ_KEY),
+ ao2_cleanup);
+ int res = 1;
+ struct cdr_object *it_cdr;
+ struct cdr_object *handled_cdr = NULL;
+
+ ast_assert(cdr != NULL);
+
+ ao2_lock(cdr);
+
+ for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
+ if (it_cdr->fn_table->process_party_a_update) {
+ it_cdr->fn_table->process_party_a_update(it_cdr, channel);
+ }
+
+ /* Notify all states that they have entered a bridge */
+ if (it_cdr->fn_table->process_bridge_enter) {
+ res &= it_cdr->fn_table->process_bridge_enter(it_cdr, bridge, channel);
+ if (!res && !handled_cdr) {
+ handled_cdr = it_cdr;
+ }
+ }
+ }
+
+ if (res) {
+ /* We didn't win on any - end this CDR. If someone else comes in later
+ * that is Party B to this CDR, it can re-activate this CDR.
+ */
+ cdr_object_finalize(cdr);
+ }
+
+ /* Create the new matchings, but only for either:
+ * * The first CDR in the chain that handled it. This avoids issues with
+ * forked CDRs.
+ * * If no one handled it, the last CDR in the chain. This would occur if
+ * a CDR joined a bridge and it wasn't Party A for anyone. We still need
+ * to make pairings with everyone in the bridge.
+ */
+ if (!handled_cdr) {
+ handled_cdr = cdr->last;
+ }
+ handle_bridge_pairings(handled_cdr, bridge);
+
+ ao2_link(active_cdrs_by_bridge, cdr);
+ ao2_unlock(cdr);
+}
+
+static void handle_bridge_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+ RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
+ RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
+ struct ast_bridge_blob *update = stasis_message_data(message);
+ const char *type = ast_bridge_blob_json_type(update);
+
+ ast_assert(update->bridge != NULL);
+ ast_assert(update->channel != NULL);
+
+ if (!strcmp(type, "leave")) {
+ handle_bridge_leave_message(update->bridge, update->channel);
+ } else if (!strcmp(type, "enter")) {
+ handle_bridge_enter_message(update->bridge, update->channel);
+ }
+}
@@ -1880,6 +2346,12 @@
}
cdr_object_transition_state(it_cdr, &finalized_state_fn_table);
}
+ } else {
+ /* Denote that this CDR is a fork of a previous, and the previous
+ * is still active. Operations on CDRs will use this flag to force
+ * this CDR record to mirror the state of the previous.
+ */
+ ast_set_flag(&new_cdr->flags, INTERNAL_CDR_FLAG_FORKED);
}
}
@@ -2293,13 +2765,23 @@
return -1;
}
- stasis_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached()));
- if (!stasis_router) {
+ active_cdrs_by_bridge = ao2_container_alloc(51, cdr_object_bridge_hash_fn, cdr_object_bridge_cmp_fn);
+ if (!active_cdrs_by_bridge) {
return -1;
}
- stasis_message_router_add(stasis_router, stasis_cache_update_type(), handle_cache_update_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
-
+
+ stasis_channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached()));
+ if (!stasis_channel_router) {
+ return -1;
+ }
+ stasis_message_router_add(stasis_channel_router, stasis_cache_update_type(), handle_channel_cache_message, NULL);
+ stasis_message_router_add(stasis_channel_router, ast_channel_dial_type(), handle_dial_message, NULL);
+
+ stasis_bridge_router = stasis_message_router_create(stasis_caching_get_topic(ast_bridge_topic_all_cached()));
+ if (!stasis_bridge_router) {
+ return -1;
+ }
+ stasis_message_router_add(stasis_bridge_router, ast_bridge_blob_type(), handle_bridge_message, NULL);
sched = ast_sched_context_create();
if (!sched) {
Modified: team/mjordan/cdrs-of-doom/tests/test_cdr.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/cdrs-of-doom/tests/test_cdr.c?view=diff&rev=386588&r1=386587&r2=386588
==============================================================================
--- team/mjordan/cdrs-of-doom/tests/test_cdr.c (original)
+++ team/mjordan/cdrs-of-doom/tests/test_cdr.c Thu Apr 25 16:55:31 2013
@@ -42,7 +42,10 @@
#include "asterisk/utils.h"
#include "asterisk/causes.h"
#include "asterisk/time.h"
+#include "asterisk/bridging.h"
+#include "asterisk/bridging_basic.h"
#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_bridging.h"
#define EPSILON 0.001
@@ -70,15 +73,22 @@
.settings.flags = CDR_ENABLED | CDR_UNANSWERED | CDR_DEBUG | CDR_CONGESTION,
};
+/*! \brief Macro to swap a configuration out from the CDR engine. This should be
+ * used at the beginning of each test to set the needed configuration for that
+ * test.
+ */
#define SWAP_CONFIG(ao2_config, template) do { \
*(ao2_config) = (template); \
ast_cdr_set_config((ao2_config)); \
} while (0)
+/*! \brief A linked list of received CDR entries from the engine */
static AST_LIST_HEAD(, test_cdr_entry) actual_cdr_entries = AST_LIST_HEAD_INIT_VALUE;
+/*! \brief The Mock CDR backend condition wait */
static ast_cond_t mock_cdr_cond;
+/*! \brief A channel technology used for the unit tests */
static struct ast_channel_tech test_cdr_chan_tech = {
.type = CHANNEL_TECH_NAME,
.description = "Mock channel technology for CDR tests",
@@ -89,9 +99,22 @@
AST_LIST_ENTRY(test_cdr_entry) list;
};
-static int mock_cdr_count;
-
-static int mock_cdr_backend(struct ast_cdr *cdr)
+/*! \brief The number of CDRs the mock backend has received */
+static int global_mock_cdr_count;
+
+/*! \internal
+ * \brief Callback function for the mock CDR backend
+ *
+ * This function 'processes' a dispatched CDR record by adding it to the
+ * \ref actual_cdr_entries list. When a test completes, it can verify the
+ * expected records against this list of actual CDRs created by the engine.
+ *
+ * \param cdr The public CDR object created by the engine
+ *
+ * \retval -1 on error
+ * \retval 0 on success
+ */
+static int mock_cdr_backend_cb(struct ast_cdr *cdr)
{
struct ast_cdr *cdr_copy, *cdr_prev = NULL;
struct ast_cdr *mock_cdr = NULL;
@@ -134,13 +157,16 @@
AST_LIST_LOCK(&actual_cdr_entries);
AST_LIST_INSERT_TAIL(&actual_cdr_entries, cdr_wrapper, list);
- mock_cdr_count++;
+ global_mock_cdr_count++;
ast_cond_signal(&mock_cdr_cond);
AST_LIST_UNLOCK(&actual_cdr_entries);
return 0;
}
+/*! \internal
+ * \brief Remove all entries from \ref actual_cdr_entries
+ */
static void clear_mock_cdr_backend(void)
{
struct test_cdr_entry *cdr_wrapper;
@@ -150,10 +176,14 @@
ast_cdr_free(cdr_wrapper->cdr);
ast_free(cdr_wrapper);
}
- mock_cdr_count = 0;
+ global_mock_cdr_count = 0;
AST_LIST_UNLOCK(&actual_cdr_entries);
}
+/*! \brief Verify a string field. This will set the test status result to fail;
+ * as such, it assumes that (a) test is the test object variable, and (b) that
+ * a return variable res exists.
+ */
#define VERIFY_STRING_FIELD(field, actual, expected) do { \
if (strcmp((actual)->field, (expected)->field)) { \
ast_test_status_update(test, "Field %s failed: actual %s, expected %s\n", #field, (actual)->field, (expected)->field); \
@@ -161,6 +191,10 @@
res = AST_TEST_FAIL; \
} } while (0)
+/*! \brief Verify a numeric field. This will set the test status result to fail;
+ * as such, it assumes that (a) test is the test object variable, and (b) that
+ * a return variable res exists.
+ */
#define VERIFY_NUMERIC_FIELD(field, actual, expected) do { \
if ((actual)->field != (expected)->field) { \
ast_test_status_update(test, "Field %s failed: actual %ld, expected %ld\n", #field, (long)(actual)->field, (long)(expected)->field); \
@@ -168,6 +202,10 @@
res = AST_TEST_FAIL; \
} } while (0)
+/*! \brief Verify a time field. This will set the test status result to fail;
+ * as such, it assumes that (a) test is the test object variable, and (b) that
+ * a return variable res exists.
+ */
#define VERIFY_TIME_VALUE(field, actual) do { \
if (ast_tvzero((actual)->field)) { \
ast_test_status_update(test, "Field %s failed: should not be 0\n", #field); \
@@ -175,7 +213,37 @@
res = AST_TEST_FAIL; \
} } while (0)
+/*! \brief Alice's Caller ID */
#define ALICE_CALLERID { .id.name.str = "Alice", .id.name.valid = 1, .id.number.str = "100", .id.number.valid = 1, }
+
+/*! \brief Copy the linkedid and uniqueid from a channel to an expected CDR */
+#define COPY_IDS(channel_var, expected_record) do { \
+ ast_copy_string((expected_record)->uniqueid, ast_channel_uniqueid((channel_var)), sizeof((expected_record)->uniqueid)); \
+ ast_copy_string((expected_record)->linkedid, ast_channel_linkedid((channel_var)), sizeof((expected_record)->linkedid)); \
+ } while (0)
+
+/*! \brief Create a \ref test_cdr_chan_tech for Alice, and set the expected
+ * CDR records (if available) to her linkedid and uniqueid. */
+#define CREATE_ALICE_CHANNEL(channel_var, caller_id, expected_record) do { \
+ (channel_var) = ast_channel_alloc(0, AST_STATE_DOWN, "100", "Alice", "100", "100", "default", NULL, 0, CHANNEL_TECH_NAME "/Alice"); \
+ ast_channel_set_caller((channel_var), (caller_id), NULL); \
+ ast_copy_string((expected_record)->uniqueid, ast_channel_uniqueid((channel_var)), sizeof((expected_record)->uniqueid)); \
+ ast_copy_string((expected_record)->linkedid, ast_channel_linkedid((channel_var)), sizeof((expected_record)->linkedid)); \
+ } while (0)
+
+/*! \brief Emulate a channel entering into an application */
+#define EMULATE_APP_DATA(channel, application, data) do { \
+ ast_channel_appl_set((channel), (application)); \
+ ast_channel_data_set((channel), (data)); \
+ publish_channel_snapshot((channel)); \
+ } while (0)
+
[... 715 lines stripped ...]
More information about the asterisk-commits
mailing list