[Asterisk-code-review] Implement internal abstraction for iostreams (asterisk[master])

Joshua Colp asteriskteam at digium.com
Thu Nov 17 11:07:07 CST 2016


Joshua Colp has submitted this change and it was merged. ( https://gerrit.asterisk.org/2932 )

Change subject: Implement internal abstraction for iostreams
......................................................................


Implement internal abstraction for iostreams

fopencookie/funopen are non-standard APIs and should not be used
in portable software. Additionally, the way the FILE's fd is used in
non-blocking mode is undefined behaviour and cannot be relied on.

This introduces an internal abstraction for I/O streams that allows
implementing the desired virtualization of read/write operations
with the necessary timeout handling.

ASTERISK-24515 #close
ASTERISK-24517 #close

Change-Id: Id916aef418b665ced6a7489aef74908b6e376e85
---
M apps/app_externalivr.c
M channels/chan_sip.c
M configure.ac
A include/asterisk/iostream.h
M include/asterisk/tcptls.h
M main/http.c
A main/iostream.c
M main/manager.c
M main/tcptls.c
M main/utils.c
M res/res_http_post.c
M res/res_http_websocket.c
M res/res_phoneprov.c
13 files changed, 1,002 insertions(+), 1,103 deletions(-)

Approvals:
  George Joseph: Looks good to me, approved
  Anonymous Coward #1000019: Verified
  Joshua Colp: Looks good to me, but someone else must approve



diff --git a/apps/app_externalivr.c b/apps/app_externalivr.c
index 273f963..c2224b4 100644
--- a/apps/app_externalivr.c
+++ b/apps/app_externalivr.c
@@ -150,10 +150,12 @@
 };
 
 static int eivr_comm(struct ast_channel *chan, struct ivr_localuser *u, 
-	int *eivr_events_fd, int *eivr_commands_fd, int *eivr_errors_fd, 
+	struct ast_iostream *eivr_events,
+	struct ast_iostream *eivr_commands,
+	struct ast_iostream *eivr_errors,
 	const struct ast_str *args, const struct ast_flags flags);
 
-static void send_eivr_event(FILE *handle, const char event, const char *data,
+static void send_eivr_event(struct ast_iostream *stream, const char event, const char *data,
 	const struct ast_channel *chan)
 {
 	struct ast_str *tmp = ast_str_create(12);
@@ -162,9 +164,11 @@
 	if (data) {
 		ast_str_append(&tmp, 0, ",%s", data);
 	}
+	ast_str_append(&tmp, 0, "\n");
+	ast_iostream_write(stream, ast_str_buffer(tmp), strlen(ast_str_buffer(tmp)));
+	ast_str_truncate(tmp, -1);
 
-	fprintf(handle, "%s\n", ast_str_buffer(tmp));
-	ast_debug(1, "sent '%s'\n", ast_str_buffer(tmp));
+	ast_debug(1, "sent '%s'", ast_str_buffer(tmp));
 	ast_free(tmp);
 }
 
@@ -393,6 +397,8 @@
 	int child_stdin[2] = { -1, -1 };
 	int child_stdout[2] = { -1, -1 };
 	int child_stderr[2] = { -1, -1 };
+	struct ast_iostream *stream_stdin = NULL, *stream_stdout = NULL,
+		*stream_stderr = NULL;
 	int res = -1;
 	int pid;
 
@@ -524,7 +530,7 @@
 			goto exit;
 		}
 
-		res = eivr_comm(chan, u, &ser->fd, &ser->fd, NULL, comma_delim_args, flags);
+		res = eivr_comm(chan, u, ser->stream, ser->stream, NULL, comma_delim_args, flags);
 
 	} else {
 		if (pipe(child_stdin)) {
@@ -566,13 +572,27 @@
 			child_stdout[1] = -1;
 			close(child_stderr[1]);
 			child_stderr[1] = -1;
-			res = eivr_comm(chan, u, &child_stdin[1], &child_stdout[0], &child_stderr[0], comma_delim_args, flags);
+
+			stream_stdin  = ast_iostream_from_fd(&child_stdin[1]);
+			stream_stdout = ast_iostream_from_fd(&child_stdout[0]);
+			stream_stderr = ast_iostream_from_fd(&child_stderr[0]);
+
+			res = eivr_comm(chan, u, stream_stdin, stream_stdout, stream_stderr, comma_delim_args, flags);
 		}
 	}
 
 	exit:
 	if (u->gen_active) {
 		ast_deactivate_generator(chan);
+	}
+	if (stream_stdin) {
+		ast_iostream_close(stream_stdin);
+	}
+	if (stream_stdout) {
+		ast_iostream_close(stream_stdout);
+	}
+	if (stream_stderr) {
+		ast_iostream_close(stream_stderr);
 	}
 	if (child_stdin[0] > -1) {
 		close(child_stdin[0]);
@@ -602,46 +622,25 @@
 }
 
 static int eivr_comm(struct ast_channel *chan, struct ivr_localuser *u, 
- 				int *eivr_events_fd, int *eivr_commands_fd, int *eivr_errors_fd, 
- 				const struct ast_str *args, const struct ast_flags flags)
+				struct ast_iostream *eivr_events,
+				struct ast_iostream *eivr_commands,
+				struct ast_iostream *eivr_errors,
+				const struct ast_str *args, const struct ast_flags flags)
 {
+	char input[1024];
 	struct playlist_entry *entry;
 	struct ast_frame *f;
 	int ms;
  	int exception;
  	int ready_fd;
-	int waitfds[2] = { *eivr_commands_fd, (eivr_errors_fd) ? *eivr_errors_fd : -1 };
+	int waitfds[2];
+	int r;
  	struct ast_channel *rchan;
  	int res = -1;
-	int test_available_fd = -1;
 	int hangup_info_sent = 0;
-  
- 	FILE *eivr_commands = NULL;
- 	FILE *eivr_errors = NULL;
- 	FILE *eivr_events = NULL;
 
-	if (!(eivr_events = fdopen(*eivr_events_fd, "w"))) {
-		ast_chan_log(LOG_ERROR, chan, "Could not open stream to send events\n");
-		goto exit;
-	}
-	if (!(eivr_commands = fdopen(*eivr_commands_fd, "r"))) {
-		ast_chan_log(LOG_ERROR, chan, "Could not open stream to receive commands\n");
-		goto exit;
-	}
-	if (eivr_errors_fd) {  /* if opening a socket connection, error stream will not be used */
- 		if (!(eivr_errors = fdopen(*eivr_errors_fd, "r"))) {
- 			ast_chan_log(LOG_ERROR, chan, "Could not open stream to receive errors\n");
- 			goto exit;
- 		}
-	}
-
-	test_available_fd = open("/dev/null", O_RDONLY);
- 
- 	setvbuf(eivr_events, NULL, _IONBF, 0);
- 	setvbuf(eivr_commands, NULL, _IONBF, 0);
- 	if (eivr_errors) {
-		setvbuf(eivr_errors, NULL, _IONBF, 0);
-	}
+	waitfds[0] = ast_iostream_get_fd(eivr_commands);
+	waitfds[1] = eivr_errors ? ast_iostream_get_fd(eivr_errors) : -1;
 
  	while (1) {
  		if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_ZOMBIE)) {
@@ -665,7 +664,7 @@
  		errno = 0;
  		exception = 0;
  
- 		rchan = ast_waitfor_nandfds(&chan, 1, waitfds, (eivr_errors_fd) ? 2 : 1, &exception, &ready_fd, &ms);
+		rchan = ast_waitfor_nandfds(&chan, 1, waitfds, (eivr_errors) ? 2 : 1, &exception, &ready_fd, &ms);
  
  		if (ast_channel_state(chan) == AST_STATE_UP && !AST_LIST_EMPTY(&u->finishlist)) {
  			AST_LIST_LOCK(&u->finishlist);
@@ -713,15 +712,18 @@
  				break;
  			}
  			ast_frfree(f);
- 		} else if (ready_fd == *eivr_commands_fd) {
- 			char input[1024];
- 
- 			if (exception || (dup2(*eivr_commands_fd, test_available_fd) == -1) || feof(eivr_commands)) {
+ 		} else if (ready_fd == waitfds[0]) {
+ 			if (exception) {
  				ast_chan_log(LOG_ERROR, chan, "Child process went away\n");
   				break;
   			}
   
-			if (!fgets(input, sizeof(input), eivr_commands)) {
+			r = ast_iostream_gets(eivr_commands, input, sizeof(input));
+			if (r <= 0) {
+				if (r == 0) {
+					ast_chan_log(LOG_ERROR, chan, "Child process went away\n");
+					break;
+				}
 				continue;
 			}
 
@@ -867,16 +869,19 @@
  				else
  					ast_chan_log(LOG_WARNING, chan, "Unknown option requested: %s\n", &input[2]);
  			}
- 		} else if (eivr_errors_fd && (ready_fd == *eivr_errors_fd)) {
- 			char input[1024];
-  
- 			if (exception || feof(eivr_errors)) {
+ 		} else if (ready_fd == waitfds[1]) {
+ 			if (exception) {
  				ast_chan_log(LOG_ERROR, chan, "Child process went away\n");
  				break;
  			}
- 			if (fgets(input, sizeof(input), eivr_errors)) {
+ 
+ 			r = ast_iostream_gets(eivr_errors, input, sizeof(input));
+ 			if (r > 0) {
  				ast_chan_log(LOG_NOTICE, chan, "stderr: %s\n", ast_strip(input));
- 			}
+ 			} else if (r == 0) {
+ 				ast_chan_log(LOG_ERROR, chan, "Child process went away\n");
+ 				break;
+			}
  		} else if ((ready_fd < 0) && ms) { 
  			if (errno == 0 || errno == EINTR)
  				continue;
@@ -886,23 +891,7 @@
  		}
  	}
  
-	exit:
-	if (test_available_fd > -1) {
-		close(test_available_fd);
-	}
-	if (eivr_events) {
- 		fclose(eivr_events);
-		*eivr_events_fd = -1;
-	}
-	if (eivr_commands) {
-		fclose(eivr_commands);
-		*eivr_commands_fd = -1;
-	}
-	if (eivr_errors) {
-		fclose(eivr_errors);
-		*eivr_errors_fd = -1;
-	}
-  	return res;
+	return res;
 }
 
 static int unload_module(void)
diff --git a/channels/chan_sip.c b/channels/chan_sip.c
index 32b2a36..43d49af 100644
--- a/channels/chan_sip.c
+++ b/channels/chan_sip.c
@@ -2541,7 +2541,7 @@
 	}
 	ao2_t_ref(tcptls_session, +1, "tcptls_session ref for sip_threadinfo object");
 	th->tcptls_session = tcptls_session;
-	th->type = transport ? transport : (tcptls_session->ssl ? AST_TRANSPORT_TLS: AST_TRANSPORT_TCP);
+	th->type = transport ? transport : (ast_iostream_get_ssl(tcptls_session->stream) ? AST_TRANSPORT_TLS: AST_TRANSPORT_TCP);
 	ao2_t_link(threadt, th, "Adding new tcptls helper thread");
 	ao2_t_ref(th, -1, "Decrementing threadinfo ref from alloc, only table ref remains");
 	return th;
@@ -2564,8 +2564,7 @@
 
 	ao2_lock(tcptls_session);
 
-	if ((tcptls_session->fd == -1) ||
-		!(th = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread")) ||
+	if (!(th = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread")) ||
 		!(packet = ao2_alloc(sizeof(*packet), tcptls_packet_destructor)) ||
 		!(packet->data = ast_str_create(len))) {
 		goto tcptls_write_setup_error;
@@ -2878,7 +2877,7 @@
 			} else {
 				timeout = -1;
 			}
-			res = ast_wait_for_input(tcptls_session->fd, timeout);
+			res = ast_wait_for_input(ast_iostream_get_fd(tcptls_session->stream), timeout);
 			if (res < 0) {
 				ast_debug(2, "SIP TCP/TLS server :: ast_wait_for_input returned %d\n", res);
 				return -1;
@@ -2887,7 +2886,7 @@
 				return -1;
 			}
 
-			res = ast_tcptls_server_read(tcptls_session, readbuf, sizeof(readbuf) - 1);
+			res = ast_iostream_read(tcptls_session->stream, readbuf, sizeof(readbuf) - 1);
 			if (res < 0) {
 				if (errno == EAGAIN || errno == EINTR) {
 					continue;
@@ -2948,18 +2947,8 @@
 			goto cleanup;
 		}
 
-		if ((flags = fcntl(tcptls_session->fd, F_GETFL)) == -1) {
-			ast_log(LOG_ERROR, "error setting socket to non blocking mode, fcntl() failed: %s\n", strerror(errno));
-			goto cleanup;
-		}
-
-		flags |= O_NONBLOCK;
-		if (fcntl(tcptls_session->fd, F_SETFL, flags) == -1) {
-			ast_log(LOG_ERROR, "error setting socket to non blocking mode, fcntl() failed: %s\n", strerror(errno));
-			goto cleanup;
-		}
-
-		if (!(me = sip_threadinfo_create(tcptls_session, tcptls_session->ssl ? AST_TRANSPORT_TLS : AST_TRANSPORT_TCP))) {
+		ast_iostream_nonblock(tcptls_session->stream);
+		if (!(me = sip_threadinfo_create(tcptls_session, ast_iostream_get_ssl(tcptls_session->stream) ? AST_TRANSPORT_TLS : AST_TRANSPORT_TCP))) {
 			goto cleanup;
 		}
 		ao2_t_ref(me, +1, "Adding threadinfo ref for tcp_helper_thread");
@@ -2976,16 +2965,16 @@
 	}
 
 	flags = 1;
-	if (setsockopt(tcptls_session->fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) {
+	if (setsockopt(ast_iostream_get_fd(tcptls_session->stream), SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) {
 		ast_log(LOG_ERROR, "error enabling TCP keep-alives on sip socket: %s\n", strerror(errno));
 		goto cleanup;
 	}
 
 	me->threadid = pthread_self();
-	ast_debug(2, "Starting thread for %s server\n", tcptls_session->ssl ? "TLS" : "TCP");
+	ast_debug(2, "Starting thread for %s server\n", ast_iostream_get_ssl(tcptls_session->stream) ? "TLS" : "TCP");
 
 	/* set up pollfd to watch for reads on both the socket and the alert_pipe */
-	fds[0].fd = tcptls_session->fd;
+	fds[0].fd = ast_iostream_get_fd(tcptls_session->stream);
 	fds[1].fd = me->alert_pipe[0];
 	fds[0].events = fds[1].events = POLLIN | POLLPRI;
 
@@ -3005,9 +2994,9 @@
 	 * We cannot let the stream exclusively wait for data to arrive.
 	 * We have to wake up the task to send outgoing messages.
 	 */
-	ast_tcptls_stream_set_exclusive_input(tcptls_session->stream_cookie, 0);
+	ast_iostream_set_exclusive_input(tcptls_session->stream, 0);
 
-	ast_tcptls_stream_set_timeout_sequence(tcptls_session->stream_cookie, ast_tvnow(),
+	ast_iostream_set_timeout_sequence(tcptls_session->stream, ast_tvnow(),
 		tcptls_session->client ? -1 : (authtimeout * 1000));
 
 	for (;;) {
@@ -3015,7 +3004,7 @@
 
 		if (!tcptls_session->client && req.authenticated && !authenticated) {
 			authenticated = 1;
-			ast_tcptls_stream_set_timeout_disable(tcptls_session->stream_cookie);
+			ast_iostream_set_timeout_disable(tcptls_session->stream);
 			ast_atomic_fetchadd_int(&unauth_sessions, -1);
 		}
 
@@ -3026,7 +3015,7 @@
 			}
 
 			if (timeout == 0) {
-				ast_debug(2, "SIP %s server timed out\n", tcptls_session->ssl ? "TLS": "TCP");
+				ast_debug(2, "SIP %s server timed out\n", ast_iostream_get_ssl(tcptls_session->stream) ? "TLS": "TCP");
 				goto cleanup;
 			}
 		} else {
@@ -3036,11 +3025,11 @@
 		if (ast_str_strlen(tcptls_session->overflow_buf) == 0) {
 			res = ast_poll(fds, 2, timeout); /* polls for both socket and alert_pipe */
 			if (res < 0) {
-				ast_debug(2, "SIP %s server :: ast_wait_for_input returned %d\n", tcptls_session->ssl ? "TLS": "TCP", res);
+				ast_debug(2, "SIP %s server :: ast_wait_for_input returned %d\n", ast_iostream_get_ssl(tcptls_session->stream) ? "TLS": "TCP", res);
 				goto cleanup;
 			} else if (res == 0) {
 				/* timeout */
-				ast_debug(2, "SIP %s server timed out\n", tcptls_session->ssl ? "TLS": "TCP");
+				ast_debug(2, "SIP %s server timed out\n", ast_iostream_get_ssl(tcptls_session->stream) ? "TLS": "TCP");
 				goto cleanup;
 			}
 		}
@@ -3065,14 +3054,14 @@
 
 			memset(buf, 0, sizeof(buf));
 
-			if (tcptls_session->ssl) {
+			if (ast_iostream_get_ssl(tcptls_session->stream)) {
 				set_socket_transport(&req.socket, AST_TRANSPORT_TLS);
 				req.socket.port = htons(ourport_tls);
 			} else {
 				set_socket_transport(&req.socket, AST_TRANSPORT_TCP);
 				req.socket.port = htons(ourport_tcp);
 			}
-			req.socket.fd = tcptls_session->fd;
+			req.socket.fd = ast_iostream_get_fd(tcptls_session->stream);
 
 			res = sip_tcptls_read(&req, tcptls_session, authenticated, start);
 			if (res < 0) {
@@ -3106,7 +3095,7 @@
 				ao2_unlock(me);
 
 				if (packet) {
-					if (ast_tcptls_server_write(tcptls_session, ast_str_buffer(packet->data), packet->len) == -1) {
+					if (ast_iostream_write(tcptls_session->stream, ast_str_buffer(packet->data), packet->len) == -1) {
 						ast_log(LOG_WARNING, "Failure to write to tcp/tls socket\n");
 					}
 					ao2_t_ref(packet, -1, "tcptls packet sent, this is no longer needed");
@@ -3118,7 +3107,7 @@
 		}
 	}
 
-	ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "TLS" : "TCP");
+	ast_debug(2, "Shutting down thread for %s server\n", ast_iostream_get_ssl(tcptls_session->stream) ? "TLS" : "TCP");
 
 cleanup:
 	if (tcptls_session && !tcptls_session->client && !authenticated) {
@@ -29095,9 +29084,8 @@
 		return s->fd;
 	}
 	if ((s->type & (AST_TRANSPORT_TCP | AST_TRANSPORT_TLS)) &&
-			(s->tcptls_session) &&
-			(s->tcptls_session->fd != -1)) {
-		return s->tcptls_session->fd;
+			s->tcptls_session) {
+		return ast_iostream_get_fd(s->tcptls_session->stream);
 	}
 	if ((s->type & (AST_TRANSPORT_WS | AST_TRANSPORT_WSS))) {
 		return s->ws_session ? ast_websocket_fd(s->ws_session) : -1;
@@ -29127,7 +29115,7 @@
 	/* 1.  check for existing threads */
 	ast_sockaddr_copy(&sa_tmp, sip_real_dst(p));
 	if ((tcptls_session = sip_tcp_locate(&sa_tmp))) {
-		s->fd = tcptls_session->fd;
+		s->fd = ast_iostream_get_fd(tcptls_session->stream);
 		if (s->tcptls_session) {
 			ao2_ref(s->tcptls_session, -1);
 			s->tcptls_session = NULL;
@@ -29174,7 +29162,7 @@
 		goto create_tcptls_session_fail;
 	}
 
-	s->fd = s->tcptls_session->fd;
+	s->fd = ast_iostream_get_fd(s->tcptls_session->stream);
 
 	/* client connections need to have the sip_threadinfo object created before
 	 * the thread is detached.  This ensures the alert_pipe is up before it will
@@ -29976,8 +29964,7 @@
 	if ((peer->socket.fd != -1) && (peer->socket.type == AST_TRANSPORT_UDP)) {
 		res = ast_sendto(peer->socket.fd, keepalive, sizeof(keepalive), 0, &peer->addr);
 	} else if ((peer->socket.type & (AST_TRANSPORT_TCP | AST_TRANSPORT_TLS)) &&
-		   (peer->socket.tcptls_session) &&
-		   (peer->socket.tcptls_session->fd != -1)) {
+		   peer->socket.tcptls_session) {
 		res = sip_tcptls_write(peer->socket.tcptls_session, keepalive, sizeof(keepalive));
 	} else if (peer->socket.type == AST_TRANSPORT_UDP) {
 		res = ast_sendto(sipsock, keepalive, sizeof(keepalive), 0, &peer->addr);
diff --git a/configure.ac b/configure.ac
index 0f2148b..0574c08 100644
--- a/configure.ac
+++ b/configure.ac
@@ -815,10 +815,6 @@
 	esac], [AST_ASTERISKSSL=yes])
 AC_SUBST(AST_ASTERISKSSL)
 
-# https support (in main/http.c) uses funopen on BSD systems,
-# fopencookie on linux
-AC_CHECK_FUNCS([funopen fopencookie])
-
 AC_CHECK_FUNCS([inet_aton])
 
 # check if we have IP_PKTINFO constant defined
diff --git a/include/asterisk/iostream.h b/include/asterisk/iostream.h
new file mode 100644
index 0000000..c641ffb
--- /dev/null
+++ b/include/asterisk/iostream.h
@@ -0,0 +1,118 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 1999 - 2015, Digium, Inc.
+ *
+ * Timo Teräs <timo.teras at iki.fi>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_IOSTREAM_H
+#define _ASTERISK_IOSTREAM_H
+
+/*!
+ * \file iostream.h
+ *
+ * \brief Generic abstraction for input/output streams.
+ */
+
+#if defined(HAVE_OPENSSL)
+#define DO_SSL  /* comment in/out if you want to support ssl */
+#endif
+
+#ifdef DO_SSL
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <openssl/x509v3.h>
+#else
+/* declare dummy types so we can define a pointer to them */
+typedef struct {} SSL;
+typedef struct {} SSL_CTX;
+#endif /* DO_SSL */
+
+struct ast_iostream;
+
+/*!
+ * \brief Disable the iostream timeout timer.
+ *
+ * \param stream iostream control data.
+ *
+ * \return Nothing
+ */
+void ast_iostream_set_timeout_disable(struct ast_iostream *stream);
+
+/*!
+ * \brief Set the iostream inactivity timeout timer.
+ *
+ * \param stream iostream control data.
+ * \param timeout Number of milliseconds to wait for data transfer with the peer.
+ *
+ * \details This is basically how much time we are willing to spend
+ * in an I/O call before we declare the peer unresponsive.
+ *
+ * \note Setting timeout to -1 disables the timeout.
+ * \note Setting this timeout replaces the I/O sequence timeout timer.
+ *
+ * \return Nothing
+ */
+void ast_iostream_set_timeout_inactivity(struct ast_iostream *stream, int timeout);
+
+void ast_iostream_set_timeout_idle_inactivity(struct ast_iostream *stream, int timeout, int timeout_reset);
+
+/*!
+ * \brief Set the iostream I/O sequence timeout timer.
+ *
+ * \param stream iostream control data.
+ * \param start Time the I/O sequence timer starts.
+ * \param timeout Number of milliseconds from the start time before timeout.
+ *
+ * \details This is how much time are we willing to allow the peer
+ * to complete an operation that can take several I/O calls.  The
+ * main use is as an authentication timer with us.
+ *
+ * \note Setting timeout to -1 disables the timeout.
+ * \note Setting this timeout replaces the inactivity timeout timer.
+ *
+ * \return Nothing
+ */
+void ast_iostream_set_timeout_sequence(struct ast_iostream *stream, struct timeval start, int timeout);
+
+/*!
+ * \brief Set the iostream if it can exclusively depend upon the set timeouts.
+ *
+ * \param stream iostream control data.
+ * \param exclusive_input TRUE if stream can exclusively wait for fd input.
+ * Otherwise, the stream will not wait for fd input.  It will wait while
+ * trying to send data.
+ *
+ * \note The stream timeouts still need to be set.
+ *
+ * \return Nothing
+ */
+void ast_iostream_set_exclusive_input(struct ast_iostream *stream, int exclusive_input);
+
+int ast_iostream_get_fd(struct ast_iostream *stream);
+void ast_iostream_nonblock(struct ast_iostream *stream);
+
+SSL* ast_iostream_get_ssl(struct ast_iostream *stream);
+
+ssize_t ast_iostream_read(struct ast_iostream *stream, void *buf, size_t count);
+ssize_t ast_iostream_gets(struct ast_iostream *stream, char *buf, size_t count);
+ssize_t ast_iostream_discard(struct ast_iostream *stream, size_t count);
+ssize_t ast_iostream_write(struct ast_iostream *stream, const void *buf, size_t count);
+ssize_t ast_iostream_printf(struct ast_iostream *stream, const void *fmt, ...);
+
+struct ast_iostream* ast_iostream_from_fd(int *fd);
+int ast_iostream_start_tls(struct ast_iostream **stream, SSL_CTX *ctx, int client);
+int ast_iostream_close(struct ast_iostream *stream);
+
+#endif /* _ASTERISK_IOSTREAM_H */
diff --git a/include/asterisk/tcptls.h b/include/asterisk/tcptls.h
index 3c5f450..883cb92 100644
--- a/include/asterisk/tcptls.h
+++ b/include/asterisk/tcptls.h
@@ -57,20 +57,7 @@
 
 #include "asterisk/netsock2.h"
 #include "asterisk/utils.h"
-
-#if defined(HAVE_OPENSSL) && (defined(HAVE_FUNOPEN) || defined(HAVE_FOPENCOOKIE))
-#define DO_SSL  /* comment in/out if you want to support ssl */
-#endif
-
-#ifdef DO_SSL
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-#include <openssl/x509v3.h>
-#else
-/* declare dummy types so we can define a pointer to them */
-typedef struct {} SSL;
-typedef struct {} SSL_CTX;
-#endif /* DO_SSL */
+#include "asterisk/iostream.h"
 
 /*! SSL support */
 #define AST_CERTFILE "asterisk.pem"
@@ -153,72 +140,10 @@
 	const char *name;
 };
 
-struct ast_tcptls_stream;
-
-/*!
- * \brief Disable the TCP/TLS stream timeout timer.
- *
- * \param stream TCP/TLS stream control data.
- *
- * \return Nothing
- */
-void ast_tcptls_stream_set_timeout_disable(struct ast_tcptls_stream *stream);
-
-/*!
- * \brief Set the TCP/TLS stream inactivity timeout timer.
- *
- * \param stream TCP/TLS stream control data.
- * \param timeout Number of milliseconds to wait for data transfer with the peer.
- *
- * \details This is basically how much time we are willing to spend
- * in an I/O call before we declare the peer unresponsive.
- *
- * \note Setting timeout to -1 disables the timeout.
- * \note Setting this timeout replaces the I/O sequence timeout timer.
- *
- * \return Nothing
- */
-void ast_tcptls_stream_set_timeout_inactivity(struct ast_tcptls_stream *stream, int timeout);
-
-/*!
- * \brief Set the TCP/TLS stream I/O sequence timeout timer.
- *
- * \param stream TCP/TLS stream control data.
- * \param start Time the I/O sequence timer starts.
- * \param timeout Number of milliseconds from the start time before timeout.
- *
- * \details This is how much time are we willing to allow the peer
- * to complete an operation that can take several I/O calls.  The
- * main use is as an authentication timer with us.
- *
- * \note Setting timeout to -1 disables the timeout.
- * \note Setting this timeout replaces the inactivity timeout timer.
- *
- * \return Nothing
- */
-void ast_tcptls_stream_set_timeout_sequence(struct ast_tcptls_stream *stream, struct timeval start, int timeout);
-
-/*!
- * \brief Set the TCP/TLS stream I/O if it can exclusively depend upon the set timeouts.
- *
- * \param stream TCP/TLS stream control data.
- * \param exclusive_input TRUE if stream can exclusively wait for fd input.
- * Otherwise, the stream will not wait for fd input.  It will wait while
- * trying to send data.
- *
- * \note The stream timeouts still need to be set.
- *
- * \return Nothing
- */
-void ast_tcptls_stream_set_exclusive_input(struct ast_tcptls_stream *stream, int exclusive_input);
-
 /*! \brief
  * describes a server instance
  */
 struct ast_tcptls_session_instance {
-	FILE *f;    /*!< fopen/funopen result */
-	int fd;     /*!< the socket returned by accept() */
-	SSL *ssl;   /*!< ssl state */
 	int client;
 	struct ast_sockaddr remote_address;
 	struct ast_tcptls_session_args *parent;
@@ -228,19 +153,11 @@
 	 * extra data.
 	 */
 	struct ast_str *overflow_buf;
-	/*! ao2 FILE stream cookie object associated with f. */
-	struct ast_tcptls_stream *stream_cookie;
+	/*! ao2 stream object associated with this session. */
+	struct ast_iostream *stream;
 	/*! ao2 object private data of parent->worker_fn */
 	void *private_data;
 };
-
-#if defined(HAVE_FUNOPEN)
-#define HOOK_T int
-#define LEN_T int
-#else
-#define HOOK_T ssize_t
-#define LEN_T size_t
-#endif
 
 /*! 
   * \brief attempts to connect and start tcptls session, on error the tcptls_session's
@@ -296,8 +213,5 @@
  * \brief Used to parse conf files containing tls/ssl options.
  */
 int ast_tls_read_conf(struct ast_tls_config *tls_cfg, struct ast_tcptls_session_args *tls_desc, const char *varname, const char *value);
-
-HOOK_T ast_tcptls_server_read(struct ast_tcptls_session_instance *ser, void *buf, size_t count);
-HOOK_T ast_tcptls_server_write(struct ast_tcptls_session_instance *ser, const void *buf, size_t count);
 
 #endif /* _ASTERISK_TCPTLS_H */
diff --git a/main/http.c b/main/http.c
index 77feb39..9aff4d1 100644
--- a/main/http.c
+++ b/main/http.c
@@ -449,11 +449,13 @@
 	struct timeval now = ast_tvnow();
 	struct ast_tm tm;
 	char timebuf[80];
+	char buf[256];
+	int len;
 	int content_length = 0;
 	int close_connection;
 	struct ast_str *server_header_field = ast_str_create(MAX_SERVER_NAME_LENGTH);
 
-	if (!ser || !ser->f || !server_header_field) {
+	if (!ser || !server_header_field) {
 		/* The connection is not open. */
 		ast_free(http_header);
 		ast_free(out);
@@ -503,7 +505,7 @@
 	}
 
 	/* send http header */
-	fprintf(ser->f,
+	ast_iostream_printf(ser->stream,
 		"HTTP/1.1 %d %s\r\n"
 		"%s"
 		"Date: %s\r\n"
@@ -524,18 +526,16 @@
 	/* send content */
 	if (method != AST_HTTP_HEAD || status_code >= 400) {
 		if (out && ast_str_strlen(out)) {
-			if (fwrite(ast_str_buffer(out), ast_str_strlen(out), 1, ser->f) != 1) {
+			len = ast_str_strlen(out);
+			if (ast_iostream_write(ser->stream, ast_str_buffer(out), len) != len) {
 				ast_log(LOG_ERROR, "fwrite() failed: %s\n", strerror(errno));
 				close_connection = 1;
 			}
 		}
 
 		if (fd) {
-			char buf[256];
-			int len;
-
 			while ((len = read(fd, buf, sizeof(buf))) > 0) {
-				if (fwrite(buf, len, 1, ser->f) != 1) {
+				if (ast_iostream_write(ser->stream, buf, len) != len) {
 					ast_log(LOG_WARNING, "fwrite() failed: %s\n", strerror(errno));
 					close_connection = 1;
 					break;
@@ -567,7 +567,7 @@
 		ast_free(http_header_data);
 		ast_free(server_address);
 		ast_free(out);
-		if (ser && ser->f) {
+		if (ser) {
 			ast_debug(1, "HTTP closing session. OOM.\n");
 			ast_tcptls_close_session_file(ser);
 		}
@@ -921,9 +921,9 @@
 {
 	int res;
 
-	/* Stay in fread until get all the expected data or timeout. */
-	res = fread(buf, length, 1, ser->f);
-	if (res < 1) {
+	/* Stream is in exclusive mode so we get it all if possible. */
+	res = ast_iostream_read(ser->stream, buf, length);
+	if (res < length) {
 		ast_log(LOG_WARNING, "Short HTTP request %s (Wanted %d)\n",
 			what_getting, length);
 		return -1;
@@ -945,23 +945,12 @@
  */
 static int http_body_discard_contents(struct ast_tcptls_session_instance *ser, int length, const char *what_getting)
 {
-	int res;
-	char buf[MAX_HTTP_LINE_LENGTH];/* Discard buffer */
+	ssize_t res;
 
-	/* Stay in fread until get all the expected data or timeout. */
-	while (sizeof(buf) < length) {
-		res = fread(buf, sizeof(buf), 1, ser->f);
-		if (res < 1) {
-			ast_log(LOG_WARNING, "Short HTTP request %s (Wanted %zu of remaining %d)\n",
-				what_getting, sizeof(buf), length);
-			return -1;
-		}
-		length -= sizeof(buf);
-	}
-	res = fread(buf, length, 1, ser->f);
-	if (res < 1) {
-		ast_log(LOG_WARNING, "Short HTTP request %s (Wanted %d of remaining %d)\n",
-			what_getting, length, length);
+	res = ast_iostream_discard(ser->stream, length);
+	if (res < length) {
+		ast_log(LOG_WARNING, "Short HTTP request %s (Wanted %d but got %zd)\n",
+			what_getting, length, res);
 		return -1;
 	}
 	return 0;
@@ -1037,7 +1026,7 @@
 	char header_line[MAX_HTTP_LINE_LENGTH];
 
 	/* get the line of hexadecimal giving chunk-size w/ optional chunk-extension */
-	if (!fgets(header_line, sizeof(header_line), ser->f)) {
+	if (ast_iostream_gets(ser->stream, header_line, sizeof(header_line)) <= 0) {
 		ast_log(LOG_WARNING, "Short HTTP read of chunked header\n");
 		return -1;
 	}
@@ -1065,8 +1054,8 @@
 	char chunk_sync[2];
 
 	/* Stay in fread until get the expected CRLF or timeout. */
-	res = fread(chunk_sync, sizeof(chunk_sync), 1, ser->f);
-	if (res < 1) {
+	res = ast_iostream_read(ser->stream, chunk_sync, sizeof(chunk_sync));
+	if (res < sizeof(chunk_sync)) {
 		ast_log(LOG_WARNING, "Short HTTP chunk sync read (Wanted %zu)\n",
 			sizeof(chunk_sync));
 		return -1;
@@ -1095,7 +1084,7 @@
 	char header_line[MAX_HTTP_LINE_LENGTH];
 
 	for (;;) {
-		if (!fgets(header_line, sizeof(header_line), ser->f)) {
+		if (ast_iostream_gets(ser->stream, header_line, sizeof(header_line)) <= 0) {
 			ast_log(LOG_WARNING, "Short HTTP read of chunked trailer header\n");
 			return -1;
 		}
@@ -1758,7 +1747,7 @@
 		char *name;
 		char *value;
 
-		if (!fgets(header_line, sizeof(header_line), ser->f)) {
+		if (ast_iostream_gets(ser->stream, header_line, sizeof(header_line)) <= 0) {
 			ast_http_error(ser, 400, "Bad Request", "Timeout");
 			return -1;
 		}
@@ -1832,7 +1821,7 @@
 	int res;
 	char request_line[MAX_HTTP_LINE_LENGTH];
 
-	if (!fgets(request_line, sizeof(request_line), ser->f)) {
+	if (ast_iostream_gets(ser->stream, request_line, sizeof(request_line)) <= 0) {
 		return -1;
 	}
 
@@ -1913,11 +1902,10 @@
 static void *httpd_helper_thread(void *data)
 {
 	struct ast_tcptls_session_instance *ser = data;
-	struct protoent *p;
-	int flags;
 	int timeout;
+	int arg = 1;
 
-	if (!ser || !ser->f) {
+	if (!ser) {
 		ao2_cleanup(ser);
 		return NULL;
 	}
@@ -1934,23 +1922,11 @@
 	 * This is necessary to prevent delays (caused by buffering) as we
 	 * write to the socket in bits and pieces.
 	 */
-	p = getprotobyname("tcp");
-	if (p) {
-		int arg = 1;
-
-		if (setsockopt(ser->fd, p->p_proto, TCP_NODELAY, (char *) &arg, sizeof(arg) ) < 0) {
-			ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on HTTP connection: %s\n", strerror(errno));
-			ast_log(LOG_WARNING, "Some HTTP requests may be slow to respond.\n");
-		}
-	} else {
-		ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on HTTP connection, getprotobyname(\"tcp\") failed\n");
+	if (setsockopt(ast_iostream_get_fd(ser->stream), IPPROTO_TCP, TCP_NODELAY, (char *) &arg, sizeof(arg) ) < 0) {
+		ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on HTTP connection: %s\n", strerror(errno));
 		ast_log(LOG_WARNING, "Some HTTP requests may be slow to respond.\n");
 	}
-
-	/* make sure socket is non-blocking */
-	flags = fcntl(ser->fd, F_GETFL);
-	flags |= O_NONBLOCK;
-	fcntl(ser->fd, F_SETFL, flags);
+	ast_iostream_nonblock(ser->stream);
 
 	/* Setup HTTP worker private data to keep track of request body reading. */
 	ao2_cleanup(ser->private_data);
@@ -1973,23 +1949,17 @@
 	}
 
 	/* We can let the stream wait for data to arrive. */
-	ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 1);
+	ast_iostream_set_exclusive_input(ser->stream, 1);
 
 	for (;;) {
-		int ch;
-
 		/* Wait for next potential HTTP request message. */
-		ast_tcptls_stream_set_timeout_inactivity(ser->stream_cookie, timeout);
-		ch = fgetc(ser->f);
-		if (ch == EOF || ungetc(ch, ser->f) == EOF) {
-			/* Between request idle timeout */
-			ast_debug(1, "HTTP idle timeout or peer closed connection.\n");
+		ast_iostream_set_timeout_idle_inactivity(ser->stream, timeout, session_inactivity);
+		if (httpd_process_request(ser)) {
+			/* Break the connection or the connection closed */
 			break;
 		}
-
-		ast_tcptls_stream_set_timeout_inactivity(ser->stream_cookie, session_inactivity);
-		if (httpd_process_request(ser) || !ser->f || feof(ser->f)) {
-			/* Break the connection or the connection closed */
+		if (!ser->stream) {
+			/* Web-socket or similar that took the connection */
 			break;
 		}
 
@@ -2003,10 +1973,9 @@
 done:
 	ast_atomic_fetchadd_int(&session_count, -1);
 
-	if (ser->f) {
-		ast_debug(1, "HTTP closing session.  Top level\n");
-		ast_tcptls_close_session_file(ser);
-	}
+	ast_debug(1, "HTTP closing session.  Top level\n");
+	ast_tcptls_close_session_file(ser);
+
 	ao2_ref(ser, -1);
 	return NULL;
 }
diff --git a/main/iostream.c b/main/iostream.c
new file mode 100644
index 0000000..46abc18
--- /dev/null
+++ b/main/iostream.c
@@ -0,0 +1,553 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 1999 - 2015, Digium, Inc.
+ *
+ * Timo Teräs <timo.teras at iki.fi>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#include <fcntl.h>
+#include <stdarg.h>
+
+#include "asterisk.h"
+#include "asterisk/utils.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/iostream.h"
+
+/*! Internal state of an I/O stream wrapping a file descriptor and an optional TLS session. */
+struct ast_iostream {
+	/*! SSL state if not NULL */
+	SSL *ssl;
+	/*! Start time of an I/O sequence.  If tv_sec is zero, each I/O request starts timing from "now". */
+	struct timeval start;
+	/*! Underlying file descriptor.  Set to -1 once the stream has been closed. */
+	int fd;
+	/*! Timeout in ms relative to start.  The disable setter stores -1 here. */
+	int timeout;
+	/*! Value that timeout is set back to after a successful read. */
+	int timeout_reset;
+	/*! TRUE if stream can exclusively wait for fd input. */
+	int exclusive_input;
+	/*! Number of unread bytes buffered at rbufhead. */
+	int rbuflen;
+	/*! First unread buffered byte; ast_iostream_gets() points it into rbuf. */
+	char *rbufhead;
+	/*! Backing storage for the line buffer filled by ast_iostream_gets(). */
+	char rbuf[2048];
+};
+
+/*! Return the file descriptor backing \a stream (-1 once the stream has been closed). */
+int ast_iostream_get_fd(struct ast_iostream *stream)
+{
+	int fd = stream->fd;
+
+	return fd;
+}
+
+/*!
+ * Put the stream's file descriptor into non-blocking mode.
+ *
+ * A failing fcntl() is logged and otherwise ignored; the descriptor flags
+ * are never overwritten with a value derived from a failed F_GETFL.
+ */
+void ast_iostream_nonblock(struct ast_iostream *stream)
+{
+	int flags = fcntl(stream->fd, F_GETFL);
+
+	if (flags == -1) {
+		/* Do not feed (-1 | O_NONBLOCK) to F_SETFL. */
+		ast_log(LOG_WARNING, "Failed to get flags of fd %d: %s\n",
+			stream->fd, strerror(errno));
+		return;
+	}
+
+	if (fcntl(stream->fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+		ast_log(LOG_WARNING, "Failed to set fd %d non-blocking: %s\n",
+			stream->fd, strerror(errno));
+	}
+}
+
+/*! Return the SSL state attached to \a stream, or NULL when none is set. */
+SSL *ast_iostream_get_ssl(struct ast_iostream *stream)
+{
+	SSL *ssl = stream->ssl;
+
+	return ssl;
+}
+
+/*!
+ * Disable timeout handling: both the active timeout and the value it is
+ * restored to after a successful read are set to -1.
+ */
+void ast_iostream_set_timeout_disable(struct ast_iostream *stream)
+{
+	ast_assert(stream != NULL);
+
+	stream->timeout_reset = -1;
+	stream->timeout = stream->timeout_reset;
+}
+
+/*!
+ * Set an inactivity timeout of \a timeout ms.  The sequence start time is
+ * cleared, so each I/O request is timed from the moment it begins.
+ */
+void ast_iostream_set_timeout_inactivity(struct ast_iostream *stream, int timeout)
+{
+	ast_assert(stream != NULL);
+
+	stream->timeout_reset = timeout;
+	stream->timeout = timeout;
+	stream->start.tv_sec = 0;
+}
+
+/*!
+ * Set an initial timeout of \a timeout ms that is replaced by
+ * \a timeout_reset ms after a successful read.  The sequence start time
+ * is cleared, so each I/O request is timed from the moment it begins.
+ */
+void ast_iostream_set_timeout_idle_inactivity(struct ast_iostream *stream, int timeout, int timeout_reset)
+{
+	ast_assert(stream != NULL);
+
+	stream->timeout_reset = timeout_reset;
+	stream->timeout = timeout;
+	stream->start.tv_sec = 0;
+}
+
+/*!
+ * Set a timeout of \a timeout ms measured from the fixed point in time
+ * \a start rather than from the beginning of each I/O request.
+ */
+void ast_iostream_set_timeout_sequence(struct ast_iostream *stream, struct timeval start, int timeout)
+{
+	ast_assert(stream != NULL);
+
+	stream->timeout_reset = timeout;
+	stream->timeout = timeout;
+	stream->start = start;
+}
+
+/*!
+ * Record whether reads on \a stream may wait for input to arrive
+ * (non-zero) or must return without waiting (zero).
+ */
+void ast_iostream_set_exclusive_input(struct ast_iostream *stream, int exclusive_input)
+{
+	ast_assert(stream != NULL);
+
+	stream->exclusive_input = exclusive_input;
+}
+
+/*!
+ * \internal
+ * \brief Read from the stream's TLS session or file descriptor, honoring timeouts.
+ *
+ * \param stream Stream control data.
+ * \param buf Where to put read data.
+ * \param size Size of the buffer.
+ *
+ * \retval number of bytes put into buf.
+ * \retval 0 on end of file, on timeout, or (TLS only) on an undecoded SSL
+ *         or transport error.
+ * \retval -1 on error.
+ *
+ * \note When exclusive_input is not set this function does not wait for
+ * data: the TLS path fails with errno EAGAIN and the plain path returns
+ * whatever read() returned.
+ * \note A successful read sets the timeout back to timeout_reset.
+ */
+static ssize_t iostream_read(struct ast_iostream *stream, void *buf, size_t size)
+{
+	struct timeval start;
+	int ms;
+	int res;
+
+	/* Time against the configured sequence start, or against "now" if none is set. */
+	if (stream->start.tv_sec) {
+		start = stream->start;
+	} else {
+		start = ast_tvnow();
+	}
+
+#if defined(DO_SSL)
+	if (stream->ssl) {
+		for (;;) {
+			res = SSL_read(stream->ssl, buf, size);
+			if (0 < res) {
+				/* We read some payload data. */
+				stream->timeout = stream->timeout_reset;
+				return res;
+			}
+			switch (SSL_get_error(stream->ssl, res)) {
+			case SSL_ERROR_ZERO_RETURN:
+				/* Report EOF for a shutdown */
+				ast_debug(1, "TLS clean shutdown alert reading data\n");
+				return 0;
+			case SSL_ERROR_WANT_READ:
+				if (!stream->exclusive_input) {
+					/* We cannot wait for data now. */
+					errno = EAGAIN;
+					return -1;
+				}
+				while ((ms = ast_remaining_ms(start, stream->timeout))) {
+					res = ast_wait_for_input(stream->fd, ms);
+					if (0 < res) {
+						/* Socket is ready to be read. */
+						break;
+					}
+					if (res < 0) {
+						if (errno == EINTR || errno == EAGAIN) {
+							/* Try again. */
+							continue;
+						}
+						ast_debug(1, "TLS socket error waiting for read data: %s\n",
+							strerror(errno));
+						return -1;
+					}
+				}
+				break;
+			case SSL_ERROR_WANT_WRITE:
+				while ((ms = ast_remaining_ms(start, stream->timeout))) {
+					res = ast_wait_for_output(stream->fd, ms);
+					if (0 < res) {
+						/* Socket is ready to be written. */
+						break;
+					}
+					if (res < 0) {
+						if (errno == EINTR || errno == EAGAIN) {
+							/* Try again. */
+							continue;
+						}
+						ast_debug(1, "TLS socket error waiting for write space: %s\n",
+							strerror(errno));
+						return -1;
+					}
+				}
+				break;
+			default:
+				/* Report EOF for an undecoded SSL or transport error. */
+				ast_debug(1, "TLS transport or SSL error reading data\n");
+				return 0;
+			}
+			/* ms is zero here only when one of the wait loops above ran out of time. */
+			if (!ms) {
+				/* Report EOF for a timeout */
+				ast_debug(1, "TLS timeout reading data\n");
+				return 0;
+			}
+		}
+	}
+#endif	/* defined(DO_SSL) */
+
+	for (;;) {
+		res = read(stream->fd, buf, size);
+		if (0 <= res) {
+			/* Got data or we cannot wait for it. */
+			stream->timeout = stream->timeout_reset;
+			return res;
+		}
+		if (!stream->exclusive_input) {
+			return res;
+		}
+		if (errno != EINTR && errno != EAGAIN) {
+			/* Not a retryable error. */
+			ast_debug(1, "TCP socket error reading data: %s\n",
+				strerror(errno));
+			return -1;
+		}
+		ms = ast_remaining_ms(start, stream->timeout);
+		if (!ms) {
+			/* Report EOF for a timeout */
+			ast_debug(1, "TCP timeout reading data\n");
+			return 0;
+		}
+		/* The wait result is not checked; the read() above is simply retried. */
+		ast_wait_for_input(stream->fd, ms);
+	}
+}
+
+/*!
+ * \brief Read up to \a size bytes from the stream.
+ *
+ * Data left in the stream's line buffer is handed out first; the
+ * descriptor or TLS session is only read when that buffer is empty.
+ *
+ * \retval number of bytes put into buf.
+ * \retval 0 when \a size is zero, or as reported by the low-level read.
+ * \retval -1 on error (errno is EBADF for a NULL or closed stream).
+ */
+ssize_t ast_iostream_read(struct ast_iostream *stream, void *buf, size_t size)
+{
+	size_t chunk;
+
+	if (size == 0) {
+		/* Nothing requested, nothing delivered. */
+		return 0;
+	}
+
+	if (stream == NULL || stream->fd == -1) {
+		errno = EBADF;
+		return -1;
+	}
+
+	if (!stream->rbuflen) {
+		/* No buffered leftovers: go to the descriptor / TLS session. */
+		return iostream_read(stream, buf, size);
+	}
+
+	/* Serve the request from the buffered remains only. */
+	chunk = size < (size_t) stream->rbuflen ? size : (size_t) stream->rbuflen;
+	memcpy(buf, stream->rbufhead, chunk);
+	stream->rbufhead += chunk;
+	stream->rbuflen -= chunk;
+
+	return chunk;
+}
+
+/*!
+ * \brief Read one line, or as much of it as fits, from the stream.
+ *
+ * Copies bytes up to and including the first newline into \a buf and
+ * NUL-terminates the result.  At most \a count - 1 bytes are returned;
+ * a line longer than that, or longer than the internal line buffer, is
+ * returned in pieces by successive calls.
+ *
+ * \param stream Stream to read from.
+ * \param buf Destination buffer.
+ * \param count Size of \a buf in bytes, including room for the NUL.
+ *
+ * \retval number of bytes stored in buf, not counting the NUL.
+ * \retval 0 or -1 as reported by the low-level read when no complete
+ *         chunk could be assembled; buffered data is kept in that case.
+ * \retval -1 with errno EINVAL if \a count is zero, or EBADF for a NULL
+ *         or closed stream.
+ */
+ssize_t ast_iostream_gets(struct ast_iostream *stream, char *buf, size_t count)
+{
+	ssize_t r;
+	size_t limit;
+	char *newline;
+
+	if (!count) {
+		/* No room for even the terminating NUL; count - 1 would wrap. */
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (!stream || stream->fd == -1) {
+		errno = EBADF;
+		return -1;
+	}
+
+	if (!stream->rbufhead) {
+		/* First use: avoid handing a NULL pointer to memchr(). */
+		stream->rbufhead = stream->rbuf;
+	}
+
+	/*
+	 * Never wait for more data than the line buffer can hold.  Without
+	 * this cap a full buffer with no newline leads to a zero-length read
+	 * that is indistinguishable from EOF.
+	 */
+	limit = count - 1;
+	if (limit > sizeof(stream->rbuf)) {
+		limit = sizeof(stream->rbuf);
+	}
+
+	for (;;) {
+		/* Search for newline */
+		newline = memchr(stream->rbufhead, '\n', stream->rbuflen);
+		if (newline) {
+			r = newline - stream->rbufhead + 1;
+			if ((size_t) r > limit) {
+				r = limit;
+			}
+			break;
+		}
+
+		/* Enough data? */
+		if ((size_t) stream->rbuflen >= limit) {
+			r = limit;
+			break;
+		}
+
+		/* Move leftovers to the start of rbuf to make room for the read. */
+		if (stream->rbuflen && stream->rbuf != stream->rbufhead) {
+			memmove(stream->rbuf, stream->rbufhead, stream->rbuflen);
+		}
+		stream->rbufhead = stream->rbuf;
+
+		r = iostream_read(stream, stream->rbufhead + stream->rbuflen, sizeof(stream->rbuf) - stream->rbuflen);
+		if (r <= 0) {
+			return r;
+		}
+		stream->rbuflen += r;
+	}
+
+	/* Return r bytes with termination byte */
+	memcpy(buf, stream->rbufhead, r);
+	buf[r] = 0;
+	stream->rbuflen -= r;
+	stream->rbufhead += r;
+
+	return r;
+}
+
+/*!
+ * \brief Read and throw away \a size bytes from the stream.
+ *
+ * \retval size when all requested bytes were discarded.
+ * \retval 0 if the stream reported end of file or a timeout first.
+ * \retval -1 on read error.
+ */
+ssize_t ast_iostream_discard(struct ast_iostream *stream, size_t size)
+{
+	char buf[1024];
+	size_t remaining = size;
+	ssize_t ret;
+
+	while (remaining) {
+		ret = ast_iostream_read(stream, buf, remaining > sizeof(buf) ? sizeof(buf) : remaining);
+		if (ret <= 0) {
+			/*
+			 * Stop on EOF/timeout (0) as well as on error (-1);
+			 * a zero return makes no progress and would loop forever.
+			 */
+			return ret;
+		}
+		remaining -= ret;
+	}
+
+	return size;
+}
+
+/*!
+ * \brief Write \a size bytes from \a buf to the stream, honoring the timeout.
+ *
+ * Keeps writing until everything is sent, the timeout expires, or a
+ * non-retryable error occurs.
+ *
+ * \param stream Stream to write to.
+ * \param buf Where to get data to write.
+ * \param size Number of bytes to write.
+ *
+ * \retval size when everything was written (0 when \a size is zero).
+ * \retval number of bytes written so far on a timeout, or on an error
+ *         that occurred after part of the data had been written.
+ * \retval -1 on error when nothing was written (errno is EBADF for a
+ *         NULL or closed stream and for TLS failures).
+ */
+ssize_t ast_iostream_write(struct ast_iostream *stream, const void *buf, size_t size)
+{
+	struct timeval start;
+	int ms;
+	int res;
+	int written;
+	int remaining;
+
+	if (!size) {
+		/* You asked to write no data you wrote no data. */
+		return 0;
+	}
+
+	if (!stream || stream->fd == -1) {
+		errno = EBADF;
+		return -1;
+	}
+
+	/* Time against the configured sequence start, or against "now" if none is set. */
+	if (stream->start.tv_sec) {
+		start = stream->start;
+	} else {
+		start = ast_tvnow();
+	}
+
+#if defined(DO_SSL)
+	if (stream->ssl) {
+		written = 0;
+		remaining = size;
+		for (;;) {
+			res = SSL_write(stream->ssl, buf + written, remaining);
+			if (res == remaining) {
+				/* Everything was written. */
+				return size;
+			}
+			if (0 < res) {
+				/* Successfully wrote part of the buffer.  Try to write the rest. */
+				written += res;
+				remaining -= res;
+				continue;
+			}
+			switch (SSL_get_error(stream->ssl, res)) {
+			case SSL_ERROR_ZERO_RETURN:
+				ast_debug(1, "TLS clean shutdown alert writing data\n");
+				if (written) {
+					/* Report partial write. */
+					return written;
+				}
+				errno = EBADF;
+				return -1;
+			case SSL_ERROR_WANT_READ:
+				ms = ast_remaining_ms(start, stream->timeout);
+				if (!ms) {
+					/* Report partial write. */
+					ast_debug(1, "TLS timeout writing data (want read)\n");
+					return written;
+				}
+				ast_wait_for_input(stream->fd, ms);
+				break;
+			case SSL_ERROR_WANT_WRITE:
+				ms = ast_remaining_ms(start, stream->timeout);
+				if (!ms) {
+					/* Report partial write. */
+					ast_debug(1, "TLS timeout writing data (want write)\n");
+					return written;
+				}
+				ast_wait_for_output(stream->fd, ms);
+				break;
+			default:
+				/* Undecoded SSL or transport error. */
+				ast_debug(1, "TLS transport or SSL error writing data\n");
+				if (written) {
+					/* Report partial write. */
+					return written;
+				}
+				errno = EBADF;
+				return -1;
+			}
+		}
+	}
+#endif	/* defined(DO_SSL) */
+
+	written = 0;
+	remaining = size;
+	for (;;) {
+		res = write(stream->fd, buf + written, remaining);
+		if (res == remaining) {
+			/* Yay everything was written. */
+			return size;
+		}
+		if (0 < res) {
+			/* Successfully wrote part of the buffer.  Try to write the rest. */
+			written += res;
+			remaining -= res;
+			continue;
+		}
+		if (errno != EINTR && errno != EAGAIN) {
+			/* Not a retryable error. */
+			ast_debug(1, "TCP socket error writing: %s\n", strerror(errno));
+			if (written) {
+				return written;
+			}
+			return -1;
+		}
+		ms = ast_remaining_ms(start, stream->timeout);
+		if (!ms) {
+			/* Report partial write. */
+			ast_debug(1, "TCP timeout writing data\n");
+			return written;
+		}
+		/* The wait result is not checked; the write() above is simply retried. */
+		ast_wait_for_output(stream->fd, ms);
+	}
+}
+
+/*!
+ * \brief Write printf-style formatted output to the stream.
+ *
+ * Formats into a stack buffer first and falls back to a heap buffer of
+ * the exact required size when the result does not fit.
+ *
+ * \param stream Stream to write to.
+ * \param fmt printf-style format string, followed by its arguments.
+ *
+ * \retval number of bytes written (the formatted length, without NUL).
+ * \retval -1 on formatting error, allocation failure, or when the
+ *         formatted data could not be written completely.
+ */
+ssize_t ast_iostream_printf(struct ast_iostream *stream, const void *fmt, ...)
+{
+	char sbuf[256], *buf = sbuf;
+	int len, len2, ret = -1;
+	va_list va;
+
+	va_start(va, fmt);
+	len = vsnprintf(buf, sizeof(sbuf), fmt, va);
+	va_end(va);
+
+	if (len < 0) {
+		/* Formatting failed; a negative length must not reach the size checks. */
+		return -1;
+	}
+
+	/*
+	 * vsnprintf() returns the length without the NUL, so the output was
+	 * truncated whenever len >= sizeof(sbuf), not only when it is greater.
+	 */
+	if ((size_t) len >= sizeof(sbuf)) {
+		/* One extra byte so the terminating NUL does not cut the last character. */
+		size_t buf_len = (size_t) len + 1;
+
+		buf = ast_malloc(buf_len);
+		if (!buf) {
+			return -1;
+		}
+		va_start(va, fmt);
+		len2 = vsnprintf(buf, buf_len, fmt, va);
+		va_end(va);
+		if (len2 != len) {
+			/* The second pass must produce exactly what the first measured. */
+			goto error;
+		}
+	}
+
+	if (ast_iostream_write(stream, buf, len) == len) {
+		ret = len;
+	}
+
+error:
+	if (buf != sbuf) {
+		ast_free(buf);
+	}
+
+	return ret;
+}
+
+/*!
+ * \brief Close the stream and drop one reference to it.
+ *
+ * If the descriptor is still open, any TLS session is shut down and
+ * freed first, then the descriptor is shut down and closed.  Failures of
+ * SSL_shutdown() and close() are logged but do not change the result.
+ *
+ * \retval 0 on success.
+ * \retval -1 with errno EBADF if \a stream is NULL.
+ */
+int ast_iostream_close(struct ast_iostream *stream)
+{
+	if (!stream) {
+		errno = EBADF;
+		return -1;
+	}
+
+	if (stream->fd != -1) {
+#if defined(DO_SSL)
+		if (stream->ssl) {
+			int res;
+
+			/*
+			 * According to the TLS standard, it is acceptable for an
+			 * application to only send its shutdown alert and then
+			 * close the underlying connection without waiting for
+			 * the peer's response (this way resources can be saved,
+			 * as the process can already terminate or serve another
+			 * connection).
+			 */
+			res = SSL_shutdown(stream->ssl);
+			if (res < 0) {
+				ast_log(LOG_ERROR, "SSL_shutdown() failed: %d\n",
+					SSL_get_error(stream->ssl, res));
+			}
+
+			if (!stream->ssl->server) {
+				/* For client threads, ensure that the error stack is cleared */
+				ERR_remove_state(0);
+			}
+
+			SSL_free(stream->ssl);
+			stream->ssl = NULL;
+		}
+#endif	/* defined(DO_SSL) */
+
+		/*
+		 * Issuing shutdown() is necessary here to avoid a race
+		 * condition where the last data written may not appear
+		 * in the TCP stream.  See ASTERISK-23548
+		 */
+		shutdown(stream->fd, SHUT_RDWR);
+		if (close(stream->fd)) {
+			ast_log(LOG_ERROR, "close() failed: %s\n", strerror(errno));
+		}
+		/* Mark the stream closed; the destructor asserts on this. */
+		stream->fd = -1;
+	}
+	ao2_t_ref(stream, -1, "Closed ast_iostream");
+
+	return 0;
+}
+
+/*!
+ * \internal
+ * \brief ao2 destructor: only asserts that the stream was closed before
+ * being destroyed; it does not close the descriptor itself.
+ */
+static void iostream_dtor(void *cookie)
+{
+#ifdef AST_DEVMODE
+	/* Since the ast_assert below is the only one using stream,
+	 * and ast_assert is only available with AST_DEVMODE, we
+	 * put this in a conditional to avoid compiler warnings. */
+	struct ast_iostream *stream = cookie;
+#endif
+
+	ast_assert(stream->fd == -1);
+}
+
+/*!
+ * \brief Create a stream that takes over the descriptor in \a *fd.
+ *
+ * On success the descriptor is stored in the new stream, timeouts start
+ * out disabled, and \a *fd is set to -1.  On allocation failure \a *fd is
+ * left untouched.
+ *
+ * \retval new stream on success.
+ * \retval NULL on allocation failure.
+ */
+struct ast_iostream *ast_iostream_from_fd(int *fd)
+{
+	struct ast_iostream *created;
+
+	created = ao2_alloc_options(sizeof(*created), iostream_dtor,
+		AO2_ALLOC_OPT_LOCK_NOLOCK);
+	if (created == NULL) {
+		return NULL;
+	}
+
+	created->fd = *fd;
+	*fd = -1;
+	created->timeout_reset = -1;
+	created->timeout = -1;
+
+	return created;
+}
+
+/*!
+ * \brief Start a TLS session on top of the stream's file descriptor.
+ *
+ * \param pstream Address of the stream pointer to run TLS over.
+ * \param ssl_ctx SSL context used to create the session.
+ * \param client Non-zero to connect as a client, zero to accept as a server.
+ *
+ * \retval 0 on success.
+ * \retval -1 on failure: errno is ENOMEM if the session could not be
+ *         created, EIO if the handshake failed, ENOTSUP if built
+ *         without SSL support.
+ */
+int ast_iostream_start_tls(struct ast_iostream **pstream, SSL_CTX *ssl_ctx, int client)
+{
+#ifdef DO_SSL
+	struct ast_iostream *stream = *pstream;
+	char errbuf[256];
+	int rc;
+
+	stream->ssl = SSL_new(ssl_ctx);
+	if (stream->ssl == NULL) {
+		ast_log(LOG_ERROR, "Unable to create new SSL connection\n");
+		errno = ENOMEM;
+		return -1;
+	}
+
+	/*
+	 * The double pointer lets this function chain SSL over any
+	 * ast_iostream.  For now the stream is assumed to be a plain file
+	 * descriptor; later a BIO wrapper should tie SSL to another
+	 * ast_iostream instead.
+	 */
+	SSL_set_fd(stream->ssl, stream->fd);
+
+	/* Run the handshake in the requested role. */
+	if (client) {
+		rc = SSL_connect(stream->ssl);
+	} else {
+		rc = SSL_accept(stream->ssl);
+	}
+	if (rc > 0) {
+		return 0;
+	}
+
+	ast_log(LOG_ERROR, "Problem setting up ssl connection: %s\n",
+		ERR_error_string(ERR_get_error(), errbuf));
+	errno = EIO;
+	return -1;
+#else
+	ast_log(LOG_ERROR, "SSL not enabled in this build\n");
+	errno = ENOTSUP;
+	return -1;
+#endif
+}
diff --git a/main/manager.c b/main/manager.c
index ef1afb0..f059015 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -1549,8 +1549,7 @@
 struct mansession_session {
 				/*! \todo XXX need to document which fields it is protecting */
 	struct ast_sockaddr addr;	/*!< address we are connecting from */
-	FILE *f;		/*!< fdopen() on the underlying fd */
-	int fd;			/*!< descriptor used for output. Either the socket (AMI) or a temporary file (HTTP) */
+	struct ast_iostream *stream;	/*!< AMI stream */
 	int inuse;		/*!< number of HTTP sessions using this entry */
 	int needdestroy;	/*!< Whether an HTTP session should be destroyed */
 	pthread_t waiting_thread;	/*!< Sleeping thread using this descriptor */
@@ -1592,9 +1591,8 @@
  */
 struct mansession {
 	struct mansession_session *session;
+	struct ast_iostream *stream;
 	struct ast_tcptls_session_instance *tcptls_session;
-	FILE *f;
-	int fd;
 	enum mansession_message_parsing parsing;
 	int write_error:1;
 	struct manager_custom_hook *hook;
@@ -2166,10 +2164,6 @@
 		ast_datastore_free(datastore);
 	}
 
-	if (session->f != NULL) {
-		fflush(session->f);
-		fclose(session->f);
-	}
 	if (eqe) {
 		ast_atomic_fetchadd_int(&eqe->usecount, -1);
 	}
@@ -2204,7 +2198,6 @@
 		return NULL;
 	}
 
-	newsession->fd = -1;
 	newsession->waiting_thread = AST_PTHREADT_NULL;
 	newsession->writetimeout = 100;
 	newsession->send_events = -1;
@@ -2617,7 +2610,7 @@
 				ast_sockaddr_stringify_addr(&session->addr),
 				(int) (session->sessionstart),
 				(int) (now - session->sessionstart),
-				session->fd,
+				session->stream ? ast_iostream_get_fd(session->stream) : -1,
 				session->inuse,
 				session->readperm,
 				session->writeperm);
@@ -2889,7 +2882,6 @@
 			 * This is necessary to meet the previous design of manager.c
 			 */
 			s.hook = hook;
-			s.f = (void*)1; /* set this to something so our request will make it through all functions that test it*/
 
 			ao2_lock(act_found);
 			if (act_found->registered && act_found->func) {
@@ -2920,9 +2912,8 @@
  */
 static int send_string(struct mansession *s, char *string)
 {
-	int res;
-	FILE *f = s->f ? s->f : s->session->f;
-	int fd = s->f ? s->fd : s->session->fd;
+	struct ast_iostream *stream = s->stream ? s->stream : s->session->stream;
+	int len, res;
 
 	/* It's a result from one of the hook's action invocation */
 	if (s->hook) {
@@ -2934,7 +2925,12 @@
 		return 0;
 	}
 
-	if ((res = ast_careful_fwrite(f, fd, string, strlen(string), s->session->writetimeout))) {
+	len = strlen(string);
+	ast_iostream_set_timeout_inactivity(stream, s->session->writetimeout);
+	res = ast_iostream_write(stream, string, len);
+	ast_iostream_set_timeout_disable(stream);
+
+	if (res < len) {
 		s->write_error = 1;
 	}
 
@@ -2975,10 +2971,10 @@
 		return;
 	}
 
-	if (s->f != NULL || s->session->f != NULL) {
+	if (s->tcptls_session != NULL && s->tcptls_session->stream != NULL) {
 		send_string(s, ast_str_buffer(buf));
 	} else {
-		ast_verbose("fd == -1 in astman_append, should not happen\n");
+		ast_verbose("No connection stream in astman_append, should not happen\n");
 	}
 }
 
@@ -4119,7 +4115,7 @@
 			break;
 		}
 		if (s->session->managerid == 0) {	/* AMI session */
-			if (ast_wait_for_input(s->session->fd, 1000)) {
+			if (ast_wait_for_input(ast_iostream_get_fd(s->session->stream), 1000)) {
 				break;
 			}
 		} else {	/* HTTP session */
@@ -5924,7 +5920,7 @@
 	int ret = 0;
 
 	ao2_lock(s->session);
-	if (s->session->f != NULL) {
+	if (s->session->stream != NULL) {
 		struct eventqent *eqe = s->session->last_ev;
 
 		while ((eqe = advance_event(eqe))) {
@@ -6466,7 +6462,7 @@
 		s->session->waiting_thread = pthread_self();
 		ao2_unlock(s->session);
 
-		res = ast_wait_for_input(s->session->fd, timeout);
+		res = ast_wait_for_input(ast_iostream_get_fd(s->session->stream), timeout);
 
 		ao2_lock(s->session);
 		s->session->waiting_thread = AST_PTHREADT_NULL;
@@ -6484,7 +6480,7 @@
 	}
 
 	ao2_lock(s->session);
-	res = fread(src + s->session->inlen, 1, maxlen - s->session->inlen, s->session->f);
+	res = ast_iostream_read(s->session->stream, src + s->session->inlen, maxlen - s->session->inlen);
 	if (res < 1) {
 		res = -1;	/* error return */
 	} else {
@@ -6617,13 +6613,12 @@
 	struct mansession s = {
 		.tcptls_session = data,
 	};
-	int flags;
 	int res;
+	int arg = 1;
 	struct ast_sockaddr ser_remote_address_tmp;
-	struct protoent *p;
 
 	if (ast_atomic_fetchadd_int(&unauth_sessions, +1) >= authlimit) {
-		fclose(ser->f);
+		ast_iostream_close(ser->stream);
 		ast_atomic_fetchadd_int(&unauth_sessions, -1);
 		goto done;
 	}
@@ -6632,7 +6627,7 @@
 	session = build_mansession(&ser_remote_address_tmp);
 
 	if (session == NULL) {
-		fclose(ser->f);
+		ast_iostream_close(ser->stream);
 		ast_atomic_fetchadd_int(&unauth_sessions, -1);
 		goto done;
 	}
@@ -6640,20 +6635,10 @@
 	/* here we set TCP_NODELAY on the socket to disable Nagle's algorithm.
 	 * This is necessary to prevent delays (caused by buffering) as we
 	 * write to the socket in bits and pieces. */
-	p = getprotobyname("tcp");
-	if (p) {
-		int arg = 1;
-		if( setsockopt(ser->fd, p->p_proto, TCP_NODELAY, (char *)&arg, sizeof(arg) ) < 0 ) {
-			ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY mode: %s\nSome manager actions may be slow to respond.\n", strerror(errno));
-		}
-	} else {
-		ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY, getprotobyname(\"tcp\") failed\nSome manager actions may be slow to respond.\n");
+	if (setsockopt(ast_iostream_get_fd(ser->stream), IPPROTO_TCP, TCP_NODELAY, (char *)&arg, sizeof(arg) ) < 0) {
+		ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY mode: %s\nSome manager actions may be slow to respond.\n", strerror(errno));
 	}
-
-	/* make sure socket is non-blocking */
-	flags = fcntl(ser->fd, F_GETFL);
-	flags |= O_NONBLOCK;
-	fcntl(ser->fd, F_SETFL, flags);
+	ast_iostream_nonblock(ser->stream);
 
 	ao2_lock(session);
 	/* Hook to the tail of the event queue */
@@ -6662,8 +6647,7 @@
 	ast_mutex_init(&s.lock);
 
 	/* these fields duplicate those in the 'ser' structure */
-	session->fd = s.fd = ser->fd;
-	session->f = s.f = ser->f;
+	session->stream = s.stream = ser->stream;
 	ast_sockaddr_copy(&session->addr, &ser_remote_address_tmp);
 	s.session = session;
 
@@ -6682,9 +6666,9 @@
 	 * We cannot let the stream exclusively wait for data to arrive.
 	 * We have to wake up the task to send async events.
 	 */
-	ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0);
+	ast_iostream_set_exclusive_input(ser->stream, 0);
 
-	ast_tcptls_stream_set_timeout_sequence(ser->stream_cookie,
+	ast_iostream_set_timeout_sequence(ser->stream,
 		ast_tvnow(), authtimeout * 1000);
 
 	astman_append(&s, "Asterisk Call Manager/%s\r\n", AMI_VERSION);	/* welcome prompt */
@@ -6693,7 +6677,7 @@
 			break;
 		}
 		if (session->authenticated) {
-			ast_tcptls_stream_set_timeout_disable(ser->stream_cookie);
+			ast_iostream_set_timeout_disable(ser->stream);
 		}
 	}
 	/* session is over, explain why and terminate */
@@ -7552,23 +7536,9 @@
 
 static void close_mansession_file(struct mansession *s)
 {
-	if (s->f) {
-		if (fclose(s->f)) {
-			ast_log(LOG_ERROR, "fclose() failed: %s\n", strerror(errno));
-		}
-		s->f = NULL;
-		s->fd = -1;
-	} else if (s->fd != -1) {
-		/*
-		 * Issuing shutdown() is necessary here to avoid a race
-		 * condition where the last data written may not appear
-		 * in the TCP stream.  See ASTERISK-23548
-		 */
-		shutdown(s->fd, SHUT_RDWR);
-		if (close(s->fd)) {
-			ast_log(LOG_ERROR, "close() failed: %s\n", strerror(errno));
-		}
-		s->fd = -1;
+	if (s->stream) {
+		ast_iostream_close(s->stream);
+		s->stream = NULL;
 	} else {
 		ast_log(LOG_ERROR, "Attempted to close file/file descriptor on mansession without a valid file or file descriptor.\n");
 	}
@@ -7577,17 +7547,20 @@
 static void process_output(struct mansession *s, struct ast_str **out, struct ast_variable *params, enum output_format format)
 {
 	char *buf;
-	size_t l;
+	off_t l;
+	int fd;
 
-	if (!s->f)
+	if (!s->stream)
 		return;
 
 	/* Ensure buffer is NULL-terminated */
-	fprintf(s->f, "%c", 0);
-	fflush(s->f);
+	ast_iostream_write(s->stream, "", 1);
 
-	if ((l = ftell(s->f)) > 0) {
-		if (MAP_FAILED == (buf = mmap(NULL, l, PROT_READ | PROT_WRITE, MAP_PRIVATE, s->fd, 0))) {
+	fd = ast_iostream_get_fd(s->stream);
+
+	l = lseek(fd, 0, SEEK_CUR);	/* offset 0 from the current position: reports bytes written so far */
+	if (l > 0) {
+		if (MAP_FAILED == (buf = mmap(NULL, l, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0))) {
 			ast_log(LOG_WARNING, "mmap failed.  Manager output was not processed\n");
 		} else {
 			if (format == FORMAT_XML || format == FORMAT_HTML) {
@@ -7614,6 +7587,7 @@
 	struct mansession s = { .session = NULL, .tcptls_session = ser };
 	struct mansession_session *session = NULL;
 	uint32_t ident;
+	int fd;
 	int blastaway = 0;
 	struct ast_variable *v;
 	struct ast_variable *params = get_params;
@@ -7669,17 +7643,17 @@
 	}
 
 	s.session = session;
-	s.fd = mkstemp(template);	/* create a temporary file for command output */
+	fd = mkstemp(template);	/* create a temporary file for command output */
 	unlink(template);
-	if (s.fd <= -1) {
+	if (fd <= -1) {
 		ast_http_error(ser, 500, "Server Error", "Internal Server Error (mkstemp failed)");
 		goto generic_callback_out;
 	}
-	s.f = fdopen(s.fd, "w+");
-	if (!s.f) {
+	s.stream = ast_iostream_from_fd(&fd);
+	if (!s.stream) {
 		ast_log(LOG_WARNING, "HTTP Manager, fdopen failed: %s!\n", strerror(errno));
 		ast_http_error(ser, 500, "Server Error", "Internal Server Error (fdopen failed)");
-		close(s.fd);
+		close(fd);
 		goto generic_callback_out;
 	}
 
@@ -7819,9 +7793,9 @@
 		if (blastaway) {
 			session_destroy(session);
 		} else {
-			if (session->f) {
-				fclose(session->f);
-				session->f = NULL;
+			if (session->stream) {
+				ast_iostream_close(session->stream);
+				session->stream = NULL;
 			}
 			unref_mansession(session);
 		}
@@ -7846,6 +7820,7 @@
 	struct message m = { 0 };
 	unsigned int idx;
 	size_t hdrlen;
+	int fd;
 
 	time_t time_now = time(NULL);
 	unsigned long nonce = 0, nc;
@@ -8024,17 +7999,17 @@
 
 	ast_mutex_init(&s.lock);
 	s.session = session;
-	s.fd = mkstemp(template);	/* create a temporary file for command output */
+	fd = mkstemp(template);	/* create a temporary file for command output */
 	unlink(template);
-	if (s.fd <= -1) {
+	if (fd <= -1) {
 		ast_http_error(ser, 500, "Server Error", "Internal Server Error (mkstemp failed)");
 		goto auth_callback_out;
 	}
-	s.f = fdopen(s.fd, "w+");
-	if (!s.f) {
+	s.stream = ast_iostream_from_fd(&fd);
+	if (!s.stream) {
 		ast_log(LOG_WARNING, "HTTP Manager, fdopen failed: %s!\n", strerror(errno));
 		ast_http_error(ser, 500, "Server Error", "Internal Server Error (fdopen failed)");
-		close(s.fd);
+		close(fd);
 		goto auth_callback_out;
 	}
 
@@ -8085,7 +8060,7 @@
 		m.headers[idx] = NULL;
 	}
 
-	result_size = ftell(s.f); /* Calculate approx. size of result */
+	result_size = lseek(ast_iostream_get_fd(s.stream), 0, SEEK_CUR); /* Calculate approx. size of result */
 
 	http_header = ast_str_create(80);
 	out = ast_str_create(result_size * 2 + 512);
@@ -8137,11 +8112,10 @@
 	ast_free(out);
 
 	ao2_lock(session);
-	if (session->f) {
-		fclose(session->f);
+	if (session->stream) {
+		ast_iostream_close(session->stream);
+		session->stream = NULL;
 	}
-	session->f = NULL;
-	session->fd = -1;
 	ao2_unlock(session);
 
 	if (session->needdestroy) {
diff --git a/main/tcptls.c b/main/tcptls.c
index 262fca0..c8ebab4 100644
--- a/main/tcptls.c
+++ b/main/tcptls.c
@@ -47,506 +47,13 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/pbx.h"
 
-/*! ao2 object used for the FILE stream fopencookie()/funopen() cookie. */
-struct ast_tcptls_stream {
-	/*! SSL state if not NULL */
-	SSL *ssl;
-	/*!
-	 * \brief Start time from when an I/O sequence must complete
-	 * by struct ast_tcptls_stream.timeout.
-	 *
-	 * \note If struct ast_tcptls_stream.start.tv_sec is zero then
-	 * start time is the current I/O request.
-	 */
-	struct timeval start;
-	/*!
-	 * \brief The socket returned by accept().
-	 *
-	 * \note Set to -1 if the stream is closed.
-	 */
-	int fd;
-	/*!
-	 * \brief Timeout in ms relative to struct ast_tcptls_stream.start
-	 * to wait for an event on struct ast_tcptls_stream.fd.
-	 *
-	 * \note Set to -1 to disable timeout.
-	 * \note The socket needs to be set to non-blocking for the timeout
-	 * feature to work correctly.
-	 */
-	int timeout;
-	/*! TRUE if stream can exclusively wait for fd input. */
-	int exclusive_input;
-};
-
-void ast_tcptls_stream_set_timeout_disable(struct ast_tcptls_stream *stream)
-{
-	ast_assert(stream != NULL);
-
-	stream->timeout = -1;
-}
-
-void ast_tcptls_stream_set_timeout_inactivity(struct ast_tcptls_stream *stream, int timeout)
-{
-	ast_assert(stream != NULL);
-
-	stream->start.tv_sec = 0;
-	stream->timeout = timeout;
-}
-
-void ast_tcptls_stream_set_timeout_sequence(struct ast_tcptls_stream *stream, struct timeval start, int timeout)
-{
-	ast_assert(stream != NULL);
-
-	stream->start = start;
-	stream->timeout = timeout;
-}
-
-void ast_tcptls_stream_set_exclusive_input(struct ast_tcptls_stream *stream, int exclusive_input)
-{
-	ast_assert(stream != NULL);
-
-	stream->exclusive_input = exclusive_input;
-}
-
-/*!
- * \internal
- * \brief fopencookie()/funopen() stream read function.
- *
- * \param cookie Stream control data.
- * \param buf Where to put read data.
- * \param size Size of the buffer.
- *
- * \retval number of bytes put into buf.
- * \retval 0 on end of file.
- * \retval -1 on error.
- */
-static HOOK_T tcptls_stream_read(void *cookie, char *buf, LEN_T size)
-{
-	struct ast_tcptls_stream *stream = cookie;
-	struct timeval start;
-	int ms;
-	int res;
-
-	if (!size) {
-		/* You asked for no data you got no data. */
-		return 0;
-	}
-
-	if (!stream || stream->fd == -1) {
-		errno = EBADF;
-		return -1;
-	}
-
-	if (stream->start.tv_sec) {
-		start = stream->start;
-	} else {
-		start = ast_tvnow();
-	}
-
-#if defined(DO_SSL)
-	if (stream->ssl) {
-		for (;;) {
-			res = SSL_read(stream->ssl, buf, size);
-			if (0 < res) {
-				/* We read some payload data. */
-				return res;
-			}
-			switch (SSL_get_error(stream->ssl, res)) {
-			case SSL_ERROR_ZERO_RETURN:
-				/* Report EOF for a shutdown */
-				ast_debug(1, "TLS clean shutdown alert reading data\n");
-				return 0;
-			case SSL_ERROR_WANT_READ:
-				if (!stream->exclusive_input) {
-					/* We cannot wait for data now. */
-					errno = EAGAIN;
-					return -1;
-				}
-				while ((ms = ast_remaining_ms(start, stream->timeout))) {
-					res = ast_wait_for_input(stream->fd, ms);
-					if (0 < res) {
-						/* Socket is ready to be read. */
-						break;
-					}
-					if (res < 0) {
-						if (errno == EINTR || errno == EAGAIN) {
-							/* Try again. */
-							continue;
-						}
-						ast_debug(1, "TLS socket error waiting for read data: %s\n",
-							strerror(errno));
-						return -1;
-					}
-				}
-				break;
-			case SSL_ERROR_WANT_WRITE:
-				while ((ms = ast_remaining_ms(start, stream->timeout))) {
-					res = ast_wait_for_output(stream->fd, ms);
-					if (0 < res) {
-						/* Socket is ready to be written. */
-						break;
-					}
-					if (res < 0) {
-						if (errno == EINTR || errno == EAGAIN) {
-							/* Try again. */
-							continue;
-						}
-						ast_debug(1, "TLS socket error waiting for write space: %s\n",
-							strerror(errno));
-						return -1;
-					}
-				}
-				break;
-			default:
-				/* Report EOF for an undecoded SSL or transport error. */
-				ast_debug(1, "TLS transport or SSL error reading data\n");
-				return 0;
-			}
-			if (!ms) {
-				/* Report EOF for a timeout */
-				ast_debug(1, "TLS timeout reading data\n");
-				return 0;
-			}
-		}
-	}
-#endif	/* defined(DO_SSL) */
-
-	for (;;) {
-		res = read(stream->fd, buf, size);
-		if (0 <= res || !stream->exclusive_input) {
-			/* Got data or we cannot wait for it. */
-			return res;
-		}
-		if (errno != EINTR && errno != EAGAIN) {
-			/* Not a retryable error. */
-			ast_debug(1, "TCP socket error reading data: %s\n",
-				strerror(errno));
-			return -1;
-		}
-		ms = ast_remaining_ms(start, stream->timeout);
-		if (!ms) {
-			/* Report EOF for a timeout */
-			ast_debug(1, "TCP timeout reading data\n");
-			return 0;
-		}
-		ast_wait_for_input(stream->fd, ms);
-	}
-}
-
-/*!
- * \internal
- * \brief fopencookie()/funopen() stream write function.
- *
- * \param cookie Stream control data.
- * \param buf Where to get data to write.
- * \param size Size of the buffer.
- *
- * \retval number of bytes written from buf.
- * \retval -1 on error.
- */
-static HOOK_T tcptls_stream_write(void *cookie, const char *buf, LEN_T size)
-{
-	struct ast_tcptls_stream *stream = cookie;
-	struct timeval start;
-	int ms;
-	int res;
-	int written;
-	int remaining;
-
-	if (!size) {
-		/* You asked to write no data you wrote no data. */
-		return 0;
-	}
-
-	if (!stream || stream->fd == -1) {
-		errno = EBADF;
-		return -1;
-	}
-
-	if (stream->start.tv_sec) {
-		start = stream->start;
-	} else {
-		start = ast_tvnow();
-	}
-
-#if defined(DO_SSL)
-	if (stream->ssl) {
-		written = 0;
-		remaining = size;
-		for (;;) {
-			res = SSL_write(stream->ssl, buf + written, remaining);
-			if (res == remaining) {
-				/* Everything was written. */
-				return size;
-			}
-			if (0 < res) {
-				/* Successfully wrote part of the buffer.  Try to write the rest. */
-				written += res;
-				remaining -= res;
-				continue;
-			}
-			switch (SSL_get_error(stream->ssl, res)) {
-			case SSL_ERROR_ZERO_RETURN:
-				ast_debug(1, "TLS clean shutdown alert writing data\n");
-				if (written) {
-					/* Report partial write. */
-					return written;
-				}
-				errno = EBADF;
-				return -1;
-			case SSL_ERROR_WANT_READ:
-				ms = ast_remaining_ms(start, stream->timeout);
-				if (!ms) {
-					/* Report partial write. */
-					ast_debug(1, "TLS timeout writing data (want read)\n");
-					return written;
-				}
-				ast_wait_for_input(stream->fd, ms);
-				break;
-			case SSL_ERROR_WANT_WRITE:
-				ms = ast_remaining_ms(start, stream->timeout);
-				if (!ms) {
-					/* Report partial write. */
-					ast_debug(1, "TLS timeout writing data (want write)\n");
-					return written;
-				}
-				ast_wait_for_output(stream->fd, ms);
-				break;
-			default:
-				/* Undecoded SSL or transport error. */
-				ast_debug(1, "TLS transport or SSL error writing data\n");
-				if (written) {
-					/* Report partial write. */
-					return written;
-				}
-				errno = EBADF;
-				return -1;
-			}
-		}
-	}
-#endif	/* defined(DO_SSL) */
-
-	written = 0;
-	remaining = size;
-	for (;;) {
-		res = write(stream->fd, buf + written, remaining);
-		if (res == remaining) {
-			/* Yay everything was written. */
-			return size;
-		}
-		if (0 < res) {
-			/* Successfully wrote part of the buffer.  Try to write the rest. */
-			written += res;
-			remaining -= res;
-			continue;
-		}
-		if (errno != EINTR && errno != EAGAIN) {
-			/* Not a retryable error. */
-			ast_debug(1, "TCP socket error writing: %s\n", strerror(errno));
-			if (written) {
-				return written;
-			}
-			return -1;
-		}
-		ms = ast_remaining_ms(start, stream->timeout);
-		if (!ms) {
-			/* Report partial write. */
-			ast_debug(1, "TCP timeout writing data\n");
-			return written;
-		}
-		ast_wait_for_output(stream->fd, ms);
-	}
-}
-
-/*!
- * \internal
- * \brief fopencookie()/funopen() stream close function.
- *
- * \param cookie Stream control data.
- *
- * \retval 0 on success.
- * \retval -1 on error.
- */
-static int tcptls_stream_close(void *cookie)
-{
-	struct ast_tcptls_stream *stream = cookie;
-
-	if (!stream) {
-		errno = EBADF;
-		return -1;
-	}
-
-	if (stream->fd != -1) {
-#if defined(DO_SSL)
-		if (stream->ssl) {
-			int res;
-
-			/*
-			 * According to the TLS standard, it is acceptable for an
-			 * application to only send its shutdown alert and then
-			 * close the underlying connection without waiting for
-			 * the peer's response (this way resources can be saved,
-			 * as the process can already terminate or serve another
-			 * connection).
-			 */
-			res = SSL_shutdown(stream->ssl);
-			if (res < 0) {
-				ast_log(LOG_ERROR, "SSL_shutdown() failed: %d\n",
-					SSL_get_error(stream->ssl, res));
-			}
-
-			if (!stream->ssl->server) {
-				/* For client threads, ensure that the error stack is cleared */
-#if OPENSSL_VERSION_NUMBER >= 0x10000000L
-				ERR_remove_thread_state(NULL);
-#else
-				ERR_remove_state(0);
-#endif	/* OPENSSL_VERSION_NUMBER >= 0x10000000L */
-			}
-
-			SSL_free(stream->ssl);
-			stream->ssl = NULL;
-		}
-#endif	/* defined(DO_SSL) */
-
-		/*
-		 * Issuing shutdown() is necessary here to avoid a race
-		 * condition where the last data written may not appear
-		 * in the TCP stream.  See ASTERISK-23548
-		 */
-		shutdown(stream->fd, SHUT_RDWR);
-		if (close(stream->fd)) {
-			ast_log(LOG_ERROR, "close() failed: %s\n", strerror(errno));
-		}
-		stream->fd = -1;
-	}
-	ao2_t_ref(stream, -1, "Closed tcptls stream cookie");
-
-	return 0;
-}
-
-/*!
- * \internal
- * \brief fopencookie()/funopen() stream destructor function.
- *
- * \param cookie Stream control data.
- *
- * \return Nothing
- */
-static void tcptls_stream_dtor(void *cookie)
-{
-#ifdef AST_DEVMODE
-	/* Since the ast_assert below is the only one using stream,
-	 * and ast_assert is only available with AST_DEVMODE, we
-	 * put this in a conditional to avoid compiler warnings. */
-	struct ast_tcptls_stream *stream = cookie;
-#endif
-
-	ast_assert(stream->fd == -1);
-}
-
-/*!
- * \internal
- * \brief fopencookie()/funopen() stream allocation function.
- *
- * \retval stream_cookie on success.
- * \retval NULL on error.
- */
-static struct ast_tcptls_stream *tcptls_stream_alloc(void)
-{
-	struct ast_tcptls_stream *stream;
-
-	stream = ao2_alloc_options(sizeof(*stream), tcptls_stream_dtor,
-		AO2_ALLOC_OPT_LOCK_NOLOCK);
-	if (stream) {
-		stream->fd = -1;
-		stream->timeout = -1;
-	}
-	return stream;
-}
-
-/*!
- * \internal
- * \brief Open a custom FILE stream for tcptls.
- *
- * \param stream Stream cookie control data.
- * \param ssl SSL state if not NULL.
- * \param fd Socket file descriptor.
- * \param timeout ms to wait for an event on fd. -1 if timeout disabled.
- *
- * \retval fp on success.
- * \retval NULL on error.
- */
-static FILE *tcptls_stream_fopen(struct ast_tcptls_stream *stream, SSL *ssl, int fd, int timeout)
-{
-	FILE *fp;
-
-#if defined(HAVE_FOPENCOOKIE)	/* the glibc/linux interface */
-	static const cookie_io_functions_t cookie_funcs = {
-		tcptls_stream_read,
-		tcptls_stream_write,
-		NULL,
-		tcptls_stream_close
-	};
-#endif	/* defined(HAVE_FOPENCOOKIE) */
-
-	if (fd == -1) {
-		/* Socket not open. */
-		return NULL;
-	}
-
-	stream->ssl = ssl;
-	stream->fd = fd;
-	stream->timeout = timeout;
-	ao2_t_ref(stream, +1, "Opening tcptls stream cookie");
-
-#if defined(HAVE_FUNOPEN)	/* the BSD interface */
-	fp = funopen(stream, tcptls_stream_read, tcptls_stream_write, NULL,
-		tcptls_stream_close);
-#elif defined(HAVE_FOPENCOOKIE)	/* the glibc/linux interface */
-	fp = fopencookie(stream, "w+", cookie_funcs);
-#else
-	/* could add other methods here */
-	ast_debug(2, "No stream FILE methods attempted!\n");
-	fp = NULL;
-#endif
-
-	if (!fp) {
-		stream->fd = -1;
-		ao2_t_ref(stream, -1, "Failed to open tcptls stream cookie");
-	}
-	return fp;
-}
-
-HOOK_T ast_tcptls_server_read(struct ast_tcptls_session_instance *tcptls_session, void *buf, size_t count)
-{
-	if (!tcptls_session->stream_cookie || tcptls_session->stream_cookie->fd == -1) {
-		ast_log(LOG_ERROR, "TCP/TLS read called on invalid stream.\n");
-		errno = EIO;
-		return -1;
-	}
-
-	return tcptls_stream_read(tcptls_session->stream_cookie, buf, count);
-}
-
-HOOK_T ast_tcptls_server_write(struct ast_tcptls_session_instance *tcptls_session, const void *buf, size_t count)
-{
-	if (!tcptls_session->stream_cookie || tcptls_session->stream_cookie->fd == -1) {
-		ast_log(LOG_ERROR, "TCP/TLS write called on invalid stream.\n");
-		errno = EIO;
-		return -1;
-	}
-
-	return tcptls_stream_write(tcptls_session->stream_cookie, buf, count);
-}
-
 static void session_instance_destructor(void *obj)
 {
 	struct ast_tcptls_session_instance *i = obj;
 
-	if (i->stream_cookie) {
-		ao2_t_ref(i->stream_cookie, -1, "Destroying tcptls session instance");
-		i->stream_cookie = NULL;
+	if (i->stream) {
+		ast_iostream_close(i->stream);
+		i->stream = NULL;
 	}
 	ast_free(i->overflow_buf);
 	ao2_cleanup(i->private_data);
@@ -591,9 +98,7 @@
 {
 	struct ast_tcptls_session_instance *tcptls_session = data;
 #ifdef DO_SSL
-	int (*ssl_setup)(SSL *) = (tcptls_session->client) ? SSL_connect : SSL_accept;
-	int ret;
-	char err[256];
+	SSL *ssl;
 #endif
 
 	/* TCP/TLS connections are associated with external protocols, and
@@ -608,123 +113,94 @@
 		return NULL;
 	}
 
-	tcptls_session->stream_cookie = tcptls_stream_alloc();
-	if (!tcptls_session->stream_cookie) {
-		ast_tcptls_close_session_file(tcptls_session);
-		ao2_ref(tcptls_session, -1);
-		return NULL;
-	}
-
-	/*
-	* open a FILE * as appropriate.
-	*/
-	if (!tcptls_session->parent->tls_cfg) {
-		tcptls_session->f = tcptls_stream_fopen(tcptls_session->stream_cookie, NULL,
-			tcptls_session->fd, -1);
-		if (tcptls_session->f) {
-			if (setvbuf(tcptls_session->f, NULL, _IONBF, 0)) {
-				ast_tcptls_close_session_file(tcptls_session);
-			}
-		}
-	}
+	if (tcptls_session->parent->tls_cfg) {
 #ifdef DO_SSL
-	else if ( (tcptls_session->ssl = SSL_new(tcptls_session->parent->tls_cfg->ssl_ctx)) ) {
-		SSL_set_fd(tcptls_session->ssl, tcptls_session->fd);
-		if ((ret = ssl_setup(tcptls_session->ssl)) <= 0) {
-			ast_log(LOG_ERROR, "Problem setting up ssl connection: %s\n", ERR_error_string(ERR_get_error(), err));
-		} else if ((tcptls_session->f = tcptls_stream_fopen(tcptls_session->stream_cookie,
-			tcptls_session->ssl, tcptls_session->fd, -1))) {
-			if ((tcptls_session->client && !ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_DONT_VERIFY_SERVER))
-				|| (!tcptls_session->client && ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_VERIFY_CLIENT))) {
-				X509 *peer;
-				long res;
-				peer = SSL_get_peer_certificate(tcptls_session->ssl);
-				if (!peer) {
-					ast_log(LOG_ERROR, "No peer SSL certificate to verify\n");
-					ast_tcptls_close_session_file(tcptls_session);
-					ao2_ref(tcptls_session, -1);
-					return NULL;
+		if (ast_iostream_start_tls(&tcptls_session->stream, tcptls_session->parent->tls_cfg->ssl_ctx, tcptls_session->client) < 0) {
+			ast_tcptls_close_session_file(tcptls_session);
+			ao2_ref(tcptls_session, -1);
+			return NULL;
+		}
+
+		ssl = ast_iostream_get_ssl(tcptls_session->stream);
+		if ((tcptls_session->client && !ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_DONT_VERIFY_SERVER))
+			|| (!tcptls_session->client && ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_VERIFY_CLIENT))) {
+			X509 *peer;
+			long res;
+			peer = SSL_get_peer_certificate(ssl);
+			if (!peer) {
+				ast_log(LOG_ERROR, "No peer SSL certificate to verify\n");
+				ast_tcptls_close_session_file(tcptls_session);
+				ao2_ref(tcptls_session, -1);
+				return NULL;
+			}
+
+			res = SSL_get_verify_result(ssl);
+			if (res != X509_V_OK) {
+				ast_log(LOG_ERROR, "Certificate did not verify: %s\n", X509_verify_cert_error_string(res));
+				X509_free(peer);
+				ast_tcptls_close_session_file(tcptls_session);
+				ao2_ref(tcptls_session, -1);
+				return NULL;
+			}
+			if (!ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_IGNORE_COMMON_NAME)) {
+				ASN1_STRING *str;
+				X509_NAME *name = X509_get_subject_name(peer);
+				STACK_OF(GENERAL_NAME) *alt_names;
+				int pos = -1;
+				int found = 0;
+
+				for (;;) {
+					/* Walk the certificate to check all available "Common Name" */
+					/* XXX Probably should do a gethostbyname on the hostname and compare that as well */
+					pos = X509_NAME_get_index_by_NID(name, NID_commonName, pos);
+					if (pos < 0) {
+						break;
+					}
+					str = X509_NAME_ENTRY_get_data(X509_NAME_get_entry(name, pos));
+					if (!check_tcptls_cert_name(str, tcptls_session->parent->hostname, "common name")) {
+						found = 1;
+						break;
+					}
 				}
 
-				res = SSL_get_verify_result(tcptls_session->ssl);
-				if (res != X509_V_OK) {
-					ast_log(LOG_ERROR, "Certificate did not verify: %s\n", X509_verify_cert_error_string(res));
+				if (!found) {
+					alt_names = X509_get_ext_d2i(peer, NID_subject_alt_name, NULL, NULL);
+					if (alt_names != NULL) {
+						int alt_names_count = sk_GENERAL_NAME_num(alt_names);
+
+						for (pos = 0; pos < alt_names_count; pos++) {
+							const GENERAL_NAME *alt_name = sk_GENERAL_NAME_value(alt_names, pos);
+
+							if (alt_name->type != GEN_DNS) {
+								continue;
+							}
+
+							if (!check_tcptls_cert_name(alt_name->d.dNSName, tcptls_session->parent->hostname, "alt name")) {
+								found = 1;
+								break;
+							}
+						}
+
+						sk_GENERAL_NAME_pop_free(alt_names, GENERAL_NAME_free);
+					}
+				}
+
+				if (!found) {
+					ast_log(LOG_ERROR, "Certificate common name did not match (%s)\n", tcptls_session->parent->hostname);
 					X509_free(peer);
 					ast_tcptls_close_session_file(tcptls_session);
 					ao2_ref(tcptls_session, -1);
 					return NULL;
 				}
-				if (!ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_IGNORE_COMMON_NAME)) {
-					ASN1_STRING *str;
-					X509_NAME *name = X509_get_subject_name(peer);
-					STACK_OF(GENERAL_NAME) *alt_names;
-					int pos = -1;
-					int found = 0;
-
-					for (;;) {
-						/* Walk the certificate to check all available "Common Name" */
-						/* XXX Probably should do a gethostbyname on the hostname and compare that as well */
-						pos = X509_NAME_get_index_by_NID(name, NID_commonName, pos);
-						if (pos < 0) {
-							break;
-						}
-
-						str = X509_NAME_ENTRY_get_data(X509_NAME_get_entry(name, pos));
-						if (!check_tcptls_cert_name(str, tcptls_session->parent->hostname, "common name")) {
-							found = 1;
-							break;
-						}
-					}
-
-					if (!found) {
-						alt_names = X509_get_ext_d2i(peer, NID_subject_alt_name, NULL, NULL);
-						if (alt_names != NULL) {
-							int alt_names_count = sk_GENERAL_NAME_num(alt_names);
-
-							for (pos = 0; pos < alt_names_count; pos++) {
-								const GENERAL_NAME *alt_name = sk_GENERAL_NAME_value(alt_names, pos);
-
-								if (alt_name->type != GEN_DNS) {
-									continue;
-								}
-
-								if (!check_tcptls_cert_name(alt_name->d.dNSName, tcptls_session->parent->hostname, "alt name")) {
-									found = 1;
-									break;
-								}
-							}
-
-							sk_GENERAL_NAME_pop_free(alt_names, GENERAL_NAME_free);
-						}
-					}
-
-					if (!found) {
-						ast_log(LOG_ERROR, "Certificate common name did not match (%s)\n", tcptls_session->parent->hostname);
-						X509_free(peer);
-						ast_tcptls_close_session_file(tcptls_session);
-						ao2_ref(tcptls_session, -1);
-						return NULL;
-					}
-				}
-				X509_free(peer);
 			}
+			X509_free(peer);
 		}
-		if (!tcptls_session->f) {	/* no success opening descriptor stacking */
-			SSL_free(tcptls_session->ssl);
-		}
-	}
-#endif /* DO_SSL */
-
-	if (!tcptls_session->f) {
+#else
+		ast_log(LOG_ERROR, "Attempted a TLS connection without OpenSSL support. This will not work!\n");
 		ast_tcptls_close_session_file(tcptls_session);
-		ast_log(LOG_WARNING, "FILE * open failed!\n");
-#ifndef DO_SSL
-		if (tcptls_session->parent->tls_cfg) {
-			ast_log(LOG_ERROR, "Attempted a TLS connection without OpenSSL support. This will not work!\n");
-		}
-#endif
 		ao2_ref(tcptls_session, -1);
 		return NULL;
+#endif /* DO_SSL */
 	}
 
 	if (tcptls_session->parent->worker_fn) {
@@ -772,7 +248,13 @@
 		tcptls_session->overflow_buf = ast_str_create(128);
 		flags = fcntl(fd, F_GETFL);
 		fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
-		tcptls_session->fd = fd;
+
+		tcptls_session->stream = ast_iostream_from_fd(&fd);
+		if (!tcptls_session->stream) {
+			ast_log(LOG_WARNING, "No memory for new session iostream\n");
+			continue;
+		}
+
 		tcptls_session->parent = desc;
 		ast_sockaddr_copy(&tcptls_session->remote_address, &addr);
 
@@ -1036,7 +518,7 @@
 
 struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_session_args *desc)
 {
-	int x = 1;
+	int fd, x = 1;
 	struct ast_tcptls_session_instance *tcptls_session = NULL;
 
 	/* Do nothing if nothing has changed */
@@ -1052,8 +534,8 @@
 		close(desc->accept_fd);
 	}
 
-	desc->accept_fd = socket(ast_sockaddr_is_ipv6(&desc->remote_address) ?
-				 AF_INET6 : AF_INET, SOCK_STREAM, IPPROTO_TCP);
+	fd = desc->accept_fd = socket(ast_sockaddr_is_ipv6(&desc->remote_address) ?
+				      AF_INET6 : AF_INET, SOCK_STREAM, IPPROTO_TCP);
 	if (desc->accept_fd < 0) {
 		ast_log(LOG_ERROR, "Unable to allocate socket for %s: %s\n",
 			desc->name, strerror(errno));
@@ -1079,7 +561,11 @@
 
 	tcptls_session->overflow_buf = ast_str_create(128);
 	tcptls_session->client = 1;
-	tcptls_session->fd = desc->accept_fd;
+	tcptls_session->stream = ast_iostream_from_fd(&fd);
+	if (!tcptls_session->stream) {
+		goto error;
+	}
+
 	tcptls_session->parent = desc;
 	tcptls_session->parent->worker_fn = NULL;
 	ast_sockaddr_copy(&tcptls_session->remote_address,
@@ -1170,24 +656,9 @@
 
 void ast_tcptls_close_session_file(struct ast_tcptls_session_instance *tcptls_session)
 {
-	if (tcptls_session->f) {
-		fflush(tcptls_session->f);
-		if (fclose(tcptls_session->f)) {
-			ast_log(LOG_ERROR, "fclose() failed: %s\n", strerror(errno));
-		}
-		tcptls_session->f = NULL;
-		tcptls_session->fd = -1;
-	} else if (tcptls_session->fd != -1) {
-		/*
-		 * Issuing shutdown() is necessary here to avoid a race
-		 * condition where the last data written may not appear
-		 * in the TCP stream.  See ASTERISK-23548
-		 */
-		shutdown(tcptls_session->fd, SHUT_RDWR);
-		if (close(tcptls_session->fd)) {
-			ast_log(LOG_ERROR, "close() failed: %s\n", strerror(errno));
-		}
-		tcptls_session->fd = -1;
+	if (tcptls_session->stream) {
+		ast_iostream_close(tcptls_session->stream);
+		tcptls_session->stream = NULL;
 	} else {
 		ast_log(LOG_ERROR, "ast_tcptls_close_session_file invoked on session instance without file or file descriptor\n");
 	}
diff --git a/main/utils.c b/main/utils.c
index 775fae3..2c56af3 100644
--- a/main/utils.c
+++ b/main/utils.c
@@ -1462,68 +1462,6 @@
 	return res;
 }
 
-int ast_careful_fwrite(FILE *f, int fd, const char *src, size_t len, int timeoutms)
-{
-	struct timeval start = ast_tvnow();
-	int n = 0;
-	int elapsed = 0;
-
-	while (len) {
-		if (wait_for_output(fd, timeoutms - elapsed)) {
-			/* poll returned a fatal error, so bail out immediately. */
-			return -1;
-		}
-
-		/* Clear any errors from a previous write */
-		clearerr(f);
-
-		n = fwrite(src, 1, len, f);
-
-		if (ferror(f) && errno != EINTR && errno != EAGAIN) {
-			/* fatal error from fwrite() */
-			if (!feof(f)) {
-				/* Don't spam the logs if it was just that the connection is closed. */
-				ast_log(LOG_ERROR, "fwrite() returned error: %s\n", strerror(errno));
-			}
-			n = -1;
-			break;
-		}
-
-		/* Update for data already written to the socket */
-		len -= n;
-		src += n;
-
-		elapsed = ast_tvdiff_ms(ast_tvnow(), start);
-		if (elapsed >= timeoutms) {
-			/* We've taken too long to write
-			 * This is only an error condition if we haven't finished writing. */
-			n = len ? -1 : 0;
-			break;
-		}
-	}
-
-	errno = 0;
-	while (fflush(f)) {
-		if (errno == EAGAIN || errno == EINTR) {
-			/* fflush() does not appear to reset errno if it flushes
-			 * and reaches EOF at the same time. It returns EOF with
-			 * the last seen value of errno, causing a possible loop.
-			 * Also usleep() to reduce CPU eating if it does loop */
-			errno = 0;
-			usleep(1);
-			continue;
-		}
-		if (errno && !feof(f)) {
-			/* Don't spam the logs if it was just that the connection is closed. */
-			ast_log(LOG_ERROR, "fflush() returned error: %s\n", strerror(errno));
-		}
-		n = -1;
-		break;
-	}
-
-	return n < 0 ? -1 : 0;
-}
-
 char *ast_strip_quoted(char *s, const char *beg_quotes, const char *end_quotes)
 {
 	char *e;
diff --git a/res/res_http_post.c b/res/res_http_post.c
index 241edfe..3f0a58e 100644
--- a/res/res_http_post.c
+++ b/res/res_http_post.c
@@ -211,7 +211,7 @@
 * This function has two modes.  The first to find a boundary marker.  The
 * second is to find the filename immediately after the boundary.
 */
-static int readmimefile(FILE *fin, FILE *fout, char *boundary, int contentlen)
+static int readmimefile(struct ast_iostream *in, FILE *fout, char *boundary, int contentlen)
 {
 	int find_filename = 0;
 	char buf[4096];
@@ -222,7 +222,7 @@
 	int boundary_len;
 	char * path_end, * path_start, * filespec;
 
-	if (NULL == fin || NULL == fout || NULL == boundary || 0 >= contentlen) {
+	if (NULL == in || NULL == fout || NULL == boundary || 0 >= contentlen) {
 		return -1;
 	}
 
@@ -236,8 +236,8 @@
 		}
 
 		if (0 < num_to_read) {
-			if (fread(&(buf[char_in_buf]), 1, num_to_read, fin) < num_to_read) {
-				ast_log(LOG_WARNING, "fread() failed: %s\n", strerror(errno));
+			if (ast_iostream_read(in, &(buf[char_in_buf]), num_to_read) < num_to_read) {
+				ast_log(LOG_WARNING, "read failed: %s\n", strerror(errno));
 				num_to_read = 0;
 			}
 			contentlen -= num_to_read;
@@ -378,7 +378,7 @@
 	 */
 	ast_http_body_read_status(ser, 0);
 
-	if (0 > readmimefile(ser->f, f, boundary_marker, content_len)) {
+	if (0 > readmimefile(ser->stream, f, boundary_marker, content_len)) {
 		ast_debug(1, "Cannot find boundary marker in POST request.\n");
 		fclose(f);
 		ast_http_error(ser, 400, "Bad Request", "Cannot find boundary marker in POST request.");
diff --git a/res/res_http_websocket.c b/res/res_http_websocket.c
index 28bf45f..cb5e5ff 100644
--- a/res/res_http_websocket.c
+++ b/res/res_http_websocket.c
@@ -73,8 +73,7 @@
 
 /*! \brief Structure definition for session */
 struct ast_websocket {
-	FILE *f;                           /*!< Pointer to the file instance used for writing and reading */
-	int fd;                            /*!< File descriptor for the session, only used for polling */
+	struct ast_iostream *stream;       /*!< iostream of the connection */
 	struct ast_sockaddr address;       /*!< Address of the remote client */
 	enum ast_websocket_opcode opcode;  /*!< Cached opcode for multi-frame messages */
 	size_t payload_len;                /*!< Length of the payload */
@@ -165,10 +164,11 @@
 {
 	struct ast_websocket *session = obj;
 
-	if (session->f) {
+	if (session->stream) {
 		ast_websocket_close(session, 0);
-		if (session->f) {
-			fclose(session->f);
+		if (session->stream) {
+			ast_iostream_close(session->stream);
+			session->stream = NULL;
 			ast_verb(2, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from",
 				ast_sockaddr_stringify(&session->address));
 		}
@@ -294,20 +294,22 @@
 	session->close_sent = 1;
 
 	ao2_lock(session);
-	res = ast_careful_fwrite(session->f, session->fd, frame, 4, session->timeout);
+	ast_iostream_set_timeout_inactivity(session->stream, session->timeout);
+	res = ast_iostream_write(session->stream, frame, sizeof(frame));
+	ast_iostream_set_timeout_disable(session->stream);
 
 	/* If an error occurred when trying to close this connection explicitly terminate it now.
 	 * Doing so will cause the thread polling on it to wake up and terminate.
 	 */
-	if (res) {
-		fclose(session->f);
-		session->f = NULL;
+	if (res != sizeof(frame)) {
+		ast_iostream_close(session->stream);
+		session->stream = NULL;
 		ast_verb(2, "WebSocket connection %s '%s' forcefully closed due to fatal write error\n",
 			session->client ? "to" : "from", ast_sockaddr_stringify(&session->address));
 	}
 
 	ao2_unlock(session);
-	return res;
+	return res == sizeof(frame);
 }
 
 static const char *opcode_map[] = {
@@ -375,7 +377,8 @@
 		return -1;
 	}
 
-	if (ast_careful_fwrite(session->f, session->fd, frame, frame_size, session->timeout)) {
+	ast_iostream_set_timeout_sequence(session->stream, ast_tvnow(), session->timeout);
+	if (ast_iostream_write(session->stream, frame, frame_size) != frame_size) {
 		ao2_unlock(session);
 		/* 1011 - server terminating connection due to not being able to fulfill the request */
 		ast_debug(1, "Closing WS with 1011 because we can't fulfill a write request\n");
@@ -383,7 +386,7 @@
 		return -1;
 	}
 
-	fflush(session->f);
+	ast_iostream_set_timeout_disable(session->stream);
 	ao2_unlock(session);
 
 	return 0;
@@ -411,7 +414,7 @@
 
 int AST_OPTIONAL_API_NAME(ast_websocket_fd)(struct ast_websocket *session)
 {
-	return session->closing ? -1 : session->fd;
+	return session->closing ? -1 : ast_iostream_get_fd(session->stream);
 }
 
 struct ast_sockaddr * AST_OPTIONAL_API_NAME(ast_websocket_remote_address)(struct ast_websocket *session)
@@ -426,18 +429,8 @@
 
 int AST_OPTIONAL_API_NAME(ast_websocket_set_nonblock)(struct ast_websocket *session)
 {
-	int flags;
-
-	if ((flags = fcntl(session->fd, F_GETFL)) == -1) {
-		return -1;
-	}
-
-	flags |= O_NONBLOCK;
-
-	if ((flags = fcntl(session->fd, F_SETFL, flags)) == -1) {
-		return -1;
-	}
-
+	ast_iostream_nonblock(session->stream);
+	ast_iostream_set_exclusive_input(session->stream, 0);
 	return 0;
 }
 
@@ -490,17 +483,16 @@
 	int sanity = 10;
 
 	ao2_lock(session);
-	if (!session->f) {
+	if (!session->stream) {
 		ao2_unlock(session);
 		errno = ECONNABORTED;
 		return -1;
 	}
 
 	for (;;) {
-		clearerr(session->f);
-		rlen = fread(rbuf, 1, xlen, session->f);
-		if (!rlen) {
-			if (feof(session->f)) {
+		rlen = ast_iostream_read(session->stream, rbuf, xlen);
+		if (rlen != xlen) {
+			if (rlen == 0) {
 				ast_log(LOG_WARNING, "Web socket closed abruptly\n");
 				*opcode = AST_WEBSOCKET_OPCODE_CLOSE;
 				session->closing = 1;
@@ -508,7 +500,7 @@
 				return -1;
 			}
 
-			if (ferror(session->f) && errno != EAGAIN) {
+			if (rlen < 0 && errno != EAGAIN) {
 				ast_log(LOG_ERROR, "Error reading from web socket: %s\n", strerror(errno));
 				*opcode = AST_WEBSOCKET_OPCODE_CLOSE;
 				session->closing = 1;
@@ -529,7 +521,7 @@
 		if (!xlen) {
 			break;
 		}
-		if (ast_wait_for_input(session->fd, 1000) < 0) {
+		if (ast_wait_for_input(ast_iostream_get_fd(session->stream), 1000) < 0) {
 			ast_log(LOG_ERROR, "ast_wait_for_input returned err: %s\n", strerror(errno));
 			*opcode = AST_WEBSOCKET_OPCODE_CLOSE;
 			session->closing = 1;
@@ -824,7 +816,7 @@
 			ao2_ref(protocol_handler, -1);
 			return 0;
 		}
-		session->timeout =  AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
+		session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
 
 		/* Generate the session id */
 		if (!ast_uuid_generate_str(session->session_id, sizeof(session->session_id))) {
@@ -854,7 +846,8 @@
 		 *    Connection_.
 		 */
 		if (protocol) {
-			fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n"
+			ast_iostream_printf(ser->stream,
+				"HTTP/1.1 101 Switching Protocols\r\n"
 				"Upgrade: %s\r\n"
 				"Connection: Upgrade\r\n"
 				"Sec-WebSocket-Accept: %s\r\n"
@@ -863,15 +856,14 @@
 				websocket_combine_key(key, base64, sizeof(base64)),
 				protocol);
 		} else {
-			fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n"
+			ast_iostream_printf(ser->stream,
+				"HTTP/1.1 101 Switching Protocols\r\n"
 				"Upgrade: %s\r\n"
 				"Connection: Upgrade\r\n"
 				"Sec-WebSocket-Accept: %s\r\n\r\n",
 				upgrade,
 				websocket_combine_key(key, base64, sizeof(base64)));
 		}
-
-		fflush(ser->f);
 	} else {
 
 		/* Specification defined in http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75 or completely unknown */
@@ -883,7 +875,7 @@
 	}
 
 	/* Enable keepalive on all sessions so the underlying user does not have to */
-	if (setsockopt(ser->fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) {
+	if (setsockopt(ast_iostream_get_fd(ser->stream), SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) {
 		ast_log(LOG_WARNING, "WebSocket connection from '%s' could not be accepted - failed to enable keepalive\n",
 			ast_sockaddr_stringify(&ser->remote_address));
 		websocket_bad_request(ser);
@@ -895,25 +887,23 @@
 	ast_verb(2, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version);
 
 	/* Populate the session with all the needed details */
-	session->f = ser->f;
-	session->fd = ser->fd;
+	session->stream = ser->stream;
 	ast_sockaddr_copy(&session->address, &ser->remote_address);
 	session->opcode = -1;
 	session->reconstruct = DEFAULT_RECONSTRUCTION_CEILING;
-	session->secure = ser->ssl ? 1 : 0;
+	session->secure = ast_iostream_get_ssl(ser->stream) ? 1 : 0;
 
 	/* Give up ownership of the socket and pass it to the protocol handler */
-	ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0);
+	ast_iostream_set_exclusive_input(session->stream, 0);
 	protocol_handler->session_established(session, get_vars, headers);
 	ao2_ref(protocol_handler, -1);
 
 	/*
-	 * By dropping the FILE* and fd from the session the connection
+	 * By dropping the stream from the session the connection
 	 * won't get closed when the HTTP server cleans up because we
 	 * passed the connection to the protocol handler.
 	 */
-	ser->f = NULL;
-	ser->fd = -1;
+	ser->stream = NULL;
 
 	return 0;
 }
@@ -1247,7 +1237,7 @@
 	int has_accept = 0;
 	int has_protocol = 0;
 
-	if (!fgets(buf, sizeof(buf), client->ser->f)) {
+	if (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) <= 0) {
 		ast_log(LOG_ERROR, "Unable to retrieve HTTP status line.");
 		return WS_BAD_STATUS;
 	}
@@ -1260,7 +1250,7 @@
 
 	/* Ignoring line folding - assuming header field values are contained
 	   within a single line */
-	while (fgets(buf, sizeof(buf), client->ser->f)) {
+	while (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) > 0) {
 		char *name, *value;
 		int parsed = ast_http_header_parse(buf, &name, &value);
 
@@ -1313,19 +1303,19 @@
 			client->protocols);
 	}
 
-	if (fprintf(client->ser->f,
-		    "GET /%s HTTP/1.1\r\n"
-		    "Sec-WebSocket-Version: %d\r\n"
-		    "Upgrade: websocket\r\n"
-		    "Connection: Upgrade\r\n"
-		    "Host: %s\r\n"
-		    "Sec-WebSocket-Key: %s\r\n"
-		    "%s\r\n",
-		    client->resource_name ? ast_str_buffer(client->resource_name) : "",
-		    client->version,
-		    client->host,
-		    client->key,
-		    protocols) < 0) {
+	if (ast_iostream_printf(client->ser->stream,
+			"GET /%s HTTP/1.1\r\n"
+			"Sec-WebSocket-Version: %d\r\n"
+			"Upgrade: websocket\r\n"
+			"Connection: Upgrade\r\n"
+			"Host: %s\r\n"
+			"Sec-WebSocket-Key: %s\r\n"
+			"%s\r\n",
+			client->resource_name ? ast_str_buffer(client->resource_name) : "",
+			client->version,
+			client->host,
+			client->key,
+			protocols) < 0) {
 		ast_log(LOG_ERROR, "Failed to send handshake.\n");
 		return WS_WRITE_ERROR;
 	}
@@ -1349,9 +1339,9 @@
 		return res;
 	}
 
-	ws->f = ws->client->ser->f;
-	ws->fd = ws->client->ser->fd;
-	ws->secure = ws->client->ser->ssl ? 1 : 0;
+	ws->stream = ws->client->ser->stream;
+	ws->secure = ast_iostream_get_ssl(ws->stream) ? 1 : 0;
+	ws->client->ser->stream = NULL;
 	ast_sockaddr_copy(&ws->address, &ws->client->ser->remote_address);
 	return WS_OK;
 }
diff --git a/res/res_phoneprov.c b/res/res_phoneprov.c
index 132c842..92eec31 100644
--- a/res/res_phoneprov.c
+++ b/res/res_phoneprov.c
@@ -949,7 +949,7 @@
 			socklen_t namelen = sizeof(name.sa);
 			int res;
 
-			if ((res = getsockname(ser->fd, &name.sa, &namelen))) {
+			if ((res = getsockname(ast_iostream_get_fd(ser->stream), &name.sa, &namelen))) {
 				ast_log(LOG_WARNING, "Could not get server IP, breakage likely.\n");
 			} else {
 				struct extension *exten_iter;

-- 
To view, visit https://gerrit.asterisk.org/2932
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Id916aef418b665ced6a7489aef74908b6e376e85
Gerrit-PatchSet: 9
Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-Owner: Timo Teräs <timo.teras at iki.fi>
Gerrit-Reviewer: Anonymous Coward #1000019
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
Gerrit-Reviewer: Mark Michelson <mmichelson at digium.com>
Gerrit-Reviewer: Richard Mudgett <rmudgett at digium.com>
Gerrit-Reviewer: Timo Teräs <timo.teras at iki.fi>



More information about the asterisk-code-review mailing list