[asterisk-commits] trunk r16957 - in /trunk: include/asterisk/manager.h manager.c

asterisk-commits at lists.digium.com asterisk-commits at lists.digium.com
Sun Apr 2 12:59:57 MST 2006


Author: markster
Date: Sun Apr  2 14:59:55 2006
New Revision: 16957

URL: http://svn.digium.com/view/asterisk?rev=16957&view=rev
Log:
Unify manager behind a single event queue

Modified:
    trunk/include/asterisk/manager.h
    trunk/manager.c

Modified: trunk/include/asterisk/manager.h
URL: http://svn.digium.com/view/asterisk/trunk/include/asterisk/manager.h?rev=16957&r1=16956&r2=16957&view=diff
==============================================================================
--- trunk/include/asterisk/manager.h (original)
+++ trunk/include/asterisk/manager.h Sun Apr  2 14:59:55 2006
@@ -58,11 +58,6 @@
 /* Export manager structures */
 #define AST_MAX_MANHEADERS 80
 #define AST_MAX_MANHEADER_LEN 256
-
-struct eventqent {
-	struct eventqent *next;
-	char eventdata[1];
-};
 
 struct mansession;
 

Modified: trunk/manager.c
URL: http://svn.digium.com/view/asterisk/trunk/manager.c?rev=16957&r1=16956&r2=16957&view=diff
==============================================================================
--- trunk/manager.c (original)
+++ trunk/manager.c Sun Apr  2 14:59:55 2006
@@ -83,6 +83,14 @@
 	struct ast_variable *vars;
 };
 
+struct eventqent {
+	int usecount;	/* sessions still counted against this entry; the purge loop frees it only at 0 */
+	int category;	/* event category, checked against a session's readperm and send_events */
+	ast_mutex_t lock;	/* held around changes to usecount */
+	struct eventqent *next;	/* next (newer) entry in the queue, NULL at the tail */
+	char eventdata[1];	/* event text; the entry is allocated with strlen(text) extra bytes */
+};
+
 static int enabled = 0;
 static int portno = DEFAULT_MANAGER_PORT;
 static int asock = -1;
@@ -93,6 +101,8 @@
 static pthread_t t;
 AST_MUTEX_DEFINE_STATIC(sessionlock);
 static int block_sockets = 0;
+static int num_sessions = 0;	/* number of sessions on the sessions list; becomes the usecount of each newly appended event */
+struct eventqent *master_eventq = NULL;	/* head of the single event queue shared by all sessions */
 
 static struct permalias {
 	int num;
@@ -472,6 +482,23 @@
 	return RESULT_SUCCESS;
 }
 
+/*! \brief  handle_showmaneventq: CLI command show manager eventq */
+/* Should change to "manager show eventq" */
+static int handle_showmaneventq(int fd, int argc, char *argv[])
+{
+	struct eventqent *s;
+	ast_mutex_lock(&sessionlock);	/* walk the queue with sessionlock held */
+	s = master_eventq;
+	while (s) {
+		ast_cli(fd, "Usecount: %d\n",s->usecount);
+		ast_cli(fd, "Category: %d\n", s->category);
+		ast_cli(fd, "Event:\n%s", s->eventdata);
+		s = s->next;
+	}
+	ast_mutex_unlock(&sessionlock);
+	return RESULT_SUCCESS;	/* argc/argv are not examined */
+}
+
 static char showmancmd_help[] = 
 "Usage: show manager command <actionname>\n"
 "	Shows the detailed description for a specific Asterisk manager interface command.\n";
@@ -485,6 +512,11 @@
 "	Prints a listing of the users that are currently connected to the\n"
 "Asterisk manager interface.\n";
 
+static char showmaneventq_help[] = 	/* usage text for the "show manager eventq" CLI entry */
+"Usage: show manager eventq\n"
+"	Prints a listing of all events pending in the Asterisk manager\n"
+"event queue.\n";
+
 static struct ast_cli_entry show_mancmd_cli =
 	{ { "show", "manager", "command", NULL },
 	handle_showmancmd, "Show a manager interface command", showmancmd_help, complete_show_mancmd };
@@ -496,6 +528,24 @@
 static struct ast_cli_entry show_manconn_cli =
 	{ { "show", "manager", "connected", NULL },
 	handle_showmanconn, "Show connected manager interface users", showmanconn_help };
+
+static struct ast_cli_entry show_maneventq_cli =	/* CLI entry binding "show manager eventq" to handle_showmaneventq */
+	{ { "show", "manager", "eventq", NULL },
+	handle_showmaneventq, "Show manager interface queued events", showmaneventq_help };
+
+static void unuse_eventqent(struct eventqent *e)
+{
+	/* Drop one reference on e; signal thread t only once e is unused and has
+	   a successor.  XXX Change to atomic_dec when we have such a beast XXX */
+	int val;
+	ast_mutex_lock(&e->lock);
+	e->usecount--;
+	val = !e->usecount && e->next;	/* purge loop frees only entries with usecount 0 and a next */
+	ast_mutex_unlock(&e->lock);
+	/* Wake up sleeping beauty */
+	if (val)
+		pthread_kill(t, SIGURG);
+}
 
 static void free_session(struct mansession *s)
 {
@@ -508,7 +558,7 @@
 	while(s->eventq) {
 		eqe = s->eventq;
 		s->eventq = s->eventq->next;
-		free(eqe);
+		unuse_eventqent(eqe);
 	}
 	free(s);
 }
@@ -530,6 +580,7 @@
 		else
 			sessions = cur->next;
 		free_session(s);
+		num_sessions--;
 	} else
 		ast_log(LOG_WARNING, "Trying to delete nonexistent session %p?\n", s);
 	ast_mutex_unlock(&sessionlock);
@@ -876,7 +927,7 @@
 		ast_log(LOG_DEBUG, "Starting waiting for an event!\n");
 	for (x=0;((x<timeout) || (timeout < 0)); x++) {
 		ast_mutex_lock(&s->__lock);
-		if (s->eventq)
+		if (s->eventq && s->eventq->next)
 			needexit = 1;
 		if (s->waiting_thread != pthread_self())
 			needexit = 1;
@@ -898,11 +949,14 @@
 	if (s->waiting_thread == pthread_self()) {
 		astman_send_response(s, m, "Success", "Waiting for Event...");
 		/* Only show events if we're the most recent waiter */
-		while(s->eventq) {
-			astman_append(s, "%s", s->eventq->eventdata);
-			eqe = s->eventq;
-			s->eventq = s->eventq->next;
-			free(eqe);
+		while(s->eventq->next) {
+			eqe = s->eventq->next;
+			if (((s->readperm & eqe->category) == eqe->category) &&
+			    ((s->send_events & eqe->category) == eqe->category)) {
+				astman_append(s, "%s", eqe->eventdata);
+			}
+			unuse_eventqent(s->eventq);
+			s->eventq = eqe;
 		}
 		astman_append(s,
 			"Event: WaitEventComplete\r\n"
@@ -1566,6 +1620,30 @@
 	return 0;
 }
 
+static int process_events(struct mansession *s)	/* write pending queue entries this session may see to s->fd; returns 0, or -1 if a write failed */
+{
+	struct eventqent *eqe;
+	int ret = 0;
+	ast_mutex_lock(&s->__lock);
+	if (s->fd > -1) {
+		s->busy--;	/* NOTE(review): decremented on every call; confirm every caller incremented busy first */
+		if (!s->eventq)
+			s->eventq = master_eventq;	/* no position yet: start at the head of the master queue */
+		while(s->eventq->next) {	/* s->eventq is the entry already passed; only its successors are pending */
+			eqe = s->eventq->next;
+			if ((s->authenticated && (s->readperm & eqe->category) == eqe->category) &&
+			    ((s->send_events & eqe->category) == eqe->category)) {	/* authenticated, and category allowed by both masks */
+				if (!ret && ast_carefulwrite(s->fd, eqe->eventdata, strlen(eqe->eventdata), s->writetimeout) < 0)
+					ret = -1;	/* after a failed write no more writes are attempted, but the loop still advances */
+			}
+			unuse_eventqent(s->eventq);	/* release this session's count on the entry being left */
+			s->eventq = eqe;
+		}
+	}
+	ast_mutex_unlock(&s->__lock);
+	return ret;
+}
+
 static int process_message(struct mansession *s, struct message *m)
 {
 	char action[80] = "";
@@ -1573,6 +1651,7 @@
 	char *id = astman_get_header(m,"ActionID");
 	char idText[256] = "";
 	char iabuf[INET_ADDRSTRLEN];
+	int ret = 0;
 
 	ast_copy_string(action, astman_get_header(m, "Action"), sizeof(action));
 	ast_log( LOG_DEBUG, "Manager received command '%s'\n", action );
@@ -1581,9 +1660,9 @@
 		astman_send_error(s, m, "Missing action in request");
 		return 0;
 	}
-        if (!ast_strlen_zero(id)) {
-                snprintf(idText, sizeof(idText), "ActionID: %s\r\n",id);
-        }
+	if (!ast_strlen_zero(id)) {
+		snprintf(idText, sizeof(idText), "ActionID: %s\r\n",id);
+	}
 	if (!s->authenticated) {
 		if (!strcasecmp(action, "Challenge")) {
 			char *authtype;
@@ -1623,8 +1702,6 @@
 		} else
 			astman_send_error(s, m, "Authentication Required");
 	} else {
-		int ret=0;
-		struct eventqent *eqe;
 		ast_mutex_lock(&s->__lock);
 		s->busy++;
 		ast_mutex_unlock(&s->__lock);
@@ -1642,23 +1719,10 @@
 		}
 		if (!tmp)
 			astman_send_error(s, m, "Invalid/unknown command");
-		ast_mutex_lock(&s->__lock);
-		if (s->fd > -1) {
-			s->busy--;
-			while(s->eventq) {
-				if (ast_carefulwrite(s->fd, s->eventq->eventdata, strlen(s->eventq->eventdata), s->writetimeout) < 0) {
-					ret = -1;
-					break;
-				}
-				eqe = s->eventq;
-				s->eventq = s->eventq->next;
-				free(eqe);
-			}
-		}
-		ast_mutex_unlock(&s->__lock);
+	}
+	if (ret)
 		return ret;
-	}
-	return 0;
+	return process_events(s);
 }
 
 static int get_input(struct mansession *s, char *output)
@@ -1687,12 +1751,20 @@
 	fds[0].fd = s->fd;
 	fds[0].events = POLLIN;
 	do {
+		ast_mutex_lock(&s->__lock);
+		s->waiting_thread = pthread_self();
+		ast_mutex_unlock(&s->__lock);
+
 		res = poll(fds, 1, -1);
+
+		ast_mutex_lock(&s->__lock);
+		s->waiting_thread = AST_PTHREADT_NULL;
+		ast_mutex_unlock(&s->__lock);
 		if (res < 0) {
 			if (errno == EINTR) {
 				if (s->dead)
 					return -1;
-				continue;
+				return 0;
 			}
 			ast_log(LOG_WARNING, "Select returned error: %s\n", strerror(errno));
 	 		return -1;
@@ -1734,8 +1806,12 @@
 				memset(&m, 0, sizeof(m));
 			} else if (m.hdrcount < AST_MAX_MANHEADERS - 1)
 				m.hdrcount++;
-		} else if (res < 0)
+		} else if (res < 0) {
 			break;
+		} else if (s->eventq->next) {
+			if (process_events(s))
+				break;
+		}
 	}
 	if (s->authenticated) {
 		if (option_verbose > 1) {
@@ -1759,6 +1835,7 @@
 	int as;
 	struct sockaddr_in sin;
 	socklen_t sinlen;
+	struct eventqent *eqe;
 	struct mansession *s, *prev=NULL, *next;
 	struct protoent *p;
 	int arg = 1;
@@ -1779,6 +1856,7 @@
 		while(s) {
 			next = s->next;
 			if (s->sessiontimeout && (now > s->sessiontimeout) && !s->inuse) {
+				num_sessions--;
 				if (prev)
 					prev->next = next;
 				else
@@ -1792,6 +1870,14 @@
 				prev = s;
 			s = next;
 		}
+		/* Purge master event queue of old, unused events, but make sure we
+		   always keep at least one in the queue */
+		eqe = master_eventq;
+		while (master_eventq->next && !master_eventq->usecount) {
+			eqe = master_eventq;
+			master_eventq = master_eventq->next;
+			free(eqe);
+		}
 		ast_mutex_unlock(&sessionlock);
 
 		sinlen = sizeof(sin);
@@ -1831,8 +1917,17 @@
 		s->fd = as;
 		s->send_events = -1;
 		ast_mutex_lock(&sessionlock);
+		num_sessions++;
 		s->next = sessions;
 		sessions = s;
+		/* Find the last place in the master event queue and hook ourselves
+		   in there */
+		s->eventq = master_eventq;
+		while(s->eventq->next)
+			s->eventq = s->eventq->next;
+		ast_mutex_lock(&s->eventq->lock);
+		s->eventq->usecount++;
+		ast_mutex_unlock(&s->eventq->lock);
 		ast_mutex_unlock(&sessionlock);
 		if (ast_pthread_create(&s->t, &attr, session_do, s))
 			destroy_session(s);
@@ -1841,21 +1936,24 @@
 	return NULL;
 }
 
-static int append_event(struct mansession *s, const char *str)
+static int append_event(const char *str, int category)
 {
 	struct eventqent *tmp, *prev=NULL;
 	tmp = malloc(sizeof(struct eventqent) + strlen(str));
 	if (tmp) {
+		ast_mutex_init(&tmp->lock);
 		tmp->next = NULL;
+		tmp->category = category;
 		strcpy(tmp->eventdata, str);
-		if (s->eventq) {
-			prev = s->eventq;
+		if (master_eventq) {
+			prev = master_eventq;
 			while(prev->next) 
 				prev = prev->next;
 			prev->next = tmp;
 		} else {
-			s->eventq = tmp;
-		}
+			master_eventq = tmp;
+		}
+		tmp->usecount = num_sessions;
 		return 0;
 	}
 	return -1;
@@ -1870,45 +1968,33 @@
 	char *tmp_next = tmp;
 	size_t tmp_left = sizeof(tmp) - 2;
 	va_list ap;
-
+	struct timeval now;
+
+	/* Abort if there aren't any manager sessions */
+	if (!num_sessions)
+		return 0;
+
+	ast_build_string(&tmp_next, &tmp_left, "Event: %s\r\nPrivilege: %s\r\n",
+			 event, authority_to_str(category, auth, sizeof(auth)));
+	if (timestampevents) {
+		now = ast_tvnow();
+		ast_build_string(&tmp_next, &tmp_left, "Timestamp: %ld.%06lu\r\n",
+				 now.tv_sec, (unsigned long) now.tv_usec);
+	}
+	va_start(ap, fmt);
+	ast_build_string_va(&tmp_next, &tmp_left, fmt, ap);
+	va_end(ap);
+	*tmp_next++ = '\r';
+	*tmp_next++ = '\n';
+	*tmp_next = '\0';
+	
 	ast_mutex_lock(&sessionlock);
+	/* Append event to master list and wake up any sleeping sessions */
+	append_event(tmp, category);
 	for (s = sessions; s; s = s->next) {
-		if ((s->readperm & category) != category)
-			continue;
-
-		if ((s->send_events & category) != category)
-			continue;
-
-		if (ast_strlen_zero(tmp)) {
-			struct timeval now;
-
-			ast_build_string(&tmp_next, &tmp_left, "Event: %s\r\nPrivilege: %s\r\n",
-					 event, authority_to_str(category, auth, sizeof(auth)));
-			if (timestampevents) {
-				now = ast_tvnow();
-				ast_build_string(&tmp_next, &tmp_left, "Timestamp: %ld.%06lu\r\n",
-						 now.tv_sec, (unsigned long) now.tv_usec);
-			}
-			va_start(ap, fmt);
-			ast_build_string_va(&tmp_next, &tmp_left, fmt, ap);
-			va_end(ap);
-			*tmp_next++ = '\r';
-			*tmp_next++ = '\n';
-			*tmp_next = '\0';
-		}
-
 		ast_mutex_lock(&s->__lock);
-		if (s->busy) {
-			append_event(s, tmp);
-			if (s->waiting_thread != AST_PTHREADT_NULL)
-				pthread_kill(s->waiting_thread, SIGURG);
-		} else if (!s->dead && !s->sessiontimeout) {
-			if (ast_carefulwrite(s->fd, tmp, tmp_next - tmp, s->writetimeout) < 0) {
-				ast_log(LOG_WARNING, "Disconnecting slow (or gone) manager session!\n");
-				s->dead = 1;
-				pthread_kill(s->t, SIGURG);
-			}
-		}
+		if (s->waiting_thread != AST_PTHREADT_NULL)
+			pthread_kill(s->waiting_thread, SIGURG);
 		ast_mutex_unlock(&s->__lock);
 	}
 	ast_mutex_unlock(&sessionlock);
@@ -2084,12 +2170,23 @@
 		s->managerid = rand() | (unsigned long)s;
 		s->next = sessions;
 		sessions = s;
+		num_sessions++;
+		/* Hook into the last spot in the event queue */
+		s->eventq = master_eventq;
+		while(s->eventq->next)
+			s->eventq = s->eventq->next;
+		ast_mutex_lock(&s->eventq->lock);
+		s->eventq->usecount++;
+		ast_mutex_unlock(&s->eventq->lock);
 		ast_mutex_unlock(&sessionlock);
 	}
 
-	/* Reset HTTP timeout */
+	/* Reset HTTP timeout.  If we're not yet authenticated, keep it extremely short */
 	time(&s->sessiontimeout);
-	s->sessiontimeout += httptimeout;
+	if (!s->authenticated && (httptimeout > 5))
+		s->sessiontimeout += 5;
+	else
+		s->sessiontimeout += httptimeout;
 	ast_mutex_unlock(&s->__lock);
 	
 	memset(&m, 0, sizeof(m));
@@ -2248,8 +2345,11 @@
 		ast_cli_register(&show_mancmd_cli);
 		ast_cli_register(&show_mancmds_cli);
 		ast_cli_register(&show_manconn_cli);
+		ast_cli_register(&show_maneventq_cli);
 		ast_extension_state_add(NULL, NULL, manager_state_cb, NULL);
 		registered = 1;
+		/* Append placeholder event so master_eventq never runs dry */
+		append_event("Event: Placeholder\r\n\r\n", 0);
 	}
 	portno = DEFAULT_MANAGER_PORT;
 	displayconnects = 1;



More information about the asterisk-commits mailing list