[Asterisk-code-review] cel_odbc: Rewritten most of cel_odbc (asterisk[13])

Dennis asteriskteam at digium.com
Mon Jul 27 10:09:37 CDT 2020


Dennis has uploaded this change for review. ( https://gerrit.asterisk.org/c/asterisk/+/14698 )


Change subject: cel_odbc: Rewritten most of cel_odbc
......................................................................

cel_odbc: Rewritten most of cel_odbc

We found that if a datasource is unavailable when cel_odbc.conf is
processed, the table is skipped and no records will ever be inserted
into the skipped table.

While splitting the code apart, I found lots of interesting problems
hidden inside this file and ended up rewriting most of it. Keeping the
changes minimal became a near-impossible task because of the number of
collisions and rewrites.

The first problem I encountered was malloc tricks and pointer hacks,
stuffing null-terminated strings directly after structures in the same
allocation. As these structures are only created when cel_odbc.conf is
parsed, the loss of maintainability far outweighs any performance gained
from doing so.

Next, I split up all the massive functions as best I could. Each
function now has a single, well-defined purpose. Some functions were
renamed during this process to better describe what they do.

I also renamed many variables to reflect what they actually are or contain.
Redundant logging of allocation failures was removed as ast_calloc
already logs them. Functions were added to allocate and initialize
every structure with proper default values. Lots of temporary variables
were removed, avoiding extra string copies.

Arbitrary length limits on strings have been mostly removed. The
configuration file is now traversed instead of being scanned repeatedly.

The SQL_ERROR return code from SQLFetch is now handled properly,
causing the table to be skipped instead of trying to perform partial
inserts into it.

Optimizing odbc_log() was fairly interesting and, while I haven't
benchmarked it, should be fairly fruitful. The original algorithm
constructed completely new SQL for every table on every event,
stringifying values in the process and performing roughly
events*tables*columns*20 string comparisons. The new algorithm
generates a static query once a table's columns are known. Relative
addresses are generated for all parameters, to be used later when
binding parameters. By binding parameters instead of stringifying them,
we allow the driver to decide on the best conversion.
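
As a rough illustration of the "relative address" idea, here is a
minimal sketch. It assumes a cut-down, hypothetical demo_record in place
of ast_cel_event_record, and the demo_* helpers are illustrative names
rather than functions from the patch. A column name is resolved to a
field offset once, and that offset is later used to reach the value
without any further string comparisons.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for ast_cel_event_record; only two fields shown. */
struct demo_record {
	const char *channel_name;
	const char *extension;
};

/* Resolve a column name to the byte offset of the matching field.
 * Returns -1 when the name is not a known field. Done once per column. */
static int demo_record_rva(const char *name)
{
	assert(name != NULL);

	if (strcmp(name, "channame") == 0) {
		return (int) offsetof(struct demo_record, channel_name);
	} else if (strcmp(name, "exten") == 0) {
		return (int) offsetof(struct demo_record, extension);
	}

	return -1;
}

/* Fetch the string stored at a previously resolved offset.
 * Only valid for offsets of string-pointer fields. */
static const char *demo_field_at(const struct demo_record *record, int rva)
{
	const char *const *field;

	assert(record != NULL && rva >= 0);
	field = (const char *const *) ((const char *) record + rva);

	return *field;
}
```

With a record whose extension is "s", demo_field_at(&record,
demo_record_rva("exten")) yields "s". The lookup by name happens once
per column, so the per-event cost is a pointer addition.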

All identifiers have been wrapped with backticks, which should close
any tickets related to column names and reserved words. Because we no
longer perform type checks on all columns, I was able to discard lots of
extra data from the columns struct.
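
To show the shape of the generated statement, here is a small
standalone sketch. demo_build_insert and demo_append are hypothetical
helpers that use a fixed caller-supplied buffer instead of ast_str. The
sketch does not escape backticks inside identifiers, and backtick
quoting itself is MySQL-style, so other databases may expect a different
quote character.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Append piece to the string in out; fail with -1 if it does not fit. */
static int demo_append(char *out, size_t outlen, const char *piece)
{
	size_t used = strlen(out);
	size_t need = strlen(piece);

	if (need >= outlen - used) {
		return -1;
	}
	memcpy(out + used, piece, need + 1);

	return 0;
}

/* Build "INSERT INTO `t` (`a`, `b`) VALUES (?, ?)" into out.
 * Returns 0 on success, -1 on truncation or when there are no columns. */
static int demo_build_insert(char *out, size_t outlen, const char *table,
	const char *const *columns, size_t count)
{
	size_t i;

	assert(out != NULL && table != NULL);
	if (outlen == 0 || count == 0) {
		return -1;
	}
	out[0] = '\0';

	if (demo_append(out, outlen, "INSERT INTO `")
		|| demo_append(out, outlen, table)
		|| demo_append(out, outlen, "` (")) {
		return -1;
	}
	for (i = 0; i < count; i++) {
		if (demo_append(out, outlen, i ? ", `" : "`")
			|| demo_append(out, outlen, columns[i])
			|| demo_append(out, outlen, "`")) {
			return -1;
		}
	}
	if (demo_append(out, outlen, ") VALUES (")) {
		return -1;
	}
	/* One placeholder per column; values are bound later. */
	for (i = 0; i < count; i++) {
		if (demo_append(out, outlen, i ? ", ?" : "?")) {
			return -1;
		}
	}

	return demo_append(out, outlen, ")");
}
```

Because the statement only contains placeholders, it can be built once
per table and reused for every event.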

Lastly, I tried to make verbose logs a bit more clear and useful.

Hopefully I was able to re-implement the "static", "filter" and "alias"
features properly. Documentation on how they actually function was
somewhat light.

This module now works as follows:

First, it reads the configuration file and stores all relevant bits in
structures for later. Once this is done, it will attempt to retrieve
column names for every table mentioned in the config file, construct a
query for these tables and pre-bind any parameters.

For any tables that were inaccessible at config time, a manager thread
will go through the list of tables every 10 seconds and attempt to
construct queries for any tables that this module failed to connect to
previously. Once all tables have a valid query, the manager thread will
self-terminate.
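
The retry logic can be sketched without any threading as a single
"pass" over the table list. The demo_table struct and its reachable
flag are hypothetical stand-ins that simulate datasource availability.
In the module itself, a pass like this would run under the table list
write lock after each 10 second wait.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for struct tables. */
struct demo_table {
	const char *name;
	int reachable; /* simulates whether the datasource answers */
	int has_query; /* non-zero once a query has been constructed */
};

/* One retry pass. Tables that already have a query are skipped.
 * Returns the number of tables that still lack a query. */
static int demo_retry_pass(struct demo_table *tables, size_t count)
{
	size_t i;
	int failures = 0;

	assert(tables != NULL || count == 0);

	for (i = 0; i < count; i++) {
		if (tables[i].has_query) {
			continue;
		}
		if (tables[i].reachable) {
			tables[i].has_query = 1;
		} else {
			failures++;
		}
	}

	return failures;
}
```

A caller would repeat the pass until it returns 0, which corresponds to
the point where the manager thread has nothing left to do and exits.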

ASTERISK-29012 #close

Change-Id: Id85d81add33096f8282d212daf239f2fc845d783
---
M cel/cel_odbc.c
1 file changed, 815 insertions(+), 680 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/98/14698/1

diff --git a/cel/cel_odbc.c b/cel/cel_odbc.c
index f6b6321..e85ea57 100644
--- a/cel/cel_odbc.c
+++ b/cel/cel_odbc.c
@@ -7,6 +7,9 @@
  * Tilghman Lesher <tlesher AT digium DOT com>
  * by Steve Murphy
  *
+ * Refactored most of this module to support delayed connectivity.
+ * Dennis Buteyn <dennis.buteyn at xorcom.com>
+ *
  * See http://www.asterisk.org for more information about
  * the Asterisk project. Please do not directly contact
  * any of the maintainers of this project for assistance;
@@ -51,6 +54,7 @@
 #include "asterisk/res_odbc.h"
 #include "asterisk/cel.h"
 #include "asterisk/module.h"
+#include "asterisk/sem.h"
 
 #define	CONFIG	"cel_odbc.conf"
 
@@ -62,49 +66,602 @@
 /*! TRUE if we should set the eventtype field to USER_DEFINED on user events. */
 static unsigned char cel_show_user_def;
 
-/* Optimization to reduce number of memory allocations */
-static int maxsize = 512, maxsize2 = 512;
-
 struct columns {
-	char *name;
-	char *celname;
-	char *filtervalue;
-	char *staticvalue;
-	SQLSMALLINT type;
-	SQLINTEGER size;
-	SQLSMALLINT decimals;
-	SQLSMALLINT radix;
-	SQLSMALLINT nullable;
-	SQLINTEGER octetlen;
+	AST_DECLARE_STRING_FIELDS(
+		AST_STRING_FIELD(name);
+	);
+	const char * staticval; /* Reference to staticval->value, no need to free */
+	int rva; /* Relative offset into ast_cel_event_record */
 	AST_LIST_ENTRY(columns) list;
 };
 
+struct aliases {
+	AST_DECLARE_STRING_FIELDS(
+		AST_STRING_FIELD(source);
+		AST_STRING_FIELD(target);
+	);
+	int rva; /* Relative offset into ast_cel_event_record */
+	AST_LIST_ENTRY(aliases) list;
+};
+
+struct staticvals {
+	AST_DECLARE_STRING_FIELDS(
+		AST_STRING_FIELD(colname);
+		AST_STRING_FIELD(value);
+	);
+	AST_LIST_ENTRY(staticvals) list;
+};
+
+struct filters {
+	AST_DECLARE_STRING_FIELDS(
+		AST_STRING_FIELD(celname);
+		AST_STRING_FIELD(value);
+	);
+	int rva; /* Relative offset into ast_cel_event_record */
+	AST_LIST_ENTRY(filters) list;
+};
+
 struct tables {
-	char *connection;
-	char *table;
+	AST_DECLARE_STRING_FIELDS(
+		AST_STRING_FIELD(name);
+		AST_STRING_FIELD(category);
+		AST_STRING_FIELD(connection);
+		AST_STRING_FIELD(query);
+	);
 	unsigned int usegmtime:1;
 	unsigned int allowleapsec:1;
 	AST_LIST_HEAD_NOLOCK(odbc_columns, columns) columns;
+	AST_LIST_HEAD_NOLOCK(odbc_aliases, aliases) aliases;
+	AST_LIST_HEAD_NOLOCK(odbc_staticvals, staticvals) staticvals;
+	AST_LIST_HEAD_NOLOCK(odbc_filters, filters) filters;
 	AST_RWLIST_ENTRY(tables) list;
 };
 
+/* Used when calling generic_prepare */
+struct prepare_params {
+	struct tables *table;
+	struct ast_cel_event_record record;
+	SQL_TIMESTAMP_STRUCT event_time;
+};
+
 static AST_RWLIST_HEAD_STATIC(odbc_tables, tables);
+struct ast_sem manager_sem; /* Raised when manager told to stop */
+static pthread_t manager_thread = AST_PTHREADT_NULL;
+
+static struct columns * create_column(void)
+{
+	struct columns *column = ast_calloc(1, sizeof(*column));
+
+	if (column) {
+		if (ast_string_field_init(column, 256) == 0) {
+			column->rva = -1;
+			return column;
+		} else {
+			ast_free(column);
+		}
+	}
+
+	return NULL;
+}
+
+static void destroy_column(struct columns *column)
+{
+	ast_string_field_free_memory(column);
+	ast_free(column);
+}
+
+static void destroy_columns(struct tables *table)
+{
+	struct columns *column;
+
+	while ((column = AST_LIST_REMOVE_HEAD(&(table->columns), list))) {
+		destroy_column(column);
+	}
+}
+
+static struct aliases * create_alias(void)
+{
+	struct aliases *alias = ast_calloc(1, sizeof(*alias));
+
+	if (alias) {
+		if (ast_string_field_init(alias, 256) == 0) {
+			alias->rva = -1;
+			return alias;
+		} else {
+			ast_free(alias);
+		}
+	}
+
+	return NULL;
+}
+
+static void destroy_alias(struct aliases *alias)
+{
+	ast_string_field_free_memory(alias);
+	ast_free(alias);
+}
+
+static void destroy_aliases(struct tables *table)
+{
+	struct aliases *alias;
+
+	while ((alias = AST_RWLIST_REMOVE_HEAD(&(table->aliases), list))) {
+		destroy_alias(alias);
+	}
+}
+
+static int append_alias(
+	struct tables *table,
+	const char *source,
+	const char *target,
+	int rva
+) {
+	struct aliases *alias = create_alias();
+
+	if (alias) {
+		ast_string_field_set(alias, source, source);
+		ast_string_field_set(alias, target, target);
+		alias->rva = rva;
+		AST_LIST_INSERT_TAIL(&(table->aliases), alias, list);
+		return 1;
+	} else {
+		return 0;
+	}
+}
+
+static struct staticvals * create_staticval(void)
+{
+	struct staticvals *staticval = ast_calloc(1, sizeof(*staticval));
+
+	if (staticval) {
+		if (ast_string_field_init(staticval, 256) == 0) {
+			return staticval;
+		} else {
+			ast_free(staticval);
+		}
+	}
+
+	return NULL;
+}
+
+static void destroy_staticval(struct staticvals *staticval)
+{
+	ast_string_field_free_memory(staticval);
+	ast_free(staticval);
+}
+
+static void destroy_staticvals(struct tables *table)
+{
+	struct staticvals *staticval;
+
+	while ((staticval = AST_RWLIST_REMOVE_HEAD(&(table->staticvals), list))) {
+		destroy_staticval(staticval);
+	}
+}
+
+static int append_staticval(
+	struct tables *table,
+	const char *colname,
+	const char *value
+) {
+	struct staticvals *staticval = create_staticval();
+
+	if (staticval) {
+		ast_string_field_set(staticval, colname, colname);
+		ast_string_field_set(staticval, value, value);
+		AST_LIST_INSERT_TAIL(&(table->staticvals), staticval, list);
+		return 1;
+	} else {
+		return 0;
+	}
+}
+
+static struct filters * create_filter(void)
+{
+	struct filters *filter = ast_calloc(1, sizeof(*filter));
+
+	if (filter) {
+		if (ast_string_field_init(filter, 256) == 0) {
+			filter->rva = -1;
+			return filter;
+		} else {
+			ast_free(filter);
+		}
+	}
+
+	return NULL;
+}
+
+static void destroy_filter(struct filters *filter)
+{
+	ast_string_field_free_memory(filter);
+	ast_free(filter);
+}
+
+static void destroy_filters(struct tables *table)
+{
+	struct filters *filter;
+
+	while ((filter = AST_RWLIST_REMOVE_HEAD(&(table->filters), list))) {
+		destroy_filter(filter);
+	}
+}
+
+static int append_filter(
+	struct tables *table,
+	const char *celname,
+	const char *value,
+	int rva
+) {
+	struct filters *filter = create_filter();
+
+	if (filter) {
+		ast_string_field_set(filter, celname, celname);
+		ast_string_field_set(filter, value, value);
+		filter->rva = rva;
+		AST_LIST_INSERT_TAIL(&(table->filters), filter, list);
+		return 1;
+	} else {
+		return 0;
+	}
+}
+
+static struct tables * create_table(void)
+{
+	struct tables *table = ast_calloc(1, sizeof(*table));
+
+	if (table) {
+		if (ast_string_field_init(table, 256) == 0) {
+			table->usegmtime = 0;
+			table->allowleapsec = 1;
+			ast_string_field_set(table, name, "cel");
+			return table;
+		} else {
+			ast_free(table);
+		}
+	}
+
+	return NULL;
+}
+
+static void destroy_table(struct tables *table)
+{
+	destroy_columns(table);
+	destroy_aliases(table);
+	destroy_staticvals(table);
+	destroy_filters(table);
+	ast_string_field_free_memory(table);
+	ast_free(table);
+}
+
+static void destroy_tables(void)
+{
+	struct tables *table;
+
+	while ((table = AST_RWLIST_REMOVE_HEAD(&odbc_tables, list))) {
+		destroy_table(table);
+	}
+}
+
+static void destroy_tables_safely(void)
+{
+	while (AST_RWLIST_WRLOCK(&odbc_tables)) {
+		ast_log(LOG_WARNING, "Unable to lock table list for modification. Retrying.\n");
+	}
+	destroy_tables();
+	AST_RWLIST_UNLOCK(&odbc_tables);
+}
+
+static int wait_for_signal(void)
+{
+	struct timeval timeout_val = ast_tvadd(ast_tvnow(), ast_tv(10, 0));
+	struct timespec timeout_spec = {
+		.tv_sec = timeout_val.tv_sec,
+		.tv_nsec = timeout_val.tv_usec * 1000,
+	};
+
+	/* Report errno only on failure; on success it may hold a stale value. */
+	return ast_sem_timedwait(&manager_sem, &timeout_spec) ? errno : 0;
+}
+
+static int cel_record_rva(const char * name)
+{
+	if (ast_strings_equal(name, "eventtime")) {
+		return offsetof(struct ast_cel_event_record, event_time);
+	} else if (ast_strings_equal(name, "userdeftype")) {
+		return offsetof(struct ast_cel_event_record, user_defined_name);
+	} else if (ast_strings_equal(name, "cid_name")) {
+		return offsetof(struct ast_cel_event_record, caller_id_name);
+	} else if (ast_strings_equal(name, "cid_num")) {
+		return offsetof(struct ast_cel_event_record, caller_id_num);
+	} else if (ast_strings_equal(name, "cid_ani")) {
+		return offsetof(struct ast_cel_event_record, caller_id_ani);
+	} else if (ast_strings_equal(name, "cid_rdnis")) {
+		return offsetof(struct ast_cel_event_record, caller_id_rdnis);
+	} else if (ast_strings_equal(name, "cid_dnid")) {
+		return offsetof(struct ast_cel_event_record, caller_id_dnid);
+	} else if (ast_strings_equal(name, "exten")) {
+		return offsetof(struct ast_cel_event_record, extension);
+	} else if (ast_strings_equal(name, "context")) {
+		return offsetof(struct ast_cel_event_record, context);
+	} else if (ast_strings_equal(name, "channame")) {
+		return offsetof(struct ast_cel_event_record, channel_name);
+	} else if (ast_strings_equal(name, "appname")) {
+		return offsetof(struct ast_cel_event_record, application_name);
+	} else if (ast_strings_equal(name, "appdata")) {
+		return offsetof(struct ast_cel_event_record, application_data);
+	} else if (ast_strings_equal(name, "accountcode")) {
+		return offsetof(struct ast_cel_event_record, account_code);
+	} else if (ast_strings_equal(name, "peeraccount")) {
+		return offsetof(struct ast_cel_event_record, peer_account);
+	} else if (ast_strings_equal(name, "uniqueid")) {
+		return offsetof(struct ast_cel_event_record, unique_id);
+	} else if (ast_strings_equal(name, "linkedid")) {
+		return offsetof(struct ast_cel_event_record, linked_id);
+	} else if (ast_strings_equal(name, "userfield")) {
+		return offsetof(struct ast_cel_event_record, user_field);
+	} else if (ast_strings_equal(name, "peer")) {
+		return offsetof(struct ast_cel_event_record, peer);
+	} else if (ast_strings_equal(name, "amaflags")) {
+		return offsetof(struct ast_cel_event_record, amaflag);
+	} else if (ast_strings_equal(name, "extra")) {
+		return offsetof(struct ast_cel_event_record, extra);
+	} else if (ast_strings_equal(name, "eventtype")) {
+		return offsetof(struct ast_cel_event_record, event_type);
+	} else {
+		return -1;
+	}
+}
+
+static void construct_query(struct tables *table)
+{
+	struct columns *column;
+	struct ast_str *query = ast_str_create(2000);
+	int count = 0, i;
+
+	ast_str_append(&query, 0, "INSERT INTO `%s` (", table->name);
+
+	AST_LIST_TRAVERSE(&(table->columns), column, list) {
+		if (column->rva >= 0 || column->staticval) {
+			if (count > 0) {
+				ast_str_append(&query, 0, ", `%s`", column->name);
+			} else {
+				ast_str_append(&query, 0, "`%s`", column->name);
+			}
+			count++;
+		}
+	}
+
+	if (count == 0) {
+		/* Cannot construct query with zero values to insert. */
+		ast_free(query);
+		return;
+	}
+
+	ast_str_append(&query, 0, "%s", ") VALUES (");
+
+	for (i = 0; i < count; ++i) {
+		if (i > 0) {
+			ast_str_append(&query, 0, "%s", ", ?");
+		} else {
+			ast_str_append(&query, 0, "%s", "?");
+		}
+	}
+
+	ast_str_append(&query, 0, "%s", ")");
+	ast_string_field_set(table, query, ast_str_buffer(query));
+	ast_free(query);
+	ast_verb(3, "Constructed query for %s@%s: %s\n", table->name, table->connection, table->query);
+}
+
+static int fetch_table_columns(struct tables *table)
+{
+	struct odbc_obj *obj;
+	SQLHSTMT stmt = NULL;
+	struct columns *column;
+	char columnname[80];
+	int res;
+
+	obj = ast_odbc_request_obj(table->connection, 1);
+	if (!obj) {
+		ast_log(LOG_WARNING, "Connection '%s' unavailable at this time.\n", table->connection);
+		return 0;
+	}
+
+	res = SQLAllocHandle(SQL_HANDLE_STMT, obj->con, &stmt);
+	if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
+		ast_log(LOG_WARNING, "SQL Alloc Handle failed on connection '%s'!\n", table->connection);
+		ast_odbc_release_obj(obj);
+		return 0;
+	}
+
+	res = SQLColumns(stmt, NULL, 0, NULL, 0, (SQLCHAR*) table->name, SQL_NTS, NULL, 0);
+	if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
+		ast_log(LOG_ERROR, "Unable to query database columns on connection '%s'.\n", table->connection);
+		SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+		ast_odbc_release_obj(obj);
+		return 0;
+	}
+
+	while (1) {
+		res = SQLFetch(stmt);
+
+		if (res == SQL_NO_DATA) {
+			/* No more columns, all done */
+			SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+			ast_odbc_release_obj(obj);
+			return 1;
+		} else if (res == SQL_SUCCESS || res == SQL_SUCCESS_WITH_INFO) {
+			/* Got a column, save it */
+			column = create_column();
+			if (column) {
+				SQLGetData(stmt, 4, SQL_C_CHAR, columnname, sizeof(columnname), NULL);
+				ast_string_field_set(column, name, columnname);
+				column->rva = cel_record_rva(columnname);
+				AST_LIST_INSERT_TAIL(&(table->columns), column, list);
+			} else {
+				/* Out of memory, bail out */
+				destroy_columns(table);
+				SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+				ast_odbc_release_obj(obj);
+				return 0;
+			}
+		} else {
+			/* Something bad happened, bail out */
+			destroy_columns(table);
+			SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+			ast_odbc_release_obj(obj);
+			return 0;
+		}
+	}
+}
+
+static void load_general_config(struct ast_config *cfg)
+{
+	struct ast_variable *var;
+
+	cel_show_user_def = CEL_SHOW_USERDEF_DEFAULT;
+
+	for (var = ast_variable_browse(cfg, "general"); var; var = var->next) {
+		if (strcasecmp(var->name, "show_user_defined") == 0) {
+			cel_show_user_def = ast_true(var->value) ? 1 : 0;
+		}
+	}
+}
+
+static char * strip_quotes(char *input)
+{
+	/* Only to be used with stack-allocated memory! */
+	if (input[0] == '"' && input[strlen(input) - 1] == '"') {
+		input[strlen(input) - 1] = '\0';
+		return input + 1;
+	}
+	return input;
+}
+
+static int load_table_config(
+	struct ast_config *cfg,
+	const char *category
+) {
+	struct tables *table;
+	struct ast_variable *var;
+	int rva;
+
+	table = create_table();
+	if (table == NULL) {
+		return 0;
+	}
+
+	ast_string_field_set(table, category, category);
+
+	for (var = ast_variable_browse(cfg, category); var; var = var->next) {
+		if (strcasecmp(var->name, "connection") == 0) {
+			ast_string_field_set(table, connection, var->value);
+		} else if (strcasecmp(var->name, "table") == 0) {
+			ast_string_field_set(table, name, var->value);
+		} else if (strcasecmp(var->name, "usegmtime") == 0) {
+			table->usegmtime = ast_true(var->value);
+		} else if (strcasecmp(var->name, "allowleapsecond") == 0) {
+			table->allowleapsec = ast_true(var->value);
+		} else if (strncmp(var->name, "alias", 5) == 0) {
+			char *source = ast_strip(ast_strdupa(var->name + 5));
+			rva = cel_record_rva(source);
+			if (rva >= 0) {
+				if (!append_alias(table, source, var->value, rva)) {
+					destroy_table(table);
+					return 0;
+				}
+			}
+		} else if (strncmp(var->name, "static", 6) == 0) {
+			char *value = strip_quotes(ast_strip(ast_strdupa(var->name + 6)));
+			if (!append_staticval(table, var->value, value)) {
+				destroy_table(table);
+				return 0;
+			}
+		} else if (strncmp(var->name, "filter", 6) == 0) {
+			char *celname = ast_strip(ast_strdupa(var->name + 6));
+			rva = cel_record_rva(celname);
+			if (rva >= 0) {
+				if (append_filter(table, celname, var->value, rva)) {
+					ast_verb(3, "Filtering CEL events matching %s = \"%s\" in %s@%s\n", celname, var->value, table->name, table->connection);
+				} else {
+					destroy_table(table);
+					return 0;
+				}
+			}
+		}
+	}
+
+	if (ast_strlen_zero(table->connection)) {
+		ast_log(LOG_WARNING, "No connection parameter found in '%s'.  Skipping.\n", category);
+		destroy_table(table);
+	} else {
+		ast_verb(3, "Found CEL table %s@%s.\n", table->name, table->connection);
+		AST_RWLIST_INSERT_TAIL(&odbc_tables, table, list);
+	}
+
+	return 1;
+}
+
+static void resolve_aliases(struct tables *table)
+{
+	struct columns *column;
+	struct aliases *alias;
+
+	/* Somewhat inefficient but not like we're doing this every insertion. */
+	AST_LIST_TRAVERSE(&(table->aliases), alias, list) {
+		AST_LIST_TRAVERSE(&(table->columns), column, list) {
+			if (ast_strings_equal(column->name, alias->target)) {
+				column->rva = alias->rva;
+				ast_verb(3, "Aliasing %s to %s in %s@%s\n", alias->source, alias->target, table->name, table->connection);
+				break;
+			}
+		}
+	}
+}
+
+static void resolve_staticvals(struct tables *table)
+{
+	struct columns *column;
+	struct staticvals *staticval;
+
+	/* Somewhat inefficient but not like we're doing this every insertion. */
+	AST_LIST_TRAVERSE(&(table->staticvals), staticval, list) {
+		AST_LIST_TRAVERSE(&(table->columns), column, list) {
+			if (ast_strings_equal(column->name, staticval->colname)) {
+				column->staticval = staticval->value;
+				ast_verb(3, "Forcing %s to \"%s\" in %s@%s\n", staticval->colname, staticval->value, table->name, table->connection);
+				break;
+			}
+		}
+	}
+}
+
+static int construct_queries(void)
+{
+	struct tables *table;
+	int failures = 0;
+
+	AST_LIST_TRAVERSE(&odbc_tables, table, list) {
+		if (!ast_strlen_zero(table->query)) {
+			continue;
+		} else if (fetch_table_columns(table)) {
+			resolve_aliases(table);
+			resolve_staticvals(table);
+			construct_query(table);
+		} else {
+			failures += 1;
+		}
+	}
+
+	return failures;
+}
 
 static int load_config(void)
 {
 	struct ast_config *cfg;
-	struct ast_variable *var;
-	const char *tmp, *catg;
-	struct tables *tableptr;
-	struct columns *entry;
-	struct odbc_obj *obj;
-	char columnname[80];
-	char connection[40];
-	char table[40];
-	int lenconnection, lentable;
-	SQLLEN sqlptr;
-	int res = 0;
-	SQLHSTMT stmt = NULL;
+	const char *catg;
+
 	struct ast_flags config_flags = { 0 }; /* Part of our config comes from the database */
 
 	cfg = ast_config_load(CONFIG, config_flags);
@@ -113,211 +670,149 @@
 		return -1;
 	}
 
-	/* Process the general category */
-	cel_show_user_def = CEL_SHOW_USERDEF_DEFAULT;
-	for (var = ast_variable_browse(cfg, "general"); var; var = var->next) {
-		if (!strcasecmp(var->name, "show_user_defined")) {
-			cel_show_user_def = ast_true(var->value) ? 1 : 0;
-		} else {
-			/* Unknown option name. */
-		}
-	}
-
 	for (catg = ast_category_browse(cfg, NULL); catg; catg = ast_category_browse(cfg, catg)) {
-		if (!strcasecmp(catg, "general")) {
-			continue;
+		if (strcasecmp(catg, "general") == 0) {
+			load_general_config(cfg);
+		} else if (!load_table_config(cfg, catg)) {
+			destroy_tables();
+			ast_config_destroy(cfg);
+			return -1;
 		}
-		var = ast_variable_browse(cfg, catg);
-		if (!var)
-			continue;
-
-		if (ast_strlen_zero(tmp = ast_variable_retrieve(cfg, catg, "connection"))) {
-			ast_log(LOG_WARNING, "No connection parameter found in '%s'.  Skipping.\n", catg);
-			continue;
-		}
-		ast_copy_string(connection, tmp, sizeof(connection));
-		lenconnection = strlen(connection);
-
-		/* When loading, we want to be sure we can connect. */
-		obj = ast_odbc_request_obj(connection, 1);
-		if (!obj) {
-			ast_log(LOG_WARNING, "No such connection '%s' in the '%s' section of " CONFIG ".  Check res_odbc.conf.\n", connection, catg);
-			continue;
-		}
-
-		if (ast_strlen_zero(tmp = ast_variable_retrieve(cfg, catg, "table"))) {
-			ast_log(LOG_NOTICE, "No table name found.  Assuming 'cel'.\n");
-			tmp = "cel";
-		}
-		ast_copy_string(table, tmp, sizeof(table));
-		lentable = strlen(table);
-
-		res = SQLAllocHandle(SQL_HANDLE_STMT, obj->con, &stmt);
-		if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
-			ast_log(LOG_WARNING, "SQL Alloc Handle failed on connection '%s'!\n", connection);
-			ast_odbc_release_obj(obj);
-			continue;
-		}
-
-		res = SQLColumns(stmt, NULL, 0, NULL, 0, (unsigned char *)table, SQL_NTS, (unsigned char *)"%", SQL_NTS);
-		if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
-			ast_log(LOG_ERROR, "Unable to query database columns on connection '%s'.  Skipping.\n", connection);
-			ast_odbc_release_obj(obj);
-			continue;
-		}
-
-		tableptr = ast_calloc(sizeof(char), sizeof(*tableptr) + lenconnection + 1 + lentable + 1);
-		if (!tableptr) {
-			ast_log(LOG_ERROR, "Out of memory creating entry for table '%s' on connection '%s'\n", table, connection);
-			ast_odbc_release_obj(obj);
-			res = -1;
-			break;
-		}
-
-		tableptr->connection = (char *)tableptr + sizeof(*tableptr);
-		tableptr->table = (char *)tableptr + sizeof(*tableptr) + lenconnection + 1;
-		ast_copy_string(tableptr->connection, connection, lenconnection + 1);
-		ast_copy_string(tableptr->table, table, lentable + 1);
-
-		tableptr->usegmtime = 0;
-		if (!ast_strlen_zero(tmp = ast_variable_retrieve(cfg, catg, "usegmtime"))) {
-			tableptr->usegmtime = ast_true(tmp);
-		}
-
-		tableptr->allowleapsec = 1;
-		if (!ast_strlen_zero(tmp = ast_variable_retrieve(cfg, catg, "allowleapsecond"))) {
-			tableptr->allowleapsec = ast_true(tmp);
-		}
-
-		ast_verb(3, "Found CEL table %s@%s.\n", tableptr->table, tableptr->connection);
-
-		/* Check for filters first */
-		for (var = ast_variable_browse(cfg, catg); var; var = var->next) {
-			if (strncmp(var->name, "filter", 6) == 0) {
-				char *celvar = ast_strdupa(var->name + 6);
-				celvar = ast_strip(celvar);
-				ast_verb(3, "Found filter %s for cel variable %s in %s@%s\n", var->value, celvar, tableptr->table, tableptr->connection);
-
-				entry = ast_calloc(sizeof(char), sizeof(*entry) + strlen(celvar) + 1 + strlen(var->value) + 1);
-				if (!entry) {
-					ast_log(LOG_ERROR, "Out of memory creating filter entry for CEL variable '%s' in table '%s' on connection '%s'\n", celvar, table, connection);
-					res = -1;
-					break;
-				}
-
-				/* NULL column entry means this isn't a column in the database */
-				entry->name = NULL;
-				entry->celname = (char *)entry + sizeof(*entry);
-				entry->filtervalue = (char *)entry + sizeof(*entry) + strlen(celvar) + 1;
-				strcpy(entry->celname, celvar);
-				strcpy(entry->filtervalue, var->value);
-
-				AST_LIST_INSERT_TAIL(&(tableptr->columns), entry, list);
-			}
-		}
-
-		while ((res = SQLFetch(stmt)) != SQL_NO_DATA && res != SQL_ERROR) {
-			char *celvar = "", *staticvalue = "";
-
-			SQLGetData(stmt,  4, SQL_C_CHAR, columnname, sizeof(columnname), &sqlptr);
-
-			/* Is there an alias for this column? */
-
-			/* NOTE: This seems like a non-optimal parse method, but I'm going
-			 * for user configuration readability, rather than fast parsing. We
-			 * really don't parse this file all that often, anyway.
-			 */
-			for (var = ast_variable_browse(cfg, catg); var; var = var->next) {
-				if (strncmp(var->name, "alias", 5) == 0 && strcasecmp(var->value, columnname) == 0) {
-					char *alias = ast_strdupa(var->name + 5);
-					celvar = ast_strip(alias);
-					ast_verb(3, "Found alias %s for column %s in %s@%s\n", celvar, columnname, tableptr->table, tableptr->connection);
-					break;
-				} else if (strncmp(var->name, "static", 6) == 0 && strcasecmp(var->value, columnname) == 0) {
-					char *item = ast_strdupa(var->name + 6);
-					item = ast_strip(item);
-					if (item[0] == '"' && item[strlen(item) - 1] == '"') {
-						/* Remove surrounding quotes */
-						item[strlen(item) - 1] = '\0';
-						item++;
-					}
-					staticvalue = item;
-				}
-			}
-
-			entry = ast_calloc(sizeof(char), sizeof(*entry) + strlen(columnname) + 1 + strlen(celvar) + 1 + strlen(staticvalue) + 1);
-			if (!entry) {
-				ast_log(LOG_ERROR, "Out of memory creating entry for column '%s' in table '%s' on connection '%s'\n", columnname, table, connection);
-				res = -1;
-				break;
-			}
-			entry->name = (char *)entry + sizeof(*entry);
-			strcpy(entry->name, columnname);
-
-			if (!ast_strlen_zero(celvar)) {
-				entry->celname = entry->name + strlen(columnname) + 1;
-				strcpy(entry->celname, celvar);
-			} else { /* Point to same place as the column name */
-				entry->celname = (char *)entry + sizeof(*entry);
-			}
-
-			if (!ast_strlen_zero(staticvalue)) {
-				entry->staticvalue = entry->celname + strlen(entry->celname) + 1;
-				strcpy(entry->staticvalue, staticvalue);
-			}
-
-			SQLGetData(stmt,  5, SQL_C_SHORT, &entry->type, sizeof(entry->type), NULL);
-			SQLGetData(stmt,  7, SQL_C_LONG, &entry->size, sizeof(entry->size), NULL);
-			SQLGetData(stmt,  9, SQL_C_SHORT, &entry->decimals, sizeof(entry->decimals), NULL);
-			SQLGetData(stmt, 10, SQL_C_SHORT, &entry->radix, sizeof(entry->radix), NULL);
-			SQLGetData(stmt, 11, SQL_C_SHORT, &entry->nullable, sizeof(entry->nullable), NULL);
-			SQLGetData(stmt, 16, SQL_C_LONG, &entry->octetlen, sizeof(entry->octetlen), NULL);
-
-			/* Specification states that the octenlen should be the maximum number of bytes
-			 * returned in a char or binary column, but it seems that some drivers just set
-			 * it to NULL. (Bad Postgres! No biscuit!) */
-			if (entry->octetlen == 0)
-				entry->octetlen = entry->size;
-
-			ast_verb(10, "Found %s column with type %hd with len %ld, octetlen %ld, and numlen (%hd,%hd)\n", entry->name, entry->type, (long) entry->size, (long) entry->octetlen, entry->decimals, entry->radix);
-			/* Insert column info into column list */
-			AST_LIST_INSERT_TAIL(&(tableptr->columns), entry, list);
-			res = 0;
-		}
-
-		SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-		ast_odbc_release_obj(obj);
-
-		if (AST_LIST_FIRST(&(tableptr->columns)))
-			AST_RWLIST_INSERT_TAIL(&odbc_tables, tableptr, list);
-		else
-			ast_free(tableptr);
 	}
+
 	ast_config_destroy(cfg);
-	return res;
+
+	return construct_queries();
 }
 
-static int free_config(void)
+static void *mb_manager_thread(void *data)
 {
-	struct tables *table;
-	struct columns *entry;
-	while ((table = AST_RWLIST_REMOVE_HEAD(&odbc_tables, list))) {
-		while ((entry = AST_LIST_REMOVE_HEAD(&(table->columns), list))) {
-			ast_free(entry);
-		}
-		ast_free(table);
+	int failed = 0;
+
+	ast_verb(3, "CEL ODBC Manager started.\n");
+
+	while (AST_RWLIST_WRLOCK(&odbc_tables)) {
+		ast_log(LOG_WARNING, "Unable to lock table list for modification. Retrying.\n");
 	}
-	return 0;
+	failed = load_config();
+	AST_RWLIST_UNLOCK(&odbc_tables);
+
+	while (failed) {
+		if (wait_for_signal() == ETIMEDOUT) {
+			while (AST_RWLIST_WRLOCK(&odbc_tables)) {
+				ast_log(LOG_WARNING, "Unable to lock table list for modification. Retrying.\n");
+			}
+			ast_verb(3, "Retrying %d failed table(s).\n", failed);
+			failed = construct_queries();
+			AST_RWLIST_UNLOCK(&odbc_tables);
+		} else {
+			ast_verb(3, "Terminating CEL ODBC Manager.\n");
+			return NULL;
+		}
+	}
+
+	ast_verb(3, "Terminating CEL ODBC Manager, no longer needed.\n");
+
+	return NULL;
 }
 
-static SQLHSTMT generic_prepare(struct odbc_obj *obj, void *data)
+static int start_table_manager(void)
 {
+	int errcode = 0;
+
+	ast_sem_init(&manager_sem, 0, 0);
+
+	if ((errcode = ast_pthread_create(&manager_thread, NULL, mb_manager_thread, NULL))) {
+		ast_log(LOG_ERROR, "Could not create thread: %s\n", strerror(errcode));
+		return 0;
+	} else {
+		return 1;
+	}
+}
+
+static void stop_table_manager(void)
+{
+	ast_sem_post(&manager_sem);
+	pthread_join(manager_thread, NULL);
+	manager_thread = AST_PTHREADT_NULL;
+	ast_sem_destroy(&manager_sem);
+}
+
+/* Converting from const pointer to non-const (SQLPOINTER) is SAFE.
+ *
+ * SQLBindParameter is designed for bi-directional data transfer.
+ * Logically there is no reason why a driver would clobber memory
+ * without asking and I have confirmed this hypothesis with the help
+ * of valgrind and a test. Valgrind did not report any memory
+ * violations and no segfault occurred trying to write to
+ * write-protected memory.
+ *
+ * I also found that leaving all length-related parameters unset is fine.
+ * The driver is able to deduce the size of our parameters just fine
+ * from the value and parameter types alone. */
+
+static void bind_parameter(
+	SQLHSTMT stmt,
+	const SQL_TIMESTAMP_STRUCT *event_time,
+	const struct ast_cel_event_record *record,
+	const struct tables *table,
+	int index,
+	int rva
+) {
+	if (rva == offsetof(struct ast_cel_event_record, event_time)) {
+		/* Special case for timestamp. */
+		SQLBindParameter(stmt, index, SQL_PARAM_INPUT, SQL_C_TIMESTAMP,
+			SQL_TYPE_TIMESTAMP, 0, 0, (SQLPOINTER) event_time, 0, NULL);
+	} else if (rva == offsetof(struct ast_cel_event_record, event_type)) {
+		/* Special case for event type. */
+		if (!cel_show_user_def && record->event_type == AST_CEL_USER_DEFINED) {
+			SQLBindParameter(stmt, index, SQL_PARAM_INPUT, SQL_C_CHAR,
+				SQL_CHAR, 0, 0, (SQLPOINTER) record->user_defined_name, 0, NULL);
+		} else {
+			SQLBindParameter(stmt, index, SQL_PARAM_INPUT, SQL_C_CHAR,
+				SQL_CHAR, 0, 0, (SQLPOINTER) record->event_name, 0, NULL);
+		}
+	} else if (rva == offsetof(struct ast_cel_event_record, amaflag)) {
+		/* Special case for flags. */
+		SQLBindParameter(stmt, index, SQL_PARAM_INPUT, SQL_C_ULONG,
+			SQL_INTEGER, 0, 0, (SQLPOINTER) &record->amaflag, 0, NULL);
+	} else {
+		/* Obtain pointer to string value by adding rva offset to record base address */
+		char ** string_ptr = (char **) (((char *) record) + rva);
+		SQLBindParameter(stmt, index, SQL_PARAM_INPUT, SQL_C_CHAR,
+			SQL_CHAR, 0, 0, (SQLPOINTER) *string_ptr, 0, NULL);
+	}
+}
+
+static void bind_parameters(
+	SQLHSTMT stmt,
+	const SQL_TIMESTAMP_STRUCT *event_time,
+	const struct ast_cel_event_record *record,
+	const struct tables *table
+) {
+	struct columns *column;
+	int index = 1;
+
+	AST_LIST_TRAVERSE(&(table->columns), column, list) {
+		if (column->staticval) {
+			SQLBindParameter(stmt, index++, SQL_PARAM_INPUT, SQL_C_CHAR,
+				SQL_CHAR, 0, 0, (SQLPOINTER) column->staticval, 0, NULL);
+		} else if (column->rva >= 0) {
+			bind_parameter(stmt, event_time, record, table, index++, column->rva);
+		}
+	}
+}
+
+static SQLHSTMT prepare_query(
+	struct odbc_obj *obj,
+	void *data
+) {
 	int res, i;
-	char *sql = data;
+	const struct prepare_params *params = data;
 	SQLHSTMT stmt;
 	SQLINTEGER nativeerror = 0, numfields = 0;
 	SQLSMALLINT diagbytes = 0;
-	unsigned char state[10], diagnostic[256];
+	unsigned char state[10], diagnostic[512];
 
 	res = SQLAllocHandle (SQL_HANDLE_STMT, obj->con, &stmt);
 	if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
@@ -325,9 +820,14 @@
 		return NULL;
 	}
 
-	res = ast_odbc_prepare(obj, stmt, sql);
-	if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
-		ast_log(LOG_WARNING, "SQL Prepare failed![%s]\n", sql);
+	ast_debug(3, "Executing SQL statement: [%s]\n", params->table->query);
+	bind_parameters(stmt, &params->event_time, &params->record, params->table);
+	res = ast_odbc_prepare(obj, stmt, params->table->query);
+
+	if (res == SQL_SUCCESS || res == SQL_SUCCESS_WITH_INFO) {
+		return stmt;
+	} else {
+		ast_log(LOG_WARNING, "SQL Prepare failed![%s]\n", params->table->query);
 		SQLGetDiagField(SQL_HANDLE_STMT, stmt, 1, SQL_DIAG_NUMBER, &numfields, SQL_IS_INTEGER, &diagbytes);
 		for (i = 0; i < numfields; i++) {
 			SQLGetDiagRec(SQL_HANDLE_STMT, stmt, i + 1, state, &nativeerror, diagnostic, sizeof(diagnostic), &diagbytes);
@@ -340,489 +840,128 @@
 		SQLFreeHandle (SQL_HANDLE_STMT, stmt);
 		return NULL;
 	}
-
-	return stmt;
 }
 
-#define LENGTHEN_BUF1(size)														\
-			do {																\
-				/* Lengthen buffer, if necessary */								\
-				if (ast_str_strlen(sql) + size + 1 > ast_str_size(sql)) {		\
-					if (ast_str_make_space(&sql, ((ast_str_size(sql) + size + 1) / 512 + 1) * 512) != 0) { \
-						ast_log(LOG_ERROR, "Unable to allocate sufficient memory.  Insert CEL '%s:%s' failed.\n", tableptr->connection, tableptr->table); \
-						ast_free(sql);											\
-						ast_free(sql2);											\
-						AST_RWLIST_UNLOCK(&odbc_tables);						\
-						return;													\
-					}															\
-				}																\
-			} while (0)
+static int event_filtered_out(
+	const struct tables *table,
+	const struct ast_cel_event_record *record
+) {
+	/* Adding a filter means that only events matching the filter will
+	 * be written to the table. Multiple filters behave like a logical AND,
+	 * meaning the event must match all filters. */
 
-#define LENGTHEN_BUF2(size)														\
-			do {																\
-				if (ast_str_strlen(sql2) + size + 1 > ast_str_size(sql2)) {		\
-					if (ast_str_make_space(&sql2, ((ast_str_size(sql2) + size + 3) / 512 + 1) * 512) != 0) { \
-						ast_log(LOG_ERROR, "Unable to allocate sufficient memory.  Insert CEL '%s:%s' failed.\n", tableptr->connection, tableptr->table); \
-						ast_free(sql);											\
-						ast_free(sql2);											\
-						AST_RWLIST_UNLOCK(&odbc_tables);						\
-						return;													\
-					}															\
-				}																\
-			} while (0)
+	struct filters *filter;
+
+	AST_LIST_TRAVERSE(&(table->filters), filter, list) {
+		if (filter->rva == offsetof(struct ast_cel_event_record, event_time)) {
+			/* Filters on timestamps are ignored because they serve no purpose. */
+		} else if (filter->rva == offsetof(struct ast_cel_event_record, event_type)) {
+			/* Special case for event type. */
+			if (!cel_show_user_def && record->event_type == AST_CEL_USER_DEFINED) {
+				if (!ast_strings_equal(record->user_defined_name, filter->value)) {
+					return 1;
+				}
+			} else {
+				if (!ast_strings_equal(record->event_name, filter->value)) {
+					return 1;
+				}
+			}
+		} else if (filter->rva == offsetof(struct ast_cel_event_record, amaflag)) {
+			/* Special case for flags. */
+			int len = snprintf(NULL, 0, "%u", record->amaflag) + 1;
+			char * buffer = ast_alloca(len);
+			snprintf(buffer, len, "%u", record->amaflag);
+			if (!ast_strings_equal(buffer, filter->value)) {
+				return 1;
+			}
+		} else if (filter->rva >= 0) {
+			/* Obtain pointer to CEL string variable by adding rva to
+			 * base address of CEL record. Then obtain string value by
+			 * dereferencing that pointer. */
+			const char * value = *(const char **) (((const char *) record) + filter->rva);
+			if (!ast_strings_equal(value, filter->value)) {
+				return 1;
+			}
+		}
+	}
+
+	return 0;
+}
+
+static void timeval_to_sql_timestamp(
+	SQL_TIMESTAMP_STRUCT * timestamp,
+	const struct timeval * event_time,
+	const int usegmtime
+) {
+	struct timeval date_tv = *event_time;
+	struct ast_tm tm = { 0, };
+	ast_localtime(&date_tv, &tm, usegmtime ? "UTC" : NULL);
+
+	timestamp->year = tm.tm_year + 1900;
+	timestamp->month = tm.tm_mon + 1;
+	timestamp->day = tm.tm_mday;
+	timestamp->hour = tm.tm_hour;
+	timestamp->minute = tm.tm_min;
+	timestamp->second = tm.tm_sec;
+	timestamp->fraction = tm.tm_usec * 1000; /* ODBC fraction is in nanoseconds */
+}
 
 static void odbc_log(struct ast_event *event)
 {
-	struct tables *tableptr;
-	struct columns *entry;
+	struct tables *table;
 	struct odbc_obj *obj;
-	struct ast_str *sql = ast_str_create(maxsize), *sql2 = ast_str_create(maxsize2);
-	char *tmp;
-	char colbuf[1024], *colptr;
 	SQLHSTMT stmt = NULL;
 	SQLLEN rows = 0;
-	struct ast_cel_event_record record = {
-		.version = AST_CEL_EVENT_RECORD_VERSION,
-	};
+	struct prepare_params params;
 
-	if (ast_cel_fill_record(event, &record)) {
-		return;
-	}
+	params.record.version = AST_CEL_EVENT_RECORD_VERSION;
 
-	if (!sql || !sql2) {
-		if (sql)
-			ast_free(sql);
-		if (sql2)
-			ast_free(sql2);
+	if (ast_cel_fill_record(event, &params.record)) {
 		return;
 	}
 
 	if (AST_RWLIST_RDLOCK(&odbc_tables)) {
 		ast_log(LOG_ERROR, "Unable to lock table list.  Insert CEL(s) failed.\n");
-		ast_free(sql);
-		ast_free(sql2);
 		return;
 	}
 
-	AST_LIST_TRAVERSE(&odbc_tables, tableptr, list) {
-		int first = 1;
-		ast_str_set(&sql, 0, "INSERT INTO %s (", tableptr->table);
-		ast_str_set(&sql2, 0, " VALUES (");
+	AST_LIST_TRAVERSE(&odbc_tables, table, list) {
 
-		/* No need to check the connection now; we'll handle any failure in prepare_and_execute */
-		if (!(obj = ast_odbc_request_obj(tableptr->connection, 0))) {
-			ast_log(LOG_WARNING, "Unable to retrieve database handle for '%s:%s'.  CEL failed: %s\n", tableptr->connection, tableptr->table, ast_str_buffer(sql));
+		if (ast_strlen_zero(table->query)) {
 			continue;
 		}
 
-		AST_LIST_TRAVERSE(&(tableptr->columns), entry, list) {
-			int datefield = 0;
-			int unknown = 0;
-			if (strcasecmp(entry->celname, "eventtime") == 0) {
-				datefield = 1;
-			}
-
-			/* Check if we have a similarly named variable */
-			if (entry->staticvalue) {
-				colptr = ast_strdupa(entry->staticvalue);
-			} else if (datefield) {
-				struct timeval date_tv = record.event_time;
-				struct ast_tm tm = { 0, };
-				ast_localtime(&date_tv, &tm, tableptr->usegmtime ? "UTC" : NULL);
-				/* SQL server 2008 added datetime2 and datetimeoffset data types, that
-				   are reported to SQLColumns() as SQL_WVARCHAR, according to "Enhanced
-				   Date/Time Type Behavior with Previous SQL Server Versions (ODBC)".
-				   Here we format the event time with fraction seconds, so these new
-				   column types will be set to high-precision event time. However, 'date'
-				   and 'time' columns, also newly introduced, reported as SQL_WVARCHAR
-				   too, and insertion of the value formatted here into these will fail.
-				   This should be ok, however, as nobody is going to store just event
-				   date or just time for CDR purposes.
-				 */
-				ast_strftime(colbuf, sizeof(colbuf), "%Y-%m-%d %H:%M:%S.%6q", &tm);
-				colptr = colbuf;
-			} else {
-				if (strcmp(entry->celname, "userdeftype") == 0) {
-					ast_copy_string(colbuf, record.user_defined_name, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "cid_name") == 0) {
-					ast_copy_string(colbuf, record.caller_id_name, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "cid_num") == 0) {
-					ast_copy_string(colbuf, record.caller_id_num, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "cid_ani") == 0) {
-					ast_copy_string(colbuf, record.caller_id_ani, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "cid_rdnis") == 0) {
-					ast_copy_string(colbuf, record.caller_id_rdnis, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "cid_dnid") == 0) {
-					ast_copy_string(colbuf, record.caller_id_dnid, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "exten") == 0) {
-					ast_copy_string(colbuf, record.extension, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "context") == 0) {
-					ast_copy_string(colbuf, record.context, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "channame") == 0) {
-					ast_copy_string(colbuf, record.channel_name, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "appname") == 0) {
-					ast_copy_string(colbuf, record.application_name, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "appdata") == 0) {
-					ast_copy_string(colbuf, record.application_data, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "accountcode") == 0) {
-					ast_copy_string(colbuf, record.account_code, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "peeraccount") == 0) {
-					ast_copy_string(colbuf, record.peer_account, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "uniqueid") == 0) {
-					ast_copy_string(colbuf, record.unique_id, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "linkedid") == 0) {
-					ast_copy_string(colbuf, record.linked_id, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "userfield") == 0) {
-					ast_copy_string(colbuf, record.user_field, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "peer") == 0) {
-					ast_copy_string(colbuf, record.peer, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "amaflags") == 0) {
-					snprintf(colbuf, sizeof(colbuf), "%u", record.amaflag);
-				} else if (strcmp(entry->celname, "extra") == 0) {
-					ast_copy_string(colbuf, record.extra, sizeof(colbuf));
-				} else if (strcmp(entry->celname, "eventtype") == 0) {
-					snprintf(colbuf, sizeof(colbuf), "%u", record.event_type);
-				} else {
-					colbuf[0] = 0;
-					unknown = 1;
-				}
-				colptr = colbuf;
-			}
-
-			if (colptr && !unknown) {
-				/* Check first if the column filters this entry.  Note that this
-				 * is very specifically NOT ast_strlen_zero(), because the filter
-				 * could legitimately specify that the field is blank, which is
-				 * different from the field being unspecified (NULL). */
-				if (entry->filtervalue && strcasecmp(colptr, entry->filtervalue) != 0) {
-					ast_verb(4, "CEL column '%s' with value '%s' does not match filter of"
-						" '%s'.  Cancelling this CEL.\n",
-						entry->celname, colptr, entry->filtervalue);
-					goto early_release;
-				}
-
-				/* Only a filter? */
-				if (ast_strlen_zero(entry->name))
-					continue;
-
-				LENGTHEN_BUF1(strlen(entry->name));
-
-				switch (entry->type) {
-				case SQL_CHAR:
-				case SQL_VARCHAR:
-				case SQL_LONGVARCHAR:
-#ifdef HAVE_ODBC_WCHAR
-				case SQL_WCHAR:
-				case SQL_WVARCHAR:
-				case SQL_WLONGVARCHAR:
-#endif
-				case SQL_BINARY:
-				case SQL_VARBINARY:
-				case SQL_LONGVARBINARY:
-				case SQL_GUID:
-					/* For these two field names, get the rendered form, instead of the raw
-					 * form (but only when we're dealing with a character-based field).
-					 */
-					if (strcasecmp(entry->name, "eventtype") == 0) {
-						const char *event_name;
-
-						event_name = (!cel_show_user_def
-							&& record.event_type == AST_CEL_USER_DEFINED)
-							? record.user_defined_name : record.event_name;
-						snprintf(colbuf, sizeof(colbuf), "%s", event_name);
-					}
-
-					/* Truncate too-long fields */
-					if (entry->type != SQL_GUID) {
-						if (strlen(colptr) > entry->octetlen) {
-							colptr[entry->octetlen] = '\0';
-						}
-					}
-
-					ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-					LENGTHEN_BUF2(strlen(colptr));
-
-					/* Encode value, with escaping */
-					ast_str_append(&sql2, 0, "%s'", first ? "" : ",");
-					for (tmp = colptr; *tmp; tmp++) {
-						if (*tmp == '\'') {
-							ast_str_append(&sql2, 0, "''");
-						} else if (*tmp == '\\' && ast_odbc_backslash_is_escape(obj)) {
-							ast_str_append(&sql2, 0, "\\\\");
-						} else {
-							ast_str_append(&sql2, 0, "%c", *tmp);
-						}
-					}
-					ast_str_append(&sql2, 0, "'");
-					break;
-				case SQL_TYPE_DATE:
-					if (ast_strlen_zero(colptr)) {
-						continue;
-					} else {
-						int year = 0, month = 0, day = 0;
-						if (strcasecmp(entry->name, "eventdate") == 0) {
-							struct ast_tm tm;
-							ast_localtime(&record.event_time, &tm, tableptr->usegmtime ? "UTC" : NULL);
-							year = tm.tm_year + 1900;
-							month = tm.tm_mon + 1;
-							day = tm.tm_mday;
-						} else {
-							if (sscanf(colptr, "%4d-%2d-%2d", &year, &month, &day) != 3 || year <= 0 ||
-								month <= 0 || month > 12 || day < 0 || day > 31 ||
-								((month == 4 || month == 6 || month == 9 || month == 11) && day == 31) ||
-								(month == 2 && year % 400 == 0 && day > 29) ||
-								(month == 2 && year % 100 == 0 && day > 28) ||
-								(month == 2 && year % 4 == 0 && day > 29) ||
-								(month == 2 && year % 4 != 0 && day > 28)) {
-								ast_log(LOG_WARNING, "CEL variable %s is not a valid date ('%s').\n", entry->name, colptr);
-								continue;
-							}
-
-							if (year > 0 && year < 100) {
-								year += 2000;
-							}
-						}
-
-						ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-						LENGTHEN_BUF2(17);
-						ast_str_append(&sql2, 0, "%s{d '%04d-%02d-%02d'}", first ? "" : ",", year, month, day);
-					}
-					break;
-				case SQL_TYPE_TIME:
-					if (ast_strlen_zero(colptr)) {
-						continue;
-					} else {
-						int hour = 0, minute = 0, second = 0;
-						if (strcasecmp(entry->name, "eventdate") == 0) {
-							struct ast_tm tm;
-							ast_localtime(&record.event_time, &tm, tableptr->usegmtime ? "UTC" : NULL);
-							hour = tm.tm_hour;
-							minute = tm.tm_min;
-							second = (tableptr->allowleapsec || tm.tm_sec < 60) ? tm.tm_sec : 59;
-						} else {
-							int count = sscanf(colptr, "%2d:%2d:%2d", &hour, &minute, &second);
-
-							if ((count != 2 && count != 3) || hour < 0 || hour > 23 || minute < 0 || minute > 59 || second < 0 || second > (tableptr->allowleapsec ? 60 : 59)) {
-								ast_log(LOG_WARNING, "CEL variable %s is not a valid time ('%s').\n", entry->name, colptr);
-								continue;
-							}
-						}
-
-						ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-						LENGTHEN_BUF2(15);
-						ast_str_append(&sql2, 0, "%s{t '%02d:%02d:%02d'}", first ? "" : ",", hour, minute, second);
-					}
-					break;
-				case SQL_TYPE_TIMESTAMP:
-				case SQL_TIMESTAMP:
-					if (ast_strlen_zero(colptr)) {
-						continue;
-					} else {
-						if (datefield) {
-							/*
-							 * We've already properly formatted the timestamp so there's no need
-							 * to parse it and re-format it.
-							 */
-							ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-							LENGTHEN_BUF2(27);
-							ast_str_append(&sql2, 0, "%s{ts '%s'}", first ? "" : ",", colptr);
-						} else {
-							int year = 0, month = 0, day = 0, hour = 0, minute = 0;
-							/* MUST use double for microsecond precision */
-							double second = 0.0;
-							if (strcasecmp(entry->name, "eventdate") == 0) {
-								/*
-								 * There doesn't seem to be any reference to 'eventdate' anywhere
-								 * other than in this module.  It should be considered for removal
-								 * at a later date.
-								 */
-								struct ast_tm tm;
-								ast_localtime(&record.event_time, &tm, tableptr->usegmtime ? "UTC" : NULL);
-								year = tm.tm_year + 1900;
-								month = tm.tm_mon + 1;
-								day = tm.tm_mday;
-								hour = tm.tm_hour;
-								minute = tm.tm_min;
-								second = (tableptr->allowleapsec || tm.tm_sec < 60) ? tm.tm_sec : 59;
-								second += (tm.tm_usec / 1000000.0);
-							} else {
-								/*
-								 * If we're here, the data to be inserted MAY be a timestamp
-								 * but the column is.  We parse as much as we can.
-								 */
-								int count = sscanf(colptr, "%4d-%2d-%2d %2d:%2d:%lf", &year, &month, &day, &hour, &minute, &second);
-
-								if ((count != 3 && count != 5 && count != 6) || year <= 0 ||
-									month <= 0 || month > 12 || day < 0 || day > 31 ||
-									((month == 4 || month == 6 || month == 9 || month == 11) && day == 31) ||
-									(month == 2 && year % 400 == 0 && day > 29) ||
-									(month == 2 && year % 100 == 0 && day > 28) ||
-									(month == 2 && year % 4 == 0 && day > 29) ||
-									(month == 2 && year % 4 != 0 && day > 28) ||
-									hour > 23 || minute > 59 || ((int)floor(second)) > (tableptr->allowleapsec ? 60 : 59) ||
-									hour < 0 || minute < 0 || ((int)floor(second)) < 0) {
-									ast_log(LOG_WARNING, "CEL variable %s is not a valid timestamp ('%s').\n", entry->name, colptr);
-									continue;
-								}
-
-								if (year > 0 && year < 100) {
-									year += 2000;
-								}
-							}
-
-							ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-							LENGTHEN_BUF2(27);
-							ast_str_append(&sql2, 0, "%s{ts '%04d-%02d-%02d %02d:%02d:%09.6lf'}", first ? "" : ",", year, month, day, hour, minute, second);
-						}
-					}
-					break;
-				case SQL_INTEGER:
-					{
-						int integer = 0;
-						if (sscanf(colptr, "%30d", &integer) != 1) {
-							ast_log(LOG_WARNING, "CEL variable %s is not an integer.\n", entry->name);
-							continue;
-						}
-
-						ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-						LENGTHEN_BUF2(12);
-						ast_str_append(&sql2, 0, "%s%d", first ? "" : ",", integer);
-					}
-					break;
-				case SQL_BIGINT:
-					{
-						long long integer = 0;
-						int ret;
-						if ((ret = sscanf(colptr, "%30lld", &integer)) != 1) {
-							ast_log(LOG_WARNING, "CEL variable %s is not an integer. (%d - '%s')\n", entry->name, ret, colptr);
-							continue;
-						}
-
-						ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-						LENGTHEN_BUF2(24);
-						ast_str_append(&sql2, 0, "%s%lld", first ? "" : ",", integer);
-					}
-					break;
-				case SQL_SMALLINT:
-					{
-						short integer = 0;
-						if (sscanf(colptr, "%30hd", &integer) != 1) {
-							ast_log(LOG_WARNING, "CEL variable %s is not an integer.\n", entry->name);
-							continue;
-						}
-
-						ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-						LENGTHEN_BUF2(7);
-						ast_str_append(&sql2, 0, "%s%d", first ? "" : ",", integer);
-					}
-					break;
-				case SQL_TINYINT:
-					{
-						signed char integer = 0;
-						if (sscanf(colptr, "%30hhd", &integer) != 1) {
-							ast_log(LOG_WARNING, "CEL variable %s is not an integer.\n", entry->name);
-							continue;
-						}
-
-						ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-						LENGTHEN_BUF2(4);
-						ast_str_append(&sql2, 0, "%s%d", first ? "" : ",", integer);
-					}
-					break;
-				case SQL_BIT:
-					{
-						signed char integer = 0;
-						if (sscanf(colptr, "%30hhd", &integer) != 1) {
-							ast_log(LOG_WARNING, "CEL variable %s is not an integer.\n", entry->name);
-							continue;
-						}
-						if (integer != 0)
-							integer = 1;
-
-						ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-						LENGTHEN_BUF2(2);
-						ast_str_append(&sql2, 0, "%s%d", first ? "" : ",", integer);
-					}
-					break;
-				case SQL_NUMERIC:
-				case SQL_DECIMAL:
-					{
-						double number = 0.0;
-						if (sscanf(colptr, "%30lf", &number) != 1) {
-							ast_log(LOG_WARNING, "CEL variable %s is not an numeric type.\n", entry->name);
-							continue;
-						}
-
-						ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-						LENGTHEN_BUF2(entry->decimals + 2);
-						ast_str_append(&sql2, 0, "%s%*.*lf", first ? "" : ",", entry->decimals, entry->radix, number);
-					}
-					break;
-				case SQL_FLOAT:
-				case SQL_REAL:
-				case SQL_DOUBLE:
-					{
-						double number = 0.0;
-						if (sscanf(colptr, "%30lf", &number) != 1) {
-							ast_log(LOG_WARNING, "CEL variable %s is not an numeric type.\n", entry->name);
-							continue;
-						}
-
-						ast_str_append(&sql, 0, "%s%s", first ? "" : ",", entry->name);
-						LENGTHEN_BUF2(entry->decimals);
-						ast_str_append(&sql2, 0, "%s%lf", first ? "" : ",", number);
-					}
-					break;
-				default:
-					ast_log(LOG_WARNING, "Column type %d (field '%s:%s:%s') is unsupported at this time.\n", entry->type, tableptr->connection, tableptr->table, entry->name);
-					continue;
-				}
-				first = 0;
-			}
+		if (event_filtered_out(table, &params.record)) {
+			continue;
 		}
 
-		/* Concatenate the two constructed buffers */
-		LENGTHEN_BUF1(ast_str_strlen(sql2));
-		ast_str_append(&sql, 0, ")");
-		ast_str_append(&sql2, 0, ")");
-		ast_str_append(&sql, 0, "%s", ast_str_buffer(sql2));
+		/* No need to check the connection now; we'll handle any failure in prepare_and_execute */
+		if (!(obj = ast_odbc_request_obj(table->connection, 0))) {
+			ast_log(LOG_WARNING, "Unable to retrieve database handle for '%s:%s'.  CEL failed: %s\n", table->connection, table->name, table->query);
+			continue;
+		}
 
-		ast_debug(3, "Executing SQL statement: [%s]\n", ast_str_buffer(sql));
-		stmt = ast_odbc_prepare_and_execute(obj, generic_prepare, ast_str_buffer(sql));
+		params.table = table;
+		timeval_to_sql_timestamp(&params.event_time, &params.record.event_time, table->usegmtime);
+
+		stmt = ast_odbc_prepare_and_execute(obj, prepare_query, &params);
 		if (stmt) {
 			SQLRowCount(stmt, &rows);
 			SQLFreeHandle(SQL_HANDLE_STMT, stmt);
 		}
 		if (rows == 0) {
-			ast_log(LOG_WARNING, "Insert failed on '%s:%s'.  CEL failed: %s\n", tableptr->connection, tableptr->table, ast_str_buffer(sql));
+			ast_log(LOG_WARNING, "Insert failed on '%s:%s'.  CEL failed: %s\n", table->connection, table->name, table->query);
 		}
-early_release:
 		ast_odbc_release_obj(obj);
 	}
 	AST_RWLIST_UNLOCK(&odbc_tables);
-
-	/* Next time, just allocate buffers that are that big to start with. */
-	if (ast_str_strlen(sql) > maxsize) {
-		maxsize = ast_str_strlen(sql);
-	}
-	if (ast_str_strlen(sql2) > maxsize2) {
-		maxsize2 = ast_str_strlen(sql2);
-	}
-
-	ast_free(sql);
-	ast_free(sql2);
 }
 
 static int unload_module(void)
 {
-	if (AST_RWLIST_WRLOCK(&odbc_tables)) {
-		ast_log(LOG_ERROR, "Unable to lock column list.  Unload failed.\n");
-		return -1;
-	}
-
 	ast_cel_backend_unregister(ODBC_BACKEND_NAME);
-	free_config();
-	AST_RWLIST_UNLOCK(&odbc_tables);
+	stop_table_manager();
+	destroy_tables_safely();
 	AST_RWLIST_HEAD_DESTROY(&odbc_tables);
 
 	return 0;
@@ -832,34 +971,30 @@
 {
 	AST_RWLIST_HEAD_INIT(&odbc_tables);
 
-	if (AST_RWLIST_WRLOCK(&odbc_tables)) {
-		ast_log(LOG_ERROR, "Unable to lock column list.  Load failed.\n");
-		return AST_MODULE_LOAD_DECLINE;
-	}
-	load_config();
-	AST_RWLIST_UNLOCK(&odbc_tables);
 	if (ast_cel_backend_register(ODBC_BACKEND_NAME, odbc_log)) {
-		ast_log(LOG_ERROR, "Unable to subscribe to CEL events\n");
-		free_config();
+		ast_log(LOG_ERROR, "Unable to subscribe to CEL events.\n");
 		return AST_MODULE_LOAD_DECLINE;
 	}
-	return AST_MODULE_LOAD_SUCCESS;
+
+	if (start_table_manager()) {
+		return AST_MODULE_LOAD_SUCCESS;
+	} else {
+		ast_cel_backend_unregister(ODBC_BACKEND_NAME);
+		AST_RWLIST_HEAD_DESTROY(&odbc_tables);
+		return AST_MODULE_LOAD_DECLINE;
+	}
 }
 
 static int reload(void)
 {
-	if (AST_RWLIST_WRLOCK(&odbc_tables)) {
-		ast_log(LOG_ERROR, "Unable to lock column list.  Reload failed.\n");
-		return AST_MODULE_LOAD_DECLINE;
-	}
+	stop_table_manager();
+	destroy_tables_safely();
+	start_table_manager();
 
-	free_config();
-	load_config();
-	AST_RWLIST_UNLOCK(&odbc_tables);
 	return AST_MODULE_LOAD_SUCCESS;
 }
 
-AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "ODBC CEL backend",
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, ODBC_BACKEND_NAME,
 	.support_level = AST_MODULE_SUPPORT_CORE,
 	.load = load_module,
 	.unload = unload_module,
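
Note for reviewers: bind_parameter() and event_filtered_out() both locate a string field of the CEL record by adding a byte offset (rva) to the record's base address. The standalone sketch below illustrates that lookup in isolation. It is only an illustration under stated assumptions: struct demo_record and field_by_offset() are hypothetical names invented here, not part of the patch or of Asterisk, and the sketch assumes every field reached this way is a string pointer.

```c
#include <stddef.h>

/* Hypothetical stand-in for struct ast_cel_event_record.
 * Only string-pointer fields are modelled here. */
struct demo_record {
	const char *channel_name;
	const char *context;
	const char *extension;
};

/* Return the string field stored rva bytes from the start of the record.
 * The caller must pass an offset obtained with offsetof() for a field
 * of type const char *; any other offset is undefined behaviour. */
static const char *field_by_offset(const struct demo_record *record, size_t rva)
{
	/* Step 1: byte-wise pointer arithmetic to reach the field.
	 * Step 2: reinterpret that address as a pointer to a string pointer.
	 * Step 3: dereference to obtain the string itself. */
	return *(const char * const *) ((const char *) record + rva);
}
```

A caller would store offsetof(struct demo_record, context) once, when the configuration is parsed, and then pass it to field_by_offset() for every event. This avoids comparing column names at logging time.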

-- 
To view, visit https://gerrit.asterisk.org/c/asterisk/+/14698
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Change-Id: Id85d81add33096f8282d212daf239f2fc845d783
Gerrit-Change-Number: 14698
Gerrit-PatchSet: 1
Gerrit-Owner: Dennis <dennis.buteyn at xorcom.com>
Gerrit-MessageType: newchange