[asterisk-commits] irroot: branch irroot/asterisk-trunk-quack-queue r343161 - /team/irroot/aster...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Wed Nov 2 16:47:17 CDT 2011
Author: irroot
Date: Wed Nov 2 16:47:13 2011
New Revision: 343161
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=343161
Log:
Use the container lock and per member lock
Modified:
team/irroot/asterisk-trunk-quack-queue/apps/app_queue.c
Modified: team/irroot/asterisk-trunk-quack-queue/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/irroot/asterisk-trunk-quack-queue/apps/app_queue.c?view=diff&rev=343161&r1=343160&r2=343161
==============================================================================
--- team/irroot/asterisk-trunk-quack-queue/apps/app_queue.c (original)
+++ team/irroot/asterisk-trunk-quack-queue/apps/app_queue.c Wed Nov 2 16:47:13 2011
@@ -25,6 +25,18 @@
* \arg Config in \ref Config_qu queues.conf
*
* \par Development notes
+ * \note 2011-11-01: Made application more thread safe
+ * Distrotech PTY (LTD) (www.distrotech.co.za)
+ * Gregory Nietsky (irroot) <gregory at distrotech.co.za>
+ *
+ * Split away from locking queues/queue to having locks per
+ * Queue/Member/Device.
+ * Added device state struct to manage device states shared
+ * by multiple members sharing the same device.
+ *
+ * Made all functions work with realtime/dynamic/static members.
+ * Added missing CLI/AMI functions for handling ignorebusy.
+ *
* \note 2004-11-25: Persistent Dynamic Members added by:
* NetNation Communications (www.netnation.com)
* Kevin Lindsay <kevinl at netnation.com>
@@ -102,16 +114,27 @@
/*!
* \par Please read before modifying this file.
- * There are three locks which are regularly used
- * throughout this file, the queue list lock, the lock
- * for each individual queue, and the interface list lock.
+ * There are object locks which are regularly used
+ * throughout this file: the lock
+ * for each individual queue, the individual member lock,
+ * and the device state lock.
+ * There are also container locks for the queues list, the member
+ * list on each queue and the devices container.
* Please be extra careful to always lock in the following order
- * 1) queue list lock
- * 2) individual queue lock
- * 3) interface list lock
+ *
+ * 1) queues container lock (ao2 functions that lock the container)
+ * 2) queue lock
+ * 3) queue members container lock
+ * 4) member lock
+ * 5) devices container lock
+ * 6) member device lock
* This order has sort of "evolved" over the lifetime of this
* application, but it is now in place this way, so please adhere
* to this order!
+ *
+ * The only elements that do not require a read lock (i.e. will not change) are
+ * the queue name and members container, the member interface and the
+ * device state_interface, as they are never altered after being linked.
*/
/*** DOCUMENTATION
@@ -1066,6 +1089,7 @@
struct queue_ent *next; /*!< The next queue entry */
};
+/*! \brief keep track of device state changes */
struct mem_state {
char state_interface[80]; /*!< Technology/Location from which to read devicestate changes */
int reserved; /*!< This interface is reserved for pending call */
@@ -1250,6 +1274,7 @@
}
}
+/*! \brief return strategy name from strategy*/
static const char *int2strat(int strategy)
{
int x;
@@ -1262,6 +1287,7 @@
return "<unknown>";
}
+/*! \brief return strategy from strategy name*/
static int strat2int(const char *strategy)
{
int x;
@@ -1274,6 +1300,7 @@
return -1;
}
+/*! \brief return an autopause setting from name*/
static int autopause2int(const char *autopause)
{
int x;
@@ -1294,7 +1321,7 @@
return QUEUE_AUTOPAUSE_OFF;
}
-/*! \brief Set variables of queue */
+/*! \brief Set channel variables of queue */
static void set_queue_variables(struct call_queue *q, struct ast_channel *chan)
{
char interfacevar[256]="";
@@ -1319,7 +1346,7 @@
}
}
-/*! \brief Insert the 'new' entry after the 'prev' entry of queue 'q' */
+/*! \brief Insert the 'new' callattempt entry after the 'prev' entry of queue 'q' */
static inline void insert_entry(struct call_queue *q, struct queue_ent *prev, struct queue_ent *new, int *pos)
{
struct queue_ent *cur;
@@ -1344,9 +1371,11 @@
new->opos = *pos;
}
+/*! \brief return the device state for a member*/
static int get_device_status(struct mem_state *s)
{
int ret;
+
ao2_lock(s);
if (s->reserved && ((s->status == AST_DEVICE_NOT_INUSE) || (s->status == AST_DEVICE_UNKNOWN))) {
ret = AST_DEVICE_RINGING;
@@ -1354,6 +1383,7 @@
ret = s->status;
}
ao2_unlock(s);
+
return ret;
}
@@ -1363,21 +1393,27 @@
* is available, the function immediately returns 0. If no members are available,
* then -1 is returned.
*/
-static int get_member_status(struct call_queue *q, int max_penalty, int min_penalty, enum empty_conditions conditions)
+static int get_member_status(struct call_queue *q, int max_penalty, int min_penalty, int join)
{
struct member *member;
struct ao2_iterator mem_iter;
int status;
-
+ enum empty_conditions conditions;
+
+ ao2_lock(q);
+ conditions = (join) ? q->joinempty : q->leavewhenempty;
ao2_unlock(q);
- mem_iter = ao2_iterator_init(q->members, 0);
+ if (!conditions) {
+ return 0;
+ }
+
+ mem_iter = ao2_iterator_init(q->members, AO2_ITERATOR_DONTLOCK);
+ ao2_lock(q->members);
while((member = ao2_iterator_next(&mem_iter))) {
- ao2_lock(member);
if ((max_penalty && (member->penalty > max_penalty)) || (min_penalty && (member->penalty < min_penalty))) {
if (conditions & QUEUE_EMPTY_PENALTY) {
ast_debug(4, "%s is unavailable because his penalty is not between %d and %d\n", member->membername, min_penalty, max_penalty);
- ao2_unlock(member);
ao2_ref(member, -1);
continue;
}
@@ -1425,24 +1461,21 @@
break;
} else {
ast_debug(4, "%s is available.\n", member->membername);
- ao2_unlock(member);
ao2_ref(member, -1);
+ ao2_unlock(q->members);
ao2_iterator_destroy(&mem_iter);
- ao2_lock(q);
return 0;
}
break;
}
- ao2_unlock(member);
ao2_ref(member, -1);
}
+ ao2_unlock(q->members);
ao2_iterator_destroy(&mem_iter);
- ao2_lock(q);
return -1;
}
-/*! \brief send a QueueMemberStatus manager_event
-*/
+/*! \brief send a QueueMemberStatus manager_event when device state changes*/
static int update_status(void *data)
{
struct ao2_iterator qiter;
@@ -1499,6 +1532,9 @@
return 0;
}
+/*! \brief set the device state of a member explicitly
+ * \note a change update manager event will be sent
+ */
static int set_device_status(const char *device, int status)
{
struct mem_state *s;
@@ -1520,27 +1556,6 @@
}
return 0;
-}
-
-/*! \brief callback used when device state changes*/
-static void device_state_cb(const struct ast_event *event, void *unused)
-{
- enum ast_device_state state;
- const char *device;
-
- state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
- device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
-
- if (ast_strlen_zero(device)) {
- ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
- return;
- }
-
- if (set_device_status(device, state)) {
- ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", device, state, ast_devstate2str(state));
- } else {
- ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", device, state, ast_devstate2str(state));
- }
}
/*! \brief Helper function which converts from extension state to device state values */
@@ -1575,6 +1590,28 @@
return state;
}
+/*! \brief callback used when device state changes*/
+static void device_state_cb(const struct ast_event *event, void *unused)
+{
+ enum ast_device_state state;
+ const char *device;
+
+ state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
+ device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
+
+ if (ast_strlen_zero(device)) {
+ ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
+ return;
+ }
+
+ if (set_device_status(device, state)) {
+ ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", device, state, ast_devstate2str(state));
+ } else {
+ ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", device, state, ast_devstate2str(state));
+ }
+}
+
+/*! \brief callback called when an extension hint is notified of a change*/
static int extension_state_cb(const char *context, const char *exten, enum ast_extension_states state, void *data)
{
char *device;
@@ -1595,21 +1632,7 @@
return 0;
}
-static void remove_queue_member(void *data) {
- struct member *mem = data;
- struct mem_state *s = mem->device;
-
- if (s) {
- /* we have a device lets unlink and unref it*/
- /* remove our ref*/
- if (ao2_ref(s, -1) == 2) {
- /* we were the only consumer unlink*/
- ao2_unlink(devices, s);
- }
- }
-
-}
-
+/*! \brief find or create a member device state object*/
static struct mem_state *create_member_state(const char *state_interface) {
struct mem_state *state;
char *exten;
@@ -1647,7 +1670,7 @@
return state;
}
-/*! \brief Set current state of member if needed*/
+/*! \brief Set current state of member querying channel driver or hint state*/
static void set_queue_member_status(struct member *m)
{
int status;
@@ -1668,6 +1691,7 @@
if (s->status != status) {
s->status = status;
ao2_unlock(s);
+ /* we must pass a ref to the task processor*/
if (!ast_taskprocessor_push(devicestate_tps, update_status, s)) {
ao2_ref(s, 1);
}
@@ -1796,12 +1820,9 @@
const struct member *mem2 = arg;
const char *uniqueid = (flags & OBJ_POINTER) ? mem2->rt_uniqueid : arg;
- ao2_lock(mem1);
if (mem1->realtime && !strcasecmp(mem1->rt_uniqueid, uniqueid)) {
- ao2_unlock(mem1);
return CMP_MATCH | CMP_STOP;
}
- ao2_unlock(mem1);
return 0;
}
@@ -1812,13 +1833,12 @@
static int mark_static_member_dead(void *obj, void *arg, int flags)
{
struct member *member = obj;
- ao2_lock(member);
+
if (!(member->dynamic | member->realtime)) {
member->dead = 1;
- ao2_unlock(member);
return CMP_MATCH;
}
- ao2_unlock(member);
+
return 0;
}
@@ -1829,14 +1849,56 @@
{
struct member *member = obj;
- ao2_lock(member);
if (!(member->dynamic | member->realtime) && member->dead) {
- ao2_unlock(member);
return CMP_MATCH;
}
- ao2_unlock(member);
return 0;
}
+
+/*!
+ * \brief ao2 callback to mark realtime members dead
+ */
+static int mark_realtime_member_dead(void *obj, void *arg, int flags)
+{
+ struct member *member = obj;
+
+ if (member->realtime) {
+ member->dead = 1;
+ return CMP_MATCH;
+ }
+ return 0;
+}
+
+/*!
+ * \brief ao2 callback to delete realtime members marked dead
+ */
+static int kill_realtime_dead_members(void *obj, void *arg, void *data, int flags)
+{
+ struct member *m = obj;
+ struct call_queue *q = data;
+
+ if (m->dead && m->realtime) {
+ if (ast_strlen_zero(m->membername) || !log_membername_as_agent) {
+ ast_queue_log(q->name, "REALTIME", m->interface, "REMOVEMEMBER", "%s", "");
+ } else {
+ ast_queue_log(q->name, "REALTIME", m->membername, "REMOVEMEMBER", "%s", "");
+ }
+ return CMP_MATCH;
+ }
+ return 0;
+}
+
+/*! \brief ao2 callback to reset member counters */
+static int clear_queue_member_fn(void *obj1, void *arg, int flags)
+{
+ struct member *mem = obj1;
+
+ mem->calls = 0;
+ mem->lastcall = 0;
+
+ return 0;
+}
+
/*!
* \brief ao2 callback to calculate hash of a device by state_interface
@@ -1860,6 +1922,48 @@
const char *iface = (flags & OBJ_POINTER) ? d2->state_interface : arg;
return !strcasecmp(d->state_interface, iface) ? CMP_MATCH | CMP_STOP : 0;
+}
+
+/*! \brief Free queue's member list then its string fields */
+static void destroy_queue(void *obj)
+{
+ struct call_queue *q = obj;
+ int i;
+ struct member *cur;
+ struct ao2_iterator mem_iter;
+
+ mem_iter = ao2_iterator_init(q->members, AO2_ITERATOR_UNLINK);
+ while ((cur = ao2_iterator_next(&mem_iter))) {
+ ao2_ref(cur, -1);
+ }
+ ao2_iterator_destroy(&mem_iter);
+
+ ast_string_field_free_memory(q);
+ for (i = 0; i < MAX_PERIODIC_ANNOUNCEMENTS; i++) {
+ if (q->sound_periodicannounce[i])
+ free(q->sound_periodicannounce[i]);
+ }
+ ao2_ref(q->members, -1);
+}
+
+/*! \brief allocate a call_queue named queuename and link it into queues; returns NULL on allocation failure */
+static struct call_queue *alloc_queue(const char *queuename, int rt)
+{
+	struct call_queue *q;
+
+	if (!(q = ao2_t_alloc(sizeof(*q), destroy_queue, "Allocate queue"))) {
+		return NULL; /* nothing was allocated, so there is nothing to initialise or link */
+	}
+	if (ast_string_field_init(q, 64)) {
+		ao2_t_ref(q, -1, "String field allocation failed");
+		return NULL;
+	}
+	ast_string_field_set(q, name, queuename);
+	q->realtime = rt; /* rt is stored as the queue's realtime flag */
+	q->weight = 0;
+	q->found = 0;
+	ao2_link(queues, q);
+	return q;
}
/*!
@@ -1936,30 +2040,6 @@
ast_free(pr_iter);
}
-static int clear_queue_member_fn(void *obj1, void *arg, int flags)
-{
- struct member *mem = obj1;
- ao2_lock(mem);
- mem->calls = 0;
- mem->lastcall = 0;
- ao2_unlock(mem);
-
- return 0;
-}
-
-static void clear_queue(struct call_queue *q)
-{
- q->holdtime = 0;
- q->callscompleted = 0;
- q->callsabandoned = 0;
- q->callscompletedinsl = 0;
- q->talktime = 0;
-
- if (q->members) {
- ao2_callback(q->members, OBJ_NODATA, clear_queue_member_fn, NULL);
- }
-}
-
/*!
* \brief Change queue penalty by adding rule.
*
@@ -2039,39 +2119,44 @@
return 0;
}
-static void parse_empty_options(const char *value, enum empty_conditions *empty, int joinempty)
+/*! \brief return value for joinempty or leavewhenempty options*/
+static enum empty_conditions parse_empty_options(const char *value, int joinempty)
{
char *value_copy = ast_strdupa(value);
char *option = NULL;
+ enum empty_conditions empty = 0;
+
while ((option = strsep(&value_copy, ","))) {
if (!strcasecmp(option, "paused")) {
- *empty |= QUEUE_EMPTY_PAUSED;
+ empty |= QUEUE_EMPTY_PAUSED;
} else if (!strcasecmp(option, "penalty")) {
- *empty |= QUEUE_EMPTY_PENALTY;
+ empty |= QUEUE_EMPTY_PENALTY;
} else if (!strcasecmp(option, "inuse")) {
- *empty |= QUEUE_EMPTY_INUSE;
+ empty |= QUEUE_EMPTY_INUSE;
} else if (!strcasecmp(option, "ringing")) {
- *empty |= QUEUE_EMPTY_RINGING;
+ empty |= QUEUE_EMPTY_RINGING;
} else if (!strcasecmp(option, "invalid")) {
- *empty |= QUEUE_EMPTY_INVALID;
+ empty |= QUEUE_EMPTY_INVALID;
} else if (!strcasecmp(option, "wrapup")) {
- *empty |= QUEUE_EMPTY_WRAPUP;
+ empty |= QUEUE_EMPTY_WRAPUP;
} else if (!strcasecmp(option, "unavailable")) {
- *empty |= QUEUE_EMPTY_UNAVAILABLE;
+ empty |= QUEUE_EMPTY_UNAVAILABLE;
} else if (!strcasecmp(option, "unknown")) {
- *empty |= QUEUE_EMPTY_UNKNOWN;
+ empty |= QUEUE_EMPTY_UNKNOWN;
} else if (!strcasecmp(option, "loose")) {
- *empty = (QUEUE_EMPTY_PENALTY | QUEUE_EMPTY_INVALID);
+ empty = (QUEUE_EMPTY_PENALTY | QUEUE_EMPTY_INVALID);
} else if (!strcasecmp(option, "strict")) {
- *empty = (QUEUE_EMPTY_PENALTY | QUEUE_EMPTY_INVALID | QUEUE_EMPTY_PAUSED | QUEUE_EMPTY_UNAVAILABLE);
+ empty = (QUEUE_EMPTY_PENALTY | QUEUE_EMPTY_INVALID | QUEUE_EMPTY_PAUSED | QUEUE_EMPTY_UNAVAILABLE);
} else if ((ast_false(option) && joinempty) || (ast_true(option) && !joinempty)) {
- *empty = (QUEUE_EMPTY_PENALTY | QUEUE_EMPTY_INVALID | QUEUE_EMPTY_PAUSED);
+ empty = (QUEUE_EMPTY_PENALTY | QUEUE_EMPTY_INVALID | QUEUE_EMPTY_PAUSED);
} else if ((ast_false(option) && !joinempty) || (ast_true(option) && joinempty)) {
- *empty = 0;
+ empty = 0;
} else {
ast_log(LOG_WARNING, "Unknown option %s for '%s'\n", option, joinempty ? "joinempty" : "leavewhenempty");
}
}
+
+ return empty;
}
/*! \brief Configure a queue parameter.
@@ -2246,9 +2331,9 @@
}
q->strategy = strategy;
} else if (!strcasecmp(param, "joinempty")) {
- parse_empty_options(val, &q->joinempty, 1);
+ q->joinempty = parse_empty_options(val, 1);
} else if (!strcasecmp(param, "leavewhenempty")) {
- parse_empty_options(val, &q->leavewhenempty, 0);
+ q->leavewhenempty = parse_empty_options(val, 0);
} else if (!strcasecmp(param, "eventmemberstatus")) {
q->maskmemberstatus = !ast_true(val);
} else if (!strcasecmp(param, "eventwhencalled")) {
@@ -2283,6 +2368,21 @@
}
}
+/*! \brief callback that is called when a member is released*/
+static void remove_queue_member(void *data) {
+ struct member *mem = data;
+ struct mem_state *s = mem->device;
+
+ if (s) {
+ /* we have a device lets unlink and unref it*/
+ /* remove our ref*/
+ if (ao2_ref(s, -1) == 2) {
+ /* we were the only consumer unlink*/
+ ao2_unlink(devices, s);
+ }
+ }
+}
+
/*!
* \brief Find rt member record to update otherwise create one.
*
@@ -2317,6 +2417,7 @@
m->dynamic = (memtype & MEMBER_DYNAMIC) ? 1 : 0;
link = 1;
} else {
+ ao2_lock(q->members);
ao2_lock(m);
}
@@ -2378,6 +2479,7 @@
if (ast_strlen_zero(st_dev)) {
st_dev = ast_strdupa(interface);
}
+
if (!m->dead && (s = ao2_find(devices, st_dev, 0))) {
if (s && (m->device != s)) {
if (m->device && (ao2_ref(m->device, -1) == 2)) {
@@ -2463,76 +2565,21 @@
ast_queue_log(q->name, source, m->membername, "REMOVEMEMBER", "%s", "");
}
ao2_unlock(m);
+ ao2_unlock(q->members);
ao2_unlink(q->members, m);
}
} else if (!link) {
/* ive been updated */
ao2_unlock(m);
+ ao2_unlock(q->members);
}
ao2_ref(m, -1);
return res;
}
-/*! \brief Iterate through queue's member list and delete them */
-static void free_members(struct call_queue *q, int all)
-{
- /* Free non-dynamic members */
- struct member *cur;
- struct ao2_iterator mem_iter = ao2_iterator_init(q->members, 0);
-
- while ((cur = ao2_iterator_next(&mem_iter))) {
- ao2_lock(cur);
- if (all || !cur->dynamic) {
- ao2_unlock(cur);
- ao2_unlink(q->members, cur);
- } else {
- ao2_unlock(cur);
- }
- ao2_ref(cur, -1);
- }
- ao2_iterator_destroy(&mem_iter);
-}
-
-/*! \brief Free queue's member list then its string fields */
-static void destroy_queue(void *obj)
-{
- struct call_queue *q = obj;
- int i;
-
- free_members(q, 1);
- ast_string_field_free_memory(q);
- for (i = 0; i < MAX_PERIODIC_ANNOUNCEMENTS; i++) {
- if (q->sound_periodicannounce[i])
- free(q->sound_periodicannounce[i]);
- }
- ao2_ref(q->members, -1);
-}
-
-static struct call_queue *alloc_queue(const char *queuename, int rt)
-{
- struct call_queue *q;
-
- if ((q = ao2_t_alloc(sizeof(*q), destroy_queue, "Allocate queue"))) {
- if (ast_string_field_init(q, 64)) {
- ao2_t_ref(q, -1, "String field allocation failed");
- return NULL;
- }
- ast_string_field_set(q, name, queuename);
- }
- q->realtime = rt;
- q->weight = 0;
- q->found = 0;
- ao2_link(queues, q);
-
- return q;
-}
-
-
static void rt_load_member_config(struct call_queue *q)
{
struct ast_config *member_config;
- struct member *m;
- struct ao2_iterator mem_iter;
char *interface = NULL;
/* we may not have realtime members */
@@ -2542,16 +2589,7 @@
}
/* Temporarily set realtime members dead so we can detect deleted ones. */
- mem_iter = ao2_iterator_init(q->members, 0);
- while ((m = ao2_iterator_next(&mem_iter))) {
- ao2_lock(m);
- if (m->realtime) {
- m->dead = 1;
- }
- ao2_unlock(m);
- ao2_ref(m, -1);
- }
- ao2_iterator_destroy(&mem_iter);
+ ao2_callback(q->members, OBJ_NODATA | OBJ_MULTIPLE, mark_realtime_member_dead, NULL);
while ((interface = ast_category_browse(member_config, interface))) {
handle_member_record(q, interface, member_config, MEMBER_REALTIME, "REALTIME");
@@ -2559,24 +2597,7 @@
ast_config_destroy(member_config);
/* Delete all realtime members that have been deleted in DB. */
- mem_iter = ao2_iterator_init(q->members, 0);
- while ((m = ao2_iterator_next(&mem_iter))) {
- ao2_lock(m);
- if (m->dead && m->realtime) {
- if (ast_strlen_zero(m->membername) || !log_membername_as_agent) {
- ast_queue_log(q->name, "REALTIME", m->interface, "REMOVEMEMBER", "%s", "");
- } else {
- ast_queue_log(q->name, "REALTIME", m->membername, "REMOVEMEMBER", "%s", "");
- }
- ao2_unlock(m);
- ao2_unlink(q->members, m);
- ao2_ref(m, -1);
- } else {
- ao2_unlock(m);
- ao2_ref(m, -1);
- }
- }
- ao2_iterator_destroy(&mem_iter);
+ ao2_callback_data(q->members, OBJ_NODATA | OBJ_MULTIPLE | OBJ_UNLINK, kill_realtime_dead_members, NULL, q);
}
/*!
@@ -2734,74 +2755,79 @@
if (!(q = load_realtime_queue(queuename, QUEUE_RELOAD_PARAMETERS))) {
return res;
}
- ao2_lock(q);
/* This is our one */
- if ((q->joinempty) &&
- (get_member_status(q, qe->max_penalty, qe->min_penalty, q->joinempty))) {
+ if ((get_member_status(q, qe->max_penalty, qe->min_penalty, 1))) {
*reason = QUEUE_JOINEMPTY;
- ao2_unlock(q);
ao2_t_ref(q, -1, "Done with realtime queue");
return res;
}
- if (*reason == QUEUE_UNKNOWN && q->maxlen && (q->count >= q->maxlen)) {
+
+ if ((*reason == QUEUE_UNKNOWN && q->maxlen && (q->count >= q->maxlen)) ||
+ (*reason != QUEUE_UNKNOWN)) {
*reason = QUEUE_FULL;
- } else if (*reason == QUEUE_UNKNOWN) {
- /* There's space for us, put us at the right position inside
- * the queue.
- * Take into account the priority of the calling user */
- inserted = 0;
- prev = NULL;
- cur = q->head;
- while (cur) {
- /* We have higher priority than the current user, enter
- * before him, after all the other users with priority
- * higher or equal to our priority. */
- if ((!inserted) && (qe->prio > cur->prio)) {
- insert_entry(q, prev, qe, &pos);
- inserted = 1;
+ ao2_t_ref(q, -1, "Done with realtime queue");
+
+ return res;
+ }
+
+
+ /* There's space for us, put us at the right position inside
+ * the queue.
+ * Take into account the priority of the calling user */
+ inserted = 0;
+ prev = NULL;
+ ao2_lock(q);
+ cur = q->head;
+ while (cur) {
+ /* We have higher priority than the current user, enter
+ * before him, after all the other users with priority
+ * higher or equal to our priority. */
+ if ((!inserted) && (qe->prio > cur->prio)) {
+ insert_entry(q, prev, qe, &pos);
+ inserted = 1;
+ }
+ /* <= is necessary for the position comparison because it may not be possible to enter
+ * at our desired position since higher-priority callers may have taken the position we want
+ */
+ if (!inserted && (qe->prio >= cur->prio) && position && (position <= pos + 1)) {
+ insert_entry(q, prev, qe, &pos);
+ /*pos is incremented inside insert_entry, so don't need to add 1 here*/
+ if (position < pos) {
+ ast_log(LOG_NOTICE, "Asked to be inserted at position %d but forced into position %d due to higher priority callers\n", position, pos);
}
- /* <= is necessary for the position comparison because it may not be possible to enter
- * at our desired position since higher-priority callers may have taken the position we want
- */
- if (!inserted && (qe->prio >= cur->prio) && position && (position <= pos + 1)) {
- insert_entry(q, prev, qe, &pos);
- /*pos is incremented inside insert_entry, so don't need to add 1 here*/
- if (position < pos) {
- ast_log(LOG_NOTICE, "Asked to be inserted at position %d but forced into position %d due to higher priority callers\n", position, pos);
- }
- inserted = 1;
- }
- cur->pos = ++pos;
- prev = cur;
- cur = cur->next;
- }
- /* No luck, join at the end of the queue */
- if (!inserted)
- insert_entry(q, prev, qe, &pos);
- ast_copy_string(qe->moh, q->moh, sizeof(qe->moh));
- ast_copy_string(qe->announce, q->announce, sizeof(qe->announce));
- ast_copy_string(qe->context, q->context, sizeof(qe->context));
- q->count++;
- res = 0;
- ast_manager_event(qe->chan, EVENT_FLAG_CALL, "Join",
- "Channel: %s\r\n"
- "CallerIDNum: %s\r\n"
- "CallerIDName: %s\r\n"
- "ConnectedLineNum: %s\r\n"
- "ConnectedLineName: %s\r\n"
- "Queue: %s\r\n"
- "Position: %d\r\n"
- "Count: %d\r\n"
- "Uniqueid: %s\r\n",
- qe->chan->name,
- S_COR(qe->chan->caller.id.number.valid, qe->chan->caller.id.number.str, "unknown"),/* XXX somewhere else it is <unknown> */
- S_COR(qe->chan->caller.id.name.valid, qe->chan->caller.id.name.str, "unknown"),
- S_COR(qe->chan->connected.id.number.valid, qe->chan->connected.id.number.str, "unknown"),/* XXX somewhere else it is <unknown> */
- S_COR(qe->chan->connected.id.name.valid, qe->chan->connected.id.name.str, "unknown"),
- q->name, qe->pos, q->count, qe->chan->uniqueid );
- ast_debug(1, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos );
- }
+ inserted = 1;
+ }
+ cur->pos = ++pos;
+ prev = cur;
+ cur = cur->next;
+ }
+ /* No luck, join at the end of the queue */
+ if (!inserted)
+ insert_entry(q, prev, qe, &pos);
+ ast_copy_string(qe->moh, q->moh, sizeof(qe->moh));
+ ast_copy_string(qe->announce, q->announce, sizeof(qe->announce));
+ ast_copy_string(qe->context, q->context, sizeof(qe->context));
+ q->count++;
+ res = 0;
+ ast_manager_event(qe->chan, EVENT_FLAG_CALL, "Join",
+ "Channel: %s\r\n"
+ "CallerIDNum: %s\r\n"
+ "CallerIDName: %s\r\n"
+ "ConnectedLineNum: %s\r\n"
+ "ConnectedLineName: %s\r\n"
+ "Queue: %s\r\n"
+ "Position: %d\r\n"
+ "Count: %d\r\n"
+ "Uniqueid: %s\r\n",
+ qe->chan->name,
+ S_COR(qe->chan->caller.id.number.valid, qe->chan->caller.id.number.str, "unknown"),/* XXX somewhere else it is <unknown> */
+ S_COR(qe->chan->caller.id.name.valid, qe->chan->caller.id.name.str, "unknown"),
+ S_COR(qe->chan->connected.id.number.valid, qe->chan->connected.id.number.str, "unknown"),/* XXX somewhere else it is <unknown> */
+ S_COR(qe->chan->connected.id.name.valid, qe->chan->connected.id.name.str, "unknown"),
+ q->name, qe->pos, q->count, qe->chan->uniqueid );
+ ast_debug(1, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos );
+
ao2_unlock(q);
ao2_t_ref(q, -1, "Done with realtime queue");
@@ -3163,9 +3189,9 @@
}
ao2_unlock(q);
- mem_iter = ao2_iterator_init(q->members, 0);
+ mem_iter = ao2_iterator_init(q->members, AO2_ITERATOR_DONTLOCK);
+ ao2_lock(q->members);
while ((mem = ao2_iterator_next(&mem_iter))) {
- ao2_lock(mem);
status = get_device_status(mem->device);
switch (status) {
case AST_DEVICE_INVALID:
@@ -3187,7 +3213,6 @@
}
break;
}
- ao2_unlock(mem);
ao2_ref(mem, -1);
/* If autofill is not enabled or if the queue's strategy is ringall, then
@@ -3204,6 +3229,7 @@
break;
}
}
+ ao2_unlock(q->members);
ao2_iterator_destroy(&mem_iter);
return avl;
@@ -3758,7 +3784,7 @@
}
}
if (qe->parent->autopause == QUEUE_AUTOPAUSE_ON) {
- if (do_set_member_penalty_paused(qe->parent, call->member, 1, 1, "Auto-Pause")) {
+ if (!set_member_paused(qe->parent->name, call->interface, "Auto-Pause", 1)) {
ast_verb(3, "Auto-Pausing Queue Member %s in queue %s since they failed to answer.\n",
call->interface, qe->parent->name);
} else {
@@ -4318,16 +4344,14 @@
break;
}
- ao2_lock(qe->parent);
- if ((qe->parent->leavewhenempty) &&
- (get_member_status(qe->parent, qe->max_penalty, qe->min_penalty, qe->parent->leavewhenempty))) {
- ao2_unlock(qe->parent);
+ if ((get_member_status(qe->parent, qe->max_penalty, qe->min_penalty, 0))) {
*reason = QUEUE_LEAVEEMPTY;
ast_queue_log(qe->parent->name, qe->chan->uniqueid, "NONE", "EXITEMPTY", "%d|%d|%ld", qe->pos, qe->opos, (long) time(NULL) - qe->start);
leave_queue(qe);
break;
}
+ ao2_lock(qe->parent);
/* Make a position announcement, if enabled */
if (qe->parent->announcefrequency &&
(res = say_position(qe,ringing))) {
@@ -4400,22 +4424,26 @@
queue_iter = ao2_iterator_init(queues, 0);
while ((qtmp = ao2_t_iterator_next(&queue_iter, "Iterate through queues"))) {
if ((mem = ao2_find(qtmp->members, member, OBJ_POINTER))) {
+ ao2_lock(qtmp->members);
ao2_lock(mem);
time(&mem->lastcall);
mem->calls++;
mem->lastwrapup = wrapuptime;
+ ao2_unlock(mem);
+ ao2_lock(qtmp->members);
ao2_ref(mem, -1);
- ao2_unlock(mem);
}
ao2_t_ref(qtmp, -1, "Done with iterator");
}
ao2_iterator_destroy(&queue_iter);
} else {
+ ao2_lock(q->members);
ao2_lock(member);
time(&member->lastcall);
member->calls++;
member->lastwrapup = wrapuptime;
ao2_unlock(member);
+ ao2_unlock(q->members);
}
ao2_lock(q);
q->callscompleted++;
@@ -4692,6 +4720,9 @@
* iteration, we also check the dialed_interfaces datastore to see if we have already attempted calling this
* member. If we have, we do not create a callattempt. This is in place to prevent call forwarding loops. Also
* during each iteration, we call calc_metric to determine which members should be rung when.
+ * \note It is too costly to repeatedly lock and unlock the queue and members for this, so the queue and
+ * the members container are locked for the whole process.
+ *
* 3. Call ring_one to place a call to the appropriate member(s)
* 4. Call wait_for_answer to wait for an answer. If no one answers, return.
* 5. Take care of any holdtime announcements, member delays, or other options which occur after a call has been answered.
@@ -4842,6 +4873,7 @@
qe->cancel_answered_elsewhere = 1;
}
+ ao2_lock(qe->parent);
ast_debug(1, "%s is trying to call a queue member.\n",
qe->chan->name);
ast_copy_string(queuename, qe->parent->name, sizeof(queuename));
@@ -4850,7 +4882,8 @@
if (!ast_strlen_zero(announceoverride))
announce = announceoverride;
- memi = ao2_iterator_init(qe->parent->members, 0);
+ memi = ao2_iterator_init(qe->parent->members, AO2_ITERATOR_DONTLOCK);
+ ao2_lock(qe->parent->members);
while ((cur = ao2_iterator_next(&memi))) {
struct callattempt *tmp = ast_calloc(1, sizeof(*tmp));
struct ast_dialed_interface *di;
@@ -4858,6 +4891,8 @@
if (!tmp) {
ao2_ref(cur, -1);
ao2_iterator_destroy(&memi);
+ ao2_lock(qe->parent->members);
+ ao2_unlock(qe->parent);
goto out;
}
if (!datastore) {
@@ -4865,6 +4900,8 @@
ao2_ref(cur, -1);
ao2_iterator_destroy(&memi);
callattempt_free(tmp);
+ ao2_lock(qe->parent->members);
+ ao2_unlock(qe->parent);
goto out;
}
datastore->inheritance = DATASTORE_INHERIT_FOREVER;
@@ -4872,6 +4909,8 @@
ao2_ref(cur, -1);
ao2_iterator_destroy(&memi);
callattempt_free(tmp);
+ ao2_lock(qe->parent->members);
+ ao2_unlock(qe->parent);
goto out;
}
datastore->data = dialed_interfaces;
@@ -4908,6 +4947,8 @@
ao2_ref(cur, -1);
ao2_iterator_destroy(&memi);
callattempt_free(tmp);
+ ao2_lock(qe->parent->members);
+ ao2_unlock(qe->parent);
goto out;
}
strcpy(di->interface, cur->interface);
@@ -4928,7 +4969,6 @@
ast_channel_unlock(qe->chan);
tmp->stillgoing = -1;
- ao2_lock(cur);
tmp->member = cur;/* Place the reference for cur into callattempt. */
tmp->lastcall = cur->lastcall;
tmp->lastwrapup = cur->lastwrapup;
@@ -4937,7 +4977,6 @@
/* Special case: If we ring everyone, go ahead and ring them, otherwise
just calculate their metric for the appropriate strategy */
if (!calc_metric(qe->parent, cur, x++, qe, tmp)) {
- ao2_unlock(cur);
/* Put them in the list of outgoing thingies... We're ready now.
XXX If we're forcibly removed, these outgoing calls won't get
hung up XXX */
@@ -4952,9 +4991,9 @@
callattempt_free(tmp);
}
}
+ ao2_lock(qe->parent->members);
ao2_iterator_destroy(&memi);
- ao2_lock(qe->parent);
if (qe->parent->timeoutpriority == TIMEOUT_PRIORITY_APP) {
/* Application arguments have higher timeout priority (behaviour for <=1.6) */
if (qe->expire && (!qe->parent->timeout || (qe->expire - now) <= qe->parent->timeout))
@@ -5612,11 +5651,10 @@
if (!pm_queue)
return;
- mem_iter = ao2_iterator_init(pm_queue->members, 0);
+ mem_iter = ao2_iterator_init(pm_queue->members, AO2_ITERATOR_DONTLOCK);
+ ao2_lock(pm_queue->members);
while ((cur_member = ao2_iterator_next(&mem_iter))) {
- ao2_lock(cur_member);
if (!cur_member->dynamic || cur_member->dead) {
- ao2_unlock(cur_member);
ao2_ref(cur_member, -1);
continue;
}
@@ -5625,7 +5663,6 @@
value_len ? "|" : "", cur_member->interface, cur_member->penalty, cur_member->paused,
cur_member->membername, cur_member->device->state_interface, cur_member->ignorebusy);
- ao2_unlock(cur_member);
ao2_ref(cur_member, -1);
if (res != strlen(value + value_len)) {
@@ -5634,14 +5671,17 @@
}
value_len += res;
}
+ ao2_unlock(pm_queue->members);
ao2_iterator_destroy(&mem_iter);
- if (value_len && !cur_member) {
- if (ast_db_put(pm_family, pm_queue->name, value))
+ if (value_len) {
+ if (ast_db_put(pm_family, pm_queue->name, value)) {
ast_log(LOG_WARNING, "failed to create persistent dynamic entry!\n");
- } else
+ }
+ } else {
/* Delete the entry if the queue is empty or there is an error */
ast_db_del(pm_family, pm_queue->name);
+ }
}
/*! \brief Remove member from queue
@@ -5775,9 +5815,11 @@
return RESULT_FAILURE;
}
+ ao2_lock(q->members);
ao2_lock(mem);
found = !do_set_member_penalty_paused(q, mem, 1, paused, reason);
ao2_unlock(mem);
+ ao2_unlock(q->members);
ao2_ref(mem, -1);
ao2_ref(q, -1);
return (!found) ? RESULT_FAILURE : RESULT_SUCCESS;
@@ -5828,9 +5870,11 @@
return RESULT_FAILURE;
}
+ ao2_lock(q->members);
ao2_lock(mem);
do_set_member_penalty_paused(q, mem, 0, penalty, NULL);
ao2_unlock(mem);
+ ao2_unlock(q->members);
ao2_ref(mem, -1);
ao2_ref(q, -1);
return RESULT_SUCCESS;
@@ -6406,17 +6450,13 @@
goto stop;
}
- ao2_lock(qe.parent);
- if ((qe.parent->leavewhenempty) &&
- (get_member_status(qe.parent, qe.max_penalty, qe.min_penalty, qe.parent->leavewhenempty))) {
- ao2_unlock(qe.parent);
+ if ((get_member_status(qe.parent, qe.max_penalty, qe.min_penalty, 0))) {
record_abandoned(&qe);
reason = QUEUE_LEAVEEMPTY;
ast_queue_log(args.queuename, chan->uniqueid, "NONE", "EXITEMPTY", "%d|%d|%ld", qe.pos, qe.opos, (long)(time(NULL) - qe.start));
res = 0;
break;
}
- ao2_unlock(qe.parent);
/* exit after 'timeout' cycle if 'n' option enabled */
if (noption && tries >= ao2_container_count(qe.parent->members)) {
@@ -6731,19 +6771,21 @@
ast_log(LOG_ERROR, "Invalid member %s queue %s\n", args.interface, args.queuename);
return -1;
}
+ ao2_lock(q->members);
+ ao2_lock(m);
if (!strcasecmp(args.option, "ignorebusy")) {
- ao2_lock(m);
m->ignorebusy = (memvalue) ? 1 : 0;
if (m->realtime) {
update_realtime_member_field(m, q->name, args.option, value);
} else if (m->dynamic) {
reload = 1;
}
- ao2_unlock(m);
} else {
ast_log(LOG_ERROR, "Invalid option, only penalty , paused or ignorebusy are valid\n");
ret = -1;
}
+ ao2_unlock(m);
+ ao2_unlock(q->members);
ao2_ref(m, -1);
if (reload && queue_persistent_members) {
@@ -6859,9 +6901,9 @@
return -1;
}
- mem_iter = ao2_iterator_init(q->members, 0);
+ mem_iter = ao2_iterator_init(q->members, AO2_ITERATOR_DONTLOCK);
+ ao2_lock(q->members);
while ((m = ao2_iterator_next(&mem_iter))) {
- ao2_lock(m);
/* strcat() is always faster than printf() */
if (count++) {
strncat(buf + buflen, ",", len - buflen - 1);
@@ -6871,14 +6913,13 @@
buflen += strlen(m->interface);
/* Safeguard against overflow (negative length) */
if (buflen >= len - 2) {
- ao2_unlock(m);
ao2_ref(m, -1);
ast_log(LOG_WARNING, "Truncating list\n");
break;
}
- ao2_unlock(m);
ao2_ref(m, -1);
}
+ ao2_unlock(q->members);
ao2_iterator_destroy(&mem_iter);
ao2_t_ref(q, -1, "Done with QUEUE_MEMBER_LIST()");
@@ -7223,7 +7264,7 @@
handle_member_record(q, interface, mcfg, MEMBER_STATIC, "queues.conf");
}
/* Free remaining members marked as dead */
- ao2_callback(q->members, OBJ_NODATA | OBJ_MULTIPLE | OBJ_UNLINK, kill_static_dead_members, q);
+ ao2_callback(q->members, OBJ_NODATA | OBJ_MULTIPLE | OBJ_UNLINK, kill_static_dead_members, NULL);
/* load the realtime agents*/
rt_load_member_config(q);
@@ -7312,10 +7353,17 @@
struct call_queue *q;
struct ao2_iterator queue_iter = ao2_iterator_init(queues, 0);
while ((q = ao2_t_iterator_next(&queue_iter, "Iterate through queues"))) {
- ao2_lock(q);
if (ast_strlen_zero(queuename) || !strcasecmp(q->name, queuename))
- clear_queue(q);
- ao2_unlock(q);
+ ao2_lock(q);
+ q->holdtime = 0;
+ q->callscompleted = 0;
+ q->callsabandoned = 0;
+ q->callscompletedinsl = 0;
+ q->talktime = 0;
+ ao2_unlock(q);
+ if (q->members) {
+ ao2_callback(q->members, OBJ_NODATA, clear_queue_member_fn, NULL);
+ }
ao2_t_ref(q, -1, "Done with iterator");
}
ao2_iterator_destroy(&queue_iter);
@@ -7421,9 +7469,9 @@
int status;
do_print(s, fd, " Members: ");
- mem_iter = ao2_iterator_init(q->members, 0);
+ mem_iter = ao2_iterator_init(q->members, AO2_ITERATOR_DONTLOCK);
+ ao2_lock(q->members);
while ((mem = ao2_iterator_next(&mem_iter))) {
- ao2_lock(mem);
ast_str_set(&out, 0, " %s", mem->membername);
if (strcasecmp(mem->membername, mem->interface)) {
ast_str_append(&out, 0, " (%s", mem->interface);
@@ -7447,9 +7495,9 @@
ast_str_append(&out, 0, " has taken no calls yet");
}
do_print(s, fd, ast_str_buffer(out));
- ao2_unlock(mem);
ao2_ref(mem, -1);
}
+ ao2_unlock(q->members);
ao2_iterator_destroy(&mem_iter);
}
ao2_lock(q);
@@ -7618,9 +7666,9 @@
qlongestholdtime = 0;
/* List Queue Members */
- mem_iter = ao2_iterator_init(q->members, 0);
+ mem_iter = ao2_iterator_init(q->members, AO2_ITERATOR_DONTLOCK);
+ ao2_lock(q->members);
while ((mem = ao2_iterator_next(&mem_iter))) {
- ao2_lock(mem);
status = get_device_status(mem->device);
if ((status != AST_DEVICE_UNAVAILABLE) && (status != AST_DEVICE_INVALID)) {
++qmemcount;
@@ -7628,9 +7676,9 @@
++qmemavail;
}
}
- ao2_unlock(mem);
ao2_ref(mem, -1);
}
+ ao2_unlock(q->members);
ao2_iterator_destroy(&mem_iter);
ao2_lock(q);
for (qe = q->head; qe; qe = qe->next) {
@@ -7717,9 +7765,9 @@
q->callsabandoned, q->servicelevel, sl, q->weight, idText);
/* List Queue Members */
ao2_unlock(q);
- mem_iter = ao2_iterator_init(q->members, 0);
+ mem_iter = ao2_iterator_init(q->members, AO2_ITERATOR_DONTLOCK);
+ ao2_lock(q->members);
while ((mem = ao2_iterator_next(&mem_iter))) {
- ao2_lock(mem);
if (ast_strlen_zero(memberfilter) || !strcmp(mem->interface, memberfilter) || !strcmp(mem->membername, memberfilter)) {
status = get_device_status(mem->device);
astman_append(s, "Event: QueueMember\r\n"
@@ -7738,9 +7786,9 @@
q->name, mem->membername, mem->interface, mem->device->state_interface, mem->dynamic ? "dynamic" : "static",
mem->penalty, mem->calls, (int)mem->lastcall, status, mem->paused, idText);
}
- ao2_unlock(mem);
ao2_ref(mem, -1);
}
+ ao2_unlock(q->members);
ao2_iterator_destroy(&mem_iter);
/* List Queue Entries */
pos = 1;
@@ -7898,10 +7946,10 @@
}
if (!(mem = interface_exists(q, interface))) {
astman_send_error(s, m, "Invalid 'Interface'");
- ao2_ref(q, -1);
return 0;
}
+ ao2_lock(q->members);
ao2_lock(mem);
mem->ignorebusy = abs(ast_true(ignorebusy_s));
if (mem->realtime) {
@@ -7911,7 +7959,9 @@
}
astman_send_ack(s, m, mem->ignorebusy ? "Interface IgnoreBusy enabled" : "Interface IgnoreBusy disabled");
ao2_unlock(mem);
+ ao2_unlock(q->members);
ao2_ref(mem, -1);
+ ao2_ref(q, -1);
if (reload && queue_persistent_members) {
dump_queue_members(q);
@@ -8190,21 +8240,21 @@
/* here is the case for 3, <member> */
queue_iter = ao2_iterator_init(queues, 0);
while ((q = ao2_t_iterator_next(&queue_iter, "Iterate through queues"))) {
- mem_iter = ao2_iterator_init(q->members, 0);
+ mem_iter = ao2_iterator_init(q->members, AO2_ITERATOR_DONTLOCK);
+ ao2_lock(q->members);
while ((m = ao2_iterator_next(&mem_iter))) {
- ao2_lock(m);
if (!strncasecmp(word, m->membername, wordlen) && ++which > state) {
- ao2_unlock(m);
tmp = ast_strdup(m->interface);
ao2_ref(m, -1);
[... 50 lines stripped ...]
More information about the asterisk-commits
mailing list