[asterisk-commits] irroot: branch irroot/distrotech-customers-trunk r341919 - in /team/irroot/di...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Sun Oct 23 06:22:47 CDT 2011
Author: irroot
Date: Sun Oct 23 06:22:43 2011
New Revision: 341919
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=341919
Log:
app_queue patch RB1538
Modified:
team/irroot/distrotech-customers-trunk/apps/app_queue.c
team/irroot/distrotech-customers-trunk/configs/queues.conf.sample
Modified: team/irroot/distrotech-customers-trunk/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/irroot/distrotech-customers-trunk/apps/app_queue.c?view=diff&rev=341919&r1=341918&r2=341919
==============================================================================
--- team/irroot/distrotech-customers-trunk/apps/app_queue.c (original)
+++ team/irroot/distrotech-customers-trunk/apps/app_queue.c Sun Oct 23 06:22:43 2011
@@ -896,9 +896,6 @@
{ QUEUE_AUTOPAUSE_ALL,"all" },
};
-
-static struct ast_taskprocessor *devicestate_tps;
-
#define DEFAULT_RETRY 5
#define DEFAULT_TIMEOUT 15
#define RECHECK 1 /*!< Recheck every second to see we we're at the top yet */
@@ -956,9 +953,6 @@
/*! \brief queues.conf [general] option */
static int log_membername_as_agent = 0;
-
-/*! \brief queues.conf [general] option */
-static int check_state_unknown = 0;
enum queue_result {
QUEUE_UNKNOWN = 0,
@@ -1052,24 +1046,27 @@
struct queue_ent *next; /*!< The next queue entry */
};
+/*!
+ * \brief Device state record referenced by queue members through their device pointer.
+ *
+ * Objects of this type are stored in the "devices" container and looked up by
+ * state_interface; call attempts bump "reserved" while they hold the member.
+ */
+struct mem_state {
+ char state_interface[80]; /*!< Technology/Location from which to read devicestate changes */
+ int reserved; /*!< This interface is reserved for pending call */
+ int status; /*!< Status of queue member */
+};
+
+/*!
+ * \brief One member (agent/device) of a call queue.
+ *
+ * Device state is not stored here; it lives in the mem_state object pointed to
+ * by "device". The single-bit flags are declared unsigned so that a stored 1
+ * reads back as 1 (a signed one-bit field may read back as -1, which would
+ * leak into "%d" output such as the Paused: header).
+ */
struct member {
char interface[80]; /*!< Technology/Location to dial to reach this member*/
- char state_exten[AST_MAX_EXTENSION]; /*!< Extension to get state from (if using hint) */
- char state_context[AST_MAX_CONTEXT]; /*!< Context to use when getting state (if using hint) */
- char state_interface[80]; /*!< Technology/Location from which to read devicestate changes */
char membername[80]; /*!< Member name to use in queue logs */
int penalty; /*!< Are we a last resort? */
int calls; /*!< Number of calls serviced by this member */
int dynamic; /*!< Are we dynamically added? */
- int realtime; /*!< Is this member realtime? */
- int status; /*!< Status of queue member */
- int paused; /*!< Are we paused (not accepting calls)? */
+	unsigned int realtime:1; /*!< Is this member realtime? */
+	unsigned int paused:1; /*!< Are we paused (not accepting calls)? */
time_t lastcall; /*!< When last successful call was hungup */
struct call_queue *lastqueue; /*!< Last queue we received a call */
unsigned int dead:1; /*!< Used to detect members deleted in realtime */
unsigned int delme:1; /*!< Flag to delete entry on reload */
char rt_uniqueid[80]; /*!< Unique id of realtime member entry */
unsigned int ignorebusy:1; /*!< Flag to ignore member if the status is not available */
+	struct mem_state *device; /*!< Device information */
};
enum empty_conditions {
@@ -1218,6 +1215,7 @@
static AST_LIST_HEAD_STATIC(rule_lists, rule_list);
static struct ao2_container *queues;
+static struct ao2_container *devices;
static void update_realtime_members(struct call_queue *q);
static struct member *interface_exists(struct call_queue *q, const char *interface);
@@ -1292,34 +1290,35 @@
static int queue_cmp_cb(void *obj, void *arg, int flags)
{
- struct call_queue *q = obj, *q2 = arg;
- return !strcasecmp(q->name, q2->name) ? CMP_MATCH | CMP_STOP : 0;
-}
-
-#ifdef REF_DEBUG_ONLY_QUEUES
-#define queue_ref(a) __ao2_ref_debug(a,1,"",__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queue_unref(a) __ao2_ref_debug(a,-1,"",__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queue_t_ref(a,b) __ao2_ref_debug(a,1,b,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queue_t_unref(a,b) __ao2_ref_debug(a,-1,b,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queues_t_link(c,q,tag) __ao2_link_debug(c,q,tag,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queues_t_unlink(c,q,tag) __ao2_unlink_debug(c,q,tag,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#else
-#define queue_t_ref(a,b) queue_ref(a)
-#define queue_t_unref(a,b) queue_unref(a)
-#define queues_t_link(c,q,tag) ao2_t_link(c,q,tag)
-#define queues_t_unlink(c,q,tag) ao2_t_unlink(c,q,tag)
-static inline struct call_queue *queue_ref(struct call_queue *q)
-{
- ao2_ref(q, 1);
- return q;
-}
-
-static inline struct call_queue *queue_unref(struct call_queue *q)
-{
- ao2_ref(q, -1);
- return NULL;
-}
-#endif
+	/* With OBJ_POINTER the argument is another queue; otherwise it is a bare queue name. */
+	const struct call_queue *queue = obj;
+	const char *wanted = (flags & OBJ_POINTER) ? ((struct call_queue *)arg)->name : (const char *)arg;
+
+	if (strcasecmp(queue->name, wanted)) {
+		return 0;
+	}
+
+	return CMP_MATCH | CMP_STOP;
+}
+
+/*! \brief Hash a mem_state object on its state_interface, case-insensitively */
+static int device_hash_cb(const void *obj, const int flags)
+{
+	const struct mem_state *state = obj;
+	const char *key = state->state_interface;
+
+	return ast_str_case_hash(key);
+}
+
+/*! \brief Compare a mem_state against another mem_state (OBJ_POINTER) or an interface name */
+static int device_cmp_cb(void *obj, void *arg, int flags)
+{
+	const struct mem_state *state = obj;
+	const char *wanted;
+
+	if (flags & OBJ_POINTER) {
+		wanted = ((struct mem_state *)arg)->state_interface;
+	} else {
+		wanted = arg;
+	}
+
+	if (strcasecmp(state->state_interface, wanted)) {
+		return 0;
+	}
+	return CMP_MATCH | CMP_STOP;
+}
/*! \brief Set variables of queue */
static void set_queue_variables(struct call_queue *q, struct ast_channel *chan)
@@ -1365,7 +1364,7 @@
/* every queue_ent must have a reference to it's parent call_queue, this
* reference does not go away until the end of the queue_ent's life, meaning
* that even when the queue_ent leaves the call_queue this ref must remain. */
- queue_ref(q);
+ ao2_ref(q, 1);
new->parent = q;
new->pos = ++(*pos);
new->opos = *pos;
@@ -1392,7 +1391,7 @@
}
}
- switch (member->status) {
+ switch (member->device->status) {
case AST_DEVICE_INVALID:
if (conditions & QUEUE_EMPTY_INVALID) {
ast_debug(4, "%s is unavailable because his device state is 'invalid'\n", member->membername);
@@ -1447,93 +1446,59 @@
return -1;
}
-struct statechange {
- AST_LIST_ENTRY(statechange) entry;
- int state;
- char dev[0];
-};
-
-/*! \brief set a member's status based on device state of that member's state_interface.
- *
- * Lock interface list find sc, iterate through each queues queue_member list for member to
- * update state inside queues
+/*! \brief send a QueueMemberStatus manager_event for each member using a device
+ *
+ * \param s device state record whose status has just been updated
+ *
+ * Iterates every queue (skipping queues with maskmemberstatus set) and emits
+ * one event per member whose device carries the same state_interface as \a s.
*/
-static int update_status(struct call_queue *q, struct member *m, const int status)
-{
- m->status = status;
-
- if (q->maskmemberstatus)
- return 0;
-
- manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
- "Queue: %s\r\n"
- "Location: %s\r\n"
- "MemberName: %s\r\n"
- "StateInterface: %s\r\n"
- "Membership: %s\r\n"
- "Penalty: %d\r\n"
- "CallsTaken: %d\r\n"
- "LastCall: %d\r\n"
- "Status: %d\r\n"
- "Paused: %d\r\n",
- q->name, m->interface, m->membername, m->state_interface, m->dynamic ? "dynamic" : m->realtime ? "realtime" : "static",
- m->penalty, m->calls, (int)m->lastcall, m->status, m->paused
- );
-
- return 0;
-}
-
-/*! \brief set a member's status based on device state of that member's interface*/
-static int handle_statechange(void *datap)
-{
- struct statechange *sc = datap;
- struct ao2_iterator miter, qiter;
+static void update_status(struct mem_state *s)
+{
+	struct ao2_iterator qiter;
+	struct ao2_iterator miter;
+	struct call_queue *q;
struct member *m;
- struct call_queue *q;
- char interface[80], *slash_pos;
- int found = 0;
qiter = ao2_iterator_init(queues, 0);
- while ((q = ao2_t_iterator_next(&qiter, "Iterate over queues"))) {
+	while ((q = ao2_iterator_next(&qiter))) {
ao2_lock(q);
-
+		if (q->maskmemberstatus) {
+			/* unlock before dropping the iterator's reference so the lock is never touched on a released object */
+			ao2_unlock(q);
+			ao2_ref(q, -1);
+			continue;
+		}
miter = ao2_iterator_init(q->members, 0);
- for (; (m = ao2_iterator_next(&miter)); ao2_ref(m, -1)) {
- ast_copy_string(interface, m->state_interface, sizeof(interface));
-
- if ((slash_pos = strchr(interface, '/')))
- if (!strncasecmp(interface, "Local/", 6) && (slash_pos = strchr(slash_pos + 1, '/')))
- *slash_pos = '\0';
-
- if (!strcasecmp(interface, sc->dev)) {
- found = 1;
- update_status(q, m, sc->state);
+		while ((m = ao2_iterator_next(&miter))) {
+			/* only report members that watch this device (compare the member's device, not s with itself) */
+			if (!m->device || strcasecmp(m->device->state_interface, s->state_interface)) {
ao2_ref(m, -1);
- break;
- }
+				continue;
+			}
+			manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
+				"Queue: %s\r\n"
+				"Location: %s\r\n"
+				"MemberName: %s\r\n"
+				"StateInterface: %s\r\n"
+				"Membership: %s\r\n"
+				"Penalty: %d\r\n"
+				"CallsTaken: %d\r\n"
+				"LastCall: %d\r\n"
+				"Status: %d\r\n"
+				"Paused: %d\r\n",
+				q->name, m->interface, m->membername, s->state_interface, m->dynamic ? "dynamic" : m->realtime ? "realtime" : "static",
+				m->penalty, m->calls, (int)m->lastcall, s->status, m->paused
+			);
+			ao2_ref(m, -1);
}
ao2_iterator_destroy(&miter);
-
ao2_unlock(q);
- queue_t_unref(q, "Done with iterator");
+		ao2_ref(q, -1);
}
ao2_iterator_destroy(&qiter);
-
- if (found)
- ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", sc->dev, sc->state, ast_devstate2str(sc->state));
- else
- ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", sc->dev, sc->state, ast_devstate2str(sc->state));
-
- ast_free(sc);
- return 0;
-}
-
+}
+
+
+/*! \brief callback used when device state changes*/
static void device_state_cb(const struct ast_event *event, void *unused)
{
enum ast_device_state state;
const char *device;
- struct statechange *sc;
- size_t datapsize;
+ struct mem_state *s;
state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
@@ -1542,15 +1507,14 @@
ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
return;
}
- datapsize = sizeof(*sc) + strlen(device) + 1;
- if (!(sc = ast_calloc(1, datapsize))) {
- ast_log(LOG_ERROR, "failed to calloc a state change struct\n");
- return;
- }
- sc->state = state;
- strcpy(sc->dev, device);
- if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
- ast_free(sc);
+
+ if ((s = ao2_find(devices, (void *)device, 0))) {
+ s->status = state;
+ update_status(s);
+ ao2_ref(s, -1);
+ ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", device, state, ast_devstate2str(state));
+ } else {
+ ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", device, state, ast_devstate2str(state));
}
}
@@ -1588,74 +1552,107 @@
static int extension_state_cb(const char *context, const char *exten, enum ast_extension_states state, void *data)
{
- struct ao2_iterator miter, qiter;
- struct member *m;
- struct call_queue *q;
- int found = 0, device_state = extensionstate2devicestate(state);
-
- qiter = ao2_iterator_init(queues, 0);
- while ((q = ao2_t_iterator_next(&qiter, "Iterate through queues"))) {
- ao2_lock(q);
-
- miter = ao2_iterator_init(q->members, 0);
- for (; (m = ao2_iterator_next(&miter)); ao2_ref(m, -1)) {
- if (!strcmp(m->state_context, context) && !strcmp(m->state_exten, exten)) {
- update_status(q, m, device_state);
- ao2_ref(m, -1);
- found = 1;
- break;
- }
- }
- ao2_iterator_destroy(&miter);
-
- ao2_unlock(q);
- queue_t_unref(q, "Done with iterator");
- }
- ao2_iterator_destroy(&qiter);
-
- if (found) {
- ast_debug(1, "Extension '%s@%s' changed to state '%d' (%s)\n", exten, context, device_state, ast_devstate2str(device_state));
+	struct mem_state *s;
+	/* Lookup key "hint:<exten>@<context>"; a stack buffer avoids the heap string that was never freed. */
+	char device[AST_MAX_EXTENSION + AST_MAX_CONTEXT + sizeof("hint:@")];
+	int device_state;
+
+	snprintf(device, sizeof(device), "hint:%s@%s", exten, context);
+
+	if ((s = ao2_find(devices, device, 0))) {
+		/* keep the new state in a local so it can still be logged after the reference is released */
+		device_state = extensionstate2devicestate(state);
+		s->status = device_state;
+		update_status(s);
+		ao2_ref(s, -1);
+		ast_debug(1, "Extension '%s@%s' changed to state '%d' (%s)\n", exten, context, device_state, ast_devstate2str(device_state));
} else {
- ast_debug(3, "Extension '%s@%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n",
- exten, context, device_state, ast_devstate2str(device_state));
+		ast_debug(3, "Extension '%s@%s' changed state but we don't care because they're not a member of any queue.\n",
+			exten, context);
}
return 0;
}
-/*! \brief Return the current state of a member */
-static int get_queue_member_status(struct member *cur)
-{
- return ast_strlen_zero(cur->state_exten) ? ast_device_state(cur->state_interface) : extensionstate2devicestate(ast_extension_state(NULL, cur->state_context, cur->state_exten));
+/*! \brief Cleanup callback given to ao2_alloc() for queue members: releases the member's device state */
+static void remove_queue_member(void *data)
+{
+	struct member *member = data;
+	struct mem_state *state = member->device;
+
+	if (!state) {
+		/* member never got a device attached */
+		return;
+	}
+
+	/* give up our reference; a result of 2 is treated as "we were the only consumer" */
+	if (ao2_ref(state, -1) != 2) {
+		return;
+	}
+
+	ao2_unlink(devices, state);
+}
+
+/*!
+ * \brief Find or create the device state record for a state interface
+ *
+ * \param state_interface a device name, or "hint:exten[@context]" (context defaults to "default")
+ * \return a referenced mem_state with its status freshly queried, or NULL if allocation fails
+ *
+ * The name is normalised first so that "hint:X" and "hint:X@default" share one
+ * record, and a new record is linked into "devices" only after its key has been
+ * filled in (the container hashes on state_interface).
+ */
+static struct mem_state *create_member_state(const char *state_interface) {
+	struct mem_state *state;
+	char name[sizeof(state->state_interface)];
+	int status;
+
+	if (!strncmp(state_interface, "hint:", 5)) {
+		char *context = ast_strdupa(state_interface);
+		char *exten = strsep(&context, "@") + 5;
+
+		if (context) {
+			ast_copy_string(name, state_interface, sizeof(name));
+		} else {
+			snprintf(name, sizeof(name), "%s@default", state_interface);
+		}
+		status = extensionstate2devicestate(ast_extension_state(NULL, context ? context : "default", exten));
+	} else {
+		ast_copy_string(name, state_interface, sizeof(name));
+		status = ast_device_state(name);
+	}
+
+	/* ref will be held for each shared member and one ref for container */
+	if ((state = ao2_find(devices, name, 0))) {
+		state->status = status;
+		return state;
+	}
+
+	if (!(state = ao2_alloc(sizeof(*state), NULL))) {
+		return NULL;
+	}
+	ast_copy_string(state->state_interface, name, sizeof(state->state_interface));
+	state->reserved = 0;
+	state->status = status;
+	ao2_link(devices, state);
+
+	return state;
}
/*! \brief allocate space for new queue member and set fields based on parameters passed */
static struct member *create_queue_member(const char *interface, const char *membername, int penalty, int paused, const char *state_interface)
{
struct member *cur;
-
- if ((cur = ao2_alloc(sizeof(*cur), NULL))) {
- cur->penalty = penalty;
- cur->paused = paused;
- ast_copy_string(cur->interface, interface, sizeof(cur->interface));
- if (!ast_strlen_zero(state_interface))
- ast_copy_string(cur->state_interface, state_interface, sizeof(cur->state_interface));
- else
- ast_copy_string(cur->state_interface, interface, sizeof(cur->state_interface));
- if (!ast_strlen_zero(membername))
- ast_copy_string(cur->membername, membername, sizeof(cur->membername));
- else
- ast_copy_string(cur->membername, interface, sizeof(cur->membername));
- if (!strchr(cur->interface, '/'))
- ast_log(LOG_WARNING, "No location at interface '%s'\n", interface);
- if (!strncmp(cur->state_interface, "hint:", 5)) {
- char *tmp = ast_strdupa(cur->state_interface), *context = tmp;
- char *exten = strsep(&context, "@") + 5;
-
- ast_copy_string(cur->state_exten, exten, sizeof(cur->state_exten));
- ast_copy_string(cur->state_context, S_OR(context, "default"), sizeof(cur->state_context));
- }
- cur->status = get_queue_member_status(cur);
+ const char* state_int;
+
+ if (!(cur = ao2_alloc(sizeof(*cur), remove_queue_member))) {
+ return NULL;
+ }
+
+ /* setup device state*/
+ state_int = (!ast_strlen_zero(state_interface)) ? state_interface : interface;
+ if (!(cur->device = create_member_state(state_int))) {
+ ao2_ref(cur, -1);
+ return NULL;
+ }
+ ast_copy_string(cur->interface, interface, sizeof(cur->interface));
+ if (!strchr(cur->interface, '/')) {
+ ast_log(LOG_WARNING, "No location at interface '%s'\n", interface);
+ }
+
+ cur->penalty = penalty;
+ cur->paused = paused;
+
+ if (!ast_strlen_zero(membername)) {
+ ast_copy_string(cur->membername, membername, sizeof(cur->membername));
+ } else {
+ ast_copy_string(cur->membername, interface, sizeof(cur->membername));
}
return cur;
@@ -1686,11 +1683,30 @@
static int member_cmp_fn(void *obj1, void *obj2, int flags)
{
- struct member *mem1 = obj1, *mem2 = obj2;
- return strcasecmp(mem1->interface, mem2->interface) ? 0 : CMP_MATCH | CMP_STOP;
-}
-
-/*!
+	/* With OBJ_POINTER the second argument is a member; otherwise it is an interface string. */
+	const struct member *candidate = obj1;
+	const char *wanted = (flags & OBJ_POINTER) ? ((struct member *)obj2)->interface : (const char *)obj2;
+
+	if (!strcasecmp(candidate->interface, wanted)) {
+		return CMP_MATCH | CMP_STOP;
+	}
+
+	return 0;
+}
+
+/*! \brief Match a member on rt_uniqueid, given another member (OBJ_POINTER) or a uniqueid string */
+static int member_cmp_uniqueid_fn(void *obj1, void *arg, int flags)
+{
+	const struct member *candidate = obj1;
+	const char *wanted;
+
+	if (flags & OBJ_POINTER) {
+		wanted = ((struct member *)arg)->rt_uniqueid;
+	} else {
+		wanted = arg;
+	}
+
+	if (!strcasecmp(candidate->rt_uniqueid, wanted)) {
+		return CMP_MATCH | CMP_STOP;
+	}
+	return 0;
+}
+
+/*!
* \brief Initialize Queue default values.
* \note the queue's lock must be held before executing this function
*/
@@ -1771,6 +1787,14 @@
ast_free(pr_iter);
}
+/*! \brief Callback for ao2_callback(): zero one member's call statistics; always returns 0 */
+static int clear_queue_member_fn(void *obj1, void *arg, int flags)
+{
+	struct member *member = obj1;
+
+	member->lastcall = 0;
+	member->calls = 0;
+
+	return 0;
+}
+
static void clear_queue(struct call_queue *q)
{
q->holdtime = 0;
@@ -1780,25 +1804,18 @@
q->talktime = 0;
if (q->members) {
- struct member *mem;
- struct ao2_iterator mem_iter = ao2_iterator_init(q->members, 0);
- while ((mem = ao2_iterator_next(&mem_iter))) {
- mem->calls = 0;
- mem->lastcall = 0;
- ao2_ref(mem, -1);
- }
- ao2_iterator_destroy(&mem_iter);
- }
-}
-
-/*!
+ ao2_callback(q->members, OBJ_NODATA, clear_queue_member_fn, NULL);
+ }
+}
+
+/*!
* \brief Change queue penalty by adding rule.
*
- * Check rule for errors with time or fomatting, see if rule is relative to rest
+ * Check rule for errors with time or formatting, see if rule is relative to rest
* of queue, iterate list of rules to find correct insertion point, insert and return.
* \retval -1 on failure
- * \retval 0 on success
- * \note Call this with the rule_lists locked
+ * \retval 0 on success
+ * \note Call this with the rule_lists locked
*/
static int insert_penaltychange(const char *list_name, const char *content, const int linenum)
{
@@ -2119,14 +2136,13 @@
*
* Search for member in queue, if found update penalty/paused state,
* if no member exists create one flag it as a RT member and add to queue member list.
+ * q is locked here.
*/
static void rt_handle_member_record(struct call_queue *q, char *interface, struct ast_config *member_config)
{
struct member *m;
- struct ao2_iterator mem_iter;
int penalty = 0;
int paused = 0;
- int found = 0;
int ignorebusy = 0;
const char *config_val;
@@ -2152,8 +2168,9 @@
if (paused_str) {
paused = atoi(paused_str);
- if (paused < 0)
+ if (paused < 0) {
paused = 0;
+ }
}
if ((config_val = ast_variable_retrieve(member_config, interface, "ignorebusy"))) {
@@ -2163,43 +2180,36 @@
}
/* Find member by realtime uniqueid and update */
- mem_iter = ao2_iterator_init(q->members, 0);
- while ((m = ao2_iterator_next(&mem_iter))) {
- if (!strcasecmp(m->rt_uniqueid, rt_uniqueid)) {
- m->dead = 0; /* Do not delete this one. */
- ast_copy_string(m->rt_uniqueid, rt_uniqueid, sizeof(m->rt_uniqueid));
- if (paused_str)
- m->paused = paused;
- if (strcasecmp(state_interface, m->state_interface)) {
- ast_copy_string(m->state_interface, state_interface, sizeof(m->state_interface));
- }
- m->penalty = penalty;
- m->ignorebusy = ignorebusy;
- found = 1;
- ao2_ref(m, -1);
- break;
- }
+ if ((m = ao2_callback(q->members, 0, member_cmp_uniqueid_fn, (char *)rt_uniqueid))) {
+ m->dead = 0; /* Do not delete this one. */
+ ast_copy_string(m->rt_uniqueid, rt_uniqueid, sizeof(m->rt_uniqueid));
+ if (paused_str) {
+ m->paused = paused;
+ }
+ if (strcmp(state_interface, m->device->state_interface)) {
+ /* we may be the last ref [outside container] unlink if so */
+ if (ao2_ref(m->device, -1) == 2) {
+ ao2_unlink(devices, m->device);
+ }
+ m->device = create_member_state(state_interface);
+ }
+ m->penalty = penalty;
+ m->ignorebusy = ignorebusy;
ao2_ref(m, -1);
- }
- ao2_iterator_destroy(&mem_iter);
-
- /* Create a new member */
- if (!found) {
- if ((m = create_queue_member(interface, membername, penalty, paused, state_interface))) {
- m->dead = 0;
- m->realtime = 1;
- m->ignorebusy = ignorebusy;
- ast_copy_string(m->rt_uniqueid, rt_uniqueid, sizeof(m->rt_uniqueid));
- if (!log_membername_as_agent) {
- ast_queue_log(q->name, "REALTIME", m->interface, "ADDMEMBER", "%s", "");
- } else {
- ast_queue_log(q->name, "REALTIME", m->membername, "ADDMEMBER", "%s", "");
- }
- ao2_link(q->members, m);
- ao2_ref(m, -1);
- m = NULL;
- q->membercount++;
- }
+ } else if ((m = create_queue_member(interface, membername, penalty, paused, state_interface))) {
+ m->dead = 0;
+ m->realtime = 1;
+ m->ignorebusy = ignorebusy;
+ ast_copy_string(m->rt_uniqueid, rt_uniqueid, sizeof(m->rt_uniqueid));
+ if (!log_membername_as_agent) {
+ ast_queue_log(q->name, "REALTIME", m->interface, "ADDMEMBER", "%s", "");
+ } else {
+ ast_queue_log(q->name, "REALTIME", m->membername, "ADDMEMBER", "%s", "");
+ }
+ ao2_link(q->members, m);
+ ao2_ref(m, -1);
+ m = NULL;
+ q->membercount++;
}
}
@@ -2239,9 +2249,9 @@
{
struct call_queue *q;
- if ((q = ao2_t_alloc(sizeof(*q), destroy_queue, "Allocate queue"))) {
+ if ((q = ao2_alloc(sizeof(*q), destroy_queue))) {
if (ast_string_field_init(q, 64)) {
- queue_t_unref(q, "String field allocation failed");
+ ao2_ref(q, -1);
return NULL;
}
ast_string_field_set(q, name, queuename);
@@ -2255,16 +2265,14 @@
* Check for statically defined queue first, check if deleted RT queue,
* check for new RT queue, if queue vars are not defined init them with defaults.
* reload RT queue vars, set RT queue members dead and reload them, return finished queue.
- * \retval the queue,
+ * \retval the queue,
* \retval NULL if it doesn't exist.
- * \note Should be called with the "queues" container locked.
+ * \note Should be called with the "queues" container locked.
*/
static struct call_queue *find_queue_by_name_rt(const char *queuename, struct ast_variable *queue_vars, struct ast_config *member_config)
{
struct ast_variable *v;
- struct call_queue *q, tmpq = {
- .name = queuename,
- };
+ struct call_queue *q;
struct member *m;
struct ao2_iterator mem_iter;
char *interface = NULL;
@@ -2273,12 +2281,12 @@
char tmpbuf[64]; /* Must be longer than the longest queue param name. */
/* Static queues override realtime. */
- if ((q = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Check if static queue exists"))) {
+ if ((q = ao2_find(queues, (char *)queuename, 0))) {
ao2_lock(q);
if (!q->realtime) {
if (q->dead) {
ao2_unlock(q);
- queue_t_unref(q, "Queue is dead; can't return it");
+ ao2_ref(q, -1);
return NULL;
} else {
ast_log(LOG_WARNING, "Static queue '%s' already exists. Not loading from realtime\n", q->name);
@@ -2286,9 +2294,10 @@
return q;
}
}
- } else if (!member_config)
+ } else if (!member_config) {
/* Not found in the list, and it's not realtime ... */
return NULL;
+ }
/* Check if queue is defined in realtime. */
if (!queue_vars) {
@@ -2301,9 +2310,9 @@
q->dead = 1;
/* Delete if unused (else will be deleted when last caller leaves). */
- queues_t_unlink(queues, q, "Unused; removing from container");
+ ao2_unlink(queues, q);
ao2_unlock(q);
- queue_t_unref(q, "Queue is dead; can't return it");
+ ao2_ref(q, -1);
}
return NULL;
}
@@ -2311,9 +2320,9 @@
/* Create a new queue if an in-core entry does not exist yet. */
if (!q) {
struct ast_variable *tmpvar = NULL;
- if (!(q = alloc_queue(queuename)))
+ if (!(q = alloc_queue(queuename))) {
return NULL;
- ao2_lock(q);
+ }
clear_queue(q);
q->realtime = 1;
q->membercount = 0;
@@ -2332,9 +2341,11 @@
}
}
/* We traversed all variables and didn't find a strategy */
- if (!tmpvar)
+ if (!tmpvar) {
q->strategy = QUEUE_STRATEGY_RINGALL;
- queues_t_link(queues, q, "Add queue to container");
+ }
+ ao2_link(queues, q);
+ ao2_lock(q);
}
init_queue(q); /* Ensure defaults for all parameters not set explicitly. */
@@ -2361,8 +2372,9 @@
mem_iter = ao2_iterator_init(q->members, 0);
while ((m = ao2_iterator_next(&mem_iter))) {
q->membercount++;
- if (m->realtime)
+ if (m->realtime) {
m->dead = 1;
+ }
ao2_ref(m, -1);
}
ao2_iterator_destroy(&mem_iter);
@@ -2397,13 +2409,11 @@
{
struct ast_variable *queue_vars;
struct ast_config *member_config = NULL;
- struct call_queue *q = NULL, tmpq = {
- .name = queuename,
- };
+ struct call_queue *q;
int prev_weight = 0;
/* Find the queue in the in-core list first. */
- q = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Look for queue in memory first");
+ q = ao2_find(queues, (char *)queuename, 0);
if (!q || q->realtime) {
/*! \note Load from realtime before taking the "queues" container lock, to avoid blocking all
@@ -2426,7 +2436,7 @@
}
if (q) {
prev_weight = q->weight ? 1 : 0;
- queue_t_unref(q, "Need to find realtime queue");
+ ao2_ref(q, -1);
}
q = find_queue_by_name_rt(queuename, queue_vars, member_config);
@@ -2518,9 +2528,9 @@
int pos = 0;
int inserted = 0;
- if (!(q = load_realtime_queue(queuename)))
+ if (!(q = load_realtime_queue(queuename))) {
return res;
-
+ }
ao2_lock(q);
/* This is our one */
@@ -2529,7 +2539,7 @@
if ((status = get_member_status(q, qe->max_penalty, qe->min_penalty, q->joinempty))) {
*reason = QUEUE_JOINEMPTY;
ao2_unlock(q);
- queue_t_unref(q, "Done with realtime queue");
+ ao2_ref(q, -1);
return res;
}
}
@@ -2592,7 +2602,7 @@
ast_debug(1, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos );
}
ao2_unlock(q);
- queue_t_unref(q, "Done with realtime queue");
+ ao2_ref(q, -1);
return res;
}
@@ -2829,9 +2839,10 @@
struct penalty_rule *pr_iter;
int pos = 0;
- if (!(q = qe->parent))
+ if (!(q = qe->parent)) {
return;
- queue_t_ref(q, "Copy queue pointer from queue entry");
+ }
+ ao2_ref(q, 1);
ao2_lock(q);
prev = NULL;
@@ -2873,12 +2884,12 @@
}
}
- if (q->dead) {
+ if (q->dead) {
/* It's dead and nobody is in it, so kill it */
- queues_t_unlink(queues, q, "Queue is now dead; remove it from the container");
+ ao2_unlink(queues, q);
}
/* unref the explicit ref earlier in the function */
- queue_t_unref(q, "Expire copied reference");
+ ao2_ref(q, -1);
}
/*!
@@ -2892,6 +2903,7 @@
*/
static void callattempt_free(struct callattempt *doomed)
{
if (doomed->member) {
+	/* release the reservation only when a member is attached; the decrement must not run ahead of this NULL check */
+	if (doomed->member->device) {
+		doomed->member->device->reserved--;
+	}
ao2_ref(doomed->member, -1);
}
@@ -2935,7 +2947,7 @@
mem_iter = ao2_iterator_init(q->members, 0);
while ((mem = ao2_iterator_next(&mem_iter))) {
- switch (mem->status) {
+ switch (mem->device->status) {
case AST_DEVICE_INVALID:
case AST_DEVICE_UNAVAILABLE:
break;
@@ -2950,9 +2962,12 @@
/* else fall through */
case AST_DEVICE_NOT_INUSE:
case AST_DEVICE_UNKNOWN:
- if (!mem->paused) {
- avl++;
+ if (mem->paused ||
+ (mem->device->reserved && ((!q->ringinuse) || (!mem->ignorebusy)))) {
+ break;
}
+ /* else fall through */
+ avl++;
break;
}
ao2_ref(mem, -1);
@@ -2986,9 +3001,9 @@
struct ao2_iterator queue_iter;
queue_iter = ao2_iterator_init(queues, 0);
- while ((q = ao2_t_iterator_next(&queue_iter, "Iterate through queues"))) {
+ while ((q = ao2_iterator_next(&queue_iter))) {
if (q == rq) { /* don't check myself, could deadlock */
- queue_t_unref(q, "Done with iterator");
+ ao2_ref(q, -1);
continue;
}
ao2_lock(q);
@@ -3003,7 +3018,7 @@
}
}
ao2_unlock(q);
- queue_t_unref(q, "Done with iterator");
+ ao2_ref(q, -1);
if (found) {
break;
}
@@ -3079,7 +3094,7 @@
char tech[256];
char *location;
const char *macrocontext, *macroexten;
- enum ast_device_state newstate;
+ struct mem_state *s = tmp->member->device;
/* on entry here, we know that tmp->chan == NULL */
if (tmp->member->paused) {
@@ -3104,15 +3119,7 @@
}
if (!qe->parent->ringinuse || !tmp->member->ignorebusy) {
- if (check_state_unknown && (tmp->member->status == AST_DEVICE_UNKNOWN)) {
- newstate = ast_device_state(tmp->member->interface);
- if (newstate != tmp->member->status) {
- ast_log(LOG_WARNING, "Found a channel matching iterface %s while status was %s changed to %s\n",
- tmp->member->interface, ast_devstate2str(tmp->member->status), ast_devstate2str(newstate));
- ast_devstate_changed_literal(newstate, tmp->member->interface);
- }
- }
- if ((tmp->member->status != AST_DEVICE_NOT_INUSE) && (tmp->member->status != AST_DEVICE_UNKNOWN)) {
+ if ((s->reserved > 1) || ((s->status != AST_DEVICE_NOT_INUSE) && (s->status != AST_DEVICE_UNKNOWN))) {
ast_debug(1, "%s in use, can't receive call\n", tmp->interface);
if (qe->chan->cdr) {
ast_cdr_busy(qe->chan->cdr);
@@ -3144,10 +3151,9 @@
if (qe->chan->cdr) {
ast_cdr_busy(qe->chan->cdr);
}
- tmp->stillgoing = 0;
+ tmp->stillgoing = 0;
ao2_lock(qe->parent);
- update_status(qe->parent, tmp->member, get_queue_member_status(tmp->member));
qe->parent->rrpos++;
qe->linpos++;
ao2_unlock(qe->parent);
@@ -3231,7 +3237,6 @@
ast_channel_unlock(qe->chan);
do_hang(tmp);
(*busies)++;
- update_status(qe->parent, tmp->member, get_queue_member_status(tmp->member));
return 0;
} else if (qe->parent->eventwhencalled) {
char vars[2048];
@@ -3263,7 +3268,6 @@
ast_channel_unlock(tmp->chan);
ast_channel_unlock(qe->chan);
- update_status(qe->parent, tmp->member, get_queue_member_status(tmp->member));
return 1;
}
@@ -3489,8 +3493,10 @@
time_t idletime = time(&idletime)-mem->lastcall;
if ((mem->lastcall != 0) && (qe->parent->autopausedelay > idletime)) {
ao2_unlock(qe->parent);
+ ao2_ref(mem, -1);
return;
}
+ ao2_ref(mem, -1);
}
ao2_unlock(qe->parent);
}
@@ -4106,11 +4112,11 @@
struct member *mem;
struct call_queue *qtmp;
- struct ao2_iterator queue_iter;
-
+ struct ao2_iterator queue_iter;
+
if (shared_lastcall) {
queue_iter = ao2_iterator_init(queues, 0);
- while ((qtmp = ao2_t_iterator_next(&queue_iter, "Iterate through queues"))) {
+ while ((qtmp = ao2_iterator_next(&queue_iter))) {
ao2_lock(qtmp);
if ((mem = ao2_find(qtmp->members, member, OBJ_POINTER))) {
time(&mem->lastcall);
@@ -4119,7 +4125,7 @@
ao2_ref(mem, -1);
}
ao2_unlock(qtmp);
- queue_t_unref(qtmp, "Done with iterator");
+ ao2_ref(qtmp, -1);
}
ao2_iterator_destroy(&queue_iter);
} else {
@@ -4376,12 +4382,12 @@
if (ao2_ref(qeb, -1) == 1) {
set_queue_variables(q, chan);
/* This unrefs the reference we made in try_calling when we allocated qeb */
- queue_t_unref(q, "Expire bridge_config reference");
+ ao2_ref(q, -1);
}
}
/*! \brief A large function which calls members, updates statistics, and bridges the caller and a member
- *
+ *
* Here is the process of this function
* 1. Process any options passed to the Queue() application. Options here mean the third argument to Queue()
* 2. Iterate trough the members of the queue, creating a callattempt corresponding to each member. During this
@@ -4621,6 +4627,7 @@
tmp->stillgoing = -1;
tmp->member = cur;/* Place the reference for cur into callattempt. */
+ tmp->member->device->reserved++;
tmp->lastcall = cur->lastcall;
tmp->lastqueue = cur->lastqueue;
ast_copy_string(tmp->interface, cur->interface, sizeof(tmp->interface));
@@ -5141,7 +5148,7 @@
* to make sure to increase the refcount of this queue so it cannot be freed until we
* are done with it. We remove this reference in end_bridge_callback.
*/
- queue_t_ref(qe->parent, "For bridge_config reference");
+ ao2_ref(qe->parent, 1);
}
time(&callstart);
@@ -5210,23 +5217,11 @@
static struct member *interface_exists(struct call_queue *q, const char *interface)
{
- struct member *mem;
- struct ao2_iterator mem_iter;
-
- if (!q)
+	/* no queue, nothing to search */
+	if (!q) {
return NULL;
-
- mem_iter = ao2_iterator_init(q->members, 0);
- while ((mem = ao2_iterator_next(&mem_iter))) {
- if (!strcasecmp(interface, mem->interface)) {
- ao2_iterator_destroy(&mem_iter);
- return mem;
- }
- ao2_ref(mem, -1);
- }
- ao2_iterator_destroy(&mem_iter);
-
- return NULL;
+	}
+
+	/* look the member up by interface string (flags 0, so the key is a plain string) */
+	return ao2_find(q->members, (char *)interface, 0);
}
@@ -5255,7 +5250,7 @@
}
res = snprintf(value + value_len, sizeof(value) - value_len, "%s%s;%d;%d;%s;%s",
- value_len ? "|" : "", cur_member->interface, cur_member->penalty, cur_member->paused, cur_member->membername, cur_member->state_interface);
+ value_len ? "|" : "", cur_member->interface, cur_member->penalty, cur_member->paused, cur_member->membername, cur_member->device->state_interface);
ao2_ref(cur_member, -1);
@@ -5283,14 +5278,12 @@
*/
static int remove_from_queue(const char *queuename, const char *interface)
{
- struct call_queue *q, tmpq = {
- .name = queuename,
- };
+ struct call_queue *q;
struct member *mem, tmpmem;
int res = RES_NOSUCHQUEUE;
ast_copy_string(tmpmem.interface, interface, sizeof(tmpmem.interface));
- if ((q = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Temporary reference for interface removal"))) {
+ if ((q = ao2_find(queues, (char *)queuename, 0))) {
ao2_lock(q);
if ((mem = ao2_find(q->members, &tmpmem, OBJ_POINTER))) {
/* XXX future changes should beware of this assumption!! */
@@ -5300,7 +5293,7 @@
} else if (!mem->dynamic) {
ao2_ref(mem, -1);
ao2_unlock(q);
- queue_t_unref(q, "Interface wasn't dynamic, expiring temporary reference");
+ ao2_ref(q, -1);
return RES_NOT_DYNAMIC;
}
q->membercount--;
@@ -5320,7 +5313,7 @@
res = RES_EXISTS;
}
ao2_unlock(q);
- queue_t_unref(q, "Expiring temporary reference");
+ ao2_ref(q, -1);
}
return res;
@@ -5341,11 +5334,12 @@
/*! \note Ensure the appropriate realtime queue is loaded. Note that this
* short-circuits if the queue is already in memory. */
- if (!(q = load_realtime_queue(queuename)))
+ if (!(q = load_realtime_queue(queuename))) {
return res;
+ }
ao2_lock(q);
- if ((old_member = interface_exists(q, interface)) == NULL) {
+ if (!(old_member = interface_exists(q, interface))) {
if ((new_member = create_queue_member(interface, membername, penalty, paused, state_interface))) {
new_member->dynamic = 1;
ao2_link(q->members, new_member);
@@ -5364,14 +5358,15 @@
q->name, new_member->interface, new_member->membername, state_interface,
"dynamic",
new_member->penalty, new_member->calls, (int) new_member->lastcall,
- new_member->status, new_member->paused);
-
+ new_member->device->status, new_member->paused);
+
ao2_ref(new_member, -1);
new_member = NULL;
- if (dump)
+ if (dump) {
dump_queue_members(q);
-
+ }
+
res = RES_OKAY;
} else {
res = RES_OUTOFMEMORY;
@@ -5381,7 +5376,7 @@
res = RES_EXISTS;
}
ao2_unlock(q);
- queue_t_unref(q, "Expiring temporary reference");
+ ao2_ref(q, -1);
return res;
}
@@ -5392,7 +5387,6 @@
struct call_queue *q;
struct member *mem;
struct ao2_iterator queue_iter;
- int failed;
/* Special event for when all queues are paused - individual events still generated */
/* XXX In all other cases, we use the membername, but since this affects all queues, we cannot */
@@ -5400,62 +5394,52 @@
ast_queue_log("NONE", "NONE", interface, (paused ? "PAUSEALL" : "UNPAUSEALL"), "%s", "");
queue_iter = ao2_iterator_init(queues, 0);
- while ((q = ao2_t_iterator_next(&queue_iter, "Iterate over queues"))) {
+ while ((q = ao2_iterator_next(&queue_iter))) {
+ if (!ast_strlen_zero(queuename) && (strcasecmp(queuename, q->name))) {
+ ao2_ref(q, -1);
+ continue;
+ }
ao2_lock(q);
- if (ast_strlen_zero(queuename) || !strcasecmp(q->name, queuename)) {
- if ((mem = interface_exists(q, interface))) {
- if (mem->paused == paused) {
- ast_debug(1, "%spausing already-%spaused queue member %s:%s\n", (paused ? "" : "un"), (paused ? "" : "un"), q->name, interface);
- }
-
- failed = 0;
- if (mem->realtime) {
- failed = update_realtime_member_field(mem, q->name, "paused", paused ? "1" : "0");
- }
-
- if (failed) {
- ast_log(LOG_WARNING, "Failed %spausing realtime queue member %s:%s\n", (paused ? "" : "un"), q->name, interface);
- ao2_ref(mem, -1);
- ao2_unlock(q);
- queue_t_unref(q, "Done with iterator");
- continue;
- }
- found++;
- mem->paused = paused;
-
- if (queue_persistent_members)
- dump_queue_members(q);
-
- ast_queue_log(q->name, "NONE", mem->membername, (paused ? "PAUSE" : "UNPAUSE"), "%s", S_OR(reason, ""));
-
- if (!ast_strlen_zero(reason)) {
- manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused",
- "Queue: %s\r\n"
- "Location: %s\r\n"
- "MemberName: %s\r\n"
- "Paused: %d\r\n"
- "Reason: %s\r\n",
- q->name, mem->interface, mem->membername, paused, reason);
- } else {
- manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused",
- "Queue: %s\r\n"
- "Location: %s\r\n"
- "MemberName: %s\r\n"
- "Paused: %d\r\n",
- q->name, mem->interface, mem->membername, paused);
- }
- ao2_ref(mem, -1);
- }
- }
-
+ if ((mem = interface_exists(q, interface))) {
+ found = 1;
+ if (mem->paused == paused) {
+ ast_debug(1, "%spausing already-%spaused queue member %s:%s\n", (paused ? "" : "un"), (paused ? "" : "un"), q->name, interface);
+ }
+
+ if ((mem->realtime) && (update_realtime_member_field(mem, q->name, "paused", paused ? "1" : "0"))) {
+ ast_log(LOG_WARNING, "Failed %spausing realtime queue member %s:%s\n", (paused ? "" : "un"), q->name, interface);
+ }
+ mem->paused = paused;
+
+ if (queue_persistent_members) {
+ dump_queue_members(q);
+ }
+
+ ast_queue_log(q->name, "NONE", mem->membername, (paused ? "PAUSE" : "UNPAUSE"), "%s", S_OR(reason, ""));
+
+ if (!ast_strlen_zero(reason)) {
+ manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused",
+ "Queue: %s\r\n"
+ "Location: %s\r\n"
+ "MemberName: %s\r\n"
+ "Paused: %d\r\n"
+ "Reason: %s\r\n",
+ q->name, mem->interface, mem->membername, paused, reason);
+ } else {
+ manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused",
+ "Queue: %s\r\n"
+ "Location: %s\r\n"
+ "MemberName: %s\r\n"
+ "Paused: %d\r\n",
+ q->name, mem->interface, mem->membername, paused);
+ }
+ ao2_ref(mem, -1);
+ }
+ ao2_unlock(q);
+ ao2_ref(q, -1);
if (!ast_strlen_zero(queuename) && !strcasecmp(queuename, q->name)) {
- ao2_unlock(q);
- queue_t_unref(q, "Done with iterator");
break;
}
-
- ao2_unlock(q);
- queue_t_unref(q, "Done with iterator");
}
ao2_iterator_destroy(&queue_iter);
@@ -5500,44 +5484,43 @@
if (foundinterface) {
return RESULT_SUCCESS;
} else if (!foundqueue) {
- ast_log (LOG_ERROR, "Invalid queuename\n");
+ ast_log (LOG_ERROR, "Invalid queuename\n");
} else {
ast_log (LOG_ERROR, "Invalid interface\n");
- }
+ }
return RESULT_FAILURE;
}
[... 808 lines stripped ...]
More information about the asterisk-commits
mailing list