[asterisk-commits] file: branch file/bridging r180364 - /team/file/bridging/bridges/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Mar 5 09:50:36 CST 2009


Author: file
Date: Thu Mar  5 09:50:33 2009
New Revision: 180364

URL: http://svn.digium.com/svn-view/asterisk?view=rev&rev=180364
Log:
Use a pipe as a nudge mechanism for the multiplexed thread. This will actually come into greater play with
phase 2 work, where I plan to have the multiplexed thread live for a while after the last channel leaves so
that we do not incur the cost of starting up a new thread. (thanks Vadim!)

Modified:
    team/file/bridging/bridges/bridge_multiplexed.c

Modified: team/file/bridging/bridges/bridge_multiplexed.c
URL: http://svn.digium.com/svn-view/asterisk/team/file/bridging/bridges/bridge_multiplexed.c?view=diff&rev=180364&r1=180363&r2=180364
==============================================================================
--- team/file/bridging/bridges/bridge_multiplexed.c (original)
+++ team/file/bridging/bridges/bridge_multiplexed.c Thu Mar  5 09:50:33 2009
@@ -34,7 +34,7 @@
 #include <string.h>
 #include <sys/types.h>
 #include <sys/stat.h>
-#include <signal.h>
+#include <fcntl.h>
 
 #include "asterisk/module.h"
 #include "asterisk/channel.h"
@@ -53,6 +53,8 @@
 struct multiplexed_thread {
 	/*! Thread itself */
 	pthread_t thread;
+	/*! Pipe used to wake up the multiplexed thread */
+	int pipe[2];
 	/*! Channels in this thread */
 	struct ast_channel *chans[MULTIPLEXED_MAX_CHANNELS];
 	/*! Number of channels in this thread */
@@ -73,6 +75,21 @@
 	return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0;
 }
 
+/*! \brief Destroy callback for a multiplexed thread structure */
+static void destroy_multiplexed_thread(void *obj)
+{
+	struct multiplexed_thread *multiplexed_thread = obj;
+
+	if (multiplexed_thread->pipe[0] > -1) {
+		close(multiplexed_thread->pipe[0]);
+	}
+	if (multiplexed_thread->pipe[1] > -1) {
+		close(multiplexed_thread->pipe[1]);
+	}
+
+	return;
+}
+
 /*! \brief Create function which finds/reserves/references a multiplexed thread structure */
 static int multiplexed_bridge_create(struct ast_bridge *bridge)
 {
@@ -82,14 +99,43 @@
 
 	/* Try to find an existing thread to handle our additional channels */
 	if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) {
+		int flags;
+
 		/* If we failed we will have to create a new one from scratch */
-		if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), NULL))) {
+		if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) {
 			ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
 			ao2_unlock(multiplexed_threads);
 			return -1;
 		}
+
+		multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1;
+		/* Setup a pipe so we can poke the thread itself when needed */
+		if (pipe(multiplexed_thread->pipe)) {
+			ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
+			ao2_ref(multiplexed_thread, -1);
+			ao2_unlock(multiplexed_threads);
+			return -1;
+		}
+
+		/* Setup each pipe for non-blocking operation */
+		flags = fcntl(multiplexed_thread->pipe[0], F_GETFL);
+		if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+			ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
+			ao2_ref(multiplexed_thread, -1);
+			ao2_unlock(multiplexed_threads);
+			return -1;
+		}
+		flags = fcntl(multiplexed_thread->pipe[1], F_GETFL);
+		if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
+			ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
+			ao2_ref(multiplexed_thread, -1);
+			ao2_unlock(multiplexed_threads);
+			return -1;
+		}
+
 		/* Set up default parameters */
 		multiplexed_thread->thread = AST_PTHREADT_NULL;
+
 		/* Finally link us into the container so others may find us */
 		ao2_link(multiplexed_threads, multiplexed_thread);
 		ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
@@ -110,12 +156,17 @@
 /*! \brief Internal function which nudges the thread */
 static void multiplexed_nudge(struct multiplexed_thread *multiplexed_thread)
 {
+	int nudge = 0;
+
 	if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
 		return;
 	}
 
+	if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
+		ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
+	}
+
 	while (multiplexed_thread->waiting) {
-		pthread_kill(multiplexed_thread->thread, SIGURG);
 		usleep(1);
 	}
 
@@ -149,6 +200,7 @@
 static void *multiplexed_thread_function(void *data)
 {
 	struct multiplexed_thread *multiplexed_thread = data;
+	int fds = multiplexed_thread->pipe[0];
 
 	ao2_lock(multiplexed_thread);
 
@@ -156,7 +208,7 @@
 
 	while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
 		struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
-		int to = -1;
+		int to = -1, outfd = -1;
 
 		/* Move channels around so not just the first one gets priority */
 		memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
@@ -164,10 +216,19 @@
 
 		ao2_unlock(multiplexed_thread);
 		multiplexed_thread->waiting = 1;
-		winner = ast_waitfor_n(multiplexed_thread->chans, multiplexed_thread->service_count, &to);
+		winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
 		multiplexed_thread->waiting = 0;
 		ao2_lock(multiplexed_thread);
 
+		if (outfd > -1) {
+			int nudge;
+
+			if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
+				if (errno != EINTR && errno != EAGAIN) {
+					ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
+				}
+			}
+		}
 		if (winner && winner->bridge) {
 			ast_bridge_handle_trip(winner->bridge, NULL, winner, -1);
 		}




More information about the asterisk-commits mailing list