[Asterisk-code-review] ConfBridge: Rework announcer channel methodology (asterisk[certified/13.8])

Mark Michelson asteriskteam at digium.com
Thu Aug 11 13:25:11 CDT 2016


Mark Michelson has uploaded a new change for review.

  https://gerrit.asterisk.org/3505

Change subject: ConfBridge: Rework announcer channel methodology
......................................................................

ConfBridge: Rework announcer channel methodology

One feature that confbridge has is the ability to play sounds to all
participants in the conference. Prior to this commit, the algorithm for
this was as follows:

* Grab the playback lock
* Push the conference announcer channel into the bridge
* Play back the sound
* Pull the conference announcer channel from the bridge
* Release the playback lock

The issue here is that the act of adding the playback channel to the
bridge and removing it for each announcement is expensive. Amongst the
expenses:

* The announcer channel is imparted into the bridge, meaning a new
  thread is spun up for each playback.
* When the announcer is added to or removed from the bridge, it results
  in the BRIDGEPEER channel variable being set on all channels in the
  bridge. This requires keeping the bridge locked and locking each
  individual channel in order to set it.
* There's also just the general overhead of adding the channel and
  removing it from the bridge. The bridge potentially has to reconfigure
  every single time.

With this commit, the paradigm for playing back announcements has
shifted.

* The announcer channel is now added to the bridge when the conference
  is allocated, and it is hung up when the conference is destroyed.
* A taskprocessor is used to queue playbacks onto the announcer channel.
  This keeps the behavior from before where playbacks do not overlap.
* The announcer channel is no longer placed into the bridge as
  departable. Since we are not constantly removing the channel from
  the bridge, it is safe to add the channel using an independent thread
  and simply hang the channel up when it is time for the conference to
  be destroyed.

The use of the taskprocessor for playbacks opens up the interesting
possibility of having asynchronous announcements played. In this commit,
however, the behavior is still exactly the same as it previously was.

ASTERISK-26289
Reported by Mark Michelson

Change-Id: Ic5cd2c4b98a1eaa1715eb7a5b35d62f1a76d78a5
---
M apps/app_confbridge.c
M apps/confbridge/conf_chan_announce.c
M apps/confbridge/include/confbridge.h
3 files changed, 226 insertions(+), 85 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/05/3505/1

diff --git a/apps/app_confbridge.c b/apps/app_confbridge.c
index 55b7b12..f603264 100644
--- a/apps/app_confbridge.c
+++ b/apps/app_confbridge.c
@@ -71,6 +71,7 @@
 #include "asterisk/stasis_bridges.h"
 #include "asterisk/json.h"
 #include "asterisk/format_cache.h"
+#include "asterisk/taskprocessor.h"
 
 /*** DOCUMENTATION
 	<application name="ConfBridge" language="en_US">
@@ -959,6 +960,59 @@
 	ao2_unlock(conference);
 }
 
+struct hangup_data
+{
+	struct confbridge_conference *conference;
+	ast_mutex_t lock;
+	ast_cond_t cond;
+	int hungup;
+};
+
+/*!
+ * \brief Hang up the announcer channel
+ *
+ * This hangs up the announcer channel in the conference. This
+ * runs in the playback queue taskprocessor since we do not want
+ * to hang up the channel while it's trying to play an announcement.
+ *
+ * This task is performed synchronously, so there is no need to
+ * perform any cleanup on the passed-in data.
+ *
+ * \param data A hangup_data structure
+ * \return 0
+ */
+static int hangup_playback(void *data)
+{
+	struct hangup_data *hangup = data;
+
+	ast_autoservice_stop(hangup->conference->playback_chan);
+
+	ast_hangup(hangup->conference->playback_chan);
+	hangup->conference->playback_chan = NULL;
+
+	ast_mutex_lock(&hangup->lock);
+	hangup->hungup = 1;
+	ast_cond_signal(&hangup->cond);
+	ast_mutex_unlock(&hangup->lock);
+
+	return 0;
+}
+
+static void hangup_data_init(struct hangup_data *hangup, struct confbridge_conference *conference)
+{
+	ast_mutex_init(&hangup->lock);
+	ast_cond_init(&hangup->cond, NULL);
+
+	hangup->conference = conference;
+	hangup->hungup = 0;
+}
+
+static void hangup_data_destroy(struct hangup_data *hangup)
+{
+	ast_mutex_destroy(&hangup->lock);
+	ast_cond_destroy(&hangup->cond);
+}
+
 /*!
  * \brief Destroy a conference bridge
  *
@@ -973,9 +1027,22 @@
 	ast_debug(1, "Destroying conference bridge '%s'\n", conference->name);
 
 	if (conference->playback_chan) {
-		conf_announce_channel_depart(conference->playback_chan);
-		ast_hangup(conference->playback_chan);
-		conference->playback_chan = NULL;
+		if (conference->playback_queue) {
+			struct hangup_data hangup;
+			hangup_data_init(&hangup, conference);
+			ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup);
+
+			ast_mutex_lock(&hangup.lock);
+			while (!hangup.hungup) {
+				ast_cond_wait(&hangup.cond, &hangup.lock);
+			}
+			ast_mutex_unlock(&hangup.lock);
+			hangup_data_destroy(&hangup);
+		} else {
+			/* Playback queue is not yet allocated. Just hang up the channel straight */
+			ast_hangup(conference->playback_chan);
+			conference->playback_chan = NULL;
+		}
 	}
 
 	/* Destroying a conference bridge is simple, all we have to do is destroy the bridging object */
@@ -989,7 +1056,7 @@
 	ast_free(conference->record_filename);
 
 	conf_bridge_profile_destroy(&conference->b_profile);
-	ast_mutex_destroy(&conference->playback_lock);
+	ast_taskprocessor_unreference(conference->playback_queue);
 }
 
 /*! \brief Call the proper join event handler for the user for the conference bridge's current state
@@ -1262,6 +1329,69 @@
 }
 
 /*!
+ * \internal
+ * \brief Allocate playback channel for a conference.
+ * \pre expects conference to be locked before calling this function
+ */
+static int alloc_playback_chan(struct confbridge_conference *conference)
+{
+	struct ast_format_cap *cap;
+
+	cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
+	if (!cap) {
+		return -1;
+	}
+	ast_format_cap_append(cap, ast_format_slin, 0);
+	conference->playback_chan = ast_request("CBAnn", cap, NULL, NULL,
+		conference->name, NULL);
+	ao2_ref(cap, -1);
+	if (!conference->playback_chan) {
+		return -1;
+	}
+
+	/* To make sure playback_chan has the same language of that profile */
+	ast_channel_lock(conference->playback_chan);
+	ast_channel_language_set(conference->playback_chan, conference->b_profile.language);
+	ast_channel_unlock(conference->playback_chan);
+
+	ast_debug(1, "Created announcer channel '%s' to conference bridge '%s'\n",
+		ast_channel_name(conference->playback_chan), conference->name);
+
+	conference->playback_queue = ast_taskprocessor_get(conference->name, TPS_REF_DEFAULT);
+	if (!conference->playback_queue) {
+		ast_hangup(conference->playback_chan);
+		conference->playback_chan = NULL;
+		return -1;
+	}
+	return 0;
+}
+
+/*!
+ * \brief Push the announcer channel into the bridge
+ *
+ * This runs in the playback queue taskprocessor.
+ *
+ * \param data A confbridge_conference
+ * \retval 0 Success
+ * \retval -1 Failed to push the channel to the bridge
+ */
+static int push_announcer(void *data)
+{
+	struct confbridge_conference *conference = data;
+
+	if (conf_announce_channel_push(conference->playback_chan)) {
+		ast_hangup(conference->playback_chan);
+		conference->playback_chan = NULL;
+		ao2_cleanup(conference);
+		return -1;
+	}
+
+	ast_autoservice_start(conference->playback_chan);
+	ao2_cleanup(conference);
+	return 0;
+}
+
+/*!
  * \brief Join a conference bridge
  *
  * \param conference_name The conference name
@@ -1306,9 +1436,6 @@
 			return NULL;
 		}
 
-		/* Setup lock for playback channel */
-		ast_mutex_init(&conference->playback_lock);
-
 		/* Setup for the record channel */
 		conference->record_filename = ast_str_create(RECORD_FILENAME_INITIAL_SPACE);
 		if (!conference->record_filename) {
@@ -1352,6 +1479,22 @@
 
 		/* Set the initial state to EMPTY */
 		conference->state = CONF_STATE_EMPTY;
+
+		if (alloc_playback_chan(conference)) {
+			ao2_unlink(conference_bridges, conference);
+			ao2_ref(conference, -1);
+			ao2_unlock(conference_bridges);
+			ast_log(LOG_ERROR, "Could not allocate announcer channel for conference '%s'\n", conference_name);
+			return NULL;
+		}
+
+		if (ast_taskprocessor_push(conference->playback_queue, push_announcer, ao2_bump(conference))) {
+			ao2_unlink(conference_bridges, conference);
+			ao2_ref(conference, -1);
+			ao2_unlock(conference_bridges);
+			ast_log(LOG_ERROR, "Could not add announcer channel for conference '%s' bridge\n", conference_name);
+			return NULL;
+		}
 
 		if (ast_test_flag(&conference->b_profile, BRIDGE_OPT_RECORD_CONFERENCE)) {
 			ao2_lock(conference);
@@ -1466,67 +1609,102 @@
 	user->conference = NULL;
 }
 
+struct playback_task_data {
+	struct confbridge_conference *conference;
+	const char *filename;
+	int say_number;
+	ast_mutex_t lock;
+	ast_cond_t cond;
+	int playback_finished;
+};
+
 /*!
- * \internal
- * \brief Allocate playback channel for a conference.
- * \pre expects conference to be locked before calling this function
+ * \brief Play an announcement into a confbridge
+ *
+ * This runs in the playback queue taskprocessor. This ensures that
+ * all playbacks are handled in sequence and do not play over top one
+ * another.
+ *
+ * This task runs synchronously so there is no need for performing any
+ * sort of cleanup on the input parameter.
+ *
+ * \param data A playback_task_data
+ * \return 0
  */
-static int alloc_playback_chan(struct confbridge_conference *conference)
+static int playback_task(void *data)
 {
-	struct ast_format_cap *cap;
+	struct playback_task_data *ptd = data;
 
-	cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
-	if (!cap) {
-		return -1;
-	}
-	ast_format_cap_append(cap, ast_format_slin, 0);
-	conference->playback_chan = ast_request("CBAnn", cap, NULL, NULL,
-		conference->name, NULL);
-	ao2_ref(cap, -1);
-	if (!conference->playback_chan) {
-		return -1;
+	/* Don't try to play if the playback channel has been hung up */
+	if (!ptd->conference->playback_chan) {
+		return 0;
 	}
 
-	/* To make sure playback_chan has the same language of that profile */
-	ast_channel_lock(conference->playback_chan);
-	ast_channel_language_set(conference->playback_chan, conference->b_profile.language);
-	ast_channel_unlock(conference->playback_chan);
+	ast_autoservice_stop(ptd->conference->playback_chan);
 
-	ast_debug(1, "Created announcer channel '%s' to conference bridge '%s'\n",
-		ast_channel_name(conference->playback_chan), conference->name);
+	/* The channel is all under our control, in goes the prompt */
+	if (!ast_strlen_zero(ptd->filename)) {
+		ast_stream_and_wait(ptd->conference->playback_chan, ptd->filename, "");
+	} else if (ptd->say_number >= 0) {
+		ast_say_number(ptd->conference->playback_chan, ptd->say_number, "",
+			ast_channel_language(ptd->conference->playback_chan), NULL);
+	}
+	ast_autoservice_start(ptd->conference->playback_chan);
+
+	ast_mutex_lock(&ptd->lock);
+	ptd->playback_finished = 1;
+	ast_cond_signal(&ptd->cond);
+	ast_mutex_unlock(&ptd->lock);
+
 	return 0;
+}
+
+static void playback_task_data_init(struct playback_task_data *ptd, struct confbridge_conference *conference,
+		const char *filename, int say_number)
+{
+	ast_mutex_init(&ptd->lock);
+	ast_cond_init(&ptd->cond, NULL);
+
+	ptd->filename = filename;
+	ptd->say_number = say_number;
+	ptd->conference = conference;
+	ptd->playback_finished = 0;
+}
+
+static void playback_task_data_destroy(struct playback_task_data *ptd)
+{
+	ast_mutex_destroy(&ptd->lock);
+	ast_cond_destroy(&ptd->cond);
 }
 
 static int play_sound_helper(struct confbridge_conference *conference, const char *filename, int say_number)
 {
+	struct playback_task_data ptd;
+
 	/* Do not waste resources trying to play files that do not exist */
 	if (!ast_strlen_zero(filename) && !sound_file_exists(filename)) {
 		return 0;
 	}
 
-	ast_mutex_lock(&conference->playback_lock);
-	if (!conference->playback_chan && alloc_playback_chan(conference)) {
-		ast_mutex_unlock(&conference->playback_lock);
-		return -1;
-	}
-	if (conf_announce_channel_push(conference->playback_chan)) {
-		ast_mutex_unlock(&conference->playback_lock);
+	playback_task_data_init(&ptd, conference, filename, say_number);
+	if (ast_taskprocessor_push(conference->playback_queue, playback_task, &ptd)) {
+		if (!ast_strlen_zero(filename)) {
+			ast_log(LOG_WARNING, "Unable to play file '%s' to conference\n", filename);
+		} else {
+			ast_log(LOG_WARNING, "Unable to say number '%d' to conference\n", say_number);
+		}
+		playback_task_data_destroy(&ptd);
 		return -1;
 	}
 
-	/* The channel is all under our control, in goes the prompt */
-	if (!ast_strlen_zero(filename)) {
-		ast_stream_and_wait(conference->playback_chan, filename, "");
-	} else if (say_number >= 0) {
-		ast_say_number(conference->playback_chan, say_number, "",
-			ast_channel_language(conference->playback_chan), NULL);
+	/* Wait for the playback to complete */
+	ast_mutex_lock(&ptd.lock);
+	while (!ptd.playback_finished) {
+		ast_cond_wait(&ptd.cond, &ptd.lock);
 	}
+	ast_mutex_unlock(&ptd.lock);
 
-	ast_debug(1, "Departing announcer channel '%s' from conference bridge '%s'\n",
-		ast_channel_name(conference->playback_chan), conference->name);
-	conf_announce_channel_depart(conference->playback_chan);
-
-	ast_mutex_unlock(&conference->playback_lock);
+	playback_task_data_destroy(&ptd);
 
 	return 0;
 }
diff --git a/apps/confbridge/conf_chan_announce.c b/apps/confbridge/conf_chan_announce.c
index 6596a85..b2ae1e5 100644
--- a/apps/confbridge/conf_chan_announce.c
+++ b/apps/confbridge/conf_chan_announce.c
@@ -143,31 +143,6 @@
 	return &announce_tech;
 }
 
-void conf_announce_channel_depart(struct ast_channel *chan)
-{
-	struct announce_pvt *p = ast_channel_tech_pvt(chan);
-
-	if (!p) {
-		return;
-	}
-
-	ao2_ref(p, +1);
-	ao2_lock(p);
-	if (!ast_test_flag(&p->base, AST_UNREAL_CARETAKER_THREAD)) {
-		ao2_unlock(p);
-		ao2_ref(p, -1);
-		return;
-	}
-	ast_clear_flag(&p->base, AST_UNREAL_CARETAKER_THREAD);
-	chan = p->base.chan;
-	ao2_unlock(p);
-	ao2_ref(p, -1);
-	if (chan) {
-		ast_bridge_depart(chan);
-		ast_channel_unref(chan);
-	}
-}
-
 int conf_announce_channel_push(struct ast_channel *ast)
 {
 	struct ast_bridge_features *features;
@@ -186,7 +161,6 @@
 		if (!chan) {
 			return -1;
 		}
-		ast_channel_ref(chan);
 	}
 
 	features = ast_bridge_features_new();
@@ -198,9 +172,8 @@
 
 	/* Impart the output channel into the bridge */
 	if (ast_bridge_impart(p->bridge, chan, NULL, features,
-		AST_BRIDGE_IMPART_CHAN_DEPARTABLE)) {
+		AST_BRIDGE_IMPART_CHAN_INDEPENDENT)) {
 		ast_bridge_features_destroy(features);
-		ast_channel_unref(chan);
 		return -1;
 	}
 	ao2_lock(p);
diff --git a/apps/confbridge/include/confbridge.h b/apps/confbridge/include/confbridge.h
index 8d2dffb..79e496b 100644
--- a/apps/confbridge/include/confbridge.h
+++ b/apps/confbridge/include/confbridge.h
@@ -224,9 +224,9 @@
 	struct ast_channel *record_chan;                                  /*!< Channel used for recording the conference */
 	struct ast_str *record_filename;                                  /*!< Recording filename. */
 	struct ast_str *orig_rec_file;                                    /*!< Previous b_profile.rec_file. */
-	ast_mutex_t playback_lock;                                        /*!< Lock used for playback channel */
 	AST_LIST_HEAD_NOLOCK(, confbridge_user) active_list;              /*!< List of users participating in the conference bridge */
 	AST_LIST_HEAD_NOLOCK(, confbridge_user) waiting_list;             /*!< List of users waiting to join the conference bridge */
+	struct ast_taskprocessor *playback_queue;
 };
 
 extern struct ao2_container *conference_bridges;
@@ -604,16 +604,6 @@
  * \return ConfBridge announce channel technology.
  */
 struct ast_channel_tech *conf_announce_get_tech(void);
-
-/*!
- * \brief Remove the announcer channel from the conference.
- * \since 12.0.0
- *
- * \param chan Either channel in the announcer channel pair.
- *
- * \return Nothing
- */
-void conf_announce_channel_depart(struct ast_channel *chan);
 
 /*!
  * \brief Push the announcer channel into the conference.

-- 
To view, visit https://gerrit.asterisk.org/3505
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic5cd2c4b98a1eaa1715eb7a5b35d62f1a76d78a5
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: certified/13.8
Gerrit-Owner: Mark Michelson <mmichelson at digium.com>



More information about the asterisk-code-review mailing list