[Asterisk-code-review] res odbc: Implement a connection pool. (asterisk[master])

Joshua Colp asteriskteam at digium.com
Mon Jun 6 10:26:47 CDT 2016


Joshua Colp has uploaded a new change for review.

  https://gerrit.asterisk.org/2945

Change subject: res_odbc: Implement a connection pool.
......................................................................

res_odbc: Implement a connection pool.

Testing has shown that our usage of UnixODBC is problematic
due to bugs within UnixODBC itself as well as the heavyweight
cost of connecting and disconnecting database connections, even
when pooling is enabled.

For users of UnixODBC 2.3.1 and earlier, crashes would occur due
to insufficient protection of the disconnect operation. This was
fixed in UnixODBC 2.3.2 and above.

For users of UnixODBC 2.3.3 and higher, a slowdown would occur
under heavy database use due to repeated connection establishment.
A regression is present where on each connection the database
configuration is cached again, with the cache growing out of
control.

The connection pool implementation present in this change helps
to mitigate these issues by reducing how much we connect and
disconnect database connections. We also solve the issue of
crashes under UnixODBC 2.3.1 by defaulting the maximum number of
connections to 1, returning us to the previous working behavior.
For users who may have a fixed version, the maximum concurrent
connection limit can be increased, helping with performance.

The connection pool works by keeping a list of active connections.
If the connection limit has not been reached, a new connection is
established. If the connection limit has been reached, the
request waits until a connection becomes available before
continuing.

ASTERISK-26074 #close
ASTERISK-26054 #close

Change-Id: I6774bf4bac49a0b30242c76a09c403d2e856ecff
---
M CHANGES
M configs/samples/res_odbc.conf.sample
M res/res_odbc.c
3 files changed, 181 insertions(+), 55 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/45/2945/1

diff --git a/CHANGES b/CHANGES
index 608a4a4..8275a99 100644
--- a/CHANGES
+++ b/CHANGES
@@ -381,6 +381,12 @@
    valid value using the specified 'uuid_type', the module may fallback to a
    more readily available source for the correlation UUID.
 
+res_odbc
+------------------
+ * A new option has been added, 'max_connections', which sets the maximum number
+   of concurrent connections to the database. This option defaults to 1 which
+   returns the behavior to that of Asterisk 13.7 and prior.
+
 app_confbridge
 ------------------
  * Added a bridge profile option called regcontext that allows you to
diff --git a/configs/samples/res_odbc.conf.sample b/configs/samples/res_odbc.conf.sample
index 66659ae..a21e96d 100644
--- a/configs/samples/res_odbc.conf.sample
+++ b/configs/samples/res_odbc.conf.sample
@@ -51,6 +51,11 @@
 ; that we should attempt?
 ;limit => 5
 ;
+; The maximum number of connections to have open at any given time.
+; This defaults to 1 and it is highly recommended to only set this higher
+; if using a version of UnixODBC greater than 2.3.1.
+;max_connections => 20
+;
 ; When the channel is destroyed, should any uncommitted open transactions
 ; automatically be committed?
 ;forcecommit => no
diff --git a/res/res_odbc.c b/res/res_odbc.c
index 81e1b3c..99c0c06 100644
--- a/res/res_odbc.c
+++ b/res/res_odbc.c
@@ -78,10 +78,19 @@
 	unsigned int forcecommit:1;          /*!< Should uncommitted transactions be auto-committed on handle release? */
 	unsigned int isolation;              /*!< Flags for how the DB should deal with data in other, uncommitted transactions */
 	unsigned int conntimeout;            /*!< Maximum time the connection process should take */
+	unsigned int maxconnections;         /*!< Maximum number of allowed connections */
 	/*! When a connection fails, cache that failure for how long? */
 	struct timeval negative_connection_cache;
 	/*! When a connection fails, when did that last occur? */
 	struct timeval last_negative_connect;
+	/*! A pool of available connections */
+	AST_LIST_HEAD_NOLOCK(, odbc_obj) connections;
+	/*! Lock to protect the connections */
+	ast_mutex_t lock;
+	/*! Condition to notify any pending connection requesters */
+	ast_cond_t cond;
+	/*! The total number of current connections */
+	size_t connection_cnt;
 };
 
 static struct ao2_container *class_container;
@@ -90,7 +99,7 @@
 
 static odbc_status odbc_obj_connect(struct odbc_obj *obj);
 static odbc_status odbc_obj_disconnect(struct odbc_obj *obj);
-static int odbc_register_class(struct odbc_class *class, int connect);
+static void odbc_register_class(struct odbc_class *class, int connect);
 
 AST_THREADSTORAGE(errors_buf);
 
@@ -157,6 +166,8 @@
 static void odbc_class_destructor(void *data)
 {
 	struct odbc_class *class = data;
+	struct odbc_obj *obj;
+
 	/* Due to refcounts, we can safely assume that any objects with a reference
 	 * to us will prevent our destruction, so we don't need to worry about them.
 	 */
@@ -169,7 +180,14 @@
 	if (class->sanitysql) {
 		ast_free(class->sanitysql);
 	}
+
+	while ((obj = AST_LIST_REMOVE_HEAD(&class->connections, list))) {
+		ao2_ref(obj, -1);
+	}
+
 	SQLFreeHandle(SQL_HANDLE_ENV, class->env);
+	ast_mutex_destroy(&class->lock);
+	ast_cond_destroy(&class->cond);
 }
 
 static int null_hash_fn(const void *obj, const int flags)
@@ -180,21 +198,23 @@
 static void odbc_obj_destructor(void *data)
 {
 	struct odbc_obj *obj = data;
-	struct odbc_class *class = obj->parent;
-	obj->parent = NULL;
+
 	odbc_obj_disconnect(obj);
-	ao2_ref(class, -1);
 }
 
-static void destroy_table_cache(struct odbc_cache_tables *table) {
+static void destroy_table_cache(struct odbc_cache_tables *table)
+{
 	struct odbc_cache_columns *col;
+
 	ast_debug(1, "Destroying table cache for %s\n", table->table);
+
 	AST_RWLIST_WRLOCK(&table->columns);
 	while ((col = AST_RWLIST_REMOVE_HEAD(&table->columns, list))) {
 		ast_free(col);
 	}
 	AST_RWLIST_UNLOCK(&table->columns);
 	AST_RWLIST_HEAD_DESTROY(&table->columns);
+
 	ast_free(table);
 }
 
@@ -370,18 +390,19 @@
 	 * We must therefore redo everything when we establish a new
 	 * connection. */
 	stmt = prepare_cb(obj, data);
+	if (!stmt) {
+		return NULL;
+	}
 
-	if (stmt) {
-		res = SQLExecute(stmt);
-		if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO) && (res != SQL_NO_DATA)) {
-			if (res == SQL_ERROR) {
-				ast_odbc_print_errors(SQL_HANDLE_STMT, stmt, "SQL Execute");
-			}
-
-			ast_log(LOG_WARNING, "SQL Execute error %d!\n", res);
-			SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-			stmt = NULL;
+	res = SQLExecute(stmt);
+	if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO) && (res != SQL_NO_DATA)) {
+		if (res == SQL_ERROR) {
+			ast_odbc_print_errors(SQL_HANDLE_STMT, stmt, "SQL Execute");
 		}
+
+		ast_log(LOG_WARNING, "SQL Execute error %d!\n", res);
+		SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+		stmt = NULL;
 	}
 
 	return stmt;
@@ -468,7 +489,7 @@
 	struct ast_variable *v;
 	char *cat;
 	const char *dsn, *username, *password, *sanitysql;
-	int enabled, bse, conntimeout, forcecommit, isolation;
+	int enabled, bse, conntimeout, forcecommit, isolation, maxconnections;
 	struct timeval ncache = { 0, 0 };
 	int preconnect = 0, res = 0;
 	struct ast_flags config_flags = { 0 };
@@ -495,6 +516,7 @@
 			conntimeout = 10;
 			forcecommit = 0;
 			isolation = SQL_TXN_READ_COMMITTED;
+			maxconnections = 1;
 			for (v = ast_variable_browse(config, cat); v; v = v->next) {
 				if (!strcasecmp(v->name, "pooling") ||
 						!strncasecmp(v->name, "share", 5) ||
@@ -538,6 +560,11 @@
 						ast_log(LOG_ERROR, "Unrecognized value for 'isolation': '%s' in section '%s'\n", v->value, cat);
 						isolation = SQL_TXN_READ_COMMITTED;
 					}
+				} else if (!strcasecmp(v->name, "max_connections")) {
+					if (sscanf(v->value, "%d", &maxconnections) != 1 || maxconnections < 1) {
+						ast_log(LOG_WARNING, "limit be a positive integer\n");
+						maxconnections = 1;
+                                        }
 				}
 			}
 
@@ -563,6 +590,7 @@
 				new->isolation = isolation;
 				new->conntimeout = conntimeout;
 				new->negative_connection_cache = ncache;
+				new->maxconnections = maxconnections;
 
 				if (cat)
 					ast_copy_string(new->name, cat, sizeof(new->name));
@@ -580,6 +608,9 @@
 					ao2_ref(new, -1);
 					break;
 				}
+
+				ast_mutex_init(&new->lock);
+				ast_cond_init(&new->cond, NULL);
 
 				odbc_register_class(new, preconnect);
 				ast_log(LOG_NOTICE, "Registered ODBC class '%s' dsn->[%s]\n", cat, dsn);
@@ -641,6 +672,7 @@
 			ast_strftime(timestr, sizeof(timestr), "%Y-%m-%d %T", &tm);
 			ast_cli(a->fd, "  Name:   %s\n  DSN:    %s\n", class->name, class->dsn);
 			ast_cli(a->fd, "    Last connection attempt: %s\n", timestr);
+			ast_cli(a->fd, "    Number of active connections: %zd (out of %d)\n", class->connection_cnt, class->maxconnections);
 			ast_cli(a->fd, "\n");
 		}
 		ao2_ref(class, -1);
@@ -654,38 +686,47 @@
 	AST_CLI_DEFINE(handle_cli_odbc_show, "List ODBC DSN(s)")
 };
 
-static int odbc_register_class(struct odbc_class *class, int preconnect)
+static void odbc_register_class(struct odbc_class *class, int preconnect)
 {
 	struct odbc_obj *obj;
-	if (class) {
-		ao2_link(class_container, class);
-		/* I still have a reference in the caller, so a deref is NOT missing here. */
 
-		if (preconnect) {
-			/* Request and release builds a connection */
-			obj = ast_odbc_request_obj(class->name, 0);
-			if (obj) {
-				ast_odbc_release_obj(obj);
-			}
-		}
+	ao2_link(class_container, class);
+	/* I still have a reference in the caller, so a deref is NOT missing here. */
 
-		return 0;
-	} else {
-		ast_log(LOG_WARNING, "Attempted to register a NULL class?\n");
-		return -1;
+	if (!preconnect) {
+		return;
 	}
+
+	/* Request and release builds a connection */
+	obj = ast_odbc_request_obj(class->name, 0);
+	if (obj) {
+		ast_odbc_release_obj(obj);
+	}
+
+	return;
 }
 
 void ast_odbc_release_obj(struct odbc_obj *obj)
 {
-	ast_debug(2, "Releasing ODBC handle %p\n", obj);
+	struct odbc_class *class = obj->parent;
 
-#ifdef DEBUG_THREADS
-	obj->file[0] = '\0';
-	obj->function[0] = '\0';
-	obj->lineno = 0;
-#endif
-	ao2_ref(obj, -1);
+	ast_debug(2, "Releasing ODBC handle %p into pool\n", obj);
+
+	/* The odbc_obj only holds a reference to the class when it is
+	 * actively being used. This guarantees no circular reference
+	 * between odbc_class and odbc_obj. Since it is being released
+	 * we also release our class reference. If a reload occurred before
+	 * the class will go away automatically once all odbc_obj are
+	 * released back.
+	 */
+	obj->parent = NULL;
+
+	ast_mutex_lock(&class->lock);
+	AST_LIST_INSERT_HEAD(&class->connections, obj, list);
+	ast_cond_signal(&class->cond);
+	ast_mutex_unlock(&class->lock);
+
+	ao2_ref(class, -1);
 }
 
 int ast_odbc_backslash_is_escape(struct odbc_obj *obj)
@@ -703,6 +744,50 @@
 	return 0;
 }
 
+/*
+ * \brief Determine if the connection has died.
+ *
+ * \param connection The connection to check
+ * \param class The ODBC class
+ * \retval 1 Yep, it's dead
+ * \retval 0 It's alive and well
+ */
+static int connection_dead(struct odbc_obj *connection, struct odbc_class *class)
+{
+	char *test_sql = "select 1";
+	SQLINTEGER dead;
+	SQLRETURN res;
+	SQLHSTMT stmt;
+
+	res = SQLGetConnectAttr(connection->con, SQL_ATTR_CONNECTION_DEAD, &dead, 0, 0);
+	if (SQL_SUCCEEDED(res)) {
+		return dead == SQL_CD_TRUE ? 1 : 0;
+	}
+
+	/* If the Driver doesn't support SQL_ATTR_CONNECTION_DEAD do a
+	 * probing query instead
+	 */
+	res = SQLAllocHandle(SQL_HANDLE_STMT, connection->con, &stmt);
+	if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
+		return 1;
+	}
+
+	if (!ast_strlen_zero(class->sanitysql)) {
+		test_sql = class->sanitysql;
+	}
+
+	res = SQLPrepare(stmt, (unsigned char *)test_sql, SQL_NTS);
+	if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
+		SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+		return 1;
+	}
+
+	res = SQLExecute(stmt);
+	SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+
+	return ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) ? 1 : 0;
+}
+
 struct odbc_obj *_ast_odbc_request_obj2(const char *name, struct ast_flags flags, const char *file, const char *function, int lineno)
 {
 	struct odbc_obj *obj = NULL;
@@ -713,16 +798,54 @@
 		return NULL;
 	}
 
-	/* XXX ODBC connection objects do not have shared ownership, so there is no reason
-	 * to use refcounted objects here.
-	 */
-	obj = ao2_alloc(sizeof(*obj), odbc_obj_destructor);
-	/* Inherit reference from the ao2_callback from before */
-	obj->parent = class;
-	if (odbc_obj_connect(obj) == ODBC_FAIL) {
-		ao2_ref(obj, -1);
-		return NULL;
+	ast_mutex_lock(&class->lock);
+
+	while (!obj) {
+		obj = AST_LIST_REMOVE_HEAD(&class->connections, list);
+
+		if (!obj) {
+			if (class->connection_cnt < class->maxconnections) {
+				/* If no connection is immediately available establish a new
+				 * one if allowed. If we try and fail we give up completely as
+				 * we could go into an infinite loop otherwise.
+				 */
+				obj = ao2_alloc(sizeof(*obj), odbc_obj_destructor);
+				if (!obj) {
+					break;
+				}
+
+				obj->parent = ao2_bump(class);
+				if (odbc_obj_connect(obj) == ODBC_FAIL) {
+					break;
+				}
+
+				class->connection_cnt++;
+				ast_debug(2, "Created ODBC handle %p\n", obj);
+			} else {
+				/* Otherwise if we're not allowed to create a new one we
+				 * wait for another thread to give up the connection they
+				 * own.
+				 */
+				ast_cond_wait(&class->cond, &class->lock);
+			}
+		} else if (connection_dead(obj, class)) {
+			/* If the connection is dead try to grab another functional one from the
+			 * pool instead of trying to resurrect this one.
+			 */
+			ast_debug(2, "ODBC handle %p dead - removing\n", obj);
+			ao2_ref(obj, -1);
+			obj = NULL;
+			class->connection_cnt--;
+		} else {
+			/* We successfully grabbed a connection from the pool and all is well!
+			 */
+			obj->parent = ao2_bump(class);
+			ast_debug(2, "Reusing ODBC handle %p\n", obj);
+		}
 	}
+
+	ast_mutex_unlock(&class->lock);
+	ao2_ref(class, -1);
 
 	return obj;
 }
@@ -754,14 +877,6 @@
 	con = obj->con;
 	obj->con = NULL;
 	res = SQLDisconnect(con);
-
-	if (obj->parent) {
-		if (res == SQL_SUCCESS || res == SQL_SUCCESS_WITH_INFO) {
-			ast_debug(3, "Disconnected %d from %s [%s](%p)\n", res, obj->parent->name, obj->parent->dsn, obj);
-		} else {
-			ast_debug(3, "res_odbc: %s [%s](%p) already disconnected\n", obj->parent->name, obj->parent->dsn, obj);
-		}
-	}
 
 	if ((res = SQLFreeHandle(SQL_HANDLE_DBC, con)) == SQL_SUCCESS) {
 		ast_debug(3, "Database handle %p (connection %p) deallocated\n", obj, con);

-- 
To view, visit https://gerrit.asterisk.org/2945
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6774bf4bac49a0b30242c76a09c403d2e856ecff
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-Owner: Joshua Colp <jcolp at digium.com>



More information about the asterisk-code-review mailing list