[asterisk-commits] res/res corosync: Raise a Stasis message on node join/leave ... (asterisk[13])

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Jul 13 18:52:56 CDT 2016


Joshua Colp has submitted this change and it was merged.

Change subject: res/res_corosync: Raise a Stasis message on node join/leave events
......................................................................


res/res_corosync: Raise a Stasis message on node join/leave events

When res_corosync detects that a node leaves or joins, it currently is
informed of this via Corosync callbacks. However, there are a few
limitations with the information presented:
(1) While we have information that Corosync is aware of - such as the
    Corosync nodeid - that information is really only useful inside of
    Corosync or res_corosync. There's no way to translate a Corosync
    nodeid to some other internally useful unique identifier for the
    Asterisk instance that just joined or left the cluster.
(2) While res_corosync is notified of the instance joining or leaving
    the cluster, it has no mechanism to inform the Asterisk core or
    other modules of this event. This limits the usefulness of res_corosync
    as a heartbeat mechanism for other modules.

This patch addresses both issues.

First, it adds the notion of a cluster discovery message both within the
Stasis message bus, as well as the binary event messages that
res_corosync uses to transmit data back and forth within the cluster.
When Asterisk joins the cluster, it sends a discovery message to the other
nodes in the cluster, which correlates the Corosync nodeid along with
the Asterisk EID. res_corosync now maintains a hash of Corosync nodeids
to Asterisk EIDs, such that it can map changes in cluster state with the
Asterisk instance that has that nodeid. Likewise, when an Asterisk
instance receives a discovery message from a node in the cluster, it now
sends its own discovery message back to the originating node with the
local Asterisk EID. This lets Asterisk instances within the cluster
build a complete picture of the other Asterisk instances within the
cluster.

Second, it publishes the discovery messages onto the Stasis message bus.
Said messages are published whenever a node joins or leaves the cluster.
Interested modules can subscribe for the ast_cluster_discovery_type()
message under the ast_system_topic() and be notified when changes in
cluster state occur.

Change-Id: I9015f418d6ae7f47e4994e04e18948df4d49b465
---
M include/asterisk/event_defs.h
M include/asterisk/stasis_system.h
M main/stasis_system.c
M res/res_corosync.c
4 files changed, 281 insertions(+), 17 deletions(-)

Approvals:
  Mark Michelson: Looks good to me, approved
  Richard Mudgett: Looks good to me, but someone else must approve
  Anonymous Coward #1000019: Verified
  Joshua Colp: Looks good to me, but someone else must approve



diff --git a/include/asterisk/event_defs.h b/include/asterisk/event_defs.h
index 80a8d7d..2d5c75a 100644
--- a/include/asterisk/event_defs.h
+++ b/include/asterisk/event_defs.h
@@ -58,8 +58,10 @@
 	AST_EVENT_ACL_CHANGE          = 0x0b,
 	/*! Send out a ping for debugging distributed events */
 	AST_EVENT_PING                = 0x0c,
+	/*! A cluster discovery message */
+	AST_EVENT_CLUSTER_DISCOVERY   = 0x0d,
 	/*! Number of event types.  This should be the last event type + 1 */
-	AST_EVENT_TOTAL               = 0x0d,
+	AST_EVENT_TOTAL               = 0x0e,
 };
 
 /*! \brief Event Information Element types */
@@ -302,8 +304,15 @@
 	 * Payload type: UINT
 	 */
 	AST_EVENT_IE_CACHABLE            = 0x003d,
+
+	/*!
+	 * \brief Cluster node ID
+	 * Used by: Corosync
+	 * Payload type: UINT
+	 */
+	AST_EVENT_IE_NODE_ID             = 0x003e,
 	/*! \brief Must be the last IE value +1 */
-	AST_EVENT_IE_TOTAL               = 0x003e,
+	AST_EVENT_IE_TOTAL               = 0x003f,
 };
 
 /*!
diff --git a/include/asterisk/stasis_system.h b/include/asterisk/stasis_system.h
index 8c6e60f..274c02e 100644
--- a/include/asterisk/stasis_system.h
+++ b/include/asterisk/stasis_system.h
@@ -122,6 +122,12 @@
 struct stasis_message_type *ast_cc_monitorfailed_type(void);
 
 /*!
+ * \brief A \ref stasis_message_type for Cluster discovery
+ * \since 13.11.0
+ */
+struct stasis_message_type *ast_cluster_discovery_type(void);
+
+/*!
  * \brief Initialize the stasis system topic and message types
  * \retval 0 on success
  * \retval -1 on failure
diff --git a/main/stasis_system.c b/main/stasis_system.c
index e232b8e..67970bd 100644
--- a/main/stasis_system.c
+++ b/main/stasis_system.c
@@ -115,6 +115,7 @@
 STASIS_MESSAGE_TYPE_DEFN(ast_cc_monitorfailed_type,
 	.to_ami = cc_monitorfailed_to_ami,
 	);
+STASIS_MESSAGE_TYPE_DEFN(ast_cluster_discovery_type);
 
 void ast_system_publish_registry(const char *channeltype, const char *username, const char *domain, const char *status, const char *cause)
 {
@@ -362,6 +363,7 @@
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_cc_recallcomplete_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_cc_failure_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_cc_monitorfailed_type);
+	STASIS_MESSAGE_TYPE_CLEANUP(ast_cluster_discovery_type);
 }
 
 /*! \brief Initialize the system level items for \ref stasis */
@@ -422,5 +424,9 @@
 		return -1;
 	}
 
+	if (STASIS_MESSAGE_TYPE_INIT(ast_cluster_discovery_type) != 0) {
+		return -1;
+	}
+
 	return 0;
 }
diff --git a/res/res_corosync.c b/res/res_corosync.c
index 72da3f1..9ffffaa 100644
--- a/res/res_corosync.c
+++ b/res/res_corosync.c
@@ -47,11 +47,16 @@
 #include "asterisk/app.h"
 #include "asterisk/stasis.h"
 #include "asterisk/stasis_message_router.h"
+#include "asterisk/stasis_system.h"
 
 AST_RWLOCK_DEFINE_STATIC(event_types_lock);
 
 static void publish_mwi_to_stasis(struct ast_event *event);
 static void publish_device_state_to_stasis(struct ast_event *event);
+static void publish_cluster_discovery_to_stasis(struct ast_event *event);
+
+/*! \brief All the nodes that we're aware of */
+static struct ao2_container *nodes;
 
 /*! \brief The internal topic used for message forwarding and pings */
 static struct stasis_topic *corosync_aggregate_topic;
@@ -64,6 +69,78 @@
 {
 	return corosync_aggregate_topic;
 }
+
+struct corosync_node {
+	/*! The corosync ID */
+	int id;
+	/*! The Asterisk EID */
+	struct ast_eid eid;
+	/*! The IP address of the node */
+	struct ast_sockaddr addr;
+};
+
+static struct corosync_node *corosync_node_alloc(struct ast_event *event)
+{
+	struct corosync_node *node;
+
+	node = ao2_alloc_options(sizeof(*node), NULL, AO2_ALLOC_OPT_LOCK_NOLOCK);
+	if (!node) {
+		return NULL;
+	}
+
+	memcpy(&node->eid, (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID), sizeof(node->eid));
+	node->id = ast_event_get_ie_uint(event, AST_EVENT_IE_NODE_ID);
+	ast_sockaddr_parse(&node->addr, ast_event_get_ie_str(event, AST_EVENT_IE_LOCAL_ADDR), PARSE_PORT_IGNORE);
+
+	return node;
+}
+
+static int corosync_node_hash_fn(const void *obj, const int flags)
+{
+	const struct corosync_node *node;
+	const int *id;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_KEY:
+		id = obj;
+		break;
+	case OBJ_SEARCH_OBJECT:
+		node = obj;
+		id = &node->id;
+		break;
+	default:
+		ast_assert(0);
+		return 0;
+	}
+	return *id;
+}
+
+static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
+{
+	struct corosync_node *left = obj;
+	struct corosync_node *right = arg;
+	const int *id = arg;
+	int cmp;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_OBJECT:
+		id = &right->id;
+		/* Fall through */
+	case OBJ_SEARCH_KEY:
+		cmp = (left->id == *id);
+		break;
+	case OBJ_SEARCH_PARTIAL_KEY:
+		cmp = (left->id == right->id);
+		break;
+	default:
+		/* Sort can only work on something with a full or partial key. */
+		ast_assert(0);
+		cmp = 1;
+		break;
+	}
+	return cmp ? CMP_MATCH : 0;
+}
+
 
 /*! \brief A payload wrapper around a corosync ping event */
 struct corosync_ping_payload {
@@ -167,6 +244,12 @@
 	                     .topic_fn = corosync_topic,
 	                     .message_type_fn = corosync_ping_message_type,
 	                     .publish_to_stasis = publish_corosync_ping_to_stasis, },
+	[AST_EVENT_CLUSTER_DISCOVERY] = { .name = "cluster_discovery",
+	                                  .publish_default = 1,
+	                                  .subscribe_default = 1,
+	                                  .topic_fn = ast_system_topic,
+	                                  .message_type_fn = ast_cluster_discovery_type,
+	                                  .publish_to_stasis = publish_cluster_discovery_to_stasis, },
 };
 
 static struct {
@@ -196,6 +279,97 @@
 #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
 	.corosync_cfg_shutdown_callback = cfg_shutdown_cb,
 };
+
+/*! \brief Publish cluster discovery to \ref stasis */
+static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
+{
+	struct ast_json *json;
+	struct ast_json_payload *payload;
+	struct stasis_message *message;
+	char eid[18];
+	const char *addr;
+
+	ast_eid_to_str(eid, sizeof(eid), &node->eid);
+	addr = ast_sockaddr_stringify_addr(&node->addr);
+
+	ast_log(AST_LOG_NOTICE, "Node %u (%s) at %s %s the cluster\n",
+		node->id,
+		eid,
+		addr,
+		joined ? "joined" : "left");
+
+	json = ast_json_pack("{s: s, s: i, s: s, s: i}",
+		"address", addr,
+		"node_id", node->id,
+		"eid", eid,
+		"joined", joined);
+	if (!json) {
+		return;
+	}
+
+	payload = ast_json_payload_create(json);
+	if (!payload) {
+		ast_json_unref(json);
+		return;
+	}
+
+	message = stasis_message_create(ast_cluster_discovery_type(), payload);
+	if (!message) {
+		ast_json_unref(json);
+		ao2_ref(payload, -1);
+		return;
+	}
+
+	stasis_publish(ast_system_topic(), message);
+	ast_json_unref(json);
+	ao2_ref(payload, -1);
+	ao2_ref(message, -1);
+}
+
+static void send_cluster_notify(void);
+
+/*! \brief Publish a received cluster discovery \ref ast_event to \ref stasis */
+static void publish_cluster_discovery_to_stasis(struct ast_event *event)
+{
+	struct corosync_node *node;
+	int id = ast_event_get_ie_uint(event, AST_EVENT_IE_NODE_ID);
+	struct ast_eid *event_eid;
+
+	ast_assert(ast_event_get_type(event) == AST_EVENT_CLUSTER_DISCOVERY);
+
+	event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+	if (!ast_eid_cmp(&ast_eid_default, event_eid)) {
+		/* Don't feed events back in that originated locally. */
+		return;
+	}
+
+	ao2_lock(nodes);
+	node = ao2_find(nodes, &id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (node) {
+		/* We already know about this node */
+		ao2_unlock(nodes);
+		ao2_ref(node, -1);
+		return;
+	}
+
+	node = corosync_node_alloc(event);
+	if (!node) {
+		ao2_unlock(nodes);
+		return;
+	}
+	ao2_link_flags(nodes, node, OBJ_NOLOCK);
+	ao2_unlock(nodes);
+
+	publish_cluster_discovery_to_stasis_full(node, 1);
+
+	ao2_ref(node, -1);
+
+	/*
+	 * When we get news that someone else has joined, we need to let them
+	 * know we exist as well.
+	 */
+	send_cluster_notify();
+}
 
 /*! \brief Publish a received MWI \ref ast_event to \ref stasis */
 static void publish_mwi_to_stasis(struct ast_event *event)
@@ -228,7 +402,7 @@
 
 	if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
 	                               (int)old_msgs, NULL, event_eid)) {
-		char eid[16];
+		char eid[18];
 		ast_eid_to_str(eid, sizeof(eid), event_eid);
 		ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
 			mailbox, context, eid);
@@ -255,7 +429,7 @@
 	}
 
 	if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
-		char eid[16];
+		char eid[18];
 		ast_eid_to_str(eid, sizeof(eid), event_eid);
 		ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
 			device, eid);
@@ -342,10 +516,27 @@
 	publish_handler(event);
 }
 
-static void publish_to_corosync(struct stasis_message *message)
+static void publish_event_to_corosync(struct ast_event *event)
 {
 	cs_error_t cs_err;
 	struct iovec iov;
+
+	iov.iov_base = (void *)event;
+	iov.iov_len = ast_event_get_size(event);
+
+	ast_debug(5, "Publishing event %s (%u) to corosync\n",
+		ast_event_get_type_name(event), ast_event_get_type(event));
+
+	/* The stasis subscription will only exist if we are configured to publish
+	 * these events, so just send away. */
+	if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
+		ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n",
+			cs_err, ast_event_get_type_name(event), ast_event_get_type(event));
+	}
+}
+
+static void publish_to_corosync(struct stasis_message *message)
+{
 	struct ast_event *event;
 
 	event = stasis_message_to_event(message);
@@ -368,17 +559,7 @@
 		ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
 	}
 
-	iov.iov_base = (void *)event;
-	iov.iov_len = ast_event_get_size(event);
-
-	ast_debug(5, "Publishing event %s (%u) to corosync\n",
-		ast_event_get_type_name(event), ast_event_get_type(event));
-
-	/* The stasis subscription will only exist if we are configured to publish
-	 * these events, so just send away. */
-	if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
-		ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err);
-	}
+	publish_event_to_corosync(event);
 }
 
 static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
@@ -410,9 +591,22 @@
 {
 	unsigned int i;
 
+
+	for (i = 0; i < left_list_entries; i++) {
+		const struct cpg_address *cpg_node = &left_list[i];
+		struct corosync_node* node;
+
+		node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY);
+		if (!node) {
+			continue;
+		}
+
+		publish_cluster_discovery_to_stasis_full(node, 0);
+		ao2_ref(node, -1);
+	}
+
 	/* If any new nodes have joined, dump our cache of events we are publishing
 	 * that originated from this server. */
-
 	if (!joined_list_entries) {
 		return;
 	}
@@ -442,6 +636,45 @@
 	}
 }
 
+/*! \brief Informs the cluster of our EID and our IP addresses */
+static void send_cluster_notify(void)
+{
+	struct ast_event *event;
+	unsigned int node_id;
+	cs_error_t cs_err;
+	corosync_cfg_node_address_t corosync_addr;
+	int num_addrs = 0;
+	struct sockaddr *sa;
+	size_t sa_len;
+	char buf[128];
+	int res;
+
+	if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
+		ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
+		return;
+	}
+
+	if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
+		ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n");
+		return;
+	}
+
+	sa = (struct sockaddr *)corosync_addr.address;
+	sa_len = (size_t)corosync_addr.address_length;
+	if ((res = getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST))) {
+		ast_log(LOG_WARNING, "Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
+			gai_strerror(res), res);
+		return;
+	}
+
+	event = ast_event_new(AST_EVENT_CLUSTER_DISCOVERY,
+				    AST_EVENT_IE_NODE_ID, AST_EVENT_IE_PLTYPE_UINT, node_id,
+				    AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_PLTYPE_STR, buf,
+				    AST_EVENT_IE_END);
+	publish_event_to_corosync(event);
+	ast_free(event);
+}
+
 static void *dispatch_thread_handler(void *data)
 {
 	cs_error_t cs_err;
@@ -463,6 +696,7 @@
 
 	pfd[2].fd = dispatch_thread.alert_pipe[0];
 
+	send_cluster_notify();
 	while (!dispatch_thread.stop) {
 		int res;
 
@@ -530,6 +764,7 @@
 			}
 
 			ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
+			send_cluster_notify();
 		}
 	}
 
@@ -858,6 +1093,9 @@
 		ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
 	}
 	cfg_handle = 0;
+
+	ao2_cleanup(nodes);
+	nodes = NULL;
 }
 
 static int load_module(void)
@@ -865,6 +1103,11 @@
 	cs_error_t cs_err;
 	struct cpg_name name;
 
+	nodes = ao2_container_alloc(23, corosync_node_hash_fn, corosync_node_cmp_fn);
+	if (!nodes) {
+		goto failed;
+	}
+
 	corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
 	if (!corosync_aggregate_topic) {
 		ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");

-- 
To view, visit https://gerrit.asterisk.org/3109
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I9015f418d6ae7f47e4994e04e18948df4d49b465
Gerrit-PatchSet: 4
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: Matt Jordan <mjordan at digium.com>
Gerrit-Reviewer: Anonymous Coward #1000019
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Mark Michelson <mmichelson at digium.com>
Gerrit-Reviewer: Matt Jordan <mjordan at digium.com>
Gerrit-Reviewer: Richard Mudgett <rmudgett at digium.com>



More information about the asterisk-commits mailing list