Initial core-pgsql support based on native libpq; FS starts and stops without errors using core-pgsql.

This commit is contained in:
Eliot Gable
2012-09-30 01:48:48 +00:00
parent 0263ce9247
commit 5cb354dddc
17 changed files with 1283 additions and 90 deletions

View File

@@ -190,7 +190,7 @@ SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t
}
if (zstr(runtime.odbc_dsn)) {
if (switch_test_flag((&runtime), SCF_CORE_ODBC_REQ)) {
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
return SWITCH_STATUS_FALSE;
}
@@ -202,11 +202,18 @@ SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
} else {
options.odbc_options.dsn = runtime.odbc_dsn;
options.odbc_options.user = runtime.odbc_user;
options.odbc_options.pass = runtime.odbc_pass;
char *dsn;
if ((dsn = strstr(runtime.odbc_dsn, "pgsql;")) != NULL) {
options.pgsql_options.dsn = (char*)(dsn + 6);
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
} else {
options.odbc_options.dsn = runtime.odbc_dsn;
options.odbc_options.user = runtime.odbc_user;
options.odbc_options.pass = runtime.odbc_pass;
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
}
}
/* I *think* we can do without this now, if not let me know
@@ -218,6 +225,56 @@ SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t
return r;
}
#define SWITCH_CORE_PERSIST_DB "core"
/*!
\brief Open the default system database
*/
SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
{
switch_cache_db_connection_options_t options = { {0} };
switch_status_t r;
if (!sql_manager.manage) {
return SWITCH_STATUS_FALSE;
}
if (zstr(runtime.odbc_dsn)) {
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
return SWITCH_STATUS_FALSE;
}
if (runtime.dbname) {
options.core_db_options.db_path = runtime.dbname;
} else {
options.core_db_options.db_path = SWITCH_CORE_PERSIST_DB;
}
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
} else {
char *dsn;
if ((dsn = strstr(runtime.odbc_dsn, "pgsql;")) != NULL) {
options.pgsql_options.dsn = (char*)(dsn + 6);
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
} else {
options.odbc_options.dsn = runtime.odbc_dsn;
options.odbc_options.user = runtime.odbc_user;
options.odbc_options.pass = runtime.odbc_pass;
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
}
}
/* I *think* we can do without this now, if not let me know
if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
(*dbh)->io_mutex = sql_manager.io_mutex;
}
*/
return r;
}
#define SWITCH_CORE_RECOVERY_DB "core_recovery"
SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
{
@@ -229,7 +286,7 @@ SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_
}
if (zstr(runtime.recovery_odbc_dsn)) {
if (switch_test_flag((&runtime), SCF_CORE_ODBC_REQ)) {
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
return SWITCH_STATUS_FALSE;
}
@@ -241,11 +298,18 @@ SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
} else {
options.odbc_options.dsn = runtime.recovery_odbc_dsn;
options.odbc_options.user = runtime.recovery_odbc_user;
options.odbc_options.pass = runtime.recovery_odbc_pass;
char *dsn;
if ((dsn = strstr(runtime.recovery_odbc_dsn, "pgsql;")) != NULL) {
options.pgsql_options.dsn = (char*)(dsn + 6);
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
} else {
options.odbc_options.dsn = runtime.recovery_odbc_dsn;
options.odbc_options.user = runtime.recovery_odbc_user;
options.odbc_options.pass = runtime.recovery_odbc_pass;
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
}
}
/* I *think* we can do without this now, if not let me know
@@ -286,6 +350,11 @@ static void sql_close(time_t prune)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Dropping idle DB connection %s\n", dbh->name);
switch (dbh->type) {
case SCDB_TYPE_PGSQL:
{
switch_pgsql_handle_destroy(&dbh->native_handle.pgsql_dbh);
}
break;
case SCDB_TYPE_ODBC:
{
switch_odbc_handle_destroy(&dbh->native_handle.odbc_dbh);
@@ -373,8 +442,8 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h
uint32_t yield_len = 100000, total_yield = 0;
const char *db_name = NULL;
const char *db_user = NULL;
const char *db_pass = NULL;
const char *odbc_user = NULL;
const char *odbc_pass = NULL;
while(runtime.max_db_handles && sql_manager.total_handles >= runtime.max_db_handles && sql_manager.total_used_handles >= sql_manager.total_handles) {
if (!waiting++) {
@@ -393,18 +462,24 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h
}
switch (type) {
case SCDB_TYPE_PGSQL:
{
db_name = connection_options->odbc_options.dsn;
odbc_user = NULL;
odbc_pass = NULL;
}
case SCDB_TYPE_ODBC:
{
db_name = connection_options->odbc_options.dsn;
db_user = connection_options->odbc_options.user;
db_pass = connection_options->odbc_options.pass;
odbc_user = connection_options->odbc_options.user;
odbc_pass = connection_options->odbc_options.pass;
}
break;
case SCDB_TYPE_CORE_DB:
{
db_name = connection_options->core_db_options.db_path;
db_user = "";
db_pass = "";
odbc_user = NULL;
odbc_pass = NULL;
}
break;
}
@@ -413,8 +488,11 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h
return SWITCH_STATUS_FALSE;
}
snprintf(db_str, sizeof(db_str) - 1, "db=\"%s\";user=\"%s\";pass=\"%s\"", db_name, db_user, db_pass);
if (odbc_user || odbc_pass) {
snprintf(db_str, sizeof(db_str) - 1, "db=\"%s\";user=\"%s\";pass=\"%s\"", db_name, odbc_user, odbc_pass);
} else {
snprintf(db_str, sizeof(db_str) - 1, "db=\"%s\"", db_name);
}
snprintf(db_callsite_str, sizeof(db_callsite_str) - 1, "%s:%d", file, line);
snprintf(thread_str, sizeof(thread_str) - 1, "thread=\"%lu\"", (unsigned long) (intptr_t) self);
@@ -424,8 +502,23 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h
} else {
switch_core_db_t *db = NULL;
switch_odbc_handle_t *odbc_dbh = NULL;
switch_pgsql_handle_t *pgsql_dbh = NULL;
switch (type) {
case SCDB_TYPE_PGSQL:
{
if (!switch_pgsql_available()) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! PGSQL NOT AVAILABLE! Can't connect to DSN %s\n", connection_options->pgsql_options.dsn);
goto end;
}
if ((pgsql_dbh = switch_pgsql_handle_new(connection_options->pgsql_options.dsn))) {
if (switch_pgsql_handle_connect(pgsql_dbh) != SWITCH_PGSQL_SUCCESS) {
switch_pgsql_handle_destroy(&pgsql_dbh);
}
}
}
break;
case SCDB_TYPE_ODBC:
{
@@ -454,8 +547,8 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h
goto end;
}
if (!db && !odbc_dbh) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure to connect to %s %s!\n", db?"SQLITE":"ODBC", db?connection_options->core_db_options.db_path:connection_options->odbc_options.dsn);
if (!db && !odbc_dbh && !pgsql_dbh) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure to connect to %s %s!\n", (db ? "SQLITE" : (odbc_dbh ? "ODBC" : "PGSQL")), (db ? connection_options->core_db_options.db_path : (odbc_dbh ? connection_options->odbc_options.dsn : connection_options->pgsql_options.dsn)));
goto end;
}
@@ -466,8 +559,10 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h
if (db) {
new_dbh->native_handle.core_db_dbh = db;
} else {
} else if (odbc_dbh) {
new_dbh->native_handle.odbc_dbh = odbc_dbh;
} else {
new_dbh->native_handle.pgsql_dbh = pgsql_dbh;
}
add_handle(new_dbh, db_str, db_callsite_str, thread_str);
@@ -499,6 +594,11 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t
}
switch (dbh->type) {
case SCDB_TYPE_PGSQL:
{
status = switch_pgsql_handle_exec(dbh->native_handle.pgsql_dbh, sql, &errmsg);
}
break;
case SCDB_TYPE_ODBC:
{
status = switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, sql, NULL, &errmsg);
@@ -650,6 +750,11 @@ SWITCH_DECLARE(int) switch_cache_db_affected_rows(switch_cache_db_handle_t *dbh)
return switch_odbc_handle_affected_rows(dbh->native_handle.odbc_dbh);
}
break;
case SCDB_TYPE_PGSQL:
{
return switch_pgsql_handle_affected_rows(dbh->native_handle.pgsql_dbh);
}
break;
}
return 0;
}
@@ -707,6 +812,11 @@ SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t
status = switch_odbc_handle_exec_string(dbh->native_handle.odbc_dbh, sql, str, len, err);
}
break;
case SCDB_TYPE_PGSQL:
{
status = switch_pgsql_handle_exec_string(dbh->native_handle.pgsql_dbh, sql, str, len, err);
}
break;
}
end:
@@ -784,16 +894,34 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_
while (begin_retries > 0) {
again = 0;
if (runtime.odbc_dbtype == DBTYPE_DEFAULT) {
switch_cache_db_execute_sql_real(dbh, "BEGIN", &errmsg);
} else {
switch_odbc_status_t result;
if ((result = switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
char tmp[100];
switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
errmsg = strdup(tmp);
switch(dbh->type) {
case SCDB_TYPE_CORE_DB:
{
switch_cache_db_execute_sql_real(dbh, "BEGIN", &errmsg);
}
break;
case SCDB_TYPE_ODBC:
{
switch_odbc_status_t result;
if ((result = switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
char tmp[100];
switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
errmsg = strdup(tmp);
}
}
break;
case SCDB_TYPE_PGSQL:
{
switch_pgsql_status_t result;
if ((result = switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 0)) != SWITCH_PGSQL_SUCCESS) {
char tmp[100];
switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
errmsg = strdup(tmp);
}
}
break;
}
if (errmsg) {
@@ -807,11 +935,25 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_
errmsg = NULL;
if (again) {
if (runtime.odbc_dbtype == DBTYPE_DEFAULT) {
switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
} else {
switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
switch(dbh->type) {
case SCDB_TYPE_CORE_DB:
{
switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
}
break;
case SCDB_TYPE_ODBC:
{
switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
}
break;
case SCDB_TYPE_PGSQL:
{
switch_pgsql_SQLEndTran(dbh->native_handle.pgsql_dbh, 1);
switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 1);
switch_pgsql_finish_results(dbh->native_handle.pgsql_dbh);
}
break;
}
goto again;
@@ -868,11 +1010,25 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_
done:
if (runtime.odbc_dbtype == DBTYPE_DEFAULT) {
switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
} else {
switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
switch(dbh->type) {
case SCDB_TYPE_CORE_DB:
{
switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
}
break;
case SCDB_TYPE_ODBC:
{
switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
}
break;
case SCDB_TYPE_PGSQL:
{
switch_pgsql_SQLEndTran(dbh->native_handle.pgsql_dbh, 1);
switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 1);
switch_pgsql_finish_results(dbh->native_handle.pgsql_dbh);
}
break;
}
if (!zstr(runtime.core_db_post_trans_execute)) {
@@ -903,6 +1059,11 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach
switch (dbh->type) {
case SCDB_TYPE_PGSQL:
{
status = switch_pgsql_handle_callback_exec(dbh->native_handle.pgsql_dbh, sql, callback, pdata, err);
}
break;
case SCDB_TYPE_ODBC:
{
status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, callback, pdata, err);
@@ -951,6 +1112,17 @@ SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_hand
if (io_mutex) switch_mutex_lock(io_mutex);
switch (dbh->type) {
case SCDB_TYPE_PGSQL:
{
if (switch_pgsql_handle_exec(dbh->native_handle.pgsql_dbh, test_sql, NULL) != SWITCH_PGSQL_SUCCESS) {
r = SWITCH_FALSE;
if (drop_sql) {
switch_pgsql_handle_exec(dbh->native_handle.pgsql_dbh, drop_sql, NULL);
}
switch_pgsql_handle_exec(dbh->native_handle.pgsql_dbh, reactive_sql, NULL);
}
}
break;
case SCDB_TYPE_ODBC:
{
if (switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, test_sql, NULL, NULL) != SWITCH_ODBC_SUCCESS) {
@@ -1057,6 +1229,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
switch_mutex_lock(sql_manager.cond_mutex);
switch (sql_manager.event_db->type) {
case SCDB_TYPE_PGSQL:
break;
case SCDB_TYPE_ODBC:
break;
case SCDB_TYPE_CORE_DB:
@@ -2313,7 +2487,7 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
if (switch_test_flag((&runtime), SCF_CORE_ODBC_REQ)) {
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC IS REQUIRED!\n");
return SWITCH_STATUS_FALSE;
}
@@ -2336,6 +2510,7 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Opening DB\n");
switch (sql_manager.dbh->type) {
case SCDB_TYPE_PGSQL:
case SCDB_TYPE_ODBC:
if (switch_test_flag((&runtime), SCF_CLEAR_SQL)) {
char sql[512] = "";
@@ -2376,6 +2551,7 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
switch (sql_manager.dbh->type) {
case SCDB_TYPE_PGSQL:
case SCDB_TYPE_ODBC:
{
char *err;
@@ -2405,7 +2581,7 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
runtime.odbc_dsn = NULL;
runtime.odbc_user = NULL;
runtime.odbc_pass = NULL;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Transactions not supported on your DB, disabling ODBC\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Transactions not supported on your DB, disabling non-SQLite support; using SQLite\n");
switch_cache_db_release_db_handle(&sql_manager.dbh);
free(err);
goto top;
@@ -2536,9 +2712,9 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
if (switch_test_flag((&runtime), SCF_CORE_ODBC_REQ)) {
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
int arg = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC IS REQUIRED!\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC OR PGSQL IS REQUIRED!\n");
switch_core_session_ctl(SCSC_SHUTDOWN_NOW, &arg);
}
@@ -2562,8 +2738,8 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
if (switch_test_flag((&runtime), SCF_CORE_ODBC_REQ)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC IS REQUIRED!\n");
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC OR PGSQL IS REQUIRED!\n");
goto end;
}