diff --git a/res/res_corosync.c b/res/res_corosync.c index 4e96bff8dc..cf1fbc1995 100644 --- a/res/res_corosync.c +++ b/res/res_corosync.c @@ -48,13 +48,21 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); #include "asterisk/stasis.h" #include "asterisk/stasis_message_router.h" #include "asterisk/stasis_system.h" +#include "asterisk/taskprocessor.h" AST_RWLOCK_DEFINE_STATIC(event_types_lock); +AST_RWLOCK_DEFINE_STATIC(init_cpg_lock); + +/*! \brief Timeout for Corosync's poll process */ +#define COROSYNC_POLL_TIMEOUT (10 * 1000) static void publish_mwi_to_stasis(struct ast_event *event); static void publish_device_state_to_stasis(struct ast_event *event); static void publish_cluster_discovery_to_stasis(struct ast_event *event); +/*! \brief Join to corosync */ +static int corosync_node_joined = 0; + /*! \brief All the nodes that we're aware of */ static struct ao2_container *nodes; @@ -199,6 +207,7 @@ static void publish_corosync_ping_to_stasis(struct ast_event *event) { struct corosync_ping_payload *payload; struct stasis_message *message; + struct ast_eid *event_eid; ast_assert(ast_event_get_type(event) == AST_EVENT_PING); ast_assert(event != NULL); @@ -211,7 +220,10 @@ static void publish_corosync_ping_to_stasis(struct ast_event *event) if (!payload) { return; } - payload->event = event; + event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + payload->event = ast_event_new(AST_EVENT_PING, + AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid), + AST_EVENT_IE_END); message = stasis_message_create(corosync_ping_message_type(), payload); if (!message) { @@ -347,7 +359,7 @@ static void publish_cluster_discovery_to_stasis(struct ast_event *event) ast_assert(ast_event_get_type(event) == AST_EVENT_CLUSTER_DISCOVERY); event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); - if (!ast_eid_cmp(&ast_eid_default, event_eid)) { + if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) { /* Don't feed events back in that originated locally. */ return; } @@ -477,6 +489,7 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam struct ast_event *event; void (*publish_handler)(struct ast_event *) = NULL; enum ast_event_type event_type; + struct ast_eid *event_eid; if (msg_len < ast_event_minimum_length()) { ast_debug(1, "Ignoring event that's too small. %u < %u\n", @@ -485,7 +498,8 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam return; } - if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(msg, AST_EVENT_IE_EID))) { + event_eid = (struct ast_eid *)ast_event_get_ie_raw(msg, AST_EVENT_IE_EID); + if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) { /* Don't feed events back in that originated locally. */ return; } @@ -497,14 +511,17 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam } ast_rwlock_rdlock(&event_types_lock); + ast_debug(5, "cpg_deliver_cb rdlock\n"); publish_handler = event_types[event_type].publish_to_stasis; if (!event_types[event_type].subscribe || !publish_handler) { /* We are not configured to subscribe to these events or we have no way to publish it internally. */ ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "cpg_deliver_cb unlock\n"); return; } ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "cpg_deliver_cb unlock\n"); if (!(event = ast_malloc(msg_len))) { return; @@ -516,13 +533,14 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam const struct ast_eid *eid; char buf[128] = ""; - eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid); ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf); } ast_debug(5, "Publishing event %s (%u) to stasis\n", ast_event_get_type_name(event), event_type); publish_handler(event); + ast_free(event); } static void publish_event_to_corosync(struct ast_event *event) @@ -538,22 +556,32 @@ static void publish_event_to_corosync(struct ast_event *event) /* The stasis subscription will only exist if we are configured to publish * these events, so just send away. */ - if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { - ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n", - cs_err, ast_event_get_type_name(event), ast_event_get_type(event)); + if (corosync_node_joined && !ast_rwlock_tryrdlock(&init_cpg_lock)) { + ast_debug(5, "publish_event_to_corosync rdlock\n"); + if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { + ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n", + cs_err, ast_event_get_type_name(event), ast_event_get_type(event)); + } + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "publish_event_to_corosync unlock\n"); + } else { + ast_log(LOG_WARNING, "CPG mcast not executed for event %s (%u): initializing CPG.\n", + ast_event_get_type_name(event), ast_event_get_type(event)); } } static void publish_to_corosync(struct stasis_message *message) { struct ast_event *event; + struct ast_eid *event_eid; event = stasis_message_to_event(message); if (!event) { return; } - if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) { + event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + if (!event_eid || ast_eid_cmp(&ast_eid_default, event_eid)) { /* If the event didn't originate from this server, don't send it back out. */ ast_event_destroy(event); return; @@ -563,12 +591,13 @@ static void publish_to_corosync(struct stasis_message *message) const struct ast_eid *eid; char buf[128] = ""; - eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID); + eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid); ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf); } publish_event_to_corosync(event); + ast_event_destroy(event); } static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) @@ -593,6 +622,30 @@ static int dump_cache_cb(void *obj, void *arg, int flags) return 0; } +static int clear_node_cache(void *obj, void *arg, int flags) +{ + struct stasis_message *cached_msg = obj; + struct stasis_topic *topic = arg; + struct stasis_message *msg; + struct ast_eid *msg_eid; + + if (!cached_msg) { + return 0; + } + + msg_eid = (struct ast_eid *)stasis_message_eid(cached_msg); + if(msg_eid && ast_eid_cmp(&ast_eid_default, msg_eid)) + { + msg = stasis_cache_clear_create(cached_msg); + if (msg) { + stasis_publish(topic, msg); + ao2_cleanup(msg); + } + } + + return 0; +} + static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, @@ -604,12 +657,43 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam for (i = 0; i < left_list_entries; i++) { const struct cpg_address *cpg_node = &left_list[i]; struct corosync_node* node; + unsigned int j; node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY); if (!node) { continue; } + for (j = 0; j < ARRAY_LEN(event_types); j++) { + struct ao2_container *messages; + int messages_count; + + ast_rwlock_rdlock(&event_types_lock); + ast_debug(5, "cpg_confchg_cb rdlock\n"); + if (!event_types[j].subscribe) { + ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "cpg_confchg_cb unlock\n"); + continue; + } + + if (!event_types[j].cache_fn || !event_types[j].message_type_fn) { + ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "cpg_confchg_cb unlock\n"); + continue; + } + ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "cpg_confchg_cb unlock\n"); + + messages = stasis_cache_dump_by_eid(event_types[j].cache_fn(), event_types[j].message_type_fn(), &node->eid); + + messages_count = ao2_container_count(messages); + ast_log(LOG_NOTICE, "Clearing %i events of type %s of node %i from stasis cache.\n", messages_count, event_types[j].name, node->id); + ao2_callback(messages, OBJ_NODATA, clear_node_cache, event_types[j].topic_fn()); + ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[j].name); + + ao2_t_ref(messages, -1, "Dispose of flushed cache"); + } + publish_cluster_discovery_to_stasis_full(node, 0); ao2_ref(node, -1); } @@ -622,24 +706,30 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam for (i = 0; i < ARRAY_LEN(event_types); i++) { struct ao2_container *messages; + int messages_count; ast_rwlock_rdlock(&event_types_lock); + ast_debug(5, "cpg_confchg_cb rdlock\n"); if (!event_types[i].publish) { ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "cpg_confchg_cb unlock\n"); continue; } if (!event_types[i].cache_fn || !event_types[i].message_type_fn) { ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "cpg_confchg_cb unlock\n"); continue; } - - messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(), - event_types[i].message_type_fn(), - &ast_eid_default); ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "cpg_confchg_cb unlock\n"); + messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(), event_types[i].message_type_fn(), &ast_eid_default); + + messages_count = ao2_container_count(messages); + ast_log(LOG_NOTICE, "Sending %i events of type %s to corosync.\n", messages_count, event_types[i].name); ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL); + ast_log(LOG_NOTICE, "Sent events of type %s to corosync.\n", event_types[i].name); ao2_t_ref(messages, -1, "Dispose of dumped cache"); } @@ -658,14 +748,21 @@ static void send_cluster_notify(void) char buf[128]; int res; - if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) { - ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n"); - return; - } + if (!ast_rwlock_tryrdlock(&init_cpg_lock)) { + ast_debug(5, "send_cluster_notify rdlock\n"); - if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) { - ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n"); - return; + if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) { + ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n"); + return; + } + + if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) { + ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n"); + return; + } + + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "send_cluster_notify unlock\n"); } sa = (struct sockaddr *)corosync_addr.address; @@ -681,7 +778,7 @@ static void send_cluster_notify(void) AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_PLTYPE_STR, buf, AST_EVENT_IE_END); publish_event_to_corosync(event); - ast_free(event); + ast_event_destroy(event); } static void *dispatch_thread_handler(void *data) @@ -693,18 +790,29 @@ static void *dispatch_thread_handler(void *data) { .events = POLLIN, }, }; - if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n"); + if (!ast_rwlock_tryrdlock(&init_cpg_lock)) { + ast_debug(5, "dispatch_thread_handler rdlock\n"); + if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n"); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + return NULL; + } + + if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n"); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + return NULL; + } + + pfd[2].fd = dispatch_thread.alert_pipe[0]; + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + } else { + ast_log(LOG_ERROR, "Failed to get fd: initiliazing CPG. This module is now broken.\n"); return NULL; } - - if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n"); - return NULL; - } - - pfd[2].fd = dispatch_thread.alert_pipe[0]; - send_cluster_notify(); while (!dispatch_thread.stop) { int res; @@ -715,65 +823,141 @@ static void *dispatch_thread_handler(void *data) pfd[1].revents = 0; pfd[2].revents = 0; - res = ast_poll(pfd, ARRAY_LEN(pfd), -1); + res = ast_poll(pfd, ARRAY_LEN(pfd), COROSYNC_POLL_TIMEOUT); if (res == -1 && errno != EINTR && errno != EAGAIN) { ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno); - continue; - } + cs_err = CS_ERR_BAD_HANDLE; + } else if (res == 0) { + unsigned int local_nodeid; - if (pfd[0].revents & POLLIN) { - if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) { - ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err); + if (!ast_rwlock_tryrdlock(&init_cpg_lock)) { + ast_debug(5, "dispatch_thread_handler rdlock\n"); + if ((cs_err = cpg_local_get(cpg_handle, &local_nodeid)) == CS_OK) { + struct cpg_name name; + struct cpg_address address[CPG_MEMBERS_MAX]; + int entries = CPG_MEMBERS_MAX; + + ast_copy_string(name.value, "asterisk", sizeof(name.value)); + name.length = strlen(name.value); + if ((cs_err = cpg_membership_get(cpg_handle, &name, address, &entries)) == CS_OK) { + int i; + int found = 0; + + ast_debug(1, "CPG group has %i node membership\n", entries); + for (i = 0; (i < entries) && !found; i++) { + if (address[i].nodeid == local_nodeid) + found = 1; + } + if (!found) { + ast_log(LOG_WARNING, "Failed to check CPG node membership\n"); + corosync_node_joined = 0; + cs_err = CS_ERR_BAD_HANDLE; + } + } else { + ast_log(LOG_WARNING, "Failed to get CPG node membership: %u\n", cs_err); + corosync_node_joined = 0; + cs_err = CS_ERR_BAD_HANDLE; + } + } else { + ast_log(LOG_WARNING, "Failed to get CPG local node id: %u\n", cs_err); + corosync_node_joined = 0; + cs_err = CS_ERR_BAD_HANDLE; + } + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + } else { + ast_log(LOG_WARNING, "Failed to check CPG node membership: initializing CPG.\n"); + corosync_node_joined = 0; + cs_err = CS_ERR_BAD_HANDLE; + } + } else { + if (!ast_rwlock_tryrdlock(&init_cpg_lock)) { + ast_debug(5, "dispatch_thread_handler rdlock\n"); + if (pfd[0].revents & POLLIN) { + if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) { + ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err); + } + } + + if (pfd[1].revents & POLLIN) { + if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) { + ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err); + } + } + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + } else { + ast_log(LOG_WARNING, "Failed to dispatch: initializing CPG.\n"); } } - - if (pfd[1].revents & POLLIN) { - if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) { - ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err); - } - } - if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) { - struct cpg_name name; /* If corosync gets restarted out from under Asterisk, try to recover. */ ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n"); - if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err); - sleep(5); - continue; - } + if (!ast_rwlock_trywrlock(&init_cpg_lock)) { + struct cpg_name name; + ast_debug(5, "dispatch_thread_handler wrlock\n"); - if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) { - ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err); - sleep(5); - continue; - } + corosync_node_joined = 0; + if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err); + } - if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to get CPG fd.\n"); - sleep(5); - continue; - } + if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err); + } - if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to get CFG fd.\n"); - sleep(5); - continue; - } + if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + sleep(5); + continue; + } - ast_copy_string(name.value, "asterisk", sizeof(name.value)); - name.length = strlen(name.value); - if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err); - sleep(5); - continue; - } + if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) { + ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + sleep(5); + continue; + } - ast_log(LOG_NOTICE, "Corosync recovery complete.\n"); - send_cluster_notify(); + if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to get CPG fd.\n"); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + sleep(5); + continue; + } + + if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to get CFG fd.\n"); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + sleep(5); + continue; + } + + ast_copy_string(name.value, "asterisk", sizeof(name.value)); + name.length = strlen(name.value); + if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + sleep(5); + continue; + } + corosync_node_joined = 1; + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "dispatch_thread_handler unlock\n"); + ast_log(LOG_NOTICE, "Corosync recovery complete.\n"); + send_cluster_notify(); + } else { + ast_log(LOG_NOTICE, "Failed to recover from corosync failure: initializing CPG.\n"); + } } } @@ -803,64 +987,74 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_ return CLI_SHOWUSAGE; } - cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter); + if (!ast_rwlock_tryrdlock(&init_cpg_lock)) { + ast_debug(5, "corosync_show_members rdlock\n"); + cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter); - if (cs_err != CS_OK) { - ast_cli(a->fd, "Failed to initialize CPG iterator.\n"); - return CLI_FAILURE; - } - - ast_cli(a->fd, "\n" - "=============================================================\n" - "=== Cluster members =========================================\n" - "=============================================================\n" - "===\n"); - - for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc); - cs_err == CS_OK; - cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) { -#ifdef HAVE_COROSYNC_CFG_STATE_TRACK - corosync_cfg_node_address_t addrs[8]; - int num_addrs = 0; - unsigned int j; -#endif - - ast_cli(a->fd, "=== Node %u\n", i); - ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value); - -#ifdef HAVE_COROSYNC_CFG_STATE_TRACK - /* - * Corosync 2.x cfg lib needs to allocate 1M on stack after calling - * corosync_cfg_get_node_addrs. netconsole thread has allocated only 0.5M - * resulting in crash. - */ - cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid, - ARRAY_LEN(addrs), &num_addrs, addrs); if (cs_err != CS_OK) { - ast_log(LOG_WARNING, "Failed to get node addresses\n"); - continue; + ast_cli(a->fd, "Failed to initialize CPG iterator: %u.\n", cs_err); + cpg_iteration_finalize(cpg_iter); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "corosync_show_members unlock\n"); + return CLI_FAILURE; } - for (j = 0; j < num_addrs; j++) { - struct sockaddr *sa = (struct sockaddr *) addrs[j].address; - size_t sa_len = (size_t) addrs[j].address_length; - char buf[128]; + ast_cli(a->fd, "\n" + "=============================================================\n" + "=== Cluster members =========================================\n" + "=============================================================\n" + "===\n"); - getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST); + for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc); + cs_err == CS_OK; + cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) { + #ifdef HAVE_COROSYNC_CFG_STATE_TRACK + corosync_cfg_node_address_t addrs[8]; + int num_addrs = 0; + unsigned int j; + #endif - ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf); + ast_cli(a->fd, "=== Node %u\n", i); + ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value); + + #ifdef HAVE_COROSYNC_CFG_STATE_TRACK + /* + * Corosync 2.x cfg lib needs to allocate 1M on stack after calling + * corosync_cfg_get_node_addrs. netconsole thread has allocated only 0.5M + * resulting in crash. + */ + cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid, + ARRAY_LEN(addrs), &num_addrs, addrs); + if (cs_err != CS_OK) { + ast_log(LOG_WARNING, "Failed to get node addresses\n"); + continue; + } + + for (j = 0; j < num_addrs; j++) { + struct sockaddr *sa = (struct sockaddr *) addrs[j].address; + size_t sa_len = (size_t) addrs[j].address_length; + char buf[128]; + + getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST); + + ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf); + } + #else + ast_cli(a->fd, "=== --> Nodeid: %"PRIu32"\n", cpg_desc.nodeid); + #endif } -#else - ast_cli(a->fd, "=== --> Nodeid: %"PRIu32"\n", cpg_desc.nodeid); -#endif + + ast_cli(a->fd, "===\n" + "=============================================================\n" + "\n"); + + cpg_iteration_finalize(cpg_iter); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "corosync_show_members unlock\n"); + } else { + ast_cli(a->fd, "Failed to initialize CPG iterator: initializing CPG.\n"); } - ast_cli(a->fd, "===\n" - "=============================================================\n" - "\n"); - - cpg_iteration_finalize(cpg_iter); - return CLI_SUCCESS; } @@ -893,10 +1087,9 @@ static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args return CLI_FAILURE; } - ast_rwlock_rdlock(&event_types_lock); event_types[AST_EVENT_PING].publish_to_stasis(event); - ast_rwlock_unlock(&event_types_lock); + ast_event_destroy(event); return CLI_SUCCESS; } @@ -927,6 +1120,7 @@ static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_c "===\n"); ast_rwlock_rdlock(&event_types_lock); + ast_debug(5, "corosync_show_config rdlock\n"); for (i = 0; i < ARRAY_LEN(event_types); i++) { if (event_types[i].publish) { ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n", @@ -938,6 +1132,7 @@ static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_c } } ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "corosync_show_config unlock\n"); ast_cli(a->fd, "===\n" "=============================================================\n" @@ -988,6 +1183,7 @@ static int load_general_config(struct ast_config *cfg) unsigned int i; ast_rwlock_wrlock(&event_types_lock); + ast_debug(5, "load_general_config wrlock\n"); for (i = 0; i < ARRAY_LEN(event_types); i++) { event_types[i].publish = event_types[i].publish_default; @@ -1020,6 +1216,7 @@ static int load_general_config(struct ast_config *cfg) } ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "load_general_config unlock\n"); return res; } @@ -1059,17 +1256,33 @@ static void cleanup_module(void) if (stasis_router) { /* Unsubscribe all topic forwards and cancel all message routes */ - ast_rwlock_wrlock(&event_types_lock); for (i = 0; i < ARRAY_LEN(event_types); i++) { + struct ao2_container *messages = NULL; + int messages_count; + unsigned char subscribe = 0; + + ast_rwlock_wrlock(&event_types_lock); + ast_debug(5, "cleanup_module wrlock\n"); + subscribe = event_types[i].subscribe; + if (event_types[i].sub) { event_types[i].sub = stasis_forward_cancel(event_types[i].sub); - stasis_message_router_remove(stasis_router, - event_types[i].message_type_fn()); + stasis_message_router_remove(stasis_router, event_types[i].message_type_fn()); } event_types[i].publish = 0; event_types[i].subscribe = 0; + ast_rwlock_unlock(&event_types_lock); + ast_debug(5, "cleanup_module unlock\n"); + + if (subscribe && event_types[i].cache_fn && event_types[i].message_type_fn) { + messages = stasis_cache_dump_all(event_types[i].cache_fn(), event_types[i].message_type_fn()); + messages_count = ao2_container_count(messages); + ast_log(LOG_NOTICE, "Clearing %i events of type %s of other nodes from stasis cache.\n", messages_count, event_types[i].name); + ao2_callback(messages, OBJ_NODATA, clear_node_cache, event_types[i].topic_fn()); + ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[i].name); + ao2_t_ref(messages, -1, "Dispose of flushed cache"); + } } - ast_rwlock_unlock(&event_types_lock); stasis_message_router_unsubscribe_and_join(stasis_router); stasis_router = NULL; @@ -1103,16 +1316,21 @@ static void cleanup_module(void) dispatch_thread.alert_pipe[1] = -1; } - if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err); - } - cpg_handle = 0; + if (!ast_rwlock_trywrlock(&init_cpg_lock)) { + ast_debug(5, "cleanup_module wrlock\n"); + if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err); + } + cpg_handle = 0; - if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err); + if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err); + } + cfg_handle = 0; + corosync_node_joined = 0; + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "cleanup_module unlock\n"); } - cfg_handle = 0; - ao2_cleanup(nodes); nodes = NULL; } @@ -1144,6 +1362,8 @@ static int load_module(void) ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n"); goto failed; } + stasis_message_router_set_congestion_limits(stasis_router, -1, + 10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) { ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n"); @@ -1155,39 +1375,55 @@ static int load_module(void) goto failed; } - if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err); + if (!ast_rwlock_trywrlock(&init_cpg_lock)) { + corosync_node_joined = 0; + ast_debug(5, "load_module wrlock\n"); + if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "load_module unlock\n"); + goto failed; + } + + if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "load_module unlock\n"); + goto failed; + } + + ast_copy_string(name.value, "asterisk", sizeof(name.value)); + name.length = strlen(name.value); + + if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) { + ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "load_module unlock\n"); + goto failed; + } + + if (pipe(dispatch_thread.alert_pipe) == -1) { + ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n", + strerror(errno), errno); + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "load_module unlock\n"); + goto failed; + } + corosync_node_joined = 1; + + ast_rwlock_unlock(&init_cpg_lock); + ast_debug(5, "load_module unlock\n"); + if (corosync_pthread_create_background(&dispatch_thread.id, NULL, + dispatch_thread_handler, NULL)) { + ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n"); + goto failed; + } + + ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli)); + } else { goto failed; } - if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err); - goto failed; - } - - ast_copy_string(name.value, "asterisk", sizeof(name.value)); - name.length = strlen(name.value); - - if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) { - ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err); - goto failed; - } - - if (pipe(dispatch_thread.alert_pipe) == -1) { - ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n", - strerror(errno), errno); - goto failed; - } - - if (corosync_pthread_create_background(&dispatch_thread.id, NULL, - dispatch_thread_handler, NULL)) { - ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n"); - goto failed; - } - - ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli)); - - return AST_MODULE_LOAD_SUCCESS; failed: