[svn-commits] branch file/txc r15894 - in /team/file/txc: ./ include/asterisk/ txc/

svn-commits at lists.digium.com svn-commits at lists.digium.com
Tue Mar 28 17:12:31 MST 2006


Author: file
Date: Tue Mar 28 18:12:27 2006
New Revision: 15894

URL: http://svn.digium.com/view/asterisk?rev=15894&view=rev
Log:
Introducting TXC (Transcoding eXpansion Community): A method of doing clustered/distributed transcoding with automatic resource discovery and configuration. Stay tuned for more information and documentation!

Added:
    team/file/txc/txc/
    team/file/txc/txc/txc-parser.c   (with props)
    team/file/txc/txc/txc-parser.h   (with props)
    team/file/txc/txc/txc-protocol.h   (with props)
    team/file/txc/txc/txc.c   (with props)
    team/file/txc/txc/txc.h   (with props)
Modified:
    team/file/txc/Makefile
    team/file/txc/asterisk.c
    team/file/txc/include/asterisk/translate.h
    team/file/txc/translate.c

Modified: team/file/txc/Makefile
URL: http://svn.digium.com/view/asterisk/team/file/txc/Makefile?rev=15894&r1=15893&r2=15894&view=diff
==============================================================================
--- team/file/txc/Makefile (original)
+++ team/file/txc/Makefile Tue Mar 28 18:12:27 2006
@@ -364,7 +364,7 @@
 	astmm.o enum.o srv.o dns.o aescrypt.o aestab.o aeskey.o \
 	utils.o plc.o jitterbuf.o dnsmgr.o devicestate.o \
 	netsock.o slinfactory.o ast_expr2.o ast_expr2f.o \
-	cryptostub.o sha1.o http.o
+	cryptostub.o sha1.o http.o txc/txc.o txc/txc-parser.o
 
 # we need to link in the objects statically, not as a library, because
 # otherwise modules will not have them available if none of the static
@@ -551,6 +551,7 @@
 clean-depend:
 	for x in $(SUBDIRS); do $(MAKE) -C $$x clean-depend || exit 1 ; done
 	rm -f .depend .tags-depend
+	rm -rf txc/*.o
 
 clean: clean-depend
 	for x in $(SUBDIRS); do $(MAKE) -C $$x clean || exit 1 ; done

Modified: team/file/txc/asterisk.c
URL: http://svn.digium.com/view/asterisk/team/file/txc/asterisk.c?rev=15894&r1=15893&r2=15894&view=diff
==============================================================================
--- team/file/txc/asterisk.c (original)
+++ team/file/txc/asterisk.c Tue Mar 28 18:12:27 2006
@@ -123,6 +123,8 @@
 #include "asterisk/linkedlists.h"
 #include "asterisk/devicestate.h"
 #include "asterisk/compat.h"
+
+#include "txc/txc.h"
 
 #include "asterisk/doxyref.h"		/* Doxygen documentation */
 
@@ -2319,6 +2321,10 @@
 		printf(term_quit());
 		exit(1);
 	}
+	if (txc_init()) {
+		printf(term_quit());
+		exit(1);
+	}
 	/* load 'preload' modules, required for access to Realtime-mapped configuration files */
 	if (load_modules(1)) {
 		printf(term_quit());

Modified: team/file/txc/include/asterisk/translate.h
URL: http://svn.digium.com/view/asterisk/team/file/txc/include/asterisk/translate.h?rev=15894&r1=15893&r2=15894&view=diff
==============================================================================
--- team/file/txc/include/asterisk/translate.h (original)
+++ team/file/txc/include/asterisk/translate.h Tue Mar 28 18:12:27 2006
@@ -96,6 +96,7 @@
  * Returns ast_trans_pvt on success, NULL on failure
  * */
 extern struct ast_trans_pvt *ast_translator_build_path(int dest, int source);
+extern struct ast_trans_pvt *ast_translator_build_path_txc(int dest, int source, int use_txc);
 
 /*!
  * \brief Frees a translator path

Modified: team/file/txc/translate.c
URL: http://svn.digium.com/view/asterisk/team/file/txc/translate.c?rev=15894&r1=15893&r2=15894&view=diff
==============================================================================
--- team/file/txc/translate.c (original)
+++ team/file/txc/translate.c Tue Mar 28 18:12:27 2006
@@ -45,6 +45,8 @@
 #include "asterisk/cli.h"
 #include "asterisk/term.h"
 
+#include "txc/txc.h"
+
 #define MAX_RECALC 200 /* max sample recalc */
 
 /*! \note
@@ -71,6 +73,7 @@
 static struct ast_translator_dir tr_matrix[MAX_FORMAT][MAX_FORMAT];
 
 struct ast_trans_pvt {
+	txc_session_t *session;
 	struct ast_translator *step;
 	struct ast_translator_pvt *state;
 	struct ast_trans_pvt *next;
@@ -92,6 +95,15 @@
 void ast_translator_free_path(struct ast_trans_pvt *p)
 {
 	struct ast_trans_pvt *pl, *pn;
+
+	/* If a session exists... simply destroy it */
+	if (p->session != NULL) {
+		txc_session_destroy(p->session);
+		free(p);
+		p = NULL;
+		return;
+	}
+
 	pn = p;
 	while(pn) {
 		pl = pn;
@@ -102,10 +114,26 @@
 	}
 }
 
+struct ast_trans_pvt *ast_translator_build_path(int dest, int source)
+{
+	return ast_translator_build_path_txc(dest, source, 1);
+}
+
 /*! Build a set of translators based upon the given source and destination formats */
-struct ast_trans_pvt *ast_translator_build_path(int dest, int source)
-{
+struct ast_trans_pvt *ast_translator_build_path_txc(int dest, int source, int use_txc)
+{
+	txc_session_t *session = NULL;
 	struct ast_trans_pvt *tmpr = NULL, *tmp = NULL;
+	
+	/* If TXC is enabled... */
+	if (use_txc) {
+		session = txc_session_create(source, dest);
+		if (session != NULL) {
+			tmp = ast_calloc(1, sizeof(*tmp));
+			tmp->session = session;
+			return tmp;
+		}
+	}		
 	
 	source = powerof(source);
 	dest = powerof(dest);
@@ -123,7 +151,7 @@
 			tmp = tmp->next;
 		} else
 			tmp = malloc(sizeof(*tmp));
-			
+
 		if (!tmp) {
 			ast_log(LOG_WARNING, "Out of memory\n");
 			if (tmpr)
@@ -135,6 +163,7 @@
 		if (!tmpr)
 			tmpr = tmp;
 
+		tmp->session = NULL;
 		tmp->next = NULL;
 		tmp->nextin = tmp->nextout = ast_tv(0, 0);
 		tmp->step = tr_matrix[source][dest].step;
@@ -157,6 +186,15 @@
 	struct ast_trans_pvt *p;
 	struct ast_frame *out;
 	struct timeval delivery;
+
+	/* See if this is using a TXC session - if so, do it! */
+	if (path->session != NULL) {
+		out = txc_session_translate(path->session, f);
+		if (consume)
+			ast_frfree(f);
+		return out;
+	}
+
 	p = path;
 	/* Feed the first frame into the first translator */
 	p->step->framein(p->state, f);

Added: team/file/txc/txc/txc-parser.c
URL: http://svn.digium.com/view/asterisk/team/file/txc/txc/txc-parser.c?rev=15894&view=auto
==============================================================================
--- team/file/txc/txc/txc-parser.c (added)
+++ team/file/txc/txc/txc-parser.c Tue Mar 28 18:12:27 2006
@@ -1,0 +1,170 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2006, Joshua Colp
+ *
+ * Joshua Colp <jcolp at joshua-colp.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Implementation of Transcoding eXpansion Community, v 1
+ *
+ * \author Joshua Colp <jcolp at joshua-colp.com>
+ */
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <string.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "asterisk.h"
+
+#include "asterisk/frame.h"
+#include "asterisk/utils.h"
+#include "asterisk/unaligned.h"
+#include "txc-protocol.h"
+#include "txc-parser.h"
+
+int txc_ie_append_raw(struct txc_ie_data *ied, unsigned char ie, const void *data, int datalen)
+{
+	if (datalen > ((int)sizeof(ied->buf) - ied->pos)) {
+		return -1;
+	}
+	ied->buf[ied->pos++] = ie;
+	ied->buf[ied->pos++] = datalen;
+	memcpy(ied->buf + ied->pos, data, datalen);
+	ied->pos += datalen;
+	return 0;
+}
+
+int txc_ie_append_addr(struct txc_ie_data *ied, unsigned char ie, const struct sockaddr_in *sin)
+{
+	return txc_ie_append_raw(ied, ie, sin, (int)sizeof(struct sockaddr_in));
+}
+
+int txc_ie_append_int(struct txc_ie_data *ied, unsigned char ie, unsigned int value) 
+{
+	unsigned int newval;
+	newval = htonl(value);
+	return txc_ie_append_raw(ied, ie, &newval, (int)sizeof(newval));
+}
+
+int txc_ie_append_short(struct txc_ie_data *ied, unsigned char ie, unsigned short value) 
+{
+	unsigned short newval;
+	newval = htons(value);
+	return txc_ie_append_raw(ied, ie, &newval, (int)sizeof(newval));
+}
+
+int txc_ie_append_str(struct txc_ie_data *ied, unsigned char ie, const char *str)
+{
+	if (str == NULL)
+		return 0;
+	return txc_ie_append_raw(ied, ie, str, strlen(str));
+}
+
+int txc_ie_append_byte(struct txc_ie_data *ied, unsigned char ie, unsigned char dat)
+{
+	return txc_ie_append_raw(ied, ie, &dat, 1);
+}
+
+int txc_ie_append(struct txc_ie_data *ied, unsigned char ie) 
+{
+	return txc_ie_append_raw(ied, ie, NULL, 0);
+}
+
+int txc_parse_ies(struct txc_ies *ies, unsigned char *data, int datalen)
+{
+	/* Parse data into information elements */
+	int len;
+	int ie;
+	char tmp[256];
+
+	memset(ies, 0, (int)sizeof(struct txc_ies));
+
+	while(datalen >= 2) {
+		ie = data[0];
+		len = data[1];
+		if (len > datalen - 2) {
+			return -1;
+		}
+		switch(ie) {
+		case TXC_IE_SOURCE_UID:
+			ies->source_uid = (char *)data + 2;
+			break;
+		case TXC_IE_TARGET_UID:
+			ies->target_uid = (char *)data + 2;
+			break;
+                case TXC_IE_REASON:
+                        ies->reason = (char *)data + 2;
+                        break;
+                case TXC_IE_TYPE:
+                        if (len != (int)sizeof(unsigned int)) {
+                                snprintf(tmp, (int)sizeof(tmp), "Expecting type to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+                        } else
+                                ies->type = ntohl(get_unaligned_uint32(data + 2));
+                        break;
+                case TXC_IE_SESSION_ID:
+                        if (len != (int)sizeof(unsigned int)) {
+                                snprintf(tmp, (int)sizeof(tmp), "Expecting type to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+                        } else
+                                ies->session_id = ntohl(get_unaligned_uint32(data + 2));
+                        break;
+                case TXC_IE_REMOTE_SESSION_ID:
+                        if (len != (int)sizeof(unsigned int)) {
+                                snprintf(tmp, (int)sizeof(tmp), "Expecting type to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+                        } else
+                                ies->remote_session_id = ntohl(get_unaligned_uint32(data + 2));
+                        break;
+		case TXC_IE_CAPABILITY:
+			if (len != (int)sizeof(unsigned int)) {
+				snprintf(tmp, (int)sizeof(tmp), "Expecting capability to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+			} else
+				ies->capability = ntohl(get_unaligned_uint32(data + 2));
+			break;
+		case TXC_IE_SRC_FORMAT:
+			if (len != (int)sizeof(unsigned int)) {
+				snprintf(tmp, (int)sizeof(tmp), "Expecting format to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+			} else
+				ies->src_format = ntohl(get_unaligned_uint32(data + 2));
+			break;
+                case TXC_IE_DST_FORMAT:
+			if (len != (int)sizeof(unsigned int)) {
+				snprintf(tmp, (int)sizeof(tmp), "Expecting format to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+			} else
+				ies->dst_format = ntohl(get_unaligned_uint32(data + 2));
+			break;
+		case TXC_IE_VERSION:
+			if (len != (int)sizeof(unsigned short)) {
+				snprintf(tmp, (int)sizeof(tmp),  "Expecting version to be %d bytes long but was %d\n", (int)sizeof(unsigned short), len);
+			} else
+				ies->version = ntohs(get_unaligned_uint16(data + 2));
+			break;
+		}
+		/* Overwrite information element with 0, to null terminate previous portion */
+		data[0] = 0;
+		datalen -= (len + 2);
+		data += (len + 2);
+	}
+	/* Null-terminate last field */
+	*data = '\0';
+	if (datalen) {
+		return -1;
+	}
+	return 0;
+}

Propchange: team/file/txc/txc/txc-parser.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/file/txc/txc/txc-parser.c
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/file/txc/txc/txc-parser.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/file/txc/txc/txc-parser.h
URL: http://svn.digium.com/view/asterisk/team/file/txc/txc/txc-parser.h?rev=15894&view=auto
==============================================================================
--- team/file/txc/txc/txc-parser.h (added)
+++ team/file/txc/txc/txc-parser.h Tue Mar 28 18:12:27 2006
@@ -1,0 +1,52 @@
+/*
+ * Asterisk -- A telephony toolkit for Linux.
+ *
+ * Implementation of Transcoding eXpansion Community
+ * 
+ * Copyright (C) 2006, Joshua Colp
+ *
+ * Joshua Colp <jcolp at joshua-colp.com>
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License
+ */
+
+/*!\file
+ * \brief Implementation of the Transcoding eXpansion Community Protocol
+ */
+ 
+#ifndef _TXC_PARSER_H
+#define _TXC_PARSER_H
+
+struct txc_ies {
+	unsigned int type;
+	char *source_uid;
+	char *target_uid;
+	char *reason;
+	unsigned int capability;
+	unsigned int src_format;
+	unsigned int dst_format;
+	unsigned int session_id;
+	unsigned int remote_session_id;
+	int version;
+};
+
+typedef struct txc_ies txc_ies_t;
+
+struct txc_ie_data {
+	unsigned char buf[1024];
+	int pos;
+};
+
+typedef struct txc_ie_data txc_ie_data_t;
+
+int txc_ie_append_raw(struct txc_ie_data *ied, unsigned char ie, const void *data, int datalen);
+int txc_ie_append_addr(struct txc_ie_data *ied, unsigned char ie, const struct sockaddr_in *sin);
+int txc_ie_append_int(struct txc_ie_data *ied, unsigned char ie, unsigned int value);
+int txc_ie_append_short(struct txc_ie_data *ied, unsigned char ie, unsigned short value);
+int txc_ie_append_str(struct txc_ie_data *ied, unsigned char ie, const char *str);
+int txc_ie_append_byte(struct txc_ie_data *ied, unsigned char ie, unsigned char dat);
+int txc_ie_append(struct txc_ie_data *ied, unsigned char ie);
+int txc_parse_ies(struct txc_ies *ies, unsigned char *data, int datalen);
+
+#endif

Propchange: team/file/txc/txc/txc-parser.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/file/txc/txc/txc-parser.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/file/txc/txc/txc-parser.h
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/file/txc/txc/txc-protocol.h
URL: http://svn.digium.com/view/asterisk/team/file/txc/txc/txc-protocol.h?rev=15894&view=auto
==============================================================================
--- team/file/txc/txc/txc-protocol.h (added)
+++ team/file/txc/txc/txc-protocol.h Tue Mar 28 18:12:27 2006
@@ -1,0 +1,51 @@
+/*
+ * Asterisk -- A telephony toolkit for Linux.
+ *
+ * Implementation of Transcoding eXpansion Community
+ * 
+ * Copyright (C) 2006, Joshua Colp
+ *
+ * Joshua Colp <jcolp at joshua-colp.com>
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License
+ */
+ 
+#ifndef _TXC_PROTOCOL_H
+#define _TXC_PROTOCOL_H
+
+/* Current support TXC protocol version */
+#define TXC_PROTO_VERSION 1
+
+/* Announcement packet types */
+#define TXC_ANNOUNCE_REQUEST      1  /* Request that all TXC nodes announce themselves */
+#define TXC_ANNOUNCE_REPLY        2  /* Reply with TXC node information */
+#define TXC_ANNOUNCE_SUSPEND      3  /* Announce suspension of a TXC node from service */
+#define TXC_ANNOUNCE_FAILURE      4  /* Announce failure of a TXC node as perceived by another TXC node */
+
+/* Session packet types */
+#define TXC_SESSION_NEW         5 /* Create a new TXC session */
+#define TXC_SESSION_REJECT      6 /* Reject session creation request */
+#define TXC_SESSION_ACK         7 /* Acknowledge session start */
+#define TXC_SESSION_END         8 /* End a TXC session */
+
+/* Node packet types */
+#define TXC_NODE_PING           9 /* Hey - are you alive? */
+#define TXC_NODE_PONG           10 /* Yes... I'm alive */
+
+/* Default port number */
+#define TXC_DEFAULT_PORTNO      2657
+
+/* TXC Information elements */
+#define TXC_IE_TYPE             1 /* What type of packet is this? - unsigned int */
+#define TXC_IE_SOURCE_UID       2 /* Unique source ID - string */
+#define TXC_IE_TARGET_UID       3 /* Target source ID - string */
+#define TXC_IE_CAPABILITY       4 /* TXC node capabilities - unsigned int */
+#define TXC_IE_SRC_FORMAT       5 /* Source transcoding format - unsigned int */
+#define TXC_IE_DST_FORMAT       6 /* Destination transcoding format - unsigned int */
+#define TXC_IE_VERSION          7 /* Protocol version - short */
+#define TXC_IE_SESSION_ID       8 /* Session ID - unsigned int */
+#define TXC_IE_REMOTE_SESSION_ID 9 /* Remote Session ID - unsigned int */
+#define TXC_IE_REASON           10 /* Reason for a reply to happen - string */
+
+#endif

Propchange: team/file/txc/txc/txc-protocol.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/file/txc/txc/txc-protocol.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/file/txc/txc/txc-protocol.h
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/file/txc/txc/txc.c
URL: http://svn.digium.com/view/asterisk/team/file/txc/txc/txc.c?rev=15894&view=auto
==============================================================================
--- team/file/txc/txc/txc.c (added)
+++ team/file/txc/txc/txc.c Tue Mar 28 18:12:27 2006
@@ -1,0 +1,1138 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2006, Joshua Colp.
+ *
+ * Joshua Colp <jcolp at joshua-colp.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Transcoding eXchange Community
+ *
+ * \author Joshua Colp <jcolp at joshua-colp.com> 
+ */
+
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <resolv.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <regex.h>
+#include <signal.h>
+
+#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(SOLARIS) || defined(__Darwin__)
+#include <sys/types.h>
+#include <netinet/in_systm.h>
+#endif
+#include <netinet/ip.h>
+#include <sys/ioctl.h>
+#include <netinet/in.h>
+#include <net/if.h>
+#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__Darwin__)
+#include <net/if_dl.h>
+#include <ifaddrs.h>
+#endif
+
+#include "asterisk.h"
+
+#include "asterisk/astobj.h"
+#include "asterisk/linkedlists.h"
+#include "asterisk/utils.h"
+#include "asterisk/config.h"
+#include "asterisk/logger.h"
+#include "asterisk/sched.h"
+#include "asterisk/options.h"
+#include "asterisk/cli.h"
+#include "asterisk/frame.h"
+#include "asterisk/translate.h"
+#include "asterisk/netsock.h"
+
+#include "txc.h"
+#include "txc-protocol.h"
+#include "txc-parser.h"
+
+#define TESTING_LOOPBACK /* Loop back data for testing cause you know */
+
+/* Configuration Information */
+static unsigned int capabilities = AST_FORMAT_ULAW;
+static char uid[80] = "";
+static struct ast_codec_pref default_prefs;
+
+/* Community information (socket/thread/etc) */
+static pthread_t community_thread = AST_PTHREADT_NULL;
+static int community_recv_sock = -1;
+static struct sockaddr_in community_recv_saddr;
+static time_t last_community_update;
+/* Socket information for sending */
+AST_MUTEX_DEFINE_STATIC(community_lock);
+static int community_send_sock = -1;
+static struct sockaddr_in community_send_saddr;
+
+/* Node monitoring */
+static pthread_t node_thread = AST_PTHREADT_NULL;
+
+/* Global session information */
+AST_MUTEX_DEFINE_STATIC(session_id_lock);
+#define TXC_MAX_SESSIONS 1000
+static int session_ids[TXC_MAX_SESSIONS];
+static struct ast_netsock_list *netsock = NULL;
+
+/* Community nodes */
+static struct txc_node_list {
+	ASTOBJ_CONTAINER_COMPONENTS(struct txc_node);
+} nodes;
+
+/* Sessions - local or remote */
+static struct txc_session_list {
+	ASTOBJ_CONTAINER_COMPONENTS(struct txc_session);
+} sessions;
+
+/* Session IO Thread */
+static void *io_thread_exec(void *data)
+{
+	txc_session_t *session = data;
+	struct io_context *io = session->io;
+
+	for (;;) {
+		/* See if I/O thread needs to exit */
+		ast_mutex_lock(&session->lock);
+		if (ast_test_flag(session, TXC_SESSION_EXIT)) {
+			ast_mutex_unlock(&session->lock);
+			break;
+		}
+		ast_mutex_unlock(&session->lock);
+		ast_io_wait(io, -1);
+	}
+
+	return NULL;
+}
+
+/* State conversions (int to string) */
+static char *state_to_str(int state)
+{
+	switch (state) {
+	case TXC_NODE_STATE_ALIVE:
+		return "Alive";
+	case TXC_NODE_STATE_SUSPENDED:
+		return "Suspended";
+	case TXC_NODE_STATE_FAILED:
+		return "Failed";
+	default:
+		return "Unknown";
+	}
+}
+
+/* Append standarding TXC information about ourselves */
+static void txc_append_standard_ied(txc_ie_data_t *ied)
+{
+        /* Append our UID */
+        txc_ie_append_str(ied, TXC_IE_SOURCE_UID, uid);
+        /* Append version */
+        txc_ie_append_short(ied, TXC_IE_VERSION, TXC_PROTO_VERSION);
+}
+
+/* Transmit to a node via P2P */
+static int txc_node_transmit(int type, txc_node_t *node, txc_ie_data_t *ied)
+{
+	socklen_t len = sizeof(struct sockaddr_in);
+	int res = 0;
+
+	/* Add type of packet */
+	txc_ie_append_int(ied, TXC_IE_TYPE, type);
+
+	/* Deliver to the node */
+	res = sendto(node->sock, (unsigned char*)ied->buf, ied->pos, 0, (struct sockaddr *)&node->addr, len);
+	
+	return 1;
+}
+
+/* Return a free session ID */
+static int new_session_id(void)
+{
+	int id = 0, i = 0;
+
+	/* Find a free session ID */
+	ast_mutex_lock(&session_id_lock);
+	for (i=0; i<TXC_MAX_SESSIONS; i++) {
+		if (session_ids[i] == 0) {
+			session_ids[i] = 1;
+			id = i;
+			break;
+		}
+	}
+	ast_mutex_unlock(&session_id_lock);
+
+	return id;
+}
+
+/* Find a free TXC node to set a session up on */
+static txc_node_t *find_txc_node(int src_format, int dst_format)
+{
+	txc_node_t *node = NULL;
+
+	return node;
+}
+
+/* Find a node given a node UID */
+static txc_node_t *find_node(char *uid)
+{
+	txc_node_t *node = NULL;
+
+	node = ASTOBJ_CONTAINER_FIND(&nodes, uid);
+
+	return node;
+}
+
+/* Find a session given a session ID */
+static txc_session_t *find_session(unsigned int session_id)
+{
+	char tmp[128] = "";
+	txc_session_t *session = NULL;
+
+	snprintf(tmp, sizeof(tmp), "%u", session_id);
+	session = ASTOBJ_CONTAINER_FIND(&sessions, tmp);
+
+	return session;
+}
+
+/* Session Audio Port Callback via netsock */
+static int audio_read(int *id, int fd, short events, void *cbdata)
+{
+        struct ast_netsock *ns = cbdata;
+        txc_session_t *session = ast_netsock_data(ns);
+        int res = 0;
+        unsigned char buf[2048];
+        socklen_t len = sizeof(ast_netsock_boundaddr(ns));
+	struct ast_frame received_frame, *extra_frame = NULL;
+
+	memset(&received_frame, 0, sizeof(received_frame));
+
+	ast_mutex_lock(&session->lock);
+
+        res = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)ast_netsock_boundaddr(ns), &len);
+
+	received_frame.frametype = AST_FRAME_VOICE;
+	received_frame.data = buf;
+	received_frame.datalen = res;
+
+        /* See what type of session this is... */
+        if (session->type == TXC_SESSION_LOCAL) {
+                /* Okay we need to transcode this and send it back out */
+		received_frame.subclass = session->src_format;
+		/* Now that we have created a sorta frame... translate it */
+		extra_frame = ast_translate(session->trans, &received_frame, 0);
+		if (extra_frame != NULL) {
+			sendto(session->sock, extra_frame->data, extra_frame->datalen, 0, (struct sockaddr *)&session->addr, sizeof(session->addr));
+			ast_frfree(extra_frame);
+		} else {
+			ast_log(LOG_ERROR, "Failed to translate inbound audio\n");
+		}
+		extra_frame = NULL;
+        } else if (session->type == TXC_SESSION_REMOTE) {
+                /* This is a transcoded frame coming back */
+		received_frame.subclass = session->dst_format;
+		session->fr = ast_frdup(&received_frame);
+		/* Signal that a frame is available */
+		ast_cond_signal(&session->cond);
+        }
+
+	ast_mutex_unlock(&session->lock);
+
+        return 1;
+}
+
+/* Setup a new TXC session via P2P */
+static txc_session_t *txc_session_new(txc_node_t *node, int src_format, int dst_format)
+{
+	int id = 0;
+	txc_ie_data_t ied;
+	txc_session_t *session = NULL;
+
+	memset(&ied, 0, sizeof(ied));
+
+	/* Make sure source and destination formats are not the same */
+	if (src_format == dst_format) {
+		return NULL;
+	}
+
+	/* Get a new ID now */
+	id = new_session_id();
+
+	/* Setup a new session */
+	session = ast_calloc(1, sizeof(*session));
+	snprintf(session->name, sizeof(session->name), "%u", id); 
+	session->local_session_id = id;
+	session->type = TXC_SESSION_REMOTE;
+	session->node = node;
+	session->ns = NULL;
+
+	session->src_format = src_format;
+	session->dst_format = dst_format;
+
+	/* Initialize mutex and condition */
+	ast_mutex_init(&session->lock);
+	ast_cond_init(&session->cond, NULL);
+
+	/* Setup I/O Context */
+	session->io = io_context_create();
+
+        /* Spawn audio netsock stuff */
+        session->ns = ast_netsock_bind(netsock, session->io, "0.0.0.0", id+TXC_DEFAULT_PORTNO+1, 0, audio_read, session);
+        if (session->ns == NULL) {
+                ast_log(LOG_ERROR, "Failed to setup netsock callback stuff\n");
+        }
+
+        /* Spawn I/O Thread */
+        ast_pthread_create(&session->io_thread, NULL, io_thread_exec, session);
+
+	/* Before we add it to the list, lock it */
+	ast_mutex_lock(&session->lock);
+
+	/* Add to the session list */
+	ASTOBJ_CONTAINER_LINK(&sessions, session);
+
+	/* Setup request for new session for transmitting */
+	txc_append_standard_ied(&ied);
+
+	/* Add formats in */
+	txc_ie_append_int(&ied, TXC_IE_SRC_FORMAT, src_format);
+	txc_ie_append_int(&ied, TXC_IE_DST_FORMAT, dst_format);
+
+	/* Add session ID */
+	txc_ie_append_int(&ied, TXC_IE_SESSION_ID, id);
+
+	/* Transmit */
+	txc_node_transmit(TXC_SESSION_NEW, node, &ied);
+	/* Wait for a reply */
+	ast_cond_wait(&session->cond, &session->lock);
+
+	/* Make sure the session is setup okay */
+	if (ast_test_flag(session, TXC_SESSION_FAILED)) {
+		ast_log(LOG_ERROR, "Failed to set session up\n");
+	}
+
+	ast_mutex_unlock(&session->lock);
+
+	return session;
+}
+
+/* Create a session to transcode between two formats */
+txc_session_t *txc_session_create(int src_format, int dst_format)
+{
+        txc_session_t *session = NULL;
+        txc_node_t *node = find_node("proton");
+
+        session = txc_session_new(node, src_format, dst_format);
+
+        return session;
+}
+
+/* Translate using a TXC session */
+struct ast_frame *txc_session_translate(txc_session_t *session, struct ast_frame *frame)
+{
+	int res = 0;
+	struct ast_frame *fr = NULL;
+	struct timeval tv;
+	struct timespec ts;
+
+	/* Acquire session lock */
+	ast_mutex_lock(&session->lock);
+
+	/* This is where it gets fun... we send out the data and wait for a reply - note this is a literal audio stream */
+	res = sendto(session->sock, frame->data, frame->datalen, 0, (struct sockaddr *)&session->addr, sizeof(session->addr));
+
+	/* Calculate a suitable timeout */
+	tv = ast_tvadd(ast_tvnow(), ast_samp2tv(1000, 1000));
+	ts.tv_sec = tv.tv_sec;
+	ts.tv_nsec = tv.tv_usec * 1000;
+
+	/* Wait for a response -- note we have a timeout... */
+	if (ast_cond_timedwait(&session->cond, &session->lock, &ts) < 0) {
+		/* We did not get a reply in the minimum amount of time... */
+		ast_log(LOG_ERROR, "Did not get a reply in time...\n");
+	} else {
+		/* Grab the translated frame */
+		fr = session->fr;
+		session->fr = NULL;
+	}
+
+	ast_mutex_unlock(&session->lock);
+
+	return fr;
+}
+
+/* Destroy a TXC session */
+int txc_session_destroy(txc_session_t *session)
+{
+	txc_ie_data_t ied;
+
+	ast_mutex_lock(&session->lock);
+
+	/* If this is a remote session then we need to send a TXC_SESSION_END to the other node */
+	if (session->type == TXC_SESSION_REMOTE) {
+		memset(&ied, 0, sizeof(ied));
+		txc_append_standard_ied(&ied);
+		
+		txc_ie_append_int(&ied, TXC_IE_SESSION_ID, session->remote_session_id);
+		
+		txc_node_transmit(TXC_SESSION_END, session->node, &ied);
+	}
+
+	/* If a translator path exists -- free it */
+	if (session->trans != NULL) {
+		ast_translator_free_path(session->trans);
+		session->trans = NULL;
+	}
+
+	/* Kill I/O Thread */
+	if (session->io_thread != AST_PTHREADT_NULL) {
+		ast_set_flag(session, TXC_SESSION_EXIT);
+		/* This will knock the thread out of the poll if it is stuck there */
+		pthread_kill(session->io_thread, SIGURG);
+		ast_mutex_unlock(&session->lock);
+		pthread_join(session->io_thread, NULL);
+		ast_mutex_lock(&session->lock);
+		session->io_thread = AST_PTHREADT_NULL;
+	}
+
+	/* If netsock is setup... get rid of it */
+	if (session->ns != NULL) {
+		/* Work around a strange astobj/netsock bug */
+		ast_netsock_unref(session->ns);
+		ast_netsock_unref(session->ns);
+		session->ns = NULL;
+	}
+
+	/* Get rid of I/O Context */
+	io_context_destroy(session->io);
+	session->io = NULL;
+
+	/* Close sending socket */
+	close(session->sock);
+	session->sock = -1;
+
+	/* Finally remove from the list */
+	ASTOBJ_CONTAINER_UNLINK(&sessions, session);
+
+	/* Mark this session ID as unused now */
+	ast_mutex_lock(&session_id_lock);
+	session_ids[session->local_session_id] = 0;
+	ast_mutex_unlock(&session_id_lock);
+
+	/* Free any used memory */
+	ast_mutex_unlock(&session->lock);
+	free(session);
+	session = NULL;
+
+	return 1;
+}
+
+/* Handle a TXC Session New request */
+static int txc_session_handle_new(txc_node_t *node, txc_ies_t *ies)
+{
+	int id = 0;
+	txc_ie_data_t ied;
+	txc_session_t *session = NULL;
+	struct ast_trans_pvt *translator = NULL;
+
+	memset(&ied, 0, sizeof(ied));
+	txc_append_standard_ied(&ied);
+
+	/* Add the session ID cause we know we will need it */
+	txc_ie_append_int(&ied, TXC_IE_SESSION_ID, ies->session_id);
+
+	/* Make sure we can translate this... if not = send back a TXC_SESSION_REJECT */
+	translator = ast_translator_build_path_txc(ies->dst_format, ies->src_format, 0);
+	if (translator == NULL) {
+		ast_log(LOG_ERROR, "Rejected TXC_SESSION_NEW because translator path could not be built from %d to %d\n", ies->src_format, ies->dst_format);
+		txc_node_transmit(TXC_SESSION_REJECT, node, &ied);
+		return 1;
+	}
+
+	/* Get the ID of this session */
+	id = new_session_id();
+
+	/* Setup a new local session */
+	session = ast_calloc(1, sizeof(*session));
+	snprintf(session->name, sizeof(session->name), "%u", id);
+	session->local_session_id = id;
+	session->remote_session_id = ies->session_id;
+	session->type = TXC_SESSION_LOCAL;
+	session->node = node;
+	session->trans = translator;
+
+	/* Record formats */
+	session->src_format = ies->src_format;
+	session->dst_format = ies->dst_format;
+
+	/* Setup I/O Context */
+	session->io = io_context_create();
+
+	/* Spawn audio netsock stuff */
+	session->ns = ast_netsock_bind(netsock, session->io, "0.0.0.0", id+TXC_DEFAULT_PORTNO+1, 0, audio_read, session);
+	if (session->ns == NULL) {
+		ast_log(LOG_ERROR, "Failed to setup netsock callback stuff\n");
+	}
+
+	/* Spawn I/O Thread */
+	ast_pthread_create(&session->io_thread, NULL, io_thread_exec, session);
+
+	ast_log(LOG_NOTICE, "Remote Audio port on NEW is %d\n", ies->session_id+TXC_DEFAULT_PORTNO+1);
+
+	/* Setup stuff so we can reply back frames to the sender */
+	memcpy(&session->addr, &node->addr, sizeof(session->addr));
+	session->addr.sin_family = AF_INET;
+	session->addr.sin_port = htons(ies->session_id+TXC_DEFAULT_PORTNO+1);
+	session->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+
+	/* Add to the list */
+	ASTOBJ_CONTAINER_LINK(&sessions, session);
+
+	/* Send the acknowledgement back */
+	txc_ie_append_int(&ied, TXC_IE_REMOTE_SESSION_ID, id);
+
+	txc_node_transmit(TXC_SESSION_ACK, node, &ied);
+
+	ast_log(LOG_NOTICE, "Have a TXC Session New request from %s with remote session ID of %u\n", node->name, ies->session_id);
+	return 1;
+}
+
+/* Handle a TXC Session ACK response */
+static int txc_session_handle_ack(txc_node_t *node, txc_ies_t *ies)
+{
+	txc_session_t *session = find_session(ies->session_id);
+
+	ast_mutex_lock(&session->lock);
+
+	/* Update session information */
+	session->remote_session_id = ies->remote_session_id;
+
+	/* Setup sending audio port */
+        memcpy(&session->addr, &node->addr, sizeof(session->addr));
+	session->addr.sin_family = AF_INET;
+        session->addr.sin_port = htons(ies->remote_session_id+TXC_DEFAULT_PORTNO+1);
+        session->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+
+	/* Signal wakeup */
+	ast_cond_signal(&session->cond);
+	ast_mutex_unlock(&session->lock);
+
+	return 1;
+}
+
+/* Handle a TXC Session Reject response */
+static int txc_session_handle_reject(txc_node_t *node, txc_ies_t *ies)
+{
+	txc_session_t *session = find_session(ies->session_id);
+
+	ast_mutex_lock(&session->lock);
+
+	ast_set_flag(session, TXC_SESSION_FAILED);
+
+	ast_cond_signal(&session->cond);
+
+	ast_mutex_unlock(&session->lock);
+
+	return 1;
+}
+
+/* Handle a TXC Session End request */
+static int txc_session_handle_end(txc_node_t *node, txc_ies_t *ies)
+{
+	txc_session_t *session = find_session(ies->session_id);
+
+	/* Destroy anything on the session */
+	txc_session_destroy(session);
+
+	return 1;
+}
+
+/* Ping a node */
+static int txc_node_ping(txc_node_t *node)
+{
+        txc_ie_data_t ied;
+
+	ast_mutex_lock(&node->lock);
+
+	/* Change flag to indicate they were pinged */
+	ast_set_flag(node, TXC_NODE_PINGED);
+
+        memset(&ied, 0, sizeof(ied));
+
+        /* Standard information */
+        txc_append_standard_ied(&ied);
+
+        txc_node_transmit(TXC_NODE_PING, node, &ied);
+
+	ast_mutex_unlock(&node->lock);
+
+        return 1;
+}
+
+/* Reply to the ping from a node */
+static int txc_node_pong(txc_node_t *node)
+{
+	txc_ie_data_t ied;
+
+	ast_mutex_lock(&node->lock);
+
+	memset(&ied, 0, sizeof(ied));
+
+	txc_append_standard_ied(&ied);
+
+	txc_node_transmit(TXC_NODE_PONG, node, &ied);
+
+	ast_mutex_unlock(&node->lock);
+
+	return 1;
+}
+
+/* Transmit to the TXC community */
+static int txc_community_transmit(int type, txc_ie_data_t *ied)
+{
+	socklen_t len = sizeof(struct sockaddr_in);
+	int res = 0;
+
+	/* Add our type in */
+	txc_ie_append_int(ied, TXC_IE_TYPE, type);
+
+	ast_mutex_lock(&community_lock);
+	res = sendto(community_send_sock, (unsigned char*)ied->buf, ied->pos, 0, (struct sockaddr *)&community_send_saddr, len);
+	ast_mutex_unlock(&community_lock);
+
+	return 1;
+}
+
+/* Transmit a TXC Request */
+static int txc_community_request(char *target_uid)
+{
+	txc_ie_data_t ied;
+
+	memset(&ied, 0, sizeof(ied));
+
+	/* Setup standard information */
+	txc_append_standard_ied(&ied);
+
+	/* If a target UID is specified, add it in as well */
+	if (target_uid != NULL) {
+		txc_ie_append_str(&ied, TXC_IE_TARGET_UID, target_uid);
+	}
+
+	/* Now send out the request */
+	txc_community_transmit(TXC_ANNOUNCE_REQUEST, &ied);
+
+	return 1;
+}
+
+/* Transmit a TXC Failure */
+static int txc_community_failure(txc_node_t *node)
+{
+	txc_ie_data_t ied;
+
+	ast_mutex_lock(&node->lock);
+
+	memset(&ied, 0, sizeof(ied));
+
+	txc_append_standard_ied(&ied);
+
+	txc_community_transmit(TXC_ANNOUNCE_FAILURE, &ied);
+
+	ast_mutex_unlock(&node->lock);
+
+	return 1;
+}
+
+/* Transmit a TXC Reply */
+static int txc_community_reply(void)
+{
+	txc_ie_data_t ied;
+
+	memset(&ied, 0, sizeof(ied));
+	
+	/* Standard information */
+	txc_append_standard_ied(&ied);
+
+	/* Add our capabilities */
+	txc_ie_append_int(&ied, TXC_IE_CAPABILITY, capabilities);
+
+	/* Finally transmit it */
+	txc_community_transmit(TXC_ANNOUNCE_REPLY, &ied);
+	
+	return 1;
+}
+
+/* Process community data */
+static void community_process(unsigned char *data, int size)
+{
+	txc_node_t *node = NULL, *failed_node = NULL;
+	txc_ies_t ies;
+
+	/* Parse information elements */
+	if (txc_parse_ies(&ies, data, size) || ies.source_uid == NULL)
+		return;
+
+	/* If the packet is from ourselves -- ignore it */
+#if 0
+	if (!strcasecmp(ies.source_uid, uid)) {
+		return;
+	}
+#endif
+
+	/* Get the node information (if available) */
+	node = ASTOBJ_CONTAINER_FIND(&nodes, ies.source_uid);
+
+	/* If the node does not exist, and this is not a reply... do a request on them so we get their information */
+	if (node == NULL && (ies.type != TXC_ANNOUNCE_REPLY && ies.type != TXC_ANNOUNCE_REQUEST)) {
+		txc_community_request(ies.source_uid);
+		return;
+	}
+
+	switch (ies.type) {
+		/* Community announce frame types */
+	case TXC_ANNOUNCE_REQUEST:
+		/* If there is no target UID OR the target UID is us, send a reply */
+		if (ies.target_uid == NULL || !strcasecmp(ies.target_uid, uid))
+			txc_community_reply();
+		time(&last_community_update);
+		break;
+	case TXC_ANNOUNCE_REPLY:
+		if (node == NULL) {
+			/* Create a new node */
+			node = ast_calloc(1, sizeof(*node));
+			strncpy(node->name, ies.source_uid, sizeof(node->name));
+			node->sessions = 0;
+			/* Set state */
+			node->state = TXC_NODE_STATE_ALIVE;
+			/* Set capability to the ones they specified */
+			node->capability = ies.capability;
+			/* Figure out where they came from for direct non-multicast communication */
+			memcpy(&node->addr, &community_recv_saddr, sizeof(node->addr));
+			node->addr.sin_port = htons(TXC_DEFAULT_PORTNO);
+			/* Setup a socket to them */
+			node->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+			/* They are definitely a community member since we found out about them that way */
+			ast_set_flag(node, TXC_NODE_COMMUNITY_MEMBER);
+			/* Finally add them to the node list */
+			ASTOBJ_CONTAINER_LINK(&nodes, node);
+			/* Chances are they don't know about us... so send out a reply */
+			txc_community_reply();
+		} else { 
+			if (node->state == TXC_NODE_STATE_FAILED) {
+				/* Put them back into service */
+				node->state = TXC_NODE_STATE_ALIVE;
+			}
+			if (node->capability != ies.capability) {
+				node->capability = ies.capability;
+			}
+		}
+		break;
+	case TXC_ANNOUNCE_SUSPEND:
+		ast_mutex_lock(&node->lock);
+		node->state = TXC_NODE_STATE_SUSPENDED;
+		ast_mutex_unlock(&node->lock);
+		break;
+	case TXC_ANNOUNCE_FAILURE:
+		if (ies.target_uid != NULL) {
+			failed_node = ASTOBJ_CONTAINER_FIND(&nodes, ies.target_uid);
+		} else {
+			failed_node = node;
+		}
+		break;
+		/* Session frame types */
+	case TXC_SESSION_NEW:
+		txc_session_handle_new(node, &ies);
+		break;
+	case TXC_SESSION_ACK:
+		txc_session_handle_ack(node, &ies);
+		break;
+	case TXC_SESSION_REJECT:
+		txc_session_handle_reject(node, &ies);
+		break;
+	case TXC_SESSION_END:
+		txc_session_handle_end(node, &ies);
+		break;
+		/* Node frame types */
+	case TXC_NODE_PING:
+		/* Send a pong reply back to the sender */
+		txc_node_pong(node);
+		break;
+	case TXC_NODE_PONG:
+		/* Received a pong reply back so clear pinged flag and update last time */
+		ast_clear_flag(node, TXC_NODE_PINGED);
+		break;
+	}
+
+	return;
+}
+
+/* Node monitoring thread */
+static void *node_monitor_exec(void *ignore)
+{
+	time_t now;
+
+	for (;;) {
+		/* Go through the list of nodes */
+		ASTOBJ_CONTAINER_TRAVERSE(&nodes, 1, {
+			ASTOBJ_WRLOCK(iterator);
+			ast_mutex_lock(&iterator->lock);
+			if (iterator->state == TXC_NODE_STATE_ALIVE) {

[... 478 lines stripped ...]


More information about the svn-commits mailing list