[asterisk-commits] tilghman: branch tilghman/odbc_tx_support r158914 - /team/tilghman/odbc_tx_support/res/res_odbc.c
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Nov 24 17:10:49 CST 2008
Author: tilghman
Date: Mon Nov 24 17:10:48 2008
New Revision: 158914
URL: http://svn.digium.com/view/asterisk?view=rev&rev=158914
Log:
Fix isolation values
Modified:
team/tilghman/odbc_tx_support/res/res_odbc.c
Modified: team/tilghman/odbc_tx_support/res/res_odbc.c
URL: http://svn.digium.com/view/asterisk/team/tilghman/odbc_tx_support/res/res_odbc.c?view=diff&rev=158914&r1=158913&r2=158914
==============================================================================
--- team/tilghman/odbc_tx_support/res/res_odbc.c (original)
+++ team/tilghman/odbc_tx_support/res/res_odbc.c Mon Nov 24 17:10:48 2008
@@ -51,6 +51,8 @@
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
#include "asterisk/app.h"
+#include "asterisk/strings.h"
+#include "asterisk/threadstorage.h"
struct odbc_class
{
@@ -65,7 +67,7 @@
unsigned int delme:1; /*!< Purge the class */
unsigned int backslash_is_escape:1; /*!< On this database, the backslash is a native escape sequence */
unsigned int forcecommit:1; /*!< Should uncommitted transactions be auto-committed on handle release? */
- unsigned int isolation:2; /*!< Flags for how the DB should deal with data in other, uncommitted transactions */
+ unsigned int isolation; /*!< Flags for how the DB should deal with data in other, uncommitted transactions */
unsigned int limit; /*!< 1023 wasn't enough for some people */
int count; /*!< Running count of pooled connections */
unsigned int idlecheck; /*!< Recheck the connection if it is idle for this long */
@@ -80,6 +82,8 @@
static odbc_status odbc_obj_disconnect(struct odbc_obj *obj);
static int odbc_register_class(struct odbc_class *class, int connect);
static void odbc_txn_free(void *data);
+
+AST_THREADSTORAGE(errors_buf);
static struct ast_datastore_info txn_info = {
.type = "ODBC_Transaction",
@@ -105,19 +109,19 @@
*/
unsigned int active:1;
unsigned int forcecommit:1; /*!< Should uncommitted transactions be auto-committed on handle release? */
- unsigned int isolation:2; /*!< Flags for how the DB should deal with data in other, uncommitted transactions */
+ unsigned int isolation; /*!< Flags for how the DB should deal with data in other, uncommitted transactions */
char name[0]; /*!< Name of this transaction ID */
};
static const char *isolation2text(int iso)
{
- if (iso == 0) {
+ if (iso == SQL_TXN_READ_COMMITTED) {
return "read_committed";
- } else if (iso == 1) {
+ } else if (iso == SQL_TXN_READ_UNCOMMITTED) {
return "read_uncommitted";
- } else if (iso == 2) {
+ } else if (iso == SQL_TXN_SERIALIZABLE) {
return "serializable";
- } else if (iso == 3) {
+ } else if (iso == SQL_TXN_REPEATABLE_READ) {
return "repeatable_read";
} else {
return "unknown";
@@ -128,18 +132,18 @@
{
if (strncasecmp(txt, "read_", 5) == 0) {
if (strncasecmp(txt + 5, "c", 1) == 0) {
+ return SQL_TXN_READ_COMMITTED;
+ } else if (strncasecmp(txt + 5, "u", 1) == 0) {
+ return SQL_TXN_READ_UNCOMMITTED;
+ } else {
return 0;
- } else if (strncasecmp(txt + 5, "u", 1) == 0) {
- return 1;
- } else {
- return -1;
}
} else if (strncasecmp(txt, "ser", 3) == 0) {
- return 2;
+ return SQL_TXN_SERIALIZABLE;
} else if (strncasecmp(txt, "rep", 3) == 0) {
- return 3;
+ return SQL_TXN_REPEATABLE_READ;
} else {
- return -1;
+ return 0;
}
}
@@ -160,14 +164,14 @@
oldlist = txn_store->data;
} else {
/* Need to create a new datastore */
- if (!(txn_store = ast_channel_datastore_alloc(&txn_info, NULL))) {
+ if (!(txn_store = ast_datastore_alloc(&txn_info, NULL))) {
ast_log(LOG_ERROR, "Unable to allocate a new datastore. Cannot create a new transaction.\n");
return NULL;
}
if (!(oldlist = ast_calloc(1, sizeof(*oldlist)))) {
ast_log(LOG_ERROR, "Unable to allocate datastore list head. Cannot create a new transaction.\n");
- ast_channel_datastore_free(txn_store);
+ ast_datastore_free(txn_store);
return NULL;
}
@@ -226,12 +230,14 @@
if (tx && tx->owner) {
chan = tx->owner;
+ ast_channel_lock(chan);
} else {
/* No channel == no transaction */
return;
}
if (!(txn_store = ast_channel_datastore_find(chan, &txn_info, NULL))) {
+ ast_channel_unlock(chan);
return;
}
@@ -247,6 +253,7 @@
}
tx->owner = NULL;
ast_free(tx);
+ ast_channel_unlock(chan);
}
static void odbc_txn_free(void *vdata)
@@ -266,13 +273,15 @@
{
struct ast_datastore *txn_store;
AST_LIST_HEAD(, odbc_txn_frame) *oldlist;
- struct odbc_txn_frame *active, *txn;
+ struct odbc_txn_frame *active = NULL, *txn;
if (!chan && tx && tx->owner) {
chan = tx->owner;
}
+ ast_channel_lock(chan);
if (!(txn_store = ast_channel_datastore_find(chan, &txn_info, NULL))) {
+ ast_channel_unlock(chan);
return -1;
}
@@ -287,6 +296,7 @@
}
}
AST_LIST_UNLOCK(oldlist);
+ ast_channel_unlock(chan);
return active ? 0 : -1;
}
@@ -296,12 +306,15 @@
/* Due to refcounts, we can safely assume that any objects with a reference
* to us will prevent our destruction, so we don't need to worry about them.
*/
- if (class->username)
+ if (class->username) {
ast_free(class->username);
- if (class->password)
+ }
+ if (class->password) {
ast_free(class->password);
- if (class->sanitysql)
+ }
+ if (class->sanitysql) {
ast_free(class->sanitysql);
+ }
ao2_ref(class->obj_container, -1);
SQLFreeHandle(SQL_HANDLE_ENV, class->env);
}
@@ -366,18 +379,18 @@
if (!obj) {
ast_log(LOG_WARNING, "Unable to retrieve database handle for table description '%s@%s'\n", tablename, database);
+ AST_RWLIST_UNLOCK(&odbc_tables);
return NULL;
}
/* Table structure not already cached; build it now. */
do {
-retry:
res = SQLAllocHandle(SQL_HANDLE_STMT, obj->con, &stmt);
if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
if (try == 0) {
try = 1;
ast_odbc_sanity_check(obj);
- goto retry;
+ continue;
}
ast_log(LOG_WARNING, "SQL Alloc Handle failed on connection '%s'!\n", database);
break;
@@ -389,7 +402,7 @@
try = 1;
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
ast_odbc_sanity_check(obj);
- goto retry;
+ continue;
}
ast_log(LOG_ERROR, "Unable to query database columns on connection '%s'.\n", database);
break;
@@ -439,7 +452,8 @@
AST_RWLIST_INSERT_TAIL(&odbc_tables, tableptr, list);
AST_RWLIST_RDLOCK(&(tableptr->columns));
- } while (0);
+ break;
+ } while (1);
AST_RWLIST_UNLOCK(&odbc_tables);
@@ -490,6 +504,9 @@
stmt = exec_cb(obj, data);
if (stmt) {
+ break;
+ } else if (obj->tx) {
+ ast_log(LOG_WARNING, "Failed to execute, but unable to reconnect, as we're transactional.\n");
break;
} else {
obj->up = 0;
@@ -534,22 +551,29 @@
}
}
- ast_log(LOG_WARNING, "SQL Execute error %d! Attempting a reconnect...\n", res);
- SQLFreeHandle(SQL_HANDLE_STMT, stmt);
- stmt = NULL;
-
- obj->up = 0;
- /*
- * While this isn't the best way to try to correct an error, this won't automatically
- * fail when the statement handle invalidates.
- */
- ast_odbc_sanity_check(obj);
- continue;
- } else
+ if (obj->tx) {
+ ast_log(LOG_WARNING, "SQL Execute error, but unable to reconnect, as we're transactional.\n");
+ break;
+ } else {
+ ast_log(LOG_WARNING, "SQL Execute error %d! Attempting a reconnect...\n", res);
+ SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+ stmt = NULL;
+
+ obj->up = 0;
+ /*
+ * While this isn't the best way to try to correct an error, this won't automatically
+ * fail when the statement handle invalidates.
+ */
+ ast_odbc_sanity_check(obj);
+ continue;
+ }
+ } else {
obj->last_used = ast_tvnow();
+ }
break;
- } else if (attempt == 0)
+ } else if (attempt == 0) {
ast_odbc_sanity_check(obj);
+ }
}
return stmt;
@@ -609,7 +633,7 @@
SQLFreeHandle (SQL_HANDLE_STMT, stmt);
}
- if (!obj->up) { /* Try to reconnect! */
+ if (!obj->up && !obj->tx) { /* Try to reconnect! */
ast_log(LOG_WARNING, "Connection is down attempting to reconnect...\n");
odbc_obj_disconnect(obj);
odbc_obj_connect(obj);
@@ -688,9 +712,9 @@
} else if (!strcasecmp(v->name, "forcecommit")) {
forcecommit = ast_true(v->value);
} else if (!strcasecmp(v->name, "isolation")) {
- if ((isolation = text2isolation(v->value)) < 0) {
+ if ((isolation = text2isolation(v->value)) == 0) {
ast_log(LOG_ERROR, "Unrecognized value for 'isolation': '%s' in section '%s'\n", v->value, cat);
- isolation = 0;
+ isolation = SQL_TXN_READ_COMMITTED;
}
}
}
@@ -821,10 +845,11 @@
ao2_ref(current, -1);
}
} else {
- /* Should only ever be one of these */
+ /* Should only ever be one of these (unless there are transactions) */
struct ao2_iterator aoi2 = ao2_iterator_init(class->obj_container, 0);
while ((current = ao2_iterator_next(&aoi2))) {
- ast_cli(a->fd, " Pooled: No\n Connected: %s\n", current->up && ast_odbc_sanity_check(current) ? "Yes" : "No");
+ ast_cli(a->fd, " Pooled: No\n Connected: %s\n", current->used ? "In use" :
+ current->up && ast_odbc_sanity_check(current) ? "Yes" : "No");
ao2_ref(current, -1);
}
}
@@ -850,8 +875,9 @@
if (preconnect) {
/* Request and release builds a connection */
obj = ast_odbc_request_obj(class->name, 0);
- if (obj)
+ if (obj) {
ast_odbc_release_obj(obj);
+ }
}
return 0;
@@ -864,9 +890,42 @@
void ast_odbc_release_obj(struct odbc_obj *obj)
{
struct odbc_txn_frame *tx = find_transaction(NULL, obj, NULL, 0);
+ SQLINTEGER nativeerror=0, numfields=0;
+ SQLSMALLINT diagbytes=0, i;
+ unsigned char state[10], diagnostic[256];
if (tx) {
- SQLEndTran(SQL_HANDLE_DBC, obj->con, obj->parent->forcecommit ? SQL_COMMIT : SQL_ROLLBACK);
+ if (SQLEndTran(SQL_HANDLE_DBC, obj->con, obj->parent->forcecommit ? SQL_COMMIT : SQL_ROLLBACK) == SQL_ERROR) {
+ /* Handle possible transaction commit failure */
+ SQLGetDiagField(SQL_HANDLE_DBC, obj->con, 1, SQL_DIAG_NUMBER, &numfields, SQL_IS_INTEGER, &diagbytes);
+ for (i = 0; i < numfields; i++) {
+ SQLGetDiagRec(SQL_HANDLE_DBC, obj->con, i + 1, state, &nativeerror, diagnostic, sizeof(diagnostic), &diagbytes);
+ ast_log(LOG_WARNING, "SQLEndTran returned an error: %s: %s\n", state, diagnostic);
+ if (!strcmp((char *)state, "25S02") || !strcmp((char *)state, "08007")) {
+ /* These codes mean that a commit failed and a transaction
+ * is still active. We must rollback, or things will get
+ * very, very weird for anybody using the handle next. */
+ SQLEndTran(SQL_HANDLE_DBC, obj->con, SQL_ROLLBACK);
+ }
+ if (i > 10) {
+ ast_log(LOG_WARNING, "Oh, that was good. There are really %d diagnostics?\n", (int)numfields);
+ break;
+ }
+ }
+ }
+
+ /* Transaction is done, reset autocommit */
+ if (obj->parent->haspool && SQLSetConnectAttr(obj->con, SQL_ATTR_AUTOCOMMIT, (void *)SQL_AUTOCOMMIT_ON, 0) == SQL_ERROR) {
+ SQLGetDiagField(SQL_HANDLE_DBC, obj->con, 1, SQL_DIAG_NUMBER, &numfields, SQL_IS_INTEGER, &diagbytes);
+ for (i = 0; i < numfields; i++) {
+ SQLGetDiagRec(SQL_HANDLE_DBC, obj->con, i + 1, state, &nativeerror, diagnostic, sizeof(diagnostic), &diagbytes);
+ ast_log(LOG_WARNING, "SetConnectAttr (Autocommit) returned an error: %s: %s\n", state, diagnostic);
+ if (i > 10) {
+ ast_log(LOG_WARNING, "Oh, that was good. There are really %d diagnostics?\n", (int)numfields);
+ break;
+ }
+ }
+ }
}
#ifdef DEBUG_THREADS
@@ -894,14 +953,36 @@
static int commit_exec(struct ast_channel *chan, void *data)
{
struct odbc_txn_frame *tx;
+ SQLINTEGER nativeerror=0, numfields=0;
+ SQLSMALLINT diagbytes=0, i;
+ unsigned char state[10], diagnostic[256];
+
if (ast_strlen_zero(data)) {
tx = find_transaction(chan, NULL, NULL, 1);
} else {
tx = find_transaction(chan, NULL, data, 0);
}
+ pbx_builtin_setvar_helper(chan, "COMMIT_RESULT", "OK");
+
if (tx) {
- SQLEndTran(SQL_HANDLE_DBC, tx->obj->con, SQL_COMMIT);
+ if (SQLEndTran(SQL_HANDLE_DBC, tx->obj->con, SQL_COMMIT) == SQL_ERROR) {
+ struct ast_str *errors = ast_str_thread_get(&errors_buf, 16);
+ ast_str_reset(errors);
+
+ /* Handle possible transaction commit failure */
+ SQLGetDiagField(SQL_HANDLE_DBC, tx->obj->con, 1, SQL_DIAG_NUMBER, &numfields, SQL_IS_INTEGER, &diagbytes);
+ for (i = 0; i < numfields; i++) {
+ SQLGetDiagRec(SQL_HANDLE_DBC, tx->obj->con, i + 1, state, &nativeerror, diagnostic, sizeof(diagnostic), &diagbytes);
+ ast_str_append(&errors, 0, "%s%s", errors->used ? "," : "", state);
+ ast_log(LOG_WARNING, "SQLEndTran returned an error: %s: %s\n", state, diagnostic);
+ if (i > 10) {
+ ast_log(LOG_WARNING, "Oh, that was good. There are really %d diagnostics?\n", (int)numfields);
+ break;
+ }
+ }
+ pbx_builtin_setvar_helper(chan, "COMMIT_RESULT", errors->str);
+ }
tx->status = ODBC_STATUS_TXCLOSED;
}
return 0;
@@ -910,14 +991,36 @@
static int rollback_exec(struct ast_channel *chan, void *data)
{
struct odbc_txn_frame *tx;
+ SQLINTEGER nativeerror=0, numfields=0;
+ SQLSMALLINT diagbytes=0, i;
+ unsigned char state[10], diagnostic[256];
+
if (ast_strlen_zero(data)) {
tx = find_transaction(chan, NULL, NULL, 1);
} else {
tx = find_transaction(chan, NULL, data, 0);
}
+ pbx_builtin_setvar_helper(chan, "ROLLBACK_RESULT", "OK");
+
if (tx) {
- SQLEndTran(SQL_HANDLE_DBC, tx->obj->con, SQL_ROLLBACK);
+ if (SQLEndTran(SQL_HANDLE_DBC, tx->obj->con, SQL_ROLLBACK) == SQL_ERROR) {
+ struct ast_str *errors = ast_str_thread_get(&errors_buf, 16);
+ ast_str_reset(errors);
+
+ /* Handle possible transaction commit failure */
+ SQLGetDiagField(SQL_HANDLE_DBC, tx->obj->con, 1, SQL_DIAG_NUMBER, &numfields, SQL_IS_INTEGER, &diagbytes);
+ for (i = 0; i < numfields; i++) {
+ SQLGetDiagRec(SQL_HANDLE_DBC, tx->obj->con, i + 1, state, &nativeerror, diagnostic, sizeof(diagnostic), &diagbytes);
+ ast_str_append(&errors, 0, "%s%s", errors->used ? "," : "", state);
+ ast_log(LOG_WARNING, "SQLEndTran returned an error: %s: %s\n", state, diagnostic);
+ if (i > 10) {
+ ast_log(LOG_WARNING, "Oh, that was good. There are really %d diagnostics?\n", (int)numfields);
+ break;
+ }
+ }
+ pbx_builtin_setvar_helper(chan, "ROLLBACK_RESULT", errors->str);
+ }
tx->status = ODBC_STATUS_TXCLOSED;
}
return 0;
@@ -932,6 +1035,9 @@
struct odbc_obj *obj = NULL;
struct odbc_class *class;
struct ao2_iterator aoi = ao2_iterator_init(class_container, 0);
+ SQLINTEGER nativeerror=0, numfields=0;
+ SQLSMALLINT diagbytes=0, i;
+ unsigned char state[10], diagnostic[256];
while ((class = ao2_iterator_next(&aoi))) {
if (!strcmp(class->name, name) && !class->delme) {
@@ -975,6 +1081,21 @@
ast_atomic_fetchadd_int(&class->count, +1);
}
}
+
+ if (obj && ast_test_flag(&flags, RES_ODBC_INDEPENDENT_CONNECTION)) {
+ /* Ensure this connection has autocommit turned off. */
+ if (SQLSetConnectAttr(obj->con, SQL_ATTR_AUTOCOMMIT, (void *)SQL_AUTOCOMMIT_OFF, 0) == SQL_ERROR) {
+ SQLGetDiagField(SQL_HANDLE_DBC, obj->con, 1, SQL_DIAG_NUMBER, &numfields, SQL_IS_INTEGER, &diagbytes);
+ for (i = 0; i < numfields; i++) {
+ SQLGetDiagRec(SQL_HANDLE_DBC, obj->con, i + 1, state, &nativeerror, diagnostic, sizeof(diagnostic), &diagbytes);
+ ast_log(LOG_WARNING, "SQLSetConnectAttr (Autocommit) returned an error: %s: %s\n", state, diagnostic);
+ if (i > 10) {
+ ast_log(LOG_WARNING, "Oh, that was good. There are really %d diagnostics?\n", (int)numfields);
+ break;
+ }
+ }
+ }
+ }
} else if (ast_test_flag(&flags, RES_ODBC_INDEPENDENT_CONNECTION)) {
/* Non-pooled connections -- but must use a separate connection handle */
aoi = ao2_iterator_init(class->obj_container, 0);
@@ -1006,6 +1127,18 @@
ast_atomic_fetchadd_int(&class->count, +1);
}
}
+
+ if (obj && SQLSetConnectAttr(obj->con, SQL_ATTR_AUTOCOMMIT, (void *)SQL_AUTOCOMMIT_OFF, 0) == SQL_ERROR) {
+ SQLGetDiagField(SQL_HANDLE_DBC, obj->con, 1, SQL_DIAG_NUMBER, &numfields, SQL_IS_INTEGER, &diagbytes);
+ for (i = 0; i < numfields; i++) {
+ SQLGetDiagRec(SQL_HANDLE_DBC, obj->con, i + 1, state, &nativeerror, diagnostic, sizeof(diagnostic), &diagbytes);
+ ast_log(LOG_WARNING, "SetConnectAttr (Autocommit) returned an error: %s: %s\n", state, diagnostic);
+ if (i > 10) {
+ ast_log(LOG_WARNING, "Oh, that was good. There are really %d diagnostics?\n", (int)numfields);
+ break;
+ }
+ }
+ }
} else {
/* Non-pooled connection: multiple modules can use the same connection. */
aoi = ao2_iterator_init(class->obj_container, 0);
@@ -1037,6 +1170,31 @@
ao2_link(class->obj_container, obj);
}
}
+
+ if (obj && SQLSetConnectAttr(obj->con, SQL_ATTR_AUTOCOMMIT, (void *)SQL_AUTOCOMMIT_ON, 0) == SQL_ERROR) {
+ SQLGetDiagField(SQL_HANDLE_DBC, obj->con, 1, SQL_DIAG_NUMBER, &numfields, SQL_IS_INTEGER, &diagbytes);
+ for (i = 0; i < numfields; i++) {
+ SQLGetDiagRec(SQL_HANDLE_DBC, obj->con, i + 1, state, &nativeerror, diagnostic, sizeof(diagnostic), &diagbytes);
+ ast_log(LOG_WARNING, "SetConnectAttr (Autocommit) returned an error: %s: %s\n", state, diagnostic);
+ if (i > 10) {
+ ast_log(LOG_WARNING, "Oh, that was good. There are really %d diagnostics?\n", (int)numfields);
+ break;
+ }
+ }
+ }
+ }
+
+ /* Set the isolation property */
+ if (SQLSetConnectAttr(obj->con, SQL_ATTR_TXN_ISOLATION, (void *)(long)obj->parent->isolation, 0) == SQL_ERROR) {
+ SQLGetDiagField(SQL_HANDLE_DBC, obj->con, 1, SQL_DIAG_NUMBER, &numfields, SQL_IS_INTEGER, &diagbytes);
+ for (i = 0; i < numfields; i++) {
+ SQLGetDiagRec(SQL_HANDLE_DBC, obj->con, i + 1, state, &nativeerror, diagnostic, sizeof(diagnostic), &diagbytes);
+ ast_log(LOG_WARNING, "SetConnectAttr (Txn isolation) returned an error: %s: %s\n", state, diagnostic);
+ if (i > 10) {
+ ast_log(LOG_WARNING, "Oh, that was good. There are really %d diagnostics?\n", (int)numfields);
+ break;
+ }
+ }
}
if (obj && ast_test_flag(&flags, RES_ODBC_SANITY_CHECK)) {
@@ -1230,6 +1388,9 @@
AST_APP_ARG(opt);
);
struct odbc_txn_frame *tx;
+ SQLINTEGER nativeerror=0, numfields=0;
+ SQLSMALLINT diagbytes=0, i;
+ unsigned char state[10], diagnostic[256];
AST_STANDARD_APP_ARGS(args, s);
if (strcasecmp(args.property, "transaction") == 0) {
@@ -1274,8 +1435,18 @@
} else {
tx = find_transaction(chan, NULL, args.opt, 0);
}
- if (isolation == -1) {
+ if (isolation == 0) {
ast_log(LOG_ERROR, "Invalid isolation specification: '%s'\n", S_OR(value, ""));
+ } else if (SQLSetConnectAttr(tx->obj->con, SQL_ATTR_TXN_ISOLATION, (void *)(long)isolation, 0) == SQL_ERROR) {
+ SQLGetDiagField(SQL_HANDLE_DBC, tx->obj->con, 1, SQL_DIAG_NUMBER, &numfields, SQL_IS_INTEGER, &diagbytes);
+ for (i = 0; i < numfields; i++) {
+ SQLGetDiagRec(SQL_HANDLE_DBC, tx->obj->con, i + 1, state, &nativeerror, diagnostic, sizeof(diagnostic), &diagbytes);
+ ast_log(LOG_WARNING, "SetConnectAttr (Txn isolation) returned an error: %s: %s\n", state, diagnostic);
+ if (i > 10) {
+ ast_log(LOG_WARNING, "Oh, that was good. There are really %d diagnostics?\n", (int)numfields);
+ break;
+ }
+ }
} else {
tx->isolation = isolation;
}
More information about the asterisk-commits mailing list