[asterisk-commits] russell: branch 1.8 r359053 - in /branches/1.8/res: ./ ais/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Mar 13 18:45:27 CDT 2012


Author: russell
Date: Tue Mar 13 18:45:23 2012
New Revision: 359053

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=359053
Log:
Dump cache of published events when a node joins the cluster.

Also use a more reliable method for stopping the poll() thread.

Modified:
    branches/1.8/res/ais/ais.h
    branches/1.8/res/ais/clm.c
    branches/1.8/res/ais/evt.c
    branches/1.8/res/res_ais.c

Modified: branches/1.8/res/ais/ais.h
URL: http://svnview.digium.com/svn/asterisk/branches/1.8/res/ais/ais.h?view=diff&rev=359053&r1=359052&r2=359053
==============================================================================
--- branches/1.8/res/ais/ais.h (original)
+++ branches/1.8/res/ais/ais.h Tue Mar 13 18:45:23 2012
@@ -45,4 +45,13 @@
 
 const char *ais_err2str(SaAisErrorT error);
 
+void ast_ais_evt_membership_changed(void);
+
+enum ast_ais_cmd {
+	AST_AIS_CMD_EXIT,
+	AST_AIS_CMD_MEMBERSHIP_CHANGED,
+};
+
+int ast_ais_cmd(enum ast_ais_cmd cmd);
+
 #endif /* RES_AIS_AIS_H */

Modified: branches/1.8/res/ais/clm.c
URL: http://svnview.digium.com/svn/asterisk/branches/1.8/res/ais/clm.c?view=diff&rev=359053&r1=359052&r2=359053
==============================================================================
--- branches/1.8/res/ais/clm.c (original)
+++ branches/1.8/res/ais/clm.c Tue Mar 13 18:45:23 2012
@@ -67,7 +67,24 @@
 static void clm_track_cb(const SaClmClusterNotificationBufferT *notif_buffer,
 	SaUint32T num_members, SaAisErrorT error)
 {
+	unsigned int i;
+	unsigned int node_joined = 0;
 
+	ast_debug(1, "Cluster membership changed. Number of members: %u\n", num_members);
+
+	for (i = 0; i < notif_buffer->numberOfItems; i++) {
+		SaClmClusterNotificationT *notif = notif_buffer->notification + i;
+
+		if (notif->clusterChange == SA_CLM_NODE_JOINED) {
+			node_joined = 1;
+			break;
+		}
+	}
+
+	if (node_joined) {
+		ast_debug(1, "A node has joined the cluster, dumping event cache.\n");
+		ast_ais_cmd(AST_AIS_CMD_MEMBERSHIP_CHANGED);
+	}
 }
 
 static char *ais_clm_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
@@ -135,11 +152,18 @@
 
 int ast_ais_clm_load_module(void)
 {
+	SaAisErrorT ais_res;
+
 	clm_init_res = saClmInitialize(&clm_handle, &clm_callbacks, &ais_version);
 	if (clm_init_res != SA_AIS_OK) {
 		ast_log(LOG_ERROR, "Could not initialize cluster membership service: %s\n",
 			ais_err2str(clm_init_res));
 		return -1;
+	}
+
+	ais_res = saClmClusterTrack(clm_handle, SA_TRACK_CHANGES, NULL);
+	if (ais_res != SA_AIS_OK) {
+		ast_log(LOG_ERROR, "Error starting tracking of cluster membership changes.\n");
 	}
 
 	ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));

Modified: branches/1.8/res/ais/evt.c
URL: http://svnview.digium.com/svn/asterisk/branches/1.8/res/ais/evt.c?view=diff&rev=359053&r1=359052&r2=359053
==============================================================================
--- branches/1.8/res/ais/evt.c (original)
+++ branches/1.8/res/ais/evt.c Tue Mar 13 18:45:23 2012
@@ -232,8 +232,15 @@
 		goto return_event_free;
 	}
 
-	ais_res = saEvtEventPublish(event_handle,
-		ast_event, ast_event_get_size(ast_event), &event_id);
+	for (;;) {
+		ais_res = saEvtEventPublish(event_handle,
+			ast_event, ast_event_get_size(ast_event), &event_id);
+		if (ais_res != SA_AIS_ERR_TRY_AGAIN) {
+			break;
+		}
+		sched_yield();
+	}
+
 	if (ais_res != SA_AIS_OK) {
 		ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
 		goto return_event_free;
@@ -304,6 +311,22 @@
 static struct ast_cli_entry ais_cli[] = {
 	AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
 };
+
+void ast_ais_evt_membership_changed(void)
+{
+	struct event_channel *ec;
+
+	AST_RWLIST_RDLOCK(&event_channels);
+	AST_RWLIST_TRAVERSE(&event_channels, ec, entry) {
+		struct publish_event *pe;
+
+		AST_LIST_TRAVERSE(&ec->publish_events, pe, entry) {
+			ast_debug(1, "Dumping cache for event channel '%s'\n", ec->name);
+			ast_event_dump_cache(pe->sub);
+		}
+	}
+	AST_RWLIST_UNLOCK(&event_channels);
+}
 
 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
 {

Modified: branches/1.8/res/res_ais.c
URL: http://svnview.digium.com/svn/asterisk/branches/1.8/res/res_ais.c?view=diff&rev=359053&r1=359052&r2=359053
==============================================================================
--- branches/1.8/res/res_ais.c (original)
+++ branches/1.8/res/res_ais.c Tue Mar 13 18:45:23 2012
@@ -60,9 +60,11 @@
 
 static struct {
 	pthread_t id;
+	int alert_pipe[2];
 	unsigned int stop:1;
 } dispatch_thread = {
 	.id = AST_PTHREADT_NULL,
+	.alert_pipe = { -1, -1 },
 };
 
 SaVersionT ais_version = { 'B', 1, 1 };
@@ -116,7 +118,11 @@
 {
 	SaSelectionObjectT clm_fd, evt_fd;
 	int res;
-	struct pollfd pfd[2] = { { .events = POLLIN, }, { .events = POLLIN, } };
+	struct pollfd pfd[3] = {
+		{ .events = POLLIN, },
+		{ .events = POLLIN, },
+		{ .events = POLLIN, },
+	};
 	SaAisErrorT ais_res;
 
 	ais_res = saClmSelectionObjectGet(clm_handle, &clm_fd);
@@ -135,12 +141,14 @@
 
 	pfd[0].fd = clm_fd;
 	pfd[1].fd = evt_fd;
+	pfd[2].fd = dispatch_thread.alert_pipe[0];
 
 	while (!dispatch_thread.stop) {
 		pfd[0].revents = 0;
 		pfd[1].revents = 0;
-
-		res = ast_poll(pfd, 2, -1);
+		pfd[2].revents = 0;
+
+		res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
 		if (res == -1 && errno != EINTR && errno != EAGAIN) {
 			ast_log(LOG_ERROR, "Select error (%s) dispatch thread going away now, "
 				"and the module will no longer operate.\n", strerror(errno));
@@ -153,15 +161,45 @@
 		if (pfd[1].revents & POLLIN) {
 			saEvtDispatch(evt_handle,   SA_DISPATCH_ALL);
 		}
+		if (pfd[2].revents & POLLIN) {
+			enum ast_ais_cmd cmd;
+			ast_debug(1, "Got a command in the poll() loop\n");
+			if (read(dispatch_thread.alert_pipe[0], &cmd, sizeof(cmd)) != -1) {
+				switch (cmd) {
+				case AST_AIS_CMD_MEMBERSHIP_CHANGED:
+					ast_ais_evt_membership_changed();
+					break;
+				case AST_AIS_CMD_EXIT:
+					break;
+				}
+			}
+		}
 	}
 
 	return NULL;
 }
 
+int ast_ais_cmd(enum ast_ais_cmd cmd)
+{
+	int res;
+
+	res = write(dispatch_thread.alert_pipe[1], (char *) &cmd, sizeof(cmd));
+
+	ast_debug(1, "AIS cmd: %d, res: %d\n", cmd, res);
+
+	return res;
+}
+
 static int load_module(void)
 {
+	if (pipe(dispatch_thread.alert_pipe) == -1) {
+		ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
+				strerror(errno), errno);
+		goto return_error;
+	}
+
 	if (ast_ais_clm_load_module())
-		goto return_error;
+		goto clm_failed;
 
 	if (ast_ais_evt_load_module())
 		goto evt_failed;
@@ -178,6 +216,9 @@
 	ast_ais_evt_unload_module();
 evt_failed:
 	ast_ais_clm_unload_module();
+clm_failed:
+	close(dispatch_thread.alert_pipe[0]);
+	close(dispatch_thread.alert_pipe[1]);
 return_error:
 	return AST_MODULE_LOAD_DECLINE;
 }
@@ -189,10 +230,23 @@
 
 	if (dispatch_thread.id != AST_PTHREADT_NULL) {
 		dispatch_thread.stop = 1;
-		pthread_kill(dispatch_thread.id, SIGURG); /* poke! */
+		if (ast_ais_cmd(AST_AIS_CMD_EXIT) == -1) {
+			ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
+					strerror(errno), errno);
+		}
 		pthread_join(dispatch_thread.id, NULL);
 	}
 
+	if (dispatch_thread.alert_pipe[0] != -1) {
+		close(dispatch_thread.alert_pipe[0]);
+		dispatch_thread.alert_pipe[0] = -1;
+	}
+
+	if (dispatch_thread.alert_pipe[1] != -1) {
+		close(dispatch_thread.alert_pipe[1]);
+		dispatch_thread.alert_pipe[1] = -1;
+	}
+
 	return 0;
 }
 




More information about the asterisk-commits mailing list