[svn-commits] tilghman: trunk r104101 - in /trunk: CHANGES cdr/cdr_pgsql.c

SVN commits to the Digium repositories svn-commits at lists.digium.com
Mon Feb 25 17:04:20 CST 2008


Author: tilghman
Date: Mon Feb 25 17:04:20 2008
New Revision: 104101

URL: http://svn.digium.com/view/asterisk?view=rev&rev=104101
Log:
Permit additional CDR columns to be saved in Postgres.  Note that these
changes are backward-compatible, so no changes to UPGRADE.txt are
necessary.
(closes issue #9279)
 Reported by: rottenroddy
 Patches: 
       20080125__bug9279.diff.txt uploaded by Corydon76 (license 14)
 Tested by: Corydon76

Modified:
    trunk/CHANGES
    trunk/cdr/cdr_pgsql.c

Modified: trunk/CHANGES
URL: http://svn.digium.com/view/asterisk/trunk/CHANGES?view=diff&rev=104101&r1=104100&r2=104101
==============================================================================
--- trunk/CHANGES (original)
+++ trunk/CHANGES Mon Feb 25 17:04:20 2008
@@ -450,6 +450,29 @@
      and to ensure that the oldest log file gets deleted.
   * Added realtime support for the queue log
 
+Call Detail Records 
+-------------------
+  * The cdr_manager module has a [mappings] feature, like cdr_custom,
+    to add fields to the manager event from the CDR variables.
+  * Added cdr_adaptive_odbc, a new module that adapts to the structure of your
+     backend database CDR table.  Specifically, additional, non-standard
+     columns are supported, merely by setting the corresponding CDR variable in
+     your dialplan.  In addition, you may alias any column to another name (for
+     example, if you want the 'src' CDR variable to be column 'ANI' in the DB,
+     simply add "alias src => ANI" to the configuration file).  Records may be
+     posted to more than one backend, simply by specifying multiple categories
+     in the configuration file.  And finally, you may filter which CDRs get
+     posted to each backend, by specifying a filter (which the record must
+     match) for the particular category.  Filters are additive (meaning all
+     rules must match to post that CDR).
+  * The Postgres CDR module now supports some features of the cdr_adaptive_odbc
+     module.  Specifically, you may add additional columns into the table and
+     they will be set, if you set the corresponding CDR variable name.  Also,
+     if you omit columns in your database table, they will be silently skipped
+     (but a record will still be inserted, based on what columns remain).  Note
+     that the other two features from cdr_adaptive_odbc (alias and filter) are
+     not currently supported.
+
 Miscellaneous New Modules
 -------------------------
   * Added a new CDR module, cdr_sqlite3_custom.
@@ -494,8 +517,6 @@
   * Added maxfiles option to options section of asterisk.conf which allows you to specify
      what Asterisk should set as the maximum number of open files when it loads.
   * Added the jittertargetextra configuration option.
-  * The cdr_manager module has a [mappings] feature, like cdr_custom,
-    to add fields to the manager event from the CDR variables.
   * Added support for setting the CoS for VLAN traffic (802.1p).  See the sample
      configuration files for the IP channel drivers.  The new option is "cos".
      This information is also documented in doc/qos.tex, or the IP Quality of Service
@@ -523,3 +544,4 @@
   * Added a compiler flag, CHANNEL_TRACE, which permits channel tracing to be
      turned on, via the CHANNEL(trace) dialplan function.  Could be useful for
      dialplan debugging.
+

Modified: trunk/cdr/cdr_pgsql.c
URL: http://svn.digium.com/view/asterisk/trunk/cdr/cdr_pgsql.c?view=diff&rev=104101&r1=104100&r2=104101
==============================================================================
--- trunk/cdr/cdr_pgsql.c (original)
+++ trunk/cdr/cdr_pgsql.c Mon Feb 25 17:04:20 2008
@@ -50,28 +50,67 @@
 #include "asterisk/cdr.h"
 #include "asterisk/module.h"
 
-#define DATE_FORMAT "%Y-%m-%d %T"
+#define DATE_FORMAT "'%Y-%m-%d %T'"
 
 static char *name = "pgsql";
 static char *config = "cdr_pgsql.conf";
 static char *pghostname = NULL, *pgdbname = NULL, *pgdbuser = NULL, *pgpassword = NULL, *pgdbport = NULL, *table = NULL;
 static int connected = 0;
+static int maxsize = 512, maxsize2 = 512;
 
 AST_MUTEX_DEFINE_STATIC(pgsql_lock);
 
 static PGconn	*conn = NULL;
+
+struct columns {	/* One column of the CDR table, as read from the PostgreSQL catalog query after connecting */
+	char *name;	/* Column name (attname); also the CDR variable name looked up when building the INSERT */
+	char *type;	/* Column type name (typname); prefix-compared against "int" / "float" to choose value formatting */
+	int len;	/* Column length, parsed from the attlen value returned by the catalog query */
+	AST_RWLIST_ENTRY(columns) list;	/* Linkage within the psql_columns list */
+};
+
+static AST_RWLIST_HEAD_STATIC(psql_columns, columns);
+
+#define LENGTHEN_BUF1(size)	/* Make room in sql (the "INSERT INTO ... (" column-list half) for (size) more bytes; for use inside pgsql_log() only */	\
+			do {																\
+				/* Grow in 512-byte steps when lensql + size + 3 would not fit; uses the caller's locals sql, sql2, tmp, newsize, lensql, sizesql */	\
+				if ((newsize = lensql + (size) + 3) > sizesql) {	\
+					if ((tmp = ast_realloc(sql, (newsize / 512 + 1) * 512))) {	\
+						sql = tmp;												\
+						sizesql = (newsize / 512 + 1) * 512;					\
+					} else {													\
+						ast_log(LOG_ERROR, "Unable to allocate sufficient memory.  Insert CDR failed.\n"); \
+						ast_free(sql); ast_free(sql2);	/* abandon both halves of the statement */	\
+						AST_RWLIST_UNLOCK(&psql_columns);						\
+						ast_mutex_unlock(&pgsql_lock);	/* pgsql_log() locks this on entry; returning with it held would block every later CDR post */	\
+						return -1;												\
+					}															\
+				}																\
+			} while (0)
+
+#define LENGTHEN_BUF2(size)	/* Make room in sql2 (the " VALUES (" half) for (size) more bytes, growing in 512-byte steps; for use inside pgsql_log() only */	\
+			do {																\
+				if ((newsize = lensql2 + (size) + 3) > sizesql2) {				\
+					if ((tmp = ast_realloc(sql2, (newsize / 512 + 1) * 512))) {	\
+						sql2 = tmp;												\
+						sizesql2 = (newsize / 512 + 1) * 512;					\
+					} else {													\
+						ast_log(LOG_ERROR, "Unable to allocate sufficient memory.  Insert CDR failed.\n");	\
+						ast_free(sql); ast_free(sql2);	/* abandon both halves of the statement */	\
+						AST_RWLIST_UNLOCK(&psql_columns);						\
+						ast_mutex_unlock(&pgsql_lock);	/* pgsql_log() locks this on entry; returning with it held would block every later CDR post */	\
+						return -1;												\
+					}															\
+				}																\
+			} while (0)
 
 static int pgsql_log(struct ast_cdr *cdr)
 {
 	struct ast_tm tm;
-	char sqlcmd[2048] = "", timestr[128];
 	char *pgerror;
 	PGresult *result;
 
 	ast_mutex_lock(&pgsql_lock);
-
-	ast_localtime(&cdr->start, &tm, NULL);
-	ast_strftime(timestr, sizeof(timestr), DATE_FORMAT, &tm);
 
 	if ((!connected) && pghostname && pgdbuser && pgpassword && pgdbname) {
 		conn = PQsetdbLogin(pghostname, pgdbport, NULL, NULL, pgdbname, pgdbuser, pgpassword);
@@ -87,49 +126,135 @@
 	}
 
 	if (connected) {
-		char *clid=NULL, *dcontext=NULL, *channel=NULL, *dstchannel=NULL, *lastapp=NULL, *lastdata=NULL;
-		char *src=NULL, *dst=NULL, *uniqueid=NULL, *userfield=NULL;
-		int pgerr;
-
-		/* Maximum space needed would be if all characters needed to be escaped, plus a trailing NULL */
-		if ((clid = alloca(strlen(cdr->clid) * 2 + 1)) != NULL)
-			PQescapeStringConn(conn, clid, cdr->clid, strlen(cdr->clid), &pgerr);
-		if ((dcontext = alloca(strlen(cdr->dcontext) * 2 + 1)) != NULL)
-			PQescapeStringConn(conn, dcontext, cdr->dcontext, strlen(cdr->dcontext), &pgerr);
-		if ((channel = alloca(strlen(cdr->channel) * 2 + 1)) != NULL)
-			PQescapeStringConn(conn, channel, cdr->channel, strlen(cdr->channel), &pgerr);
-		if ((dstchannel = alloca(strlen(cdr->dstchannel) * 2 + 1)) != NULL)
-			PQescapeStringConn(conn, dstchannel, cdr->dstchannel, strlen(cdr->dstchannel), &pgerr);
-		if ((lastapp = alloca(strlen(cdr->lastapp) * 2 + 1)) != NULL)
-			PQescapeStringConn(conn, lastapp, cdr->lastapp, strlen(cdr->lastapp), &pgerr);
-		if ((lastdata = alloca(strlen(cdr->lastdata) * 2 + 1)) != NULL)
-			PQescapeStringConn(conn, lastdata, cdr->lastdata, strlen(cdr->lastdata), &pgerr);
-		if ((uniqueid = alloca(strlen(cdr->uniqueid) * 2 + 1)) != NULL)
-			PQescapeStringConn(conn, uniqueid, cdr->uniqueid, strlen(cdr->uniqueid), &pgerr);
-		if ((userfield = alloca(strlen(cdr->userfield) * 2 + 1)) != NULL)
-			PQescapeStringConn(conn, userfield, cdr->userfield, strlen(cdr->userfield), &pgerr);
-		if ((src = alloca(strlen(cdr->src) * 2 + 1)) != NULL)
-			PQescapeStringConn(conn, src, cdr->src, strlen(cdr->src), &pgerr);
-		if ((dst = alloca(strlen(cdr->dst) * 2 + 1)) != NULL)
-			PQescapeStringConn(conn, dst, cdr->dst, strlen(cdr->dst), &pgerr);
-
-		/* Check for all alloca failures above at once */
-		if ((!clid) || (!dcontext) || (!channel) || (!dstchannel) || (!lastapp) || (!lastdata) || (!uniqueid) || (!userfield) || (!src) || (!dst)) {
-			ast_log(LOG_ERROR, "cdr_pgsql:  Out of memory error (insert fails)\n");
-			ast_mutex_unlock(&pgsql_lock);
-			return -1;
-		}
+		struct columns *cur;
+		int lensql, lensql2, sizesql = maxsize, sizesql2 = maxsize2, newsize;
+		char *sql = ast_calloc(sizeof(char), sizesql), *sql2 = ast_calloc(sizeof(char), sizesql2), *tmp, *value;
+		char buf[257], escapebuf[513];
+  
+		lensql = snprintf(sql, sizesql, "INSERT INTO %s (", table);
+		lensql2 = snprintf(sql2, sizesql2, " VALUES (");
+  
+		AST_RWLIST_RDLOCK(&psql_columns);
+		AST_RWLIST_TRAVERSE(&psql_columns, cur, list) {
+			/* For fields not set, simply skip them */
+			ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+			if (!value)
+				continue;
+			
+			LENGTHEN_BUF1(strlen(cur->name));
+			lensql += snprintf(sql + lensql, sizesql - lensql, "%s,", cur->name);
+
+			if (strcmp(cur->name, "start") == 0 || strcmp(cur->name, "calldate") == 0) {
+				if (strncmp(cur->type, "int", 3) == 0) {
+					LENGTHEN_BUF2(12);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->start.tv_sec);
+				} else if (strncmp(cur->type, "float", 5) == 0) {
+					LENGTHEN_BUF2(30);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->start.tv_sec + (double)cdr->start.tv_usec / 1000000.0);
+				} else {
+					/* char, hopefully */
+					LENGTHEN_BUF2(30);
+					ast_localtime(&cdr->start, &tm, NULL);
+					lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
+				}
+			} else if (strcmp(cur->name, "answer") == 0) {
+				if (strncmp(cur->type, "int", 3) == 0) {
+					LENGTHEN_BUF2(12);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->answer.tv_sec);
+				} else if (strncmp(cur->type, "float", 5) == 0) {
+					LENGTHEN_BUF2(30);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->answer.tv_sec + (double)cdr->answer.tv_usec / 1000000.0);
+				} else {
+					/* char, hopefully */
+					LENGTHEN_BUF2(30);
+					ast_localtime(&cdr->start, &tm, NULL);
+					lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
+				}
+			} else if (strcmp(cur->name, "end") == 0) {
+				if (strncmp(cur->type, "int", 3) == 0) {
+					LENGTHEN_BUF2(12);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->end.tv_sec);
+				} else if (strncmp(cur->type, "float", 5) == 0) {
+					LENGTHEN_BUF2(30);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->end.tv_sec + (double)cdr->end.tv_usec / 1000000.0);
+				} else {
+					/* char, hopefully */
+					LENGTHEN_BUF2(30);
+					ast_localtime(&cdr->end, &tm, NULL);
+					lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
+				}
+			} else if (strcmp(cur->name, "duration") == 0 || strcmp(cur->name, "billsec") == 0) {
+				if (cur->type[0] == 'i') {
+					/* Get integer, no need to escape anything */
+					ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+					LENGTHEN_BUF2(12);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%s", value);
+				} else if (strncmp(cur->type, "float", 5) == 0) {
+					struct timeval *tv = cur->name[0] == 'd' ? &cdr->start : &cdr->answer;
+					LENGTHEN_BUF2(30);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->end.tv_sec - tv->tv_sec + cdr->end.tv_usec / 1000000.0 - tv->tv_usec / 1000000.0);
+				} else {
+					/* Char field, probably */
+					struct timeval *tv = cur->name[0] == 'd' ? &cdr->start : &cdr->answer;
+					LENGTHEN_BUF2(30);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%f'", (double)cdr->end.tv_sec - tv->tv_sec + cdr->end.tv_usec / 1000000.0 - tv->tv_usec / 1000000.0);
+				}
+			} else if (strcmp(cur->name, "disposition") == 0 || strcmp(cur->name, "amaflags") == 0) {
+				if (strncmp(cur->type, "int", 3) == 0) {
+					/* Integer, no need to escape anything */
+					ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 1);
+					LENGTHEN_BUF2(12);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%s", value);
+				} else {
+					/* Although this is a char field, there are no special characters in the values for these fields */
+					ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+					LENGTHEN_BUF2(30);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%s'", value);
+				}
+			} else {
+				/* Arbitrary field, could be anything */
+				ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
+				if (strncmp(cur->type, "int", 3) == 0) {
+					long long whatever;
+					if (value && sscanf(value, "%lld", &whatever) == 1) {
+						LENGTHEN_BUF2(25);
+						lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%lld", whatever);
+					} else {
+						LENGTHEN_BUF2(1);
+						lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "0");
+					}
+				} else if (strncmp(cur->type, "float", 5) == 0) {
+					long double whatever;
+					if (value && sscanf(value, "%Lf", &whatever) == 1) {
+						LENGTHEN_BUF2(50);
+						lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%30Lf", whatever);
+					} else {
+						LENGTHEN_BUF2(1);
+						lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "0");
+					}
+				/* XXX Might want to handle dates, times, and other misc fields here XXX */
+				} else {
+					if (value)
+						PQescapeStringConn(conn, escapebuf, value, strlen(value), NULL);
+					else
+						escapebuf[0] = '\0';
+					LENGTHEN_BUF2(strlen(escapebuf) + 2);
+					lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%s'", escapebuf);
+				}
+			}
+			LENGTHEN_BUF2(1);
+			strcat(sql2 + lensql2, ",");
+			lensql2++;
+  		}
+		AST_RWLIST_UNLOCK(&psql_columns);
+		LENGTHEN_BUF1(lensql2);
+		sql[lensql - 1] = ')';
+		sql2[lensql2 - 1] = ')';
+		strcat(sql + lensql, sql2);
+		ast_verb(11, "[%s]\n", sql);
 
 		ast_debug(2, "cdr_pgsql: inserting a CDR record.\n");
 
-		snprintf(sqlcmd,sizeof(sqlcmd),"INSERT INTO %s (calldate,clid,src,dst,dcontext,channel,dstchannel,"
-				 "lastapp,lastdata,duration,billsec,disposition,amaflags,accountcode,uniqueid,userfield) VALUES"
-				 " ('%s','%s','%s','%s','%s', '%s','%s','%s','%s',%ld,%ld,'%s',%ld,'%s','%s','%s')",
-				 table, timestr, clid, src, dst, dcontext, channel, dstchannel, lastapp, lastdata,
-				 cdr->duration,cdr->billsec,ast_cdr_disp2str(cdr->disposition),cdr->amaflags, cdr->accountcode, uniqueid, userfield);
-		
-		ast_debug(3, "cdr_pgsql: SQL command executed:  %s\n",sqlcmd);
-		
 		/* Test to be sure we're still connected... */
 		/* If we're connected, and connection is working, good. */
 		/* Otherwise, attempt reconnect.  If it fails... sorry... */
@@ -152,7 +277,7 @@
 				return -1;
 			}
 		}
-		result = PQexec(conn, sqlcmd);
+		result = PQexec(conn, sql);
 		if (PQresultStatus(result) != PGRES_COMMAND_OK) {
 			pgerror = PQresultErrorMessage(result);
 			ast_log(LOG_ERROR,"cdr_pgsql: Failed to insert call detail record into database!\n");
@@ -163,7 +288,7 @@
 				ast_log(LOG_ERROR, "cdr_pgsql: Connection reestablished.\n");
 				connected = 1;
 				PQclear(result);
-				result = PQexec(conn, sqlcmd);
+				result = PQexec(conn, sql);
 				if (PQresultStatus(result) != PGRES_COMMAND_OK) {
 					pgerror = PQresultErrorMessage(result);
 					ast_log(LOG_ERROR,"cdr_pgsql: HARD ERROR!  Attempted reconnection failed.  DROPPING CALL RECORD!\n");
@@ -181,8 +306,14 @@
 }
 
 static int unload_module(void)
-{ 
+{
+	struct columns *cur;
+	ast_cdr_unregister(name);
+
+	/* Give all threads time to finish */
+	usleep(1);
 	PQfinish(conn);
+
 	if (pghostname)
 		ast_free(pghostname);
 	if (pgdbname)
@@ -195,7 +326,13 @@
 		ast_free(pgdbport);
 	if (table)
 		ast_free(table);
-	ast_cdr_unregister(name);
+
+	AST_RWLIST_WRLOCK(&psql_columns);
+	while ((cur = AST_RWLIST_REMOVE_HEAD(&psql_columns, list))) {
+		ast_free(cur);
+	}
+	AST_RWLIST_UNLOCK(&psql_columns);
+
 	return 0;
 }
 
@@ -203,6 +340,8 @@
 {
 	struct ast_variable *var;
 	char *pgerror;
+	struct columns *cur;
+	PGresult *result;
 	const char *tmp;
 	struct ast_config *cfg;
 	struct ast_flags config_flags = { reload ? CONFIG_FLAG_FILEUNCHANGED : 0 };
@@ -304,8 +443,40 @@
 	
 	conn = PQsetdbLogin(pghostname, pgdbport, NULL, NULL, pgdbname, pgdbuser, pgpassword);
 	if (PQstatus(conn) != CONNECTION_BAD) {
+		char sqlcmd[256];
+		char *fname, *ftype, *flen;
+		int i, rows;
 		ast_debug(1, "Successfully connected to PostgreSQL database.\n");
 		connected = 1;
+
+		/* Query the columns */
+		snprintf(sqlcmd, sizeof(sqlcmd), "select a.attname, t.typname, a.attlen from pg_class c, pg_attribute a, pg_type t where c.oid = a.attrelid and a.atttypid = t.oid and (a.attnum > 0) and c.relname = '%s' order by c.relname, attnum", table);
+		result = PQexec(conn, sqlcmd);
+		if (PQresultStatus(result) != PGRES_TUPLES_OK) {
+			pgerror = PQresultErrorMessage(result);
+			ast_log(LOG_ERROR, "cdr_pgsql: Failed to query database columns: %s\n", pgerror);
+			PQclear(result);
+			unload_module();
+			return AST_MODULE_LOAD_DECLINE;
+		}
+
+		rows = PQntuples(result);
+		for (i = 0; i < rows; i++) {
+			fname = PQgetvalue(result, i, 0);
+			ftype = PQgetvalue(result, i, 1);
+			flen = PQgetvalue(result, i, 2);
+			ast_verb(4, "Found column '%s' of type '%s'\n", fname, ftype);
+			cur = ast_calloc(1, sizeof(*cur) + strlen(fname) + strlen(ftype) + 2);
+			if (cur) {
+				sscanf(flen, "%d", &cur->len);
+				cur->name = (char *)cur + sizeof(*cur);
+				cur->type = (char *)cur + sizeof(*cur) + strlen(fname) + 1;
+				strcpy(cur->name, fname);
+				strcpy(cur->type, ftype);
+				AST_RWLIST_INSERT_TAIL(&psql_columns, cur, list);
+			}
+		}
+		PQclear(result);
 	} else {
 		pgerror = PQerrorMessage(conn);
 		ast_log(LOG_ERROR, "cdr_pgsql: Unable to connect to database server %s.  CALLS WILL NOT BE LOGGED!!\n", pghostname);




More information about the svn-commits mailing list