diff --git a/src/mod/applications/mod_voicemail/mod_voicemail.c b/src/mod/applications/mod_voicemail/mod_voicemail.c index b662620682..3d0e48981e 100644 --- a/src/mod/applications/mod_voicemail/mod_voicemail.c +++ b/src/mod/applications/mod_voicemail/mod_voicemail.c @@ -45,6 +45,7 @@ SWITCH_MODULE_DEFINITION(mod_voicemail, mod_voicemail_load, mod_voicemail_shutdo #define VM_EVENT_MAINT "vm::maintenance" #define VM_MAX_GREETINGS 9 +#define VM_EVENT_QUEUE_SIZE 50000 static switch_status_t voicemail_inject(const char *data, switch_core_session_t *session); @@ -53,6 +54,9 @@ static struct { switch_hash_t *profile_hash; int debug; int message_query_exact_match; + int32_t threads; + int32_t running; + switch_queue_t *event_queue; switch_mutex_t *mutex; switch_memory_pool_t *pool; } globals; @@ -3655,7 +3659,7 @@ SWITCH_STANDARD_API(prefs_api_function) } -static void message_query_handler(switch_event_t *event) +static void actual_message_query_handler(switch_event_t *event) { char *account = switch_event_get_header(event, "message-account"); int created = 0; @@ -3727,6 +3731,101 @@ static void message_query_handler(switch_event_t *event) } +static int EVENT_THREAD_RUNNING = 0; +static int EVENT_THREAD_STARTED = 0; + +void *SWITCH_THREAD_FUNC vm_event_thread_run(switch_thread_t *thread, void *obj) +{ + void *pop; + int done = 0; + + switch_mutex_lock(globals.mutex); + if (!EVENT_THREAD_RUNNING) { + EVENT_THREAD_RUNNING++; + globals.threads++; + } else { + done = 1; + } + switch_mutex_unlock(globals.mutex); + + if (done) { + return NULL; + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Started\n"); + + while (globals.running == 1) { + int count = 0; + + if (switch_queue_trypop(globals.event_queue, &pop) == SWITCH_STATUS_SUCCESS) { + switch_event_t *event = (switch_event_t *) pop; + + if (!pop) { + break; + } + actual_message_query_handler(event); + switch_event_destroy(&event); + count++; + } + + if (!count) { + switch_yield(100000); + } + } + + while (switch_queue_trypop(globals.event_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { + switch_event_t *event = (switch_event_t *) pop; + switch_event_destroy(&event); + } + + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Ended\n"); + + switch_mutex_lock(globals.mutex); + globals.threads--; + EVENT_THREAD_RUNNING = EVENT_THREAD_STARTED = 0; + switch_mutex_unlock(globals.mutex); + + return NULL; +} + +void vm_event_thread_start(void) +{ + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + int done = 0; + + switch_mutex_lock(globals.mutex); + if (!EVENT_THREAD_STARTED) { + EVENT_THREAD_STARTED++; + } else { + done = 1; + } + switch_mutex_unlock(globals.mutex); + + if (done) { + return; + } + + switch_threadattr_create(&thd_attr, globals.pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_threadattr_priority_increase(thd_attr); + switch_thread_create(&thread, thd_attr, vm_event_thread_run, NULL, globals.pool); +} + +void vm_event_handler(switch_event_t *event) +{ + switch_event_t *cloned_event; + + switch_event_dup(&cloned_event, event); + switch_assert(cloned_event); + switch_queue_push(globals.event_queue, cloned_event); + + if (!EVENT_THREAD_STARTED) { + vm_event_thread_start(); + } +} struct holder { vm_profile_t *profile; @@ -5502,14 +5601,20 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_voicemail_load) switch_core_hash_init(&globals.profile_hash, globals.pool); switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool); + switch_mutex_lock(globals.mutex); + globals.running = 1; + switch_mutex_unlock(globals.mutex); + + switch_queue_create(&globals.event_queue, VM_EVENT_QUEUE_SIZE, globals.pool); if ((status = load_config()) != SWITCH_STATUS_SUCCESS) { + globals.running = 0; return status; } /* connect my internal structure to the blank pointer passed to me */ *module_interface = switch_loadable_module_create_module_interface(pool, modname); - if (switch_event_bind(modname, SWITCH_EVENT_MESSAGE_QUERY, SWITCH_EVENT_SUBCLASS_ANY, message_query_handler, NULL) + if (switch_event_bind(modname, SWITCH_EVENT_MESSAGE_QUERY, SWITCH_EVENT_SUBCLASS_ANY, vm_event_handler, NULL) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n"); return SWITCH_STATUS_GENERR; @@ -5554,9 +5659,23 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_voicemail_shutdown) void *val = NULL; const void *key; switch_ssize_t keylen; + int sanity = 0; + + switch_mutex_lock(globals.mutex); + if (globals.running == 1) { + globals.running = 0; + } + switch_mutex_unlock(globals.mutex); switch_event_free_subclass(VM_EVENT_MAINT); - switch_event_unbind_callback(message_query_handler); + switch_event_unbind_callback(vm_event_handler); + + while (globals.threads) { + switch_cond_next(); + if (++sanity >= 60000) { + break; + } + } switch_mutex_lock(globals.mutex); while ((hi = switch_hash_first(NULL, globals.profile_hash))) {