[asterisk-commits] irroot: branch irroot/distrotech-customers-10 r342057 - in /team/irroot/distr...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Oct 24 07:35:58 CDT 2011


Author: irroot
Date: Mon Oct 24 07:35:54 2011
New Revision: 342057

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=342057
Log:
Add locking for device states

Modified:
    team/irroot/distrotech-customers-10/   (props changed)
    team/irroot/distrotech-customers-10/apps/app_queue.c
    team/irroot/distrotech-customers-10/configs/queues.conf.sample

Propchange: team/irroot/distrotech-customers-10/
------------------------------------------------------------------------------
    svn:mergeinfo = /team/irroot/distrotech-customers-trunk:342056

Modified: team/irroot/distrotech-customers-10/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/irroot/distrotech-customers-10/apps/app_queue.c?view=diff&rev=342057&r1=342056&r2=342057
==============================================================================
--- team/irroot/distrotech-customers-10/apps/app_queue.c (original)
+++ team/irroot/distrotech-customers-10/apps/app_queue.c Mon Oct 24 07:35:54 2011
@@ -896,9 +896,6 @@
 	{ QUEUE_AUTOPAUSE_ALL,"all" },
 };
 
-
-static struct ast_taskprocessor *devicestate_tps;
-
 #define DEFAULT_RETRY		5
 #define DEFAULT_TIMEOUT		15
 #define RECHECK			1		/*!< Recheck every second to see we we're at the top yet */
@@ -956,9 +953,6 @@
 
 /*! \brief queues.conf [general] option */
 static int log_membername_as_agent = 0;
-
-/*! \brief queues.conf [general] option */
-static int check_state_unknown = 0;
 
 enum queue_result {
 	QUEUE_UNKNOWN = 0,
@@ -1052,24 +1046,27 @@
 	struct queue_ent *next;                /*!< The next queue entry */
 };
 
+/*!
+ * \brief Device state shared by the queue members that use one state interface.
+ *
+ * Objects of this type are kept in the "devices" container and are reached
+ * from a member through member->device.  Code in this file takes the
+ * object's ao2 lock around accesses to \c reserved and \c status.
+ */
+struct mem_state {
+	char state_interface[80];            /*!< Technology/Location from which to read devicestate changes */
+	int reserved;                        /*!< Count of pending call attempts on this interface (raised per call attempt, lowered when the attempt is freed) */
+	int status;                          /*!< Device state (AST_DEVICE_*) last recorded for this interface */
+};
+
+/*!
+ * \brief A queue member.
+ *
+ * The state interface, device status and reservation count are held in the
+ * mem_state object referenced by \c device rather than in this structure.
+ */
 struct member {
 	char interface[80];                  /*!< Technology/Location to dial to reach this member*/
-	char state_exten[AST_MAX_EXTENSION]; /*!< Extension to get state from (if using hint) */
-	char state_context[AST_MAX_CONTEXT]; /*!< Context to use when getting state (if using hint) */
-	char state_interface[80];            /*!< Technology/Location from which to read devicestate changes */
 	char membername[80];                 /*!< Member name to use in queue logs */
 	int penalty;                         /*!< Are we a last resort? */
 	int calls;                           /*!< Number of calls serviced by this member */
 	int dynamic;                         /*!< Are we dynamically added? */
-	int realtime;                        /*!< Is this member realtime? */
-	int status;                          /*!< Status of queue member */
-	int paused;                          /*!< Are we paused (not accepting calls)? */
 	time_t lastcall;                     /*!< When last successful call was hungup */
 	struct call_queue *lastqueue;	     /*!< Last queue we received a call */
+	unsigned int realtime:1;             /*!< Is this member realtime? (1-bit field: holds only 0 or 1) */
+	unsigned int paused:1;               /*!< Are we paused (not accepting calls)? (1-bit field: holds only 0 or 1) */
 	unsigned int dead:1;                 /*!< Used to detect members deleted in realtime */
 	unsigned int delme:1;                /*!< Flag to delete entry on reload */
+	unsigned int ignorebusy:1;           /*!< Flag to ignore member if the status is not available */
 	char rt_uniqueid[80];                /*!< Unique id of realtime member entry */
-	unsigned int ignorebusy:1;           /*!< Flag to ignore member if the status is not available */
+	struct mem_state *device;            /*!< Device state for this member (see struct mem_state) */
 };
 
 enum empty_conditions {
@@ -1218,6 +1215,7 @@
 static AST_LIST_HEAD_STATIC(rule_lists, rule_list);
 
 static struct ao2_container *queues;
+static struct ao2_container *devices;
 
 static void update_realtime_members(struct call_queue *q);
 static struct member *interface_exists(struct call_queue *q, const char *interface);
@@ -1292,8 +1290,34 @@
 
+/*!
+ * \brief Compare a queue's name, ignoring case, with a search argument.
+ *
+ * \param obj   the call_queue being examined
+ * \param arg   another call_queue when \a flags has OBJ_POINTER set,
+ *              otherwise a plain queue name string
+ * \param flags search flags supplied with the lookup
+ * \retval CMP_MATCH | CMP_STOP when the names are equal
+ * \retval 0 otherwise
+ */
 static int queue_cmp_cb(void *obj, void *arg, int flags)
 {
-	struct call_queue *q = obj, *q2 = arg;
-	return !strcasecmp(q->name, q2->name) ? CMP_MATCH | CMP_STOP : 0;
+	struct call_queue *q = obj;
+
+	/* OBJ_POINTER: arg is a call_queue; otherwise arg is the name itself */
+	if (flags & OBJ_POINTER) {
+		struct call_queue *q2 = arg;
+		return !strcasecmp(q->name, q2->name) ? CMP_MATCH | CMP_STOP : 0;
+	} else {
+		char *name = arg;
+		return !strcasecmp(q->name, name) ? CMP_MATCH | CMP_STOP : 0;
+	}
+}
+
+/*! \brief Hash a mem_state by its state_interface string, ignoring case */
+static int device_hash_cb(const void *obj, const int flags)
+{
+	const struct mem_state *state = obj;
+	const char *key = state->state_interface;
+
+	return ast_str_case_hash(key);
+}
+
+/*!
+ * \brief Compare a mem_state's state_interface, ignoring case, with a search argument.
+ *
+ * \param obj   the mem_state being examined
+ * \param arg   another mem_state when \a flags has OBJ_POINTER set,
+ *              otherwise a plain interface string
+ * \param flags search flags supplied with the lookup
+ * \retval CMP_MATCH | CMP_STOP when the interfaces are equal
+ * \retval 0 otherwise
+ */
+static int device_cmp_cb(void *obj, void *arg, int flags)
+{
+	struct mem_state *state = obj;
+	const char *wanted;
+
+	/* pick the string to compare against, then do a single comparison */
+	if (flags & OBJ_POINTER) {
+		struct mem_state *other = arg;
+		wanted = other->state_interface;
+	} else {
+		wanted = arg;
+	}
+
+	if (strcasecmp(state->state_interface, wanted)) {
+		return 0;
+	}
+	return CMP_MATCH | CMP_STOP;
 }
 
 #ifdef REF_DEBUG_ONLY_QUEUES
@@ -1310,14 +1334,14 @@
 #define queues_t_unlink(c,q,tag)	ao2_t_unlink(c,q,tag)
 static inline struct call_queue *queue_ref(struct call_queue *q)
 {
-	ao2_ref(q, 1);
-	return q;
+        ao2_ref(q, 1);
+        return q;
 }
 
 static inline struct call_queue *queue_unref(struct call_queue *q)
 {
-	ao2_ref(q, -1);
-	return NULL;
+        ao2_ref(q, -1);
+        return NULL;
 }
 #endif
 
@@ -1376,15 +1400,17 @@
  * This function checks to see if members are available to be called. If any member
  * is available, the function immediately returns 0. If no members are available,
  * then -1 is returned.
+ *
+ * It must be called with ref and lock held for q
+ *
  */
 static int get_member_status(struct call_queue *q, int max_penalty, int min_penalty, enum empty_conditions conditions)
 {
 	struct member *member;
 	struct ao2_iterator mem_iter;
 
-	ao2_lock(q);
 	mem_iter = ao2_iterator_init(q->members, 0);
-	for (; (member = ao2_iterator_next(&mem_iter)); ao2_ref(member, -1)) {
+	while((member = ao2_iterator_next(&mem_iter))) {
 		if ((max_penalty && (member->penalty > max_penalty)) || (min_penalty && (member->penalty < min_penalty))) {
 			if (conditions & QUEUE_EMPTY_PENALTY) {
 				ast_debug(4, "%s is unavailable because his penalty is not between %d and %d\n", member->membername, min_penalty, max_penalty);
@@ -1392,7 +1418,8 @@
 			}
 		}
 
-		switch (member->status) {
+		ao2_lock(member->device);
+		switch (member->device->status) {
 		case AST_DEVICE_INVALID:
 			if (conditions & QUEUE_EMPTY_INVALID) {
 				ast_debug(4, "%s is unavailable because his device state is 'invalid'\n", member->membername);
@@ -1432,108 +1459,76 @@
 				ast_debug(4, "%s is unavailable because it has only been %d seconds since his last call (wrapup time is %d)\n", member->membername, (int) (time(NULL) - member->lastcall), q->wrapuptime);
 				break;
 			} else {
-				ao2_unlock(q);
+				ao2_unlock(member->device);
+				ast_debug(4, "%s is available.\n", member->membername);
 				ao2_ref(member, -1);
 				ao2_iterator_destroy(&mem_iter);
-				ast_debug(4, "%s is available.\n", member->membername);
 				return 0;
 			}
 			break;
 		}
+		ao2_unlock(member->device);
+		ao2_ref(member, -1);
 	}
 	ao2_iterator_destroy(&mem_iter);
-
-	ao2_unlock(q);
 	return -1;
 }
 
-struct statechange {
-	AST_LIST_ENTRY(statechange) entry;
-	int state;
-	char dev[0];
-};
-
-/*! \brief set a member's status based on device state of that member's state_interface.
- *  
- * Lock interface list find sc, iterate through each queues queue_member list for member to
- * update state inside queues
+/*! \brief send a QueueMemberStatus manager_event
 */
-static int update_status(struct call_queue *q, struct member *m, const int status)
-{
-	m->status = status;
-
-	if (q->maskmemberstatus)
-		return 0;
-
-	manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
-		"Queue: %s\r\n"
-		"Location: %s\r\n"
-		"MemberName: %s\r\n"
-		"StateInterface: %s\r\n"
-		"Membership: %s\r\n"
-		"Penalty: %d\r\n"
-		"CallsTaken: %d\r\n"
-		"LastCall: %d\r\n"
-		"Status: %d\r\n"
-		"Paused: %d\r\n",
-		q->name, m->interface, m->membername, m->state_interface, m->dynamic ? "dynamic" : m->realtime ? "realtime" : "static",
-		m->penalty, m->calls, (int)m->lastcall, m->status, m->paused
-	);
-
-	return 0;
-}
-
-/*! \brief set a member's status based on device state of that member's interface*/
-static int handle_statechange(void *datap)
-{
-	struct statechange *sc = datap;
-	struct ao2_iterator miter, qiter;
+static void update_status(struct mem_state *s)
+{
+	struct ao2_iterator qiter;
+	struct ao2_iterator miter;
+	struct call_queue *q;
 	struct member *m;
-	struct call_queue *q;
-	char interface[80], *slash_pos;
-	int found = 0;
 
 	qiter = ao2_iterator_init(queues, 0);
-	while ((q = ao2_t_iterator_next(&qiter, "Iterate over queues"))) {
+	while ((q = ao2_iterator_next(&qiter))) {
 		ao2_lock(q);
-
+		if (q->maskmemberstatus) {
+			ao2_unlock(q);
+			ao2_ref(q, -1);
+			continue;
+		}
 		miter = ao2_iterator_init(q->members, 0);
-		for (; (m = ao2_iterator_next(&miter)); ao2_ref(m, -1)) {
-			ast_copy_string(interface, m->state_interface, sizeof(interface));
-
-			if ((slash_pos = strchr(interface, '/')))
-				if (!strncasecmp(interface, "Local/", 6) && (slash_pos = strchr(slash_pos + 1, '/')))
-					*slash_pos = '\0';
-
-			if (!strcasecmp(interface, sc->dev)) {
-				found = 1;
-				update_status(q, m, sc->state);
+		while((m = ao2_iterator_next(&miter))) {
+			ao2_lock(m->device);
+			if (strcmp(m->device->state_interface, s->state_interface)) {
+				ao2_unlock(m->device);
 				ao2_ref(m, -1);
-				break;
-			}
+				continue;
+			}
+			manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
+				"Queue: %s\r\n"
+				"Location: %s\r\n"
+				"MemberName: %s\r\n"
+				"StateInterface: %s\r\n"
+				"Membership: %s\r\n"
+				"Penalty: %d\r\n"
+				"CallsTaken: %d\r\n"
+				"LastCall: %d\r\n"
+				"Status: %d\r\n"
+				"Paused: %d\r\n",
+				q->name, m->interface, m->membername, s->state_interface, m->dynamic ? "dynamic" : m->realtime ? "realtime" : "static",
+				m->penalty, m->calls, (int)m->lastcall, s->status, m->paused
+			);
+			ao2_unlock(m->device);
+			ao2_ref(m, -1);
 		}
 		ao2_iterator_destroy(&miter);
-
 		ao2_unlock(q);
 		queue_t_unref(q, "Done with iterator");
 	}
 	ao2_iterator_destroy(&qiter);
-
-	if (found)
-		ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", sc->dev, sc->state, ast_devstate2str(sc->state));
-	else
-		ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", sc->dev, sc->state, ast_devstate2str(sc->state));
-
-	ast_free(sc);
-	return 0;
-}
-
+}
+
+/*! \brief callback used when device state changes*/
 static void device_state_cb(const struct ast_event *event, void *unused)
 {
 	enum ast_device_state state;
 	const char *device;
-	struct statechange *sc;
-	size_t datapsize;
+	struct mem_state *s;
 
 	state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
 	device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
@@ -1542,15 +1537,16 @@
 		ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
 		return;
 	}
-	datapsize = sizeof(*sc) + strlen(device) + 1;
-	if (!(sc = ast_calloc(1, datapsize))) {
-		ast_log(LOG_ERROR, "failed to calloc a state change struct\n");
-		return;
-	}
-	sc->state = state;
-	strcpy(sc->dev, device);
-	if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
-		ast_free(sc);
+
+	if ((s = ao2_find(devices, (char *)device, 0))) {
+		ao2_lock(s);
+		s->status = state;
+		ao2_unlock(s);
+		update_status(s);
+		ao2_ref(s, -1);
+		ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", device, state, ast_devstate2str(state));
+	} else {
+		ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", device, state, ast_devstate2str(state));
 	}
 }
 
@@ -1588,75 +1584,110 @@
 
 static int extension_state_cb(const char *context, const char *exten, enum ast_extension_states state, void *data)
 {
-	struct ao2_iterator miter, qiter;
-	struct member *m;
-	struct call_queue *q;
-	int found = 0, device_state = extensionstate2devicestate(state);
-
-	qiter = ao2_iterator_init(queues, 0);
-	while ((q = ao2_t_iterator_next(&qiter, "Iterate through queues"))) {
-		ao2_lock(q);
-
-		miter = ao2_iterator_init(q->members, 0);
-		for (; (m = ao2_iterator_next(&miter)); ao2_ref(m, -1)) {
-			if (!strcmp(m->state_context, context) && !strcmp(m->state_exten, exten)) {
-				update_status(q, m, device_state);
-				ao2_ref(m, -1);
-				found = 1;
-				break;
-			}
-		}
-		ao2_iterator_destroy(&miter);
-
-		ao2_unlock(q);
-		queue_t_unref(q, "Done with iterator");
-	}
-	ao2_iterator_destroy(&qiter);
-
-        if (found) {
-		ast_debug(1, "Extension '%s@%s' changed to state '%d' (%s)\n", exten, context, device_state, ast_devstate2str(device_state));
+	struct mem_state *s;
+	char *device;
+	int status = extensionstate2devicestate(state);
+
+	if (!asprintf(&device, "hint:%s@%s", exten, context)) {
+		ast_log(LOG_WARNING, "asprintf() failed: %s\n", strerror(errno));
+		return 0;
+	}
+
+	if ((s = ao2_find(devices, device, 0))) {
+		ao2_lock(s);
+		s->status = status;
+		ao2_unlock(s);
+		update_status(s);
+		ao2_ref(s, -1);
+		ast_debug(1, "Extension '%s@%s' changed to state '%d' (%s)\n", exten, context, status, ast_devstate2str(status));
 	} else {
-		ast_debug(3, "Extension '%s@%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n",
-			  exten, context, device_state, ast_devstate2str(device_state));
+		ast_debug(3, "Extension '%s@%s' changed state but we don't care because they're not a member of any queue.\n",
+			  exten, context);
 	}
 
 	return 0;
 }
 
-/*! \brief Return the current state of a member */
-static int get_queue_member_status(struct member *cur)
-{
-	return ast_strlen_zero(cur->state_exten) ? ast_device_state(cur->state_interface) : extensionstate2devicestate(ast_extension_state(NULL, cur->state_context, cur->state_exten));
+/*!
+ * \brief Destructor for a queue member: give up its reference on the device state.
+ *
+ * The value returned by ao2_ref() is compared with 2; on that result the
+ * state is unlinked from the "devices" container, the member having been the
+ * only consumer.
+ */
+static void remove_queue_member(void *data)
+{
+	struct member *doomed = data;
+	struct mem_state *state = doomed->device;
+	int refs;
+
+	/* a member without a device has nothing to release */
+	if (!state) {
+		return;
+	}
+
+	/* remove our ref */
+	refs = ao2_ref(state, -1);
+	if (refs == 2) {
+		/* we were the only consumer: unlink */
+		ao2_unlink(devices, state);
+	}
+}
+
+/*!
+ * \brief Find, or create and link, the device state for a state interface.
+ *
+ * A "hint:exten" interface given without a context is keyed as
+ * "hint:exten@default", i.e. in the same "hint:exten@context" form that
+ * extension_state_cb() searches for.  The key is normalised before the
+ * container lookup so that an existing entry is found instead of duplicated.
+ *
+ * \param state_interface device name, or "hint:exten[@context]"
+ * \return the mem_state found by ao2_find() or newly allocated and linked
+ *         into "devices", or NULL if allocation fails
+ */
+static struct mem_state *create_member_state(const char *state_interface)
+{
+	struct mem_state *state;
+	char key[sizeof(state->state_interface)];
+	int is_hint = !strncmp(state_interface, "hint:", 5);
+
+	/* build the key exactly as it will be stored (bounded, no heap use) */
+	if (is_hint && !strchr(state_interface, '@')) {
+		snprintf(key, sizeof(key), "%s@default", state_interface);
+	} else {
+		ast_copy_string(key, state_interface, sizeof(key));
+	}
+
+	/* ref will be held for each shared member and one ref for container */
+	if ((state = ao2_find(devices, key, 0))) {
+		return state;
+	}
+	if (!(state = ao2_alloc(sizeof(*state), NULL))) {
+		return NULL;
+	}
+
+	state->reserved = 0;
+	ast_copy_string(state->state_interface, key, sizeof(state->state_interface));
+	if (is_hint) {
+		/* split a scratch copy of "hint:exten@context" at the '@' */
+		char *context = ast_strdupa(key);
+		char *exten = strsep(&context, "@") + 5;
+
+		/* never hand a NULL/empty context to ast_extension_state() */
+		state->status = extensionstate2devicestate(ast_extension_state(NULL, S_OR(context, "default"), exten));
+	} else {
+		state->status = ast_device_state(state->state_interface);
+	}
+	ao2_link(devices, state);
+
+	return state;
 }
 
 /*! \brief allocate space for new queue member and set fields based on parameters passed */
 static struct member *create_queue_member(const char *interface, const char *membername, int penalty, int paused, const char *state_interface)
 {
 	struct member *cur;
-	
-	if ((cur = ao2_alloc(sizeof(*cur), NULL))) {
-		cur->penalty = penalty;
-		cur->paused = paused;
-		ast_copy_string(cur->interface, interface, sizeof(cur->interface));
-		if (!ast_strlen_zero(state_interface))
-			ast_copy_string(cur->state_interface, state_interface, sizeof(cur->state_interface));
-		else
-			ast_copy_string(cur->state_interface, interface, sizeof(cur->state_interface));
-		if (!ast_strlen_zero(membername))
-			ast_copy_string(cur->membername, membername, sizeof(cur->membername));
-		else
-			ast_copy_string(cur->membername, interface, sizeof(cur->membername));
-		if (!strchr(cur->interface, '/'))
-			ast_log(LOG_WARNING, "No location at interface '%s'\n", interface);
-		if (!strncmp(cur->state_interface, "hint:", 5)) {
-			char *tmp = ast_strdupa(cur->state_interface), *context = tmp;
-			char *exten = strsep(&context, "@") + 5;
-
-			ast_copy_string(cur->state_exten, exten, sizeof(cur->state_exten));
-			ast_copy_string(cur->state_context, S_OR(context, "default"), sizeof(cur->state_context));
-		}
-		cur->status = get_queue_member_status(cur);
-	}
+	const char *state_int;
+
+	/* remove_queue_member() is the destructor: it releases cur->device */
+	if (!(cur = ao2_alloc(sizeof(*cur), remove_queue_member))) {
+		return NULL;
+	}
+
+	/* setup device state; an empty state_interface falls back to interface */
+	state_int = (!ast_strlen_zero(state_interface)) ? state_interface : interface;
+	if (!(cur->device = create_member_state(state_int))) {
+		ao2_ref(cur, -1);
+		return NULL;
+	}
+
+	cur->penalty = penalty;
+	/* paused is a 1-bit field: store the truth value so that any non-zero
+	 * argument (e.g. 2) still means "paused" instead of being cut to 0 */
+	cur->paused = paused ? 1 : 0;
+	ast_copy_string(cur->interface, interface, sizeof(cur->interface));
+	/* an empty membername falls back to the interface name */
+	if (!ast_strlen_zero(membername)) {
+		ast_copy_string(cur->membername, membername, sizeof(cur->membername));
+	} else {
+		ast_copy_string(cur->membername, interface, sizeof(cur->membername));
+	}
+	if (!strchr(cur->interface, '/')) {
+		ast_log(LOG_WARNING, "No location at interface '%s'\n", interface);
+	}
+

 	return cur;
 }
@@ -1686,11 +1717,30 @@
 
+/*!
+ * \brief Compare a member's interface, ignoring case, with a search argument.
+ *
+ * \param obj1  the member being examined
+ * \param obj2  another member when \a flags has OBJ_POINTER set,
+ *              otherwise a plain interface string
+ * \param flags search flags supplied with the lookup
+ * \retval CMP_MATCH | CMP_STOP when the interfaces are equal
+ * \retval 0 otherwise
+ */
 static int member_cmp_fn(void *obj1, void *obj2, int flags)
 {
-	struct member *mem1 = obj1, *mem2 = obj2;
-	return strcasecmp(mem1->interface, mem2->interface) ? 0 : CMP_MATCH | CMP_STOP;
-}
-
-/*! 
+	struct member *mem1 = obj1;
+
+	/* OBJ_POINTER: obj2 is a member; otherwise obj2 is the interface string */
+	if (flags & OBJ_POINTER) {
+		struct member *mem2 = obj2;
+		return strcasecmp(mem1->interface, mem2->interface) ? 0 : CMP_MATCH | CMP_STOP;
+	} else {
+		char *arg = obj2;
+		return strcasecmp(mem1->interface, arg) ? 0 : CMP_MATCH | CMP_STOP;
+	}
+}
+
+/*!
+ * \brief Compare a member's realtime unique id, ignoring case, with a search argument.
+ *
+ * \param obj1  the member being examined
+ * \param arg   another member when \a flags has OBJ_POINTER set,
+ *              otherwise a plain unique id string
+ * \param flags search flags supplied with the lookup
+ * \retval CMP_MATCH | CMP_STOP when the ids are equal
+ * \retval 0 otherwise
+ */
+static int member_cmp_uniqueid_fn(void *obj1, void *arg, int flags)
+{
+	struct member *candidate = obj1;
+	const char *wanted;
+
+	if (flags & OBJ_POINTER) {
+		struct member *other = arg;
+		wanted = other->rt_uniqueid;
+	} else {
+		wanted = arg;
+	}
+
+	if (strcasecmp(candidate->rt_uniqueid, wanted)) {
+		return 0;
+	}
+	return CMP_MATCH | CMP_STOP;
+}
+
+/*!
  * \brief Initialize Queue default values.
  * \note the queue's lock  must be held before executing this function
 */
@@ -1771,6 +1821,14 @@
 		ast_free(pr_iter);
 }
 
+/*!
+ * \brief Callback handed to ao2_callback(): zero one member's call statistics.
+ *
+ * \return always 0
+ */
+static int clear_queue_member_fn(void *obj1, void *arg, int flags)
+{
+	struct member *member = obj1;
+
+	member->lastcall = 0;
+	member->calls = 0;
+
+	return 0;
+}
+
 static void clear_queue(struct call_queue *q)
 {
 	q->holdtime = 0;
@@ -1780,25 +1838,18 @@
 	q->talktime = 0;
 
 	if (q->members) {
-		struct member *mem;
-		struct ao2_iterator mem_iter = ao2_iterator_init(q->members, 0);
-		while ((mem = ao2_iterator_next(&mem_iter))) {
-			mem->calls = 0;
-			mem->lastcall = 0;
-			ao2_ref(mem, -1);
-		}
-		ao2_iterator_destroy(&mem_iter);
-	}
-}
-
-/*! 
+		ao2_callback(q->members, OBJ_NODATA, clear_queue_member_fn, NULL);
+	}
+}
+
+/*!
  * \brief Change queue penalty by adding rule.
  *
- * Check rule for errors with time or fomatting, see if rule is relative to rest 
+ * Check rule for errors with time or formatting, see if rule is relative to rest
  * of queue, iterate list of rules to find correct insertion point, insert and return.
  * \retval -1 on failure
- * \retval 0 on success 
- * \note Call this with the rule_lists locked 
+ * \retval 0 on success
+ * \note Call this with the rule_lists locked
 */
 static int insert_penaltychange(const char *list_name, const char *content, const int linenum)
 {
@@ -2119,14 +2170,13 @@
  *
  * Search for member in queue, if found update penalty/paused state,
  * if no member exists create one flag it as a RT member and add to queue member list.
+ * q is locked here.
 */
 static void rt_handle_member_record(struct call_queue *q, char *interface, struct ast_config *member_config)
 {
 	struct member *m;
-	struct ao2_iterator mem_iter;
 	int penalty = 0;
 	int paused  = 0;
-	int found = 0;
 	int ignorebusy = 0;
 
 	const char *config_val;
@@ -2152,8 +2202,9 @@
 
 	if (paused_str) {
 		paused = atoi(paused_str);
-		if (paused < 0)
+		if (paused < 0) {
 			paused = 0;
+		}
 	}
 
 	if ((config_val = ast_variable_retrieve(member_config, interface, "ignorebusy"))) {
@@ -2163,43 +2214,36 @@
 	}
 
 	/* Find member by realtime uniqueid and update */
-	mem_iter = ao2_iterator_init(q->members, 0);
-	while ((m = ao2_iterator_next(&mem_iter))) {
-		if (!strcasecmp(m->rt_uniqueid, rt_uniqueid)) {
-			m->dead = 0;	/* Do not delete this one. */
-			ast_copy_string(m->rt_uniqueid, rt_uniqueid, sizeof(m->rt_uniqueid));
-			if (paused_str)
-				m->paused = paused;
-			if (strcasecmp(state_interface, m->state_interface)) {
-				ast_copy_string(m->state_interface, state_interface, sizeof(m->state_interface));
-			}
-			m->penalty = penalty;
-			m->ignorebusy = ignorebusy;
-			found = 1;
-			ao2_ref(m, -1);
-			break;
-		}
+	if ((m = ao2_callback(q->members, 0, member_cmp_uniqueid_fn, (char *)rt_uniqueid))) {
+		m->dead = 0;	/* Do not delete this one. */
+		ast_copy_string(m->rt_uniqueid, rt_uniqueid, sizeof(m->rt_uniqueid));
+		if (paused_str) {
+			m->paused = paused;
+		}
+		if (strcmp(state_interface, m->device->state_interface)) {
+			/* we may be the last ref [outside container] unlink if so */
+			if (ao2_ref(m->device, -1) == 2) {
+				ao2_unlink(devices, m->device);
+			}
+			m->device = create_member_state(state_interface);
+		}
+		m->penalty = penalty;
+		m->ignorebusy = ignorebusy;
 		ao2_ref(m, -1);
-	}
-	ao2_iterator_destroy(&mem_iter);
-
-	/* Create a new member */
-	if (!found) {
-		if ((m = create_queue_member(interface, membername, penalty, paused, state_interface))) {
-			m->dead = 0;
-			m->realtime = 1;
-			m->ignorebusy = ignorebusy;
-			ast_copy_string(m->rt_uniqueid, rt_uniqueid, sizeof(m->rt_uniqueid));
-			if (!log_membername_as_agent) {
-				ast_queue_log(q->name, "REALTIME", m->interface, "ADDMEMBER", "%s", "");
-			} else {
-				ast_queue_log(q->name, "REALTIME", m->membername, "ADDMEMBER", "%s", "");
-			}
-			ao2_link(q->members, m);
-			ao2_ref(m, -1);
-			m = NULL;
-			q->membercount++;
-		}
+	} else if ((m = create_queue_member(interface, membername, penalty, paused, state_interface))) {
+		m->dead = 0;
+		m->realtime = 1;
+		m->ignorebusy = ignorebusy;
+		ast_copy_string(m->rt_uniqueid, rt_uniqueid, sizeof(m->rt_uniqueid));
+		if (!log_membername_as_agent) {
+			ast_queue_log(q->name, "REALTIME", m->interface, "ADDMEMBER", "%s", "");
+		} else {
+			ast_queue_log(q->name, "REALTIME", m->membername, "ADDMEMBER", "%s", "");
+		}
+		ao2_link(q->members, m);
+		ao2_ref(m, -1);
+		m = NULL;
+		q->membercount++;
 	}
 }
 
@@ -2255,16 +2299,14 @@
  * Check for statically defined queue first, check if deleted RT queue,
  * check for new RT queue, if queue vars are not defined init them with defaults.
  * reload RT queue vars, set RT queue members dead and reload them, return finished queue.
- * \retval the queue, 
+ * \retval the queue,
  * \retval NULL if it doesn't exist.
- * \note Should be called with the "queues" container locked. 
+ * \note Should be called with the "queues" container locked.
 */
 static struct call_queue *find_queue_by_name_rt(const char *queuename, struct ast_variable *queue_vars, struct ast_config *member_config)
 {
 	struct ast_variable *v;
-	struct call_queue *q, tmpq = {
-		.name = queuename,	
-	};
+	struct call_queue *q;
 	struct member *m;
 	struct ao2_iterator mem_iter;
 	char *interface = NULL;
@@ -2273,7 +2315,7 @@
 	char tmpbuf[64];	/* Must be longer than the longest queue param name. */
 
 	/* Static queues override realtime. */
-	if ((q = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Check if static queue exists"))) {
+	if ((q = ao2_t_find(queues, (char *)queuename, 0, "Check if static queue exists"))) {
 		ao2_lock(q);
 		if (!q->realtime) {
 			if (q->dead) {
@@ -2286,10 +2328,10 @@
 				return q;
 			}
 		}
-	} else if (!member_config)
+	} else if (!member_config) {
 		/* Not found in the list, and it's not realtime ... */
 		return NULL;
-
+	}
 	/* Check if queue is defined in realtime. */
 	if (!queue_vars) {
 		/* Delete queue from in-core list if it has been deleted in realtime. */
@@ -2311,8 +2353,9 @@
 	/* Create a new queue if an in-core entry does not exist yet. */
 	if (!q) {
 		struct ast_variable *tmpvar = NULL;
-		if (!(q = alloc_queue(queuename)))
+		if (!(q = alloc_queue(queuename))) {
 			return NULL;
+		}
 		ao2_lock(q);
 		clear_queue(q);
 		q->realtime = 1;
@@ -2332,8 +2375,9 @@
 			}
 		}
 		/* We traversed all variables and didn't find a strategy */
-		if (!tmpvar)
+		if (!tmpvar) {
 			q->strategy = QUEUE_STRATEGY_RINGALL;
+		}
 		queues_t_link(queues, q, "Add queue to container");
 	}
 	init_queue(q);		/* Ensure defaults for all parameters not set explicitly. */
@@ -2356,13 +2400,14 @@
 		queue_set_param(q, tmp_name, v->value, -1, 0);
 	}
 
-	/* Temporarily set realtime members dead so we can detect deleted ones. 
+	/* Temporarily set realtime members dead so we can detect deleted ones.
 	 * Also set the membercount correctly for realtime*/
 	mem_iter = ao2_iterator_init(q->members, 0);
 	while ((m = ao2_iterator_next(&mem_iter))) {
 		q->membercount++;
-		if (m->realtime)
+		if (m->realtime) {
 			m->dead = 1;
+		}
 		ao2_ref(m, -1);
 	}
 	ao2_iterator_destroy(&mem_iter);
@@ -2397,13 +2442,11 @@
 {
 	struct ast_variable *queue_vars;
 	struct ast_config *member_config = NULL;
-	struct call_queue *q = NULL, tmpq = {
-		.name = queuename,	
-	};
+	struct call_queue *q;
 	int prev_weight = 0;
 
 	/* Find the queue in the in-core list first. */
-	q = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Look for queue in memory first");
+	q = ao2_t_find(queues, (char *)queuename, 0, "Look for queue in memory first");
 
 	if (!q || q->realtime) {
 		/*! \note Load from realtime before taking the "queues" container lock, to avoid blocking all
@@ -2518,9 +2561,9 @@
 	int pos = 0;
 	int inserted = 0;
 
-	if (!(q = load_realtime_queue(queuename)))
+	if (!(q = load_realtime_queue(queuename))) {
 		return res;
-
+	}
 	ao2_lock(q);
 
 	/* This is our one */
@@ -2533,9 +2576,9 @@
 			return res;
 		}
 	}
-	if (*reason == QUEUE_UNKNOWN && q->maxlen && (q->count >= q->maxlen))
+	if (*reason == QUEUE_UNKNOWN && q->maxlen && (q->count >= q->maxlen)) {
 		*reason = QUEUE_FULL;
-	else if (*reason == QUEUE_UNKNOWN) {
+	} else if (*reason == QUEUE_UNKNOWN) {
 		/* There's space for us, put us at the right position inside
 		 * the queue.
 		 * Take into account the priority of the calling user */
@@ -2829,8 +2872,9 @@
 	struct penalty_rule *pr_iter;
 	int pos = 0;
 
-	if (!(q = qe->parent))
+	if (!(q = qe->parent)) {
 		return;
+	}
 	queue_t_ref(q, "Copy queue pointer from queue entry");
 	ao2_lock(q);
 
@@ -2873,7 +2917,7 @@
 		}
 	}
 
-	if (q->dead) {	
+	if (q->dead) {
 		/* It's dead and nobody is in it, so kill it */
 		queues_t_unlink(queues, q, "Queue is now dead; remove it from the container");
 	}
@@ -2892,6 +2936,9 @@
  */
 static void callattempt_free(struct callattempt *doomed)
 {
+	ao2_lock(doomed->member->device);
+	doomed->member->device->reserved--;
+	ao2_unlock(doomed->member->device);
 	if (doomed->member) {
 		ao2_ref(doomed->member, -1);
 	}
@@ -2935,7 +2982,8 @@
 
 	mem_iter = ao2_iterator_init(q->members, 0);
 	while ((mem = ao2_iterator_next(&mem_iter))) {
-		switch (mem->status) {
+		ao2_lock(mem->device);
+		switch (mem->device->status) {
 			case AST_DEVICE_INVALID:
 			case AST_DEVICE_UNAVAILABLE:
 				break;
@@ -2950,11 +2998,15 @@
 				/* else fall through */
 			case AST_DEVICE_NOT_INUSE:
 			case AST_DEVICE_UNKNOWN:
-				if (!mem->paused) {
-					avl++;
+				if (mem->paused ||
+				    (mem->device->reserved && ((!q->ringinuse) || (!mem->ignorebusy)))) {
+					break;
 				}
+				/* else fall through */
+				avl++;
 				break;
 		}
+		ao2_unlock(mem->device);
 		ao2_ref(mem, -1);
 
 		/* If autofill is not enabled or if the queue's strategy is ringall, then
@@ -3079,7 +3131,7 @@
 	char tech[256];
 	char *location;
 	const char *macrocontext, *macroexten;
-	enum ast_device_state newstate;
+	struct mem_state *s = tmp->member->device;
 
 	/* on entry here, we know that tmp->chan == NULL */
 	if (tmp->member->paused) {
@@ -3104,22 +3156,17 @@
 	}
 
 	if (!qe->parent->ringinuse || !tmp->member->ignorebusy) {
-		if (check_state_unknown && (tmp->member->status == AST_DEVICE_UNKNOWN)) {
-			newstate = ast_device_state(tmp->member->interface);
-			if (newstate != tmp->member->status) {
-				ast_log(LOG_WARNING, "Found a channel matching iterface %s while status was %s changed to %s\n",
-					tmp->member->interface, ast_devstate2str(tmp->member->status), ast_devstate2str(newstate));
-				ast_devstate_changed_literal(newstate, tmp->member->interface);
-			}
-		}
-		if ((tmp->member->status != AST_DEVICE_NOT_INUSE) && (tmp->member->status != AST_DEVICE_UNKNOWN)) {
+		ao2_lock(s);
+		if ((s->reserved > 1) || ((s->status != AST_DEVICE_NOT_INUSE) && (s->status != AST_DEVICE_UNKNOWN))) {
 			ast_debug(1, "%s in use, can't receive call\n", tmp->interface);
 			if (qe->chan->cdr) {
 				ast_cdr_busy(qe->chan->cdr);
 			}
 			tmp->stillgoing = 0;
+			ao2_unlock(s);
 			return 0;
 		}
+		ao2_unlock(s);
 	}
 
 	if (use_weight && compare_weight(qe->parent,tmp->member)) {
@@ -3144,10 +3191,9 @@
 		if (qe->chan->cdr) {
 			ast_cdr_busy(qe->chan->cdr);
 		}
-		tmp->stillgoing = 0;	
+		tmp->stillgoing = 0;
 
 		ao2_lock(qe->parent);
-		update_status(qe->parent, tmp->member, get_queue_member_status(tmp->member));
 		qe->parent->rrpos++;
 		qe->linpos++;
 		ao2_unlock(qe->parent);
@@ -3231,7 +3277,6 @@
 		ast_channel_unlock(qe->chan);
 		do_hang(tmp);
 		(*busies)++;
-		update_status(qe->parent, tmp->member, get_queue_member_status(tmp->member));
 		return 0;
 	} else if (qe->parent->eventwhencalled) {
 		char vars[2048];
@@ -3263,7 +3308,6 @@
 	ast_channel_unlock(tmp->chan);
 	ast_channel_unlock(qe->chan);
 
-	update_status(qe->parent, tmp->member, get_queue_member_status(tmp->member));
 	return 1;
 }
 
@@ -4045,11 +4089,15 @@
 		if (qe->parent->leavewhenempty) {
 			int status = 0;
 
+			ao2_lock(qe->parent);
 			if ((status = get_member_status(qe->parent, qe->max_penalty, qe->min_penalty, qe->parent->leavewhenempty))) {
 				*reason = QUEUE_LEAVEEMPTY;
 				ast_queue_log(qe->parent->name, qe->chan->uniqueid, "NONE", "EXITEMPTY", "%d|%d|%ld", qe->pos, qe->opos, (long) time(NULL) - qe->start);
+				ao2_unlock(qe->parent);
 				leave_queue(qe);
 				break;
+			} else {
+				ao2_unlock(qe->parent);
 			}
 		}
 
@@ -4108,8 +4156,8 @@
 
 	struct member *mem;
 	struct call_queue *qtmp;
-	struct ao2_iterator queue_iter;	
-	
+	struct ao2_iterator queue_iter;
+
 	if (shared_lastcall) {
 		queue_iter = ao2_iterator_init(queues, 0);
 		while ((qtmp = ao2_t_iterator_next(&queue_iter, "Iterate through queues"))) {
@@ -4383,7 +4431,7 @@
 }
 
 /*! \brief A large function which calls members, updates statistics, and bridges the caller and a member
- * 
+ *
  * Here is the process of this function
  * 1. Process any options passed to the Queue() application. Options here mean the third argument to Queue()
  * 2. Iterate trough the members of the queue, creating a callattempt corresponding to each member. During this
@@ -4623,6 +4671,9 @@
 
 		tmp->stillgoing = -1;
 		tmp->member = cur;/* Place the reference for cur into callattempt. */
+		ao2_lock(tmp->member->device);
+		tmp->member->device->reserved++;
+		ao2_unlock(tmp->member->device);
 		tmp->lastcall = cur->lastcall;
 		tmp->lastqueue = cur->lastqueue;
 		ast_copy_string(tmp->interface, cur->interface, sizeof(tmp->interface));
@@ -5212,23 +5263,7 @@
 
 static struct member *interface_exists(struct call_queue *q, const char *interface)
 {
-	struct member *mem;
-	struct ao2_iterator mem_iter;
-
-	if (!q)
-		return NULL;
-
-	mem_iter = ao2_iterator_init(q->members, 0);
-	while ((mem = ao2_iterator_next(&mem_iter))) {
-		if (!strcasecmp(interface, mem->interface)) {
-			ao2_iterator_destroy(&mem_iter);
-			return mem;
-		}
-		ao2_ref(mem, -1);
-	}
-	ao2_iterator_destroy(&mem_iter);
-
-	return NULL;
+	return (q) ? ao2_find(q->members, (char *)interface, 0) : NULL;
 }
 
 
@@ -5257,7 +5292,7 @@
 		}
 
 		res = snprintf(value + value_len, sizeof(value) - value_len, "%s%s;%d;%d;%s;%s",
-			value_len ? "|" : "", cur_member->interface, cur_member->penalty, cur_member->paused, cur_member->membername, cur_member->state_interface);
+			value_len ? "|" : "", cur_member->interface, cur_member->penalty, cur_member->paused, cur_member->membername, cur_member->device->state_interface);
 
 		ao2_ref(cur_member, -1);
 
@@ -5285,14 +5320,12 @@
 */
 static int remove_from_queue(const char *queuename, const char *interface)
 {
-	struct call_queue *q, tmpq = {
-		.name = queuename,	
-	};
+	struct call_queue *q;
 	struct member *mem, tmpmem;
 	int res = RES_NOSUCHQUEUE;
 
 	ast_copy_string(tmpmem.interface, interface, sizeof(tmpmem.interface));
-	if ((q = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Temporary reference for interface removal"))) {
+	if ((q = ao2_t_find(queues, (char *)queuename, 0, "Temporary reference for interface removal"))) {
 		ao2_lock(q);
 		if ((mem = ao2_find(q->members, &tmpmem, OBJ_POINTER))) {
 			/* XXX future changes should beware of this assumption!! */
@@ -5343,8 +5376,9 @@
 
 	/*! \note Ensure the appropriate realtime queue is loaded.  Note that this
 	 * short-circuits if the queue is already in memory. */
-	if (!(q = load_realtime_queue(queuename)))
+	if (!(q = load_realtime_queue(queuename))) {
 		return res;
+	}
 
 	ao2_lock(q);
 	if ((old_member = interface_exists(q, interface)) == NULL) {
@@ -5352,6 +5386,7 @@
 			new_member->dynamic = 1;
 			ao2_link(q->members, new_member);
 			q->membercount++;
+			ao2_lock(new_member->device);
 			manager_event(EVENT_FLAG_AGENT, "QueueMemberAdded",
 				"Queue: %s\r\n"
 				"Location: %s\r\n"
@@ -5366,14 +5401,16 @@
 				q->name, new_member->interface, new_member->membername, state_interface,
 				"dynamic",
 				new_member->penalty, new_member->calls, (int) new_member->lastcall,
-				new_member->status, new_member->paused);
-			
+				new_member->device->status, new_member->paused);
+
+			ao2_unlock(new_member->device);
 			ao2_ref(new_member, -1);
 			new_member = NULL;
 
-			if (dump)
+			if (dump) {
 				dump_queue_members(q);
-			
+			}
+
 			res = RES_OKAY;
 		} else {
 			res = RES_OUTOFMEMORY;
@@ -5394,7 +5431,6 @@
 	struct call_queue *q;
 	struct member *mem;
 	struct ao2_iterator queue_iter;
-	int failed;
 
 	/* Special event for when all queues are paused - individual events still generated */
 	/* XXX In all other cases, we use the membername, but since this affects all queues, we cannot */
@@ -5404,58 +5440,52 @@
 	queue_iter = ao2_iterator_init(queues, 0);
 	while ((q = ao2_t_iterator_next(&queue_iter, "Iterate over queues"))) {
 		ao2_lock(q);
-		if (ast_strlen_zero(queuename) || !strcasecmp(q->name, queuename)) {
-			if ((mem = interface_exists(q, interface))) {
-				if (mem->paused == paused) {
-					ast_debug(1, "%spausing already-%spaused queue member %s:%s\n", (paused ? "" : "un"), (paused ? "" : "un"), q->name, interface);
-				}
-
-				failed = 0;
-				if (mem->realtime) {
-					failed = update_realtime_member_field(mem, q->name, "paused", paused ? "1" : "0");
-				}
-			
-				if (failed) {
-					ast_log(LOG_WARNING, "Failed %spausing realtime queue member %s:%s\n", (paused ? "" : "un"), q->name, interface);
-					ao2_ref(mem, -1);
-					ao2_unlock(q);
-					queue_t_unref(q, "Done with iterator");
-					continue;
-				}	
-				found++;
-				mem->paused = paused;
-
-				if (queue_persistent_members)
-					dump_queue_members(q);
-
-				ast_queue_log(q->name, "NONE", mem->membername, (paused ? "PAUSE" : "UNPAUSE"), "%s", S_OR(reason, ""));
-				
-				if (!ast_strlen_zero(reason)) {
-					manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused",
-						"Queue: %s\r\n"
-						"Location: %s\r\n"
-						"MemberName: %s\r\n"
-						"Paused: %d\r\n"
-						"Reason: %s\r\n",
-							q->name, mem->interface, mem->membername, paused, reason);
-				} else {
-					manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused",
-						"Queue: %s\r\n"
-						"Location: %s\r\n"
-						"MemberName: %s\r\n"
-						"Paused: %d\r\n",
-							q->name, mem->interface, mem->membername, paused);
-				}
+		if ((ast_strlen_zero(queuename) || !strcasecmp(q->name, queuename)) &&
+		    (mem = interface_exists(q, interface))) {
+			if (mem->paused == paused) {
+				ast_debug(1, "%spausing already-%spaused queue member %s:%s\n", (paused ? "" : "un"), (paused ? "" : "un"), q->name, interface);
+			}
+
+			if ((mem->realtime) && (update_realtime_member_field(mem, q->name, "paused", paused ? "1" : "0"))) {
+				ast_log(LOG_WARNING, "Failed %spausing realtime queue member %s:%s\n", (paused ? "" : "un"), q->name, interface);
 				ao2_ref(mem, -1);
-			}
-		}
-		
+				ao2_unlock(q);
+				queue_t_unref(q, "Done with iterator");
+				continue;
+			}
+			found++;
+			mem->paused = paused;
+
+			if (queue_persistent_members)
+				dump_queue_members(q);
+
+			ast_queue_log(q->name, "NONE", mem->membername, (paused ? "PAUSE" : "UNPAUSE"), "%s", S_OR(reason, ""));
+
+			if (!ast_strlen_zero(reason)) {
+				manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused",
+					"Queue: %s\r\n"
+					"Location: %s\r\n"
+					"MemberName: %s\r\n"
+					"Paused: %d\r\n"
+					"Reason: %s\r\n",
+						q->name, mem->interface, mem->membername, paused, reason);
+			} else {
+				manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused",
+					"Queue: %s\r\n"
+					"Location: %s\r\n"
+					"MemberName: %s\r\n"
+					"Paused: %d\r\n",
+						q->name, mem->interface, mem->membername, paused);
+			}
+			ao2_ref(mem, -1);
+		}
+
 		if (!ast_strlen_zero(queuename) && !strcasecmp(queuename, q->name)) {
 			ao2_unlock(q);
 			queue_t_unref(q, "Done with iterator");
 			break;
 		}
-		
+
 		ao2_unlock(q);
 		queue_t_unref(q, "Done with iterator");
 	}
@@ -5505,23 +5535,21 @@
 		ast_log (LOG_ERROR, "Invalid queuename\n"); 
 	} else {
 		ast_log (LOG_ERROR, "Invalid interface\n");
-	}	
+	}
 
 	return RESULT_FAILURE;
 }
 
-/* \brief Gets members penalty. 
- * \return Return the members penalty or RESULT_FAILURE on error. 
+/* \brief Gets members penalty.
+ * \return Return the members penalty or RESULT_FAILURE on error.
 */
 static int get_member_penalty(char *queuename, char *interface)
 {
 	int foundqueue = 0, penalty;
-	struct call_queue *q, tmpq = {
-		.name = queuename,	
-	};
+	struct call_queue *q;
 	struct member *mem;
-	
-	if ((q = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Search for queue"))) {
+
+	if ((q = ao2_t_find(queues, queuename, 0, "Search for queue"))) {
 		foundqueue = 1;
 		ao2_lock(q);
 		if ((mem = interface_exists(q, interface))) {
@@ -5536,10 +5564,11 @@
 	}
 
 	/* some useful debuging */
-	if (foundqueue) 
+	if (foundqueue) {
 		ast_log (LOG_ERROR, "Invalid queuename\n");
-	else 
+	} else {
 		ast_log (LOG_ERROR, "Invalid interface\n");
+	}
 
 	return RESULT_FAILURE;
 }
@@ -5568,15 +5597,11 @@
 
 		queue_name = entry->key + strlen(pm_family) + 2;
 
-		{
-			struct call_queue tmpq = {
-				.name = queue_name,
-			};
-			cur_queue = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Reload queue members");
-		}	
-
-		if (!cur_queue)
+		cur_queue = ao2_t_find(queues, (char *)queue_name, 0, "Reload queue members");
+
+		if (!cur_queue) {
 			cur_queue = load_realtime_queue(queue_name);
+		}
 
 		if (!cur_queue) {
 			/* If the queue no longer exists, remove it from the
@@ -5584,7 +5609,7 @@
 			ast_log(LOG_WARNING, "Error loading persistent queue: '%s': it does not exist\n", queue_name);
 			ast_db_del(pm_family, queue_name);
 			continue;
-		} 
+		}
 

[... 470 lines stripped ...]



More information about the asterisk-commits mailing list