<p>Joshua Colp <strong>merged</strong> this change.</p><p><a href="https://gerrit.asterisk.org/5801">View Change</a></p><div style="white-space:pre-wrap">Approvals:
George Joseph: Looks good to me, but someone else must approve
Kevin Harwell: Looks good to me, approved
Joshua Colp: Approved for Submit
</div><pre style="font-family: monospace,monospace; white-space: pre-wrap;">bridge: Add a deferred queue.<br><br>This change adds a deferred queue to bridging. If a bridge<br>technology determines that a frame can not be written and<br>should be deferred it can indicate back to bridging to do so.<br>Bridging will then requeue any deferred frames upon a new<br>channel joining the bridge.<br><br>This change has been leveraged for T.38 request negotiate<br>control frames. Without the deferred queue there is a race<br>condition between the bridge receiving the T.38 request<br>negotiate and the second channel joining and being in the<br>bridge. If the channel is not yet in the bridge then the T.38<br>negotiation fails.<br><br>A unit test has also been added that confirms that a T.38<br>request negotiate control frame is deferred when no other<br>channel is in the bridge and that it is requeued when a new<br>channel joins the bridge.<br><br>ASTERISK-26923<br><br>Change-Id: Ie05b08523f399eae579130f4a5f562a344d2e415<br>---<br>M bridges/bridge_native_rtp.c<br>M bridges/bridge_simple.c<br>M include/asterisk/bridge_channel.h<br>M include/asterisk/bridge_channel_internal.h<br>M include/asterisk/bridge_technology.h<br>M main/bridge.c<br>M main/bridge_channel.c<br>A tests/test_bridging.c<br>8 files changed, 397 insertions(+), 10 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">diff --git a/bridges/bridge_native_rtp.c b/bridges/bridge_native_rtp.c<br>index 77e321f..4af93bf 100644<br>--- a/bridges/bridge_native_rtp.c<br>+++ b/bridges/bridge_native_rtp.c<br>@@ -508,7 +508,37 @@<br> <br> static int native_rtp_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)<br> {<br>- return ast_bridge_queue_everyone_else(bridge, bridge_channel, frame);<br>+ const struct ast_control_t38_parameters *t38_parameters;<br>+ int defer = 0;<br>+<br>+ if (!ast_bridge_queue_everyone_else(bridge, 
bridge_channel, frame)) {<br>+ /* This frame was successfully queued so no need to defer */<br>+ return 0;<br>+ }<br>+<br>+ /* Depending on the frame defer it so when the next channel joins it receives it */<br>+ switch (frame->frametype) {<br>+ case AST_FRAME_CONTROL:<br>+ switch (frame->subclass.integer) {<br>+ case AST_CONTROL_T38_PARAMETERS:<br>+ t38_parameters = frame->data.ptr;<br>+ switch (t38_parameters->request_response) {<br>+ case AST_T38_REQUEST_NEGOTIATE:<br>+ defer = -1;<br>+ break;<br>+ default:<br>+ break;<br>+ }<br>+ break;<br>+ default:<br>+ break;<br>+ }<br>+ break;<br>+ default:<br>+ break;<br>+ }<br>+<br>+ return defer;<br> }<br> <br> static struct ast_bridge_technology native_rtp_bridge = {<br>diff --git a/bridges/bridge_simple.c b/bridges/bridge_simple.c<br>index 3bf0403..a49bc39 100644<br>--- a/bridges/bridge_simple.c<br>+++ b/bridges/bridge_simple.c<br>@@ -71,7 +71,37 @@<br> <br> static int simple_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)<br> {<br>- return ast_bridge_queue_everyone_else(bridge, bridge_channel, frame);<br>+ const struct ast_control_t38_parameters *t38_parameters;<br>+ int defer = 0;<br>+<br>+ if (!ast_bridge_queue_everyone_else(bridge, bridge_channel, frame)) {<br>+ /* This frame was successfully queued so no need to defer */<br>+ return 0;<br>+ }<br>+<br>+ /* Depending on the frame defer it so when the next channel joins it receives it */<br>+ switch (frame->frametype) {<br>+ case AST_FRAME_CONTROL:<br>+ switch (frame->subclass.integer) {<br>+ case AST_CONTROL_T38_PARAMETERS:<br>+ t38_parameters = frame->data.ptr;<br>+ switch (t38_parameters->request_response) {<br>+ case AST_T38_REQUEST_NEGOTIATE:<br>+ defer = -1;<br>+ break;<br>+ default:<br>+ break;<br>+ }<br>+ break;<br>+ default:<br>+ break;<br>+ }<br>+ break;<br>+ default:<br>+ break;<br>+ }<br>+<br>+ return defer;<br> }<br> <br> static struct ast_bridge_technology simple_bridge = {<br>diff --git 
a/include/asterisk/bridge_channel.h b/include/asterisk/bridge_channel.h<br>index dd72f32..4d33260 100644<br>--- a/include/asterisk/bridge_channel.h<br>+++ b/include/asterisk/bridge_channel.h<br>@@ -145,6 +145,8 @@<br> AST_LIST_ENTRY(ast_bridge_channel) entry;<br> /*! Queue of outgoing frames to the channel. */<br> AST_LIST_HEAD_NOLOCK(, ast_frame) wr_queue;<br>+ /*! Queue of deferred frames, queued onto channel when other party joins. */<br>+ AST_LIST_HEAD_NOLOCK(, ast_frame) deferred_queue;<br> /*! Pipe to alert thread when frames are put into the wr_queue. */<br> int alert_pipe[2];<br> /*!<br>diff --git a/include/asterisk/bridge_channel_internal.h b/include/asterisk/bridge_channel_internal.h<br>index fb8e781..ba71e9f 100644<br>--- a/include/asterisk/bridge_channel_internal.h<br>+++ b/include/asterisk/bridge_channel_internal.h<br>@@ -98,6 +98,17 @@<br> <br> /*!<br> * \internal<br>+ * \brief Queue any deferred frames on the channel.<br>+ * \since 13.17.0<br>+ *<br>+ * \param bridge_channel Channel that the deferred frames should be pulled from and queued to.<br>+ *<br>+ * \return Nothing<br>+ */<br>+void bridge_channel_queue_deferred_frames(struct ast_bridge_channel *bridge_channel);<br>+<br>+/*!<br>+ * \internal<br> * \brief Push the bridge channel into its specified bridge.<br> * \since 12.0.0<br> *<br>diff --git a/include/asterisk/bridge_technology.h b/include/asterisk/bridge_technology.h<br>index 09b0fc0..8cebe93 100644<br>--- a/include/asterisk/bridge_technology.h<br>+++ b/include/asterisk/bridge_technology.h<br>@@ -156,6 +156,9 @@<br> * \retval -1 Frame needs to be deferred.<br> *<br> * \note On entry, bridge is already locked.<br>+ *<br>+ * \note Deferred frames will be automatically queued onto the channel when another<br>+ * channel joins the bridge.<br> */<br> int (*write)(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame);<br> /*!<br>diff --git a/main/bridge.c b/main/bridge.c<br>index 4631e5a..8cde62c 
100644<br>--- a/main/bridge.c<br>+++ b/main/bridge.c<br>@@ -476,6 +476,7 @@<br> }<br> <br> AST_LIST_TRAVERSE(&bridge->channels, bridge_channel, entry) {<br>+ bridge_channel_queue_deferred_frames(bridge_channel);<br> if (!bridge_channel->just_joined) {<br> continue;<br> }<br>diff --git a/main/bridge_channel.c b/main/bridge_channel.c<br>index 02783b1..e8ab8a8 100644<br>--- a/main/bridge_channel.c<br>+++ b/main/bridge_channel.c<br>@@ -638,18 +638,21 @@<br> static int bridge_channel_write_frame(struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)<br> {<br> const struct ast_control_t38_parameters *t38_parameters;<br>+ int deferred;<br> <br> ast_assert(frame->frametype != AST_FRAME_BRIDGE_ACTION_SYNC);<br> <br> ast_bridge_channel_lock_bridge(bridge_channel);<br>-/*<br>- * XXX need to implement a deferred write queue for when there<br>- * is no peer channel in the bridge (yet or it was kicked).<br>- *<br>- * The tech decides if a frame needs to be pushed back for deferral.<br>- * simple_bridge/native_bridge are likely the only techs that will do this.<br>- */<br>- bridge_channel->bridge->technology->write(bridge_channel->bridge, bridge_channel, frame);<br>+<br>+ deferred = bridge_channel->bridge->technology->write(bridge_channel->bridge, bridge_channel, frame);<br>+ if (deferred) {<br>+ struct ast_frame *dup;<br>+<br>+ dup = ast_frdup(frame);<br>+ if (dup) {<br>+ AST_LIST_INSERT_HEAD(&bridge_channel->deferred_queue, dup, frame_list);<br>+ }<br>+ }<br> <br> /* Remember any owed events to the bridge. 
*/<br> switch (frame->frametype) {<br>@@ -751,6 +754,18 @@<br> bridge_channel->owed.t38_terminate = 0;<br> orig_bridge->technology->write(orig_bridge, NULL, &frame);<br> }<br>+}<br>+<br>+void bridge_channel_queue_deferred_frames(struct ast_bridge_channel *bridge_channel)<br>+{<br>+ struct ast_frame *frame;<br>+<br>+ ast_channel_lock(bridge_channel->chan);<br>+ while ((frame = AST_LIST_REMOVE_HEAD(&bridge_channel->deferred_queue, frame_list))) {<br>+ ast_queue_frame_head(bridge_channel->chan, frame);<br>+ ast_frfree(frame);<br>+ }<br>+ ast_channel_unlock(bridge_channel->chan);<br> }<br> <br> /*!<br>@@ -2933,6 +2948,11 @@<br> }<br> ast_alertpipe_close(bridge_channel->alert_pipe);<br> <br>+ /* Flush any unhandled deferred_queue frames. */<br>+ while ((fr = AST_LIST_REMOVE_HEAD(&bridge_channel->deferred_queue, frame_list))) {<br>+ ast_frfree(fr);<br>+ }<br>+<br> ast_cond_destroy(&bridge_channel->cond);<br> <br> ao2_cleanup(bridge_channel->write_format);<br>diff --git a/tests/test_bridging.c b/tests/test_bridging.c<br>new file mode 100644<br>index 0000000..74595c4<br>--- /dev/null<br>+++ b/tests/test_bridging.c<br>@@ -0,0 +1,290 @@<br>+/*<br>+ * Asterisk -- An open source telephony toolkit.<br>+ *<br>+ * Copyright (C) 2017, Digium, Inc.<br>+ *<br>+ * Joshua Colp <jcolp@digium.com><br>+ *<br>+ * See http://www.asterisk.org for more information about<br>+ * the Asterisk project. Please do not directly contact<br>+ * any of the maintainers of this project for assistance;<br>+ * the project provides a web site, mailing lists and IRC<br>+ * channels for your use.<br>+ *<br>+ * This program is free software, distributed under the terms of<br>+ * the GNU General Public License Version 2. 
See the LICENSE file<br>+ * at the top of the source tree.<br>+ */<br>+<br>+/*!<br>+ * \file<br>+ * \brief Bridging unit tests<br>+ *<br>+ * \author Joshua Colp <jcolp@digium.com><br>+ *<br>+ */<br>+<br>+/*** MODULEINFO<br>+ <depend>TEST_FRAMEWORK</depend><br>+ <support_level>core</support_level><br>+ ***/<br>+<br>+#include "asterisk.h"<br>+<br>+#include "asterisk/module.h"<br>+#include "asterisk/test.h"<br>+#include "asterisk/channel.h"<br>+#include "asterisk/time.h"<br>+#include "asterisk/bridge.h"<br>+#include "asterisk/bridge_basic.h"<br>+#include "asterisk/features.h"<br>+#include "asterisk/format_cache.h"<br>+<br>+#define TEST_CATEGORY "/main/bridging/"<br>+<br>+#define CHANNEL_TECH_NAME "BridgingTestChannel"<br>+<br>+#define TEST_CHANNEL_FORMAT ast_format_slin<br>+<br>+/*! \brief A private structure for the test channel */<br>+struct test_bridging_chan_pvt {<br>+ /* \brief The expected indication */<br>+ int condition;<br>+ /*! \brief The number of indicated things */<br>+ unsigned int indicated;<br>+};<br>+<br>+/*! \brief Callback function for when a frame is written to a channel */<br>+static int test_bridging_chan_indicate(struct ast_channel *chan, int condition, const void *data, size_t datalen)<br>+{<br>+ struct test_bridging_chan_pvt *test_pvt = ast_channel_tech_pvt(chan);<br>+<br>+ if (condition == test_pvt->condition) {<br>+ test_pvt->indicated++;<br>+ }<br>+<br>+ return 0;<br>+}<br>+<br>+/*! \brief Callback function for when a channel is hung up */<br>+static int test_bridging_chan_hangup(struct ast_channel *chan)<br>+{<br>+ struct test_bridging_chan_pvt *test_pvt = ast_channel_tech_pvt(chan);<br>+<br>+ ast_free(test_pvt);<br>+ ast_channel_tech_pvt_set(chan, NULL);<br>+<br>+ return 0;<br>+}<br>+<br>+/*! 
\brief A channel technology used for the unit tests */<br>+static struct ast_channel_tech test_bridging_chan_tech = {<br>+ .type = CHANNEL_TECH_NAME,<br>+ .description = "Mock channel technology for bridge tests",<br>+ .indicate = test_bridging_chan_indicate,<br>+ .hangup = test_bridging_chan_hangup,<br>+ .properties = AST_CHAN_TP_INTERNAL,<br>+};<br>+<br>+static void test_nanosleep(int secs, long nanosecs)<br>+{<br>+ struct timespec sleep_time = {secs, nanosecs};<br>+<br>+ while ((nanosleep(&sleep_time, &sleep_time) == -1) && (errno == EINTR)) {<br>+ }<br>+}<br>+<br>+/*! \brief Wait until a channel is bridged */<br>+static void wait_for_bridged(struct ast_channel *channel)<br>+{<br>+ ast_channel_lock(channel);<br>+ while (!ast_channel_is_bridged(channel)) {<br>+ ast_channel_unlock(channel);<br>+ test_nanosleep(0, 1000000);<br>+ ast_channel_lock(channel);<br>+ }<br>+ ast_channel_unlock(channel);<br>+}<br>+<br>+/*! \brief Wait until a channel is not bridged */<br>+static void wait_for_unbridged(struct ast_channel *channel)<br>+{<br>+ ast_channel_lock(channel);<br>+ while (ast_channel_is_bridged(channel)) {<br>+ ast_channel_unlock(channel);<br>+ test_nanosleep(0, 1000000);<br>+ ast_channel_lock(channel);<br>+ }<br>+ ast_channel_unlock(channel);<br>+}<br>+<br>+/*! \brief Wait until a channel has no frames on its read queue */<br>+static void wait_for_empty_queue(struct ast_channel *channel)<br>+{<br>+ ast_channel_lock(channel);<br>+ while (!AST_LIST_EMPTY(ast_channel_readq(channel))) {<br>+ ast_channel_unlock(channel);<br>+ test_nanosleep(0, 1000000);<br>+ ast_channel_lock(channel);<br>+ }<br>+ ast_channel_unlock(channel);<br>+}<br>+<br>+/*! \brief Create a \ref test_bridging_chan_tech for Alice. */<br>+#define START_ALICE(channel, pvt) START_CHANNEL(channel, pvt, "Alice", "100")<br>+<br>+/*! \brief Create a \ref test_bridging_chan_tech for Bob. 
*/<br>+#define START_BOB(channel, pvt) START_CHANNEL(channel, pvt, "Bob", "200")<br>+<br>+#define START_CHANNEL(channel, pvt, name, number) do { \<br>+ channel = ast_channel_alloc(0, AST_STATE_UP, number, name, number, number, \<br>+ "default", NULL, NULL, 0, CHANNEL_TECH_NAME "/" name); \<br>+ pvt = ast_calloc(1, sizeof(*pvt)); \<br>+ ast_channel_tech_pvt_set(channel, pvt); \<br>+ ast_channel_nativeformats_set(channel, test_bridging_chan_tech.capabilities); \<br>+ ast_channel_set_rawwriteformat(channel, TEST_CHANNEL_FORMAT); \<br>+ ast_channel_set_rawreadformat(channel, TEST_CHANNEL_FORMAT); \<br>+ ast_channel_set_writeformat(channel, TEST_CHANNEL_FORMAT); \<br>+ ast_channel_set_readformat(channel, TEST_CHANNEL_FORMAT); \<br>+ ast_channel_unlock(channel); \<br>+ } while (0)<br>+<br>+/*! \brief Hang up a test channel safely */<br>+#define HANGUP_CHANNEL(channel) do { \<br>+ ao2_ref(channel, +1); \<br>+ ast_hangup((channel)); \<br>+ ao2_cleanup(channel); \<br>+ channel = NULL; \<br>+ } while (0)<br>+<br>+static void safe_channel_release(struct ast_channel *chan)<br>+{<br>+ if (!chan) {<br>+ return;<br>+ }<br>+ ast_channel_release(chan);<br>+}<br>+<br>+static void safe_bridge_destroy(struct ast_bridge *bridge)<br>+{<br>+ if (!bridge) {<br>+ return;<br>+ }<br>+ ast_bridge_destroy(bridge, 0);<br>+}<br>+<br>+static void stream_periodic_frames(struct ast_channel *chan, int ms, int interval_ms)<br>+{<br>+ long nanosecs;<br>+<br>+ ast_assert(chan != NULL);<br>+ ast_assert(0 < ms);<br>+ ast_assert(0 < interval_ms);<br>+<br>+ nanosecs = interval_ms * 1000000L;<br>+ while (0 < ms) {<br>+ ast_queue_frame(chan, &ast_null_frame);<br>+<br>+ if (interval_ms < ms) {<br>+ ms -= interval_ms;<br>+ } else {<br>+ nanosecs = ms * 1000000L;<br>+ ms = 0;<br>+ }<br>+ test_nanosleep(0, nanosecs);<br>+ }<br>+}<br>+<br>+AST_TEST_DEFINE(test_bridging_deferred_queue)<br>+{<br>+ RAII_VAR(struct ast_channel *, chan_alice, NULL, safe_channel_release);<br>+ struct test_bridging_chan_pvt 
*alice_pvt;<br>+ struct ast_control_t38_parameters t38_parameters = {<br>+ .request_response = AST_T38_REQUEST_NEGOTIATE,<br>+ };<br>+ struct ast_frame frame = {<br>+ .frametype = AST_FRAME_CONTROL,<br>+ .subclass.integer = AST_CONTROL_T38_PARAMETERS,<br>+ .data.ptr = &t38_parameters,<br>+ .datalen = sizeof(t38_parameters),<br>+ };<br>+ RAII_VAR(struct ast_channel *, chan_bob, NULL, safe_channel_release);<br>+ struct test_bridging_chan_pvt *bob_pvt;<br>+ RAII_VAR(struct ast_bridge *, bridge1, NULL, safe_bridge_destroy);<br>+<br>+ switch (cmd) {<br>+ case TEST_INIT:<br>+ info->name = __func__;<br>+ info->category = TEST_CATEGORY;<br>+ info->summary = "Test that deferred frames from a channel in a bridge get written";<br>+ info->description =<br>+ "This test creates two channels, queues a deferrable frame on one, places it into\n"<br>+ "a bridge, confirms the frame was read by the bridge, adds the second channel to the\n"<br>+ "bridge, and makes sure the deferred frame is written to it.";<br>+ return AST_TEST_NOT_RUN;<br>+ case TEST_EXECUTE:<br>+ break;<br>+ }<br>+<br>+ /* Create the bridges */<br>+ bridge1 = ast_bridge_basic_new();<br>+ ast_test_validate(test, bridge1 != NULL);<br>+<br>+ /* Create channels that will go into the bridge */<br>+ START_ALICE(chan_alice, alice_pvt);<br>+ START_BOB(chan_bob, bob_pvt);<br>+ bob_pvt->condition = AST_CONTROL_T38_PARAMETERS;<br>+<br>+ /* Bridge alice and wait for the frame to be deferred */<br>+ ast_test_validate(test, !ast_bridge_impart(bridge1, chan_alice, NULL, NULL, AST_BRIDGE_IMPART_CHAN_DEPARTABLE));<br>+ wait_for_bridged(chan_alice);<br>+ ast_queue_frame(chan_alice, &frame);<br>+ wait_for_empty_queue(chan_alice);<br>+<br>+ /* Bridge bob for a second so it can receive the deferred T.38 request negotiate frame */<br>+ ast_test_validate(test, !ast_bridge_impart(bridge1, chan_bob, NULL, NULL, AST_BRIDGE_IMPART_CHAN_DEPARTABLE));<br>+ wait_for_bridged(chan_bob);<br>+ stream_periodic_frames(chan_alice, 1000, 20);<br>+ 
ast_test_validate(test, !ast_bridge_depart(chan_bob));<br>+ wait_for_unbridged(chan_bob);<br>+<br>+ /* Ensure that we received the expected indications while it was in there (request to negotiate, and to terminate) */<br>+ ast_test_validate(test, bob_pvt->indicated == 2);<br>+<br>+ /* Now remove alice since we are done */<br>+ ast_test_validate(test, !ast_bridge_depart(chan_alice));<br>+ wait_for_unbridged(chan_alice);<br>+<br>+ /* Hangup the channels */<br>+ HANGUP_CHANNEL(chan_alice);<br>+ HANGUP_CHANNEL(chan_bob);<br>+<br>+ return AST_TEST_PASS;<br>+}<br>+<br>+static int unload_module(void)<br>+{<br>+ AST_TEST_UNREGISTER(test_bridging_deferred_queue);<br>+<br>+ ast_channel_unregister(&test_bridging_chan_tech);<br>+ ao2_cleanup(test_bridging_chan_tech.capabilities);<br>+ test_bridging_chan_tech.capabilities = NULL;<br>+<br>+ return 0;<br>+}<br>+<br>+static int load_module(void)<br>+{<br>+ test_bridging_chan_tech.capabilities = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);<br>+ if (!test_bridging_chan_tech.capabilities) {<br>+ return AST_MODULE_LOAD_DECLINE;<br>+ }<br>+ ast_format_cap_append(test_bridging_chan_tech.capabilities, TEST_CHANNEL_FORMAT, 0);<br>+ ast_channel_register(&test_bridging_chan_tech);<br>+<br>+ AST_TEST_REGISTER(test_bridging_deferred_queue);<br>+<br>+ return AST_MODULE_LOAD_SUCCESS;<br>+}<br>+<br>+AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Bridging Unit Tests");<br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/5801">change 5801</a>.</p>
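<p>For readers who want the control flow of the deferred queue without the surrounding Asterisk plumbing, the following is a minimal standalone sketch in plain C. It is not the code from this change: every name in it (<code>toy_bridge</code>, <code>toy_write</code>, <code>toy_write_frame</code>, <code>toy_join</code>, the <code>frame_kind</code> values) is hypothetical, and it assumes a single-threaded model with no locking. It also simplifies the requeue step: the real change pushes deferred frames back onto the head of the channel's read queue so they pass through the bridge again, whereas this sketch hands them straight to the newly joined peer.</p>

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins for illustration; these are NOT the Asterisk types. */
enum frame_kind { FRAME_VOICE, FRAME_T38_REQUEST_NEGOTIATE };

struct frame {
    enum frame_kind kind;
    struct frame *next;      /* link used only while on the deferred list */
};

struct peer {
    int delivered;           /* count of frames delivered to this peer */
};

struct toy_bridge {
    struct peer *peer;       /* NULL until a second party has joined */
    struct frame *deferred;  /* list of frame copies waiting for a peer */
};

/*
 * Technology-level write, mirroring the contract described in the diff:
 * return 0 when the frame was handled (or may be dropped), and -1 when
 * the caller should defer it. Only the T.38 request-negotiate frame is
 * considered worth deferring here.
 */
int toy_write(struct toy_bridge *bridge, const struct frame *f)
{
    if (bridge->peer) {
        bridge->peer->delivered++;
        return 0;
    }
    return f->kind == FRAME_T38_REQUEST_NEGOTIATE ? -1 : 0;
}

/*
 * Core-level write: ask the technology to write, and if it asks for
 * deferral, keep a private copy of the frame on the deferred list.
 * Returns 1 if the frame was deferred, 0 if not, -1 on allocation failure.
 */
int toy_write_frame(struct toy_bridge *bridge, const struct frame *f)
{
    struct frame *dup;

    if (toy_write(bridge, f) == 0) {
        return 0;
    }
    dup = malloc(sizeof(*dup));
    if (!dup) {
        return -1;
    }
    *dup = *f;
    dup->next = bridge->deferred;
    bridge->deferred = dup;
    return 1;
}

/*
 * A new party joins: drain the deferred list and deliver each copy.
 * Returns the number of frames that were requeued.
 */
int toy_join(struct toy_bridge *bridge, struct peer *peer)
{
    int requeued = 0;

    bridge->peer = peer;
    while (bridge->deferred) {
        struct frame *f = bridge->deferred;

        bridge->deferred = f->next;
        toy_write(bridge, f);
        free(f);
        requeued++;
    }
    return requeued;
}
```

<p>In this sketch a voice frame written before anyone else has joined is simply dropped, while a T.38 request-negotiate frame is copied and held until <code>toy_join</code> runs, which is the race the commit message describes.</p>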
<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: master </div>
<div style="display:none"> Gerrit-MessageType: merged </div>
<div style="display:none"> Gerrit-Change-Id: Ie05b08523f399eae579130f4a5f562a344d2e415 </div>
<div style="display:none"> Gerrit-Change-Number: 5801 </div>
<div style="display:none"> Gerrit-PatchSet: 3 </div>
<div style="display:none"> Gerrit-Owner: Joshua Colp <jcolp@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: George Joseph <gjoseph@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: Jenkins2 </div>
<div style="display:none"> Gerrit-Reviewer: Joshua Colp <jcolp@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: Kevin Harwell <kharwell@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: Mark Michelson <mmichelson@digium.com> </div>