[asterisk-commits] branch file/txc r15894 - in /team/file/txc: ./
include/asterisk/ txc/
asterisk-commits at lists.digium.com
asterisk-commits at lists.digium.com
Tue Mar 28 17:12:31 MST 2006
Author: file
Date: Tue Mar 28 18:12:27 2006
New Revision: 15894
URL: http://svn.digium.com/view/asterisk?rev=15894&view=rev
Log:
Introducting TXC (Transcoding eXpansion Community): A method of doing clustered/distributed transcoding with automatic resource discovery and configuration. Stay tuned for more information and documentation!
Added:
team/file/txc/txc/
team/file/txc/txc/txc-parser.c (with props)
team/file/txc/txc/txc-parser.h (with props)
team/file/txc/txc/txc-protocol.h (with props)
team/file/txc/txc/txc.c (with props)
team/file/txc/txc/txc.h (with props)
Modified:
team/file/txc/Makefile
team/file/txc/asterisk.c
team/file/txc/include/asterisk/translate.h
team/file/txc/translate.c
Modified: team/file/txc/Makefile
URL: http://svn.digium.com/view/asterisk/team/file/txc/Makefile?rev=15894&r1=15893&r2=15894&view=diff
==============================================================================
--- team/file/txc/Makefile (original)
+++ team/file/txc/Makefile Tue Mar 28 18:12:27 2006
@@ -364,7 +364,7 @@
astmm.o enum.o srv.o dns.o aescrypt.o aestab.o aeskey.o \
utils.o plc.o jitterbuf.o dnsmgr.o devicestate.o \
netsock.o slinfactory.o ast_expr2.o ast_expr2f.o \
- cryptostub.o sha1.o http.o
+ cryptostub.o sha1.o http.o txc/txc.o txc/txc-parser.o
# we need to link in the objects statically, not as a library, because
# otherwise modules will not have them available if none of the static
@@ -551,6 +551,7 @@
clean-depend:
for x in $(SUBDIRS); do $(MAKE) -C $$x clean-depend || exit 1 ; done
rm -f .depend .tags-depend
+ rm -rf txc/*.o
clean: clean-depend
for x in $(SUBDIRS); do $(MAKE) -C $$x clean || exit 1 ; done
Modified: team/file/txc/asterisk.c
URL: http://svn.digium.com/view/asterisk/team/file/txc/asterisk.c?rev=15894&r1=15893&r2=15894&view=diff
==============================================================================
--- team/file/txc/asterisk.c (original)
+++ team/file/txc/asterisk.c Tue Mar 28 18:12:27 2006
@@ -123,6 +123,8 @@
#include "asterisk/linkedlists.h"
#include "asterisk/devicestate.h"
#include "asterisk/compat.h"
+
+#include "txc/txc.h"
#include "asterisk/doxyref.h" /* Doxygen documentation */
@@ -2319,6 +2321,10 @@
printf(term_quit());
exit(1);
}
+ if (txc_init()) {
+ printf(term_quit());
+ exit(1);
+ }
/* load 'preload' modules, required for access to Realtime-mapped configuration files */
if (load_modules(1)) {
printf(term_quit());
Modified: team/file/txc/include/asterisk/translate.h
URL: http://svn.digium.com/view/asterisk/team/file/txc/include/asterisk/translate.h?rev=15894&r1=15893&r2=15894&view=diff
==============================================================================
--- team/file/txc/include/asterisk/translate.h (original)
+++ team/file/txc/include/asterisk/translate.h Tue Mar 28 18:12:27 2006
@@ -96,6 +96,7 @@
* Returns ast_trans_pvt on success, NULL on failure
* */
extern struct ast_trans_pvt *ast_translator_build_path(int dest, int source);
+extern struct ast_trans_pvt *ast_translator_build_path_txc(int dest, int source, int use_txc);
/*!
* \brief Frees a translator path
Modified: team/file/txc/translate.c
URL: http://svn.digium.com/view/asterisk/team/file/txc/translate.c?rev=15894&r1=15893&r2=15894&view=diff
==============================================================================
--- team/file/txc/translate.c (original)
+++ team/file/txc/translate.c Tue Mar 28 18:12:27 2006
@@ -45,6 +45,8 @@
#include "asterisk/cli.h"
#include "asterisk/term.h"
+#include "txc/txc.h"
+
#define MAX_RECALC 200 /* max sample recalc */
/*! \note
@@ -71,6 +73,7 @@
static struct ast_translator_dir tr_matrix[MAX_FORMAT][MAX_FORMAT];
struct ast_trans_pvt {
+ txc_session_t *session;
struct ast_translator *step;
struct ast_translator_pvt *state;
struct ast_trans_pvt *next;
@@ -92,6 +95,15 @@
void ast_translator_free_path(struct ast_trans_pvt *p)
{
struct ast_trans_pvt *pl, *pn;
+
+ /* If a session exists... simply destroy it */
+ if (p->session != NULL) {
+ txc_session_destroy(p->session);
+ free(p);
+ p = NULL;
+ return;
+ }
+
pn = p;
while(pn) {
pl = pn;
@@ -102,10 +114,26 @@
}
}
+struct ast_trans_pvt *ast_translator_build_path(int dest, int source)
+{
+ return ast_translator_build_path_txc(dest, source, 1);
+}
+
/*! Build a set of translators based upon the given source and destination formats */
-struct ast_trans_pvt *ast_translator_build_path(int dest, int source)
-{
+struct ast_trans_pvt *ast_translator_build_path_txc(int dest, int source, int use_txc)
+{
+ txc_session_t *session = NULL;
struct ast_trans_pvt *tmpr = NULL, *tmp = NULL;
+
+ /* If TXC is enabled... */
+ if (use_txc) {
+ session = txc_session_create(source, dest);
+ if (session != NULL) {
+ tmp = ast_calloc(1, sizeof(*tmp));
+ tmp->session = session;
+ return tmp;
+ }
+ }
source = powerof(source);
dest = powerof(dest);
@@ -123,7 +151,7 @@
tmp = tmp->next;
} else
tmp = malloc(sizeof(*tmp));
-
+
if (!tmp) {
ast_log(LOG_WARNING, "Out of memory\n");
if (tmpr)
@@ -135,6 +163,7 @@
if (!tmpr)
tmpr = tmp;
+ tmp->session = NULL;
tmp->next = NULL;
tmp->nextin = tmp->nextout = ast_tv(0, 0);
tmp->step = tr_matrix[source][dest].step;
@@ -157,6 +186,15 @@
struct ast_trans_pvt *p;
struct ast_frame *out;
struct timeval delivery;
+
+ /* See if this is using a TXC session - if so, do it! */
+ if (path->session != NULL) {
+ out = txc_session_translate(path->session, f);
+ if (consume)
+ ast_frfree(f);
+ return out;
+ }
+
p = path;
/* Feed the first frame into the first translator */
p->step->framein(p->state, f);
Added: team/file/txc/txc/txc-parser.c
URL: http://svn.digium.com/view/asterisk/team/file/txc/txc/txc-parser.c?rev=15894&view=auto
==============================================================================
--- team/file/txc/txc/txc-parser.c (added)
+++ team/file/txc/txc/txc-parser.c Tue Mar 28 18:12:27 2006
@@ -1,0 +1,170 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2006, Joshua Colp
+ *
+ * Joshua Colp <jcolp at joshua-colp.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Implementation of Transcoding eXpansion Community, v 1
+ *
+ * \author Joshua Colp <jcolp at joshua-colp.com>
+ */
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <string.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "asterisk.h"
+
+#include "asterisk/frame.h"
+#include "asterisk/utils.h"
+#include "asterisk/unaligned.h"
+#include "txc-protocol.h"
+#include "txc-parser.h"
+
+int txc_ie_append_raw(struct txc_ie_data *ied, unsigned char ie, const void *data, int datalen)
+{
+ if (datalen > ((int)sizeof(ied->buf) - ied->pos)) {
+ return -1;
+ }
+ ied->buf[ied->pos++] = ie;
+ ied->buf[ied->pos++] = datalen;
+ memcpy(ied->buf + ied->pos, data, datalen);
+ ied->pos += datalen;
+ return 0;
+}
+
+int txc_ie_append_addr(struct txc_ie_data *ied, unsigned char ie, const struct sockaddr_in *sin)
+{
+ return txc_ie_append_raw(ied, ie, sin, (int)sizeof(struct sockaddr_in));
+}
+
+int txc_ie_append_int(struct txc_ie_data *ied, unsigned char ie, unsigned int value)
+{
+ unsigned int newval;
+ newval = htonl(value);
+ return txc_ie_append_raw(ied, ie, &newval, (int)sizeof(newval));
+}
+
+int txc_ie_append_short(struct txc_ie_data *ied, unsigned char ie, unsigned short value)
+{
+ unsigned short newval;
+ newval = htons(value);
+ return txc_ie_append_raw(ied, ie, &newval, (int)sizeof(newval));
+}
+
+int txc_ie_append_str(struct txc_ie_data *ied, unsigned char ie, const char *str)
+{
+ if (str == NULL)
+ return 0;
+ return txc_ie_append_raw(ied, ie, str, strlen(str));
+}
+
+int txc_ie_append_byte(struct txc_ie_data *ied, unsigned char ie, unsigned char dat)
+{
+ return txc_ie_append_raw(ied, ie, &dat, 1);
+}
+
+int txc_ie_append(struct txc_ie_data *ied, unsigned char ie)
+{
+ return txc_ie_append_raw(ied, ie, NULL, 0);
+}
+
+int txc_parse_ies(struct txc_ies *ies, unsigned char *data, int datalen)
+{
+ /* Parse data into information elements */
+ int len;
+ int ie;
+ char tmp[256];
+
+ memset(ies, 0, (int)sizeof(struct txc_ies));
+
+ while(datalen >= 2) {
+ ie = data[0];
+ len = data[1];
+ if (len > datalen - 2) {
+ return -1;
+ }
+ switch(ie) {
+ case TXC_IE_SOURCE_UID:
+ ies->source_uid = (char *)data + 2;
+ break;
+ case TXC_IE_TARGET_UID:
+ ies->target_uid = (char *)data + 2;
+ break;
+ case TXC_IE_REASON:
+ ies->reason = (char *)data + 2;
+ break;
+ case TXC_IE_TYPE:
+ if (len != (int)sizeof(unsigned int)) {
+ snprintf(tmp, (int)sizeof(tmp), "Expecting type to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+ } else
+ ies->type = ntohl(get_unaligned_uint32(data + 2));
+ break;
+ case TXC_IE_SESSION_ID:
+ if (len != (int)sizeof(unsigned int)) {
+ snprintf(tmp, (int)sizeof(tmp), "Expecting type to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+ } else
+ ies->session_id = ntohl(get_unaligned_uint32(data + 2));
+ break;
+ case TXC_IE_REMOTE_SESSION_ID:
+ if (len != (int)sizeof(unsigned int)) {
+ snprintf(tmp, (int)sizeof(tmp), "Expecting type to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+ } else
+ ies->remote_session_id = ntohl(get_unaligned_uint32(data + 2));
+ break;
+ case TXC_IE_CAPABILITY:
+ if (len != (int)sizeof(unsigned int)) {
+ snprintf(tmp, (int)sizeof(tmp), "Expecting capability to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+ } else
+ ies->capability = ntohl(get_unaligned_uint32(data + 2));
+ break;
+ case TXC_IE_SRC_FORMAT:
+ if (len != (int)sizeof(unsigned int)) {
+ snprintf(tmp, (int)sizeof(tmp), "Expecting format to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+ } else
+ ies->src_format = ntohl(get_unaligned_uint32(data + 2));
+ break;
+ case TXC_IE_DST_FORMAT:
+ if (len != (int)sizeof(unsigned int)) {
+ snprintf(tmp, (int)sizeof(tmp), "Expecting format to be %d bytes long but was %d\n", (int)sizeof(unsigned int), len);
+ } else
+ ies->dst_format = ntohl(get_unaligned_uint32(data + 2));
+ break;
+ case TXC_IE_VERSION:
+ if (len != (int)sizeof(unsigned short)) {
+ snprintf(tmp, (int)sizeof(tmp), "Expecting version to be %d bytes long but was %d\n", (int)sizeof(unsigned short), len);
+ } else
+ ies->version = ntohs(get_unaligned_uint16(data + 2));
+ break;
+ }
+ /* Overwrite information element with 0, to null terminate previous portion */
+ data[0] = 0;
+ datalen -= (len + 2);
+ data += (len + 2);
+ }
+ /* Null-terminate last field */
+ *data = '\0';
+ if (datalen) {
+ return -1;
+ }
+ return 0;
+}
Propchange: team/file/txc/txc/txc-parser.c
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/file/txc/txc/txc-parser.c
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/file/txc/txc/txc-parser.c
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: team/file/txc/txc/txc-parser.h
URL: http://svn.digium.com/view/asterisk/team/file/txc/txc/txc-parser.h?rev=15894&view=auto
==============================================================================
--- team/file/txc/txc/txc-parser.h (added)
+++ team/file/txc/txc/txc-parser.h Tue Mar 28 18:12:27 2006
@@ -1,0 +1,52 @@
+/*
+ * Asterisk -- A telephony toolkit for Linux.
+ *
+ * Implementation of Transcoding eXpansion Community
+ *
+ * Copyright (C) 2006, Joshua Colp
+ *
+ * Joshua Colp <jcolp at joshua-colp.com>
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License
+ */
+
+/*!\file
+ * \brief Implementation of the Transcoding eXpansion Community Protocol
+ */
+
+#ifndef _TXC_PARSER_H
+#define _TXC_PARSER_H
+
+struct txc_ies {
+ unsigned int type;
+ char *source_uid;
+ char *target_uid;
+ char *reason;
+ unsigned int capability;
+ unsigned int src_format;
+ unsigned int dst_format;
+ unsigned int session_id;
+ unsigned int remote_session_id;
+ int version;
+};
+
+typedef struct txc_ies txc_ies_t;
+
+struct txc_ie_data {
+ unsigned char buf[1024];
+ int pos;
+};
+
+typedef struct txc_ie_data txc_ie_data_t;
+
+int txc_ie_append_raw(struct txc_ie_data *ied, unsigned char ie, const void *data, int datalen);
+int txc_ie_append_addr(struct txc_ie_data *ied, unsigned char ie, const struct sockaddr_in *sin);
+int txc_ie_append_int(struct txc_ie_data *ied, unsigned char ie, unsigned int value);
+int txc_ie_append_short(struct txc_ie_data *ied, unsigned char ie, unsigned short value);
+int txc_ie_append_str(struct txc_ie_data *ied, unsigned char ie, const char *str);
+int txc_ie_append_byte(struct txc_ie_data *ied, unsigned char ie, unsigned char dat);
+int txc_ie_append(struct txc_ie_data *ied, unsigned char ie);
+int txc_parse_ies(struct txc_ies *ies, unsigned char *data, int datalen);
+
+#endif
Propchange: team/file/txc/txc/txc-parser.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/file/txc/txc/txc-parser.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/file/txc/txc/txc-parser.h
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: team/file/txc/txc/txc-protocol.h
URL: http://svn.digium.com/view/asterisk/team/file/txc/txc/txc-protocol.h?rev=15894&view=auto
==============================================================================
--- team/file/txc/txc/txc-protocol.h (added)
+++ team/file/txc/txc/txc-protocol.h Tue Mar 28 18:12:27 2006
@@ -1,0 +1,51 @@
+/*
+ * Asterisk -- A telephony toolkit for Linux.
+ *
+ * Implementation of Transcoding eXpansion Community
+ *
+ * Copyright (C) 2006, Joshua Colp
+ *
+ * Joshua Colp <jcolp at joshua-colp.com>
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License
+ */
+
+#ifndef _TXC_PROTOCOL_H
+#define _TXC_PROTOCOL_H
+
+/* Current support TXC protocol version */
+#define TXC_PROTO_VERSION 1
+
+/* Announcement packet types */
+#define TXC_ANNOUNCE_REQUEST 1 /* Request that all TXC nodes announce themselves */
+#define TXC_ANNOUNCE_REPLY 2 /* Reply with TXC node information */
+#define TXC_ANNOUNCE_SUSPEND 3 /* Announce suspension of a TXC node from service */
+#define TXC_ANNOUNCE_FAILURE 4 /* Announce failure of a TXC node as perceived by another TXC node */
+
+/* Session packet types */
+#define TXC_SESSION_NEW 5 /* Create a new TXC session */
+#define TXC_SESSION_REJECT 6 /* Reject session creation request */
+#define TXC_SESSION_ACK 7 /* Acknowledge session start */
+#define TXC_SESSION_END 8 /* End a TXC session */
+
+/* Node packet types */
+#define TXC_NODE_PING 9 /* Hey - are you alive? */
+#define TXC_NODE_PONG 10 /* Yes... I'm alive */
+
+/* Default port number */
+#define TXC_DEFAULT_PORTNO 2657
+
+/* TXC Information elements */
+#define TXC_IE_TYPE 1 /* What type of packet is this? - unsigned int */
+#define TXC_IE_SOURCE_UID 2 /* Unique source ID - string */
+#define TXC_IE_TARGET_UID 3 /* Target source ID - string */
+#define TXC_IE_CAPABILITY 4 /* TXC node capabilities - unsigned int */
+#define TXC_IE_SRC_FORMAT 5 /* Source transcoding format - unsigned int */
+#define TXC_IE_DST_FORMAT 6 /* Destination transcoding format - unsigned int */
+#define TXC_IE_VERSION 7 /* Protocol version - short */
+#define TXC_IE_SESSION_ID 8 /* Session ID - unsigned int */
+#define TXC_IE_REMOTE_SESSION_ID 9 /* Remote Session ID - unsigned int */
+#define TXC_IE_REASON 10 /* Reason for a reply to happen - string */
+
+#endif
Propchange: team/file/txc/txc/txc-protocol.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/file/txc/txc/txc-protocol.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/file/txc/txc/txc-protocol.h
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: team/file/txc/txc/txc.c
URL: http://svn.digium.com/view/asterisk/team/file/txc/txc/txc.c?rev=15894&view=auto
==============================================================================
--- team/file/txc/txc/txc.c (added)
+++ team/file/txc/txc/txc.c Tue Mar 28 18:12:27 2006
@@ -1,0 +1,1138 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2006, Joshua Colp.
+ *
+ * Joshua Colp <jcolp at joshua-colp.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Transcoding eXchange Community
+ *
+ * \author Joshua Colp <jcolp at joshua-colp.com>
+ */
+
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <resolv.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <regex.h>
+#include <signal.h>
+
+#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(SOLARIS) || defined(__Darwin__)
+#include <sys/types.h>
+#include <netinet/in_systm.h>
+#endif
+#include <netinet/ip.h>
+#include <sys/ioctl.h>
+#include <netinet/in.h>
+#include <net/if.h>
+#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__Darwin__)
+#include <net/if_dl.h>
+#include <ifaddrs.h>
+#endif
+
+#include "asterisk.h"
+
+#include "asterisk/astobj.h"
+#include "asterisk/linkedlists.h"
+#include "asterisk/utils.h"
+#include "asterisk/config.h"
+#include "asterisk/logger.h"
+#include "asterisk/sched.h"
+#include "asterisk/options.h"
+#include "asterisk/cli.h"
+#include "asterisk/frame.h"
+#include "asterisk/translate.h"
+#include "asterisk/netsock.h"
+
+#include "txc.h"
+#include "txc-protocol.h"
+#include "txc-parser.h"
+
+#define TESTING_LOOPBACK /* Loop back data for testing cause you know */
+
+/* Configuration Information */
+static unsigned int capabilities = AST_FORMAT_ULAW;
+static char uid[80] = "";
+static struct ast_codec_pref default_prefs;
+
+/* Community information (socket/thread/etc) */
+static pthread_t community_thread = AST_PTHREADT_NULL;
+static int community_recv_sock = -1;
+static struct sockaddr_in community_recv_saddr;
+static time_t last_community_update;
+/* Socket information for sending */
+AST_MUTEX_DEFINE_STATIC(community_lock);
+static int community_send_sock = -1;
+static struct sockaddr_in community_send_saddr;
+
+/* Node monitoring */
+static pthread_t node_thread = AST_PTHREADT_NULL;
+
+/* Global session information */
+AST_MUTEX_DEFINE_STATIC(session_id_lock);
+#define TXC_MAX_SESSIONS 1000
+static int session_ids[TXC_MAX_SESSIONS];
+static struct ast_netsock_list *netsock = NULL;
+
+/* Community nodes */
+static struct txc_node_list {
+ ASTOBJ_CONTAINER_COMPONENTS(struct txc_node);
+} nodes;
+
+/* Sessions - local or remote */
+static struct txc_session_list {
+ ASTOBJ_CONTAINER_COMPONENTS(struct txc_session);
+} sessions;
+
+/* Session IO Thread */
+static void *io_thread_exec(void *data)
+{
+ txc_session_t *session = data;
+ struct io_context *io = session->io;
+
+ for (;;) {
+ /* See if I/O thread needs to exit */
+ ast_mutex_lock(&session->lock);
+ if (ast_test_flag(session, TXC_SESSION_EXIT)) {
+ ast_mutex_unlock(&session->lock);
+ break;
+ }
+ ast_mutex_unlock(&session->lock);
+ ast_io_wait(io, -1);
+ }
+
+ return NULL;
+}
+
+/* State conversions (int to string) */
+static char *state_to_str(int state)
+{
+ switch (state) {
+ case TXC_NODE_STATE_ALIVE:
+ return "Alive";
+ case TXC_NODE_STATE_SUSPENDED:
+ return "Suspended";
+ case TXC_NODE_STATE_FAILED:
+ return "Failed";
+ default:
+ return "Unknown";
+ }
+}
+
+/* Append standarding TXC information about ourselves */
+static void txc_append_standard_ied(txc_ie_data_t *ied)
+{
+ /* Append our UID */
+ txc_ie_append_str(ied, TXC_IE_SOURCE_UID, uid);
+ /* Append version */
+ txc_ie_append_short(ied, TXC_IE_VERSION, TXC_PROTO_VERSION);
+}
+
+/* Transmit to a node via P2P */
+static int txc_node_transmit(int type, txc_node_t *node, txc_ie_data_t *ied)
+{
+ socklen_t len = sizeof(struct sockaddr_in);
+ int res = 0;
+
+ /* Add type of packet */
+ txc_ie_append_int(ied, TXC_IE_TYPE, type);
+
+ /* Deliver to the node */
+ res = sendto(node->sock, (unsigned char*)ied->buf, ied->pos, 0, (struct sockaddr *)&node->addr, len);
+
+ return 1;
+}
+
+/* Return a free session ID */
+static int new_session_id(void)
+{
+ int id = 0, i = 0;
+
+ /* Find a free session ID */
+ ast_mutex_lock(&session_id_lock);
+ for (i=0; i<TXC_MAX_SESSIONS; i++) {
+ if (session_ids[i] == 0) {
+ session_ids[i] = 1;
+ id = i;
+ break;
+ }
+ }
+ ast_mutex_unlock(&session_id_lock);
+
+ return id;
+}
+
+/* Find a free TXC node to set a session up on */
+static txc_node_t *find_txc_node(int src_format, int dst_format)
+{
+ txc_node_t *node = NULL;
+
+ return node;
+}
+
+/* Find a node given a node UID */
+static txc_node_t *find_node(char *uid)
+{
+ txc_node_t *node = NULL;
+
+ node = ASTOBJ_CONTAINER_FIND(&nodes, uid);
+
+ return node;
+}
+
+/* Find a session given a session ID */
+static txc_session_t *find_session(unsigned int session_id)
+{
+ char tmp[128] = "";
+ txc_session_t *session = NULL;
+
+ snprintf(tmp, sizeof(tmp), "%u", session_id);
+ session = ASTOBJ_CONTAINER_FIND(&sessions, tmp);
+
+ return session;
+}
+
+/* Session Audio Port Callback via netsock */
+static int audio_read(int *id, int fd, short events, void *cbdata)
+{
+ struct ast_netsock *ns = cbdata;
+ txc_session_t *session = ast_netsock_data(ns);
+ int res = 0;
+ unsigned char buf[2048];
+ socklen_t len = sizeof(ast_netsock_boundaddr(ns));
+ struct ast_frame received_frame, *extra_frame = NULL;
+
+ memset(&received_frame, 0, sizeof(received_frame));
+
+ ast_mutex_lock(&session->lock);
+
+ res = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)ast_netsock_boundaddr(ns), &len);
+
+ received_frame.frametype = AST_FRAME_VOICE;
+ received_frame.data = buf;
+ received_frame.datalen = res;
+
+ /* See what type of session this is... */
+ if (session->type == TXC_SESSION_LOCAL) {
+ /* Okay we need to transcode this and send it back out */
+ received_frame.subclass = session->src_format;
+ /* Now that we have created a sorta frame... translate it */
+ extra_frame = ast_translate(session->trans, &received_frame, 0);
+ if (extra_frame != NULL) {
+ sendto(session->sock, extra_frame->data, extra_frame->datalen, 0, (struct sockaddr *)&session->addr, sizeof(session->addr));
+ ast_frfree(extra_frame);
+ } else {
+ ast_log(LOG_ERROR, "Failed to translate inbound audio\n");
+ }
+ extra_frame = NULL;
+ } else if (session->type == TXC_SESSION_REMOTE) {
+ /* This is a transcoded frame coming back */
+ received_frame.subclass = session->dst_format;
+ session->fr = ast_frdup(&received_frame);
+ /* Signal that a frame is available */
+ ast_cond_signal(&session->cond);
+ }
+
+ ast_mutex_unlock(&session->lock);
+
+ return 1;
+}
+
+/* Setup a new TXC session via P2P */
+static txc_session_t *txc_session_new(txc_node_t *node, int src_format, int dst_format)
+{
+ int id = 0;
+ txc_ie_data_t ied;
+ txc_session_t *session = NULL;
+
+ memset(&ied, 0, sizeof(ied));
+
+ /* Make sure source and destination formats are not the same */
+ if (src_format == dst_format) {
+ return NULL;
+ }
+
+ /* Get a new ID now */
+ id = new_session_id();
+
+ /* Setup a new session */
+ session = ast_calloc(1, sizeof(*session));
+ snprintf(session->name, sizeof(session->name), "%u", id);
+ session->local_session_id = id;
+ session->type = TXC_SESSION_REMOTE;
+ session->node = node;
+ session->ns = NULL;
+
+ session->src_format = src_format;
+ session->dst_format = dst_format;
+
+ /* Initialize mutex and condition */
+ ast_mutex_init(&session->lock);
+ ast_cond_init(&session->cond, NULL);
+
+ /* Setup I/O Context */
+ session->io = io_context_create();
+
+ /* Spawn audio netsock stuff */
+ session->ns = ast_netsock_bind(netsock, session->io, "0.0.0.0", id+TXC_DEFAULT_PORTNO+1, 0, audio_read, session);
+ if (session->ns == NULL) {
+ ast_log(LOG_ERROR, "Failed to setup netsock callback stuff\n");
+ }
+
+ /* Spawn I/O Thread */
+ ast_pthread_create(&session->io_thread, NULL, io_thread_exec, session);
+
+ /* Before we add it to the list, lock it */
+ ast_mutex_lock(&session->lock);
+
+ /* Add to the session list */
+ ASTOBJ_CONTAINER_LINK(&sessions, session);
+
+ /* Setup request for new session for transmitting */
+ txc_append_standard_ied(&ied);
+
+ /* Add formats in */
+ txc_ie_append_int(&ied, TXC_IE_SRC_FORMAT, src_format);
+ txc_ie_append_int(&ied, TXC_IE_DST_FORMAT, dst_format);
+
+ /* Add session ID */
+ txc_ie_append_int(&ied, TXC_IE_SESSION_ID, id);
+
+ /* Transmit */
+ txc_node_transmit(TXC_SESSION_NEW, node, &ied);
+ /* Wait for a reply */
+ ast_cond_wait(&session->cond, &session->lock);
+
+ /* Make sure the session is setup okay */
+ if (ast_test_flag(session, TXC_SESSION_FAILED)) {
+ ast_log(LOG_ERROR, "Failed to set session up\n");
+ }
+
+ ast_mutex_unlock(&session->lock);
+
+ return session;
+}
+
+/* Create a session to transcode between two formats */
+txc_session_t *txc_session_create(int src_format, int dst_format)
+{
+ txc_session_t *session = NULL;
+ txc_node_t *node = find_node("proton");
+
+ session = txc_session_new(node, src_format, dst_format);
+
+ return session;
+}
+
+/* Translate using a TXC session */
+struct ast_frame *txc_session_translate(txc_session_t *session, struct ast_frame *frame)
+{
+ int res = 0;
+ struct ast_frame *fr = NULL;
+ struct timeval tv;
+ struct timespec ts;
+
+ /* Acquire session lock */
+ ast_mutex_lock(&session->lock);
+
+ /* This is where it gets fun... we send out the data and wait for a reply - note this is a literal audio stream */
+ res = sendto(session->sock, frame->data, frame->datalen, 0, (struct sockaddr *)&session->addr, sizeof(session->addr));
+
+ /* Calculate a suitable timeout */
+ tv = ast_tvadd(ast_tvnow(), ast_samp2tv(1000, 1000));
+ ts.tv_sec = tv.tv_sec;
+ ts.tv_nsec = tv.tv_usec * 1000;
+
+ /* Wait for a response -- note we have a timeout... */
+ if (ast_cond_timedwait(&session->cond, &session->lock, &ts) < 0) {
+ /* We did not get a reply in the minimum amount of time... */
+ ast_log(LOG_ERROR, "Did not get a reply in time...\n");
+ } else {
+ /* Grab the translated frame */
+ fr = session->fr;
+ session->fr = NULL;
+ }
+
+ ast_mutex_unlock(&session->lock);
+
+ return fr;
+}
+
+/* Destroy a TXC session */
+int txc_session_destroy(txc_session_t *session)
+{
+ txc_ie_data_t ied;
+
+ ast_mutex_lock(&session->lock);
+
+ /* If this is a remote session then we need to send a TXC_SESSION_END to the other node */
+ if (session->type == TXC_SESSION_REMOTE) {
+ memset(&ied, 0, sizeof(ied));
+ txc_append_standard_ied(&ied);
+
+ txc_ie_append_int(&ied, TXC_IE_SESSION_ID, session->remote_session_id);
+
+ txc_node_transmit(TXC_SESSION_END, session->node, &ied);
+ }
+
+ /* If a translator path exists -- free it */
+ if (session->trans != NULL) {
+ ast_translator_free_path(session->trans);
+ session->trans = NULL;
+ }
+
+ /* Kill I/O Thread */
+ if (session->io_thread != AST_PTHREADT_NULL) {
+ ast_set_flag(session, TXC_SESSION_EXIT);
+ /* This will knock the thread out of the poll if it is stuck there */
+ pthread_kill(session->io_thread, SIGURG);
+ ast_mutex_unlock(&session->lock);
+ pthread_join(session->io_thread, NULL);
+ ast_mutex_lock(&session->lock);
+ session->io_thread = AST_PTHREADT_NULL;
+ }
+
+ /* If netsock is setup... get rid of it */
+ if (session->ns != NULL) {
+ /* Work around a strange astobj/netsock bug */
+ ast_netsock_unref(session->ns);
+ ast_netsock_unref(session->ns);
+ session->ns = NULL;
+ }
+
+ /* Get rid of I/O Context */
+ io_context_destroy(session->io);
+ session->io = NULL;
+
+ /* Close sending socket */
+ close(session->sock);
+ session->sock = -1;
+
+ /* Finally remove from the list */
+ ASTOBJ_CONTAINER_UNLINK(&sessions, session);
+
+ /* Mark this session ID as unused now */
+ ast_mutex_lock(&session_id_lock);
+ session_ids[session->local_session_id] = 0;
+ ast_mutex_unlock(&session_id_lock);
+
+ /* Free any used memory */
+ ast_mutex_unlock(&session->lock);
+ free(session);
+ session = NULL;
+
+ return 1;
+}
+
+/* Handle a TXC Session New request */
+static int txc_session_handle_new(txc_node_t *node, txc_ies_t *ies)
+{
+ int id = 0;
+ txc_ie_data_t ied;
+ txc_session_t *session = NULL;
+ struct ast_trans_pvt *translator = NULL;
+
+ memset(&ied, 0, sizeof(ied));
+ txc_append_standard_ied(&ied);
+
+ /* Add the session ID cause we know we will need it */
+ txc_ie_append_int(&ied, TXC_IE_SESSION_ID, ies->session_id);
+
+ /* Make sure we can translate this... if not = send back a TXC_SESSION_REJECT */
+ translator = ast_translator_build_path_txc(ies->dst_format, ies->src_format, 0);
+ if (translator == NULL) {
+ ast_log(LOG_ERROR, "Rejected TXC_SESSION_NEW because translator path could not be built from %d to %d\n", ies->src_format, ies->dst_format);
+ txc_node_transmit(TXC_SESSION_REJECT, node, &ied);
+ return 1;
+ }
+
+ /* Get the ID of this session */
+ id = new_session_id();
+
+ /* Setup a new local session */
+ session = ast_calloc(1, sizeof(*session));
+ snprintf(session->name, sizeof(session->name), "%u", id);
+ session->local_session_id = id;
+ session->remote_session_id = ies->session_id;
+ session->type = TXC_SESSION_LOCAL;
+ session->node = node;
+ session->trans = translator;
+
+ /* Record formats */
+ session->src_format = ies->src_format;
+ session->dst_format = ies->dst_format;
+
+ /* Setup I/O Context */
+ session->io = io_context_create();
+
+ /* Spawn audio netsock stuff */
+ session->ns = ast_netsock_bind(netsock, session->io, "0.0.0.0", id+TXC_DEFAULT_PORTNO+1, 0, audio_read, session);
+ if (session->ns == NULL) {
+ ast_log(LOG_ERROR, "Failed to setup netsock callback stuff\n");
+ }
+
+ /* Spawn I/O Thread */
+ ast_pthread_create(&session->io_thread, NULL, io_thread_exec, session);
+
+ ast_log(LOG_NOTICE, "Remote Audio port on NEW is %d\n", ies->session_id+TXC_DEFAULT_PORTNO+1);
+
+ /* Setup stuff so we can reply back frames to the sender */
+ memcpy(&session->addr, &node->addr, sizeof(session->addr));
+ session->addr.sin_family = AF_INET;
+ session->addr.sin_port = htons(ies->session_id+TXC_DEFAULT_PORTNO+1);
+ session->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+
+ /* Add to the list */
+ ASTOBJ_CONTAINER_LINK(&sessions, session);
+
+ /* Send the acknowledgement back */
+ txc_ie_append_int(&ied, TXC_IE_REMOTE_SESSION_ID, id);
+
+ txc_node_transmit(TXC_SESSION_ACK, node, &ied);
+
+ ast_log(LOG_NOTICE, "Have a TXC Session New request from %s with remote session ID of %u\n", node->name, ies->session_id);
+ return 1;
+}
+
+/* Handle a TXC Session ACK response */
+static int txc_session_handle_ack(txc_node_t *node, txc_ies_t *ies)
+{
+ txc_session_t *session = find_session(ies->session_id);
+
+ ast_mutex_lock(&session->lock);
+
+ /* Update session information */
+ session->remote_session_id = ies->remote_session_id;
+
+ /* Setup sending audio port */
+ memcpy(&session->addr, &node->addr, sizeof(session->addr));
+ session->addr.sin_family = AF_INET;
+ session->addr.sin_port = htons(ies->remote_session_id+TXC_DEFAULT_PORTNO+1);
+ session->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+
+ /* Signal wakeup */
+ ast_cond_signal(&session->cond);
+ ast_mutex_unlock(&session->lock);
+
+ return 1;
+}
+
+/* Handle a TXC Session Reject response */
+static int txc_session_handle_reject(txc_node_t *node, txc_ies_t *ies)
+{
+ txc_session_t *session = find_session(ies->session_id);
+
+ ast_mutex_lock(&session->lock);
+
+ ast_set_flag(session, TXC_SESSION_FAILED);
+
+ ast_cond_signal(&session->cond);
+
+ ast_mutex_unlock(&session->lock);
+
+ return 1;
+}
+
+/* Handle a TXC Session End request */
+static int txc_session_handle_end(txc_node_t *node, txc_ies_t *ies)
+{
+ txc_session_t *session = find_session(ies->session_id);
+
+ /* Destroy anything on the session */
+ txc_session_destroy(session);
+
+ return 1;
+}
+
+/* Ping a node */
+static int txc_node_ping(txc_node_t *node)
+{
+ txc_ie_data_t ied;
+
+ ast_mutex_lock(&node->lock);
+
+ /* Change flag to indicate they were pinged */
+ ast_set_flag(node, TXC_NODE_PINGED);
+
+ memset(&ied, 0, sizeof(ied));
+
+ /* Standard information */
+ txc_append_standard_ied(&ied);
+
+ txc_node_transmit(TXC_NODE_PING, node, &ied);
+
+ ast_mutex_unlock(&node->lock);
+
+ return 1;
+}
+
+/* Reply to the ping from a node */
+static int txc_node_pong(txc_node_t *node)
+{
+ txc_ie_data_t ied;
+
+ ast_mutex_lock(&node->lock);
+
+ memset(&ied, 0, sizeof(ied));
+
+ txc_append_standard_ied(&ied);
+
+ txc_node_transmit(TXC_NODE_PONG, node, &ied);
+
+ ast_mutex_unlock(&node->lock);
+
+ return 1;
+}
+
+/* Transmit to the TXC community */
+static int txc_community_transmit(int type, txc_ie_data_t *ied)
+{
+ socklen_t len = sizeof(struct sockaddr_in);
+ int res = 0;
+
+ /* Add our type in */
+ txc_ie_append_int(ied, TXC_IE_TYPE, type);
+
+ ast_mutex_lock(&community_lock);
+ res = sendto(community_send_sock, (unsigned char*)ied->buf, ied->pos, 0, (struct sockaddr *)&community_send_saddr, len);
+ ast_mutex_unlock(&community_lock);
+
+ return 1;
+}
+
+/* Transmit a TXC Request */
+static int txc_community_request(char *target_uid)
+{
+ txc_ie_data_t ied;
+
+ memset(&ied, 0, sizeof(ied));
+
+ /* Setup standard information */
+ txc_append_standard_ied(&ied);
+
+ /* If a target UID is specified, add it in as well */
+ if (target_uid != NULL) {
+ txc_ie_append_str(&ied, TXC_IE_TARGET_UID, target_uid);
+ }
+
+ /* Now send out the request */
+ txc_community_transmit(TXC_ANNOUNCE_REQUEST, &ied);
+
+ return 1;
+}
+
+/* Transmit a TXC Failure */
+static int txc_community_failure(txc_node_t *node)
+{
+ txc_ie_data_t ied;
+
+ ast_mutex_lock(&node->lock);
+
+ memset(&ied, 0, sizeof(ied));
+
+ txc_append_standard_ied(&ied);
+
+ txc_community_transmit(TXC_ANNOUNCE_FAILURE, &ied);
+
+ ast_mutex_unlock(&node->lock);
+
+ return 1;
+}
+
+/* Transmit a TXC Reply */
+static int txc_community_reply(void)
+{
+ txc_ie_data_t ied;
+
+ memset(&ied, 0, sizeof(ied));
+
+ /* Standard information */
+ txc_append_standard_ied(&ied);
+
+ /* Add our capabilities */
+ txc_ie_append_int(&ied, TXC_IE_CAPABILITY, capabilities);
+
+ /* Finally transmit it */
+ txc_community_transmit(TXC_ANNOUNCE_REPLY, &ied);
+
+ return 1;
+}
+
+/* Process community data */
+static void community_process(unsigned char *data, int size)
+{
+ txc_node_t *node = NULL, *failed_node = NULL;
+ txc_ies_t ies;
+
+ /* Parse information elements */
+ if (txc_parse_ies(&ies, data, size) || ies.source_uid == NULL)
+ return;
+
+ /* If the packet is from ourselves -- ignore it */
+#if 0
+ if (!strcasecmp(ies.source_uid, uid)) {
+ return;
+ }
+#endif
+
+ /* Get the node information (if available) */
+ node = ASTOBJ_CONTAINER_FIND(&nodes, ies.source_uid);
+
+ /* If the node does not exist, and this is not a reply... do a request on them so we get their information */
+ if (node == NULL && (ies.type != TXC_ANNOUNCE_REPLY && ies.type != TXC_ANNOUNCE_REQUEST)) {
+ txc_community_request(ies.source_uid);
+ return;
+ }
+
+ switch (ies.type) {
+ /* Community announce frame types */
+ case TXC_ANNOUNCE_REQUEST:
+ /* If there is no target UID OR the target UID is us, send a reply */
+ if (ies.target_uid == NULL || !strcasecmp(ies.target_uid, uid))
+ txc_community_reply();
+ time(&last_community_update);
+ break;
+ case TXC_ANNOUNCE_REPLY:
+ if (node == NULL) {
+ /* Create a new node */
+ node = ast_calloc(1, sizeof(*node));
+ strncpy(node->name, ies.source_uid, sizeof(node->name));
+ node->sessions = 0;
+ /* Set state */
+ node->state = TXC_NODE_STATE_ALIVE;
+ /* Set capability to the ones they specified */
+ node->capability = ies.capability;
+ /* Figure out where they came from for direct non-multicast communication */
+ memcpy(&node->addr, &community_recv_saddr, sizeof(node->addr));
+ node->addr.sin_port = htons(TXC_DEFAULT_PORTNO);
+ /* Setup a socket to them */
+ node->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ /* They are definitely a community member since we found out about them that way */
+ ast_set_flag(node, TXC_NODE_COMMUNITY_MEMBER);
+ /* Finally add them to the node list */
+ ASTOBJ_CONTAINER_LINK(&nodes, node);
+ /* Chances are they don't know about us... so send out a reply */
+ txc_community_reply();
+ } else {
+ if (node->state == TXC_NODE_STATE_FAILED) {
+ /* Put them back into service */
+ node->state = TXC_NODE_STATE_ALIVE;
+ }
+ if (node->capability != ies.capability) {
+ node->capability = ies.capability;
+ }
+ }
+ break;
+ case TXC_ANNOUNCE_SUSPEND:
+ ast_mutex_lock(&node->lock);
+ node->state = TXC_NODE_STATE_SUSPENDED;
+ ast_mutex_unlock(&node->lock);
+ break;
+ case TXC_ANNOUNCE_FAILURE:
+ if (ies.target_uid != NULL) {
+ failed_node = ASTOBJ_CONTAINER_FIND(&nodes, ies.target_uid);
+ } else {
+ failed_node = node;
+ }
+ break;
+ /* Session frame types */
+ case TXC_SESSION_NEW:
+ txc_session_handle_new(node, &ies);
+ break;
+ case TXC_SESSION_ACK:
+ txc_session_handle_ack(node, &ies);
+ break;
+ case TXC_SESSION_REJECT:
+ txc_session_handle_reject(node, &ies);
+ break;
+ case TXC_SESSION_END:
+ txc_session_handle_end(node, &ies);
+ break;
+ /* Node frame types */
+ case TXC_NODE_PING:
+ /* Send a pong reply back to the sender */
+ txc_node_pong(node);
+ break;
+ case TXC_NODE_PONG:
+ /* Received a pong reply back so clear pinged flag and update last time */
+ ast_clear_flag(node, TXC_NODE_PINGED);
+ break;
+ }
+
+ return;
+}
+
+/* Node monitoring thread */
+static void *node_monitor_exec(void *ignore)
+{
+ time_t now;
+
+ for (;;) {
+ /* Go through the list of nodes */
+ ASTOBJ_CONTAINER_TRAVERSE(&nodes, 1, {
+ ASTOBJ_WRLOCK(iterator);
+ ast_mutex_lock(&iterator->lock);
+ if (iterator->state == TXC_NODE_STATE_ALIVE) {
[... 478 lines stripped ...]
More information about the asterisk-commits
mailing list