mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-08-13 17:38:59 +00:00
refactor db stuff to use single string dsn to avoid code duplication and introduce switch_sql_queue_manager api to create transactional sql queues to aggregate a bunch of sql stmts into transactions
This commit is contained in:
@@ -175,106 +175,33 @@ static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user
|
||||
|
||||
}
|
||||
|
||||
|
||||
#define SWITCH_CORE_DB "core"
|
||||
/*!
|
||||
\brief Open the default system database
|
||||
*/
|
||||
SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
|
||||
{
|
||||
switch_cache_db_connection_options_t options = { {0} };
|
||||
switch_status_t r;
|
||||
char *dsn;
|
||||
|
||||
if (!sql_manager.manage) {
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
if (zstr(runtime.odbc_dsn)) {
|
||||
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
if (runtime.dbname) {
|
||||
options.core_db_options.db_path = runtime.dbname;
|
||||
} else {
|
||||
options.core_db_options.db_path = SWITCH_CORE_DB;
|
||||
}
|
||||
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
|
||||
|
||||
if (!zstr(runtime.odbc_dsn)) {
|
||||
dsn = runtime.odbc_dsn;
|
||||
} else if (!zstr(runtime.dbname)) {
|
||||
dsn = runtime.dbname;
|
||||
} else {
|
||||
char *dsn;
|
||||
if ((dsn = strstr(runtime.odbc_dsn, "pgsql;")) != NULL) {
|
||||
options.pgsql_options.dsn = (char*)(dsn + 6);
|
||||
|
||||
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
|
||||
} else {
|
||||
options.odbc_options.dsn = runtime.odbc_dsn;
|
||||
options.odbc_options.user = runtime.odbc_user;
|
||||
options.odbc_options.pass = runtime.odbc_pass;
|
||||
|
||||
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
|
||||
}
|
||||
dsn = "core";
|
||||
}
|
||||
|
||||
/* I *think* we can do without this now, if not let me know
|
||||
if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
|
||||
(*dbh)->io_mutex = sql_manager.io_mutex;
|
||||
}
|
||||
*/
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
#define SWITCH_CORE_PERSIST_DB "core"
|
||||
/*!
|
||||
\brief Open the default system database
|
||||
*/
|
||||
SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
|
||||
{
|
||||
switch_cache_db_connection_options_t options = { {0} };
|
||||
switch_status_t r;
|
||||
if ((r = _switch_cache_db_get_db_handle_dsn(dbh, dsn, file, func, line)) != SWITCH_STATUS_SUCCESS) {
|
||||
*dbh = NULL;
|
||||
}
|
||||
|
||||
if (!sql_manager.manage) {
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
if (zstr(runtime.odbc_dsn)) {
|
||||
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
if (runtime.dbname) {
|
||||
options.core_db_options.db_path = runtime.dbname;
|
||||
} else {
|
||||
options.core_db_options.db_path = SWITCH_CORE_PERSIST_DB;
|
||||
}
|
||||
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
|
||||
|
||||
} else {
|
||||
char *dsn;
|
||||
if ((dsn = strstr(runtime.odbc_dsn, "pgsql;")) != NULL) {
|
||||
options.pgsql_options.dsn = (char*)(dsn + 6);
|
||||
|
||||
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
|
||||
} else {
|
||||
options.odbc_options.dsn = runtime.odbc_dsn;
|
||||
options.odbc_options.user = runtime.odbc_user;
|
||||
options.odbc_options.pass = runtime.odbc_pass;
|
||||
|
||||
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
|
||||
}
|
||||
}
|
||||
|
||||
/* I *think* we can do without this now, if not let me know
|
||||
if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
|
||||
(*dbh)->io_mutex = sql_manager.io_mutex;
|
||||
}
|
||||
*/
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
|
||||
#define SQL_CACHE_TIMEOUT 30
|
||||
#define SQL_REG_TIMEOUT 15
|
||||
|
||||
@@ -381,6 +308,55 @@ SWITCH_DECLARE(void) switch_cache_db_dismiss_db_handle(switch_cache_db_handle_t
|
||||
}
|
||||
|
||||
|
||||
SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle_dsn(switch_cache_db_handle_t **dbh, const char *dsn,
|
||||
const char *file, const char *func, int line)
|
||||
{
|
||||
switch_cache_db_connection_options_t connection_options = { {0} };
|
||||
switch_cache_db_handle_type_t type;
|
||||
char tmp[256] = "";
|
||||
char *p;
|
||||
switch_status_t status = SWITCH_STATUS_FALSE;
|
||||
int i;
|
||||
|
||||
if (!strncmp(dsn, "pgsql;", 6)) {
|
||||
type = SCDB_TYPE_PGSQL;
|
||||
connection_options.pgsql_options.dsn = (char *)(dsn + 6);
|
||||
} else if ((!(i = strncmp(dsn, "odbc;", 6))) || strchr(dsn, ':')) {
|
||||
type = SCDB_TYPE_ODBC;
|
||||
|
||||
if (i) {
|
||||
switch_set_string(tmp, dsn);
|
||||
} else {
|
||||
switch_set_string(tmp, dsn+6);
|
||||
}
|
||||
|
||||
connection_options.odbc_options.dsn = tmp;
|
||||
|
||||
if ((p = strchr(tmp, ':'))) {
|
||||
*p++ = '\0';
|
||||
connection_options.odbc_options.user = p;
|
||||
|
||||
if ((p = strchr(connection_options.odbc_options.user, ':'))) {
|
||||
*p++ = '\0';
|
||||
connection_options.odbc_options.pass = p;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} else {
|
||||
type = SCDB_TYPE_CORE_DB;
|
||||
connection_options.core_db_options.db_path = (char *)dsn;
|
||||
}
|
||||
|
||||
status = _switch_cache_db_get_db_handle(dbh, type, &connection_options, file, func, line);
|
||||
|
||||
if (status != SWITCH_STATUS_SUCCESS) *dbh = NULL;
|
||||
|
||||
return status;
|
||||
|
||||
}
|
||||
|
||||
|
||||
SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_handle_t **dbh,
|
||||
switch_cache_db_handle_type_t type,
|
||||
switch_cache_db_connection_options_t *connection_options,
|
||||
@@ -822,7 +798,12 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_
|
||||
}
|
||||
|
||||
|
||||
SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_cache_db_handle_t *dbh, char *sql, uint32_t retries)
|
||||
SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans_full(switch_cache_db_handle_t *dbh,
|
||||
char *sql, uint32_t retries,
|
||||
const char *pre_trans_execute,
|
||||
const char *post_trans_execute,
|
||||
const char *inner_pre_trans_execute,
|
||||
const char *inner_post_trans_execute)
|
||||
{
|
||||
char *errmsg = NULL;
|
||||
switch_status_t status = SWITCH_STATUS_FALSE;
|
||||
@@ -838,10 +819,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_
|
||||
|
||||
if (io_mutex) switch_mutex_lock(io_mutex);
|
||||
|
||||
if (!zstr(runtime.core_db_pre_trans_execute)) {
|
||||
switch_cache_db_execute_sql_real(dbh, runtime.core_db_pre_trans_execute, &errmsg);
|
||||
if (!zstr(pre_trans_execute)) {
|
||||
switch_cache_db_execute_sql_real(dbh, pre_trans_execute, &errmsg);
|
||||
if (errmsg) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", runtime.core_db_pre_trans_execute, errmsg);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", pre_trans_execute, errmsg);
|
||||
free(errmsg);
|
||||
}
|
||||
}
|
||||
@@ -929,10 +910,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_
|
||||
}
|
||||
|
||||
|
||||
if (!zstr(runtime.core_db_inner_pre_trans_execute)) {
|
||||
switch_cache_db_execute_sql_real(dbh, runtime.core_db_inner_pre_trans_execute, &errmsg);
|
||||
if (!zstr(inner_pre_trans_execute)) {
|
||||
switch_cache_db_execute_sql_real(dbh, inner_pre_trans_execute, &errmsg);
|
||||
if (errmsg) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", runtime.core_db_inner_pre_trans_execute, errmsg);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", inner_pre_trans_execute, errmsg);
|
||||
free(errmsg);
|
||||
}
|
||||
}
|
||||
@@ -957,10 +938,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_
|
||||
}
|
||||
}
|
||||
|
||||
if (!zstr(runtime.core_db_inner_post_trans_execute)) {
|
||||
switch_cache_db_execute_sql_real(dbh, runtime.core_db_inner_post_trans_execute, &errmsg);
|
||||
if (!zstr(inner_post_trans_execute)) {
|
||||
switch_cache_db_execute_sql_real(dbh, inner_post_trans_execute, &errmsg);
|
||||
if (errmsg) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", runtime.core_db_inner_post_trans_execute, errmsg);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", inner_post_trans_execute, errmsg);
|
||||
free(errmsg);
|
||||
}
|
||||
}
|
||||
@@ -988,10 +969,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_
|
||||
break;
|
||||
}
|
||||
|
||||
if (!zstr(runtime.core_db_post_trans_execute)) {
|
||||
switch_cache_db_execute_sql_real(dbh, runtime.core_db_post_trans_execute, &errmsg);
|
||||
if (!zstr(post_trans_execute)) {
|
||||
switch_cache_db_execute_sql_real(dbh, post_trans_execute, &errmsg);
|
||||
if (errmsg) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", runtime.core_db_post_trans_execute, errmsg);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", post_trans_execute, errmsg);
|
||||
free(errmsg);
|
||||
}
|
||||
}
|
||||
@@ -1152,6 +1133,396 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *threa
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj);
|
||||
|
||||
struct switch_sql_queue_manager {
|
||||
switch_cache_db_handle_t *event_db;
|
||||
switch_queue_t **sql_queue;
|
||||
uint32_t numq;
|
||||
char *dsn;
|
||||
switch_thread_t *thread;
|
||||
int thread_running;
|
||||
switch_thread_cond_t *cond;
|
||||
switch_mutex_t *cond_mutex;
|
||||
char *pre_trans_execute;
|
||||
char *post_trans_execute;
|
||||
char *inner_pre_trans_execute;
|
||||
char *inner_post_trans_execute;
|
||||
switch_memory_pool_t *pool;
|
||||
};
|
||||
|
||||
static void qm_wake(switch_sql_queue_manager_t *qm)
|
||||
{
|
||||
if (switch_mutex_trylock(qm->cond_mutex) == SWITCH_STATUS_SUCCESS) {
|
||||
switch_thread_cond_signal(qm->cond);
|
||||
switch_mutex_unlock(qm->cond_mutex);
|
||||
}
|
||||
}
|
||||
|
||||
static uint32_t qm_ttl(switch_sql_queue_manager_t *qm)
|
||||
{
|
||||
uint32_t ttl = 0;
|
||||
int i;
|
||||
|
||||
for (i = 0; i < qm->numq; i++) {
|
||||
ttl += switch_queue_size(qm->sql_queue[i]);
|
||||
}
|
||||
|
||||
return ttl;
|
||||
}
|
||||
|
||||
|
||||
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm)
|
||||
{
|
||||
switch_status_t status = SWITCH_STATUS_FALSE;
|
||||
|
||||
if (qm->thread_running) {
|
||||
qm->thread_running = 0;
|
||||
switch_queue_push(qm->sql_queue[0], NULL);
|
||||
qm_wake(qm);
|
||||
status = SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
if (qm->thread) {
|
||||
switch_thread_join(&status, qm->thread);
|
||||
qm->thread = NULL;
|
||||
status = SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm)
|
||||
{
|
||||
switch_threadattr_t *thd_attr;
|
||||
|
||||
if (!qm->thread_running) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n");
|
||||
switch_threadattr_create(&thd_attr, qm->pool);
|
||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_NORMAL);
|
||||
switch_thread_create(&qm->thread, thd_attr, switch_user_sql_thread, qm, qm->pool);
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
|
||||
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp)
|
||||
{
|
||||
switch_sql_queue_manager_t *qm;
|
||||
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
||||
switch_memory_pool_t *pool;
|
||||
void *pop;
|
||||
int i;
|
||||
|
||||
switch_assert(qmp);
|
||||
qm = *qmp;
|
||||
*qmp = NULL;
|
||||
|
||||
switch_switch_sql_queue_manager_stop(qm);
|
||||
|
||||
for(i = 0; i < qm->numq; i++) {
|
||||
while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
|
||||
switch_safe_free(pop);
|
||||
}
|
||||
}
|
||||
|
||||
pool = qm->pool;
|
||||
switch_core_destroy_memory_pool(&pool);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
|
||||
{
|
||||
|
||||
if (!qm->thread_running) {
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
if (sql_manager.thread_running != 1) {
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
if (pos > qm->numq - 1) {
|
||||
pos = 0;
|
||||
}
|
||||
|
||||
switch_queue_push(qm->sql_queue[pos], dup ? strdup(sql) : (char *)sql);
|
||||
qm_wake(qm);
|
||||
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_queue_manager_t **qmp,
|
||||
uint32_t numq, const char *dsn,
|
||||
const char *pre_trans_execute,
|
||||
const char *post_trans_execute,
|
||||
const char *inner_pre_trans_execute,
|
||||
const char *inner_post_trans_execute)
|
||||
{
|
||||
switch_memory_pool_t *pool;
|
||||
switch_sql_queue_manager_t *qm;
|
||||
int i;
|
||||
|
||||
if (!numq) numq = 1;
|
||||
|
||||
switch_core_new_memory_pool(&pool);
|
||||
qm = switch_core_alloc(pool, sizeof(*qm));
|
||||
|
||||
qm->pool = pool;
|
||||
qm->numq = numq;
|
||||
qm->dsn = switch_core_strdup(qm->pool, dsn);
|
||||
|
||||
switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool);
|
||||
switch_thread_cond_create(&qm->cond, qm->pool);
|
||||
|
||||
qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq);
|
||||
|
||||
for (i = 0; i < qm->numq; i++) {
|
||||
switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool);
|
||||
}
|
||||
|
||||
if (pre_trans_execute) {
|
||||
qm->pre_trans_execute = switch_core_strdup(qm->pool, qm->pre_trans_execute);
|
||||
qm->post_trans_execute = switch_core_strdup(qm->pool, qm->post_trans_execute);
|
||||
qm->inner_pre_trans_execute = switch_core_strdup(qm->pool, qm->inner_pre_trans_execute);
|
||||
qm->inner_post_trans_execute = switch_core_strdup(qm->pool, qm->inner_post_trans_execute);
|
||||
}
|
||||
|
||||
*qmp = qm;
|
||||
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
|
||||
}
|
||||
static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj)
|
||||
{
|
||||
void *pop = NULL;
|
||||
uint32_t iterations = 0;
|
||||
uint8_t trans = 0;
|
||||
uint32_t target = 20000;
|
||||
switch_size_t len = 0, sql_len = runtime.sql_buffer_len;
|
||||
char *tmp, *sqlbuf = (char *) malloc(sql_len);
|
||||
char *sql = NULL, *save_sql = NULL;
|
||||
switch_size_t newlen;
|
||||
int lc = 0, wrote = 0, do_sleep = 1;
|
||||
uint32_t sanity = 120;
|
||||
int auto_pause = 0;
|
||||
switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
|
||||
int i;
|
||||
|
||||
switch_assert(sqlbuf);
|
||||
|
||||
while (!qm->event_db) {
|
||||
if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
|
||||
break;
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n");
|
||||
switch_yield(500000);
|
||||
sanity--;
|
||||
}
|
||||
|
||||
if (!qm->event_db) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
qm->thread_running = 1;
|
||||
|
||||
switch_mutex_lock(qm->cond_mutex);
|
||||
|
||||
switch (qm->event_db->type) {
|
||||
case SCDB_TYPE_PGSQL:
|
||||
break;
|
||||
case SCDB_TYPE_ODBC:
|
||||
break;
|
||||
case SCDB_TYPE_CORE_DB:
|
||||
{
|
||||
switch_cache_db_execute_sql(qm->event_db, "PRAGMA synchronous=OFF;", NULL);
|
||||
switch_cache_db_execute_sql(qm->event_db, "PRAGMA count_changes=OFF;", NULL);
|
||||
switch_cache_db_execute_sql(qm->event_db, "PRAGMA temp_store=MEMORY;", NULL);
|
||||
switch_cache_db_execute_sql(qm->event_db, "PRAGMA journal_mode=OFF;", NULL);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
while (qm->thread_running == 1) {
|
||||
int proceed = !!save_sql;
|
||||
|
||||
if (!proceed) {
|
||||
for (i = 0; i < qm->numq; i++) {
|
||||
if (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
|
||||
if (sql_manager.thread_running != 1) {
|
||||
free(pop);
|
||||
pop = NULL;
|
||||
} else {
|
||||
proceed = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (proceed) {
|
||||
|
||||
if (save_sql) {
|
||||
sql = save_sql;
|
||||
save_sql = NULL;
|
||||
} else if ((sql = (char *) pop)) {
|
||||
pop = NULL;
|
||||
}
|
||||
|
||||
if (sql) {
|
||||
newlen = strlen(sql) + 2;
|
||||
|
||||
if (iterations == 0) {
|
||||
trans = 1;
|
||||
}
|
||||
|
||||
if (len + newlen + 1 > sql_len) {
|
||||
int new_mlen = len + newlen + 10240;
|
||||
|
||||
if (new_mlen < runtime.max_sql_buffer_len) {
|
||||
sql_len = new_mlen;
|
||||
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
|
||||
for (i = 0; i < qm->numq; i++) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
|
||||
"REALLOC QUEUE %ld %d %d\n",
|
||||
sql_len,
|
||||
i,
|
||||
switch_queue_size(qm->sql_queue[i]));
|
||||
|
||||
}
|
||||
}
|
||||
if (!(tmp = realloc(sqlbuf, sql_len))) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
|
||||
abort();
|
||||
break;
|
||||
}
|
||||
sqlbuf = tmp;
|
||||
} else {
|
||||
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
|
||||
for (i = 0; i < qm->numq; i++) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
|
||||
"SAVE QUEUE %d %d\n",
|
||||
i,
|
||||
switch_queue_size(qm->sql_queue[i]));
|
||||
|
||||
}
|
||||
}
|
||||
save_sql = sql;
|
||||
sql = NULL;
|
||||
lc = 0;
|
||||
goto skip;
|
||||
}
|
||||
}
|
||||
|
||||
iterations++;
|
||||
sprintf(sqlbuf + len, "%s;\n", sql);
|
||||
len += newlen;
|
||||
free(sql);
|
||||
sql = NULL;
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
lc = qm_ttl(qm);
|
||||
|
||||
|
||||
if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) {
|
||||
if (!auto_pause) {
|
||||
auto_pause = 1;
|
||||
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
|
||||
auto_pause = 1;
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue overflowing [%d], Pausing calls.\n", lc);
|
||||
}
|
||||
} else {
|
||||
if (auto_pause && lc < 1000) {
|
||||
auto_pause = 0;
|
||||
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
|
||||
auto_pause = 0;
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue back to normal size, resuming..\n");
|
||||
}
|
||||
}
|
||||
|
||||
skip:
|
||||
|
||||
wrote = 0;
|
||||
|
||||
if (trans && iterations && (iterations > target || !lc)) {
|
||||
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
|
||||
for (i = 0; i < qm->numq; i++) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
|
||||
"RUN QUEUE %d %d %d\n",
|
||||
i,
|
||||
switch_queue_size(qm->sql_queue[i]),
|
||||
iterations);
|
||||
}
|
||||
}
|
||||
if (switch_cache_db_persistant_execute_trans_full(qm->event_db, sqlbuf, 1,
|
||||
qm->pre_trans_execute,
|
||||
qm->post_trans_execute,
|
||||
qm->inner_pre_trans_execute,
|
||||
qm->inner_post_trans_execute
|
||||
) != SWITCH_STATUS_SUCCESS) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
|
||||
}
|
||||
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n");
|
||||
}
|
||||
|
||||
|
||||
iterations = 0;
|
||||
trans = 0;
|
||||
len = 0;
|
||||
*sqlbuf = '\0';
|
||||
lc = 0;
|
||||
if (do_sleep) {
|
||||
switch_yield(200000);
|
||||
} else {
|
||||
switch_yield(1000);
|
||||
}
|
||||
wrote = 1;
|
||||
}
|
||||
|
||||
lc = qm_ttl(qm);
|
||||
|
||||
if (!lc) {
|
||||
switch_thread_cond_wait(qm->cond, qm->cond_mutex);
|
||||
} else if (wrote) {
|
||||
if (lc > 2000) {
|
||||
do_sleep = 0;
|
||||
} else {
|
||||
do_sleep = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch_mutex_unlock(qm->cond_mutex);
|
||||
|
||||
for(i = 0; i < qm->numq; i++) {
|
||||
while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
|
||||
switch_safe_free(pop);
|
||||
}
|
||||
}
|
||||
|
||||
free(sqlbuf);
|
||||
|
||||
qm->thread_running = 0;
|
||||
|
||||
switch_cache_db_release_db_handle(&qm->event_db);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj)
|
||||
{
|
||||
void *pop = NULL;
|
||||
@@ -1300,7 +1671,12 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
|
||||
switch_queue_size(sql_manager.sql_queue[3]),
|
||||
iterations);
|
||||
}
|
||||
if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
|
||||
if (switch_cache_db_persistant_execute_trans_full(sql_manager.event_db, sqlbuf, 1,
|
||||
runtime.core_db_pre_trans_execute,
|
||||
runtime.core_db_post_trans_execute,
|
||||
runtime.core_db_inner_pre_trans_execute,
|
||||
runtime.core_db_inner_post_trans_execute
|
||||
) != SWITCH_STATUS_SUCCESS) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
|
||||
}
|
||||
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
|
||||
@@ -2250,6 +2626,19 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c
|
||||
|
||||
}
|
||||
|
||||
SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_core_dbtype(void)
|
||||
{
|
||||
return sql_manager.event_db->type;
|
||||
}
|
||||
|
||||
SWITCH_DECLARE(void) switch_core_sql_exec(const char *sql)
|
||||
{
|
||||
if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
switch_queue_push(sql_manager.sql_queue[3], strdup(sql));
|
||||
}
|
||||
|
||||
SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force)
|
||||
{
|
||||
@@ -2454,8 +2843,6 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
|
||||
|
||||
if (runtime.odbc_dsn) {
|
||||
runtime.odbc_dsn = NULL;
|
||||
runtime.odbc_user = NULL;
|
||||
runtime.odbc_pass = NULL;
|
||||
runtime.odbc_dbtype = DBTYPE_DEFAULT;
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Falling back to core_db.\n");
|
||||
goto top;
|
||||
@@ -2539,8 +2926,6 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
|
||||
|
||||
if (err) {
|
||||
runtime.odbc_dsn = NULL;
|
||||
runtime.odbc_user = NULL;
|
||||
runtime.odbc_pass = NULL;
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Transactions not supported on your DB, disabling non-SQLite support; using SQLite\n");
|
||||
switch_cache_db_release_db_handle(&sql_manager.dbh);
|
||||
free(err);
|
||||
@@ -2709,8 +3094,6 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
|
||||
|
||||
if (runtime.odbc_dsn) {
|
||||
runtime.odbc_dsn = NULL;
|
||||
runtime.odbc_user = NULL;
|
||||
runtime.odbc_pass = NULL;
|
||||
runtime.odbc_dbtype = DBTYPE_DEFAULT;
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Falling back to core_db.\n");
|
||||
sql_manager.dbh = NULL;
|
||||
|
Reference in New Issue
Block a user