add nitrous oxide tank to core event sql backend

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@1260 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale
2006-04-26 17:18:33 +00:00
parent a70b4a15a6
commit d59cae60c7
8 changed files with 139 additions and 36 deletions

View File

@@ -35,6 +35,7 @@
//#define DEBUG_ALLOC
#define SWITCH_EVENT_QUEUE_LEN 256
#define SWITCH_SQL_QUEUE_LEN 2000
struct switch_core_session {
uint32_t id;
@@ -87,6 +88,7 @@ struct switch_core_runtime {
const struct switch_state_handler_table *state_handlers[SWITCH_MAX_STATE_HANDLERS];
int state_handler_index;
FILE *console;
switch_queue_t *sql_queue;
};
/* Prototypes */
@@ -2092,7 +2094,7 @@ SWITCH_DECLARE(void) switch_core_session_destroy(switch_core_session **session)
switch_event *event;
if (switch_event_create(&event, SWITCH_EVENT_CHANNEL_DESTROY) == SWITCH_STATUS_SUCCESS) {
switch_channel_event_set_data(session->channel, event);
switch_channel_event_set_data((*session)->channel, event);
switch_event_fire(&event);
}
@@ -2324,16 +2326,116 @@ SWITCH_DECLARE(switch_core_session *) switch_core_session_request_by_name(char *
return switch_core_session_request(endpoint_interface, pool);
}
static switch_status switch_core_sql_persistant_execute(switch_core_db *db, char *sql, uint32_t retries)
{
char *errmsg;
switch_status status = SWITCH_STATUS_FALSE;
while(retries > 0) {
switch_core_db_exec(
db,
sql,
NULL,
NULL,
&errmsg
);
if (errmsg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", errmsg);
switch_core_db_free(errmsg);
switch_yield(100000);
retries--;
} else {
status = SWITCH_STATUS_SUCCESS;
break;
}
}
return status;
}
static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread *thread, void *obj)
{
void *pop;
uint32_t itterations = 0;
uint8_t trans = 0;
switch_time_t last_commit = switch_time_now();
uint32_t work = 0, freq = 500, target = 500, diff = 0;
if (!runtime.event_db) {
runtime.event_db = switch_core_db_handle();
}
for(;;) {
if (switch_queue_trypop(runtime.sql_queue, &pop) == SWITCH_STATUS_SUCCESS) {
char *sql = (char *) pop;
if (sql) {
work++;
if (itterations == 0) {
char *isql = "begin transaction CORE1;";
if (switch_core_sql_persistant_execute(runtime.event_db, isql, 25) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL exec error! [%s]\n", isql);
} else {
trans = 1;
}
}
itterations++;
if (switch_core_sql_persistant_execute(runtime.event_db, sql, 25) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL exec error! [%s]\n", sql);
}
free(sql);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
break;
}
}
if (diff < freq) {
diff = (switch_time_now() - last_commit) / 1000;
}
if (trans && (itterations == target || diff >= freq)) {
char *sql = "end transaction CORE1";
work++;
if (switch_core_sql_persistant_execute(runtime.event_db, sql, 25) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL exec error! [%s]\n", sql);
}
last_commit = switch_time_now();
itterations = 0;
trans = 0;
diff = 0;
}
if (!work) {
switch_yield(1000);
}
}
return NULL;
}
static void switch_core_sql_thread_launch(void)
{
switch_thread *thread;
switch_threadattr_t *thd_attr;;
assert(runtime.memory_pool != NULL);
switch_threadattr_create(&thd_attr, runtime.memory_pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, switch_core_sql_thread, NULL, runtime.memory_pool);
}
static void core_event_handler(switch_event *event)
{
char buf[1024];
char *sql = NULL;
char *errmsg;
if (!runtime.event_db) {
runtime.event_db = switch_core_db_handle();
}
switch (event->event_id) {
@@ -2419,32 +2521,12 @@ static void core_event_handler(switch_event *event)
default:
//buf[0] = '\0';
//switch_event_serialize(event, buf, sizeof(buf), NULL);
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "\nCORE EVENT\n--------------------------------\n%s\n", buf);
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "\nCORE EVENT\n--------------------------------\n%s\n", buf);
break;
}
if (sql) {
uint8_t max = 25;
while(max > 0) {
switch_core_db_exec(
runtime.event_db,
sql,
NULL,
NULL,
&errmsg
);
if (errmsg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", errmsg);
switch_core_db_free(errmsg);
switch_yield(100000);
max--;
} else {
break;
}
}
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL [%s]\n", sql);
switch_queue_push(runtime.sql_queue, strdup(sql));
}
}
@@ -2494,6 +2576,7 @@ SWITCH_DECLARE(switch_status) switch_core_init(char *console)
}
assert(runtime.memory_pool != NULL);
switch_log_init(runtime.memory_pool);
switch_core_sql_thread_launch();
if(console) {
if (*console != '/') {
@@ -2507,7 +2590,7 @@ SWITCH_DECLARE(switch_status) switch_core_init(char *console)
}
switch_queue_create(&runtime.sql_queue, SWITCH_SQL_QUEUE_LEN, runtime.memory_pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Allocated memory pool. Sessions are %u bytes\n", sizeof(struct switch_core_session));
@@ -2574,6 +2657,11 @@ SWITCH_DECLARE(switch_status) switch_core_destroy(void)
switch_event_shutdown();
switch_log_shutdown();
switch_queue_push(runtime.sql_queue, NULL);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
while (switch_queue_size(runtime.sql_queue) > 0) {
switch_yield(1000);
}
switch_core_db_close(runtime.db);
if (runtime.memory_pool) {