[asterisk-commits] irroot: branch irroot/distrotech-customers-trunk r344957 - /team/irroot/distr...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Sat Nov 12 07:10:53 CST 2011
Author: irroot
Date: Sat Nov 12 07:10:49 2011
New Revision: 344957
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=344957
Log:
Merge Quack Queue
Modified:
team/irroot/distrotech-customers-trunk/apps/app_queue.c
Modified: team/irroot/distrotech-customers-trunk/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/irroot/distrotech-customers-trunk/apps/app_queue.c?view=diff&rev=344957&r1=344956&r2=344957
==============================================================================
--- team/irroot/distrotech-customers-trunk/apps/app_queue.c (original)
+++ team/irroot/distrotech-customers-trunk/apps/app_queue.c Sat Nov 12 07:10:49 2011
@@ -25,6 +25,18 @@
* \arg Config in \ref Config_qu queues.conf
*
* \par Development notes
+ * \note 2011-11-01: Made application more thread safe
+ * Distrotech PTY (LTD) (www.distrotech.co.za)
+ * Gregory Nietsky (irroot) <gregory at distrotech.co.za>
+ *
+ * Split away from locking queues/queue to having locks per
+ * Queue/Member/Device.
+ * Added device state struct to mange device states shared
+ * for multiple members sharing same device.
+ *
+ * Made all functions work with realtime/dynamic/static members.
+ * Added missing CLI/AMI functions for handling callinuse.
+ *
* \note 2004-11-25: Persistent Dynamic Members added by:
* NetNation Communications (www.netnation.com)
* Kevin Lindsay <kevinl at netnation.com>
@@ -99,22 +111,31 @@
#include "asterisk/callerid.h"
#include "asterisk/cel.h"
#include "asterisk/data.h"
-
-/* Define, to debug reference counts on queues, without debugging reference counts on queue members */
-/* #define REF_DEBUG_ONLY_QUEUES */
+#include "asterisk/time.h"
/*!
* \par Please read before modifying this file.
- * There are three locks which are regularly used
- * throughout this file, the queue list lock, the lock
- * for each individual queue, and the interface list lock.
+ * There are locks which are regularly used
+ * throughout this file, the lock
+ * for each individual queue, the individual member lock ,
+ * and the device state lock.
+ * there are container locks for the queues list, the member
+ * list on each queue and the devices container.
* Please be extra careful to always lock in the following order
- * 1) queue list lock
- * 2) individual queue lock
- * 3) interface list lock
+ *
+ * 1) queues container lock (ao2 functions that lock the container)
+ * 2) queue lock
+ * 3) queue member lock
+ * 4) member lock
+ * 5) devices container lock
+ * 6) member device lock
* This order has sort of "evolved" over the lifetime of this
* application, but it is now in place this way, so please adhere
* to this order!
+ *
+ * the only elements that do not require a read lock (ie will not change)
+ * the queue name and members container, the member interface and
+ * device state_interface as they are never altered after been linked.
*/
/*** DOCUMENTATION
@@ -264,9 +285,10 @@
<parameter name="queuename" required="true" />
<parameter name="interface" />
<parameter name="penalty" />
- <parameter name="options" />
+ <parameter name="paused" />
<parameter name="membername" />
<parameter name="stateinterface" />
+ <parameter name="callinuse" />
</syntax>
<description>
<para>Dynamically adds interface to an existing queue. If the interface is
@@ -530,8 +552,8 @@
<enum name="paused">
<para>Gets or sets queue member paused status.</para>
</enum>
- <enum name="ignorebusy">
- <para>Gets or sets queue member ignorebusy.</para>
+ <enum name="callinuse">
+ <para>Gets or sets queue member callinuse.</para>
</enum>
</enumlist>
</parameter>
@@ -846,14 +868,14 @@
<description>
</description>
</manager>
- <manager name="QueueIgnoreBusy" language="en_US">
+ <manager name="QueueCallInuse" language="en_US">
<synopsis>
Set interface to allow multiple calls
</synopsis>
<syntax>
<xi:include xpointer="xpointer(/docs/manager[@name='Login']/syntax/parameter[@name='ActionID'])" />
<parameter name="Interface" required="true" />
- <parameter name="IgnoreBusy" required="true" />
+ <parameter name="CallInuse" required="true" />
<parameter name="Queue" required="true" />
</syntax>
<description>
@@ -883,6 +905,13 @@
QUEUE_RELOAD_MEMBER = (1 << 1),
QUEUE_RELOAD_RULES = (1 << 2),
QUEUE_RESET_STATS = (1 << 3),
+ QUEUE_RELOAD_REALTIME = (1 << 4),
+};
+
+enum member_type {
+ QUEUE_ADD_MEMBER_STATIC = (1 << 0),
+ QUEUE_ADD_MEMBER_REALTIME = (1 << 1),
+ QUEUE_ADD_MEMBER_DYNAMIC = (1 << 2),
};
static const struct strategy {
@@ -925,7 +954,7 @@
#define RES_OUTOFMEMORY (-2) /*!< Out of memory */
#define RES_NOSUCHQUEUE (-3) /*!< No such queue */
#define RES_NOT_DYNAMIC (-4) /*!< Member is not dynamic */
-
+#define RES_ERROR (-5) /*!< Member is mis configured */
static char *app = "Queue";
static char *app_aqm = "AddQueueMember" ;
@@ -1004,49 +1033,32 @@
* use it not only for keeping track of what is in use but
* also for keeping track of who we're dialing.
*
- * There are two "links" defined in this structure, q_next and call_next.
- * q_next links ALL defined callattempt structures into a linked list. call_next is
- * a link which allows for a subset of the callattempts to be traversed. This subset
- * is used in wait_for_answer so that irrelevant callattempts are not traversed. This
- * also is helpful so that queue logs are always accurate in the case where a call to
- * a member times out, especially if using the ringall strategy.
*/
struct callattempt {
- struct callattempt *q_next;
- struct callattempt *call_next;
- struct ast_channel *chan;
- char interface[256];
- int stillgoing;
- int metric;
- time_t lastcall;
- struct call_queue *lastqueue;
- struct member *member;
- /*! Saved connected party info from an AST_CONTROL_CONNECTED_LINE. */
- struct ast_party_connected_line connected;
- /*! TRUE if an AST_CONTROL_CONNECTED_LINE update was saved to the connected element. */
- unsigned int pending_connected_update:1;
- /*! TRUE if caller id is not available for connected line */
- unsigned int dial_callerid_absent:1;
- unsigned int reserved:1;
+ struct ast_channel *chan; /*! Channel called */
+ int stillgoing; /*! This attempt is valid and active */
+ int metric; /*! Metric calculated according to strategy */
+ struct member *member; /*! Member assosiated with this attempt */
+ struct ast_party_connected_line connected; /*! Saved connected party info from an AST_CONTROL_CONNECTED_LINE. */
+ unsigned int reserved:1; /*! Is this attempt been attempted*/
+ unsigned int active:1; /*! Is this attempt active in a call*/
+ unsigned int pending_connected_update:1; /*! TRUE if caller id is not available for connected line*/
+ unsigned int dial_callerid_absent:1; /*! TRUE if caller id is not available for connected line */
struct ast_aoc_decoded *aoc_s_rate_list;
};
-
struct queue_ent {
struct call_queue *parent; /*!< What queue is our parent */
- char moh[80]; /*!< Name of musiconhold to be used */
- char announce[PATH_MAX]; /*!< Announcement to play for member when call is answered */
- char context[AST_MAX_CONTEXT]; /*!< Context when user exits queue */
char digits[AST_MAX_EXTENSION]; /*!< Digits entered while in queue */
int valid_digits; /*!< Digits entered correspond to valid extension. Exited */
int pos; /*!< Where we are in the queue */
int prio; /*!< Our priority */
int last_pos_said; /*!< Last position we told the user */
int ring_when_ringing; /*!< Should we only use ring indication when a channel is ringing? */
- time_t last_periodic_announce_time; /*!< The last time we played a periodic announcement */
+ struct timeval last_pannounce_time; /*!< The last time we played a periodic announcement */
int last_periodic_announce_sound; /*!< The last periodic announcement we made */
- time_t last_pos; /*!< Last time we told the user their position */
+ struct timeval last_pos; /*!< Last time we told the user their position */
int opos; /*!< Where we started in the queue */
int handled; /*!< Whether our call was handled */
int pending; /*!< Non-zero if we are attempting to call a member */
@@ -1054,35 +1066,40 @@
int min_penalty; /*!< Limit the members that can take this call to this penalty or higher */
int linpos; /*!< If using linear strategy, what position are we at? */
int linwrapped; /*!< Is the linpos wrapped? */
- time_t start; /*!< When we started holding */
- time_t expire; /*!< When this entry should expire (time out of queue) */
+ struct timeval start; /*!< When we started holding */
+ struct timeval expire; /*!< When this entry should expire (time out of queue) */
int cancel_answered_elsewhere; /*!< Whether we should force the CAE flag on this call (C) option*/
+ struct ao2_container *attempts; /*!< Container holding all call attempts*/
struct ast_channel *chan; /*!< Our channel */
- AST_LIST_HEAD_NOLOCK(,penalty_rule) qe_rules; /*!< Local copy of the queue's penalty rules */
- struct penalty_rule *pr; /*!< Pointer to the next penalty rule to implement */
- struct queue_ent *next; /*!< The next queue entry */
+ struct rule_list *rules; /*!< Pointer holding the ref for the queue penalty rules */
+ struct penalty_rule *pr; /*!< Active penalty rule */
+ AST_LIST_ENTRY(queue_ent) next; /*!< The next queue entry */
};
+/*! \brief keep track of device state changes */
struct mem_state {
- char state_interface[80]; /*!< Technology/Location from which to read devicestate changes */
- int reserved; /*!< This interface is reserved for pending call */
- int status; /*!< Status of queue member */
+ char state_interface[80]; /*!< Technology/Location from which to read devicestate changes */
+ int reserved; /*!< This interface is reserved for pending call */
+ int active; /*!< This interface is active on a call */
+ int status; /*!< Status of queue member */
};
struct member {
- char interface[80]; /*!< Technology/Location to dial to reach this member*/
- char membername[80]; /*!< Member name to use in queue logs */
- int penalty; /*!< Are we a last resort? */
- int calls; /*!< Number of calls serviced by this member */
- time_t lastcall; /*!< When last successful call was hungup */
- struct call_queue *lastqueue; /*!< Last queue we received a call */
- unsigned int realtime:1; /*!< Is this member realtime? */
- unsigned int paused:1; /*!< Are we paused (not accepting calls)? */
- unsigned int dead:1; /*!< Used to detect members deleted in realtime */
- unsigned int dynamic:1; /*!< Are we dynamically added? */
- unsigned int ignorebusy:1; /*!< Flag to ignore member if the status is not available */
- char rt_uniqueid[80]; /*!< Unique id of realtime member entry */
- struct mem_state *device; /*!< Device information */
+ AST_DECLARE_STRING_FIELDS(
+ AST_STRING_FIELD(interface); /*!< Technology/Location to dial to reach this member*/
+ AST_STRING_FIELD(membername); /*!< Member name to use in queue logs */
+ AST_STRING_FIELD(rt_uniqueid); /*!< Unique id of realtime member entry */
+ );
+ int penalty; /*!< Are we a last resort? */
+ int calls; /*!< Number of calls serviced by this member */
+ struct timeval lastcall; /*!< When last successful call was hungup */
+ int lastwrapup; /*!< Last wrapuptime */
+ unsigned int dynamic:1; /*!< Is this member dynamic? */
+ unsigned int realtime:1; /*!< Is this member realtime? */
+ unsigned int paused:1; /*!< Are we paused (not accepting calls)? */
+ unsigned int dead:1; /*!< Used to detect members deleted in realtime */
+ unsigned int callinuse:1; /*!< Are we dynamically added? */
+ struct mem_state *device; /*!< Device information */
};
enum empty_conditions {
@@ -1165,7 +1182,6 @@
unsigned int setqueuevar:1;
unsigned int setqueueentryvar:1;
unsigned int reportholdtime:1;
- unsigned int wrapped:1;
unsigned int timeoutrestart:1;
unsigned int announceholdtime:2;
unsigned int announceposition:3;
@@ -1185,56 +1201,56 @@
int numperiodicannounce; /*!< The number of periodic announcements configured */
int randomperiodicannounce; /*!< Are periodic announcments randomly chosen */
int roundingseconds; /*!< How many seconds do we round to? */
- int holdtime; /*!< Current avg holdtime, based on an exponential average */
- int talktime; /*!< Current avg talktime, based on the same exponential average */
- int callscompleted; /*!< Number of queue calls completed */
- int callsabandoned; /*!< Number of queue calls abandoned */
int servicelevel; /*!< seconds setting for servicelevel*/
- int callscompletedinsl; /*!< Number of calls answered with servicelevel*/
char monfmt[8]; /*!< Format to use when recording calls */
int montype; /*!< Monitor type Monitor vs. MixMonitor */
- int count; /*!< How many entries */
int maxlen; /*!< Max number of entries */
int wrapuptime; /*!< Wrapup Time */
int penaltymemberslimit; /*!< Disregard penalty when queue has fewer than this many members */
-
int retry; /*!< Retry calling everyone after this amount of time */
int timeout; /*!< How long to wait for an answer */
int weight; /*!< Respective weight */
int autopause; /*!< Auto pause queue members if they fail to answer */
int autopausedelay; /*!< Delay auto pause for autopausedelay seconds since last call */
int timeoutpriority; /*!< Do we allow a fraction of the timeout to occur for a ring? */
-
/* Queue strategy things */
- int rrpos; /*!< Round Robin - position */
int memberdelay; /*!< Seconds to delay connecting member to caller */
int autofill; /*!< Ignore the head call status and ring an available agent */
-
- struct ao2_container *members; /*!< Head of the list of members */
- struct queue_ent *head; /*!< Head of the list of callers */
- AST_LIST_ENTRY(call_queue) list; /*!< Next call queue */
- AST_LIST_HEAD_NOLOCK(, penalty_rule) rules; /*!< The list of penalty rules to invoke */
+ struct timeval reload; /*!< Time the queue will be reloaded from RT */
+ struct queue_data *data; /*!< Queue statistics */
+};
+
+struct queue_data {
+ int qhash; /*!< Hash for queue */
+ unsigned int wrapped:1;
+ int count; /*!< How many entries */
+ int holdtime; /*!< Current avg holdtime, based on an exponential average */
+ int talktime; /*!< Current avg talktime, based on the same exponential average */
+ int callscompleted; /*!< Number of queue calls completed */
+ int callsabandoned; /*!< Number of queue calls abandoned */
+ int callscompletedinsl; /*!< Number of calls answered with servicelevel*/
+ int rrpos; /*!< Round Robin - position */
+ AST_LIST_HEAD(, queue_ent) *head; /*!< Head of the list of callers */
+ struct ao2_container *members; /*!< Head of the list of members */
};
struct rule_list {
char name[80];
- AST_LIST_HEAD_NOLOCK(,penalty_rule) rules;
- AST_LIST_ENTRY(rule_list) list;
+ struct ao2_container *rules;
};
-
-static AST_LIST_HEAD_STATIC(rule_lists, rule_list);
static struct ao2_container *queues;
static struct ao2_container *devices;
-
-static void dump_queue_members(struct call_queue *pm_queue);
+static struct ao2_container *rules;
+static struct ao2_container *qdata;
+
+static int do_set_member_penalty_paused(struct call_queue *q, struct member *mem, int pause, int value, const char *reason);
static void pm_load_member_config(struct call_queue *q);
static struct member *interface_exists(struct call_queue *q, const char *interface);
static int set_member_paused(const char *queuename, const char *interface, const char *reason, int paused);
static void queue_transfer_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan);
-static struct member *find_member_by_queuename_and_interface(const char *queuename, const char *interface);
/*! \brief sets the QUEUESTATUS channel variable */
static void set_queue_result(struct ast_channel *chan, enum queue_result res)
{
@@ -1248,6 +1264,7 @@
}
}
+/*! \brief return strategy name from strategy*/
static const char *int2strat(int strategy)
{
int x;
@@ -1260,6 +1277,7 @@
return "<unknown>";
}
+/*! brief return strategy from strategy name*/
static int strat2int(const char *strategy)
{
int x;
@@ -1272,6 +1290,7 @@
return -1;
}
+/*! brief return a autopause setting from name*/
static int autopause2int(const char *autopause)
{
int x;
@@ -1292,6 +1311,9 @@
return QUEUE_AUTOPAUSE_OFF;
}
+/*!
+ * \brief ao2 callback to calculate hash of a queue by name
+ */
static int queue_hash_cb(const void *obj, const int flags)
{
const struct call_queue *q = obj;
@@ -1300,105 +1322,115 @@
return ast_str_case_hash(name);
}
+
+/*!
+ * \brief ao2 callback to find queue by name
+ * \note this is the default function used by ao2_find
+ */
static int queue_cmp_cb(void *obj, void *arg, int flags)
{
- const struct call_queue *q = obj;
- const struct call_queue *q2 = arg;
+ const struct call_queue *q = obj, *q2 = arg;
const char *name = (flags & OBJ_POINTER) ? q2->name : arg;
return !strcasecmp(q->name, name) ? CMP_MATCH | CMP_STOP : 0;
}
-static int device_hash_cb(const void *obj, const int flags)
-{
- const struct mem_state *s = obj;
- const char *state_interface = (flags & OBJ_KEY) ? obj : s->state_interface;
-
- return ast_str_case_hash(state_interface);
-}
-
-static int device_cmp_cb(void *obj, void *arg, int flags)
-{
- const struct mem_state *d = obj;
- const struct mem_state *d2 = arg;
- const char *iface = (flags & OBJ_POINTER) ? d2->state_interface : arg;
-
- return !strcasecmp(d->state_interface, iface) ? CMP_MATCH | CMP_STOP : 0;
-}
-
-#ifdef REF_DEBUG_ONLY_QUEUES
-#define queue_ref(a) __ao2_ref_debug(a,1,"",__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queue_unref(a) __ao2_ref_debug(a,-1,"",__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queue_t_ref(a,b) __ao2_ref_debug(a,1,b,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queue_t_unref(a,b) __ao2_ref_debug(a,-1,b,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queues_t_link(c,q,tag) __ao2_link_debug(c,q,tag,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queues_t_unlink(c,q,tag) __ao2_unlink_debug(c,q,tag,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#else
-#define queue_t_ref(a,b) queue_ref(a)
-#define queue_t_unref(a,b) queue_unref(a)
-#define queues_t_link(c,q,tag) ao2_t_link(c,q,tag)
-#define queues_t_unlink(c,q,tag) ao2_t_unlink(c,q,tag)
-static inline struct call_queue *queue_ref(struct call_queue *q)
-{
- ao2_ref(q, 1);
- return q;
-}
-
-static inline struct call_queue *queue_unref(struct call_queue *q)
-{
- ao2_ref(q, -1);
- return NULL;
-}
-#endif
-
-/*! \brief Set variables of queue */
+/*!
+ * \brief ao2 callback to calculate hash of a queue by name
+ */
+static int qdata_hash_cb(const void *obj, const int flags)
+{
+ const struct queue_data *d = obj;
+ const char *qname = obj;
+ int qhash = (flags & OBJ_KEY) ? ast_str_case_hash(qname) : d->qhash;
+
+ return qhash;
+}
+
+
+/*!
+ * \brief ao2 callback to find queue by name
+ * \note this is the default function used by ao2_find
+ */
+static int qdata_cmp_cb(void *obj, void *arg, int flags)
+{
+ const struct queue_data *d = obj, *d2 = arg;
+ const char *name = arg;
+ int qhash = (flags & OBJ_POINTER) ? d2->qhash : ast_str_case_hash(name);
+
+ return (d->qhash == qhash) ? CMP_MATCH | CMP_STOP : 0;
+}
+
+/*! \brief Set channel variables of queue */
static void set_queue_variables(struct call_queue *q, struct ast_channel *chan)
{
char interfacevar[256]="";
float sl = 0;
-
- ao2_lock(q);
+ struct queue_data *data = q->data;
if (q->setqueuevar) {
sl = 0;
- if (q->callscompleted > 0)
- sl = 100 * ((float) q->callscompletedinsl / (float) q->callscompleted);
+ ao2_lock(data);
+ if (data->callscompleted > 0) {
+ sl = 100 * ((float) data->callscompletedinsl / (float) data->callscompleted);
+ }
snprintf(interfacevar, sizeof(interfacevar),
"QUEUENAME=%s,QUEUEMAX=%d,QUEUESTRATEGY=%s,QUEUECALLS=%d,QUEUEHOLDTIME=%d,QUEUETALKTIME=%d,QUEUECOMPLETED=%d,QUEUEABANDONED=%d,QUEUESRVLEVEL=%d,QUEUESRVLEVELPERF=%2.1f",
- q->name, q->maxlen, int2strat(q->strategy), q->count, q->holdtime, q->talktime, q->callscompleted, q->callsabandoned, q->servicelevel, sl);
-
- ao2_unlock(q);
-
- pbx_builtin_setvar_multiple(chan, interfacevar);
- } else {
- ao2_unlock(q);
- }
-}
-
-/*! \brief Insert the 'new' entry after the 'prev' entry of queue 'q' */
-static inline void insert_entry(struct call_queue *q, struct queue_ent *prev, struct queue_ent *new, int *pos)
-{
- struct queue_ent *cur;
-
- if (!q || !new)
- return;
- if (prev) {
- cur = prev->next;
- prev->next = new;
- } else {
- cur = q->head;
- q->head = new;
- }
- new->next = cur;
-
- /* every queue_ent must have a reference to it's parent call_queue, this
- * reference does not go away until the end of the queue_ent's life, meaning
- * that even when the queue_ent leaves the call_queue this ref must remain. */
- queue_ref(q);
- new->parent = q;
+ q->name, q->maxlen, int2strat(q->strategy), data->count, data->holdtime, data->talktime,
+ data->callscompleted, data->callsabandoned, q->servicelevel, sl);
+
+ pbx_builtin_setvar_multiple(chan, interfacevar);
+ ao2_unlock(data);
+ }
+}
+
+/*! \brief Insert the 'new' callattempt entry after the 'prev' entry of queue 'q' */
+static inline void insert_entry(struct queue_ent *new, int *pos)
+{
new->pos = ++(*pos);
new->opos = *pos;
+ ao2_lock(new->parent->data);
+ new->parent->data->count++;
+ ao2_unlock(new->parent->data);
+}
+
+/*! \brief return the device state for a member*/
+static int get_device_status(struct member *m)
+{
+ int ret;
+ struct mem_state *s = m->device;
+
+ ao2_lock(s);
+
+ ret = s->status;
+ switch (s->status) {
+ case AST_DEVICE_INVALID:
+ case AST_DEVICE_UNAVAILABLE:
+ case AST_DEVICE_BUSY:
+ break;
+ case AST_DEVICE_INUSE:
+ case AST_DEVICE_RINGING:
+ case AST_DEVICE_RINGINUSE:
+ case AST_DEVICE_ONHOLD:
+ /* if im active and may not place calls when INUSE im actually BUSY */
+ if ((s->reserved || s->active) && !m->callinuse) {
+ ret = AST_DEVICE_BUSY;
+ break;
+ }
+ break;
+ case AST_DEVICE_NOT_INUSE:
+ case AST_DEVICE_UNKNOWN:
+ /* it seems that i have this device active but the system does not */
+ if (s->active) {
+ ret = (m->callinuse) ? AST_DEVICE_INUSE : AST_DEVICE_BUSY;
+ } else if (s->reserved) {
+ ret = (m->callinuse) ? AST_DEVICE_RINGING : AST_DEVICE_BUSY;
+ }
+ }
+ ao2_unlock(s);
+
+ return ret;
}
/*! \brief Check if members are available
@@ -1406,26 +1438,35 @@
* This function checks to see if members are available to be called. If any member
* is available, the function immediately returns 0. If no members are available,
* then -1 is returned.
- *
- * It must be called with ref and lock held for q
- *
*/
-static int get_member_status(struct call_queue *q, int max_penalty, int min_penalty, enum empty_conditions conditions)
+static int get_member_status(const struct queue_ent *qe, int join)
{
struct member *member;
struct ao2_iterator mem_iter;
-
- mem_iter = ao2_iterator_init(q->members, 0);
+ struct call_queue *q = qe->parent;
+ int max_penalty = qe->max_penalty;
+ int min_penalty = qe->min_penalty;
+ enum empty_conditions conditions;
+
+ conditions = (join) ? q->joinempty : q->leavewhenempty;
+
+ if (!conditions) {
+ return 0;
+ }
+
+ mem_iter = ao2_iterator_init(q->data->members, 0);
while((member = ao2_iterator_next(&mem_iter))) {
+ ao2_lock(member);
if ((max_penalty && (member->penalty > max_penalty)) || (min_penalty && (member->penalty < min_penalty))) {
if (conditions & QUEUE_EMPTY_PENALTY) {
ast_debug(4, "%s is unavailable because his penalty is not between %d and %d\n", member->membername, min_penalty, max_penalty);
+ ao2_unlock(member);
+ ao2_ref(member, -1);
continue;
}
}
- ao2_lock(member->device);
- switch (member->device->status) {
+ switch (get_device_status(member)) {
case AST_DEVICE_INVALID:
if (conditions & QUEUE_EMPTY_INVALID) {
ast_debug(4, "%s is unavailable because his device state is 'invalid'\n", member->membername);
@@ -1439,6 +1480,7 @@
}
goto default_case;
case AST_DEVICE_INUSE:
+ case AST_DEVICE_BUSY:
if (conditions & QUEUE_EMPTY_INUSE) {
ast_debug(4, "%s is unavailable because his device state is 'inuse'\n", member->membername);
break;
@@ -1461,27 +1503,41 @@
if (member->paused && (conditions & QUEUE_EMPTY_PAUSED)) {
ast_debug(4, "%s is unavailable because he is paused'\n", member->membername);
break;
- } else if ((conditions & QUEUE_EMPTY_WRAPUP) && member->lastcall && q->wrapuptime && (time(NULL) - q->wrapuptime < member->lastcall)) {
- ast_debug(4, "%s is unavailable because it has only been %d seconds since his last call (wrapup time is %d)\n", member->membername, (int) (time(NULL) - member->lastcall), q->wrapuptime);
+ } else if ((conditions & QUEUE_EMPTY_WRAPUP) && !ast_tvzero(member->lastcall) && member->lastwrapup && (ast_tvdiff_sec(ast_tvnow(), member->lastcall) <= member->lastwrapup)) {
+ ast_debug(4, "%s is unavailable because it has only been %d seconds since his last call (wrapup time is %d)\n", member->membername, (int)ast_tvdiff_sec(ast_tvnow(), member->lastcall), member->lastwrapup);
break;
} else {
- ao2_unlock(member->device);
ast_debug(4, "%s is available.\n", member->membername);
+ ao2_unlock(member);
ao2_ref(member, -1);
ao2_iterator_destroy(&mem_iter);
return 0;
}
break;
}
- ao2_unlock(member->device);
+ ao2_unlock(member);
ao2_ref(member, -1);
}
ao2_iterator_destroy(&mem_iter);
return -1;
}
-/*! \brief send a QueueMemberStatus manager_event
-*/
+/*! \brief Un ref a device if im the last consumer unlink it*/
+static void unref_device(struct mem_state *s) {
+ if (!s) {
+ return;
+ }
+
+ ao2_lock(devices);
+ /* remove our ref*/
+ if (ao2_ref(s, -1) == 2) {
+ /* we were the only consumer unlink*/
+ ao2_find(devices, s, OBJ_UNLINK | OBJ_POINTER | OBJ_NODATA | OBJ_NOLOCK);
+ }
+ ao2_unlock(devices);
+}
+
+/*! \brief send a QueueMemberStatus manager_event when device state changes*/
static int update_status(void *data)
{
struct ao2_iterator qiter;
@@ -1492,21 +1548,19 @@
qiter = ao2_iterator_init(queues, 0);
while ((q = ao2_iterator_next(&qiter))) {
- ao2_lock(q);
if (q->maskmemberstatus) {
- ao2_unlock(q);
ao2_ref(q, -1);
continue;
}
- miter = ao2_iterator_init(q->members, 0);
+ miter = ao2_iterator_init(q->data->members, 0);
while((m = ao2_iterator_next(&miter))) {
- ao2_lock(m->device);
- if (strcasecmp(m->device->state_interface, s->state_interface)) {
- ao2_unlock(m->device);
+ ao2_lock(m);
+ if (m->device != s) {
+ ao2_unlock(m);
ao2_ref(m, -1);
continue;
}
- ao2_unlock(m->device);
+ ao2_lock(s);
manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
"Queue: %s\r\n"
"Location: %s\r\n"
@@ -1517,18 +1571,47 @@
"CallsTaken: %d\r\n"
"LastCall: %d\r\n"
"Status: %d\r\n"
- "Paused: %d\r\n",
+ "Paused: %d\r\n"
+ "CallInuse: %d\r\n",
q->name, m->interface, m->membername, s->state_interface, m->dynamic ? "dynamic" : m->realtime ? "realtime" : "static",
- m->penalty, m->calls, (int)m->lastcall, s->status, m->paused
+ m->penalty, m->calls, (int)m->lastcall.tv_sec, s->status, m->paused, m->callinuse
);
+ ao2_unlock(s);
+ ao2_unlock(m);
ao2_ref(m, -1);
}
ao2_iterator_destroy(&miter);
- ao2_unlock(q);
ao2_ref(q, -1);
}
ao2_iterator_destroy(&qiter);
- ao2_ref(s, -1);
+
+ unref_device(s);
+
+ return 0;
+}
+
+/*! \brief set the device state of a member explicitly
+ * \note a change update manager event will be sent
+ */
+static int set_device_status(const char *device, int status)
+{
+ struct mem_state *s;
+
+ if (!(s = ao2_find(devices, device, OBJ_KEY))) {
+ return -1;
+ }
+
+ ao2_lock(s);
+ if (s->status != status) {
+ s->status = status;
+ ao2_unlock(s);
+ if (ast_taskprocessor_push(devicestate_tps, update_status, s)) {
+ unref_device(s);
+ }
+ } else {
+ ao2_unlock(s);
+ unref_device(s);
+ }
return 0;
}
@@ -1538,7 +1621,6 @@
{
enum ast_device_state state;
const char *device;
- struct mem_state *s, *tmp;
state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
@@ -1548,19 +1630,7 @@
return;
}
- if ((s = ao2_find(devices, (char *)device, 0))) {
- ao2_lock(s);
- if (s->status != state) {
- s->status = state;
- if ((tmp = ao2_alloc(sizeof(*tmp), NULL))) {
- memcpy(tmp, s, sizeof(*tmp));
- if (ast_taskprocessor_push(devicestate_tps, update_status, tmp)) {
- ao2_ref(tmp, -1);
- }
- }
- }
- ao2_unlock(s);
- ao2_ref(s, -1);
+ if (set_device_status(device, state)) {
ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", device, state, ast_devstate2str(state));
} else {
ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", device, state, ast_devstate2str(state));
@@ -1599,9 +1669,9 @@
return state;
}
+/*! \brief callback called when a extension hint is notified of change*/
static int extension_state_cb(const char *context, const char *exten, enum ast_extension_states state, void *data)
{
- struct mem_state *s, *tmp;
char *device;
int status = extensionstate2devicestate(state);
@@ -1610,19 +1680,7 @@
return 0;
}
- if ((s = ao2_find(devices, device, 0))) {
- ao2_lock(s);
- if (s->status != status) {
- s->status = status;
- if ((tmp = ao2_alloc(sizeof(*tmp), NULL))) {
- memcpy(tmp, s, sizeof(*tmp));
- if (ast_taskprocessor_push(devicestate_tps, update_status, tmp)) {
- ao2_ref(tmp, -1);
- }
- }
- }
- ao2_unlock(s);
- ao2_ref(s, -1);
+ if (set_device_status(device, status)) {
ast_debug(1, "Extension '%s@%s' changed to state '%d' (%s)\n", exten, context, status, ast_devstate2str(status));
} else {
ast_debug(3, "Extension '%s@%s' changed state but we don't care because they're not a member of any queue.\n",
@@ -1632,21 +1690,7 @@
return 0;
}
-static void remove_queue_member(void *data) {
- struct member *mem = data;
- struct mem_state *s = mem->device;
-
- if (s) {
- /* we have a device lets unlink and unref it*/
- /* remove our ref*/
- if (ao2_ref(s, -1) == 2) {
- /* we were the only consumer unlink*/
- ao2_unlink(devices, s);
- }
- }
-
-}
-
+/* \brief find or create a member device state object*/
static struct mem_state *create_member_state(const char *state_interface) {
struct mem_state *state;
char *exten;
@@ -1654,7 +1698,7 @@
char *device;
/* ref will be held for each shared member and one ref for container */
- if ((state = ao2_find(devices, (char *)state_interface, 0))) {
+ if ((state = ao2_find(devices, state_interface, OBJ_KEY))) {
return state;
} else if (!(state = ao2_alloc(sizeof(*state), NULL))) {
return NULL;
@@ -1667,7 +1711,11 @@
if (context) {
ast_copy_string(state->state_interface, state_interface, sizeof(state->state_interface));
} else {
- asprintf(&device, "%s at default", state_interface);
+ if (!asprintf(&device, "%s at default", state_interface)) {
+ ast_log(AST_LOG_ERROR, "Failed to use state_interface %s at default\n", state_interface);
+ ao2_ref(state, -1);
+ return NULL;
+ }
ast_copy_string(state->state_interface, device, sizeof(state->state_interface));
}
state->status = extensionstate2devicestate(ast_extension_state(NULL, S_OR(context, "default"), exten));
@@ -1680,14 +1728,15 @@
return state;
}
-/*! \brief Set current state of member if needed*/
-static void set_queue_member_status(struct member *m)
+/*! \brief Set current state of member querying channel driver or hint state*/
+static int set_queue_member_status(struct member *m)
{
int status;
- struct mem_state *s = m->device;
- struct mem_state *tmp;
-
- ao2_lock(s);
+ struct mem_state *s;
+
+ ao2_lock(m);
+ s = m->device;
+
if (!strncasecmp(s->state_interface, "hint:", 5)) {
char *context = ast_strdupa(s->state_interface);
char *exten = strsep(&context, "@") + 5;
@@ -1696,80 +1745,35 @@
status = ast_device_state(s->state_interface);
}
+ ao2_lock(s);
if (s->status != status) {
s->status = status;
- if ((tmp = ao2_alloc(sizeof(*tmp), NULL))) {
- memcpy(tmp, s, sizeof(*tmp));
- if (ast_taskprocessor_push(devicestate_tps, update_status, tmp)) {
- ao2_ref(tmp, -1);
- }
+ /* we must pass a ref to the task processor*/
+ if (!ast_taskprocessor_push(devicestate_tps, update_status, s)) {
+ ao2_ref(s, 1);
}
}
ao2_unlock(s);
-}
-
-/*! \brief allocate space for new queue member and set fields based on parameters passed */
-static struct member *create_queue_member(const char *interface, const char *membername, int penalty,
- int paused, const char *state_interface, int ignorebusy)
-{
- struct member *cur;
- const char* state_int;
-
- if (!(cur = ao2_alloc(sizeof(*cur), remove_queue_member))) {
- return NULL;
- }
-
- /* setup device state*/
- state_int = (!ast_strlen_zero(state_interface)) ? state_interface : interface;
- if (!(cur->device = create_member_state(state_int))) {
- ao2_ref(cur, -1);
- return NULL;
- }
-
- cur->penalty = penalty;
- cur->paused = paused;
- cur->ignorebusy = ignorebusy;
- ast_copy_string(cur->interface, interface, sizeof(cur->interface));
- if (!ast_strlen_zero(membername)) {
- ast_copy_string(cur->membername, membername, sizeof(cur->membername));
- } else {
- ast_copy_string(cur->membername, interface, sizeof(cur->membername));
- }
- if (!strchr(cur->interface, '/')) {
- ast_log(LOG_WARNING, "No location at interface '%s'\n", interface);
- }
-
-
- return cur;
-}
-
-
-static int compress_char(const char c)
-{
- if (c < 32)
- return 0;
- else if (c > 96)
- return c - 64;
- else
- return c - 32;
-}
-
+ ao2_unlock(m);
+
+ return status;
+}
+
+/*!
+ * \brief ao2 callback to calculate hash of a member by interface
+ */
static int member_hash_fn(const void *obj, const int flags)
{
const struct member *mem = obj;
const char *interface = (flags & OBJ_KEY) ? obj : mem->interface;
- const char *chname = strchr(interface, '/');
- int ret = 0, i;
-
- if (!chname) {
- chname = interface;
- }
- for (i = 0; i < 5 && chname[i]; i++) {
- ret += compress_char(chname[i]) << (i * 6);
- }
- return ret;
-}
-
+
+ return ast_str_case_hash(interface);
+}
+
+/*!
+ * \brief ao2 callback to find member by interface
+ * \note this is the default function used by ao2_find
+ */
static int member_cmp_fn(void *obj1, void *obj2, int flags)
{
const struct member *mem1 = obj1;
@@ -1779,23 +1783,190 @@
return strcasecmp(mem1->interface, arg) ? 0 : CMP_MATCH | CMP_STOP;
}
+/*!
+ * \brief ao2 callback to find a realtime member by uniqueid
+ */
static int member_cmp_uniqueid_fn(void *obj1, void *arg, int flags)
{
const struct member *mem1 = obj1;
const struct member *mem2 = arg;
const char *uniqueid = (flags & OBJ_POINTER) ? mem2->rt_uniqueid : arg;
- return strcasecmp(mem1->rt_uniqueid, uniqueid) ? 0 : CMP_MATCH | CMP_STOP;
+ if (mem1->realtime && !mem1->dead &&
+ !strcasecmp(mem1->rt_uniqueid, uniqueid)) {
+ return CMP_MATCH | CMP_STOP;
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief ao2 callback to mark realtime members dead
+ */
+static int mark_realtime_member_dead(void *obj, void *arg, int flags)
+{
+ struct member *member = obj;
+
+ if (member->realtime) {
+ member->dead = 1;
+ return CMP_MATCH;
+ }
+ return 0;
+}
+
+/*!
+ * \brief ao2 callback to delete realtime members marked dead
+ */
+static int kill_realtime_dead_members(void *obj, void *arg, void *data, int flags)
+{
+ struct member *m = obj;
+ const struct call_queue *q = data;
+
+ if (m->realtime && m->dead) {
+ ao2_lock(m);
+ if (ast_strlen_zero(m->membername) || !log_membername_as_agent) {
+ ast_queue_log(q->name, "REALTIME", m->interface, "REMOVEMEMBER", "%s", "");
+ } else {
+ ast_queue_log(q->name, "REALTIME", m->membername, "REMOVEMEMBER", "%s", "");
+ }
+ ao2_unlock(m);
+ return CMP_MATCH;
+ }
+ return 0;
+}
+
+/*! \brief ao2 callback to reset member counters */
+static int clear_queue_member_fn(void *obj1, void *arg, int flags)
+{
+ struct member *mem = obj1;
+
+ ao2_lock(mem);
+ mem->calls = 0;
+ mem->lastwrapup = 0;
+ mem->lastcall = ast_tv(0, 0);
+ ao2_unlock(mem);
+
+ return 0;
+}
+
+/*!
+ * \brief ao2 callback to calculate hash of a device by state_interface
+ */
+static int device_hash_cb(const void *obj, const int flags)
+{
+ const struct mem_state *s = obj;
+ const char *state_interface = (flags & OBJ_KEY) ? obj : s->state_interface;
+
+ return ast_str_case_hash(state_interface);
+}
+
+/*!
+ * \brief ao2 callback to find device by state_interface
+ * \note this is the default function used by ao2_find
+ */
+static int device_cmp_cb(void *obj, void *arg, int flags)
+{
+ const struct mem_state *d = obj;
+ const struct mem_state *d2 = arg;
+ const char *iface = (flags & OBJ_POINTER) ? d2->state_interface : arg;
+
+ return !strcasecmp(d->state_interface, iface) ? CMP_MATCH | CMP_STOP : 0;
+}
+
+/*!
+ * \brief ao2 callback to calculate hash of a rule by name
+ */
+static int rules_hash_cb(const void *obj, const int flags)
+{
+ const struct rule_list *rl = obj;
+ const char *name = (flags & OBJ_KEY) ? obj : rl->name;
+
+ return ast_str_case_hash(name);
+}
+
+/*!
+ * \brief ao2 callback to calculate hash of a penalty rule by time
+ */
+static int penalty_hash_cb(const void *obj, const int flags)
+{
+ const struct penalty_rule *pr = obj;
+ const int *tin = obj;
+ int time = (flags & OBJ_KEY) ? *tin : pr->time;
+
+ return time;
+}
+
+/*! \brief find the best penalty rule for duration */
+static int get_best_rule_cb(void *obj, void *arg, void *data, int flags)
+{
+ int *time = arg;
+ struct penalty_rule *cur = obj;
+ struct penalty_rule **din = data, *best = *din;
+
+ if ((cur->time >= *time) && (!best || (best && (cur->time < best->time)))) {
+ if (best) {
+ ao2_ref(best, -1);
+ }
+ ao2_ref(cur, 1);
+ *din = cur;
+ return CMP_MATCH;
+ }
+ return 0;
+}
+
+/*!
[... 6919 lines stripped ...]
More information about the asterisk-commits
mailing list