[Asterisk-code-review] res_rtp_asterisk: Add support for sending NACK requests. (asterisk[15])

Benjamin Keith Ford asteriskteam at digium.com
Tue Jun 19 16:55:36 CDT 2018


Benjamin Keith Ford has uploaded this change for review. ( https://gerrit.asterisk.org/9224 )


Change subject: res_rtp_asterisk: Add support for sending NACK requests.
......................................................................

res_rtp_asterisk: Add support for sending NACK requests.

Support has been added for receiving a NACK request and handling it.
Now, Asterisk can detect when a NACK request should be sent and knows
how to construct one based on the packets we've received from the remote
end. A buffer has been added that will store out of order frames until
we receive the packet we are expecting. Then, these frames are queued to
the core like normal. Asterisk knows which packets to request in the
NACK request using a vector which stores the sequence numbers of the
packets we are currently missing.

If a missing packet is received, cycle through the buffer until we reach
another packet we have not received yet. If the buffer reaches a certain
size, send a NACK request. If the buffer reaches its max size, queue all
frames to the core and wipe the buffer and vector.

According to RFC3711, the NACK request must be sent out in a compound
packet. All compound packets must start with a sender or receiver
report, so some work was done to refactor the current sender / receiver
code to allow it to be used without having to also include sdes
information and automatically send the report.

Also added additional functionality to ast_data_buffer, along with some
testing.

For more information, refer to the wiki page:
https://wiki.asterisk.org/wiki/display/AST/WebRTC+User+Experience+Improvements

ASTERISK-27810 #close

Change-Id: Idab644b08a1593659c92cda64132ccc203fe991d
---
M include/asterisk/data_buffer.h
M main/data_buffer.c
M res/res_rtp_asterisk.c
M tests/test_data_buffer.c
4 files changed, 450 insertions(+), 72 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/24/9224/1

diff --git a/include/asterisk/data_buffer.h b/include/asterisk/data_buffer.h
index dacbaa5..999b1b1 100644
--- a/include/asterisk/data_buffer.h
+++ b/include/asterisk/data_buffer.h
@@ -111,6 +111,35 @@
 void *ast_data_buffer_get(const struct ast_data_buffer *buffer, size_t pos);
 
 /*!
+ * \brief Remove a data payload from the data buffer and return it
+ *
+ * \param buffer The data buffer
+ * \param pos The position of the data payload to remove
+ *
+ * \retval non-NULL the payload that was stored at \a pos
+ * \retval NULL no payload is stored at \a pos
+ *
+ * \note This DOES remove the data payload from the data buffer. It does not free it, though; the caller receives the payload as-is.
+ *
+ * \since 15.5.0
+ */
+void *ast_data_buffer_remove(struct ast_data_buffer *buffer, size_t pos);
+
+/*!
+ * \brief Remove the first payload from the data buffer and return it
+ *
+ * \param buffer The data buffer
+ *
+ * \retval non-NULL the payload that was at the head of the buffer
+ * \retval NULL the buffer holds no payloads
+ *
+ * \note This DOES remove the data payload from the data buffer. It does not free it, though.
+ *
+ * \since 15.5.0
+ */
+void *ast_data_buffer_remove_head(struct ast_data_buffer *buffer);
+
+/*!
  * \brief Free a data buffer (and all held data payloads)
  *
  * \param buffer The data buffer
diff --git a/main/data_buffer.c b/main/data_buffer.c
index ccbffd2..196ccf4 100644
--- a/main/data_buffer.c
+++ b/main/data_buffer.c
@@ -281,6 +281,46 @@
 	return NULL;
 }
 
+void *ast_data_buffer_remove(struct ast_data_buffer *buffer, size_t pos)
+{
+	struct data_buffer_payload_entry *buffer_payload;
+
+	ast_assert(buffer != NULL);
+
+	AST_LIST_TRAVERSE_SAFE_BEGIN(&buffer->payloads, buffer_payload, list) { /* safe traversal: the matching entry is unlinked inside the loop */
+		if (buffer_payload->pos == pos) {
+			void *payload = buffer_payload->payload; /* keep the payload so it can be returned rather than freed */
+			AST_LIST_REMOVE_CURRENT(list);
+			buffer_payload->payload = NULL;
+			buffer->count--;
+			ast_free(buffer_payload); /* frees the list entry only, not the payload */
+			return payload;
+		}
+	}
+	AST_LIST_TRAVERSE_SAFE_END;
+
+	return NULL; /* no entry with a matching pos was found */
+}
+
+void *ast_data_buffer_remove_head(struct ast_data_buffer *buffer)
+{
+	struct data_buffer_payload_entry *buffer_payload;
+
+	ast_assert(buffer != NULL);
+
+	if (buffer->count > 0) { /* an empty buffer falls through to the NULL return */
+		void *payload;
+		buffer_payload = AST_LIST_REMOVE_HEAD(&buffer->payloads, list); /* NOTE(review): dereferenced below without a NULL check; assumes count > 0 implies a non-empty list */
+		payload = buffer_payload->payload;
+		buffer_payload->payload = NULL;
+		buffer->count--;
+		ast_free(buffer_payload); /* frees the list entry only; the payload is returned to the caller */
+		return payload;
+	}
+
+	return NULL;
+}
+
 void ast_data_buffer_free(struct ast_data_buffer *buffer)
 {
 	struct data_buffer_payload_entry *buffer_payload;
diff --git a/res/res_rtp_asterisk.c b/res/res_rtp_asterisk.c
index 4ac20d5..7a1da50 100644
--- a/res/res_rtp_asterisk.c
+++ b/res/res_rtp_asterisk.c
@@ -93,7 +93,8 @@
 
 #define TURN_STATE_WAIT_TIME 2000
 
-#define DEFAULT_RTP_BUFFER_SIZE	250
+#define DEFAULT_RTP_SEND_BUFFER_SIZE	250
+#define DEFAULT_RTP_RECV_BUFFER_SIZE	20
 
 /*! Full INTRA-frame Request / Fast Update Request (From RFC2032) */
 #define RTCP_PT_FUR     192
@@ -312,6 +313,8 @@
 	unsigned int lastotexttimestamp;
 	unsigned int lasteventseqn;
 	int lastrxseqno;                /*!< Last received sequence number, from the network */
+	int expectedrxseqno;		/*!< Next expected sequence number, from the network */
+	AST_VECTOR(, int) missing_seqno; /*!< A vector of sequence numbers we never received */
 	int expectedseqno;		/*!< Next expected sequence number, from the core */
 	unsigned short seedrxseqno;     /*!< What sequence number did they start with?*/
 	unsigned int seedrxts;          /*!< What RTP timestamp did they start with? */
@@ -377,6 +380,7 @@
 	struct rtp_red *red;
 
 	struct ast_data_buffer *send_buffer;		/*!< Buffer for storing sent packets for retransmission */
+	struct ast_data_buffer *recv_buffer;		/*!< Buffer for storing frames from received packets for retransmission */
 
 #ifdef HAVE_PJPROJECT
 	ast_cond_t cond;            /*!< ICE/TURN condition for signaling */
@@ -2749,6 +2753,30 @@
 }
 #endif
 
+/*! \brief Three-way comparison of a vector element against a value: -1 if elem < value, 1 if elem > value, 0 if equal */
+static int compare_by_value(int elem, int value)
+{
+	if (elem < value) {
+		return -1;
+	}
+
+	if (value < elem) {
+		return 1;
+	}
+
+	return 0;
+}
+
+/*! \brief Equality predicate for vector lookups: returns 1 if elem equals value, 0 otherwise */
+static int find_by_value(int elem, int value)
+{
+	if (elem == value) {
+		return 1;
+	}
+
+	return 0;
+}
+
 static int rtcp_mux(struct ast_rtp *rtp, const unsigned char *packet)
 {
 	uint8_t version;
@@ -3607,6 +3635,7 @@
 	rtp->ssrc = ast_random();
 	ast_uuid_generate_str(rtp->cname, sizeof(rtp->cname));
 	rtp->seqno = ast_random() & 0x7fff;
+	rtp->expectedrxseqno = -1;
 	rtp->expectedseqno = -1;
 	rtp->sched = sched;
 	ast_sockaddr_copy(&rtp->bind_address, addr);
@@ -3691,10 +3720,16 @@
 		ast_data_buffer_free(rtp->send_buffer);
 	}
 
+	/* Destroy the recv buffer if it was being used */
+	if (rtp->recv_buffer) {
+		ast_data_buffer_free(rtp->recv_buffer);
+	}
+
 	ao2_cleanup(rtp->lasttxformat);
 	ao2_cleanup(rtp->lastrxformat);
 	ao2_cleanup(rtp->f.subclass.format);
 	AST_VECTOR_FREE(&rtp->ssrc_mapping);
+	AST_VECTOR_FREE(&rtp->missing_seqno);
 
 	/* Finally destroy ourselves */
 	ast_free(rtp);
@@ -4056,39 +4091,25 @@
 	rtp->rtcp->rxlost_count++;
 }
 
-/*!
- * \brief Send RTCP SR or RR report
- *
- * \pre instance is locked
- */
-static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr)
+static int ast_rtcp_generate_report(struct ast_rtp_instance *instance, unsigned char *rtcpheader,
+		struct ast_rtp_rtcp_report *rtcp_report, int *sr)
 {
 	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
-	RAII_VAR(struct ast_json *, message_blob, NULL, ast_json_unref);
-	int res;
 	int len = 0;
-	uint16_t sdes_packet_len_bytes, sdes_packet_len_rounded;
 	struct timeval now;
 	unsigned int now_lsw;
 	unsigned int now_msw;
-	unsigned char *rtcpheader;
 	unsigned int lost_packets;
 	int fraction_lost;
 	struct timeval dlsr = { 0, };
-	unsigned char bdata[AST_UUID_STR_LEN + 128] = ""; /* More than enough */
-	int rate = rtp_get_rate(rtp->f.subclass.format);
-	int ice;
-	struct ast_sockaddr remote_address = { { 0, } };
+	int rate;
 	struct ast_rtp_rtcp_report_block *report_block = NULL;
-	RAII_VAR(struct ast_rtp_rtcp_report *, rtcp_report,
-			ast_rtp_rtcp_report_alloc(rtp->themssrc_valid ? 1 : 0),
-			ao2_cleanup);
 
 	if (!rtp || !rtp->rtcp) {
 		return 0;
 	}
 
-	if (ast_sockaddr_isnull(&rtp->rtcp->them)) {  /* This'll stop rtcp for this rtp session */
+	if (ast_sockaddr_isnull(&rtp->rtcp->them)) { /* This'll stop rtcp for this rtp session */
 		/* RTCP was stopped. */
 		return 0;
 	}
@@ -4096,6 +4117,9 @@
 	if (!rtcp_report) {
 		return 1;
 	}
+
+	*sr = rtp->txcount > rtp->rtcp->lastsrtxcount ? 1 : 0;
+	rate = rtp_get_rate(rtp->f.subclass.format);
 
 	/* Compute statistics */
 	calculate_lost_packet_statistics(rtp, &lost_packets, &fraction_lost);
@@ -4131,11 +4155,10 @@
 		}
 	}
 	timeval2ntp(rtcp_report->sender_information.ntp_timestamp, &now_msw, &now_lsw);
-	rtcpheader = bdata;
 	put_unaligned_uint32(rtcpheader + 4, htonl(rtcp_report->ssrc)); /* Our SSRC */
 	len += 8;
 	if (sr) {
-		put_unaligned_uint32(rtcpheader + len, htonl(now_msw)); /* now, MSW. gettimeofday() + SEC_BETWEEN_1900_AND_1970*/
+		put_unaligned_uint32(rtcpheader + len, htonl(now_msw)); /* now, MSW. gettimeofday() + SEC_BETWEEN_1900_AND_1970 */
 		put_unaligned_uint32(rtcpheader + len + 4, htonl(now_lsw)); /* now, LSW */
 		put_unaligned_uint32(rtcpheader + len + 8, htonl(rtcp_report->sender_information.rtp_timestamp));
 		put_unaligned_uint32(rtcpheader + len + 12, htonl(rtcp_report->sender_information.packet_count));
@@ -4153,53 +4176,34 @@
 	}
 
 	put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (rtcp_report->reception_report_count << 24)
-					| ((sr ? RTCP_PT_SR : RTCP_PT_RR) << 16) | ((len/4)-1)));
+				| ((sr ? RTCP_PT_SR : RTCP_PT_RR) << 16) | ((len/4)-1)));
 
-	sdes_packet_len_bytes =
-		4 + /* RTCP Header */
-		4 + /* SSRC */
-		1 + /* Type (CNAME) */
-		1 + /* Text Length */
-		AST_UUID_STR_LEN /* Text and NULL terminator */
-		;
+	return len;
+}
 
-	/* Round to 32 bit boundary */
-	sdes_packet_len_rounded = (sdes_packet_len_bytes + 3) & ~0x3;
+static int ast_rtcp_calculate_sr_rr_statistics(struct ast_rtp_instance *instance,
+		struct ast_rtp_rtcp_report *rtcp_report, struct ast_sockaddr remote_address, int ice, int sr)
+{
+	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+	int rate;
+	struct ast_rtp_rtcp_report_block *report_block = NULL;
+	RAII_VAR(struct ast_json *, message_blob, NULL, ast_json_unref);
 
-	put_unaligned_uint32(rtcpheader + len, htonl((2 << 30) | (1 << 24) | (RTCP_PT_SDES << 16) | ((sdes_packet_len_rounded / 4) - 1)));
-	put_unaligned_uint32(rtcpheader + len + 4, htonl(rtcp_report->ssrc));
-	rtcpheader[len + 8] = 0x01; /* CNAME */
-	rtcpheader[len + 9] = AST_UUID_STR_LEN - 1; /* Number of bytes of text */
-	memcpy(rtcpheader + len + 10, rtp->cname, AST_UUID_STR_LEN);
-	len += 10 + AST_UUID_STR_LEN;
-
-	/* Padding - Note that we don't set the padded bit on the packet. From
-	 * RFC 3550 Section 6.5:
-	 *
-	 *    No length octet follows the null item type octet, but additional null
-	 *    octets MUST be included if needed to pad until the next 32-bit
-	 *    boundary.  Note that this padding is separate from that indicated by
-	 *    the P bit in the RTCP header.
-	 *
-	 * These bytes will already be zeroed out during array initialization.
-	 */
-	len += (sdes_packet_len_rounded - sdes_packet_len_bytes);
-
-	if (rtp->bundled) {
-		ast_rtp_instance_get_remote_address(instance, &remote_address);
-	} else {
-		ast_sockaddr_copy(&remote_address, &rtp->rtcp->them);
-	}
-	res = rtcp_sendto(instance, (unsigned int *)rtcpheader, len, 0, &remote_address, &ice);
-	if (res < 0) {
-		ast_log(LOG_ERROR, "RTCP %s transmission error to %s, rtcp halted %s\n",
-			sr ? "SR" : "RR",
-			ast_sockaddr_stringify(&rtp->rtcp->them),
-			strerror(errno));
+	if (!rtp || !rtp->rtcp) {
 		return 0;
 	}
 
-	/* Update RTCP SR/RR statistics */
+	if (ast_sockaddr_isnull(&rtp->rtcp->them)) {
+		return 0;
+	}
+
+	if (!rtcp_report) {
+		return -1;
+	}
+
+	report_block = rtcp_report->report_block[0];
+	rate = rtp_get_rate(rtp->f.subclass.format);
+
 	if (sr) {
 		rtp->rtcp->txlsr = rtcp_report->sender_information.ntp_timestamp;
 		rtp->rtcp->sr_count++;
@@ -4236,9 +4240,121 @@
 			"to", ast_sockaddr_stringify(&remote_address),
 			"from", rtp->rtcp->local_addr_str);
 	ast_rtp_publish_rtcp_message(instance, ast_rtp_rtcp_sent_type(),
-			rtcp_report,
-			message_blob);
-	return res;
+			rtcp_report, message_blob);
+
+	return 1;
+}
+
+static int ast_rtcp_generate_sdes(struct ast_rtp_instance *instance, unsigned char *rtcpheader,
+		struct ast_rtp_rtcp_report *rtcp_report)
+{ /* Writes an SDES packet carrying our CNAME at rtcpheader and returns the number of bytes it occupies */
+	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+	int len = 0;
+	uint16_t sdes_packet_len_bytes;
+	uint16_t sdes_packet_len_rounded;
+
+	if (!rtp || !rtp->rtcp) {
+		return 0; /* no RTCP session: nothing written */
+	}
+
+	if (ast_sockaddr_isnull(&rtp->rtcp->them)) {
+		return 0; /* no remote RTCP address: nothing written */
+	}
+
+	if (!rtcp_report) {
+		return -1; /* NOTE(review): a caller that only tests for 0 or 1 will not catch this value */
+	}
+
+	sdes_packet_len_bytes =
+		4 + /* RTCP Header */
+		4 + /* SSRC */
+		1 + /* Type (CNAME) */
+		1 + /* Text Length */
+		AST_UUID_STR_LEN /* Text and NULL terminator */
+		;
+
+	/* Round to 32 bit boundary */
+	sdes_packet_len_rounded = (sdes_packet_len_bytes + 3) & ~0x3;
+
+	put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (1 << 24) | (RTCP_PT_SDES << 16) | ((sdes_packet_len_rounded / 4) - 1)));
+	put_unaligned_uint32(rtcpheader + 4, htonl(rtcp_report->ssrc));
+	rtcpheader[8] = 0x01; /* CNAME */
+	rtcpheader[9] = AST_UUID_STR_LEN - 1; /* Number of bytes of text */
+	memcpy(rtcpheader + 10, rtp->cname, AST_UUID_STR_LEN);
+	len += 10 + AST_UUID_STR_LEN;
+
+	/* Padding - Note that we don't set the padded bit on the packet. From
+	 * RFC 3550 Section 6.5:
+	 *
+	 *   No length octet follows the null item type octet, but additional null
+	 *   octets MUST be included if needed to pad until the next 32-bit
+	 *   boundary. Note that this padding is separate from that indicated by
+	 *   the P bit in the RTCP header.
+	 *
+	 * The padding bytes are not written here; this relies on the caller's buffer already being zeroed.
+	 */
+	len += (sdes_packet_len_rounded - sdes_packet_len_bytes);
+
+	return len;
+}
+
+static int ast_rtcp_generate_nack(struct ast_rtp_instance *instance, unsigned char *rtcpheader)
+{ /* Builds a NACK feedback packet at rtcpheader; returns its length in bytes, or 0 when there is no RTCP session or remote address */
+	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+	int packet_len;
+	int blp_index;
+	int current_seqno;
+	int seqno;
+	unsigned int fci;
+
+	if (!rtp || !rtp->rtcp) {
+		return 0;
+	}
+
+	if (ast_sockaddr_isnull(&rtp->rtcp->them)) {
+		return 0;
+	}
+
+	current_seqno = rtp->expectedrxseqno; /* scan starts at the next sequence number we expect */
+	seqno = rtp->lastrxseqno; /* scan stops before the last sequence number received */
+	packet_len = 12; /* The header length is 12 (version line, packet source SSRC, media source SSRC) */
+
+	/* Get the missing sequence numbers for the FCI section of the NACK request. NOTE(review): plain int comparison; confirm behaviour across sequence number wraparound */
+	for (blp_index = 0, fci = 0; current_seqno < seqno; current_seqno++, blp_index++) {
+		int *missing_seqno;
+
+		missing_seqno = AST_VECTOR_GET_CMP(&rtp->missing_seqno, current_seqno,
+				find_by_value);
+
+		if (!missing_seqno) {
+			continue; /* not missing; blp_index still advances through the loop increment */
+		}
+
+		/* We hit the max blp size, reset */
+		if (blp_index >= 17) {
+			put_unaligned_uint32(rtcpheader + packet_len, htonl(fci)); /* flush the completed 32-bit entry */
+			fci = 0;
+			blp_index = 0;
+			packet_len += 4;
+		}
+
+		if (blp_index == 0) {
+			fci |= (current_seqno << 16); /* first sequence number of an entry goes in the upper 16 bits */
+		} else {
+			fci |= (1 << (blp_index - 1)); /* later ones are flagged in the lower 16 bits by their offset from the first */
+		}
+	}
+
+	put_unaligned_uint32(rtcpheader + packet_len, htonl(fci)); /* write the entry still being assembled (done even if nothing was marked) */
+	packet_len += 4;
+
+	/* Length MUST be 2+n, where n is the number of NACKs. Same as length in words minus 1 */
+	put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (AST_RTP_RTCP_FMT_NACK << 24)
+				| (AST_RTP_RTCP_RTPFB << 16) | ((packet_len / 4) - 1)));
+	put_unaligned_uint32(rtcpheader + 4, htonl(rtp->ssrc));
+	put_unaligned_uint32(rtcpheader + 8, htonl(rtp->themssrc));
+
+	return packet_len;
+}
 
 /*!
@@ -4254,6 +4370,15 @@
 	struct ast_rtp_instance *instance = (struct ast_rtp_instance *) data;
 	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
 	int res;
+	int sr;
+	int packet_len = 0;
+	int ice;
+	struct ast_sockaddr remote_address = { { 0, } };
+	unsigned char *rtcpheader;
+	unsigned char bdata[AST_UUID_STR_LEN + 128] = ""; /* More than enough */
+	RAII_VAR(struct ast_rtp_rtcp_report *, rtcp_report,
+			ast_rtp_rtcp_report_alloc(rtp->themssrc_valid ? 1 : 0),
+			ao2_cleanup);
 
 	if (!rtp || !rtp->rtcp || rtp->rtcp->schedid == -1) {
 		ao2_ref(instance, -1);
@@ -4261,13 +4386,44 @@
 	}
 
 	ao2_lock(instance);
-	if (rtp->txcount > rtp->rtcp->lastsrtxcount) {
-		/* Send an SR */
-		res = ast_rtcp_write_report(instance, 1);
-	} else {
-		/* Send an RR */
-		res = ast_rtcp_write_report(instance, 0);
+	rtcpheader = bdata;
+
+	res = ast_rtcp_generate_report(instance, rtcpheader, rtcp_report, &sr);
+
+	if (res == 0 || res == 1) {
+		ast_debug(1, "Failed to add %s report to RTCP packet!\n", sr ? "SR" : "RR");
+		goto cleanup;
 	}
+
+	packet_len += res;
+
+	res = ast_rtcp_generate_sdes(instance, rtcpheader + packet_len, rtcp_report);
+
+	if (res == 0 || res == 1) {
+		ast_debug(1, "Failed to add SDES to RTCP packet!\n");
+		goto cleanup;
+	}
+
+	packet_len += res;
+
+	if (rtp->bundled) {
+		ast_rtp_instance_get_remote_address(instance, &remote_address);
+	} else {
+		ast_sockaddr_copy(&remote_address, &rtp->rtcp->them);
+	}
+
+	res = rtcp_sendto(instance, (unsigned int *)rtcpheader, packet_len, 0, &remote_address, &ice);
+	if (res < 0) {
+		ast_log(LOG_ERROR, "RTCP %s transmission error to %s, rtcp halted %s\n",
+				sr ? "SR" : "RR",
+				ast_sockaddr_stringify(&rtp->rtcp->them),
+				strerror(errno));
+		res = 0;
+	} else {
+		ast_rtcp_calculate_sr_rr_statistics(instance, rtcp_report, remote_address, ice, sr);
+	}
+
+cleanup:
 	ao2_unlock(instance);
 
 	if (!res) {
@@ -6600,6 +6756,124 @@
 		rtp->f.delivery.tv_usec = 0;
 		/* Pass the RTP marker bit as bit */
 		rtp->f.subclass.frame_ending = mark ? 1 : 0;
+
+		/* If retransmissions are enabled, we need to do some extra work */
+		if (rtp->recv_buffer) {
+			AST_VECTOR_REMOVE_CMP_ORDERED(&rtp->missing_seqno, seqno, find_by_value,
+					AST_VECTOR_ELEM_CLEANUP_NOOP);
+			if (rtp->expectedrxseqno == -1 || abs(seqno - rtp->expectedrxseqno) > 100) {
+				/* Define our starting point, or recover from a large gap of lost packets */
+				rtp->expectedrxseqno = seqno + 1;
+			} else if (rtp->expectedrxseqno == seqno) {
+				/* We got what we expected - add frames from buffer until we hit another missing packet */
+				AST_LIST_INSERT_TAIL(&frames, &rtp->f, frame_list);
+				rtp->expectedrxseqno++;
+				while (ast_data_buffer_count(rtp->recv_buffer)) {
+					struct ast_frame *f;
+
+					f = (struct ast_frame *)ast_data_buffer_remove(rtp->recv_buffer, rtp->expectedrxseqno);
+					if (f) {
+						AST_LIST_INSERT_TAIL(&frames, f, frame_list);
+					} else {
+						break;
+					}
+
+					rtp->expectedrxseqno++;
+				}
+				if (!(AST_LIST_EMPTY(&frames))) {
+					return AST_LIST_FIRST(&frames);
+				}
+			} else {
+				/* We got an out of order packet */
+				struct ast_frame *f = NULL;
+				int difference;
+
+				f = ast_frdup(&rtp->f);
+				if (f) {
+					ast_data_buffer_put(rtp->recv_buffer, seqno, f);
+				}
+
+				difference = seqno - (prev_seqno + 1);
+				if (difference > 0) {
+					/* We missed some packets, so let's add them to the vector */
+
+					while (difference > 0) {
+						/* We don't want missing sequence number duplicates */
+						if (AST_VECTOR_GET_CMP(&rtp->missing_seqno, seqno - difference,
+									find_by_value)) {
+							difference--;
+							continue;
+						}
+
+						AST_VECTOR_ADD_SORTED(&rtp->missing_seqno, seqno - difference,
+								compare_by_value);
+						difference--;
+					}
+				}
+
+				/* Time to send a NACK request */
+				if (ast_data_buffer_count(rtp->recv_buffer) == ast_data_buffer_max(rtp->recv_buffer) / 2) {
+					int packet_len = 0;
+					int res = 0;
+					int ice;
+					int sr;
+					size_t data_size = AST_UUID_STR_LEN + 128 + (seqno - rtp->expectedrxseqno) / 17;
+					RAII_VAR(unsigned char *, rtcpheader, NULL, ast_free_ptr);
+					RAII_VAR(struct ast_rtp_rtcp_report *, rtcp_report,
+							ast_rtp_rtcp_report_alloc(rtp->themssrc_valid ? 1 : 0),
+							ao2_cleanup);
+
+					rtcpheader = ast_malloc(sizeof(*rtcpheader) + data_size);
+					if (!rtcpheader) {
+						ast_debug(1, "Failed to allocate memory for NACK\n");
+						return &ast_null_frame;
+					}
+
+					memset(rtcpheader, 0, data_size);
+
+					res = ast_rtcp_generate_report(instance, rtcpheader, rtcp_report, &sr);
+
+					if (res == 0 || res == 1) {
+						ast_debug(1, "Failed to add %s report to NACK, stopping here\n", sr ? "SR" : "RR");
+						return &ast_null_frame;
+					}
+
+					packet_len += res;
+
+					res = ast_rtcp_generate_nack(instance, rtcpheader + packet_len);
+
+					if (res == 0) {
+						ast_debug(1, "Failed to construct NACK, stopping here\n");
+						return &ast_null_frame;
+					}
+
+					packet_len += res;
+
+					res = rtcp_sendto(instance, rtcpheader, packet_len, 0, &remote_address, &ice);
+					if (res < 0) {
+						ast_debug(1, "Failed to send NACK request out\n");
+					} else {
+						/* Update RTCP SR/RR statistics */
+						ast_rtcp_calculate_sr_rr_statistics(instance, rtcp_report, remote_address, ice, sr);
+					}
+				} else if (ast_data_buffer_count(rtp->recv_buffer) == ast_data_buffer_max(rtp->recv_buffer)) {
+					/* Things are a little out of hand. Salvage what we can and start fresh */
+					while (ast_data_buffer_count(rtp->recv_buffer)) {
+						f = (struct ast_frame *)ast_data_buffer_remove_head(rtp->recv_buffer);
+						if (f) {
+							AST_LIST_INSERT_TAIL(&frames, f, frame_list);
+						}
+					}
+					AST_VECTOR_RESET(&rtp->missing_seqno, AST_VECTOR_ELEM_CLEANUP_NOOP);
+					rtp->expectedrxseqno = seqno + 1;
+					if (!(AST_LIST_EMPTY(&frames))) {
+						return AST_LIST_FIRST(&frames);
+					}
+				}
+
+				return &ast_null_frame;
+			}
+		}
 	} else if (ast_format_get_type(rtp->f.subclass.format) == AST_MEDIA_TYPE_TEXT) {
 		/* TEXT -- samples is # of samples vs. 1000 */
 		if (!rtp->lastitexttimestamp)
@@ -6766,7 +7040,10 @@
 	} else if (property == AST_RTP_PROPERTY_ASYMMETRIC_CODEC) {
 		rtp->asymmetric_codec = value;
 	} else if (property == AST_RTP_PROPERTY_RETRANS_SEND) {
-		rtp->send_buffer = ast_data_buffer_alloc(ast_free_ptr, DEFAULT_RTP_BUFFER_SIZE);
+		rtp->send_buffer = ast_data_buffer_alloc(ast_free_ptr, DEFAULT_RTP_SEND_BUFFER_SIZE);
+	} else if (property == AST_RTP_PROPERTY_RETRANS_RECV) {
+		rtp->recv_buffer = ast_data_buffer_alloc(ast_free_ptr, DEFAULT_RTP_RECV_BUFFER_SIZE);
+		AST_VECTOR_INIT(&rtp->missing_seqno, 0);
 	}
 }
 
diff --git a/tests/test_data_buffer.c b/tests/test_data_buffer.c
index 11fdc7b..a27050b 100644
--- a/tests/test_data_buffer.c
+++ b/tests/test_data_buffer.c
@@ -217,6 +217,7 @@
 AST_TEST_DEFINE(buffer_nominal)
 {
 	RAII_VAR(struct ast_data_buffer *, buffer, NULL, ast_data_buffer_free_wrapper);
+	RAII_VAR(struct mock_payload *, removed_payload, NULL, ast_free_ptr);
 	struct mock_payload *payload;
 	struct mock_payload *fetched_payload;
 	int ret;
@@ -247,6 +248,9 @@
 				"Failed to allocate memory for payload %d", i);
 
 		ret = ast_data_buffer_put(buffer, i, payload);
+		if (ret != 0) {
+			ast_free(payload);
+		}
 
 		ast_test_validate(test, ret == 0,
 				"Failed to add payload %d to buffer", i);
@@ -268,7 +272,11 @@
 		ast_test_validate(test, payload != NULL,
 				"Failed to allocate memory for payload %d", i + BUFFER_MAX_NOMINAL);
 
+		payload->id = i;
 		ret = ast_data_buffer_put(buffer, i + BUFFER_MAX_NOMINAL, payload);
+		if (ret != 0) {
+			ast_free(payload);
+		}
 
 		ast_test_validate(test, ret == 0,
 				"Failed to add payload %d to buffer", i + BUFFER_MAX_NOMINAL);
@@ -289,6 +297,30 @@
 				"Failed to get payload at position %d during second loop", i + BUFFER_MAX_NOMINAL);
 	}
 
+	removed_payload = (struct mock_payload *)ast_data_buffer_remove_head(buffer);
+
+	ast_test_validate(test, removed_payload != NULL,
+			"Failed to get the payload at the HEAD of the buffer");
+
+	ast_test_validate(test, ast_data_buffer_count(buffer) == BUFFER_MAX_NOMINAL - 1,
+			"Removing payload from HEAD of buffer did not decrease buffer size");
+
+	ast_test_validate(test, removed_payload->id == 1,
+			"Removing payload from HEAD of buffer did not return expected payload");
+
+	ast_free(removed_payload);
+
+	removed_payload = (struct mock_payload *)ast_data_buffer_remove(buffer, BUFFER_MAX_NOMINAL * 2);
+
+	ast_test_validate(test, removed_payload != NULL,
+			"Failed to get payload at position %d from buffer", BUFFER_MAX_NOMINAL * 2);
+
+	ast_test_validate(test, ast_data_buffer_count(buffer) == BUFFER_MAX_NOMINAL - 2,
+			"Removing payload from buffer did not decrease buffer size");
+
+	ast_test_validate(test, removed_payload->id == BUFFER_MAX_NOMINAL,
+			"Removing payload from buffer did not return expected payload");
+
 	return AST_TEST_PASS;
 }
 

-- 
To view, visit https://gerrit.asterisk.org/9224
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: 15
Gerrit-MessageType: newchange
Gerrit-Change-Id: Idab644b08a1593659c92cda64132ccc203fe991d
Gerrit-Change-Number: 9224
Gerrit-PatchSet: 1
Gerrit-Owner: Benjamin Keith Ford <bford at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20180619/76a7a4b0/attachment-0001.html>


More information about the asterisk-code-review mailing list