[svn-commits] rizzo: branch rizzo/astobj2 r45805 - /team/rizzo/astobj2/main/manager.c

svn-commits at lists.digium.com svn-commits at lists.digium.com
Fri Oct 20 15:01:49 MST 2006


Author: rizzo
Date: Fri Oct 20 17:01:48 2006
New Revision: 45805

URL: http://svn.digium.com/view/asterisk?rev=45805&view=rev
Log:
new version of manager.c, with O(1) event insertion and
hopefully more reliable locking.


Modified:
    team/rizzo/astobj2/main/manager.c

Modified: team/rizzo/astobj2/main/manager.c
URL: http://svn.digium.com/view/asterisk/team/rizzo/astobj2/main/manager.c?rev=45805&r1=45804&r2=45805&view=diff
==============================================================================
--- team/rizzo/astobj2/main/manager.c (original)
+++ team/rizzo/astobj2/main/manager.c Fri Oct 20 17:01:48 2006
@@ -76,19 +76,41 @@
 #include "asterisk/threadstorage.h"
 #include "asterisk/linkedlists.h"
 
+/*!
+ * Linked list of events.
+ * Global events are appended to the list by append_event().
+ * The usecount is the number of stored pointers to the element,
+ * excluding the list pointers. So an element that is only in
+ * the list has a usecount of 0, not 1.
+ *
+ * Clients have a pointer to the last event processed, and for each
+ * of these clients we track the usecount of the elements.
+ * If we have a pointer to an entry in the list, it is safe to navigate
+ * it forward because elements will not be deleted, but only appended.
+ * The worst that can happen is seeing the pointer still NULL.
+ *
+ * When the usecount of an element drops to 0, and the element is the
+ * first in the list, we can remove it. Removal is done within the
+ * main thread, which is woken up for the purpose.
+ *
+ * For simplicity of implementation, we make sure the list is never empty.
+ */
 struct eventqent {
 	int usecount;		/*!< # of clients who still need the event */
 	int category;
-	struct eventqent *next;
+	unsigned int seq;	/*!< sequence number */
+	AST_LIST_ENTRY(eventqent) eq_next;
 	char eventdata[1];	/*!< really variable size, allocated by append_event() */
 };
-struct eventqent *master_eventq = NULL; /*!< Protected by the sessions list lock */
+
+static AST_LIST_HEAD_STATIC(all_events, eventqent);
 
 static int enabled = 0;
 static int portno = DEFAULT_MANAGER_PORT;
 static int asock = -1;	/* the accept socket */
 static int displayconnects = 1;
 static int timestampevents = 0;
+static int numberevents = 1;
 static int httptimeout = 60;
 
 static pthread_t accept_thread_ptr;	/*!< the accept thread */
@@ -131,7 +153,7 @@
 	AST_LIST_ENTRY(mansession) list;
 };
 
-#define NEW_EVENT(m)	(m->last_ev->next)
+#define NEW_EVENT(m)	(AST_LIST_NEXT(m->last_ev, eq_next))
 
 static AST_LIST_HEAD_STATIC(sessions, mansession);
 
@@ -160,6 +182,85 @@
 AST_MUTEX_DEFINE_STATIC(actionlock);
 
 /*! \brief
+ * Event list management functions.
+ * We assume that the event list always has at least one element,
+ * and the delete code will not remove the last entry even if the
+ * 
+ */
+#if 0
+static time_t __deb(time_t start, const char *msg)
+{
+	time_t now = time(NULL);
+	ast_verbose("%4d th %p %s\n", (int)(now % 3600), pthread_self(), msg);
+	if (start != 0 && now - start > 5)
+		ast_verbose("+++ WOW, %s took %d seconds\n", msg, (int)(now - start));
+	return now;
+}
+
+static void LOCK_EVENTS(void)
+{
+	time_t start = __deb(0, "about to lock events");
+	AST_LIST_LOCK(&all_events);
+	__deb(start, "done lock events");
+}
+
+static void UNLOCK_EVENTS(void)
+{
+	__deb(0, "about to unlock events");
+	AST_LIST_UNLOCK(&all_events);
+}
+
+static void LOCK_SESS(void)
+{
+	time_t start = __deb(0, "about to lock sessions");
+	AST_LIST_LOCK(&sessions);
+	__deb(start, "done lock sessions");
+}
+
+static void UNLOCK_SESS(void)
+{
+	__deb(0, "about to unlock sessions");
+	AST_LIST_UNLOCK(&sessions);
+}
+#endif
+
+/*!
+ * Grab a reference to the last event, update usecount as needed.
+ * Can handle a NULL pointer.
+ */
+static struct eventqent *grab_last(void)
+{
+	struct eventqent *ret;
+
+	AST_LIST_LOCK(&all_events);
+	ret = AST_LIST_LAST(&all_events);
+	/* the list is never empty now, but may become so when
+	 * we optimize it in the future, so be prepared.
+	 */
+	if (ret)
+		ast_atomic_fetchadd_int(&ret->usecount, 1);
+	AST_LIST_UNLOCK(&all_events);
+	return ret;
+}
+
+/*!
+ * Purge unused events. Remove elements from the head
+ * as long as their usecount is 0 and there is a next element.
+ */
+static void purge_unused(void)
+{
+	struct eventqent *ev;
+
+	AST_LIST_LOCK(&all_events);
+	while ( (ev = AST_LIST_FIRST(&all_events)) &&
+	    ev->usecount == 0 && AST_LIST_NEXT(ev, eq_next)) {
+		AST_LIST_REMOVE_HEAD(&all_events, eq_next);
+		free(ev);
+	}
+	AST_LIST_UNLOCK(&all_events);
+}
+
+/*!
  * helper functions to convert back and forth between
  * string and numeric representation of set of flags
  */
@@ -434,13 +535,13 @@
 {
 	struct eventqent *s;
 
-	AST_LIST_LOCK(&sessions);
-	for (s = master_eventq; s; s = s->next) {
+	AST_LIST_LOCK(&all_events);
+	AST_LIST_TRAVERSE(&all_events, s, eq_next) {
 		ast_cli(fd, "Usecount: %d\n",s->usecount);
 		ast_cli(fd, "Category: %d\n", s->category);
 		ast_cli(fd, "Event:\n%s", s->eventdata);
 	}
-	AST_LIST_UNLOCK(&sessions);
+	AST_LIST_UNLOCK(&all_events);
 
 	return RESULT_SUCCESS;
 }
@@ -506,12 +607,20 @@
  */
 static struct eventqent *unref_event(struct eventqent *e)
 {
-	struct eventqent *ret = e->next;
+	struct eventqent *ret = AST_LIST_NEXT(e, eq_next);
 	if (ast_atomic_dec_and_test(&e->usecount) && ret)
 		pthread_kill(accept_thread_ptr, SIGURG);
 	return ret;
 }
 
+static void ref_event(struct eventqent *e)
+{
+	ast_atomic_fetchadd_int(&e->usecount, 1);
+}
+
+/*
+ * destroy a session, leaving the usecount
+ */
 static void free_session(struct mansession *s)
 {
 	struct eventqent *eqe = s->last_ev;
@@ -521,14 +630,12 @@
 		free(s->outputstr);
 	ast_mutex_destroy(&s->__lock);
 	free(s);
-	while ( eqe )
-		eqe = unref_event(eqe);
+	unref_event(eqe);
 }
 
 static void destroy_session(struct mansession *s)
 {
 	AST_LIST_LOCK(&sessions);
-	ast_verbose("destroy session %lx\n", s->managerid);
 	AST_LIST_REMOVE(&sessions, s, list);
 	AST_LIST_UNLOCK(&sessions);
 
@@ -978,7 +1085,7 @@
 		ast_log(LOG_DEBUG, "Starting waiting for an event!\n");
 	for (x=0; ((x < timeout) || (timeout < 0)); x++) {
 		ast_mutex_lock(&s->__lock);
-		if (s->last_ev && s->last_ev->next)
+		if (NEW_EVENT(s))
 			needexit = 1;
 		if (s->waiting_thread != pthread_self())
 			needexit = 1;
@@ -1000,7 +1107,9 @@
 	if (s->waiting_thread == pthread_self()) {
 		struct eventqent *eqe;
 		astman_send_response(s, m, "Success", "Waiting for Event...");
+		/* Only show events if we're the most recent waiter */
 		while ( (eqe = NEW_EVENT(s)) ) {
+			ref_event(eqe);
 			if (((s->readperm & eqe->category) == eqe->category) &&
 			    ((s->send_events & eqe->category) == eqe->category)) {
 				astman_append(s, "%s", eqe->eventdata);
@@ -1707,9 +1816,8 @@
 	if (s->fd > -1) {
 		struct eventqent *eqe;
 
-		if (!s->last_ev)
-			s->last_ev = master_eventq;
 		while ( (eqe = NEW_EVENT(s)) ) {
+			ref_event(eqe);
 			if ((s->authenticated && (s->readperm & eqe->category) == eqe->category) &&
 			    ((s->send_events & eqe->category) == eqe->category)) {
 				if (!ret && ast_carefulwrite(s->fd, eqe->eventdata,
@@ -1885,10 +1993,8 @@
 				memset(&m, 0, sizeof(m));
 			} else if (m.hdrcount < AST_MAX_MANHEADERS - 1)
 				m.hdrcount++;
-		} else if (s->last_ev->next) {
-			if (process_events(s))
-				break;
-		}
+		} else if (process_events(s))
+			break;
 	}
 	/* session is over, explain why and terminate */
 	if (s->authenticated) {
@@ -1940,19 +2046,13 @@
 					ast_verbose(VERBOSE_PREFIX_2 "HTTP Manager '%s' timed out from %s\n",
 						s->username, ast_inet_ntoa(s->sin.sin_addr));
 				}
-				free_session(s);
+				free_session(s);	/* XXX outside ? */
 				break;
 			}
 		}
 		AST_LIST_TRAVERSE_SAFE_END
-		/* Purge master event queue of old, unused events, but make sure we
-		   always keep at least one in the queue */
-		while (master_eventq->next && !master_eventq->usecount) {
-			struct eventqent *eqe = master_eventq;
-			master_eventq = master_eventq->next;
-			free(eqe);
-		}
 		AST_LIST_UNLOCK(&sessions);
+		purge_unused();
 
 		sinlen = sizeof(sin);
 		pfds[0].fd = asock;
@@ -1998,12 +2098,9 @@
 		ast_atomic_fetchadd_int(&num_sessions, 1);
 		AST_LIST_LOCK(&sessions);
 		AST_LIST_INSERT_HEAD(&sessions, s, list);
+		AST_LIST_UNLOCK(&sessions);
 		/* Hook to the tail of the event queue */
-		s->last_ev = master_eventq;
-		while(s->last_ev->next)
-			s->last_ev = s->last_ev->next;
-		AST_LIST_UNLOCK(&sessions);
-		ast_atomic_fetchadd_int(&s->last_ev->usecount, 1);
+		s->last_ev = grab_last();
 		if (ast_pthread_create_background(&s->ms_t, &attr, session_do, s))
 			destroy_session(s);
 	}
@@ -2017,27 +2114,22 @@
  */
 static int append_event(const char *str, int category)
 {
-	struct eventqent *prev = NULL;
 	struct eventqent *tmp = ast_malloc(sizeof(*tmp) + strlen(str));
+	static int seq;	/* sequence number */
 
 	if (!tmp)
 		return -1;
 
 	/* need to init all fields, because ast_malloc() does not */
-	tmp->next = NULL;
-	tmp->usecount = num_sessions;
+	tmp->usecount = 0;
 	tmp->category = category;
+	tmp->seq = ast_atomic_fetchadd_int(&seq, 1);
+	AST_LIST_NEXT(tmp, eq_next) = NULL;
 	strcpy(tmp->eventdata, str);
 
-	if (master_eventq) {
-		prev = master_eventq;
-		while (prev->next)
-			prev = prev->next;
-		prev->next = tmp;
-	} else {
-		master_eventq = tmp;
-	}
-
+	AST_LIST_LOCK(&all_events);
+	AST_LIST_INSERT_TAIL(&all_events, tmp, eq_next);
+	AST_LIST_UNLOCK(&all_events);
 
 	return 0;
 }
@@ -2068,6 +2160,12 @@
 				"Timestamp: %ld.%06lu\r\n",
 				 now.tv_sec, (unsigned long) now.tv_usec);
 	}
+	if (numberevents) {
+		static int seq;
+		ast_dynamic_str_thread_append(&buf, 0, &manager_event_buf,
+				"SequenceNumber: %d\r\n",
+				 ast_atomic_fetchadd_int(&seq, 1));
+	}
 
 	va_start(ap, fmt);
 	ast_dynamic_str_thread_append_va(&buf, 0, &manager_event_buf, fmt, ap);
@@ -2075,10 +2173,10 @@
 
 	ast_dynamic_str_thread_append(&buf, 0, &manager_event_buf, "\r\n");
 
+	append_event(buf->str, category);
+
+	/* Wake up any sleeping sessions */
 	AST_LIST_LOCK(&sessions);
-	append_event(buf->str, category);
-
-	/* Wake up any sleeping sessions */
 	AST_LIST_TRAVERSE(&sessions, s, list) {
 		ast_mutex_lock(&s->__lock);
 		if (s->waiting_thread != AST_PTHREADT_NULL)
@@ -2442,14 +2540,10 @@
 		ast_mutex_lock(&s->__lock);
 		s->inuse = 1;
 		s->managerid = rand() | 1;	/* make sure it is non-zero */
+		s->last_ev = grab_last();
 		AST_LIST_LOCK(&sessions);
 		AST_LIST_INSERT_HEAD(&sessions, s, list);
-		/* Hook into the last spot in the event queue */
-		s->last_ev = master_eventq;
-		while (s->last_ev->next)
-			s->last_ev = s->last_ev->next;
 		AST_LIST_UNLOCK(&sessions);
-		ast_atomic_fetchadd_int(&s->last_ev->usecount, 1);
 		ast_atomic_fetchadd_int(&num_sessions, 1);
 	}
 
@@ -2690,6 +2784,9 @@
 
 	if ((val = ast_variable_retrieve(cfg, "general", "timestampevents")))
 		timestampevents = ast_true(val);
+
+	if ((val = ast_variable_retrieve(cfg, "general", "numberevents")))
+		numberevents = ast_true(val);
 
 	if ((val = ast_variable_retrieve(cfg, "general", "httptimeout")))
 		newhttptimeout = atoi(val);



More information about the svn-commits mailing list