[asterisk-commits] dvossel: branch dvossel/sip_nonblocking_tcp_client r220211 - /team/dvossel/si...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Sep 24 12:42:33 CDT 2009
Author: dvossel
Date: Thu Sep 24 12:42:28 2009
New Revision: 220211
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=220211
Log:
Implemented alert pipe into tcp_tls thread.
Modified:
team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c
Modified: team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c
URL: http://svnview.digium.com/svn/asterisk/team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c?view=diff&rev=220211&r1=220210&r2=220211
==============================================================================
--- team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c (original)
+++ team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c Thu Sep 24 12:42:28 2009
@@ -2098,6 +2098,13 @@
char lastmsg[256]; /*!< Last Message sent/received */
};
+enum sip_tcptls_alert {
+ /*! \brief There is new data to be sent out */
+ TCPTLS_ALERT_DATA,
+ /*! \brief A request to stop the tcp_handler thread */
+ TCPTLS_ALERT_STOP,
+};
+
/*! \brief Definition of a thread that handles a socket */
struct sip_threadinfo {
int stop;
@@ -2900,20 +2907,49 @@
static struct sip_threadinfo *sip_threadinfo_create(struct ast_tcptls_session_instance *tcptls_session)
{
struct sip_threadinfo *th;
- th = ao2_alloc(sizeof(*th), NULL);
-
- if (!th)
+
+ if (!(th = ao2_alloc(sizeof(*th), NULL))) {
return NULL;
-
+ }
+
+ th->alert_pipe[0] = th->alert_pipe[1] = -1;
+
+ if (pipe(th->alert_pipe) == -1) {
+ ao2_t_ref(th, -1, "Failed to open alert pipe on sip_threadinfo");
+ return NULL;
+ }
+ ast_log(LOG_NOTICE, " alertpipe is supposed to be up\n"); //todohere remove
th->tcptls_session = tcptls_session;
- if (tcptls_session->ssl)
- th->type = SIP_TRANSPORT_TLS;
- else
- th->type = SIP_TRANSPORT_TCP;
-//todohere create alert pipe!
+ th->type = tcptls_session->ssl ? SIP_TRANSPORT_TLS: SIP_TRANSPORT_TCP;
ao2_t_link(threadl, th, "Adding new tcptls helper thread");
return th;
+}
+
+static int sip_tcptls_write(struct ast_tcptls_session_instance *tcptls_session, const void *buf, size_t len)
+{
+ int res = 0;
+ struct sip_threadinfo *th;
+ struct sip_threadinfo tmp = {
+ .tcptls_session = tcptls_session,
+ };
+ enum sip_tcptls_alert alert = TCPTLS_ALERT_DATA;
+ if (!(th = ao2_t_find(threadl, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread"))) {
+ return -1;
+ }
+
+ /* must lock the thread info object to guarantee control of alert_pipe fd */
+ ao2_lock(th);
+ res = ast_tcptls_server_write(tcptls_session, buf, len); //todohere, replace this with write to th info queue
+ if (write(th->alert_pipe[1], &alert, sizeof(alert)) == -1) {
+ ast_log(LOG_ERROR, "write() to alert pipe failed: %s\n", strerror(errno));
+ //todohere remove from data queue since alert failed
+ res = -1;
+ }
+ ao2_unlock(th);
+
+ ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo object after finding it");
+ return res;
}
/*! \brief SIP TCP thread management function
@@ -2925,6 +2961,7 @@
struct sip_request req = { 0, } , reqcpy = { 0, };
struct sip_threadinfo *me;
char buf[1024] = "";
+ struct pollfd fds[2] = { { 0 }, { 0 }, };
/* If this is a server connection, create a new thread info object.
*
@@ -2951,6 +2988,11 @@
me->threadid = pthread_self();
ast_debug(2, "Starting thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
+ /* set up pollfd to watch for reads on both the socket and the alert_pipe */
+ fds[0].fd = tcptls_session->fd;
+ fds[1].fd = me->alert_pipe[0];
+ fds[0].events = fds[1].events = POLLIN|POLLPRI;
+
if (!(req.data = ast_str_create(SIP_MIN_PACKET)))
goto cleanup;
if (!(reqcpy.data = ast_str_create(SIP_MIN_PACKET)))
@@ -2979,54 +3021,110 @@
req.socket.port = htons(ourport_tcp);
}
req.socket.fd = tcptls_session->fd;
- res = ast_wait_for_input(tcptls_session->fd, -1);
+
+ res = ast_poll(fds, 2, -1); /* polls for both socket and alert_pipe */
if (res < 0) {
ast_debug(2, "SIP %s server :: ast_wait_for_input returned %d\n", tcptls_session->ssl ? "SSL": "TCP", res);
goto cleanup;
}
- /* Read in headers one line at a time */
- while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) {
- ast_mutex_lock(&tcptls_session->lock);
- if (!fgets(buf, sizeof(buf), tcptls_session->f)) {
- ast_mutex_unlock(&tcptls_session->lock);
- goto cleanup;
- }
- ast_mutex_unlock(&tcptls_session->lock);
- if (me->stop)
- goto cleanup;
- ast_str_append(&req.data, 0, "%s", buf);
- req.len = req.data->used;
- }
- copy_request(&reqcpy, &req);
- parse_request(&reqcpy);
- /* In order to know how much to read, we need the content-length header */
- if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) {
- while (cl > 0) {
- size_t bytes_read;
+ /* handle the socket event, check for both reads from the socket fd,
+ * and writes from alert_pipe fd */
+ if (fds[0].revents) { /* there is data on the socket to be read */
+ struct ast_str *str_save;
+
+ /* clear request structure */
+ str_save = req.data;
+ memset(&req, 0, sizeof(req));
+ req.data = str_save;
+ ast_str_reset(req.data);
+
+ str_save = reqcpy.data;
+ memset(&reqcpy, 0, sizeof(reqcpy));
+ reqcpy.data = str_save;
+ ast_str_reset(reqcpy.data);
+
+ memset(buf, 0, sizeof(buf));
+
+ if (tcptls_session->ssl) {
+ set_socket_transport(&req.socket, SIP_TRANSPORT_TLS);
+ req.socket.port = htons(ourport_tls);
+ } else {
+ set_socket_transport(&req.socket, SIP_TRANSPORT_TCP);
+ req.socket.port = htons(ourport_tcp);
+ }
+ req.socket.fd = tcptls_session->fd;
+
+ /* Read in headers one line at a time */
+ while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) {
ast_mutex_lock(&tcptls_session->lock);
- if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) {
+ if (!fgets(buf, sizeof(buf), tcptls_session->f)) {
ast_mutex_unlock(&tcptls_session->lock);
goto cleanup;
}
- buf[bytes_read] = '\0';
ast_mutex_unlock(&tcptls_session->lock);
if (me->stop)
- goto cleanup;
- cl -= strlen(buf);
+ goto cleanup;
ast_str_append(&req.data, 0, "%s", buf);
req.len = req.data->used;
}
- }
- /*! \todo XXX If there's no Content-Length or if the content-length and what
- we receive is not the same - we should generate an error */
-
- req.socket.tcptls_session = tcptls_session;
- handle_request_do(&req, &tcptls_session->remote_address);
+ copy_request(&reqcpy, &req);
+ parse_request(&reqcpy);
+ /* In order to know how much to read, we need the content-length header */
+ if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) {
+ while (cl > 0) {
+ size_t bytes_read;
+ ast_mutex_lock(&tcptls_session->lock);
+ if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) {
+ ast_mutex_unlock(&tcptls_session->lock);
+ goto cleanup;
+ }
+ buf[bytes_read] = '\0';
+ ast_mutex_unlock(&tcptls_session->lock);
+ if (me->stop)
+ goto cleanup;
+ cl -= strlen(buf);
+ ast_str_append(&req.data, 0, "%s", buf);
+ req.len = req.data->used;
+ }
+ }
+ /*! \todo XXX If there's no Content-Length or if the content-length and what
+ we receive is not the same - we should generate an error */
+
+ req.socket.tcptls_session = tcptls_session;
+ handle_request_do(&req, &tcptls_session->remote_address);
+ } else if (fds[1].revents) { /* alert_pipe indicates there is data in the send queue to be sent */
+ enum sip_tcptls_alert alert;
+
+ if (read(me->alert_pipe[0], &alert, sizeof(alert)) == -1) {
+ ast_log(LOG_ERROR, "read() failed: %s\n", strerror(errno));
+ continue;
+ }
+
+ switch (alert) {
+ case TCPTLS_ALERT_STOP:
+ goto cleanup;
+ case TCPTLS_ALERT_DATA:
+ ast_log(LOG_NOTICE, "GOT SEND ALERT!\n"); //todohere remove
+ //todohere check threadinfo read queue for packets to send
+ break;
+ default:
+ ast_log(LOG_ERROR, "Unknown tcptls thread alert '%d'\n", alert);
+ }
+ }
}
cleanup:
ao2_t_unlink(threadl, me, "Removing tcptls helper thread, thread is closing");
+ ao2_lock(me);
+ if (me->alert_pipe[1] > -1) {
+ close(me->alert_pipe[0]);
+ }
+ if (me->alert_pipe[1] > -1) {
+ close(me->alert_pipe[1]);
+ }
+ me->alert_pipe[0] = me->alert_pipe[1] = -1;
+ ao2_unlock(me);
ao2_t_ref(me, -1, "Closing tcptls thread, decrementing ref of sip_threadinfo");
cleanup2:
fclose(tcptls_session->f);
@@ -3508,7 +3606,7 @@
res = sendto(p->socket.fd, data->str, len, 0, (const struct sockaddr *)dst, sizeof(struct sockaddr_in));
} else if (p->socket.tcptls_session) {
if (p->socket.tcptls_session->f) {
- res = ast_tcptls_server_write(p->socket.tcptls_session, data->str, len);
+ res = sip_tcptls_write(p->socket.tcptls_session, data->str, len);
} else {
ast_debug(2, "No p->socket.tcptls_session->f len=%d\n", len);
return XMIT_ERROR;
More information about the asterisk-commits
mailing list