[asterisk-commits] moy: branch moy/webrtc-12 r400845 - /team/moy/webrtc-12/res/res_http_websocket.c

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Oct 10 23:17:32 CDT 2013


Author: moy
Date: Thu Oct 10 23:17:30 2013
New Revision: 400845

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=400845
Log:
Several fixes for the WebSockets implementation in res/res_http_websocket.c

* Flush the websocket session FILE* as fwrite() may not actually guarantee sending
  the data to the network. If we do not flush, it seems that buffering on the SSL
  socket for outbound messages causes issues

* Refactored ast_websocket_read to take into account that SSL file descriptors
  may be ready to read via fread() but poll() will not actually say so because
  the data was already read from the network buffers and is now in the libc buffers

This should fix an issue that I have experienced and other users may have reported [1][2], where
secure websockets wouldn't work, messages seem to not make it into Asterisk

[1] http://lists.digium.com/pipermail/asterisk-users/2013-August/280175.html
[2] https://issues.asterisk.org/jira/browse/ASTERISK-21930

Modified:
    team/moy/webrtc-12/res/res_http_websocket.c

Modified: team/moy/webrtc-12/res/res_http_websocket.c
URL: http://svnview.digium.com/svn/asterisk/team/moy/webrtc-12/res/res_http_websocket.c?view=diff&rev=400845&r1=400844&r2=400845
==============================================================================
--- team/moy/webrtc-12/res/res_http_websocket.c (original)
+++ team/moy/webrtc-12/res/res_http_websocket.c Thu Oct 10 23:17:30 2013
@@ -57,6 +57,16 @@
 
 /*! \brief Maximum reconstruction size for multi-frame payload reconstruction. */
 #define MAXIMUM_RECONSTRUCTION_CEILING 16384
+
+/*! \brief Maximum size of a websocket frame header
+ * 1 byte flags and opcode
+ * 1 byte mask flag + payload len
+ * 8 bytes max extended length
+ * 4 bytes optional masking key
+ * ... payload follows ...
+ * */
+#define MAX_WS_HDR_SZ 14
+#define MIN_WS_HDR_SZ 2
 
 /*! \brief Structure definition for session */
 struct ast_websocket {
@@ -278,6 +288,7 @@
 	if (fwrite(payload, 1, actual_length, session->f) != actual_length) {
 		return -1;
 	}
+	fflush(session->f);
 
 	return 0;
 }
@@ -334,112 +345,114 @@
 	return 0;
 }
 
+/* MAINTENANCE WARNING on ast_websocket_read()!
+ *
+ * We have to keep in mind during this function that the fact that session->fd seems ready (via poll) does not necessarily mean we have application data ready, because in the case
+ * of an SSL socket, there is some encryption data overhead that needs to be read from the TCP socket, so poll() may say there are bytes to be read, but whether is just 1 byte or N bytes
+ * we do not know that, and we do not know how many of those bytes (if any) are for application data (for us) and not just for the SSL protocol consumption
+ *
+ * There used to be a couple of nasty bugs here that were fixed in last refactoring but I want to document them so the constraints are clear and we do not re-introduce them:
+ *
+ * - This function would incorrectly assume that fread() would necessarily return more than 1 byte of data, just because a websocket frame is always >= 2 bytes, but the thing
+ *   is we're dealing with a TCP bitstream here, we could read just one byte and that's normal. The problem before was that if just one byte was read, the function bailed out
+ *   and returned an error, effectively dropping the first byte of a websocket frame header!
+ *
+ * - Another subtle bug was that it would just read up to MAX_WS_HDR_SZ (14 bytes) via fread() then assume that executing poll() would tell you if there is more to read, but since
+ * we're dealing with a buffered stream (session->f is a FILE*), poll would say there is nothing else to read (in the real tcp socket session->fd) and we would get stuck here
+ * without processing the rest of the data in session->f internal buffers until another packet came on the network to unblock us!
+ *
+ * Note during the header parsing stage we try to read in small chunks just what we need, this is buffered data anyways, no expensive syscall required most of the time ...
+ */
+#define WS_SAFE_READ(session, buf, len) \
+	do { \
+		int sanity; \
+		size_t rlen; \
+		int xlen = len; \
+		char *rbuf = buf; \
+		for (sanity = 10; sanity; sanity--) { \
+			clearerr(session->f); \
+			rlen = fread(rbuf, 1, xlen, session->f); \
+			if (0 == rlen && ferror(session->f)) { \
+				ast_log(LOG_ERROR, "Error reading from web socket!\n"); \
+				(*opcode) = AST_WEBSOCKET_OPCODE_CLOSE; \
+				session->closing = 1; \
+				return 0; \
+			} \
+			xlen = (xlen - rlen); \
+			rbuf = rbuf + rlen;\
+			if (0 == xlen) { \
+				break; \
+			} \
+			if (ast_wait_for_input(session->fd, 1000) < 0) { \
+				ast_log(LOG_ERROR, "ast_wait_for_input returned err: %s\n", strerror(errno)); \
+				(*opcode) = AST_WEBSOCKET_OPCODE_CLOSE; \
+				session->closing = 1; \
+				return 0; \
+			} \
+		} \
+		if (!sanity) { \
+			ast_log(LOG_DEBUG, "Websocket seems unresponsive, disconnecting ...\n"); \
+			(*opcode) = AST_WEBSOCKET_OPCODE_CLOSE; \
+			session->closing = 1; \
+			return 0; \
+		} \
+	} while (0);
 int AST_OPTIONAL_API_NAME(ast_websocket_read)(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
 {
 	char buf[MAXIMUM_FRAME_SIZE] = "";
-	size_t frame_size, expected = 2;
+	int fin = 0;
+	int mask_present = 0;
+	char *mask = NULL, *new_payload = NULL;
+	size_t options_len = 0, frame_size = 0;
 
 	*payload = NULL;
 	*payload_len = 0;
 	*fragmented = 0;
 
-	/* We try to read in 14 bytes, which is the largest possible WebSocket header */
-	if ((frame_size = fread(&buf, 1, 14, session->f)) < 1) {
-		return -1;
-	}
-
-	/* The minimum size for a WebSocket frame is 2 bytes */
-	if (frame_size < expected) {
-		return -1;
-	}
-
+	WS_SAFE_READ(session, &buf[0], MIN_WS_HDR_SZ);
+	frame_size += MIN_WS_HDR_SZ;
+
+	/* ok, now we have the first 2 bytes, so we know some flags, opcode and payload length (or whether payload length extension will be required) */
 	*opcode = buf[0] & 0xf;
-
+	*payload_len = buf[1] & 0x7f;
 	if (*opcode == AST_WEBSOCKET_OPCODE_TEXT || *opcode == AST_WEBSOCKET_OPCODE_BINARY || *opcode == AST_WEBSOCKET_OPCODE_CONTINUATION ||
 	    *opcode == AST_WEBSOCKET_OPCODE_PING || *opcode == AST_WEBSOCKET_OPCODE_PONG) {
-		int fin = (buf[0] >> 7) & 1;
-		int mask_present = (buf[1] >> 7) & 1;
-		char *mask = NULL, *new_payload;
-		size_t remaining;
-
-		if (mask_present) {
-			/* The mask should take up 4 bytes */
-			expected += 4;
-
-			if (frame_size < expected) {
-				/* Per the RFC 1009 means we received a message that was too large for us to process */
-				ast_websocket_close(session, 1009);
-				return 0;
-			}
-		}
-
-		/* Assume no extended length and no masking at the beginning */
-		*payload_len = buf[1] & 0x7f;
-		*payload = &buf[2];
-
-		/* Determine if extended length is being used */
+		fin = (buf[0] >> 7) & 1;
+		mask_present = (buf[1] >> 7) & 1;
+
+		/* Based on the mask flag and payload length, determine how much more we need to read before start parsing the rest of the header */
+		options_len += mask_present ? 4 : 0;
+		options_len += (*payload_len == 126) ? 2 : (*payload_len == 127) ? 8 : 0;
+		if (options_len) {
+			/* read the rest of the header options */
+			WS_SAFE_READ(session, &buf[frame_size], options_len);
+			frame_size += options_len;
+		}
+
 		if (*payload_len == 126) {
-			/* Use the next 2 bytes to get a uint16_t */
-			expected += 2;
-			*payload += 2;
-
-			if (frame_size < expected) {
-				ast_websocket_close(session, 1009);
-				return 0;
-			}
-
+			/* Grab the 2-byte payload length  */
 			*payload_len = ntohs(get_unaligned_uint16(&buf[2]));
+			mask = &buf[4];
 		} else if (*payload_len == 127) {
-			/* Use the next 8 bytes to get a uint64_t */
-			expected += 8;
-			*payload += 8;
-
-			if (frame_size < expected) {
-				ast_websocket_close(session, 1009);
-				return 0;
-			}
-
+			/* Grab the 8-byte payload length  */
 			*payload_len = ntohl(get_unaligned_uint64(&buf[2]));
-		}
-
-		/* If masking is present the payload currently points to the mask, so move it over 4 bytes to the actual payload */
-		if (mask_present) {
-			mask = *payload;
-			*payload += 4;
-		}
-
-		/* Determine how much payload we need to read in as we may have already read some in */
-		remaining = *payload_len - (frame_size - expected);
-
-		/* If how much payload they want us to read in exceeds what we are capable of close the session, things
-		 * will fail no matter what most likely */
-		if (remaining > (MAXIMUM_FRAME_SIZE - frame_size)) {
+			mask = &buf[10];
+		} else {
+			/* Just set the mask after the small 2-byte header */
+			mask = &buf[2];
+		}
+
+		/* Now read the rest of the payload */
+		*payload = &buf[frame_size]; /* payload will start here, at the end of the options, if any */
+		frame_size = frame_size + (*payload_len); /* final frame size is header + optional headers + payload data */
+		if (frame_size > MAXIMUM_FRAME_SIZE) {
+			ast_log(LOG_WARNING, "Cannot fit huge websocket frame of %zd bytes\n", frame_size);
+			/* The frame won't fit :-( */
 			ast_websocket_close(session, 1009);
-			return 0;
-		}
-
-		new_payload = *payload + (frame_size - expected);
-
-		/* Read in the remaining payload */
-		while (remaining > 0) {
-			size_t payload_read;
-
-			/* Wait for data to come in */
-			if (ast_wait_for_input(session->fd, -1) <= 0) {
-				*opcode = AST_WEBSOCKET_OPCODE_CLOSE;
-				*payload = NULL;
-				session->closing = 1;
-				return 0;
-			}
-
-			/* If some sort of failure occurs notify the caller */
-			if ((payload_read = fread(new_payload, 1, remaining, session->f)) < 1) {
-				return -1;
-			}
-
-			remaining -= payload_read;
-			new_payload += payload_read;
-		}
+			return -1;
+		}
+
+		WS_SAFE_READ(session, (*payload), (*payload_len));
 
 		/* If a mask is present unmask the payload */
 		if (mask_present) {
@@ -449,7 +462,9 @@
 			}
 		}
 
-		if (!(new_payload = ast_realloc(session->payload, session->payload_len + *payload_len))) {
+		if (!(new_payload = ast_realloc(session->payload, (session->payload_len + *payload_len)))) {
+			ast_log(LOG_WARNING, "Failed allocation: %p, %zd, %lu\n",
+				session->payload, session->payload_len, *payload_len);
 			*payload_len = 0;
 			ast_websocket_close(session, 1009);
 			return 0;
@@ -461,7 +476,7 @@
 		}
 
 		session->payload = new_payload;
-		memcpy(session->payload + session->payload_len, *payload, *payload_len);
+		memcpy((session->payload + session->payload_len), (*payload), (*payload_len));
 		session->payload_len += *payload_len;
 
 		if (!fin && session->reconstruct && (session->payload_len < session->reconstruct)) {
@@ -487,15 +502,13 @@
 			session->payload_len = 0;
 		}
 	} else if (*opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
-		char *new_payload;
-
-		*payload_len = buf[1] & 0x7f;
-
 		/* Make the payload available so the user can look at the reason code if they so desire */
 		if ((*payload_len) && (new_payload = ast_realloc(session->payload, *payload_len))) {
+			WS_SAFE_READ(session, &buf[frame_size], (*payload_len));
 			session->payload = new_payload;
-			memcpy(session->payload, &buf[2], *payload_len);
+			memcpy(session->payload, &buf[frame_size], *payload_len);
 			*payload = session->payload;
+			frame_size += (*payload_len);
 		}
 
 		if (!session->closing) {
@@ -506,6 +519,7 @@
 		session->f = NULL;
 		ast_verb(2, "WebSocket connection from '%s' closed\n", ast_sockaddr_stringify(&session->address));
 	} else {
+		ast_log(LOG_WARNING, "WebSocket unknown opcode %d\n", *opcode);
 		/* We received an opcode that we don't understand, the RFC states that 1003 is for a type of data that can't be accepted... opcodes
 		 * fit that, I think. */
 		ast_websocket_close(session, 1003);
@@ -664,8 +678,8 @@
 		}
 
 		fprintf(ser->f, "\r\n");
+		fflush(ser->f);
 	} else {
-
 		/* Specification defined in http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75 or completely unknown */
 		ast_log(LOG_WARNING, "WebSocket connection from '%s' could not be accepted - unsupported version '%d' chosen\n",
 			ast_sockaddr_stringify(&ser->remote_address), version ? version : 75);
@@ -720,6 +734,8 @@
 {
 	int flags, res;
 
+	ast_log(LOG_DEBUG, "Entering WebSocket echo loop\n");
+
 	if ((flags = fcntl(ast_websocket_fd(session), F_GETFL)) == -1) {
 		goto end;
 	}
@@ -738,6 +754,7 @@
 
 		if (ast_websocket_read(session, &payload, &payload_len, &opcode, &fragmented)) {
 			/* We err on the side of caution and terminate the session if any error occurs */
+			ast_log(LOG_WARNING, "Read failure during WebSocket echo loop\n");
 			break;
 		}
 
@@ -745,10 +762,13 @@
 			ast_websocket_write(session, opcode, payload, payload_len);
 		} else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
 			break;
+		} else {
+			ast_log(LOG_DEBUG, "Ignored WebSocket opcode %d\n", opcode);
 		}
 	}
 
 end:
+	ast_log(LOG_DEBUG, "Exitting WebSocket echo loop\n");
 	ast_websocket_unref(session);
 }
 




More information about the asterisk-commits mailing list