[Asterisk-cvs] asterisk/apps app_queue.c,1.116,1.117

markster at lists.digium.com markster at lists.digium.com
Thu Jan 20 22:11:30 CST 2005


Update of /usr/cvsroot/asterisk/apps
In directory mongoose.digium.com:/tmp/cvs-serv19927/apps

Modified Files:
	app_queue.c 
Log Message:
Merge mjohnston's pause/unpause (bug #3252)


Index: app_queue.c
===================================================================
RCS file: /usr/cvsroot/asterisk/apps/app_queue.c,v
retrieving revision 1.116
retrieving revision 1.117
diff -u -d -r1.116 -r1.117
--- app_queue.c	18 Jan 2005 20:43:53 -0000	1.116
+++ app_queue.c	21 Jan 2005 04:14:26 -0000	1.117
@@ -148,6 +148,30 @@
 "Example: RemoveQueueMember(techsupport|SIP/3000)\n"
 "";
 
+static char *app_pqm = "PauseQueueMember" ;	/* Application name passed to ast_register_application() */
+static char *app_pqm_synopsis = "Pauses a queue member" ;	/* One-line synopsis registered with the application */
+static char *app_pqm_descrip =	/* Full usage text registered with the application */
+"   PauseQueueMember([queuename]|interface):\n"
+"Pauses (blocks calls for) a queue member.\n"
+"The given interface will be paused in the given queue.  This prevents\n"
+"any calls from being sent from the queue to the interface until it is\n"
+"unpaused with UnpauseQueueMember or the manager interface.  If no\n"
+"queuename is given, the interface is paused in every queue it is a\n"
+"member of.  If the interface is not in the named queue, or if no queue\n"
+"is given and the interface is not in any queue, it will jump to\n"
+"priority n+101, if it exists.  Returns -1 if the interface is not\n"
+"found and no extension to jump to exists, 0 otherwise.\n"
+"Example: PauseQueueMember(|SIP/3000)\n";
+
+static char *app_upqm = "UnpauseQueueMember" ;	/* Application name passed to ast_register_application() */
+static char *app_upqm_synopsis = "Unpauses a queue member" ;	/* One-line synopsis registered with the application */
+static char *app_upqm_descrip =	/* Full usage text registered with the application */
+"   UnpauseQueueMember([queuename]|interface):\n"
+"Unpauses (resumes calls to) a queue member.\n"
+"This is the counterpart to PauseQueueMember and operates exactly the\n"
+"same way, except it unpauses instead of pausing the given interface.\n"
+"Example: UnpauseQueueMember(|SIP/3000)\n";
+
 /* Persistent Members astdb family */
 static const char *pm_family = "/Queue/PersistentMembers";
 /* The maximum lengh of each persistent member queue database entry */
@@ -214,6 +238,7 @@
 	int calls;			/* Number of calls serviced by this member */
 	int dynamic;			/* Are we dynamically added? */
 	int status;			/* Status of queue member */
+	int paused;			/* Are we paused (not accepting calls)? */
 	time_t lastcall;		/* When last successful call was hungup */
 	struct member *next;		/* Next member */
 };
@@ -367,9 +392,10 @@
 						"Penalty: %d\r\n"
 						"CallsTaken: %d\r\n"
 						"LastCall: %ld\r\n"
-						"Status: %d\r\n",
+						"Status: %d\r\n"
+						"Paused: %d\r\n",
 					q->name, cur->interface, cur->dynamic ? "dynamic" : "static",
-					cur->penalty, cur->calls, cur->lastcall, cur->status);
+					cur->penalty, cur->calls, cur->lastcall, cur->status, cur->paused);
 				}
 			}
 			cur = cur->next;
@@ -693,9 +719,10 @@
 				"Penalty: %d\r\n"
 				"CallsTaken: %d\r\n"
 				"LastCall: %ld\r\n"
-				"Status: %d\r\n",
+				"Status: %d\r\n"
+				"Paused: %d\r\n",
 					q->name, cur->interface, cur->dynamic ? "dynamic" : "static",
-					cur->penalty, cur->calls, cur->lastcall, cur->status);
+					cur->penalty, cur->calls, cur->lastcall, cur->status, cur->paused);
 			break;
 		}
 		cur = cur->next;
@@ -789,6 +816,15 @@
 		return 0;
 	}
 	
+	if (tmp->member->paused) {
+		if (option_debug)
+			ast_log(LOG_DEBUG, "%s paused, can't receive call\n", tmp->interface);
+		if (qe->chan->cdr)
+			ast_cdr_busy(qe->chan->cdr);
+		tmp->stillgoing = 0;
+		return 0;
+	}
+
 	strncpy(tech, tmp->interface, sizeof(tech) - 1);
 	if ((location = strchr(tech, '/')))
 		*location++ = '\0';
@@ -1616,7 +1652,7 @@
 }
 
 
-static struct member *create_queue_node(char *interface, int penalty)
+static struct member *create_queue_node(char *interface, int penalty, int paused)
 {
 	struct member *cur;
 	
@@ -1627,6 +1663,7 @@
 	if (cur) {
 		memset(cur, 0, sizeof(struct member));
 		cur->penalty = penalty;
+		cur->paused = paused;
 		strncpy(cur->interface, interface, sizeof(cur->interface) - 1);
 		if (!strchr(cur->interface, '/'))
 			ast_log(LOG_WARNING, "No location at interface '%s'\n", interface);
@@ -1638,7 +1675,7 @@
 
 /* Dump all members in a specific queue to the databse
  *
- * <pm_family>/<queuename> = <interface>;<penalty>;...
+ * <pm_family>/<queuename> = <interface>;<penalty>;<paused>;...
  *
  */
 static void dump_queue_members(struct ast_call_queue *pm_queue)
@@ -1655,7 +1692,7 @@
 		while (cur_member) {
 			if (cur_member->dynamic) {
 				value_len = strlen(value);
-				res = snprintf(value+value_len, sizeof(value)-value_len, "%s;%d;", cur_member->interface, cur_member->penalty);
+				res = snprintf(value+value_len, sizeof(value)-value_len, "%s;%d;%d;", cur_member->interface, cur_member->penalty, cur_member->paused);
 				if (res != strlen(value + value_len)) {
 					ast_log(LOG_WARNING, "Could not create persistent member string, out of space\n");
 					break;
@@ -1721,7 +1758,7 @@
 	return res;
 }
 
-static int add_to_queue(char *queuename, char *interface, int penalty)
+static int add_to_queue(char *queuename, char *interface, int penalty, int paused)
 {
 	struct ast_call_queue *q;
 	struct member *new_member;
@@ -1732,7 +1769,7 @@
 		ast_mutex_lock(&q->lock);
 		if (!strcmp(q->name, queuename)) {
 			if (interface_exists(q, interface) == NULL) {
-				new_member = create_queue_node(interface, penalty);
+				new_member = create_queue_node(interface, penalty, paused);
 
 				if (new_member != NULL) {
 					new_member->dynamic = 1;
@@ -1745,9 +1782,10 @@
 						"Penalty: %d\r\n"
 						"CallsTaken: %d\r\n"
 						"LastCall: %ld\r\n"
-						"Status: %d\r\n",
+						"Status: %d\r\n"
+						"Paused: %d\r\n",
 					q->name, new_member->interface, new_member->dynamic ? "dynamic" : "static",
-					new_member->penalty, new_member->calls, new_member->lastcall, new_member->status);
+					new_member->penalty, new_member->calls, new_member->lastcall, new_member->status, new_member->paused);
 					
 					if (queue_persistent_members)
 						dump_queue_members(q);
@@ -1768,6 +1806,49 @@
 	return res;
 }
 
+/* Set the paused flag of 'interface' in the queue named 'queuename', or in
+ * every queue containing it when 'queuename' is empty.  Returns RESULT_SUCCESS
+ * if at least one member was matched, RESULT_FAILURE otherwise. */
+static int set_member_paused(char *queuename, char *interface, int paused)
+{
+	struct ast_call_queue *cur_q;
+	struct member *m;
+	int matches = 0;
+
+	/* An empty queue name gets one summary log entry; per-queue entries are still written below */
+	if (ast_strlen_zero(queuename))
+		ast_queue_log("NONE", "NONE", interface, (paused ? "PAUSEALL" : "UNPAUSEALL"), "%s", "");
+
+	ast_mutex_lock(&qlock);
+	cur_q = queues;
+	while (cur_q) {
+		ast_mutex_lock(&cur_q->lock);
+		m = NULL;
+		if (ast_strlen_zero(queuename) || !strcmp(cur_q->name, queuename))
+			m = interface_exists(cur_q, interface);
+		if (m) {
+			matches++;
+			if (m->paused == paused)
+				ast_log(LOG_DEBUG, "%spausing already-%spaused queue member %s:%s\n", (paused ? "" : "un"), (paused ? "" : "un"), cur_q->name, interface);
+			m->paused = paused;
+			if (queue_persistent_members)
+				dump_queue_members(cur_q);
+
+			ast_queue_log(cur_q->name, "NONE", interface, (paused ? "PAUSE" : "UNPAUSE"), "%s", "");
+			manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused",
+				"Queue: %s\r\n"
+				"Location: %s\r\n"
+				"Paused: %d\r\n",
+					cur_q->name, m->interface, paused);
+		}
+		ast_mutex_unlock(&cur_q->lock);
+		cur_q = cur_q->next;
+	}
+	ast_mutex_unlock(&qlock);
+
+	return matches ? RESULT_SUCCESS : RESULT_FAILURE;
+}
+
 /* Add members saved in the queue members DB file saves
  * created by dump_queue_members(), back into the queues */
 static void reload_queue_members(void)
@@ -1777,6 +1858,8 @@
 	char *pm_interface;
 	char *pm_penalty_tok;
 	int pm_penalty = 0;
+	char *pm_paused_tok;
+	int pm_paused = 0;
 	struct ast_db_entry *pm_db_tree = NULL;
 	int pm_family_len = 0;
 	struct ast_call_queue *cur_queue = NULL;
@@ -1813,12 +1896,13 @@
 			ast_mutex_unlock(&cur_queue->lock);
 
 		if (!ast_db_get(pm_family, pm_queue_name, queue_data, PM_MAX_LEN)) {
-			/* Parse each <interface>;<penalty>; from the value of the
-			 * queuename key and add it to the respective queue */
 			cur_pm_ptr = queue_data;
 			while ((pm_interface = strsep(&cur_pm_ptr, ";"))) {
+				/* On the last iteration, pm_interface is a pointer to an empty string. Don't report a spurious error. */
+				if (pm_interface[0] == 0)
+					break;
 				if (!(pm_penalty_tok = strsep(&cur_pm_ptr, ";"))) {
-					ast_log(LOG_WARNING, "Error parsing corrupted Queue DB string for '%s'\n", pm_queue_name);
+					ast_log(LOG_WARNING, "Error parsing corrupted Queue DB string for '%s' (penalty)\n", pm_queue_name);
 					break;
 				}
 				pm_penalty = strtol(pm_penalty_tok, NULL, 10);
@@ -1826,11 +1910,26 @@
 					ast_log(LOG_WARNING, "Error converting penalty: %s: Out of range.\n", pm_penalty_tok);
 					break;
 				}
+
+				/* If cur_pm_ptr[1] is ';', the next token is a single character: too short to be an interface, so it is read as the paused flag */
+
+				if (cur_pm_ptr[1] == ';') {
+					if (!(pm_paused_tok = strsep(&cur_pm_ptr, ";"))) {
+						ast_log(LOG_WARNING, "Error parsing corrupted Queue DB string for '%s' (paused)\n", pm_queue_name);
+						break;
+					}
+					pm_paused = strtol(pm_paused_tok, NULL, 10);
+					if ((errno == ERANGE) || (pm_paused < 0 || pm_paused > 1)) {
+						ast_log(LOG_WARNING, "Error converting paused: %s: Expected 0 or 1.\n", pm_paused_tok);
+						break;
+					}
+				} else if (option_debug)
+					ast_verbose(VERBOSE_PREFIX_3 "Found old-format queue member %s:%s\n", pm_queue_name, pm_interface);
 	
 				if (option_debug)
-					ast_log(LOG_DEBUG, "Reload Members: Queue: %s  Member: %s  Penalty: %d\n", pm_queue_name, pm_interface, pm_penalty);
+					ast_log(LOG_DEBUG, "Reload Members: Queue: %s  Member: %s  Penalty: %d  Paused: %d\n", pm_queue_name, pm_interface, pm_penalty, pm_paused);
 	
-				if (add_to_queue(pm_queue_name, pm_interface, pm_penalty) == RES_OUTOFMEMORY) {
+				if (add_to_queue(pm_queue_name, pm_interface, pm_penalty, pm_paused) == RES_OUTOFMEMORY) {
 					ast_log(LOG_ERROR, "Out of Memory when loading queue member from astdb\n");
 					break;
 				}
@@ -1848,6 +1947,90 @@
 	}
 }
 
+/* Dialplan application PauseQueueMember([queuename]|interface).
+ * Returns 0 on success or when priority n+101 exists to jump to, -1 otherwise. */
+static int pqm_exec(struct ast_channel *chan, void *data)
+{
+	struct localuser *u;
+	char *queuename, *interface;
+	int res = 0;
+
+	if (!data) {
+		ast_log(LOG_WARNING, "PauseQueueMember requires an argument ([queuename]|interface)\n");
+		return -1;
+	}
+
+	queuename = ast_strdupa((char *)data);
+	if (!queuename) {
+		ast_log(LOG_ERROR, "Out of memory\n");
+		return -1;
+	}
+
+	interface = strchr(queuename, '|');
+	if (!interface) {
+		ast_log(LOG_WARNING, "Missing interface argument to PauseQueueMember ([queuename]|interface)\n");
+		return -1;
+	}
+
+	LOCAL_USER_ADD(u);
+
+	*interface++ = '\0';
+
+	if (set_member_paused(queuename, interface, 1)) {
+		ast_log(LOG_WARNING, "Attempt to pause interface %s, not found\n", interface);
+		if (ast_exists_extension(chan, chan->context, chan->exten, chan->priority + 101, chan->cid.cid_num))
+			chan->priority += 100;
+		else
+			res = -1;
+	}
+
+	/* Single exit after LOCAL_USER_ADD so the local user is removed on the failure path too */
+	LOCAL_USER_REMOVE(u);
+	return res;
+}
+
+/* Dialplan application UnpauseQueueMember([queuename]|interface).
+ * Returns 0 on success or when priority n+101 exists to jump to, -1 otherwise. */
+static int upqm_exec(struct ast_channel *chan, void *data)
+{
+	struct localuser *u;
+	char *queuename, *interface;
+	int res = 0;
+
+	if (!data) {
+		ast_log(LOG_WARNING, "UnpauseQueueMember requires an argument ([queuename]|interface)\n");
+		return -1;
+	}
+
+	queuename = ast_strdupa((char *)data);
+	if (!queuename) {
+		ast_log(LOG_ERROR, "Out of memory\n");
+		return -1;
+	}
+
+	interface = strchr(queuename, '|');
+	if (!interface) {
+		ast_log(LOG_WARNING, "Missing interface argument to UnpauseQueueMember ([queuename]|interface)\n");
+		return -1;
+	}
+
+	LOCAL_USER_ADD(u);
+
+	*interface++ = '\0';
+
+	if (set_member_paused(queuename, interface, 0)) {
+		ast_log(LOG_WARNING, "Attempt to unpause interface %s, not found\n", interface);
+		if (ast_exists_extension(chan, chan->context, chan->exten, chan->priority + 101, chan->cid.cid_num))
+			chan->priority += 100;
+		else
+			res = -1;
+	}
+
+	/* Single exit after LOCAL_USER_ADD so the local user is removed on the failure path too */
+	LOCAL_USER_REMOVE(u);
+	return res;
+}
+
 static int rqm_exec(struct ast_channel *chan, void *data)
 {
 	int res=-1;
@@ -1962,7 +2145,7 @@
 		}
 	}
 
-	switch (add_to_queue(queuename, interface, penalty)) {
+	switch (add_to_queue(queuename, interface, penalty, 0)) {
 	case RES_OKAY:
 		ast_log(LOG_NOTICE, "Added interface '%s' to queue '%s'\n", interface, queuename);
 		res = 0;
@@ -2502,6 +2685,8 @@
 					max[0] = '\0';
 				if (mem->dynamic)
 					strncat(max, " (dynamic)", sizeof(max) - strlen(max) - 1);
+				if (mem->paused)
+					strncat(max, " (paused)", sizeof(max) - strlen(max) - 1);
 				if (mem->status)
 					snprintf(max + strlen(max), sizeof(max) - strlen(max), " (%s)", status2str(mem->status, tmpbuf, sizeof(tmpbuf)));
 				if (mem->calls) {
@@ -2615,10 +2800,11 @@
 				"CallsTaken: %d\r\n"
 				"LastCall: %ld\r\n"
 				"Status: %d\r\n"
+				"Paused: %d\r\n"
 				"%s"
 				"\r\n",
 					q->name, mem->interface, mem->dynamic ? "dynamic" : "static",
-					mem->penalty, mem->calls, mem->lastcall, mem->status, idText);
+					mem->penalty, mem->calls, mem->lastcall, mem->status, mem->paused, idText);
 
 		/* List Queue Entries */
 
@@ -2646,12 +2832,13 @@
 
 static int manager_add_queue_member(struct mansession *s, struct message *m)
 {
-	char *queuename, *interface, *penalty_s;
-	int penalty = 0;
+	char *queuename, *interface, *penalty_s, *paused_s;
+	int paused, penalty = 0;
 
 	queuename = astman_get_header(m, "Queue");
 	interface = astman_get_header(m, "Interface");
 	penalty_s = astman_get_header(m, "Penalty");
+	paused_s = astman_get_header(m, "Paused");
 
 	if (ast_strlen_zero(queuename)) {
 		astman_send_error(s, m, "'Queue' not specified.");
@@ -2669,7 +2856,12 @@
 		penalty = 0;
 	}
 
-	switch (add_to_queue(queuename, interface, penalty)) {
+	if (ast_strlen_zero(paused_s))
+		paused = 0;
+	else
+		paused = abs(ast_true(paused_s));
+
+	switch (add_to_queue(queuename, interface, penalty, paused)) {
 	case RES_OKAY:
 		astman_send_ack(s, m, "Added interface to queue");
 		break;
@@ -2715,6 +2907,33 @@
 	return 0;
 }
 
+/* Manager action handler registered as "QueuePause": needs the 'Interface' and 'Paused'
+ * headers; 'Queue' is optional (empty means every queue).  Always returns 0. */
+static int manager_pause_queue_member(struct mansession *s, struct message *m)
+{
+	char *iface = astman_get_header(m, "Interface");
+	char *paused_hdr = astman_get_header(m, "Paused");
+	char *queue = astman_get_header(m, "Queue");
+	int want_paused;
+
+	if (ast_strlen_zero(iface) || ast_strlen_zero(paused_hdr)) {
+		astman_send_error(s, m, "Need 'Interface' and 'Paused' parameters.");
+		return 0;
+	}
+
+	/* abs() keeps the stored flag non-negative whatever sign ast_true() reports */
+	want_paused = abs(ast_true(paused_hdr));
+
+	if (set_member_paused(queue, iface, want_paused)) {
+		astman_send_error(s, m, "Interface not found");
+		return 0;
+	}
+
+	astman_send_ack(s, m, want_paused ? "Interface paused successfully" : "Interface unpaused successfully");
+
+	return 0;
+}
+
 static int handle_add_queue_member(int fd, int argc, char *argv[])
 {
 	char *queuename, *interface;
@@ -2744,7 +2963,7 @@
 		penalty = 0;
 	}
 
-	switch (add_to_queue(queuename, interface, penalty)) {
+	switch (add_to_queue(queuename, interface, penalty, 0)) {
 	case RES_OKAY:
 		ast_cli(fd, "Added interface '%s' to queue '%s'\n", interface, queuename);
 		return RESULT_SUCCESS;
@@ -2909,9 +3128,12 @@
 	ast_manager_unregister("QueueStatus");
 	ast_manager_unregister("QueueAdd");
 	ast_manager_unregister("QueueRemove");
+	ast_manager_unregister("QueuePause");
 	ast_devstate_del(statechange_queue, NULL);
 	ast_unregister_application(app_aqm);
 	ast_unregister_application(app_rqm);
+	ast_unregister_application(app_pqm);
+	ast_unregister_application(app_upqm);
 	return ast_unregister_application(app);
 }
 
@@ -2929,8 +3151,11 @@
 		ast_manager_register( "QueueStatus", 0, manager_queues_status, "Queue Status" );
 		ast_manager_register( "QueueAdd", EVENT_FLAG_AGENT, manager_add_queue_member, "Add interface to queue." );
 		ast_manager_register( "QueueRemove", EVENT_FLAG_AGENT, manager_remove_queue_member, "Remove interface from queue." );
+		ast_manager_register( "QueuePause", EVENT_FLAG_AGENT, manager_pause_queue_member, "Makes a queue member temporarily unavailable" );
 		ast_register_application(app_aqm, aqm_exec, app_aqm_synopsis, app_aqm_descrip) ;
 		ast_register_application(app_rqm, rqm_exec, app_rqm_synopsis, app_rqm_descrip) ;
+		ast_register_application(app_pqm, pqm_exec, app_pqm_synopsis, app_pqm_descrip) ;
+		ast_register_application(app_upqm, upqm_exec, app_upqm_synopsis, app_upqm_descrip) ;
 	}
 	reload_queues();
 	




More information about the svn-commits mailing list