[Asterisk-code-review] core_unreal / core_local: Add multistream and re-negotiation. (asterisk[16])

Joshua Colp asteriskteam at digium.com
Mon Jun 8 04:20:21 CDT 2020


Joshua Colp has uploaded this change for review. ( https://gerrit.asterisk.org/c/asterisk/+/14505 )


Change subject: core_unreal / core_local: Add multistream and re-negotiation.
......................................................................

core_unreal / core_local: Add multistream and re-negotiation.

When requesting a Local channel the requested stream topology
or a converted stream topology will now be placed onto the
resulting channels.

Frames written in on streams will now also preserve the stream
identifier as they are queued on the opposite channel.

Finally when a stream topology change is requested it is
immediately accepted and reflected on both channels. Each
channel also receives a queued frame to indicate that the
topology has changed.

ASTERISK-28938

Change-Id: I4e9d94da5230d4bd046dc755651493fce1d87186
---
M include/asterisk/core_unreal.h
M main/core_local.c
M main/core_unreal.c
3 files changed, 189 insertions(+), 7 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/05/14505/1

diff --git a/include/asterisk/core_unreal.h b/include/asterisk/core_unreal.h
index 35fc87e..a28d39d 100644
--- a/include/asterisk/core_unreal.h
+++ b/include/asterisk/core_unreal.h
@@ -40,6 +40,7 @@
 
 /* Forward declare some struct names */
 struct ast_format_cap;
+struct ast_stream_topology;
 
 /* ------------------------------------------------------------------- */
 
@@ -96,6 +97,7 @@
 	unsigned int flags;                         /*!< Private option flags */
 	/*! Base name of the unreal channels.  exten at context or other name. */
 	char name[AST_MAX_EXTENSION + AST_MAX_CONTEXT + 2];
+	struct ast_stream_topology *reqtopology;    /*!< Requested stream topology */
 };
 
 #define AST_UNREAL_IS_OUTBOUND(a, b) ((a) == (b)->chan ? 1 : 0)
@@ -146,6 +148,9 @@
 /*! Unreal channel framework struct ast_channel_tech.write callback */
 int ast_unreal_write(struct ast_channel *ast, struct ast_frame *f);
 
+/*! Unreal channel framework struct ast_channel_tech.write_stream callback */
+int ast_unreal_write_stream(struct ast_channel *ast, int stream_num, struct ast_frame *f);
+
 /*! Unreal channel framework struct ast_channel_tech.indicate callback */
 int ast_unreal_indicate(struct ast_channel *ast, int condition, const void *data, size_t datalen);
 
@@ -188,6 +193,20 @@
 struct ast_unreal_pvt *ast_unreal_alloc(size_t size, ao2_destructor_fn destructor, struct ast_format_cap *cap);
 
 /*!
+ * \brief Allocate the base unreal struct for a derivative.
+ * \since 16.12.0
+ * \since 17.6.0
+ *
+ * \param size Size of the unreal struct to allocate.
+ * \param destructor Destructor callback.
+ * \param cap Format capabilities to give the unreal private struct.
+ *
+ * \retval pvt on success.
+ * \retval NULL on error.
+ */
+struct ast_unreal_pvt *ast_unreal_alloc_stream_topology(size_t size, ao2_destructor_fn destructor, struct ast_stream_topology *topology);
+
+/*!
  * \brief Create the semi1 and semi2 unreal channels.
  * \since 12.0.0
  *
diff --git a/main/core_local.c b/main/core_local.c
index fa69169..5788668 100644
--- a/main/core_local.c
+++ b/main/core_local.c
@@ -48,6 +48,7 @@
 #include "asterisk/stasis_channels.h"
 #include "asterisk/_private.h"
 #include "asterisk/stasis_channels.h"
+#include "asterisk/stream.h"
 
 /*** DOCUMENTATION
 	<manager name="LocalOptimizeAway" language="en_US">
@@ -136,6 +137,7 @@
 static struct ao2_container *locals;
 
 static struct ast_channel *local_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause);
+static struct ast_channel *local_request_with_stream_topology(const char *type, struct ast_stream_topology *topology, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause);
 static int local_call(struct ast_channel *ast, const char *dest, int timeout);
 static int local_hangup(struct ast_channel *ast);
 static int local_devicestate(const char *data);
@@ -171,14 +173,15 @@
 	.type = "Local",
 	.description = tdesc,
 	.requester = local_request,
+	.requester_with_stream_topology = local_request_with_stream_topology,
 	.send_digit_begin = ast_unreal_digit_begin,
 	.send_digit_end = ast_unreal_digit_end,
 	.call = local_call,
 	.hangup = local_hangup,
 	.answer = ast_unreal_answer,
-	.read = ast_unreal_read,
+	.read_stream = ast_unreal_read,
 	.write = ast_unreal_write,
-	.write_video = ast_unreal_write,
+	.write_stream = ast_unreal_write_stream,
 	.exception = ast_unreal_read,
 	.indicate = ast_unreal_indicate,
 	.fixup = ast_unreal_fixup,
@@ -859,14 +862,14 @@
 }
 
 /*! \brief Create a call structure */
-static struct local_pvt *local_alloc(const char *data, struct ast_format_cap *cap)
+static struct local_pvt *local_alloc(const char *data, struct ast_stream_topology *topology)
 {
 	struct local_pvt *pvt;
 	char *parse;
 	char *context;
 	char *opts;
 
-	pvt = (struct local_pvt *) ast_unreal_alloc(sizeof(*pvt), local_pvt_destructor, cap);
+	pvt = (struct local_pvt *) ast_unreal_alloc_stream_topology(sizeof(*pvt), local_pvt_destructor, topology);
 	if (!pvt) {
 		return NULL;
 	}
@@ -917,12 +920,30 @@
 /*! \brief Part of PBX interface */
 static struct ast_channel *local_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
 {
+	struct ast_stream_topology *topology;
+	struct ast_channel *chan;
+
+	topology = ast_stream_topology_create_from_format_cap(cap);
+	if (!topology) {
+		return NULL;
+	}
+
+	chan = local_request_with_stream_topology(type, topology, assignedids, requestor, data, cause);
+
+	ast_stream_topology_free(topology);
+
+	return chan;
+}
+
+/*! \brief Part of PBX interface */
+static struct ast_channel *local_request_with_stream_topology(const char *type, struct ast_stream_topology *topology, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
+{
 	struct local_pvt *p;
 	struct ast_channel *chan;
 	ast_callid callid;
 
 	/* Allocate a new private structure and then Asterisk channels */
-	p = local_alloc(data, cap);
+	p = local_alloc(data, topology);
 	if (!p) {
 		return NULL;
 	}
@@ -937,6 +958,7 @@
 	return chan;
 }
 
+
 /*! \brief CLI command "local show channels" */
 static char *locals_show(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
diff --git a/main/core_unreal.c b/main/core_unreal.c
index 763be4f..03664d9 100644
--- a/main/core_unreal.c
+++ b/main/core_unreal.c
@@ -40,6 +40,7 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/bridge.h"
 #include "asterisk/core_unreal.h"
+#include "asterisk/stream.h"
 
 static unsigned int name_sequence = 0;
 
@@ -316,6 +317,11 @@
 
 int ast_unreal_write(struct ast_channel *ast, struct ast_frame *f)
 {
+	return ast_unreal_write_stream(ast, -1, f);
+}
+
+int ast_unreal_write_stream(struct ast_channel *ast, int stream_num, struct ast_frame *f)
+{
 	struct ast_unreal_pvt *p = ast_channel_tech_pvt(ast);
 	int res = -1;
 
@@ -337,6 +343,13 @@
 		}
 	}
 
+	/* Update the frame to reflect the stream */
+	f->stream_num = stream_num;
+
+	if (stream_num == 3) {
+		ast_log(LOG_NOTICE, "Wrote video in!\n");
+	}
+
 	/* Just queue for delivery to the other side */
 	ao2_ref(p, 1);
 	ao2_lock(p);
@@ -530,6 +543,86 @@
 	return res;
 }
 
+/*!
+ * \internal
+ * \brief Handle stream topology change request.
+ * \since 16.12.0
+ * \since 17.6.0
+ *
+ * \param p Unreal private structure.
+ * \param ast Channel indicating the condition.
+ * \param topology The requested topology.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int unreal_colp_stream_topology_request_change(struct ast_unreal_pvt *p, struct ast_channel *ast, const struct ast_stream_topology *topology)
+{
+	struct ast_stream_topology *this_channel_topology;
+	struct ast_stream_topology *the_other_channel_topology;
+	int i;
+	struct ast_stream *stream;
+	struct ast_channel *my_chan;
+	struct ast_channel *my_owner;
+	struct ast_channel *this_channel;
+	struct ast_channel *the_other_channel;
+	int res = 0;
+
+	this_channel_topology = ast_stream_topology_clone(topology);
+	if (!this_channel_topology) {
+		return -1;
+	}
+
+	the_other_channel_topology = ast_stream_topology_clone(topology);
+	if (!the_other_channel_topology) {
+		ast_stream_topology_free(this_channel_topology);
+		return -1;
+	}
+
+	/* We swap the stream state on the other channel because it is as if the channel is
+	 * connected to an external endpoint, so the perspective changes.
+	 */
+	for (i = 0; i < ast_stream_topology_get_count(the_other_channel_topology); ++i) {
+		stream = ast_stream_topology_get_stream(the_other_channel_topology, i);
+
+		if (ast_stream_get_state(stream) == AST_STREAM_STATE_RECVONLY) {
+			ast_stream_set_state(stream, AST_STREAM_STATE_SENDONLY);
+		} else if (ast_stream_get_state(stream) == AST_STREAM_STATE_SENDONLY) {
+			ast_stream_set_state(stream, AST_STREAM_STATE_RECVONLY);
+		}
+	}
+
+	ast_channel_unlock(ast);
+	ast_unreal_lock_all(p, &my_chan, &my_owner);
+	if (AST_UNREAL_IS_OUTBOUND(ast, p)) {
+		this_channel = p->chan;
+		the_other_channel = p->owner;
+	} else {
+		this_channel = p->owner;
+		the_other_channel = p->chan;
+	}
+	if (this_channel) {
+		ast_channel_set_stream_topology(this_channel, this_channel_topology);
+		ast_queue_control(this_channel, AST_CONTROL_STREAM_TOPOLOGY_CHANGED);
+	}
+	if (the_other_channel) {
+		ast_channel_set_stream_topology(the_other_channel, the_other_channel_topology);
+		ast_channel_stream_topology_changed_externally(the_other_channel);
+	}
+	if (my_chan) {
+		ast_channel_unlock(my_chan);
+		ast_channel_unref(my_chan);
+	}
+	if (my_owner) {
+		ast_channel_unlock(my_owner);
+		ast_channel_unref(my_owner);
+	}
+	ao2_unlock(p);
+	ast_channel_lock(ast);
+
+	return res;
+}
+
 int ast_unreal_indicate(struct ast_channel *ast, int condition, const void *data, size_t datalen)
 {
 	struct ast_unreal_pvt *p = ast_channel_tech_pvt(ast);
@@ -583,6 +676,11 @@
 		unreal_queue_indicate(p, ast, condition, data, datalen);
 		res = -1;
 		break;
+	case AST_CONTROL_STREAM_TOPOLOGY_REQUEST_CHANGE:
+		if (ast_channel_is_multistream(ast)) {
+			res = unreal_colp_stream_topology_request_change(p, ast, data);
+		}
+		break;
 	default:
 		res = unreal_queue_indicate(p, ast, condition, data, datalen);
 		break;
@@ -916,10 +1014,29 @@
 
 	ao2_cleanup(doomed->reqcap);
 	doomed->reqcap = NULL;
+	ast_stream_topology_free(doomed->reqtopology);
+	doomed->reqtopology = NULL;
 }
 
 struct ast_unreal_pvt *ast_unreal_alloc(size_t size, ao2_destructor_fn destructor, struct ast_format_cap *cap)
 {
+	struct ast_stream_topology *topology;
+	struct ast_unreal_pvt *unreal;
+
+	topology = ast_stream_topology_create_from_format_cap(cap);
+	if (!topology) {
+		return NULL;
+	}
+
+	unreal = ast_unreal_alloc_stream_topology(size, destructor, topology);
+
+	ast_stream_topology_free(topology);
+
+	return unreal;
+}
+
+struct ast_unreal_pvt *ast_unreal_alloc_stream_topology(size_t size, ao2_destructor_fn destructor, struct ast_stream_topology *topology)
+{
 	struct ast_unreal_pvt *unreal;
 
 	static const struct ast_jb_conf jb_conf = {
@@ -935,12 +1052,17 @@
 		return NULL;
 	}
 
-	unreal->reqcap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
+	unreal->reqtopology = ast_stream_topology_clone(topology);
+	if (!unreal->reqtopology) {
+		ao2_ref(unreal, -1);
+		return NULL;
+	}
+
+	unreal->reqcap = ast_format_cap_from_stream_topology(topology);
 	if (!unreal->reqcap) {
 		ao2_ref(unreal, -1);
 		return NULL;
 	}
-	ast_format_cap_append_from_cap(unreal->reqcap, cap, AST_MEDIA_TYPE_UNKNOWN);
 
 	memcpy(&unreal->jb_conf, &jb_conf, sizeof(unreal->jb_conf));
 
@@ -958,6 +1080,7 @@
 	struct ast_assigned_ids id1 = {NULL, NULL};
 	struct ast_assigned_ids id2 = {NULL, NULL};
 	int generated_seqno = ast_atomic_fetchadd_int((int *) &name_sequence, +1);
+	struct ast_stream_topology *topology;
 
 	/* set unique ids for the two channels */
 	if (assignedids && !ast_strlen_zero(assignedids->uniqueid)) {
@@ -975,6 +1098,14 @@
 		id2.uniqueid = uniqueid2;
 	}
 
+	/* We need to create a topology to place on the first channel, as we can't
+	 * share a single one between both.
+	 */
+	topology = ast_stream_topology_clone(p->reqtopology);
+	if (!topology) {
+		return NULL;
+	}
+
 	/*
 	 * Allocate two new Asterisk channels
 	 *
@@ -987,6 +1118,7 @@
 		"%s/%s-%08x;1", tech->type, p->name, (unsigned)generated_seqno);
 	if (!owner) {
 		ast_log(LOG_WARNING, "Unable to allocate owner channel structure\n");
+		ast_stream_topology_free(topology);
 		return NULL;
 	}
 
@@ -1000,6 +1132,10 @@
 
 	ast_channel_nativeformats_set(owner, p->reqcap);
 
+	if (ast_channel_is_multistream(owner)) {
+		ast_channel_set_stream_topology(owner, topology);
+	}
+
 	/* Determine our read/write format and set it on each channel */
 	fmt = ast_format_cap_get_format(p->reqcap, 0);
 	if (!fmt) {
@@ -1054,6 +1190,11 @@
 
 	ast_channel_nativeformats_set(chan, p->reqcap);
 
+	if (ast_channel_is_multistream(chan)) {
+		ast_channel_set_stream_topology(chan, p->reqtopology);
+		p->reqtopology = NULL;
+	}
+
 	/* Format was already determined when setting up owner */
 	ast_channel_set_writeformat(chan, fmt);
 	ast_channel_set_rawwriteformat(chan, fmt);

-- 
To view, visit https://gerrit.asterisk.org/c/asterisk/+/14505
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: 16
Gerrit-Change-Id: I4e9d94da5230d4bd046dc755651493fce1d87186
Gerrit-Change-Number: 14505
Gerrit-PatchSet: 1
Gerrit-Owner: Joshua Colp <jcolp at sangoma.com>
Gerrit-MessageType: newchange
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20200608/008e43fb/attachment-0001.html>


More information about the asterisk-code-review mailing list