mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-31 10:47:18 +00:00 
			
		
		
		
	Merge "res/res_corosync: Raise a Stasis message on node join/leave events"
This commit is contained in:
		| @@ -58,8 +58,10 @@ enum ast_event_type { | ||||
| 	AST_EVENT_ACL_CHANGE          = 0x0b, | ||||
| 	/*! Send out a ping for debugging distributed events */ | ||||
| 	AST_EVENT_PING                = 0x0c, | ||||
| 	/*! A cluster discovery message */ | ||||
| 	AST_EVENT_CLUSTER_DISCOVERY   = 0x0d, | ||||
| 	/*! Number of event types.  This should be the last event type + 1 */ | ||||
| 	AST_EVENT_TOTAL               = 0x0d, | ||||
| 	AST_EVENT_TOTAL               = 0x0e, | ||||
| }; | ||||
|  | ||||
| /*! \brief Event Information Element types */ | ||||
| @@ -302,8 +304,15 @@ enum ast_event_ie_type { | ||||
| 	 * Payload type: UINT | ||||
| 	 */ | ||||
| 	AST_EVENT_IE_CACHABLE            = 0x003d, | ||||
|  | ||||
| 	/*! | ||||
| 	 * \brief Cluster node ID | ||||
| 	 * Used by: Corosync | ||||
| 	 * Payload type: UINT | ||||
| 	 */ | ||||
| 	AST_EVENT_IE_NODE_ID             = 0x003e, | ||||
| 	/*! \brief Must be the last IE value +1 */ | ||||
| 	AST_EVENT_IE_TOTAL               = 0x003e, | ||||
| 	AST_EVENT_IE_TOTAL               = 0x003f, | ||||
| }; | ||||
|  | ||||
| /*! | ||||
|   | ||||
| @@ -121,6 +121,12 @@ struct stasis_message_type *ast_cc_failure_type(void); | ||||
|  */ | ||||
| struct stasis_message_type *ast_cc_monitorfailed_type(void); | ||||
|  | ||||
| /*! | ||||
|  * \brief A \ref stasis_message_type for Cluster discovery | ||||
|  * \since 13.11.0 | ||||
|  */ | ||||
| struct stasis_message_type *ast_cluster_discovery_type(void); | ||||
|  | ||||
| /*! | ||||
|  * \brief Initialize the stasis system topic and message types | ||||
|  * \retval 0 on success | ||||
|   | ||||
| @@ -115,6 +115,7 @@ STASIS_MESSAGE_TYPE_DEFN(ast_cc_failure_type, | ||||
| STASIS_MESSAGE_TYPE_DEFN(ast_cc_monitorfailed_type, | ||||
| 	.to_ami = cc_monitorfailed_to_ami, | ||||
| 	); | ||||
| STASIS_MESSAGE_TYPE_DEFN(ast_cluster_discovery_type); | ||||
|  | ||||
| void ast_system_publish_registry(const char *channeltype, const char *username, const char *domain, const char *status, const char *cause) | ||||
| { | ||||
| @@ -362,6 +363,7 @@ static void stasis_system_cleanup(void) | ||||
| 	STASIS_MESSAGE_TYPE_CLEANUP(ast_cc_recallcomplete_type); | ||||
| 	STASIS_MESSAGE_TYPE_CLEANUP(ast_cc_failure_type); | ||||
| 	STASIS_MESSAGE_TYPE_CLEANUP(ast_cc_monitorfailed_type); | ||||
| 	STASIS_MESSAGE_TYPE_CLEANUP(ast_cluster_discovery_type); | ||||
| } | ||||
|  | ||||
| /*! \brief Initialize the system level items for \ref stasis */ | ||||
| @@ -422,5 +424,9 @@ int ast_stasis_system_init(void) | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| 	if (STASIS_MESSAGE_TYPE_INIT(ast_cluster_discovery_type) != 0) { | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| 	return 0; | ||||
| } | ||||
|   | ||||
| @@ -47,11 +47,16 @@ ASTERISK_REGISTER_FILE(); | ||||
| #include "asterisk/app.h" | ||||
| #include "asterisk/stasis.h" | ||||
| #include "asterisk/stasis_message_router.h" | ||||
| #include "asterisk/stasis_system.h" | ||||
|  | ||||
| AST_RWLOCK_DEFINE_STATIC(event_types_lock); | ||||
|  | ||||
| static void publish_mwi_to_stasis(struct ast_event *event); | ||||
| static void publish_device_state_to_stasis(struct ast_event *event); | ||||
| static void publish_cluster_discovery_to_stasis(struct ast_event *event); | ||||
|  | ||||
| /*! \brief All the nodes that we're aware of */ | ||||
| static struct ao2_container *nodes; | ||||
|  | ||||
| /*! \brief The internal topic used for message forwarding and pings */ | ||||
| static struct stasis_topic *corosync_aggregate_topic; | ||||
| @@ -65,6 +70,78 @@ static struct stasis_topic *corosync_topic(void) | ||||
| 	return corosync_aggregate_topic; | ||||
| } | ||||
|  | ||||
| struct corosync_node { | ||||
| 	/*! The corosync ID */ | ||||
| 	int id; | ||||
| 	/*! The Asterisk EID */ | ||||
| 	struct ast_eid eid; | ||||
| 	/*! The IP address of the node */ | ||||
| 	struct ast_sockaddr addr; | ||||
| }; | ||||
|  | ||||
| static struct corosync_node *corosync_node_alloc(struct ast_event *event) | ||||
| { | ||||
| 	struct corosync_node *node; | ||||
|  | ||||
| 	node = ao2_alloc_options(sizeof(*node), NULL, AO2_ALLOC_OPT_LOCK_NOLOCK); | ||||
| 	if (!node) { | ||||
| 		return NULL; | ||||
| 	} | ||||
|  | ||||
| 	memcpy(&node->eid, (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID), sizeof(node->eid)); | ||||
| 	node->id = ast_event_get_ie_uint(event, AST_EVENT_IE_NODE_ID); | ||||
| 	ast_sockaddr_parse(&node->addr, ast_event_get_ie_str(event, AST_EVENT_IE_LOCAL_ADDR), PARSE_PORT_IGNORE); | ||||
|  | ||||
| 	return node; | ||||
| } | ||||
|  | ||||
| static int corosync_node_hash_fn(const void *obj, const int flags) | ||||
| { | ||||
| 	const struct corosync_node *node; | ||||
| 	const int *id; | ||||
|  | ||||
| 	switch (flags & OBJ_SEARCH_MASK) { | ||||
| 	case OBJ_SEARCH_KEY: | ||||
| 		id = obj; | ||||
| 		break; | ||||
| 	case OBJ_SEARCH_OBJECT: | ||||
| 		node = obj; | ||||
| 		id = &node->id; | ||||
| 		break; | ||||
| 	default: | ||||
| 		ast_assert(0); | ||||
| 		return 0; | ||||
| 	} | ||||
| 	return *id; | ||||
| } | ||||
|  | ||||
| static int corosync_node_cmp_fn(void *obj, void *arg, int flags) | ||||
| { | ||||
| 	struct corosync_node *left = obj; | ||||
| 	struct corosync_node *right = arg; | ||||
| 	const int *id = arg; | ||||
| 	int cmp; | ||||
|  | ||||
| 	switch (flags & OBJ_SEARCH_MASK) { | ||||
| 	case OBJ_SEARCH_OBJECT: | ||||
| 		id = &right->id; | ||||
| 		/* Fall through */ | ||||
| 	case OBJ_SEARCH_KEY: | ||||
| 		cmp = (left->id == *id); | ||||
| 		break; | ||||
| 	case OBJ_SEARCH_PARTIAL_KEY: | ||||
| 		cmp = (left->id == right->id); | ||||
| 		break; | ||||
| 	default: | ||||
| 		/* Sort can only work on something with a full or partial key. */ | ||||
| 		ast_assert(0); | ||||
| 		cmp = 1; | ||||
| 		break; | ||||
| 	} | ||||
| 	return cmp ? CMP_MATCH : 0; | ||||
| } | ||||
|  | ||||
|  | ||||
| /*! \brief A payload wrapper around a corosync ping event */ | ||||
| struct corosync_ping_payload { | ||||
| 	/*! The corosync ping event being passed over \ref stasis */ | ||||
| @@ -167,6 +244,12 @@ static struct { | ||||
| 	                     .topic_fn = corosync_topic, | ||||
| 	                     .message_type_fn = corosync_ping_message_type, | ||||
| 	                     .publish_to_stasis = publish_corosync_ping_to_stasis, }, | ||||
| 	[AST_EVENT_CLUSTER_DISCOVERY] = { .name = "cluster_discovery", | ||||
| 	                                  .publish_default = 1, | ||||
| 	                                  .subscribe_default = 1, | ||||
| 	                                  .topic_fn = ast_system_topic, | ||||
| 	                                  .message_type_fn = ast_cluster_discovery_type, | ||||
| 	                                  .publish_to_stasis = publish_cluster_discovery_to_stasis, }, | ||||
| }; | ||||
|  | ||||
| static struct { | ||||
| @@ -197,6 +280,97 @@ static corosync_cfg_callbacks_t cfg_callbacks = { | ||||
| 	.corosync_cfg_shutdown_callback = cfg_shutdown_cb, | ||||
| }; | ||||
|  | ||||
| /*! \brief Publish cluster discovery to \ref stasis */ | ||||
| static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined) | ||||
| { | ||||
| 	struct ast_json *json; | ||||
| 	struct ast_json_payload *payload; | ||||
| 	struct stasis_message *message; | ||||
| 	char eid[18]; | ||||
| 	const char *addr; | ||||
|  | ||||
| 	ast_eid_to_str(eid, sizeof(eid), &node->eid); | ||||
| 	addr = ast_sockaddr_stringify_addr(&node->addr); | ||||
|  | ||||
| 	ast_log(AST_LOG_NOTICE, "Node %u (%s) at %s %s the cluster\n", | ||||
| 		node->id, | ||||
| 		eid, | ||||
| 		addr, | ||||
| 		joined ? "joined" : "left"); | ||||
|  | ||||
| 	json = ast_json_pack("{s: s, s: i, s: s, s: i}", | ||||
| 		"address", addr, | ||||
| 		"node_id", node->id, | ||||
| 		"eid", eid, | ||||
| 		"joined", joined); | ||||
| 	if (!json) { | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	payload = ast_json_payload_create(json); | ||||
| 	if (!payload) { | ||||
| 		ast_json_unref(json); | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	message = stasis_message_create(ast_cluster_discovery_type(), payload); | ||||
| 	if (!message) { | ||||
| 		ast_json_unref(json); | ||||
| 		ao2_ref(payload, -1); | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	stasis_publish(ast_system_topic(), message); | ||||
| 	ast_json_unref(json); | ||||
| 	ao2_ref(payload, -1); | ||||
| 	ao2_ref(message, -1); | ||||
| } | ||||
|  | ||||
| static void send_cluster_notify(void); | ||||
|  | ||||
| /*! \brief Publish a received cluster discovery \ref ast_event to \ref stasis */ | ||||
| static void publish_cluster_discovery_to_stasis(struct ast_event *event) | ||||
| { | ||||
| 	struct corosync_node *node; | ||||
| 	int id = ast_event_get_ie_uint(event, AST_EVENT_IE_NODE_ID); | ||||
| 	struct ast_eid *event_eid; | ||||
|  | ||||
| 	ast_assert(ast_event_get_type(event) == AST_EVENT_CLUSTER_DISCOVERY); | ||||
|  | ||||
| 	event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID); | ||||
| 	if (!ast_eid_cmp(&ast_eid_default, event_eid)) { | ||||
| 		/* Don't feed events back in that originated locally. */ | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	ao2_lock(nodes); | ||||
| 	node = ao2_find(nodes, &id, OBJ_SEARCH_KEY | OBJ_NOLOCK); | ||||
| 	if (node) { | ||||
| 		/* We already know about this node */ | ||||
| 		ao2_unlock(nodes); | ||||
| 		ao2_ref(node, -1); | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	node = corosync_node_alloc(event); | ||||
| 	if (!node) { | ||||
| 		ao2_unlock(nodes); | ||||
| 		return; | ||||
| 	} | ||||
| 	ao2_link_flags(nodes, node, OBJ_NOLOCK); | ||||
| 	ao2_unlock(nodes); | ||||
|  | ||||
| 	publish_cluster_discovery_to_stasis_full(node, 1); | ||||
|  | ||||
| 	ao2_ref(node, -1); | ||||
|  | ||||
| 	/* | ||||
| 	 * When we get news that someone else has joined, we need to let them | ||||
| 	 * know we exist as well. | ||||
| 	 */ | ||||
| 	send_cluster_notify(); | ||||
| } | ||||
|  | ||||
| /*! \brief Publish a received MWI \ref ast_event to \ref stasis */ | ||||
| static void publish_mwi_to_stasis(struct ast_event *event) | ||||
| { | ||||
| @@ -228,7 +402,7 @@ static void publish_mwi_to_stasis(struct ast_event *event) | ||||
|  | ||||
| 	if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs, | ||||
| 	                               (int)old_msgs, NULL, event_eid)) { | ||||
| 		char eid[16]; | ||||
| 		char eid[18]; | ||||
| 		ast_eid_to_str(eid, sizeof(eid), event_eid); | ||||
| 		ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n", | ||||
| 			mailbox, context, eid); | ||||
| @@ -255,7 +429,7 @@ static void publish_device_state_to_stasis(struct ast_event *event) | ||||
| 	} | ||||
|  | ||||
| 	if (ast_publish_device_state_full(device, state, cachable, event_eid)) { | ||||
| 		char eid[16]; | ||||
| 		char eid[18]; | ||||
| 		ast_eid_to_str(eid, sizeof(eid), event_eid); | ||||
| 		ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n", | ||||
| 			device, eid); | ||||
| @@ -342,10 +516,27 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam | ||||
| 	publish_handler(event); | ||||
| } | ||||
|  | ||||
| static void publish_to_corosync(struct stasis_message *message) | ||||
| static void publish_event_to_corosync(struct ast_event *event) | ||||
| { | ||||
| 	cs_error_t cs_err; | ||||
| 	struct iovec iov; | ||||
|  | ||||
| 	iov.iov_base = (void *)event; | ||||
| 	iov.iov_len = ast_event_get_size(event); | ||||
|  | ||||
| 	ast_debug(5, "Publishing event %s (%u) to corosync\n", | ||||
| 		ast_event_get_type_name(event), ast_event_get_type(event)); | ||||
|  | ||||
| 	/* The stasis subscription will only exist if we are configured to publish | ||||
| 	 * these events, so just send away. */ | ||||
| 	if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { | ||||
| 		ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n", | ||||
| 			cs_err, ast_event_get_type_name(event), ast_event_get_type(event)); | ||||
| 	} | ||||
| } | ||||
|  | ||||
| static void publish_to_corosync(struct stasis_message *message) | ||||
| { | ||||
| 	struct ast_event *event; | ||||
|  | ||||
| 	event = stasis_message_to_event(message); | ||||
| @@ -368,17 +559,7 @@ static void publish_to_corosync(struct stasis_message *message) | ||||
| 		ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf); | ||||
| 	} | ||||
|  | ||||
| 	iov.iov_base = (void *)event; | ||||
| 	iov.iov_len = ast_event_get_size(event); | ||||
|  | ||||
| 	ast_debug(5, "Publishing event %s (%u) to corosync\n", | ||||
| 		ast_event_get_type_name(event), ast_event_get_type(event)); | ||||
|  | ||||
| 	/* The stasis subscription will only exist if we are configured to publish | ||||
| 	 * these events, so just send away. */ | ||||
| 	if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) { | ||||
| 		ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err); | ||||
| 	} | ||||
| 	publish_event_to_corosync(event); | ||||
| } | ||||
|  | ||||
| static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) | ||||
| @@ -410,9 +591,22 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam | ||||
| { | ||||
| 	unsigned int i; | ||||
|  | ||||
|  | ||||
| 	for (i = 0; i < left_list_entries; i++) { | ||||
| 		const struct cpg_address *cpg_node = &left_list[i]; | ||||
| 		struct corosync_node* node; | ||||
|  | ||||
| 		node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY); | ||||
| 		if (!node) { | ||||
| 			continue; | ||||
| 		} | ||||
|  | ||||
| 		publish_cluster_discovery_to_stasis_full(node, 0); | ||||
| 		ao2_ref(node, -1); | ||||
| 	} | ||||
|  | ||||
| 	/* If any new nodes have joined, dump our cache of events we are publishing | ||||
| 	 * that originated from this server. */ | ||||
|  | ||||
| 	if (!joined_list_entries) { | ||||
| 		return; | ||||
| 	} | ||||
| @@ -442,6 +636,45 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam | ||||
| 	} | ||||
| } | ||||
|  | ||||
| /*! \brief Informs the cluster of our EID and our IP addresses */ | ||||
| static void send_cluster_notify(void) | ||||
| { | ||||
| 	struct ast_event *event; | ||||
| 	unsigned int node_id; | ||||
| 	cs_error_t cs_err; | ||||
| 	corosync_cfg_node_address_t corosync_addr; | ||||
| 	int num_addrs = 0; | ||||
| 	struct sockaddr *sa; | ||||
| 	size_t sa_len; | ||||
| 	char buf[128]; | ||||
| 	int res; | ||||
|  | ||||
| 	if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) { | ||||
| 		ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n"); | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) { | ||||
| 		ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n"); | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	sa = (struct sockaddr *)corosync_addr.address; | ||||
| 	sa_len = (size_t)corosync_addr.address_length; | ||||
| 	if ((res = getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST))) { | ||||
| 		ast_log(LOG_WARNING, "Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n", | ||||
| 			gai_strerror(res), res); | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	event = ast_event_new(AST_EVENT_CLUSTER_DISCOVERY, | ||||
| 				    AST_EVENT_IE_NODE_ID, AST_EVENT_IE_PLTYPE_UINT, node_id, | ||||
| 				    AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_PLTYPE_STR, buf, | ||||
| 				    AST_EVENT_IE_END); | ||||
| 	publish_event_to_corosync(event); | ||||
| 	ast_free(event); | ||||
| } | ||||
|  | ||||
| static void *dispatch_thread_handler(void *data) | ||||
| { | ||||
| 	cs_error_t cs_err; | ||||
| @@ -463,6 +696,7 @@ static void *dispatch_thread_handler(void *data) | ||||
|  | ||||
| 	pfd[2].fd = dispatch_thread.alert_pipe[0]; | ||||
|  | ||||
| 	send_cluster_notify(); | ||||
| 	while (!dispatch_thread.stop) { | ||||
| 		int res; | ||||
|  | ||||
| @@ -530,6 +764,7 @@ static void *dispatch_thread_handler(void *data) | ||||
| 			} | ||||
|  | ||||
| 			ast_log(LOG_NOTICE, "Corosync recovery complete.\n"); | ||||
| 			send_cluster_notify(); | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @@ -858,6 +1093,9 @@ static void cleanup_module(void) | ||||
| 		ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err); | ||||
| 	} | ||||
| 	cfg_handle = 0; | ||||
|  | ||||
| 	ao2_cleanup(nodes); | ||||
| 	nodes = NULL; | ||||
| } | ||||
|  | ||||
| static int load_module(void) | ||||
| @@ -865,6 +1103,11 @@ static int load_module(void) | ||||
| 	cs_error_t cs_err; | ||||
| 	struct cpg_name name; | ||||
|  | ||||
| 	nodes = ao2_container_alloc(23, corosync_node_hash_fn, corosync_node_cmp_fn); | ||||
| 	if (!nodes) { | ||||
| 		goto failed; | ||||
| 	} | ||||
|  | ||||
| 	corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic"); | ||||
| 	if (!corosync_aggregate_topic) { | ||||
| 		ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n"); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user