[svn-commits] irroot: branch irroot/asterisk-trunk-quack-queue r342883 - /team/irroot/aster...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Tue Nov 1 15:20:15 CDT 2011


Author: irroot
Date: Tue Nov  1 15:20:09 2011
New Revision: 342883

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=342883
Log:
Add changes for RB1538

Modified:
    team/irroot/asterisk-trunk-quack-queue/apps/app_queue.c

Modified: team/irroot/asterisk-trunk-quack-queue/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/irroot/asterisk-trunk-quack-queue/apps/app_queue.c?view=diff&rev=342883&r1=342882&r2=342883
==============================================================================
--- team/irroot/asterisk-trunk-quack-queue/apps/app_queue.c (original)
+++ team/irroot/asterisk-trunk-quack-queue/apps/app_queue.c Tue Nov  1 15:20:09 2011
@@ -264,9 +264,10 @@
 			<parameter name="queuename" required="true" />
 			<parameter name="interface" />
 			<parameter name="penalty" />
-			<parameter name="options" />
+			<parameter name="paused" />
 			<parameter name="membername" />
 			<parameter name="stateinterface" />
+			<parameter name="ignorebusy" />
 		</syntax>
 		<description>
 			<para>Dynamically adds interface to an existing queue. If the interface is
@@ -846,6 +847,19 @@
 		<description>
 		</description>
 	</manager>
+	<manager name="QueueIgnoreBusy" language="en_US">
+		<synopsis>
+			Set interface to allow multiple calls
+		</synopsis>
+		<syntax>
+			<xi:include xpointer="xpointer(/docs/manager[@name='Login']/syntax/parameter[@name='ActionID'])" />
+			<parameter name="Interface" required="true" />
+			<parameter name="IgnoreBusy" required="true" />
+			<parameter name="Queue" required="true" />
+		</syntax>
+		<description>
+		</description>
+	</manager>
  ***/
 
 enum {
@@ -912,6 +926,11 @@
 #define	RES_OUTOFMEMORY	(-2)		/*!< Out of memory */
 #define	RES_NOSUCHQUEUE	(-3)		/*!< No such queue */
 #define RES_NOT_DYNAMIC (-4)		/*!< Member is not dynamic */
+#define RES_ERROR	(-5)		/*!< Member is misconfigured */
+
+#define MEMBER_STATIC		(1 << 0)
+#define MEMBER_REALTIME		(1 << 1)
+#define MEMBER_DYNAMIC		(1 << 2)
 
 static char *app = "Queue";
 
@@ -956,9 +975,6 @@
 
 /*! \brief queues.conf [general] option */
 static int log_membername_as_agent = 0;
-
-/*! \brief queues.conf [general] option */
-static int check_state_unknown = 0;
 
 enum queue_result {
 	QUEUE_UNKNOWN = 0,
@@ -1010,7 +1026,7 @@
 	int stillgoing;
 	int metric;
 	time_t lastcall;
-	struct call_queue *lastqueue;
+	int lastwrapup;
 	struct member *member;
 	/*! Saved connected party info from an AST_CONTROL_CONNECTED_LINE. */
 	struct ast_party_connected_line connected;
@@ -1018,6 +1034,7 @@
 	unsigned int pending_connected_update:1;
 	/*! TRUE if caller id is not available for connected line */
 	unsigned int dial_callerid_absent:1;
+	unsigned int reserved:1;
 	struct ast_aoc_decoded *aoc_s_rate_list;
 };
 
@@ -1052,24 +1069,26 @@
 	struct queue_ent *next;                /*!< The next queue entry */
 };
 
+struct mem_state {
+	char state_interface[80];            /*!< Technology/Location from which to read devicestate changes */
+	int reserved;                        /*!< This interface is reserved for pending call */
+	int status;                          /*!< Status of queue member */
+};
+
 struct member {
 	char interface[80];                  /*!< Technology/Location to dial to reach this member*/
-	char state_exten[AST_MAX_EXTENSION]; /*!< Extension to get state from (if using hint) */
-	char state_context[AST_MAX_CONTEXT]; /*!< Context to use when getting state (if using hint) */
-	char state_interface[80];            /*!< Technology/Location from which to read devicestate changes */
 	char membername[80];                 /*!< Member name to use in queue logs */
 	int penalty;                         /*!< Are we a last resort? */
 	int calls;                           /*!< Number of calls serviced by this member */
-	int dynamic;                         /*!< Are we dynamically added? */
-	int realtime;                        /*!< Is this member realtime? */
-	int status;                          /*!< Status of queue member */
-	int paused;                          /*!< Are we paused (not accepting calls)? */
 	time_t lastcall;                     /*!< When last successful call was hungup */
-	struct call_queue *lastqueue;	     /*!< Last queue we received a call */
+	int lastwrapup;                      /*!< Last wrapuptime */
+	unsigned int realtime:1;             /*!< Is this member realtime? */
+	unsigned int paused:1;               /*!< Are we paused (not accepting calls)? */
 	unsigned int dead:1;                 /*!< Used to detect members deleted in realtime */
-	unsigned int delme:1;                /*!< Flag to delete entry on reload */
+	unsigned int dynamic:1;              /*!< Are we dynamically added? */
+	unsigned int ignorebusy:1;           /*!< Flag to ignore member if the status is not available */
 	char rt_uniqueid[80];                /*!< Unique id of realtime member entry */
-	unsigned int ignorebusy:1;           /*!< Flag to ignore member if the status is not available */
+	struct mem_state *device;            /*!< Device information */
 };
 
 enum empty_conditions {
@@ -1212,14 +1231,15 @@
 static AST_LIST_HEAD_STATIC(rule_lists, rule_list);
 
 static struct ao2_container *queues;
-
-static void update_realtime_members(struct call_queue *q);
+static struct ao2_container *devices;
+
+static int do_set_member_penalty_paused(struct call_queue *q, struct member *mem, int pause, int value, const char *reason);
+static void pm_load_member_config(struct call_queue *q);
 static struct member *interface_exists(struct call_queue *q, const char *interface);
 static int set_member_paused(const char *queuename, const char *interface, const char *reason, int paused);
 
 static void queue_transfer_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan);
 
-static struct member *find_member_by_queuename_and_interface(const char *queuename, const char *interface);
 /*! \brief sets the QUEUESTATUS channel variable */
 static void set_queue_result(struct ast_channel *chan, enum queue_result res)
 {
@@ -1280,14 +1300,35 @@
 static int queue_hash_cb(const void *obj, const int flags)
 {
 	const struct call_queue *q = obj;
-
-	return ast_str_case_hash(q->name);
+	const char *name = (flags & OBJ_KEY) ? obj : q->name;
+
+	return ast_str_case_hash(name);
 }
 
 static int queue_cmp_cb(void *obj, void *arg, int flags)
 {
-	struct call_queue *q = obj, *q2 = arg;
-	return !strcasecmp(q->name, q2->name) ? CMP_MATCH | CMP_STOP : 0;
+	struct call_queue *q = obj;
+	const struct call_queue *q2 = arg;
+	const char *name = (flags & OBJ_POINTER) ? q2->name : arg;
+
+	return !strcasecmp(q->name, name) ? CMP_MATCH | CMP_STOP : 0;
+}
+
+static int device_hash_cb(const void *obj, const int flags)
+{
+	const struct mem_state *s = obj;
+	const char *state_interface = (flags & OBJ_KEY) ? obj : s->state_interface;
+
+	return ast_str_case_hash(state_interface);
+}
+
+static int device_cmp_cb(void *obj, void *arg, int flags)
+{
+	struct mem_state *d = obj;
+	const struct mem_state *d2 = arg;
+	const char *iface = (flags & OBJ_POINTER) ? d2->state_interface : arg;
+
+	return !strcasecmp(d->state_interface, iface) ? CMP_MATCH | CMP_STOP : 0;
 }
 
 #ifdef REF_DEBUG_ONLY_QUEUES
@@ -1304,14 +1345,14 @@
 #define queues_t_unlink(c,q,tag)	ao2_t_unlink(c,q,tag)
 static inline struct call_queue *queue_ref(struct call_queue *q)
 {
-	ao2_ref(q, 1);
-	return q;
+        ao2_ref(q, 1);
+        return q;
 }
 
 static inline struct call_queue *queue_unref(struct call_queue *q)
 {
-	ao2_ref(q, -1);
-	return NULL;
+        ao2_ref(q, -1);
+        return NULL;
 }
 #endif
 
@@ -1365,6 +1406,19 @@
 	new->opos = *pos;
 }
 
+static int get_device_status(struct mem_state *s)
+{
+	int ret;
+	ao2_lock(s);
+	if (s->reserved && ((s->status == AST_DEVICE_NOT_INUSE) || (s->status == AST_DEVICE_UNKNOWN))) {
+		ret = AST_DEVICE_RINGING;
+	} else {
+		ret = s->status;
+	}
+        ao2_unlock(s);
+        return ret;
+}
+
 /*! \brief Check if members are available
  *
  * This function checks to see if members are available to be called. If any member
@@ -1375,18 +1429,24 @@
 {
 	struct member *member;
 	struct ao2_iterator mem_iter;
-
-	ao2_lock(q);
+	int status;
+
+	ao2_unlock(q);
+
 	mem_iter = ao2_iterator_init(q->members, 0);
-	for (; (member = ao2_iterator_next(&mem_iter)); ao2_ref(member, -1)) {
+	while((member = ao2_iterator_next(&mem_iter))) {
+		ao2_lock(member);
 		if ((max_penalty && (member->penalty > max_penalty)) || (min_penalty && (member->penalty < min_penalty))) {
 			if (conditions & QUEUE_EMPTY_PENALTY) {
 				ast_debug(4, "%s is unavailable because his penalty is not between %d and %d\n", member->membername, min_penalty, max_penalty);
+				ao2_unlock(member);
+				ao2_ref(member, -1);
 				continue;
 			}
 		}
 
-		switch (member->status) {
+		status = get_device_status(member->device);
+		switch (status) {
 		case AST_DEVICE_INVALID:
 			if (conditions & QUEUE_EMPTY_INVALID) {
 				ast_debug(4, "%s is unavailable because his device state is 'invalid'\n", member->membername);
@@ -1422,112 +1482,109 @@
 			if (member->paused && (conditions & QUEUE_EMPTY_PAUSED)) {
 				ast_debug(4, "%s is unavailable because he is paused'\n", member->membername);
 				break;
-			} else if ((conditions & QUEUE_EMPTY_WRAPUP) && member->lastcall && q->wrapuptime && (time(NULL) - q->wrapuptime < member->lastcall)) {
-				ast_debug(4, "%s is unavailable because it has only been %d seconds since his last call (wrapup time is %d)\n", member->membername, (int) (time(NULL) - member->lastcall), q->wrapuptime);
+			} else if ((conditions & QUEUE_EMPTY_WRAPUP) && member->lastcall && member->lastwrapup && (time(NULL) - member->lastwrapup < member->lastcall)) {
+				ast_debug(4, "%s is unavailable because it has only been %d seconds since his last call (wrapup time is %d)\n", member->membername, (int) (time(NULL) - member->lastcall), member->lastwrapup);
 				break;
 			} else {
-				ao2_unlock(q);
+				ast_debug(4, "%s is available.\n", member->membername);
+				ao2_unlock(member);
 				ao2_ref(member, -1);
 				ao2_iterator_destroy(&mem_iter);
-				ast_debug(4, "%s is available.\n", member->membername);
+				ao2_lock(q);
 				return 0;
 			}
 			break;
 		}
+		ao2_unlock(member);
+		ao2_ref(member, -1);
 	}
 	ao2_iterator_destroy(&mem_iter);
-
-	ao2_unlock(q);
+	ao2_lock(q);
 	return -1;
 }
 
-struct statechange {
-	AST_LIST_ENTRY(statechange) entry;
-	int state;
-	char dev[0];
-};
-
-/*! \brief set a member's status based on device state of that member's state_interface.
- *
- * Lock interface list find sc, iterate through each queues queue_member list for member to
- * update state inside queues
+/*! \brief send a QueueMemberStatus manager_event
 */
-static int update_status(struct call_queue *q, struct member *m, const int status)
-{
-	m->status = status;
-
-	if (q->maskmemberstatus)
-		return 0;
-
-	manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
-		"Queue: %s\r\n"
-		"Location: %s\r\n"
-		"MemberName: %s\r\n"
-		"StateInterface: %s\r\n"
-		"Membership: %s\r\n"
-		"Penalty: %d\r\n"
-		"CallsTaken: %d\r\n"
-		"LastCall: %d\r\n"
-		"Status: %d\r\n"
-		"Paused: %d\r\n",
-		q->name, m->interface, m->membername, m->state_interface, m->dynamic ? "dynamic" : m->realtime ? "realtime" : "static",
-		m->penalty, m->calls, (int)m->lastcall, m->status, m->paused
-	);
+static int update_status(void *data)
+{
+	struct ao2_iterator qiter;
+	struct ao2_iterator miter;
+	struct call_queue *q;
+	struct member *m;
+	struct mem_state *s = data;
+
+	qiter = ao2_iterator_init(queues, 0);
+	while ((q = ao2_iterator_next(&qiter))) {
+		ao2_lock(q);
+		if (q->maskmemberstatus) {
+			ao2_unlock(q);
+			ao2_ref(q, -1);
+			continue;
+		}
+		ao2_unlock(q);
+		miter = ao2_iterator_init(q->members, 0);
+		while((m = ao2_iterator_next(&miter))) {
+			ao2_lock(m);
+			if (strcasecmp(m->device->state_interface, s->state_interface)) {
+				ao2_unlock(m);
+				ao2_ref(m, -1);
+				continue;
+			}
+			manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
+				"Queue: %s\r\n"
+				"Location: %s\r\n"
+				"MemberName: %s\r\n"
+				"StateInterface: %s\r\n"
+				"Membership: %s\r\n"
+				"Penalty: %d\r\n"
+				"CallsTaken: %d\r\n"
+				"LastCall: %d\r\n"
+				"Status: %d\r\n"
+				"Paused: %d\r\n",
+				q->name, m->interface, m->membername, s->state_interface, m->dynamic ? "dynamic" : m->realtime ? "realtime" : "static",
+				m->penalty, m->calls, (int)m->lastcall, s->status, m->paused
+			);
+			ao2_unlock(m);
+			ao2_ref(m, -1);
+		}
+		ao2_iterator_destroy(&miter);
+		ao2_ref(q, -1);
+	}
+	ao2_iterator_destroy(&qiter);
+	ao2_ref(s, -1);
 
 	return 0;
 }
 
-/*! \brief set a member's status based on device state of that member's interface*/
-static int handle_statechange(void *datap)
-{
-	struct statechange *sc = datap;
-	struct ao2_iterator miter, qiter;
-	struct member *m;
-	struct call_queue *q;
-	char interface[80], *slash_pos;
-	int found = 0;
-
-	qiter = ao2_iterator_init(queues, 0);
-	while ((q = ao2_t_iterator_next(&qiter, "Iterate over queues"))) {
-		ao2_lock(q);
-
-		miter = ao2_iterator_init(q->members, 0);
-		for (; (m = ao2_iterator_next(&miter)); ao2_ref(m, -1)) {
-			ast_copy_string(interface, m->state_interface, sizeof(interface));
-
-			if ((slash_pos = strchr(interface, '/')))
-				if (!strncasecmp(interface, "Local/", 6) && (slash_pos = strchr(slash_pos + 1, '/')))
-					*slash_pos = '\0';
-
-			if (!strcasecmp(interface, sc->dev)) {
-				found = 1;
-				update_status(q, m, sc->state);
-				ao2_ref(m, -1);
-				break;
-			}
-		}
-		ao2_iterator_destroy(&miter);
-
-		ao2_unlock(q);
-		queue_t_unref(q, "Done with iterator");
-	}
-	ao2_iterator_destroy(&qiter);
-
-	if (found)
-		ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", sc->dev, sc->state, ast_devstate2str(sc->state));
-	else
-		ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", sc->dev, sc->state, ast_devstate2str(sc->state));
-
-	ast_free(sc);
+static int set_device_status(const char *device, int status)
+{
+	struct mem_state *s, *tmp;
+
+	if (!(s = ao2_find(devices, device, OBJ_KEY))) {
+		return -1;
+	}
+
+	ao2_lock(s);
+	if (s->status != status) {
+		s->status = status;
+		if ((tmp = ao2_alloc(sizeof(*tmp), NULL))) {
+			memcpy(tmp, s, sizeof(*tmp));
+			if (ast_taskprocessor_push(devicestate_tps, update_status, tmp)) {
+				ao2_ref(tmp, -1);
+			}
+		}
+	}
+	ao2_unlock(s);
+	ao2_ref(s, -1);
+
 	return 0;
 }
 
+/*! \brief callback used when device state changes*/
 static void device_state_cb(const struct ast_event *event, void *unused)
 {
 	enum ast_device_state state;
 	const char *device;
-	struct statechange *sc;
-	size_t datapsize;
 
 	state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
 	device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
@@ -1536,15 +1593,11 @@
 		ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
 		return;
 	}
-	datapsize = sizeof(*sc) + strlen(device) + 1;
-	if (!(sc = ast_calloc(1, datapsize))) {
-		ast_log(LOG_ERROR, "failed to calloc a state change struct\n");
-		return;
-	}
-	sc->state = state;
-	strcpy(sc->dev, device);
-	if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
-		ast_free(sc);
+
+	if (set_device_status(device, state)) {
+		ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", device, state, ast_devstate2str(state));
+	} else {
+		ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", device, state, ast_devstate2str(state));
 	}
 }
 
@@ -1582,82 +1635,106 @@
 
 static int extension_state_cb(const char *context, const char *exten, enum ast_extension_states state, void *data)
 {
-	struct ao2_iterator miter, qiter;
-	struct member *m;
-	struct call_queue *q;
-	int found = 0, device_state = extensionstate2devicestate(state);
-
-	qiter = ao2_iterator_init(queues, 0);
-	while ((q = ao2_t_iterator_next(&qiter, "Iterate through queues"))) {
-		ao2_lock(q);
-
-		miter = ao2_iterator_init(q->members, 0);
-		for (; (m = ao2_iterator_next(&miter)); ao2_ref(m, -1)) {
-			if (!strcmp(m->state_context, context) && !strcmp(m->state_exten, exten)) {
-				update_status(q, m, device_state);
-				ao2_ref(m, -1);
-				found = 1;
-				break;
-			}
-		}
-		ao2_iterator_destroy(&miter);
-
-		ao2_unlock(q);
-		queue_t_unref(q, "Done with iterator");
-	}
-	ao2_iterator_destroy(&qiter);
-
-        if (found) {
-		ast_debug(1, "Extension '%s@%s' changed to state '%d' (%s)\n", exten, context, device_state, ast_devstate2str(device_state));
+	char *device;
+	int status = extensionstate2devicestate(state);
+
+	if (!asprintf(&device, "hint:%s@%s", exten, context)) {
+		ast_log(LOG_WARNING, "asprintf() failed: %s\n", strerror(errno));
+		return 0;
+	}
+
+	if (set_device_status(device, status)) {
+		ast_debug(1, "Extension '%s@%s' changed to state '%d' (%s)\n", exten, context, status, ast_devstate2str(status));
 	} else {
-		ast_debug(3, "Extension '%s@%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n",
-			  exten, context, device_state, ast_devstate2str(device_state));
+		ast_debug(3, "Extension '%s@%s' changed state but we don't care because they're not a member of any queue.\n",
+			  exten, context);
 	}
 
 	return 0;
 }
 
-/*! \brief Return the current state of a member */
-static int get_queue_member_status(struct member *cur)
-{
-	return ast_strlen_zero(cur->state_exten) ? ast_device_state(cur->state_interface) : extensionstate2devicestate(ast_extension_state(NULL, cur->state_context, cur->state_exten));
-}
-
-/*! \brief allocate space for new queue member and set fields based on parameters passed */
-static struct member *create_queue_member(const char *interface, const char *membername, int penalty, int paused, const char *state_interface)
-{
-	struct member *cur;
-
-	if ((cur = ao2_alloc(sizeof(*cur), NULL))) {
-		cur->penalty = penalty;
-		cur->paused = paused;
-		ast_copy_string(cur->interface, interface, sizeof(cur->interface));
-		if (!ast_strlen_zero(state_interface)) {
-			ast_copy_string(cur->state_interface, state_interface, sizeof(cur->state_interface));
+static void remove_queue_member(void *data) {
+	struct member *mem = data;
+	struct mem_state *s = mem->device;
+
+	if (s) {
+		/* we have a device lets unlink and unref it*/
+		/* remove our ref*/
+		if (ao2_ref(s, -1) == 2) {
+			/* we were the only consumer unlink*/
+			ao2_unlink(devices, s);
+		}
+	}
+
+}
+
+static struct mem_state *create_member_state(const char *state_interface) {
+	struct mem_state *state;
+	char *exten;
+	char *context;
+	char *device;
+
+	/* ref will be held for each shared member and one ref for container */
+	if ((state = ao2_find(devices, state_interface, OBJ_KEY))) {
+		return state;
+	} else  if (!(state = ao2_alloc(sizeof(*state), NULL))) {
+		return NULL;
+	}
+
+	state->reserved = 0;
+	if (!strncasecmp(state_interface, "hint:", 5)) {
+		context = ast_strdupa(state_interface);
+		exten = strsep(&context, "@") + 5;
+		if (context) {
+			ast_copy_string(state->state_interface, state_interface, sizeof(state->state_interface));
 		} else {
-			ast_copy_string(cur->state_interface, interface, sizeof(cur->state_interface));
-		}
-		if (!ast_strlen_zero(membername)) {
-			ast_copy_string(cur->membername, membername, sizeof(cur->membername));
-		} else {
-			ast_copy_string(cur->membername, interface, sizeof(cur->membername));
-		}
-		if (!strchr(cur->interface, '/')) {
-			ast_log(LOG_WARNING, "No location at interface '%s'\n", interface);
-		}
-		if (!strncmp(cur->state_interface, "hint:", 5)) {
-			char *tmp = ast_strdupa(cur->state_interface), *context = tmp;
-			char *exten = strsep(&context, "@") + 5;
-
-			ast_copy_string(cur->state_exten, exten, sizeof(cur->state_exten));
-			ast_copy_string(cur->state_context, S_OR(context, "default"), sizeof(cur->state_context));
-		}
-		cur->status = get_queue_member_status(cur);
-	}
-
-	return cur;
-}
-
+			if (!asprintf(&device, "%s@default", state_interface)) {
+				ast_log(AST_LOG_ERROR, "Failed to use state_interface %s@default\n", state_interface);
+				ao2_ref(state, -1);
+				return NULL;
+			}
+			ast_copy_string(state->state_interface, device, sizeof(state->state_interface));
+		}
+		state->status = extensionstate2devicestate(ast_extension_state(NULL, S_OR(context, "default"), exten));
+	} else {
+		ast_copy_string(state->state_interface, state_interface, sizeof(state->state_interface));
+		state->status = ast_device_state(state->state_interface);
+	}
+	ao2_link(devices, state);
+
+	return state;
+}
+
+/*! \brief Set current state of member if needed*/
+static void set_queue_member_status(struct member *m)
+{
+	int status;
+	struct mem_state *s;
+	struct mem_state *tmp;
+
+	if (!strncasecmp(s->state_interface, "hint:", 5)) {
+		char *context = ast_strdupa(s->state_interface);
+		char *exten = strsep(&context, "@") + 5;
+		status = extensionstate2devicestate(ast_extension_state(NULL, S_OR(context, "default"), exten));
+	} else {
+		status = ast_device_state(s->state_interface);
+	}
+
+	ao2_lock(m);
+	s = m->device;
+	ao2_lock(s);
+	if (s->status != status) {
+		s->status = status;
+		if ((tmp = ao2_alloc(sizeof(*tmp), NULL))) {
+			memcpy(tmp, s, sizeof(*tmp));
+			if (ast_taskprocessor_push(devicestate_tps, update_status, tmp)) {
+				ao2_ref(tmp, -1);
+			}
+		}
+	}
+	ao2_unlock(s);
+	ao2_unlock(m);
+}
 
 static int compress_char(const char c)
 {
@@ -1688,10 +1765,26 @@
 static int member_cmp_fn(void *obj1, void *obj2, int flags)
 {
 	struct member *mem1 = obj1;
-	struct member *mem2 = obj2;
-	const char *interface = (flags & OBJ_KEY) ? obj2 : mem2->interface;
-
-	return strcasecmp(mem1->interface, interface) ? 0 : CMP_MATCH | CMP_STOP;
+	const struct member *mem2 = obj2;
+	const char *arg = (flags & OBJ_POINTER) ? mem2->interface : obj2;
+
+	return strcasecmp(mem1->interface, arg) ? 0 : CMP_MATCH | CMP_STOP;
+}
+
+static int member_cmp_uniqueid_fn(void *obj1, void *arg, int flags)
+{
+	struct member *mem1 = obj1;
+	const struct member *mem2 = arg;
+	const char *uniqueid = (flags & OBJ_POINTER) ? mem2->rt_uniqueid : arg;
+
+	ao2_lock(mem1);
+	if (mem1->realtime && !strcasecmp(mem1->rt_uniqueid, uniqueid)) {
+		ao2_unlock(mem1);
+		return CMP_MATCH | CMP_STOP;
+	}
+	ao2_unlock(mem1);
+
+	return 0;
 }
 
 /*!
@@ -1737,13 +1830,6 @@
 	q->autopause = QUEUE_AUTOPAUSE_OFF;
 	q->timeoutpriority = TIMEOUT_PRIORITY_APP;
 	q->autopausedelay = 0;
-	if (!q->members) {
-		if (q->strategy == QUEUE_STRATEGY_LINEAR || q->strategy == QUEUE_STRATEGY_RRORDERED)
-			/* linear strategy depends on order, so we have to place all members in a single bucket */
-			q->members = ao2_container_alloc(1, member_hash_fn, member_cmp_fn);
-		else
-			q->members = ao2_container_alloc(37, member_hash_fn, member_cmp_fn);
-	}
 	q->found = 1;
 
 	ast_string_field_set(q, sound_next, "queue-youarenext");
@@ -1775,6 +1861,17 @@
 		ast_free(pr_iter);
 }
 
+static int clear_queue_member_fn(void *obj1, void *arg, int flags)
+{
+	struct member *mem = obj1;
+	ao2_lock(mem);
+	mem->calls = 0;
+	mem->lastcall = 0;
+	ao2_unlock(mem);
+
+	return 0;
+}
+
 static void clear_queue(struct call_queue *q)
 {
 	q->holdtime = 0;
@@ -1784,14 +1881,7 @@
 	q->talktime = 0;
 
 	if (q->members) {
-		struct member *mem;
-		struct ao2_iterator mem_iter = ao2_iterator_init(q->members, 0);
-		while ((mem = ao2_iterator_next(&mem_iter))) {
-			mem->calls = 0;
-			mem->lastcall = 0;
-			ao2_ref(mem, -1);
-		}
-		ao2_iterator_destroy(&mem_iter);
+		ao2_callback(q->members, OBJ_NODATA, clear_queue_member_fn, NULL);
 	}
 }
 
@@ -2123,89 +2213,189 @@
  *
  * Search for member in queue, if found update penalty/paused state,
  * if no member exists create one flag it as a RT member and add to queue member list.
+ *
+ * \retval RES_NOT_DYNAMIC when they aren't a RT member
+ * \retval RES_OKAY added member to queue, or updated an existing member
+ * \retval RES_ERROR member was not ok
+ * \retval RES_EXISTS queue exists but no members
+ * \retval RES_OUTOFMEMORY queue exists but not enough memory to create member
 */
-static void rt_handle_member_record(struct call_queue *q, char *interface, struct ast_config *member_config)
-{
-	struct member *m;
-	struct ao2_iterator mem_iter;
-	int penalty = 0;
-	int paused  = 0;
-	int found = 0;
-	int ignorebusy = 0;
-
-	const char *config_val;
-	const char *rt_uniqueid = ast_variable_retrieve(member_config, interface, "uniqueid");
-	const char *membername = S_OR(ast_variable_retrieve(member_config, interface, "membername"), interface);
-	const char *state_interface = S_OR(ast_variable_retrieve(member_config, interface, "state_interface"), interface);
-	const char *penalty_str = ast_variable_retrieve(member_config, interface, "penalty");
-	const char *paused_str = ast_variable_retrieve(member_config, interface, "paused");
-
-	if (ast_strlen_zero(rt_uniqueid)) {
-		ast_log(LOG_WARNING, "Realtime field uniqueid is empty for member %s\n", S_OR(membername, "NULL"));
-		return;
-	}
-
-	if (penalty_str) {
-		penalty = atoi(penalty_str);
-		if ((penalty < 0) && negative_penalty_invalid) {
-			return;
-		} else if (penalty < 0) {
-			penalty = 0;
-		}
-	}
-
-	if (paused_str) {
-		paused = atoi(paused_str);
-		if (paused < 0) {
-			paused = 0;
-		}
-	}
-
-	if ((config_val = ast_variable_retrieve(member_config, interface, "ignorebusy"))) {
-		ignorebusy = ast_true(config_val);
+static int handle_member_record(struct call_queue *q, const char *interface, struct ast_config *member_config,
+	int memtype, const char* source)
+{
+	struct member *m, *rt_m;
+	struct mem_state *s;
+	struct ast_variable *v;
+	int link = 0, res = RES_OKAY;
+	char *rt_uniqueid = NULL, *st_dev = NULL;
+
+	if (!(m = ao2_find(q->members, interface, OBJ_KEY))) {
+		if (!(m = ao2_alloc(sizeof(*m), remove_queue_member))) {
+			ast_log(LOG_WARNING, "Unable to alocate member\n");
+			return RES_OUTOFMEMORY;
+		}
+		m->device = NULL;
+		m->penalty = 0;
+		m->paused = 0;
+		m->ignorebusy = 1;
+		m->realtime = (memtype & MEMBER_REALTIME) ? 1 : 0;
+		m->dynamic = (memtype & MEMBER_DYNAMIC) ? 1 : 0;
+		link = 1;
 	} else {
-		ignorebusy = 1;
-	}
-
-	/* Find member by realtime uniqueid and update */
-	mem_iter = ao2_iterator_init(q->members, 0);
-	while ((m = ao2_iterator_next(&mem_iter))) {
-		if (!strcasecmp(m->rt_uniqueid, rt_uniqueid)) {
-			m->dead = 0;	/* Do not delete this one. */
+		ao2_lock(m);
+	}
+
+	if (!link && (memtype & MEMBER_DYNAMIC)) {
+		/* dynamic members are the lowest priority and cannot overwrite settings from DB*/
+		if (m->dynamic) {
+			res = RES_EXISTS;
+			m->dead = 0;
+		} else {
+			res = RES_NOT_DYNAMIC;
+		}
+		ao2_unlock(m);
+                ao2_ref(m, -1);
+		return res;
+	} else if (!link && (m->realtime || m->dynamic) && (memtype & MEMBER_STATIC)) {
+		/*static members take precedence over all others*/
+		m->dynamic = 0;
+		m->realtime = 0;
+		if (!ast_strlen_zero(m->rt_uniqueid)) {
+			m->rt_uniqueid[0] = '\0';
+		}
+	} else if (!link && (memtype & MEMBER_REALTIME)) {
+		/* realtime takes precedence over dynamic but not static*/
+		if (m->dynamic) {
+			m->dynamic = 0;
+			m->realtime = 1;
+		} else if (!m->realtime) {
+			ao2_unlock(m);
+			ao2_ref(m, -1);
+			return RES_EXISTS;
+		}
+	}
+
+	m->dead = 0;
+
+	ast_copy_string(m->interface, interface, sizeof(m->interface));
+
+	for (v = ast_variable_browse(member_config, interface); v; v = v->next) {
+		if (!ast_strlen_zero(v->value) && !strcasecmp(v->name, "uniqueid")) {
+			rt_uniqueid = ast_strdupa(v->value);
+		} else if (!strcasecmp(v->name, "membername")) {
+			ast_copy_string(m->membername, v->value, sizeof(m->membername));
+		} else if (!strcasecmp(v->name, "state_interface")) {
+			st_dev = ast_strdupa(ast_strlen_zero(v->value) ? interface : v->value);
+		} else if (!strcasecmp(v->name, "penalty")) {
+			if ((sscanf(v->value, "%30d", &m->penalty) != 1) || (!negative_penalty_invalid && m->penalty < 0)) {
+				m->penalty = 0;
+			} else if (m->penalty < 0) {
+				/* negative_penalty_invalid is set and I have an invalid penalty; ignoring this member */
+				m->dead = 1;
+			}
+		} else if (!strcasecmp(v->name, "paused")) {
+			m->paused = abs(ast_true(v->value));
+		} else if (!strcasecmp(v->name, "ignorebusy")) {
+			m->ignorebusy = abs(ast_true(v->value));
+		}
+	}
+
+	if (ast_strlen_zero(st_dev)) {
+		st_dev = ast_strdupa(interface);
+	}
+	if (!m->dead && (s = ao2_find(devices, st_dev, 0))) {
+		if (s && (m->device != s)) {
+			if (m->device && (ao2_ref(m->device, -1) == 2)) {
+				ao2_unlink(devices, m->device);
+			}
+			m->device = s;
+		} else if (s) {
+			ao2_ref(s, -1);
+		} else if (m->device) {
+			if (ao2_ref(m->device, -1) == 2) {
+				ao2_unlink(devices, m->device);
+			}
+			m->device = NULL;
+		}
+	}
+
+	if (!m->dead && !m->device && (!(m->device = create_member_state(st_dev)))) {
+		m->dead = 1;
+	}
+
+	if (ast_strlen_zero(m->membername)) {
+		ast_copy_string(m->membername, interface, sizeof(m->membername));
+	}
+
+	/*check the uniqueness of the RT uniqueid */
+	if (m->realtime && !m->dead) {
+		if (ast_strlen_zero(rt_uniqueid)) {
+			ast_log(LOG_WARNING, "Realtime field uniqueid is empty for member %s\n", S_OR(m->membername, interface));
+			m->dead = 1;
+		} else if (link && (rt_m = ao2_callback(q->members, 0, member_cmp_uniqueid_fn, rt_uniqueid))) {
+			/* Make sure there are no duplicates. This should never happen; am I changing interface perhaps? */
+			ao2_lock(rt_m);
+			if (!rt_m->dead) {
+				m->dead = 1;
+			}
+			ast_log(AST_LOG_WARNING, "Duplicate uniqueid found while adding %s (%s) found %s (%s) on queue %s : Not adding\n",
+					m->interface, m->membername, rt_m->interface, rt_m->membername, q->name);
+			ao2_unlock(rt_m);
+			ao2_ref(rt_m, -1);
+		} else {
 			ast_copy_string(m->rt_uniqueid, rt_uniqueid, sizeof(m->rt_uniqueid));
-			if (paused_str) {
-				m->paused = paused;
-			}
-			if (strcasecmp(state_interface, m->state_interface)) {
-				ast_copy_string(m->state_interface, state_interface, sizeof(m->state_interface));
-			}
-			m->penalty = penalty;
-			m->ignorebusy = ignorebusy;
-			found = 1;
-			ao2_ref(m, -1);
-			break;
-		}
-		ao2_ref(m, -1);
-	}
-	ao2_iterator_destroy(&mem_iter);
-
-	/* Create a new member */
-	if (!found) {
-		if ((m = create_queue_member(interface, membername, penalty, paused, state_interface))) {
-			m->dead = 0;
-			m->realtime = 1;
-			m->ignorebusy = ignorebusy;
-			ast_copy_string(m->rt_uniqueid, rt_uniqueid, sizeof(m->rt_uniqueid));
-			if (!log_membername_as_agent) {
-				ast_queue_log(q->name, "REALTIME", m->interface, "ADDMEMBER", "%s", "");
+		}
+	}
+
+	if (!m->dead && link) {
+		/* i have just been born */
+		if ((ast_strlen_zero(m->membername) || !log_membername_as_agent)) {
+			ast_queue_log(q->name, source, m->interface, "ADDMEMBER", "%s", "");
+		} else {
+			ast_queue_log(q->name, source, m->membername, "ADDMEMBER", "%s", "");
+		}
+		if (m->dynamic) {
+			int status = get_device_status(m->device);
+			manager_event(EVENT_FLAG_AGENT, "QueueMemberAdded",
+				"Queue: %s\r\n"
+				"Location: %s\r\n"
+				"MemberName: %s\r\n"
+				"StateInterface: %s\r\n"
+				"Membership: %s\r\n"
+				"Penalty: %d\r\n"
+				"CallsTaken: %d\r\n"
+				"LastCall: %d\r\n"
+				"Status: %d\r\n"
+				"Paused: %d\r\n"
+				"IgnoreBusy: %d\r\n",
+				q->name, m->interface, m->membername, m->device->state_interface,
+				"dynamic",m->penalty, m->calls, (int)m->lastcall,
+				status, m->paused, m->ignorebusy);
+		}
+		ao2_link(q->members, m);
+	} else if (m->dead) {
+		/* I've failed: penalty/uniqueid/devstate failure */
+		if (!m->device) {
+			res = RES_OUTOFMEMORY;
+		} else {
+			res = RES_ERROR;
+		}
+		if (!link) {
+			/* There was a config error; remove this member from the container now. */
+			if ((ast_strlen_zero(m->membername) || !log_membername_as_agent)) {
+				ast_queue_log(q->name, source, m->interface, "REMOVEMEMBER", "%s", "");
 			} else {
-				ast_queue_log(q->name, "REALTIME", m->membername, "ADDMEMBER", "%s", "");
-			}
-			ao2_link(q->members, m);
-			ao2_ref(m, -1);
-			m = NULL;
-		}
-	}
+				ast_queue_log(q->name, source, m->membername, "REMOVEMEMBER", "%s", "");
+			}
+			ao2_unlock(m);
+			ao2_unlink(q->members, m);
+		}
+	} else if (!link) {
+		/* ive been updated */
+		ao2_unlock(m);
+	}
+	ao2_ref(m, -1);
+	return res;
 }
 
 /*! \brief Iterate through queue's member list and delete them */
@@ -2216,8 +2406,12 @@
 	struct ao2_iterator mem_iter = ao2_iterator_init(q->members, 0);
 
 	while ((cur = ao2_iterator_next(&mem_iter))) {
+		ao2_lock(cur);
 		if (all || !cur->dynamic) {
+			ao2_unlock(cur);
 			ao2_unlink(q->members, cur);
+		} else {
+			ao2_unlock(cur);
 		}
 		ao2_ref(cur, -1);
 	}
@@ -2239,7 +2433,7 @@
 	ao2_ref(q->members, -1);
 }
 
-static struct call_queue *alloc_queue(const char *queuename)
+static struct call_queue *alloc_queue(const char *queuename, int rt)
 {
 	struct call_queue *q;
 
@@ -2250,7 +2444,64 @@
 		}
 		ast_string_field_set(q, name, queuename);
 	}
+	q->realtime = rt;
+	q->weight = 0;
+	q->found = 0;
+	ao2_link(queues, q);
+
 	return q;
+}
+
+
+static void rt_load_member_config(struct call_queue *q)
+{
+	struct ast_config *member_config;
+	struct member *m;
+	struct ao2_iterator mem_iter;
+	char *interface = NULL;
+
+	/* we may not have realtime members */
+	if (!(member_config = ast_load_realtime_multientry("queue_members", "interface LIKE", "%", "queue_name", q->name, SENTINEL))) {
+		ast_debug(3, "Queue %s has no realtime members defined. No need for update\n", q->name);
+		return;
+	}
+
+	/* Temporarily set realtime members dead so we can detect deleted ones. */
+	mem_iter = ao2_iterator_init(q->members, 0);
+	while ((m = ao2_iterator_next(&mem_iter))) {
+		ao2_lock(m);
+		if (m->realtime) {
+			m->dead = 1;
+		}
+		ao2_unlock(m);
+		ao2_ref(m, -1);
+	}
+	ao2_iterator_destroy(&mem_iter);
+
+	while ((interface = ast_category_browse(member_config, interface))) {
+		handle_member_record(q, interface, member_config, MEMBER_REALTIME, "REALTIME");
+	}
+	ast_config_destroy(member_config);
+
+	/* Delete all realtime members that have been deleted in DB. */
+	mem_iter = ao2_iterator_init(q->members, 0);
+	while ((m = ao2_iterator_next(&mem_iter))) {
+		ao2_lock(m);
+		if (m->dead && m->realtime) {
+			if (ast_strlen_zero(m->membername) || !log_membername_as_agent) {
+				ast_queue_log(q->name, "REALTIME", m->interface, "REMOVEMEMBER", "%s", "");
+			} else {
+				ast_queue_log(q->name, "REALTIME", m->membername, "REMOVEMEMBER", "%s", "");
+			}
+			ao2_unlock(m);
+			ao2_unlink(q->members, m);
+			ao2_ref(m, -1);
+		} else {
+			ao2_unlock(m);
+			ao2_ref(m, -1);
+		}
+	}
+	ao2_iterator_destroy(&mem_iter);
 }
 
 /*!
@@ -2261,86 +2512,55 @@
  * reload RT queue vars, set RT queue members dead and reload them, return finished queue.
  * \retval the queue,
  * \retval NULL if it doesn't exist.
- * \note Should be called with the "queues" container locked.
 */
-static struct call_queue *find_queue_by_name_rt(const char *queuename, struct ast_variable *queue_vars, struct ast_config *member_config)
+static struct call_queue *load_realtime_queue(const char *queuename, int rmask)
 {
 	struct ast_variable *v;
-	struct call_queue *q, tmpq = {
-		.name = queuename,
-	};
-	struct member *m;
-	struct ao2_iterator mem_iter;
-	char *interface = NULL;
 	const char *tmp_name;
 	char *tmp;
 	char tmpbuf[64];	/* Must be longer than the longest queue param name. */
-
-	/* Static queues override realtime. */
-	if ((q = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Check if static queue exists"))) {
-		ao2_lock(q);
-		if (!q->realtime) {
-			if (q->dead) {
-				ao2_unlock(q);
-				queue_t_unref(q, "Queue is dead; can't return it");
-				return NULL;
-			} else {
-				ast_log(LOG_WARNING, "Static queue '%s' already exists. Not loading from realtime\n", q->name);
-				ao2_unlock(q);
-				return q;
-			}
-		}
-	} else if (!member_config) {
-		/* Not found in the list, and it's not realtime ... */
-		return NULL;
-	}
+	int prev_weight = 0;
+	struct ast_variable *queue_vars;
+	struct call_queue *q;
+	int found;
+
+	if ((q = ao2_t_find(queues, queuename, OBJ_KEY, "Look for queue in memory first")) &&
+	    (!q->realtime || !(rmask & QUEUE_RELOAD_PARAMETERS))) {
+		if (rmask & QUEUE_RELOAD_MEMBER) {
+			rt_load_member_config(q);
+		}
+		return q;
+	}
+
+	/*! \note This will be two separate database transactions, so we might
+	   see queue parameters as they were before another process
+	   changed the queue and member list as it was after the change.
+	   Thus we might see an empty member list when a queue is
+	   deleted. In practice, this is unlikely to cause a problem. */
+
 	/* Check if queue is defined in realtime. */
-	if (!queue_vars) {
+	if (!(queue_vars = ast_load_realtime("queues", "name", queuename, SENTINEL))) {
 		/* Delete queue from in-core list if it has been deleted in realtime. */
 		if (q) {
 			/*! \note Hmm, can't seem to distinguish a DB failure from a not
 			   found condition... So we might delete an in-core queue
 			   in case of DB failure. */
 			ast_debug(1, "Queue %s not found in realtime.\n", queuename);
-
-			q->dead = 1;
-			/* Delete if unused (else will be deleted when last caller leaves). */
 			queues_t_unlink(queues, q, "Unused; removing from container");
-			ao2_unlock(q);
 			queue_t_unref(q, "Queue is dead; can't return it");
 		}
 		return NULL;
 	}
 
 	/* Create a new queue if an in-core entry does not exist yet. */
-	if (!q) {
-		struct ast_variable *tmpvar = NULL;
-		if (!(q = alloc_queue(queuename))) {
-			return NULL;
-		}
-		ao2_lock(q);
-		clear_queue(q);
-		q->realtime = 1;
-		/*Before we initialize the queue, we need to set the strategy, so that linear strategy
-		 * will allocate the members properly
-		 */
-		for (tmpvar = queue_vars; tmpvar; tmpvar = tmpvar->next) {
-			if (!strcasecmp(tmpvar->name, "strategy")) {
-				q->strategy = strat2int(tmpvar->value);
-				if (q->strategy < 0) {
-					ast_log(LOG_WARNING, "'%s' isn't a valid strategy for queue '%s', using ringall instead\n",
-					tmpvar->value, q->name);
-					q->strategy = QUEUE_STRATEGY_RINGALL;
-				}
-				break;
-			}
-		}
-		/* We traversed all variables and didn't find a strategy */
-		if (!tmpvar) {
-			q->strategy = QUEUE_STRATEGY_RINGALL;
-		}
-		queues_t_link(queues, q, "Add queue to container");
-	}
+	if (!q && (!(q = alloc_queue(queuename, 1)))) {
+		ast_variables_destroy(queue_vars);
+		return NULL;
+	}
+
+	ao2_lock(q);
+	found = q->found;
+	prev_weight = q->weight ? 1 : 0;
 	init_queue(q);		/* Ensure defaults for all parameters not set explicitly. */
 
 	memset(tmpbuf, 0, sizeof(tmpbuf));
@@ -2360,156 +2580,72 @@
 		 * should set the realtime column to NULL, not blank. */
 		queue_set_param(q, tmp_name, v->value, -1, 0);
 	}
-
-	/* Temporarily set realtime members dead so we can detect deleted ones. */
-	mem_iter = ao2_iterator_init(q->members, 0);
-	while ((m = ao2_iterator_next(&mem_iter))) {
-		if (m->realtime) {
-			m->dead = 1;
-		}
-		ao2_ref(m, -1);
-	}
-	ao2_iterator_destroy(&mem_iter);
-
-	while ((interface = ast_category_browse(member_config, interface))) {
-		rt_handle_member_record(q, interface, member_config);
-	}
-
-	/* Delete all realtime members that have been deleted in DB. */
-	mem_iter = ao2_iterator_init(q->members, 0);
-	while ((m = ao2_iterator_next(&mem_iter))) {
-		if (m->dead) {
-			if (ast_strlen_zero(m->membername) || !log_membername_as_agent) {
-				ast_queue_log(q->name, "REALTIME", m->interface, "REMOVEMEMBER", "%s", "");
-			} else {
-				ast_queue_log(q->name, "REALTIME", m->membername, "REMOVEMEMBER", "%s", "");
-			}
-			ao2_unlink(q->members, m);
-		}
-		ao2_ref(m, -1);
-	}
-	ao2_iterator_destroy(&mem_iter);
+	ast_variables_destroy(queue_vars);
+
+	/* it's important that this is never altered in the life of the queue */
+	if (!q->members && (q->strategy == QUEUE_STRATEGY_LINEAR || q->strategy == QUEUE_STRATEGY_RRORDERED)) {
+		/* linear strategy depends on order, so we have to place all members in a single bucket */

[... 4244 lines stripped ...]



More information about the svn-commits mailing list