[asterisk-commits] func odbc: Use one connection per DSN. (asterisk[13])

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Sun Apr 24 20:14:33 CDT 2016


Anonymous Coward #1000019 has submitted this change and it was merged.

Change subject: func_odbc: Use one connection per DSN.
......................................................................


func_odbc: Use one connection per DSN.

res_odbc was changed in Asterisk 13.8.0 to remove connection management,
opting instead to let unixodbc maintain open connections and return
those to Asterisk as requested.

This was a boon for realtime, since it meant that multiple threads could
potentially run parallel queries since they could each be using their
own database connections.

However, on the user-facing side, func_odbc, there were some inherent
behaviors being relied on that no longer hold true after the change.
One such reported behavior was that MySQL's LAST_INSERTED_ID() works
per-connection. This means that if Asterisk uses separate connections
for every database operation, whereas before it used one connection for
everything, we have broken expectations and functionality.

The fix provided in this patch is to make func_odbc use a single
database connection per DSN. This way, user-facing database usage will
have the same behavior as it did pre-13.8.0. However, realtime, which is
the real workhorse of database interaction, will continue to let
unixodbc manage connections.

ASTERISK-25938 #close
Reported by Edwin Vandamme

Change-Id: Iac961fe79154c6211569afcdfec843c0c24c46dc
---
M funcs/func_odbc.c
1 file changed, 229 insertions(+), 85 deletions(-)

Approvals:
  Anonymous Coward #1000019: Verified
  Joshua Colp: Looks good to me, approved
  George Joseph: Looks good to me, but someone else must approve



diff --git a/funcs/func_odbc.c b/funcs/func_odbc.c
index 0af3fd1..d17debd 100644
--- a/funcs/func_odbc.c
+++ b/funcs/func_odbc.c
@@ -137,6 +137,163 @@
 	char names[0];
 };
 
+/* \brief Data source name
+ *
+ * This holds data that pertains to a DSN
+ */
+struct dsn {
+	/*! A connection to the database */
+	struct odbc_obj *connection;
+	/*! The name of the DSN as defined in res_odbc.conf */
+	char name[0];
+};
+
+#define DSN_BUCKETS 37
+
+struct ao2_container *dsns;
+
+static int dsn_hash(const void *obj, const int flags)
+{
+	const struct dsn *object;
+	const char *key;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_KEY:
+		key = obj;
+		break;
+	case OBJ_SEARCH_OBJECT:
+		object = obj;
+		key = object->name;
+		break;
+	default:
+		ast_assert(0);
+		return 0;
+	}
+	return ast_str_hash(key);
+}
+
+static int dsn_cmp(void *obj, void *arg, int flags)
+{
+	const struct dsn *object_left = obj;
+	const struct dsn *object_right = arg;
+	const char *right_key = arg;
+	int cmp;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_OBJECT:
+		right_key = object_right->name;
+		/* Fall through */
+	case OBJ_SEARCH_KEY:
+		cmp = strcmp(object_left->name, right_key);
+		break;
+	case OBJ_SEARCH_PARTIAL_KEY:
+		cmp = strncmp(object_left->name, right_key, strlen(right_key));
+		break;
+	default:
+		cmp = 0;
+		break;
+	}
+
+	if (cmp) {
+		return 0;
+	}
+
+	return CMP_MATCH;
+}
+
+static void dsn_destructor(void *obj)
+{
+	struct dsn *dsn = obj;
+
+	if (dsn->connection) {
+		ast_odbc_release_obj(dsn->connection);
+	}
+}
+
+/*!
+ * \brief Create a DSN and connect to the database
+ *
+ * \param name The name of the DSN as found in res_odbc.conf
+ * \retval NULL Fail
+ * \retval non-NULL The newly-created structure
+ */
+static struct dsn *create_dsn(const char *name)
+{
+	struct dsn *dsn;
+
+	dsn = ao2_alloc(sizeof(*dsn) + strlen(name) + 1, dsn_destructor);
+	if (!dsn) {
+		return NULL;
+	}
+
+	/* Safe */
+	strcpy(dsn->name, name);
+
+	dsn->connection = ast_odbc_request_obj(name, 0);
+	if (!dsn->connection) {
+		ao2_ref(dsn, -1);
+		return NULL;
+	}
+
+	if (!ao2_link_flags(dsns, dsn, OBJ_NOLOCK)) {
+		ao2_ref(dsn, -1);
+		return NULL;
+	}
+
+	return dsn;
+}
+
+/*!
+ * \brief Retrieve a DSN, or create it if it does not exist.
+ *
+ * The created DSN is returned locked. This should be inconsequential
+ * to callers in most cases.
+ *
+ * When finished with the returned structure, the caller must call
+ * \ref release_dsn
+ *
+ * \param name Name of the DSN as found in res_odbc.conf
+ * \retval NULL Unable to retrieve or create the DSN
+ * \retval non-NULL The retrieved/created locked DSN
+ */
+static struct dsn *get_dsn(const char *name)
+{
+	struct dsn *dsn;
+
+	ao2_lock(dsns);
+	dsn = ao2_find(dsns, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (!dsn) {
+		dsn = create_dsn(name);
+	}
+	ao2_unlock(dsns);
+
+	if (!dsn) {
+		return NULL;
+	}
+
+	ao2_lock(dsn->connection);
+
+	return dsn;
+}
+
+/*!
+ * \brief Unlock and unreference a DSN
+ *
+ * \param dsn The dsn to unlock and unreference
+ * \return NULL
+ */
+static void *release_dsn(struct dsn *dsn)
+{
+	if (!dsn) {
+		return NULL;
+	}
+
+	ao2_unlock(dsn->connection);
+	ao2_ref(dsn, -1);
+
+	return NULL;
+}
+
 static AST_RWLIST_HEAD_STATIC(queries, acf_odbc_query);
 
 static int resultcount = 0;
@@ -214,7 +371,7 @@
 	struct odbc_obj *obj = NULL;
 	struct acf_odbc_query *query;
 	char *t, varname[15];
-	int i, dsn, bogus_chan = 0;
+	int i, dsn_num, bogus_chan = 0;
 	int transactional = 0;
 	AST_DECLARE_APP_ARGS(values,
 		AST_APP_ARG(field)[100];
@@ -227,6 +384,7 @@
 	struct ast_str *buf = ast_str_thread_get(&sql_buf, 16);
 	struct ast_str *insertbuf = ast_str_thread_get(&sql2_buf, 16);
 	const char *status = "FAILURE";
+	struct dsn *dsn = NULL;
 
 	if (!buf || !insertbuf) {
 		return -1;
@@ -324,17 +482,21 @@
 	 * to multiple DSNs.  We MUST have a single handle all the way through the
 	 * transaction, or else we CANNOT enforce atomicity.
 	 */
-	for (dsn = 0; dsn < 5; dsn++) {
-		if (!ast_strlen_zero(query->writehandle[dsn])) {
+	for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+		if (!ast_strlen_zero(query->writehandle[dsn_num])) {
 			if (transactional) {
 				/* This can only happen second time through or greater. */
 				ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n");
 			}
 
-			if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) {
+			if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) {
 				transactional = 1;
 			} else {
-				obj = ast_odbc_request_obj(query->writehandle[dsn], 0);
+				dsn = get_dsn(query->writehandle[dsn_num]);
+				if (!dsn) {
+					continue;
+				}
+				obj = dsn->connection;
 				transactional = 0;
 			}
 
@@ -342,10 +504,7 @@
 				break;
 			}
 
-			if (obj && !transactional) {
-				ast_odbc_release_obj(obj);
-				obj = NULL;
-			}
+			dsn = release_dsn(dsn);
 		}
 	}
 
@@ -358,25 +517,25 @@
 			status = "SUCCESS";
 
 		} else if (query->sql_insert) {
-			if (obj && !transactional) {
-				ast_odbc_release_obj(obj);
-				obj = NULL;
-			}
+			dsn = release_dsn(dsn);
 
-			for (transactional = 0, dsn = 0; dsn < 5; dsn++) {
-				if (!ast_strlen_zero(query->writehandle[dsn])) {
+			for (transactional = 0, dsn_num = 0; dsn_num < 5; dsn_num++) {
+				if (!ast_strlen_zero(query->writehandle[dsn_num])) {
 					if (transactional) {
 						/* This can only happen second time through or greater. */
 						ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n");
 					} else if (obj) {
-						ast_odbc_release_obj(obj);
-						obj = NULL;
+						dsn = release_dsn(dsn);
 					}
 
-					if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) {
+					if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) {
 						transactional = 1;
 					} else {
-						obj = ast_odbc_request_obj(query->writehandle[dsn], 0);
+						dsn = get_dsn(query->writehandle[dsn_num]);
+						if (!dsn) {
+							continue;
+						}
+						obj = dsn->connection;
 						transactional = 0;
 					}
 					if (obj) {
@@ -406,10 +565,7 @@
 		pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status);
 	}
 
-	if (obj && !transactional) {
-		ast_odbc_release_obj(obj);
-		obj = NULL;
-	}
+	dsn = release_dsn(dsn);
 
 	if (!bogus_chan) {
 		ast_autoservice_stop(chan);
@@ -420,11 +576,10 @@
 
 static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, char *buf, size_t len)
 {
-	struct odbc_obj *obj = NULL;
 	struct acf_odbc_query *query;
 	char varname[15], rowcount[12] = "-1";
 	struct ast_str *colnames = ast_str_thread_get(&colnames_buf, 16);
-	int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn, bogus_chan = 0;
+	int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn_num, bogus_chan = 0;
 	AST_DECLARE_APP_ARGS(args,
 		AST_APP_ARG(field)[100];
 	);
@@ -436,6 +591,7 @@
 	struct odbc_datastore_row *row = NULL;
 	struct ast_str *sql = ast_str_thread_get(&sql_buf, 16);
 	const char *status = "FAILURE";
+	struct dsn *dsn = NULL;
 
 	if (!sql || !colnames) {
 		if (chan) {
@@ -523,28 +679,23 @@
 	}
 	AST_RWLIST_UNLOCK(&queries);
 
-	for (dsn = 0; dsn < 5; dsn++) {
-		if (!ast_strlen_zero(query->readhandle[dsn])) {
-			obj = ast_odbc_request_obj(query->readhandle[dsn], 0);
-			if (obj) {
-				stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql));
+	for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+		if (!ast_strlen_zero(query->readhandle[dsn_num])) {
+			dsn = get_dsn(query->readhandle[dsn_num]);
+			if (!dsn) {
+				continue;
 			}
+			stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql));
 		}
 		if (stmt) {
 			break;
 		}
-		if (obj) {
-			ast_odbc_release_obj(obj);
-			obj = NULL;
-		}
+		dsn = release_dsn(dsn);
 	}
 
 	if (!stmt) {
 		ast_log(LOG_ERROR, "Unable to execute query [%s]\n", ast_str_buffer(sql));
-		if (obj) {
-			ast_odbc_release_obj(obj);
-			obj = NULL;
-		}
+		dsn = release_dsn(dsn);
 		if (!bogus_chan) {
 			pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
 			ast_autoservice_stop(chan);
@@ -558,8 +709,7 @@
 		ast_log(LOG_WARNING, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql));
 		SQLCloseCursor(stmt);
 		SQLFreeHandle (SQL_HANDLE_STMT, stmt);
-		ast_odbc_release_obj(obj);
-		obj = NULL;
+		dsn = release_dsn(dsn);
 		if (!bogus_chan) {
 			pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
 			ast_autoservice_stop(chan);
@@ -583,8 +733,7 @@
 		}
 		SQLCloseCursor(stmt);
 		SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-		ast_odbc_release_obj(obj);
-		obj = NULL;
+		dsn = release_dsn(dsn);
 		if (!bogus_chan) {
 			pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
 			pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status);
@@ -607,8 +756,7 @@
 				odbc_datastore_free(resultset);
 				SQLCloseCursor(stmt);
 				SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-				ast_odbc_release_obj(obj);
-				obj = NULL;
+				dsn = release_dsn(dsn);
 				if (!bogus_chan) {
 					pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
 					ast_autoservice_stop(chan);
@@ -640,8 +788,7 @@
 						odbc_datastore_free(resultset);
 						SQLCloseCursor(stmt);
 						SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-						ast_odbc_release_obj(obj);
-						obj = NULL;
+						dsn = release_dsn(dsn);
 						if (!bogus_chan) {
 							pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
 							pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
@@ -750,8 +897,7 @@
 				odbc_datastore_free(resultset);
 				SQLCloseCursor(stmt);
 				SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-				ast_odbc_release_obj(obj);
-				obj = NULL;
+				dsn = release_dsn(dsn);
 				pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
 				ast_autoservice_stop(chan);
 				return -1;
@@ -764,8 +910,7 @@
 	}
 	SQLCloseCursor(stmt);
 	SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-	ast_odbc_release_obj(obj);
-	obj = NULL;
+	dsn = release_dsn(dsn);
 	if (resultset && !multirow) {
 		/* Fetch the first resultset */
 		if (!acf_fetch(chan, "", buf, buf, len)) {
@@ -1192,8 +1337,8 @@
 
 	if (a->argc == 5 && !strcmp(a->argv[4], "exec")) {
 		/* Execute the query */
-		struct odbc_obj *obj = NULL;
-		int dsn, executed = 0;
+		struct dsn *dsn = NULL;
+		int dsn_num, executed = 0;
 		SQLHSTMT stmt;
 		int rows = 0, res, x;
 		SQLSMALLINT colcount = 0, collength;
@@ -1207,19 +1352,18 @@
 			return CLI_SUCCESS;
 		}
 
-		for (dsn = 0; dsn < 5; dsn++) {
-			if (ast_strlen_zero(query->readhandle[dsn])) {
+		for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+			if (ast_strlen_zero(query->readhandle[dsn_num])) {
 				continue;
 			}
-			ast_debug(1, "Found handle %s\n", query->readhandle[dsn]);
-			if (!(obj = ast_odbc_request_obj(query->readhandle[dsn], 0))) {
+			dsn = get_dsn(query->readhandle[dsn_num]);
+			if (!dsn) {
 				continue;
 			}
+			ast_debug(1, "Found handle %s\n", query->readhandle[dsn_num]);
 
-			ast_debug(1, "Got obj\n");
-			if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) {
-				ast_odbc_release_obj(obj);
-				obj = NULL;
+			if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) {
+				dsn = release_dsn(dsn);
 				continue;
 			}
 
@@ -1230,8 +1374,7 @@
 				ast_cli(a->fd, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql));
 				SQLCloseCursor(stmt);
 				SQLFreeHandle (SQL_HANDLE_STMT, stmt);
-				ast_odbc_release_obj(obj);
-				obj = NULL;
+				dsn = release_dsn(dsn);
 				AST_RWLIST_UNLOCK(&queries);
 				return CLI_SUCCESS;
 			}
@@ -1240,10 +1383,9 @@
 			if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
 				SQLCloseCursor(stmt);
 				SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-				ast_odbc_release_obj(obj);
-				obj = NULL;
+				dsn = release_dsn(dsn);
 				if (res == SQL_NO_DATA) {
-					ast_cli(a->fd, "Returned %d rows.  Query executed on handle %d:%s [%s]\n", rows, dsn, query->readhandle[dsn], ast_str_buffer(sql));
+					ast_cli(a->fd, "Returned %d rows.  Query executed on handle %d:%s [%s]\n", rows, dsn_num, query->readhandle[dsn_num], ast_str_buffer(sql));
 					break;
 				} else {
 					ast_cli(a->fd, "Error %d in FETCH [%s]\n", res, ast_str_buffer(sql));
@@ -1270,8 +1412,7 @@
 						ast_cli(a->fd, "SQL Get Data error %d!\n[%s]\n\n", res, ast_str_buffer(sql));
 						SQLCloseCursor(stmt);
 						SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-						ast_odbc_release_obj(obj);
-						obj = NULL;
+						dsn = release_dsn(dsn);
 						AST_RWLIST_UNLOCK(&queries);
 						return CLI_SUCCESS;
 					}
@@ -1289,15 +1430,11 @@
 			}
 			SQLCloseCursor(stmt);
 			SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-			ast_odbc_release_obj(obj);
-			obj = NULL;
-			ast_cli(a->fd, "Returned %d row%s.  Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn, query->readhandle[dsn]);
+			dsn = release_dsn(dsn);
+			ast_cli(a->fd, "Returned %d row%s.  Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn_num, query->readhandle[dsn_num]);
 			break;
 		}
-		if (obj) {
-			ast_odbc_release_obj(obj);
-			obj = NULL;
-		}
+		dsn = release_dsn(dsn);
 
 		if (!executed) {
 			ast_cli(a->fd, "Failed to execute query. [%s]\n", ast_str_buffer(sql));
@@ -1420,30 +1557,29 @@
 
 	if (a->argc == 6 && !strcmp(a->argv[5], "exec")) {
 		/* Execute the query */
-		struct odbc_obj *obj = NULL;
-		int dsn, executed = 0;
+		struct dsn *dsn;
+		int dsn_num, executed = 0;
 		SQLHSTMT stmt;
 		SQLLEN rows = -1;
 
-		for (dsn = 0; dsn < 5; dsn++) {
-			if (ast_strlen_zero(query->writehandle[dsn])) {
+		for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+			if (ast_strlen_zero(query->writehandle[dsn_num])) {
 				continue;
 			}
-			if (!(obj = ast_odbc_request_obj(query->writehandle[dsn], 0))) {
+			dsn = get_dsn(query->writehandle[dsn_num]);
+			if (!dsn) {
 				continue;
 			}
-			if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) {
-				ast_odbc_release_obj(obj);
-				obj = NULL;
+			if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) {
+				dsn = release_dsn(dsn);
 				continue;
 			}
 
 			SQLRowCount(stmt, &rows);
 			SQLCloseCursor(stmt);
 			SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-			ast_odbc_release_obj(obj);
-			obj = NULL;
-			ast_cli(a->fd, "Affected %d rows.  Query executed on handle %d [%s]\n", (int)rows, dsn, query->writehandle[dsn]);
+			dsn = release_dsn(dsn);
+			ast_cli(a->fd, "Affected %d rows.  Query executed on handle %d [%s]\n", (int)rows, dsn_num, query->writehandle[dsn_num]);
 			executed = 1;
 			break;
 		}
@@ -1470,6 +1606,11 @@
 	char *catg;
 	struct ast_flags config_flags = { 0 };
 
+	dsns = ao2_container_alloc(DSN_BUCKETS, dsn_hash, dsn_cmp);
+	if (!dsns) {
+		return AST_MODULE_LOAD_DECLINE;
+	}
+
 	res |= ast_custom_function_register(&fetch_function);
 	res |= ast_register_application_xml(app_odbcfinish, exec_odbcfinish);
 	AST_RWLIST_WRLOCK(&queries);
@@ -1478,6 +1619,7 @@
 	if (!cfg || cfg == CONFIG_STATUS_FILEINVALID) {
 		ast_log(LOG_NOTICE, "Unable to load config for func_odbc: %s\n", config);
 		AST_RWLIST_UNLOCK(&queries);
+		ao2_ref(dsns, -1);
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
@@ -1531,6 +1673,8 @@
 	AST_RWLIST_WRLOCK(&queries);
 
 	AST_RWLIST_UNLOCK(&queries);
+
+	ao2_ref(dsns, -1);
 	return res;
 }
 

-- 
To view, visit https://gerrit.asterisk.org/2682
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Iac961fe79154c6211569afcdfec843c0c24c46dc
Gerrit-PatchSet: 3
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: Mark Michelson <mmichelson at digium.com>
Gerrit-Reviewer: Anonymous Coward #1000019
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>



More information about the asterisk-commits mailing list