[asterisk-commits] irroot: branch irroot/distrotech-customers-10 r344956 - /team/irroot/distrote...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Sat Nov 12 06:43:30 CST 2011


Author: irroot
Date: Sat Nov 12 06:37:01 2011
New Revision: 344956

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=344956
Log:
Quack Queue

Modified:
    team/irroot/distrotech-customers-10/apps/app_queue.c

Modified: team/irroot/distrotech-customers-10/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/irroot/distrotech-customers-10/apps/app_queue.c?view=diff&rev=344956&r1=344955&r2=344956
==============================================================================
--- team/irroot/distrotech-customers-10/apps/app_queue.c (original)
+++ team/irroot/distrotech-customers-10/apps/app_queue.c Sat Nov 12 06:37:01 2011
@@ -25,6 +25,18 @@
  * \arg Config in \ref Config_qu queues.conf
  *
  * \par Development notes
+ * \note 2011-11-01: Made application more thread safe
+ *		Distrotech PTY (LTD) (www.distrotech.co.za)
+ *		Gregory Nietsky (irroot) <gregory at distrotech.co.za>
+ *
+ *		Split away from locking queues/queue to having locks per
+ *		Queue/Member/Device.
+ *		Added device state struct to mange device states shared
+ *		for multiple members sharing same device.
+ *
+ *		Made all functions work with realtime/dynamic/static members.
+ *		Added missing CLI/AMI functions for handling callinuse.
+ *
  * \note 2004-11-25: Persistent Dynamic Members added by:
  *             NetNation Communications (www.netnation.com)
  *             Kevin Lindsay <kevinl at netnation.com>
@@ -99,22 +111,31 @@
 #include "asterisk/callerid.h"
 #include "asterisk/cel.h"
 #include "asterisk/data.h"
-
-/* Define, to debug reference counts on queues, without debugging reference counts on queue members */
-/* #define REF_DEBUG_ONLY_QUEUES */
+#include "asterisk/time.h"
 
 /*!
  * \par Please read before modifying this file.
- * There are three locks which are regularly used
- * throughout this file, the queue list lock, the lock
- * for each individual queue, and the interface list lock.
+ * There are locks which are regularly used
+ * throughout this file, the lock
+ * for each individual queue, the individual member lock ,
+ * and the device state lock.
+ * there are container locks for the queues list, the member
+ * list on each queue and the devices container.
  * Please be extra careful to always lock in the following order
- * 1) queue list lock
- * 2) individual queue lock
- * 3) interface list lock
+ *
+ * 1) queues container lock (ao2 functions that lock the container)
+ * 2) queue lock
+ * 3) queue member lock
+ * 4) member lock
+ * 5) devices container lock
+ * 6) member device lock
  * This order has sort of "evolved" over the lifetime of this
  * application, but it is now in place this way, so please adhere
  * to this order!
+ *
+ * the only elements that do not require a read lock (ie will not change)
+ * the queue name and members container, the member interface and
+ * device state_interface as they are never altered after been linked.
  */
 
 /*** DOCUMENTATION
@@ -264,9 +285,10 @@
 			<parameter name="queuename" required="true" />
 			<parameter name="interface" />
 			<parameter name="penalty" />
-			<parameter name="options" />
+			<parameter name="paused" />
 			<parameter name="membername" />
 			<parameter name="stateinterface" />
+			<parameter name="callinuse" />
 		</syntax>
 		<description>
 			<para>Dynamically adds interface to an existing queue. If the interface is
@@ -530,8 +552,8 @@
 					<enum name="paused">
 						<para>Gets or sets queue member paused status.</para>
 					</enum>
-					<enum name="ignorebusy">
-						<para>Gets or sets queue member ignorebusy.</para>
+					<enum name="callinuse">
+						<para>Gets or sets queue member callinuse.</para>
 					</enum>
 				</enumlist>
 			</parameter>
@@ -846,14 +868,14 @@
 		<description>
 		</description>
 	</manager>
-	<manager name="QueueIgnoreBusy" language="en_US">
+	<manager name="QueueCallInuse" language="en_US">
 		<synopsis>
 			Set interface to allow multiple calls
 		</synopsis>
 		<syntax>
 			<xi:include xpointer="xpointer(/docs/manager[@name='Login']/syntax/parameter[@name='ActionID'])" />
 			<parameter name="Interface" required="true" />
-			<parameter name="IgnoreBusy" required="true" />
+			<parameter name="CallInuse" required="true" />
 			<parameter name="Queue" required="true" />
 		</syntax>
 		<description>
@@ -883,6 +905,13 @@
 	QUEUE_RELOAD_MEMBER = (1 << 1),
 	QUEUE_RELOAD_RULES = (1 << 2),
 	QUEUE_RESET_STATS = (1 << 3),
+	QUEUE_RELOAD_REALTIME = (1 << 4),
+};
+
+enum member_type {
+	QUEUE_ADD_MEMBER_STATIC = (1 << 0),
+	QUEUE_ADD_MEMBER_REALTIME = (1 << 1),
+	QUEUE_ADD_MEMBER_DYNAMIC = (1 << 2),
 };
 
 static const struct strategy {
@@ -925,7 +954,7 @@
 #define	RES_OUTOFMEMORY	(-2)		/*!< Out of memory */
 #define	RES_NOSUCHQUEUE	(-3)		/*!< No such queue */
 #define RES_NOT_DYNAMIC (-4)		/*!< Member is not dynamic */
-
+#define RES_ERROR	(-5)		/*!< Member is mis configured */
 static char *app = "Queue";
 
 static char *app_aqm = "AddQueueMember" ;
@@ -1004,49 +1033,32 @@
  *  use it not only for keeping track of what is in use but
  *  also for keeping track of who we're dialing.
  *
- *  There are two "links" defined in this structure, q_next and call_next.
- *  q_next links ALL defined callattempt structures into a linked list. call_next is
- *  a link which allows for a subset of the callattempts to be traversed. This subset
- *  is used in wait_for_answer so that irrelevant callattempts are not traversed. This
- *  also is helpful so that queue logs are always accurate in the case where a call to 
- *  a member times out, especially if using the ringall strategy. 
 */
 
 struct callattempt {
-	struct callattempt *q_next;
-	struct callattempt *call_next;
-	struct ast_channel *chan;
-	char interface[256];
-	int stillgoing;
-	int metric;
-	time_t lastcall;
-	struct call_queue *lastqueue;
-	struct member *member;
-	/*! Saved connected party info from an AST_CONTROL_CONNECTED_LINE. */
-	struct ast_party_connected_line connected;
-	/*! TRUE if an AST_CONTROL_CONNECTED_LINE update was saved to the connected element. */
-	unsigned int pending_connected_update:1;
-	/*! TRUE if caller id is not available for connected line */
-	unsigned int dial_callerid_absent:1;
-	unsigned int reserved:1;
+	struct ast_channel *chan;                  /*! Channel called */
+	int stillgoing;                            /*! This attempt is valid and active */
+	int metric;                                /*! Metric calculated according to strategy */
+	struct member *member;                     /*! Member assosiated with this attempt */
+	struct ast_party_connected_line connected; /*! Saved connected party info from an AST_CONTROL_CONNECTED_LINE. */
+	unsigned int reserved:1;                   /*! Is this attempt been attempted*/
+	unsigned int active:1;                     /*! Is this attempt active in a call*/
+	unsigned int pending_connected_update:1;   /*! TRUE if caller id is not available for connected line*/
+	unsigned int dial_callerid_absent:1;       /*! TRUE if caller id is not available for connected line */
 	struct ast_aoc_decoded *aoc_s_rate_list;
 };
 
-
 struct queue_ent {
 	struct call_queue *parent;             /*!< What queue is our parent */
-	char moh[80];                          /*!< Name of musiconhold to be used */
-	char announce[PATH_MAX];               /*!< Announcement to play for member when call is answered */
-	char context[AST_MAX_CONTEXT];         /*!< Context when user exits queue */
 	char digits[AST_MAX_EXTENSION];        /*!< Digits entered while in queue */
 	int valid_digits;                      /*!< Digits entered correspond to valid extension. Exited */
 	int pos;                               /*!< Where we are in the queue */
 	int prio;                              /*!< Our priority */
 	int last_pos_said;                     /*!< Last position we told the user */
 	int ring_when_ringing;                 /*!< Should we only use ring indication when a channel is ringing? */
-	time_t last_periodic_announce_time;    /*!< The last time we played a periodic announcement */
+	struct timeval last_pannounce_time;    /*!< The last time we played a periodic announcement */
 	int last_periodic_announce_sound;      /*!< The last periodic announcement we made */
-	time_t last_pos;                       /*!< Last time we told the user their position */
+	struct timeval last_pos;               /*!< Last time we told the user their position */
 	int opos;                              /*!< Where we started in the queue */
 	int handled;                           /*!< Whether our call was handled */
 	int pending;                           /*!< Non-zero if we are attempting to call a member */
@@ -1054,35 +1066,40 @@
 	int min_penalty;                       /*!< Limit the members that can take this call to this penalty or higher */
 	int linpos;                            /*!< If using linear strategy, what position are we at? */
 	int linwrapped;                        /*!< Is the linpos wrapped? */
-	time_t start;                          /*!< When we started holding */
-	time_t expire;                         /*!< When this entry should expire (time out of queue) */
+	struct timeval start;                  /*!< When we started holding */
+	struct timeval expire;                 /*!< When this entry should expire (time out of queue) */
 	int cancel_answered_elsewhere;	       /*!< Whether we should force the CAE flag on this call (C) option*/
+	struct ao2_container *attempts;        /*!< Container holding all call attempts*/
 	struct ast_channel *chan;              /*!< Our channel */
-	AST_LIST_HEAD_NOLOCK(,penalty_rule) qe_rules; /*!< Local copy of the queue's penalty rules */
-	struct penalty_rule *pr;               /*!< Pointer to the next penalty rule to implement */
-	struct queue_ent *next;                /*!< The next queue entry */
+	struct rule_list *rules;               /*!< Pointer holding the ref for the queue penalty rules */
+	struct penalty_rule *pr;               /*!< Active penalty rule */
+	AST_LIST_ENTRY(queue_ent) next;        /*!< The next queue entry */
 };
 
+/*! \brief keep track of device state changes */
 struct mem_state {
-	char state_interface[80];            /*!< Technology/Location from which to read devicestate changes */
-	int reserved;                        /*!< This interface is reserved for pending call */
-	int status;                          /*!< Status of queue member */
+	char state_interface[80];              /*!< Technology/Location from which to read devicestate changes */
+	int reserved;                          /*!< This interface is reserved for pending call */
+	int active;                            /*!< This interface is active on a call */
+	int status;                            /*!< Status of queue member */
 };
 
 struct member {
-	char interface[80];                  /*!< Technology/Location to dial to reach this member*/
-	char membername[80];                 /*!< Member name to use in queue logs */
-	int penalty;                         /*!< Are we a last resort? */
-	int calls;                           /*!< Number of calls serviced by this member */
-	time_t lastcall;                     /*!< When last successful call was hungup */
-	struct call_queue *lastqueue;	     /*!< Last queue we received a call */
-	unsigned int realtime:1;             /*!< Is this member realtime? */
-	unsigned int paused:1;               /*!< Are we paused (not accepting calls)? */
-	unsigned int dead:1;                 /*!< Used to detect members deleted in realtime */
-	unsigned int dynamic:1;              /*!< Are we dynamically added? */
-	unsigned int ignorebusy:1;           /*!< Flag to ignore member if the status is not available */
-	char rt_uniqueid[80];                /*!< Unique id of realtime member entry */
-	struct mem_state *device;            /*!< Device information */
+	AST_DECLARE_STRING_FIELDS(
+		AST_STRING_FIELD(interface);   /*!< Technology/Location to dial to reach this member*/
+		AST_STRING_FIELD(membername);  /*!< Member name to use in queue logs */
+		AST_STRING_FIELD(rt_uniqueid); /*!< Unique id of realtime member entry */
+	);
+	int penalty;                           /*!< Are we a last resort? */
+	int calls;                             /*!< Number of calls serviced by this member */
+	struct timeval lastcall;               /*!< When last successful call was hungup */
+	int lastwrapup;                        /*!< Last wrapuptime */
+	unsigned int dynamic:1;                /*!< Is this member dynamic? */
+	unsigned int realtime:1;               /*!< Is this member realtime? */
+	unsigned int paused:1;                 /*!< Are we paused (not accepting calls)? */
+	unsigned int dead:1;                   /*!< Used to detect members deleted in realtime */
+	unsigned int callinuse:1;              /*!< Are we dynamically added? */
+	struct mem_state *device;              /*!< Device information */
 };
 
 enum empty_conditions {
@@ -1165,7 +1182,6 @@
 	unsigned int setqueuevar:1;
 	unsigned int setqueueentryvar:1;
 	unsigned int reportholdtime:1;
-	unsigned int wrapped:1;
 	unsigned int timeoutrestart:1;
 	unsigned int announceholdtime:2;
 	unsigned int announceposition:3;
@@ -1185,49 +1201,50 @@
 	int numperiodicannounce;            /*!< The number of periodic announcements configured */
 	int randomperiodicannounce;         /*!< Are periodic announcments randomly chosen */
 	int roundingseconds;                /*!< How many seconds do we round to? */
-	int holdtime;                       /*!< Current avg holdtime, based on an exponential average */
-	int talktime;                       /*!< Current avg talktime, based on the same exponential average */
-	int callscompleted;                 /*!< Number of queue calls completed */
-	int callsabandoned;                 /*!< Number of queue calls abandoned */
 	int servicelevel;                   /*!< seconds setting for servicelevel*/
-	int callscompletedinsl;             /*!< Number of calls answered with servicelevel*/
 	char monfmt[8];                     /*!< Format to use when recording calls */
 	int montype;                        /*!< Monitor type  Monitor vs. MixMonitor */
-	int count;                          /*!< How many entries */
 	int maxlen;                         /*!< Max number of entries */
 	int wrapuptime;                     /*!< Wrapup Time */
 	int penaltymemberslimit;            /*!< Disregard penalty when queue has fewer than this many members */
-
 	int retry;                          /*!< Retry calling everyone after this amount of time */
 	int timeout;                        /*!< How long to wait for an answer */
 	int weight;                         /*!< Respective weight */
 	int autopause;                      /*!< Auto pause queue members if they fail to answer */
 	int autopausedelay;                 /*!< Delay auto pause for autopausedelay seconds since last call */
 	int timeoutpriority;                /*!< Do we allow a fraction of the timeout to occur for a ring? */
-
 	/* Queue strategy things */
-	int rrpos;                          /*!< Round Robin - position */
 	int memberdelay;                    /*!< Seconds to delay connecting member to caller */
 	int autofill;                       /*!< Ignore the head call status and ring an available agent */
-	
-	struct ao2_container *members;             /*!< Head of the list of members */
-	struct queue_ent *head;             /*!< Head of the list of callers */
-	AST_LIST_ENTRY(call_queue) list;    /*!< Next call queue */
-	AST_LIST_HEAD_NOLOCK(, penalty_rule) rules; /*!< The list of penalty rules to invoke */
+	struct timeval reload;              /*!< Time the queue will be reloaded from RT */
+	struct queue_data *data;            /*!< Queue statistics */
+};
+
+struct queue_data {
+	int qhash;                          /*!< Hash for queue */
+	unsigned int wrapped:1;
+	int count;                          /*!< How many entries */
+	int holdtime;                       /*!< Current avg holdtime, based on an exponential average */
+	int talktime;                       /*!< Current avg talktime, based on the same exponential average */
+	int callscompleted;                 /*!< Number of queue calls completed */
+	int callsabandoned;                 /*!< Number of queue calls abandoned */
+	int callscompletedinsl;             /*!< Number of calls answered with servicelevel*/
+	int rrpos;                          /*!< Round Robin - position */
+	AST_LIST_HEAD(, queue_ent)  *head;  /*!< Head of the list of callers */
+	struct ao2_container *members;      /*!< Head of the list of members */
 };
 
 struct rule_list {
 	char name[80];
-	AST_LIST_HEAD_NOLOCK(,penalty_rule) rules;
-	AST_LIST_ENTRY(rule_list) list;
+	struct ao2_container *rules;
 };
-
-static AST_LIST_HEAD_STATIC(rule_lists, rule_list);
 
 static struct ao2_container *queues;
 static struct ao2_container *devices;
-
-static void dump_queue_members(struct call_queue *pm_queue);
+static struct ao2_container *rules;
+static struct ao2_container *qdata;
+
+static int do_set_member_penalty_paused(struct call_queue *q, struct member *mem, int pause, int value, const char *reason);
 static void pm_load_member_config(struct call_queue *q);
 static char *queue_refshow(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static struct member *interface_exists(struct call_queue *q, const char *interface);
@@ -1235,7 +1252,6 @@
 
 static void queue_transfer_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan);
 
-static struct member *find_member_by_queuename_and_interface(const char *queuename, const char *interface);
 /*! \brief sets the QUEUESTATUS channel variable */
 static void set_queue_result(struct ast_channel *chan, enum queue_result res)
 {
@@ -1249,6 +1265,7 @@
 	}
 }
 
+/*! \brief return strategy name from strategy*/
 static const char *int2strat(int strategy)
 {
 	int x;
@@ -1261,6 +1278,7 @@
 	return "<unknown>";
 }
 
+/*! brief return strategy from strategy name*/
 static int strat2int(const char *strategy)
 {
 	int x;
@@ -1273,6 +1291,7 @@
 	return -1;
 }
 
+/*! brief return a autopause setting from name*/
 static int autopause2int(const char *autopause)
 {
 	int x;
@@ -1293,111 +1312,125 @@
 	return QUEUE_AUTOPAUSE_OFF;
 }
 
+/*!
+ * \brief ao2 callback to calculate hash of a queue by name
+ */
 static int queue_hash_cb(const void *obj, const int flags)
 {
 	const struct call_queue *q = obj;
-
-	return ast_str_case_hash(q->name);
-}
-
+	const char *name = q->name;
+
+	return ast_str_case_hash(name);
+}
+
+
+/*!
+ * \brief ao2 callback to find queue by name
+ * \note this is the default function used by ao2_find
+ */
 static int queue_cmp_cb(void *obj, void *arg, int flags)
 {
-	const struct call_queue *q = obj;
-	const struct call_queue *q2 = arg;
+	const struct call_queue *q = obj, *q2 = arg;
 	const char *name = (flags & OBJ_POINTER) ? q2->name : arg;
 
 	return !strcasecmp(q->name, name) ? CMP_MATCH | CMP_STOP : 0;
 }
 
-static int device_hash_cb(const void *obj, const int flags)
-{
-	const struct mem_state *d = obj;
-
-	return ast_str_case_hash(d->state_interface);
-}
-
-static int device_cmp_cb(void *obj, void *arg, int flags)
-{
-	const struct mem_state *d = obj;
-	const struct mem_state *d2 = arg;
-	const char *iface = (flags & OBJ_POINTER) ? d2->state_interface : arg;
-
-	return !strcasecmp(d->state_interface, iface) ? CMP_MATCH | CMP_STOP : 0;
-}
-
-#ifdef REF_DEBUG_ONLY_QUEUES
-#define queue_ref(a)	__ao2_ref_debug(a,1,"",__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queue_unref(a)	__ao2_ref_debug(a,-1,"",__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queue_t_ref(a,b)	__ao2_ref_debug(a,1,b,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queue_t_unref(a,b)	__ao2_ref_debug(a,-1,b,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queues_t_link(c,q,tag)	__ao2_link_debug(c,q,tag,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#define queues_t_unlink(c,q,tag)	__ao2_unlink_debug(c,q,tag,__FILE__,__LINE__,__PRETTY_FUNCTION__)
-#else
-#define queue_t_ref(a,b)	queue_ref(a)
-#define queue_t_unref(a,b)	queue_unref(a)
-#define queues_t_link(c,q,tag)	ao2_t_link(c,q,tag)
-#define queues_t_unlink(c,q,tag)	ao2_t_unlink(c,q,tag)
-static inline struct call_queue *queue_ref(struct call_queue *q)
-{
-        ao2_ref(q, 1);
-        return q;
-}
-
-static inline struct call_queue *queue_unref(struct call_queue *q)
-{
-        ao2_ref(q, -1);
-        return NULL;
-}
-#endif
-
-/*! \brief Set variables of queue */
+/*!
+ * \brief ao2 callback to calculate hash of a queue by name
+ */
+static int qdata_hash_cb(const void *obj, const int flags)
+{
+	const struct queue_data *d = obj;
+	int qhash = d->qhash;
+
+	return qhash;
+}
+
+
+/*!
+ * \brief ao2 callback to find queue by name
+ * \note this is the default function used by ao2_find
+ */
+static int qdata_cmp_cb(void *obj, void *arg, int flags)
+{
+	const struct queue_data *d = obj, *d2 = arg;
+	const char *name = arg;
+	int qhash = (flags & OBJ_POINTER) ? d2->qhash : ast_str_case_hash(name);
+
+	return  (d->qhash == qhash) ? CMP_MATCH | CMP_STOP : 0;
+}
+
+/*! \brief Set channel variables of queue */
 static void set_queue_variables(struct call_queue *q, struct ast_channel *chan)
 {
 	char interfacevar[256]="";
 	float sl = 0;
-
-	ao2_lock(q);
+	struct queue_data *data = q->data;
 
 	if (q->setqueuevar) {
 		sl = 0;
-		if (q->callscompleted > 0) 
-			sl = 100 * ((float) q->callscompletedinsl / (float) q->callscompleted);
+		ao2_lock(data);
+		if (data->callscompleted > 0) {
+			sl = 100 * ((float) data->callscompletedinsl / (float) data->callscompleted);
+		}
 
 		snprintf(interfacevar, sizeof(interfacevar),
 			"QUEUENAME=%s,QUEUEMAX=%d,QUEUESTRATEGY=%s,QUEUECALLS=%d,QUEUEHOLDTIME=%d,QUEUETALKTIME=%d,QUEUECOMPLETED=%d,QUEUEABANDONED=%d,QUEUESRVLEVEL=%d,QUEUESRVLEVELPERF=%2.1f",
-			q->name, q->maxlen, int2strat(q->strategy), q->count, q->holdtime, q->talktime, q->callscompleted, q->callsabandoned,  q->servicelevel, sl);
-
-		ao2_unlock(q);
-	
-		pbx_builtin_setvar_multiple(chan, interfacevar); 
-	} else {
-		ao2_unlock(q);
-	}
-}
-
-/*! \brief Insert the 'new' entry after the 'prev' entry of queue 'q' */
-static inline void insert_entry(struct call_queue *q, struct queue_ent *prev, struct queue_ent *new, int *pos)
-{
-	struct queue_ent *cur;
-
-	if (!q || !new)
-		return;
-	if (prev) {
-		cur = prev->next;
-		prev->next = new;
-	} else {
-		cur = q->head;
-		q->head = new;
-	}
-	new->next = cur;
-
-	/* every queue_ent must have a reference to it's parent call_queue, this
-	 * reference does not go away until the end of the queue_ent's life, meaning
-	 * that even when the queue_ent leaves the call_queue this ref must remain. */
-	queue_ref(q);
-	new->parent = q;
+			q->name, q->maxlen, int2strat(q->strategy), data->count, data->holdtime, data->talktime,
+			data->callscompleted, data->callsabandoned,  q->servicelevel, sl);
+
+		pbx_builtin_setvar_multiple(chan, interfacevar);
+		ao2_unlock(data);
+	}
+}
+
+/*! \brief Insert the 'new' callattempt entry after the 'prev' entry of queue 'q' */
+static inline void insert_entry(struct queue_ent *new, int *pos)
+{
 	new->pos = ++(*pos);
 	new->opos = *pos;
+	ao2_lock(new->parent->data);
+	new->parent->data->count++;
+	ao2_unlock(new->parent->data);
+}
+
+/*! \brief return the device state for a member*/
+static int get_device_status(struct member *m)
+{
+	int ret;
+	struct mem_state *s = m->device;
+
+	ao2_lock(s);
+
+	ret = s->status;
+	switch (s->status) {
+		case AST_DEVICE_INVALID:
+		case AST_DEVICE_UNAVAILABLE:
+		case AST_DEVICE_BUSY:
+			break;
+		case AST_DEVICE_INUSE:
+		case AST_DEVICE_RINGING:
+		case AST_DEVICE_RINGINUSE:
+		case AST_DEVICE_ONHOLD:
+			/* if im active and may not place calls when INUSE im actually BUSY */
+			if ((s->reserved || s->active) && !m->callinuse) {
+				ret = AST_DEVICE_BUSY;
+				break;
+			}
+			break;
+		case AST_DEVICE_NOT_INUSE:
+		case AST_DEVICE_UNKNOWN:
+			/* it seems that i have this device active but the system does not */
+			if (s->active) {
+				ret = (m->callinuse) ? AST_DEVICE_INUSE : AST_DEVICE_BUSY;
+			} else if (s->reserved) {
+				ret = (m->callinuse) ? AST_DEVICE_RINGING : AST_DEVICE_BUSY;
+			}
+	}
+        ao2_unlock(s);
+
+        return ret;
 }
 
 /*! \brief Check if members are available
@@ -1405,26 +1438,35 @@
  * This function checks to see if members are available to be called. If any member
  * is available, the function immediately returns 0. If no members are available,
  * then -1 is returned.
- *
- * It must be called with ref and lock held for q
- *
  */
-static int get_member_status(struct call_queue *q, int max_penalty, int min_penalty, enum empty_conditions conditions)
+static int get_member_status(const struct queue_ent *qe, int join)
 {
 	struct member *member;
 	struct ao2_iterator mem_iter;
-
-	mem_iter = ao2_iterator_init(q->members, 0);
+	struct call_queue *q = qe->parent;
+	int max_penalty = qe->max_penalty;
+	int min_penalty = qe->min_penalty;
+	enum empty_conditions conditions;
+
+	conditions = (join) ? q->joinempty : q->leavewhenempty;
+
+	if (!conditions) {
+		return 0;
+	}
+
+	mem_iter = ao2_iterator_init(q->data->members, 0);
 	while((member = ao2_iterator_next(&mem_iter))) {
+		ao2_lock(member);
 		if ((max_penalty && (member->penalty > max_penalty)) || (min_penalty && (member->penalty < min_penalty))) {
 			if (conditions & QUEUE_EMPTY_PENALTY) {
 				ast_debug(4, "%s is unavailable because his penalty is not between %d and %d\n", member->membername, min_penalty, max_penalty);
+				ao2_unlock(member);
+				ao2_ref(member, -1);
 				continue;
 			}
 		}
 
-		ao2_lock(member->device);
-		switch (member->device->status) {
+		switch (get_device_status(member)) {
 		case AST_DEVICE_INVALID:
 			if (conditions & QUEUE_EMPTY_INVALID) {
 				ast_debug(4, "%s is unavailable because his device state is 'invalid'\n", member->membername);
@@ -1438,6 +1480,7 @@
 			}
 			goto default_case;
 		case AST_DEVICE_INUSE:
+		case AST_DEVICE_BUSY:
 			if (conditions & QUEUE_EMPTY_INUSE) {
 				ast_debug(4, "%s is unavailable because his device state is 'inuse'\n", member->membername);
 				break;
@@ -1460,27 +1503,41 @@
 			if (member->paused && (conditions & QUEUE_EMPTY_PAUSED)) {
 				ast_debug(4, "%s is unavailable because he is paused'\n", member->membername);
 				break;
-			} else if ((conditions & QUEUE_EMPTY_WRAPUP) && member->lastcall && q->wrapuptime && (time(NULL) - q->wrapuptime < member->lastcall)) {
-				ast_debug(4, "%s is unavailable because it has only been %d seconds since his last call (wrapup time is %d)\n", member->membername, (int) (time(NULL) - member->lastcall), q->wrapuptime);
+			} else if ((conditions & QUEUE_EMPTY_WRAPUP) && !ast_tvzero(member->lastcall) && member->lastwrapup && (ast_tvdiff_sec(ast_tvnow(), member->lastcall) <= member->lastwrapup)) {
+				ast_debug(4, "%s is unavailable because it has only been %d seconds since his last call (wrapup time is %d)\n", member->membername, (int)ast_tvdiff_sec(ast_tvnow(), member->lastcall), member->lastwrapup);
 				break;
 			} else {
-				ao2_unlock(member->device);
 				ast_debug(4, "%s is available.\n", member->membername);
+				ao2_unlock(member);
 				ao2_ref(member, -1);
 				ao2_iterator_destroy(&mem_iter);
 				return 0;
 			}
 			break;
 		}
-		ao2_unlock(member->device);
+		ao2_unlock(member);
 		ao2_ref(member, -1);
 	}
 	ao2_iterator_destroy(&mem_iter);
 	return -1;
 }
 
-/*! \brief send a QueueMemberStatus manager_event
-*/
+/*! \brief Un ref a device if im the last consumer unlink it*/
+static void unref_device(struct mem_state *s) {
+	if (!s) {
+		return;
+	}
+
+	ao2_lock(devices);
+	/* remove our ref*/
+	if (ao2_ref(s, -1) == 2) {
+		/* we were the only consumer unlink*/
+		ao2_find(devices, s, OBJ_UNLINK | OBJ_POINTER | OBJ_NODATA | OBJ_NOLOCK);
+	}
+	ao2_unlock(devices);
+}
+
+/*! \brief send a QueueMemberStatus manager_event when device state changes*/
 static int update_status(void *data)
 {
 	struct ao2_iterator qiter;
@@ -1491,21 +1548,19 @@
 
 	qiter = ao2_iterator_init(queues, 0);
 	while ((q = ao2_iterator_next(&qiter))) {
-		ao2_lock(q);
 		if (q->maskmemberstatus) {
-			ao2_unlock(q);
 			ao2_ref(q, -1);
 			continue;
 		}
-		miter = ao2_iterator_init(q->members, 0);
+		miter = ao2_iterator_init(q->data->members, 0);
 		while((m = ao2_iterator_next(&miter))) {
-			ao2_lock(m->device);
-			if (strcasecmp(m->device->state_interface, s->state_interface)) {
-				ao2_unlock(m->device);
+			ao2_lock(m);
+			if (m->device != s) {
+				ao2_unlock(m);
 				ao2_ref(m, -1);
 				continue;
 			}
-			ao2_unlock(m->device);
+			ao2_lock(s);
 			manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
 				"Queue: %s\r\n"
 				"Location: %s\r\n"
@@ -1516,18 +1571,47 @@
 				"CallsTaken: %d\r\n"
 				"LastCall: %d\r\n"
 				"Status: %d\r\n"
-				"Paused: %d\r\n",
+				"Paused: %d\r\n"
+				"CallInuse: %d\r\n",
 				q->name, m->interface, m->membername, s->state_interface, m->dynamic ? "dynamic" : m->realtime ? "realtime" : "static",
-				m->penalty, m->calls, (int)m->lastcall, s->status, m->paused
+				m->penalty, m->calls, (int)m->lastcall.tv_sec, s->status, m->paused, m->callinuse
 			);
+			ao2_unlock(s);
+			ao2_unlock(m);
 			ao2_ref(m, -1);
 		}
 		ao2_iterator_destroy(&miter);
-		ao2_unlock(q);
 		ao2_ref(q, -1);
 	}
 	ao2_iterator_destroy(&qiter);
-	ao2_ref(s, -1);
+
+	unref_device(s);
+
+	return 0;
+}
+
+/*! \brief set the device state of a member explicitly
+ *  \note a change update manager event will be sent
+ */
+static int set_device_status(const char *device, int status)
+{
+	struct mem_state *s;
+
+	if (!(s = ao2_find(devices, (char *)device, 0))) {
+		return -1;
+	}
+
+	ao2_lock(s);
+	if (s->status != status) {
+		s->status = status;
+		ao2_unlock(s);
+		if (ast_taskprocessor_push(devicestate_tps, update_status, s)) {
+			unref_device(s);
+		}
+	} else {
+		ao2_unlock(s);
+		unref_device(s);
+	}
 
 	return 0;
 }
@@ -1537,7 +1621,6 @@
 {
 	enum ast_device_state state;
 	const char *device;
-	struct mem_state *s, *tmp;
 
 	state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
 	device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
@@ -1547,19 +1630,7 @@
 		return;
 	}
 
-	if ((s = ao2_find(devices, (char *)device, 0))) {
-		ao2_lock(s);
-		if (s->status != state) {
-			s->status = state;
-			if ((tmp = ao2_alloc(sizeof(*tmp), NULL))) {
-				memcpy(tmp, s, sizeof(*tmp));
-				if (ast_taskprocessor_push(devicestate_tps, update_status, tmp)) {
-					ao2_ref(tmp, -1);
-				}
-			}
-		}
-		ao2_unlock(s);
-		ao2_ref(s, -1);
+	if (set_device_status(device, state)) {
 		ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", device, state, ast_devstate2str(state));
 	} else {
 		ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", device, state, ast_devstate2str(state));
@@ -1598,9 +1669,9 @@
 	return state;
 }
 
+/*! \brief callback called when a extension hint is notified of change*/
 static int extension_state_cb(const char *context, const char *exten, enum ast_extension_states state, void *data)
 {
-	struct mem_state *s, *tmp;
 	char *device;
 	int status = extensionstate2devicestate(state);
 
@@ -1609,19 +1680,7 @@
 		return 0;
 	}
 
-	if ((s = ao2_find(devices, device, 0))) {
-		ao2_lock(s);
-		if (s->status != status) {
-			s->status = status;
-			if ((tmp = ao2_alloc(sizeof(*tmp), NULL))) {
-				memcpy(tmp, s, sizeof(*tmp));
-				if (ast_taskprocessor_push(devicestate_tps, update_status, tmp)) {
-					ao2_ref(tmp, -1);
-				}
-			}
-		}
-		ao2_unlock(s);
-		ao2_ref(s, -1);
+	if (set_device_status(device, status)) {
 		ast_debug(1, "Extension '%s@%s' changed to state '%d' (%s)\n", exten, context, status, ast_devstate2str(status));
 	} else {
 		ast_debug(3, "Extension '%s@%s' changed state but we don't care because they're not a member of any queue.\n",
@@ -1631,21 +1690,7 @@
 	return 0;
 }
 
-static void remove_queue_member(void *data) {
-	struct member *mem = data;
-	struct mem_state *s = mem->device;
-
-	if (s) {
-		/* we have a device lets unlink and unref it*/
-		/* remove our ref*/
-		if (ao2_ref(s, -1) == 2) {
-			/* we were the only consumer unlink*/
-			ao2_unlink(devices, s);
-		}
-	}
-
-}
-
+/* \brief find or create a member device state object*/
 static struct mem_state *create_member_state(const char *state_interface) {
 	struct mem_state *state;
 	char *exten;
@@ -1666,7 +1711,11 @@
 		if (context) {
 			ast_copy_string(state->state_interface, state_interface, sizeof(state->state_interface));
 		} else {
-			asprintf(&device, "%s at default", state_interface);
+			if (!asprintf(&device, "%s at default", state_interface)) {
+				ast_log(AST_LOG_ERROR, "Failed to use state_interface %s at default\n", state_interface);
+				ao2_ref(state, -1);
+				return NULL;
+			}
 			ast_copy_string(state->state_interface, device, sizeof(state->state_interface));
 		}
 		state->status = extensionstate2devicestate(ast_extension_state(NULL, S_OR(context, "default"), exten));
@@ -1679,14 +1728,15 @@
 	return state;
 }
 
-/*! \brief Set current state of member if needed*/
-static void set_queue_member_status(struct member *m)
+/*! \brief Set current state of member querying channel driver or hint state*/
+static int set_queue_member_status(struct member *m)
 {
 	int status;
-	struct mem_state *s = m->device;
-	struct mem_state *tmp;
-
-	ao2_lock(s);
+	struct mem_state *s;
+
+	ao2_lock(m);
+	s = m->device;
+
 	if (!strncasecmp(s->state_interface, "hint:", 5)) {
 		char *context = ast_strdupa(s->state_interface);
 		char *exten = strsep(&context, "@") + 5;
@@ -1695,80 +1745,35 @@
 		status = ast_device_state(s->state_interface);
 	}
 
+	ao2_lock(s);
 	if (s->status != status) {
 		s->status = status;
-		if ((tmp = ao2_alloc(sizeof(*tmp), NULL))) {
-			memcpy(tmp, s, sizeof(*tmp));
-			if (ast_taskprocessor_push(devicestate_tps, update_status, tmp)) {
-				ao2_ref(tmp, -1);
-			}
+		/* we must pass a ref to the task processor*/
+		if (!ast_taskprocessor_push(devicestate_tps, update_status, s)) {
+			ao2_ref(s, 1);
 		}
 	}
 	ao2_unlock(s);
-}
-
-/*! \brief allocate space for new queue member and set fields based on parameters passed */
-static struct member *create_queue_member(const char *interface, const char *membername, int penalty,
-		int paused, const char *state_interface, int ignorebusy)
-{
-	struct member *cur;
-	const char* state_int;
-
-	if (!(cur = ao2_alloc(sizeof(*cur), remove_queue_member))) {
-		return NULL;
-	}
-
-	/* setup device state*/
-	state_int = (!ast_strlen_zero(state_interface)) ? state_interface : interface;
-	if (!(cur->device = create_member_state(state_int))) {
-		ao2_ref(cur, -1);
-		return NULL;
-	}
-
-	cur->penalty = penalty;
-	cur->paused = paused;
-	cur->ignorebusy = ignorebusy;
-	ast_copy_string(cur->interface, interface, sizeof(cur->interface));
-	if (!ast_strlen_zero(membername)) {
-		ast_copy_string(cur->membername, membername, sizeof(cur->membername));
-	} else {
-		ast_copy_string(cur->membername, interface, sizeof(cur->membername));
-	}
-	if (!strchr(cur->interface, '/')) {
-		ast_log(LOG_WARNING, "No location at interface '%s'\n", interface);
-	}
-
-
-	return cur;
-}
-
-
-static int compress_char(const char c)
-{
-	if (c < 32)
-		return 0;
-	else if (c > 96)
-		return c - 64;
-	else
-		return c - 32;
-}
-
+	ao2_unlock(m);
+
+	return status;
+}
+
+/*!
+ * \brief ao2 callback to calculate hash of a member by interface
+ */
 static int member_hash_fn(const void *obj, const int flags)
 {
 	const struct member *mem = obj;
 	const char *interface = mem->interface;
-	const char *chname = strchr(interface, '/');
-	int ret = 0, i;
-
-	if (!chname) {
-		chname = interface;
-	}
-	for (i = 0; i < 5 && chname[i]; i++) {
-		ret += compress_char(chname[i]) << (i * 6);
-	}
-	return ret;
-}
-
+
+	return ast_str_case_hash(interface);
+}
+
+/*!
+ * \brief ao2 callback to find member by interface
+ * \note this is the default function used by ao2_find
+ */
 static int member_cmp_fn(void *obj1, void *obj2, int flags)
 {
 	const struct member *mem1 = obj1;
@@ -1778,23 +1783,189 @@
 	return strcasecmp(mem1->interface, arg) ? 0 : CMP_MATCH | CMP_STOP;
 }
 
+/*!
+ * \brief ao2 callback to find a realtime member by uniqueid
+ */
 static int member_cmp_uniqueid_fn(void *obj1, void *arg, int flags)
 {
 	const struct member *mem1 = obj1;
 	const struct member *mem2 = arg;
 	const char *uniqueid = (flags & OBJ_POINTER) ? mem2->rt_uniqueid : arg;
 
-	return strcasecmp(mem1->rt_uniqueid, uniqueid) ? 0 : CMP_MATCH | CMP_STOP;
+	if (mem1->realtime && !mem1->dead &&
+	    !strcasecmp(mem1->rt_uniqueid, uniqueid)) {
+		return CMP_MATCH | CMP_STOP;
+	}
+
+	return 0;
+}
+
+/*!
+ * \brief ao2 callback to mark realtime members dead
+ */
+static int mark_realtime_member_dead(void *obj, void *arg, int flags)
+{
+	struct member *member = obj;
+
+	if (member->realtime) {
+		member->dead = 1;
+		return CMP_MATCH;
+	}
+	return 0;
+}
+
+/*!
+ * \brief ao2 callback to delete realtime members marked dead
+ */
+static int kill_realtime_dead_members(void *obj, void *arg, void *data, int flags)
+{
+	struct member *m = obj;
+	const struct call_queue *q = data;
+
+	if (m->realtime && m->dead) {
+		ao2_lock(m);
+		if (ast_strlen_zero(m->membername) || !log_membername_as_agent) {
+			ast_queue_log(q->name, "REALTIME", m->interface, "REMOVEMEMBER", "%s", "");
+		} else {
+			ast_queue_log(q->name, "REALTIME", m->membername, "REMOVEMEMBER", "%s", "");
+		}
+		ao2_unlock(m);
+		return CMP_MATCH;
+	}
+	return 0;
+}
+
+/*! \brief ao2 callback to reset member counters */
+static int clear_queue_member_fn(void *obj1, void *arg, int flags)
+{
+	struct member *mem = obj1;
+
+	ao2_lock(mem);
+	mem->calls = 0;
+	mem->lastwrapup = 0;
+	mem->lastcall = ast_tv(0, 0);
+	ao2_unlock(mem);
+
+	return 0;
+}
+
+/*!
+ * \brief ao2 callback to calculate hash of a device by state_interface
+ */
+static int device_hash_cb(const void *obj, const int flags)
+{
+	const struct mem_state *s = obj;
+	const char *state_interface = s->state_interface;
+
+	return ast_str_case_hash(state_interface);
+}
+
+/*!
+ * \brief ao2 callback to find device by state_interface
+ * \note this is the default function used by ao2_find
+ */
+static int device_cmp_cb(void *obj, void *arg, int flags)
+{
+	const struct mem_state *d = obj;
+	const struct mem_state *d2 = arg;
+	const char *iface = (flags & OBJ_POINTER) ? d2->state_interface : arg;
+
+	return !strcasecmp(d->state_interface, iface) ? CMP_MATCH | CMP_STOP : 0;
+}
+
+/*!
+ * \brief ao2 callback to calculate hash of a rule by name
+ */
+static int rules_hash_cb(const void *obj, const int flags)
+{
+	const struct rule_list *rl = obj;
+	const char *name = rl->name;
+
+	return ast_str_case_hash(name);
+}
+
+/*!
+ * \brief ao2 callback to calculate hash of a penalty rule by time
+ */
+static int penalty_hash_cb(const void *obj, const int flags)
+{
+	const struct penalty_rule *pr = obj;
+	int time = pr->time;
+
+	return time;
+}
+
+/*! \brief find the best penalty rule for duration */
+static int get_best_rule_cb(void *obj, void *arg, void *data, int flags)
+{
+	int *time = arg;
+	struct penalty_rule *cur = obj;
+	struct penalty_rule **din = data, *best = *din;
+
+	if ((cur->time >= *time) && (!best || (best && (cur->time < best->time)))) {
+		if (best) {
+			ao2_ref(best, -1);
+		}
+		ao2_ref(cur, 1);
+		*din = cur;
+		return CMP_MATCH;
+	}
+	return 0;
+}
+
+/*!
+ * \brief ao2 callback to find rule by name
+ * \note this is the default function used by ao2_find
+ */
+static int rules_cmp_cb(void *obj, void *arg, int flags)
+{
+	const struct rule_list *rl = obj;
+	const struct rule_list *rl2 = arg;
+	const char *name = (flags & OBJ_POINTER) ? rl2->name : arg;
+
+	return !strcasecmp(rl->name, name) ? CMP_MATCH | CMP_STOP : 0;
+}
+
+/*!
+ * \brief ao2 callback to calculate hash of a callattempt by member interface
+ */
+static int callattempt_hash_fn(const void *obj, const int flags)
+{
+	const struct callattempt *c = obj;

[... 7000 lines stripped ...]



More information about the asterisk-commits mailing list