<p>Joshua Colp <strong>merged</strong> this change.</p><p><a href="https://gerrit.asterisk.org/5801">View Change</a></p><div style="white-space:pre-wrap">Approvals:
  George Joseph: Looks good to me, but someone else must approve
  Kevin Harwell: Looks good to me, approved
  Joshua Colp: Approved for Submit

</div><pre style="font-family: monospace,monospace; white-space: pre-wrap;">bridge: Add a deferred queue.<br><br>This change adds a deferred queue to bridging. If a bridge<br>technology determines that a frame can not be written and<br>should be deferred it can indicate back to bridging to do so.<br>Bridging will then requeue any deferred frames upon a new<br>channel joining the bridge.<br><br>This change has been leveraged for T.38 request negotiate<br>control frames. Without the deferred queue there is a race<br>condition between the bridge receiving the T.38 request<br>negotiate and the second channel joining and being in the<br>bridge. If the channel is not yet in the bridge then the T.38<br>negotiation fails.<br><br>A unit test has also been added that confirms that a T.38<br>request negotiate control frame is deferred when no other<br>channel is in the bridge and that it is requeued when a new<br>channel joins the bridge.<br><br>ASTERISK-26923<br><br>Change-Id: Ie05b08523f399eae579130f4a5f562a344d2e415<br>---<br>M bridges/bridge_native_rtp.c<br>M bridges/bridge_simple.c<br>M include/asterisk/bridge_channel.h<br>M include/asterisk/bridge_channel_internal.h<br>M include/asterisk/bridge_technology.h<br>M main/bridge.c<br>M main/bridge_channel.c<br>A tests/test_bridging.c<br>8 files changed, 397 insertions(+), 10 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">diff --git a/bridges/bridge_native_rtp.c b/bridges/bridge_native_rtp.c<br>index 77e321f..4af93bf 100644<br>--- a/bridges/bridge_native_rtp.c<br>+++ b/bridges/bridge_native_rtp.c<br>@@ -508,7 +508,37 @@<br> <br> static int native_rtp_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)<br> {<br>-        return ast_bridge_queue_everyone_else(bridge, bridge_channel, frame);<br>+        const struct ast_control_t38_parameters *t38_parameters;<br>+     int defer = 0;<br>+<br>+    if (!ast_bridge_queue_everyone_else(bridge, bridge_channel, frame)) {<br>+                /* This frame was successfully queued so no need to defer */<br>+         return 0;<br>+    }<br>+<br>+ /* Depending on the frame defer it so when the next channel joins it receives it */<br>+  switch (frame->frametype) {<br>+       case AST_FRAME_CONTROL:<br>+              switch (frame->subclass.integer) {<br>+                case AST_CONTROL_T38_PARAMETERS:<br>+                     t38_parameters = frame->data.ptr;<br>+                 switch (t38_parameters->request_response) {<br>+                       case AST_T38_REQUEST_NEGOTIATE:<br>+                              defer = -1;<br>+                          break;<br>+                       default:<br>+                             break;<br>+                       }<br>+                    break;<br>+               default:<br>+                     break;<br>+               }<br>+            break;<br>+       default:<br>+             break;<br>+       }<br>+<br>+ return defer;<br> }<br> <br> static struct ast_bridge_technology native_rtp_bridge = {<br>diff --git a/bridges/bridge_simple.c b/bridges/bridge_simple.c<br>index 3bf0403..a49bc39 100644<br>--- a/bridges/bridge_simple.c<br>+++ b/bridges/bridge_simple.c<br>@@ -71,7 +71,37 @@<br> <br> static int simple_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)<br> {<br>-     return ast_bridge_queue_everyone_else(bridge, bridge_channel, frame);<br>+        const struct ast_control_t38_parameters *t38_parameters;<br>+     int defer = 0;<br>+<br>+    if (!ast_bridge_queue_everyone_else(bridge, bridge_channel, frame)) {<br>+                /* This frame was successfully queued so no need to defer */<br>+         return 0;<br>+    }<br>+<br>+ /* Depending on the frame defer it so when the next channel joins it receives it */<br>+  switch (frame->frametype) {<br>+       case AST_FRAME_CONTROL:<br>+              switch (frame->subclass.integer) {<br>+                case AST_CONTROL_T38_PARAMETERS:<br>+                     t38_parameters = frame->data.ptr;<br>+                 switch (t38_parameters->request_response) {<br>+                       case AST_T38_REQUEST_NEGOTIATE:<br>+                              defer = -1;<br>+                          break;<br>+                       default:<br>+                             break;<br>+                       }<br>+                    break;<br>+               default:<br>+                     break;<br>+               }<br>+            break;<br>+       default:<br>+             break;<br>+       }<br>+<br>+ return defer;<br> }<br> <br> static struct ast_bridge_technology simple_bridge = {<br>diff --git a/include/asterisk/bridge_channel.h b/include/asterisk/bridge_channel.h<br>index dd72f32..4d33260 100644<br>--- a/include/asterisk/bridge_channel.h<br>+++ b/include/asterisk/bridge_channel.h<br>@@ -145,6 +145,8 @@<br>        AST_LIST_ENTRY(ast_bridge_channel) entry;<br>     /*! Queue of outgoing frames to the channel. */<br>       AST_LIST_HEAD_NOLOCK(, ast_frame) wr_queue;<br>+  /*! Queue of deferred frames, queued onto channel when other party joins. */<br>+ AST_LIST_HEAD_NOLOCK(, ast_frame) deferred_queue;<br>     /*! Pipe to alert thread when frames are put into the wr_queue. */<br>    int alert_pipe[2];<br>    /*!<br>diff --git a/include/asterisk/bridge_channel_internal.h b/include/asterisk/bridge_channel_internal.h<br>index fb8e781..ba71e9f 100644<br>--- a/include/asterisk/bridge_channel_internal.h<br>+++ b/include/asterisk/bridge_channel_internal.h<br>@@ -98,6 +98,17 @@<br> <br> /*!<br>  * \internal<br>+ * \brief Queue any deferred frames on the channel.<br>+ * \since 13.17.0<br>+ *<br>+ * \param bridge_channel Channel that the deferred frames should be pulled from and queued to.<br>+ *<br>+ * \return Nothing<br>+ */<br>+void bridge_channel_queue_deferred_frames(struct ast_bridge_channel *bridge_channel);<br>+<br>+/*!<br>+ * \internal<br>  * \brief Push the bridge channel into its specified bridge.<br>  * \since 12.0.0<br>  *<br>diff --git a/include/asterisk/bridge_technology.h b/include/asterisk/bridge_technology.h<br>index 09b0fc0..8cebe93 100644<br>--- a/include/asterisk/bridge_technology.h<br>+++ b/include/asterisk/bridge_technology.h<br>@@ -156,6 +156,9 @@<br>          * \retval -1 Frame needs to be deferred.<br>      *<br>     * \note On entry, bridge is already locked.<br>+  *<br>+    * \note Deferred frames will be automatically queued onto the channel when another<br>+   * channel joins the bridge.<br>   */<br>   int (*write)(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame);<br>  /*!<br>diff --git a/main/bridge.c b/main/bridge.c<br>index 4631e5a..8cde62c 100644<br>--- a/main/bridge.c<br>+++ b/main/bridge.c<br>@@ -476,6 +476,7 @@<br>         }<br> <br>  AST_LIST_TRAVERSE(&bridge->channels, bridge_channel, entry) {<br>+         bridge_channel_queue_deferred_frames(bridge_channel);<br>                 if (!bridge_channel->just_joined) {<br>                        continue;<br>             }<br>diff --git a/main/bridge_channel.c b/main/bridge_channel.c<br>index 02783b1..e8ab8a8 100644<br>--- a/main/bridge_channel.c<br>+++ b/main/bridge_channel.c<br>@@ -638,18 +638,21 @@<br> static int bridge_channel_write_frame(struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)<br> {<br>         const struct ast_control_t38_parameters *t38_parameters;<br>+     int deferred;<br> <br>      ast_assert(frame->frametype != AST_FRAME_BRIDGE_ACTION_SYNC);<br> <br>   ast_bridge_channel_lock_bridge(bridge_channel);<br>-/*<br>- * XXX need to implement a deferred write queue for when there<br>- * is no peer channel in the bridge (yet or it was kicked).<br>- *<br>- * The tech decides if a frame needs to be pushed back for deferral.<br>- * simple_bridge/native_bridge are likely the only techs that will do this.<br>- */<br>-  bridge_channel->bridge->technology->write(bridge_channel->bridge, bridge_channel, frame);<br>+<br>+     deferred = bridge_channel->bridge->technology->write(bridge_channel->bridge, bridge_channel, frame);<br>+     if (deferred) {<br>+              struct ast_frame *dup;<br>+<br>+            dup = ast_frdup(frame);<br>+              if (dup) {<br>+                   AST_LIST_INSERT_HEAD(&bridge_channel->deferred_queue, dup, frame_list);<br>+               }<br>+    }<br> <br>  /* Remember any owed events to the bridge. */<br>         switch (frame->frametype) {<br>@@ -751,6 +754,18 @@<br>          bridge_channel->owed.t38_terminate = 0;<br>            orig_bridge->technology->write(orig_bridge, NULL, &frame);<br>  }<br>+}<br>+<br>+void bridge_channel_queue_deferred_frames(struct ast_bridge_channel *bridge_channel)<br>+{<br>+  struct ast_frame *frame;<br>+<br>+  ast_channel_lock(bridge_channel->chan);<br>+   while ((frame = AST_LIST_REMOVE_HEAD(&bridge_channel->deferred_queue, frame_list))) {<br>+         ast_queue_frame_head(bridge_channel->chan, frame);<br>+                ast_frfree(frame);<br>+   }<br>+    ast_channel_unlock(bridge_channel->chan);<br> }<br> <br> /*!<br>@@ -2933,6 +2948,11 @@<br>     }<br>     ast_alertpipe_close(bridge_channel->alert_pipe);<br> <br>+       /* Flush any unhandled deferred_queue frames. */<br>+     while ((fr = AST_LIST_REMOVE_HEAD(&bridge_channel->deferred_queue, frame_list))) {<br>+            ast_frfree(fr);<br>+      }<br>+<br>  ast_cond_destroy(&bridge_channel->cond);<br> <br>    ao2_cleanup(bridge_channel->write_format);<br>diff --git a/tests/test_bridging.c b/tests/test_bridging.c<br>new file mode 100644<br>index 0000000..74595c4<br>--- /dev/null<br>+++ b/tests/test_bridging.c<br>@@ -0,0 +1,290 @@<br>+/*<br>+ * Asterisk -- An open source telephony toolkit.<br>+ *<br>+ * Copyright (C) 2017, Digium, Inc.<br>+ *<br>+ * Joshua Colp <jcolp@digium.com><br>+ *<br>+ * See http://www.asterisk.org for more information about<br>+ * the Asterisk project. Please do not directly contact<br>+ * any of the maintainers of this project for assistance;<br>+ * the project provides a web site, mailing lists and IRC<br>+ * channels for your use.<br>+ *<br>+ * This program is free software, distributed under the terms of<br>+ * the GNU General Public License Version 2. See the LICENSE file<br>+ * at the top of the source tree.<br>+ */<br>+<br>+/*!<br>+ * \file<br>+ * \brief Bridging unit tests<br>+ *<br>+ * \author Joshua Colp <jcolp@digium.com><br>+ *<br>+ */<br>+<br>+/*** MODULEINFO<br>+     <depend>TEST_FRAMEWORK</depend><br>+  <support_level>core</support_level><br>+ ***/<br>+<br>+#include "asterisk.h"<br>+<br>+#include "asterisk/module.h"<br>+#include "asterisk/test.h"<br>+#include "asterisk/channel.h"<br>+#include "asterisk/time.h"<br>+#include "asterisk/bridge.h"<br>+#include "asterisk/bridge_basic.h"<br>+#include "asterisk/features.h"<br>+#include "asterisk/format_cache.h"<br>+<br>+#define TEST_CATEGORY "/main/bridging/"<br>+<br>+#define CHANNEL_TECH_NAME "BridgingTestChannel"<br>+<br>+#define TEST_CHANNEL_FORMAT               ast_format_slin<br>+<br>+/*! \brief A private structure for the test channel */<br>+struct test_bridging_chan_pvt {<br>+        /* \brief The expected indication */<br>+ int condition;<br>+       /*! \brief The number of indicated things */<br>+ unsigned int indicated;<br>+};<br>+<br>+/*! \brief Callback function for when a frame is written to a channel */<br>+static int test_bridging_chan_indicate(struct ast_channel *chan, int condition, const void *data, size_t datalen)<br>+{<br>+   struct test_bridging_chan_pvt *test_pvt = ast_channel_tech_pvt(chan);<br>+<br>+     if (condition == test_pvt->condition) {<br>+           test_pvt->indicated++;<br>+    }<br>+<br>+ return 0;<br>+}<br>+<br>+/*! \brief Callback function for when a channel is hung up */<br>+static int test_bridging_chan_hangup(struct ast_channel *chan)<br>+{<br>+        struct test_bridging_chan_pvt *test_pvt = ast_channel_tech_pvt(chan);<br>+<br>+     ast_free(test_pvt);<br>+  ast_channel_tech_pvt_set(chan, NULL);<br>+<br>+     return 0;<br>+}<br>+<br>+/*! \brief A channel technology used for the unit tests */<br>+static struct ast_channel_tech test_bridging_chan_tech = {<br>+   .type = CHANNEL_TECH_NAME,<br>+   .description = "Mock channel technology for bridge tests",<br>+ .indicate = test_bridging_chan_indicate,<br>+     .hangup = test_bridging_chan_hangup,<br>+ .properties = AST_CHAN_TP_INTERNAL,<br>+};<br>+<br>+static void test_nanosleep(int secs, long nanosecs)<br>+{<br>+        struct timespec sleep_time = {secs, nanosecs};<br>+<br>+    while ((nanosleep(&sleep_time, &sleep_time) == -1) && (errno == EINTR)) {<br>+    }<br>+}<br>+<br>+/*! \brief Wait until a channel is bridged */<br>+static void wait_for_bridged(struct ast_channel *channel)<br>+{<br>+     ast_channel_lock(channel);<br>+   while (!ast_channel_is_bridged(channel)) {<br>+           ast_channel_unlock(channel);<br>+         test_nanosleep(0, 1000000);<br>+          ast_channel_lock(channel);<br>+   }<br>+    ast_channel_unlock(channel);<br>+}<br>+<br>+/*! \brief Wait until a channel is not bridged */<br>+static void wait_for_unbridged(struct ast_channel *channel)<br>+{<br>+    ast_channel_lock(channel);<br>+   while (ast_channel_is_bridged(channel)) {<br>+            ast_channel_unlock(channel);<br>+         test_nanosleep(0, 1000000);<br>+          ast_channel_lock(channel);<br>+   }<br>+    ast_channel_unlock(channel);<br>+}<br>+<br>+/*! \brief Wait until a channel has no frames on its read queue */<br>+static void wait_for_empty_queue(struct ast_channel *channel)<br>+{<br>+ ast_channel_lock(channel);<br>+   while (!AST_LIST_EMPTY(ast_channel_readq(channel))) {<br>+                ast_channel_unlock(channel);<br>+         test_nanosleep(0, 1000000);<br>+          ast_channel_lock(channel);<br>+   }<br>+    ast_channel_unlock(channel);<br>+}<br>+<br>+/*! \brief Create a \ref test_bridging_chan_tech for Alice. */<br>+#define START_ALICE(channel, pvt) START_CHANNEL(channel, pvt, "Alice", "100")<br>+<br>+/*! \brief Create a \ref test_bridging_chan_tech for Bob. */<br>+#define START_BOB(channel, pvt) START_CHANNEL(channel, pvt, "Bob", "200")<br>+<br>+#define START_CHANNEL(channel, pvt, name, number) do { \<br>+     channel = ast_channel_alloc(0, AST_STATE_UP, number, name, number, number, \<br>+         "default", NULL, NULL, 0, CHANNEL_TECH_NAME "/" name); \<br>+ pvt = ast_calloc(1, sizeof(*pvt)); \<br>+ ast_channel_tech_pvt_set(channel, pvt); \<br>+    ast_channel_nativeformats_set(channel, test_bridging_chan_tech.capabilities); \<br>+      ast_channel_set_rawwriteformat(channel, TEST_CHANNEL_FORMAT); \<br>+      ast_channel_set_rawreadformat(channel, TEST_CHANNEL_FORMAT); \<br>+       ast_channel_set_writeformat(channel, TEST_CHANNEL_FORMAT); \<br>+ ast_channel_set_readformat(channel, TEST_CHANNEL_FORMAT); \<br>+  ast_channel_unlock(channel); \<br>+       } while (0)<br>+<br>+/*! \brief Hang up a test channel safely */<br>+#define HANGUP_CHANNEL(channel) do { \<br>+        ao2_ref(channel, +1); \<br>+      ast_hangup((channel)); \<br>+     ao2_cleanup(channel); \<br>+      channel = NULL; \<br>+    } while (0)<br>+<br>+static void safe_channel_release(struct ast_channel *chan)<br>+{<br>+      if (!chan) {<br>+         return;<br>+      }<br>+    ast_channel_release(chan);<br>+}<br>+<br>+static void safe_bridge_destroy(struct ast_bridge *bridge)<br>+{<br>+   if (!bridge) {<br>+               return;<br>+      }<br>+    ast_bridge_destroy(bridge, 0);<br>+}<br>+<br>+static void stream_periodic_frames(struct ast_channel *chan, int ms, int interval_ms)<br>+{<br>+    long nanosecs;<br>+<br>+    ast_assert(chan != NULL);<br>+    ast_assert(0 < ms);<br>+       ast_assert(0 < interval_ms);<br>+<br>+   nanosecs = interval_ms * 1000000L;<br>+   while (0 < ms) {<br>+          ast_queue_frame(chan, &ast_null_frame);<br>+<br>+               if (interval_ms < ms) {<br>+                   ms -= interval_ms;<br>+           } else {<br>+                     nanosecs = ms * 1000000L;<br>+                    ms = 0;<br>+              }<br>+            test_nanosleep(0, nanosecs);<br>+ }<br>+}<br>+<br>+AST_TEST_DEFINE(test_bridging_deferred_queue)<br>+{<br>+ RAII_VAR(struct ast_channel *, chan_alice, NULL, safe_channel_release);<br>+      struct test_bridging_chan_pvt *alice_pvt;<br>+            struct ast_control_t38_parameters t38_parameters = {<br>+                 .request_response = AST_T38_REQUEST_NEGOTIATE,<br>+               };<br>+           struct ast_frame frame = {<br>+                   .frametype = AST_FRAME_CONTROL,<br>+                      .subclass.integer = AST_CONTROL_T38_PARAMETERS,<br>+                      .data.ptr = &t38_parameters,<br>+                     .datalen = sizeof(t38_parameters),<br>+           };<br>+   RAII_VAR(struct ast_channel *, chan_bob, NULL, safe_channel_release);<br>+        struct test_bridging_chan_pvt *bob_pvt;<br>+      RAII_VAR(struct ast_bridge *, bridge1, NULL, safe_bridge_destroy);<br>+<br>+        switch (cmd) {<br>+       case TEST_INIT:<br>+              info->name = __func__;<br>+            info->category = TEST_CATEGORY;<br>+           info->summary = "Test that deferred frames from a channel in a bridge get written";<br>+             info->description =<br>+                       "This test creates two channels, queues a deferrable frame on one, places it into\n"<br>+                       "a bridge, confirms the frame was read by the bridge, adds the second channel to the\n"<br>+                    "bridge, and makes sure the deferred frame is written to it.";<br>+             return AST_TEST_NOT_RUN;<br>+     case TEST_EXECUTE:<br>+           break;<br>+       }<br>+<br>+ /* Create the bridges */<br>+     bridge1 = ast_bridge_basic_new();<br>+    ast_test_validate(test, bridge1 != NULL);<br>+<br>+ /* Create channels that will go into the bridge */<br>+   START_ALICE(chan_alice, alice_pvt);<br>+  START_BOB(chan_bob, bob_pvt);<br>+        bob_pvt->condition = AST_CONTROL_T38_PARAMETERS;<br>+<br>+       /* Bridge alice and wait for the frame to be deferred */<br>+     ast_test_validate(test, !ast_bridge_impart(bridge1, chan_alice, NULL, NULL, AST_BRIDGE_IMPART_CHAN_DEPARTABLE));<br>+     wait_for_bridged(chan_alice);<br>+        ast_queue_frame(chan_alice, &frame);<br>+     wait_for_empty_queue(chan_alice);<br>+<br>+ /* Bridge bob for a second so it can receive the deferred T.38 request negotiate frame */<br>+    ast_test_validate(test, !ast_bridge_impart(bridge1, chan_bob, NULL, NULL, AST_BRIDGE_IMPART_CHAN_DEPARTABLE));<br>+       wait_for_bridged(chan_bob);<br>+  stream_periodic_frames(chan_alice, 1000, 20);<br>+        ast_test_validate(test, !ast_bridge_depart(chan_bob));<br>+       wait_for_unbridged(chan_bob);<br>+<br>+     /* Ensure that we received the expected indications while it was in there (request to negotiate, and to terminate) */<br>+        ast_test_validate(test, bob_pvt->indicated == 2);<br>+<br>+      /* Now remove alice since we are done */<br>+     ast_test_validate(test, !ast_bridge_depart(chan_alice));<br>+     wait_for_unbridged(chan_alice);<br>+<br>+   /* Hangup the channels */<br>+    HANGUP_CHANNEL(chan_alice);<br>+  HANGUP_CHANNEL(chan_bob);<br>+<br>+ return AST_TEST_PASS;<br>+}<br>+<br>+static int unload_module(void)<br>+{<br>+    AST_TEST_UNREGISTER(test_bridging_deferred_queue);<br>+<br>+        ast_channel_unregister(&test_bridging_chan_tech);<br>+        ao2_cleanup(test_bridging_chan_tech.capabilities);<br>+   test_bridging_chan_tech.capabilities = NULL;<br>+<br>+      return 0;<br>+}<br>+<br>+static int load_module(void)<br>+{<br>+  test_bridging_chan_tech.capabilities = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);<br>+    if (!test_bridging_chan_tech.capabilities) {<br>+         return AST_MODULE_LOAD_DECLINE;<br>+      }<br>+    ast_format_cap_append(test_bridging_chan_tech.capabilities, TEST_CHANNEL_FORMAT, 0);<br>+ ast_channel_register(&test_bridging_chan_tech);<br>+<br>+       AST_TEST_REGISTER(test_bridging_deferred_queue);<br>+<br>+  return AST_MODULE_LOAD_SUCCESS;<br>+}<br>+<br>+AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Bridging Unit Tests");<br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/5801">change 5801</a>. To unsubscribe, visit <a href="https://gerrit.asterisk.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.asterisk.org/5801"/><meta itemprop="name" content="View Change"/></div></div>

<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: master </div>
<div style="display:none"> Gerrit-MessageType: merged </div>
<div style="display:none"> Gerrit-Change-Id: Ie05b08523f399eae579130f4a5f562a344d2e415 </div>
<div style="display:none"> Gerrit-Change-Number: 5801 </div>
<div style="display:none"> Gerrit-PatchSet: 3 </div>
<div style="display:none"> Gerrit-Owner: Joshua Colp <jcolp@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: George Joseph <gjoseph@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: Jenkins2 </div>
<div style="display:none"> Gerrit-Reviewer: Joshua Colp <jcolp@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: Kevin Harwell <kharwell@digium.com> </div>
<div style="display:none"> Gerrit-Reviewer: Mark Michelson <mmichelson@digium.com> </div>