[asterisk-commits] file: branch file/bridging r180364 - /team/file/bridging/bridges/
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Mar 5 09:50:36 CST 2009
Author: file
Date: Thu Mar 5 09:50:33 2009
New Revision: 180364
URL: http://svn.digium.com/svn-view/asterisk?view=rev&rev=180364
Log:
Use a pipe as a nudge mechanism for the multiplexed thread. This will actually come into greater play with
phase 2 work where I plan to have the multiplexed thread live for a while after the last channel leaves so
that we do not incur the cost of starting up a new thread. (thanks Vadim!)
Modified:
team/file/bridging/bridges/bridge_multiplexed.c
Modified: team/file/bridging/bridges/bridge_multiplexed.c
URL: http://svn.digium.com/svn-view/asterisk/team/file/bridging/bridges/bridge_multiplexed.c?view=diff&rev=180364&r1=180363&r2=180364
==============================================================================
--- team/file/bridging/bridges/bridge_multiplexed.c (original)
+++ team/file/bridging/bridges/bridge_multiplexed.c Thu Mar 5 09:50:33 2009
@@ -34,7 +34,7 @@
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
-#include <signal.h>
+#include <fcntl.h>
#include "asterisk/module.h"
#include "asterisk/channel.h"
@@ -53,6 +53,8 @@
struct multiplexed_thread {
/*! Thread itself */
pthread_t thread;
+ /*! Pipe used to wake up the multiplexed thread */
+ int pipe[2];
/*! Channels in this thread */
struct ast_channel *chans[MULTIPLEXED_MAX_CHANNELS];
/*! Number of channels in this thread */
@@ -73,6 +75,21 @@
return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0;
}
+/*! \brief Destroy callback for a multiplexed thread structure */
+static void destroy_multiplexed_thread(void *obj)
+{
+ struct multiplexed_thread *multiplexed_thread = obj;
+
+ if (multiplexed_thread->pipe[0] > -1) {
+ close(multiplexed_thread->pipe[0]);
+ }
+ if (multiplexed_thread->pipe[1] > -1) {
+ close(multiplexed_thread->pipe[1]);
+ }
+
+ return;
+}
+
/*! \brief Create function which finds/reserves/references a multiplexed thread structure */
static int multiplexed_bridge_create(struct ast_bridge *bridge)
{
@@ -82,14 +99,43 @@
/* Try to find an existing thread to handle our additional channels */
if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) {
+ int flags;
+
/* If we failed we will have to create a new one from scratch */
- if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), NULL))) {
+ if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) {
ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
ao2_unlock(multiplexed_threads);
return -1;
}
+
+ multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1;
+ /* Setup a pipe so we can poke the thread itself when needed */
+ if (pipe(multiplexed_thread->pipe)) {
+ ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
+ ao2_ref(multiplexed_thread, -1);
+ ao2_unlock(multiplexed_threads);
+ return -1;
+ }
+
+ /* Setup each pipe for non-blocking operation */
+ flags = fcntl(multiplexed_thread->pipe[0], F_GETFL);
+ if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+ ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
+ ao2_ref(multiplexed_thread, -1);
+ ao2_unlock(multiplexed_threads);
+ return -1;
+ }
+ flags = fcntl(multiplexed_thread->pipe[1], F_GETFL);
+ if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
+ ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
+ ao2_ref(multiplexed_thread, -1);
+ ao2_unlock(multiplexed_threads);
+ return -1;
+ }
+
/* Set up default parameters */
multiplexed_thread->thread = AST_PTHREADT_NULL;
+
/* Finally link us into the container so others may find us */
ao2_link(multiplexed_threads, multiplexed_thread);
ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
@@ -110,12 +156,17 @@
/*! \brief Internal function which nudges the thread */
static void multiplexed_nudge(struct multiplexed_thread *multiplexed_thread)
{
+ int nudge = 0;
+
if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
return;
}
+ if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
+ ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
+ }
+
while (multiplexed_thread->waiting) {
- pthread_kill(multiplexed_thread->thread, SIGURG);
usleep(1);
}
@@ -149,6 +200,7 @@
static void *multiplexed_thread_function(void *data)
{
struct multiplexed_thread *multiplexed_thread = data;
+ int fds = multiplexed_thread->pipe[0];
ao2_lock(multiplexed_thread);
@@ -156,7 +208,7 @@
while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
- int to = -1;
+ int to = -1, outfd = -1;
/* Move channels around so not just the first one gets priority */
memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
@@ -164,10 +216,19 @@
ao2_unlock(multiplexed_thread);
multiplexed_thread->waiting = 1;
- winner = ast_waitfor_n(multiplexed_thread->chans, multiplexed_thread->service_count, &to);
+ winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
multiplexed_thread->waiting = 0;
ao2_lock(multiplexed_thread);
+ if (outfd > -1) {
+ int nudge;
+
+ if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
+ if (errno != EINTR && errno != EAGAIN) {
+ ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
+ }
+ }
+ }
if (winner && winner->bridge) {
ast_bridge_handle_trip(winner->bridge, NULL, winner, -1);
}
More information about the asterisk-commits
mailing list