[Asterisk-code-review] core_unreal / core_local: Add multistream and re-negotiation. (asterisk[certified/16.8])

George Joseph asteriskteam at digium.com
Mon Jun 15 09:08:56 CDT 2020


George Joseph has submitted this change. ( https://gerrit.asterisk.org/c/asterisk/+/14524 )

Change subject: core_unreal / core_local: Add multistream and re-negotiation.
......................................................................

core_unreal / core_local: Add multistream and re-negotiation.

When requesting a Local channel the requested stream topology
or a converted stream topology will now be placed onto the
resulting channels.

Frames written on a stream will now also preserve the stream
identifier as they are queued on the opposite channel.

Finally, when a stream topology change is requested, it is
immediately accepted and reflected on both channels. Each
channel also receives a queued frame to indicate that the
topology has changed.

ASTERISK-28938

Change-Id: I4e9d94da5230d4bd046dc755651493fce1d87186
---
M include/asterisk/core_unreal.h
M main/core_local.c
M main/core_unreal.c
3 files changed, 251 insertions(+), 7 deletions(-)

Approvals:
  Kevin Harwell: Looks good to me, but someone else must approve
  George Joseph: Looks good to me, approved; Approved for Submit



diff --git a/include/asterisk/core_unreal.h b/include/asterisk/core_unreal.h
index 35fc87e..a28d39d 100644
--- a/include/asterisk/core_unreal.h
+++ b/include/asterisk/core_unreal.h
@@ -40,6 +40,7 @@
 
 /* Forward declare some struct names */
 struct ast_format_cap;
+struct ast_stream_topology;
 
 /* ------------------------------------------------------------------- */
 
@@ -96,6 +97,7 @@
 	unsigned int flags;                         /*!< Private option flags */
 	/*! Base name of the unreal channels.  exten at context or other name. */
 	char name[AST_MAX_EXTENSION + AST_MAX_CONTEXT + 2];
+	struct ast_stream_topology *reqtopology;    /*!< Requested stream topology */
 };
 
 #define AST_UNREAL_IS_OUTBOUND(a, b) ((a) == (b)->chan ? 1 : 0)
@@ -146,6 +148,9 @@
 /*! Unreal channel framework struct ast_channel_tech.write callback */
 int ast_unreal_write(struct ast_channel *ast, struct ast_frame *f);
 
+/*! Unreal channel framework struct ast_channel_tech.write_stream callback */
+int ast_unreal_write_stream(struct ast_channel *ast, int stream_num, struct ast_frame *f);
+
 /*! Unreal channel framework struct ast_channel_tech.indicate callback */
 int ast_unreal_indicate(struct ast_channel *ast, int condition, const void *data, size_t datalen);
 
@@ -188,6 +193,20 @@
 struct ast_unreal_pvt *ast_unreal_alloc(size_t size, ao2_destructor_fn destructor, struct ast_format_cap *cap);
 
 /*!
+ * \brief Allocate the base unreal struct for a derivative using a stream topology.
+ * \since 16.12.0
+ * \since 17.6.0
+ *
+ * \param size Size of the unreal struct to allocate.
+ * \param destructor Destructor callback.
+ * \param topology Stream topology to give the unreal private struct.
+ *
+ * \retval pvt on success.
+ * \retval NULL on error.
+ */
+struct ast_unreal_pvt *ast_unreal_alloc_stream_topology(size_t size, ao2_destructor_fn destructor, struct ast_stream_topology *topology);
+
+/*!
  * \brief Create the semi1 and semi2 unreal channels.
  * \since 12.0.0
  *
diff --git a/main/core_local.c b/main/core_local.c
index 6a4a963..c7f2a03 100644
--- a/main/core_local.c
+++ b/main/core_local.c
@@ -48,6 +48,8 @@
 #include "asterisk/stasis_channels.h"
 #include "asterisk/_private.h"
 #include "asterisk/stasis_channels.h"
+#include "asterisk/stream.h"
+#include "asterisk/translate.h"
 
 /*** DOCUMENTATION
 	<manager name="LocalOptimizeAway" language="en_US">
@@ -136,6 +138,7 @@
 static struct ao2_container *locals;
 
 static struct ast_channel *local_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause);
+static struct ast_channel *local_request_with_stream_topology(const char *type, struct ast_stream_topology *topology, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause);
 static int local_call(struct ast_channel *ast, const char *dest, int timeout);
 static int local_hangup(struct ast_channel *ast);
 static int local_devicestate(const char *data);
@@ -170,14 +173,15 @@
 	.type = "Local",
 	.description = tdesc,
 	.requester = local_request,
+	.requester_with_stream_topology = local_request_with_stream_topology,
 	.send_digit_begin = ast_unreal_digit_begin,
 	.send_digit_end = ast_unreal_digit_end,
 	.call = local_call,
 	.hangup = local_hangup,
 	.answer = ast_unreal_answer,
-	.read = ast_unreal_read,
+	.read_stream = ast_unreal_read,
 	.write = ast_unreal_write,
-	.write_video = ast_unreal_write,
+	.write_stream = ast_unreal_write_stream,
 	.exception = ast_unreal_read,
 	.indicate = ast_unreal_indicate,
 	.fixup = ast_unreal_fixup,
@@ -858,14 +862,14 @@
 }
 
 /*! \brief Create a call structure */
-static struct local_pvt *local_alloc(const char *data, struct ast_format_cap *cap)
+static struct local_pvt *local_alloc(const char *data, struct ast_stream_topology *topology)
 {
 	struct local_pvt *pvt;
 	char *parse;
 	char *context;
 	char *opts;
 
-	pvt = (struct local_pvt *) ast_unreal_alloc(sizeof(*pvt), local_pvt_destructor, cap);
+	pvt = (struct local_pvt *) ast_unreal_alloc_stream_topology(sizeof(*pvt), local_pvt_destructor, topology);
 	if (!pvt) {
 		return NULL;
 	}
@@ -916,12 +920,95 @@
 /*! \brief Part of PBX interface */
 static struct ast_channel *local_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
 {
+	struct ast_channel *chan = NULL;
+	struct ast_stream_topology *cap_topology = ast_stream_topology_create_from_format_cap(cap);
+
+	/* The cap-derived topology is only a temporary: the topology-based
+	 * requester clones what it needs, so it is always released here.
+	 */
+	if (cap_topology) {
+		chan = local_request_with_stream_topology(type, cap_topology, assignedids,
+			requestor, data, cause);
+		ast_stream_topology_free(cap_topology);
+	}
+
+	return chan;
+}
+
+/*! \brief Part of PBX interface (stream topology aware requester) */
+static struct ast_channel *local_request_with_stream_topology(const char *type, struct ast_stream_topology *topology, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
+{
+	struct ast_stream_topology *audio_filtered_topology;
+	int i;
 	struct local_pvt *p;
 	struct ast_channel *chan;
 	ast_callid callid;
 
+	/* Work on a private copy: the caller keeps ownership of the topology passed
+	 * in. The copy must be released with ast_stream_topology_free(), not ao2_ref().
+	 */
+	audio_filtered_topology = ast_stream_topology_clone(topology);
+	if (!audio_filtered_topology) {
+		return NULL;
+	}
+
+	/* Some users of Local channels request every known format in the
+	 * universe. The core itself automatically pruned this list down to a single
+	 * "best" format for audio in non-multistream. We replicate the logic here to
+	 * do the same thing.
+	 */
+	for (i = 0; i < ast_stream_topology_get_count(audio_filtered_topology); ++i) {
+		struct ast_stream *stream;
+		int res;
+		struct ast_format *tmp_fmt = NULL;
+		struct ast_format *best_audio_fmt = NULL;
+		struct ast_format_cap *caps;
+
+		stream = ast_stream_topology_get_stream(audio_filtered_topology, i);
+
+		if (ast_stream_get_type(stream) != AST_MEDIA_TYPE_AUDIO) {
+			continue;
+		}
+
+		/* Respect the immutable state of formats on the stream and create a new
+		 * format capabilities to replace the existing one.
+		 */
+		caps = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
+		if (!caps) {
+			ast_stream_topology_free(audio_filtered_topology);
+			return NULL;
+		}
+
+		/* The ast_translator_best_choice function treats both caps as const
+		 * but does not declare it in the API.
+		 */
+		res = ast_translator_best_choice((struct ast_format_cap *)ast_stream_get_formats(stream), local_tech.capabilities,
+			&tmp_fmt, &best_audio_fmt);
+		if (res < 0) {
+			struct ast_str *tech_codecs = ast_str_alloca(AST_FORMAT_CAP_NAMES_LEN);
+			struct ast_str *request_codecs = ast_str_alloca(AST_FORMAT_CAP_NAMES_LEN);
+
+			ast_log(LOG_WARNING, "No translator path exists for channel type %s (native %s) to %s\n", type,
+				ast_format_cap_get_names(local_tech.capabilities, &tech_codecs),
+				ast_format_cap_get_names(ast_stream_get_formats(stream), &request_codecs));
+
+			/* If there are no formats then we abort */
+			ao2_ref(caps, -1);
+			ast_stream_topology_free(audio_filtered_topology);
+			return NULL;
+		}
+
+		ast_format_cap_append(caps, best_audio_fmt, 0);
+		ast_stream_set_formats(stream, caps);
+
+		ao2_ref(caps, -1);
+		ao2_ref(tmp_fmt, -1);
+		ao2_ref(best_audio_fmt, -1);
+	}
+
 	/* Allocate a new private structure and then Asterisk channels */
-	p = local_alloc(data, cap);
+	p = local_alloc(data, audio_filtered_topology);
+	ast_stream_topology_free(audio_filtered_topology);
 	if (!p) {
 		return NULL;
 	}
@@ -936,6 +1023,7 @@
 	return chan;
 }
 
+
 /*! \brief CLI command "local show channels" */
 static char *locals_show(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
diff --git a/main/core_unreal.c b/main/core_unreal.c
index 763be4f..bff1c3e 100644
--- a/main/core_unreal.c
+++ b/main/core_unreal.c
@@ -40,6 +40,7 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/bridge.h"
 #include "asterisk/core_unreal.h"
+#include "asterisk/stream.h"
 
 static unsigned int name_sequence = 0;
 
@@ -316,6 +317,11 @@
 
 int ast_unreal_write(struct ast_channel *ast, struct ast_frame *f)
 {
+	return ast_unreal_write_stream(ast, -1, f); /* this entry point has no stream number to forward, so -1 is passed */
+}
+
+int ast_unreal_write_stream(struct ast_channel *ast, int stream_num, struct ast_frame *f)
+{
 	struct ast_unreal_pvt *p = ast_channel_tech_pvt(ast);
 	int res = -1;
 
@@ -337,6 +343,9 @@
 		}
 	}
 
+	/* Update the frame to reflect the stream */
+	f->stream_num = stream_num;
+
 	/* Just queue for delivery to the other side */
 	ao2_ref(p, 1);
 	ao2_lock(p);
@@ -530,6 +539,86 @@
 	return res;
 }
 
+/*!
+ * \internal
+ * \brief Handle stream topology change request.
+ * \since 16.12.0
+ * \since 17.6.0
+ *
+ * \param p Unreal private structure.
+ * \param ast Channel indicating the condition.
+ * \param topology The requested topology.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int unreal_colp_stream_topology_request_change(struct ast_unreal_pvt *p, struct ast_channel *ast, const struct ast_stream_topology *topology)
+{
+	struct ast_stream_topology *this_channel_topology;
+	struct ast_stream_topology *the_other_channel_topology;
+	int i;
+	struct ast_channel *my_chan;
+	struct ast_channel *my_owner;
+	struct ast_channel *this_channel;
+	struct ast_channel *the_other_channel;
+
+	this_channel_topology = ast_stream_topology_clone(topology);
+	if (!this_channel_topology) {
+		return -1;
+	}
+
+	the_other_channel_topology = ast_stream_topology_clone(topology);
+	if (!the_other_channel_topology) {
+		ast_stream_topology_free(this_channel_topology);
+		return -1;
+	}
+
+	/* We swap the stream state on the other channel because it is as if the channel is
+	 * connected to an external endpoint, so the perspective changes.
+	 */
+	for (i = 0; i < ast_stream_topology_get_count(the_other_channel_topology); ++i) {
+		struct ast_stream *stream = ast_stream_topology_get_stream(the_other_channel_topology, i);
+
+		if (ast_stream_get_state(stream) == AST_STREAM_STATE_RECVONLY) {
+			ast_stream_set_state(stream, AST_STREAM_STATE_SENDONLY);
+		} else if (ast_stream_get_state(stream) == AST_STREAM_STATE_SENDONLY) {
+			ast_stream_set_state(stream, AST_STREAM_STATE_RECVONLY);
+		}
+	}
+
+	ast_channel_unlock(ast);
+	ast_unreal_lock_all(p, &my_chan, &my_owner);
+	this_channel = AST_UNREAL_IS_OUTBOUND(ast, p) ? p->chan : p->owner;
+	the_other_channel = AST_UNREAL_IS_OUTBOUND(ast, p) ? p->owner : p->chan;
+	/* A topology set on a channel is handed over to that channel. A clone whose
+	 * channel is already gone has no new owner, so it must be freed here.
+	 */
+	if (this_channel) {
+		ast_channel_set_stream_topology(this_channel, this_channel_topology);
+		ast_queue_control(this_channel, AST_CONTROL_STREAM_TOPOLOGY_CHANGED);
+	} else {
+		ast_stream_topology_free(this_channel_topology);
+	}
+	if (the_other_channel) {
+		ast_channel_set_stream_topology(the_other_channel, the_other_channel_topology);
+		ast_channel_stream_topology_changed_externally(the_other_channel);
+	} else {
+		ast_stream_topology_free(the_other_channel_topology);
+	}
+	if (my_chan) {
+		ast_channel_unlock(my_chan);
+		ast_channel_unref(my_chan);
+	}
+	if (my_owner) {
+		ast_channel_unlock(my_owner);
+		ast_channel_unref(my_owner);
+	}
+	ao2_unlock(p);
+	ast_channel_lock(ast);
+
+	return 0;
+}
+
 int ast_unreal_indicate(struct ast_channel *ast, int condition, const void *data, size_t datalen)
 {
 	struct ast_unreal_pvt *p = ast_channel_tech_pvt(ast);
@@ -583,6 +672,11 @@
 		unreal_queue_indicate(p, ast, condition, data, datalen);
 		res = -1;
 		break;
+	case AST_CONTROL_STREAM_TOPOLOGY_REQUEST_CHANGE:
+		if (ast_channel_is_multistream(ast)) {
+			res = unreal_colp_stream_topology_request_change(p, ast, data);
+		}
+		break;
 	default:
 		res = unreal_queue_indicate(p, ast, condition, data, datalen);
 		break;
@@ -916,10 +1010,29 @@
 
 	ao2_cleanup(doomed->reqcap);
 	doomed->reqcap = NULL;
+	ast_stream_topology_free(doomed->reqtopology);
+	doomed->reqtopology = NULL;
 }
 
 struct ast_unreal_pvt *ast_unreal_alloc(size_t size, ao2_destructor_fn destructor, struct ast_format_cap *cap)
 {
+	struct ast_unreal_pvt *pvt = NULL;
+	struct ast_stream_topology *cap_topology = ast_stream_topology_create_from_format_cap(cap);
+
+	/* The topology-based allocator keeps its own clone of what it is
+	 * given, so the topology built here from the caps is only a temporary
+	 * and is released as soon as the allocation attempt is done.
+	 */
+	if (cap_topology) {
+		pvt = ast_unreal_alloc_stream_topology(size, destructor, cap_topology);
+		ast_stream_topology_free(cap_topology);
+	}
+
+	return pvt;
+}
+
+struct ast_unreal_pvt *ast_unreal_alloc_stream_topology(size_t size, ao2_destructor_fn destructor, struct ast_stream_topology *topology)
+{
 	struct ast_unreal_pvt *unreal;
 
 	static const struct ast_jb_conf jb_conf = {
@@ -935,12 +1048,17 @@
 		return NULL;
 	}
 
-	unreal->reqcap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
+	unreal->reqtopology = ast_stream_topology_clone(topology);
+	if (!unreal->reqtopology) {
+		ao2_ref(unreal, -1);
+		return NULL;
+	}
+
+	unreal->reqcap = ast_format_cap_from_stream_topology(topology);
 	if (!unreal->reqcap) {
 		ao2_ref(unreal, -1);
 		return NULL;
 	}
-	ast_format_cap_append_from_cap(unreal->reqcap, cap, AST_MEDIA_TYPE_UNKNOWN);
 
 	memcpy(&unreal->jb_conf, &jb_conf, sizeof(unreal->jb_conf));
 
@@ -958,6 +1076,7 @@
 	struct ast_assigned_ids id1 = {NULL, NULL};
 	struct ast_assigned_ids id2 = {NULL, NULL};
 	int generated_seqno = ast_atomic_fetchadd_int((int *) &name_sequence, +1);
+	struct ast_stream_topology *topology;
 
 	/* set unique ids for the two channels */
 	if (assignedids && !ast_strlen_zero(assignedids->uniqueid)) {
@@ -975,6 +1094,14 @@
 		id2.uniqueid = uniqueid2;
 	}
 
+	/* We need to create a topology to place on the first channel, as we can't
+	 * share a single one between both.
+	 */
+	topology = ast_stream_topology_clone(p->reqtopology);
+	if (!topology) {
+		return NULL;
+	}
+
 	/*
 	 * Allocate two new Asterisk channels
 	 *
@@ -987,6 +1114,7 @@
 		"%s/%s-%08x;1", tech->type, p->name, (unsigned)generated_seqno);
 	if (!owner) {
 		ast_log(LOG_WARNING, "Unable to allocate owner channel structure\n");
+		ast_stream_topology_free(topology);
 		return NULL;
 	}
 
@@ -1000,6 +1128,10 @@
 
 	ast_channel_nativeformats_set(owner, p->reqcap);
 
+	if (ast_channel_is_multistream(owner)) {
+		ast_channel_set_stream_topology(owner, topology);
+	}
+
 	/* Determine our read/write format and set it on each channel */
 	fmt = ast_format_cap_get_format(p->reqcap, 0);
 	if (!fmt) {
@@ -1054,6 +1186,11 @@
 
 	ast_channel_nativeformats_set(chan, p->reqcap);
 
+	if (ast_channel_is_multistream(chan)) {
+		ast_channel_set_stream_topology(chan, p->reqtopology);
+		p->reqtopology = NULL;
+	}
+
 	/* Format was already determined when setting up owner */
 	ast_channel_set_writeformat(chan, fmt);
 	ast_channel_set_rawwriteformat(chan, fmt);

-- 
To view, visit https://gerrit.asterisk.org/c/asterisk/+/14524
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: certified/16.8
Gerrit-Change-Id: I4e9d94da5230d4bd046dc755651493fce1d87186
Gerrit-Change-Number: 14524
Gerrit-PatchSet: 4
Gerrit-Owner: Joshua Colp <jcolp at sangoma.com>
Gerrit-Reviewer: Friendly Automation
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
Gerrit-MessageType: merged
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20200615/37f36901/attachment-0001.html>


More information about the asterisk-code-review mailing list