[Asterisk-code-review] stasis: Allow filtering by formatter (asterisk[master])

Friendly Automation asteriskteam at digium.com
Wed Dec 12 11:09:20 CST 2018


Friendly Automation has submitted this change and it was merged. ( https://gerrit.asterisk.org/10741 )

Change subject: stasis:  Allow filtering by formatter
......................................................................

stasis:  Allow filtering by formatter

A subscriber can now indicate that it only wants messages
that have formatters of a specific type.  For instance,
manager can indicate that it only wants messages that have a
"to_ami" formatter.  You can combine this with the existing
filter for message type; the two filters are OR'd, so you get
messages with specific formatters or messages of specific types.

ASTERISK-28186

Change-Id: Ifdb7a222a73b6b56c6bb9e4ee93dc8a394a5494c
---
M include/asterisk/stasis.h
M include/asterisk/stasis_message_router.h
M main/stasis.c
M main/stasis_message.c
M main/stasis_message_router.c
M tests/test_stasis.c
6 files changed, 548 insertions(+), 10 deletions(-)

Approvals:
  Joshua C. Colp: Looks good to me, but someone else must approve
  Kevin Harwell: Looks good to me, approved
  Friendly Automation: Approved for Submit



diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 85e78dc..6d423d9 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -301,6 +301,21 @@
 };
 
 /*!
+ * \brief Stasis subscription formatter filters
+ *
+ * There should be an entry here for each member of \ref stasis_message_vtable
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+enum stasis_subscription_message_formatters {
+	STASIS_SUBSCRIPTION_FORMATTER_NONE =  0,
+	STASIS_SUBSCRIPTION_FORMATTER_JSON =  1 << 0,  /*!< Allow messages with a to_json formatter */
+	STASIS_SUBSCRIPTION_FORMATTER_AMI =   1 << 1,  /*!< Allow messages with a to_ami formatter */
+	STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2,  /*!< Allow messages with a to_event formatter */
+};
+
+/*!
  * \brief Create a new message type.
  *
  * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
@@ -676,6 +691,30 @@
 	enum stasis_subscription_message_filter filter);
 
 /*!
+ * \brief Indicate to a subscription that we are interested in messages with one or more formatters.
+ *
+ * \param subscription Subscription to alter.
+ * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
+	enum stasis_subscription_message_formatters formatters);
+
+/*!
+ * \brief Get a bitmap of available formatters for a message type
+ *
+ * \param message_type Message type
+ * \return A bitmap of \ref stasis_subscription_message_formatters
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+enum stasis_subscription_message_formatters stasis_message_type_available_formatters(
+	const struct stasis_message_type *message_type);
+
+/*!
  * \brief Cancel a subscription.
  *
  * Note that in an asynchronous system, there may still be messages queued or
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index 8dcdfcc..9897d62 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -242,4 +242,23 @@
 				      stasis_subscription_cb callback,
 				      void *data);
 
+/*!
+ * \brief Indicate to a message router that we are interested in messages with one or more formatters.
+ *
+ * The formatters are passed on to the underlying subscription.
+ *
+ * \warning With direct subscriptions, adding a formatter filter is an OR operation
+ * with any message type filters.  In the current implementation of message router, however,
+ * it's an AND operation.  Even when setting a default route, the callback will only get
+ * messages that have the formatters provided in this call.
+ *
+ * \param router Router to set the formatters of.
+ * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+void stasis_message_router_accept_formatters(struct stasis_message_router *router,
+	enum stasis_subscription_message_formatters formatters);
+
 #endif /* _ASTERISK_STASIS_MESSAGE_ROUTER_H */
diff --git a/main/stasis.c b/main/stasis.c
index 0c60b13..69ec1a5 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -399,6 +399,8 @@
 
 	/*! The message types this subscription is accepting */
 	AST_VECTOR(, char) accepted_message_types;
+	/*! The message formatters this subscription is accepting */
+	enum stasis_subscription_message_formatters accepted_formatters;
 	/*! The message filter currently in use */
 	enum stasis_subscription_message_filter filter;
 };
@@ -443,6 +445,10 @@
 		ao2_unlock(sub);
 	}
 
+	/*
+	 * If filtering is turned on and this is a 'final' message, we only invoke the callback
+	 * if the subscriber accepts subscription_change message types.
+	 */
 	if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
 		(message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
 		/* Since sub is mostly immutable, no need to lock sub */
@@ -520,6 +526,7 @@
 	ast_cond_init(&sub->join_cond, NULL);
 	sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
 	AST_VECTOR_INIT(&sub->accepted_message_types, 0);
+	sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
 
 	if (topic_add_subscription(topic, sub) != 0) {
 		ao2_ref(sub, -1);
@@ -676,6 +683,18 @@
 	return 0;
 }
 
+void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
+	enum stasis_subscription_message_formatters formatters)
+{
+	ast_assert(subscription != NULL);
+
+	ao2_lock(subscription->topic);
+	subscription->accepted_formatters = formatters;
+	ao2_unlock(subscription->topic);
+
+	return;
+}
+
 void stasis_subscription_join(struct stasis_subscription *subscription)
 {
 	if (subscription) {
@@ -871,17 +890,57 @@
 	struct stasis_message *message,
 	int synchronous)
 {
-	/* Determine if this subscription is interested in this message. Note that final
-	 * messages are special and are always invoked on the subscription.
+	int is_final = stasis_subscription_final_message(sub, message);
+
+	/*
+	 * The 'do while' gives us an easy way to skip remaining logic once
+	 * we determine the message should be accepted.
+	 * The code looks more verbose than it needs to be but it optimizes
+	 * down very nicely.  It's just easier to understand and debug this way.
 	 */
-	if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
-		int message_type_id = stasis_message_type_id(stasis_message_type(message));
-		if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
-			!AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
-			!stasis_subscription_final_message(sub, message)) {
-			return;
+	do {
+		struct stasis_message_type *message_type = stasis_message_type(message);
+		int type_id = stasis_message_type_id(message_type);
+		int type_filter_specified = 0;
+		int formatter_filter_specified = 0;
+		int type_filter_passed = 0;
+		int formatter_filter_passed = 0;
+
+		/* We always accept final messages so only run the filter logic if not final */
+		if (is_final) {
+			break;
 		}
-	}
+
+		type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
+		formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
+
+		/* Accept if no filters of either type were specified */
+		if (!type_filter_specified && !formatter_filter_specified) {
+			break;
+		}
+
+		type_filter_passed = type_filter_specified
+			&& type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
+			&& AST_VECTOR_GET(&sub->accepted_message_types, type_id);
+
+		/*
+		 * Since the type and formatter filters are OR'd, we can skip
+		 * the formatter check if the type check passes.
+		 */
+		if (type_filter_passed) {
+			break;
+		}
+
+		formatter_filter_passed = formatter_filter_specified
+			&& (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
+
+		if (formatter_filter_passed) {
+			break;
+		}
+
+		return;
+
+	} while (0);
 
 	if (!sub->mailbox) {
 		/* Dispatch directly */
diff --git a/main/stasis_message.c b/main/stasis_message.c
index 1fdbe85..d3f304c 100644
--- a/main/stasis_message.c
+++ b/main/stasis_message.c
@@ -40,6 +40,7 @@
 	char *name;
 	unsigned int hash;
 	int id;
+	enum stasis_subscription_message_formatters available_formatters;
 };
 
 static struct stasis_message_vtable null_vtable = {};
@@ -80,6 +81,15 @@
 	}
 	type->hash = ast_hashtab_hash_string(name);
 	type->vtable = vtable;
+	if (vtable->to_json) {
+		type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_JSON;
+	}
+	if (vtable->to_ami) {
+		type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_AMI;
+	}
+	if (vtable->to_event) {
+		type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_EVENT;
+	}
 	type->id = ast_atomic_fetchadd_int(&message_type_id, +1);
 	*result = type;
 
@@ -101,6 +111,12 @@
 	return type->id;
 }
 
+enum stasis_subscription_message_formatters stasis_message_type_available_formatters(
+	const struct stasis_message_type *type)
+{
+	return type->available_formatters;
+}
+
 /*! \internal */
 struct stasis_message {
 	/*! Time the message was created */
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 41ebc7e..197f7f9 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -399,3 +399,13 @@
 	/* While this implementation can never fail, it used to be able to */
 	return 0;
 }
+
+void stasis_message_router_accept_formatters(struct stasis_message_router *router,
+	enum stasis_subscription_message_formatters formatters)
+{
+	ast_assert(router != NULL);
+
+	stasis_subscription_accept_formatters(router->subscription, formatters);
+
+	return;
+}
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
index 5bc38c5..e620039 100644
--- a/tests/test_stasis.c
+++ b/tests/test_stasis.c
@@ -38,7 +38,13 @@
 #include "asterisk/stasis_message_router.h"
 #include "asterisk/test.h"
 
-static const char *test_category = "/stasis/core/";
+#define test_category "/stasis/core/"
+
+static struct ast_event *fake_event(struct stasis_message *message)
+{
+	return ast_event_new(AST_EVENT_CUSTOM,
+		AST_EVENT_IE_DESCRIPTION, AST_EVENT_IE_PLTYPE_STR, "Dummy", AST_EVENT_IE_END);
+}
 
 static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
 {
@@ -2044,6 +2050,389 @@
 	return AST_TEST_PASS;
 }
 
+struct test_message_types {
+	struct stasis_message_type *none;
+	struct stasis_message_type *ami;
+	struct stasis_message_type *json;
+	struct stasis_message_type *event;
+	struct stasis_message_type *amievent;
+	struct stasis_message_type *type1;
+	struct stasis_message_type *type2;
+	struct stasis_message_type *type3;
+	struct stasis_message_type *change;
+};
+
+static void destroy_message_types(void *obj)
+{
+	struct test_message_types *types = obj;
+
+	ao2_cleanup(types->none);
+	ao2_cleanup(types->ami);
+	ao2_cleanup(types->json);
+	ao2_cleanup(types->event);
+	ao2_cleanup(types->amievent);
+	ao2_cleanup(types->type1);
+	ao2_cleanup(types->type2);
+	ao2_cleanup(types->type3);
+	/* N.B.  Don't cleanup types->change! */
+}
+
+static struct test_message_types *create_message_types(struct ast_test *test)
+{
+	struct stasis_message_vtable vtable = { 0 };
+	struct test_message_types *types;
+	enum ast_test_result_state  __attribute__ ((unused)) rc;
+
+	types = ao2_alloc(sizeof(*types), destroy_message_types);
+	if (!types) {
+		return NULL;
+	}
+
+	ast_test_validate_cleanup(test,
+		stasis_message_type_create("TestMessageNONE", &vtable, &types->none) == STASIS_MESSAGE_TYPE_SUCCESS,
+		rc, cleanup);
+
+	vtable.to_ami = fake_ami;
+	ast_test_validate_cleanup(test,
+		stasis_message_type_create("TestMessageAMI", &vtable, &types->ami) == STASIS_MESSAGE_TYPE_SUCCESS,
+		rc, cleanup);
+
+	vtable.to_ami = NULL;
+	vtable.to_json = fake_json;
+	ast_test_validate_cleanup(test,
+		stasis_message_type_create("TestMessageJSON", &vtable, &types->json) == STASIS_MESSAGE_TYPE_SUCCESS,
+		rc, cleanup);
+
+	vtable.to_ami = NULL;
+	vtable.to_json = NULL;
+	vtable.to_event = fake_event;
+	ast_test_validate_cleanup(test,
+		stasis_message_type_create("TestMessageEVENT", &vtable, &types->event) == STASIS_MESSAGE_TYPE_SUCCESS,
+		rc, cleanup);
+
+	vtable.to_ami = fake_ami;
+	ast_test_validate_cleanup(test,
+		stasis_message_type_create("TestMessageAMIEVENT", &vtable, &types->amievent) == STASIS_MESSAGE_TYPE_SUCCESS,
+		rc, cleanup);
+
+	ast_test_validate_cleanup(test,
+		stasis_message_type_create("TestMessageType1", NULL, &types->type1) == STASIS_MESSAGE_TYPE_SUCCESS,
+		rc, cleanup);
+
+	ast_test_validate_cleanup(test,
+		stasis_message_type_create("TestMessageType2", NULL, &types->type2) == STASIS_MESSAGE_TYPE_SUCCESS,
+		rc, cleanup);
+
+	ast_test_validate_cleanup(test,
+		stasis_message_type_create("TestMessageType3", NULL, &types->type3) == STASIS_MESSAGE_TYPE_SUCCESS,
+		rc, cleanup);
+
+	types->change = stasis_subscription_change_type();
+
+	return types;
+
+cleanup:
+	ao2_cleanup(types);
+	return NULL;
+}
+
+struct cts {
+	struct consumer *consumer;
+	struct stasis_topic *topic;
+	struct stasis_subscription *sub;
+};
+
+static void destroy_cts(void *obj)
+{
+	struct cts *c = obj;
+
+	stasis_unsubscribe(c->sub);
+	ao2_cleanup(c->topic);
+	ao2_cleanup(c->consumer);
+}
+
+static struct cts *create_cts(struct ast_test *test)
+{
+	struct cts *cts = ao2_alloc(sizeof(*cts), destroy_cts);
+	enum ast_test_result_state  __attribute__ ((unused)) rc;
+
+	ast_test_validate_cleanup(test, cts, rc, cleanup);
+
+	cts->topic = stasis_topic_create("TestTopic");
+	ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup);
+
+	cts->consumer = consumer_create(0);
+	ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup);
+
+	ao2_ref(cts->consumer, +1);
+	cts->sub = stasis_subscribe(cts->topic, consumer_exec, cts->consumer);
+	ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup);
+
+	return cts;
+
+cleanup:
+	ao2_cleanup(cts);
+	return NULL;
+}
+
+static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
+{
+	struct stasis_subscription_change *msg_data = stasis_message_data(msg);
+
+	if (stasis_message_type(msg) != mtype) {
+		return 0;
+	}
+
+	if (data) {
+		return (strcmp(data, msg_data->description) == 0);
+	}
+
+	return 1;
+}
+
+static void dump_consumer(struct ast_test *test, struct cts *cts)
+{
+	int i;
+	struct stasis_subscription_change *data;
+
+	ast_test_status_update(test, "Messages received: %ld  Final? %s\n", cts->consumer->messages_rxed_len,
+		cts->consumer->complete ? "yes" : "no");
+	for (i = 0; i < cts->consumer->messages_rxed_len; i++) {
+		data = stasis_message_data(cts->consumer->messages_rxed[i]);
+		ast_test_status_update(test, "Message type received: %s %s\n",
+			stasis_message_type_name(stasis_message_type(cts->consumer->messages_rxed[i])),
+			data && data->description ? data->description : "no data");
+	}
+}
+
+static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type,
+	const char *data)
+{
+	struct stasis_message *msg;
+	struct stasis_subscription_change *test_data =
+		ao2_alloc(sizeof(*test_data) + (data ? strlen(data) : strlen("no data")) + 1, NULL);
+
+	if (!test_data) {
+		return 0;
+	}
+	strcpy(test_data->description, S_OR(data, "no data")); /* Safe */
+
+	msg = stasis_message_create(msg_type, test_data);
+	ao2_ref(test_data, -1);
+	if (!msg) {
+		ast_test_status_update(test, "Unable to create %s message\n",
+			stasis_message_type_name(msg_type));
+		return 0;
+	}
+
+	stasis_publish(cts->topic, msg);
+	ao2_ref(msg, -1);
+
+	return 1;
+}
+
+AST_TEST_DEFINE(type_filters)
+{
+	RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+	RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
+	int ix = 0;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category "filtering/";
+		info->summary = "Test message filtering by type";
+		info->description = "Test message filtering by type";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	types = create_message_types(test);
+	ast_test_validate(test, NULL != types);
+
+	cts = create_cts(test);
+	ast_test_validate(test, NULL != cts);
+
+	ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
+	ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
+	ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
+	ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
+
+	/* We should get these */
+	ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
+	ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
+	/* ... but not this one */
+	ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+
+	/* Wait for change(subscribe) and "Pass" messages */
+	consumer_wait_for(cts->consumer, 3);
+
+	/* Remove type 1 */
+	ast_test_validate(test, stasis_subscription_decline_message_type(cts->sub, types->type1) == 0);
+
+	/* We should now NOT get this one */
+	ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
+	/* We should get this one (again) */
+	ast_test_validate(test, send_msg(test, cts, types->type2, "Pass2"));
+	/* We still should NOT get this one */
+	ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+
+	/* We should now have a second type2 */
+	consumer_wait_for(cts->consumer, 4);
+
+	stasis_unsubscribe(cts->sub);
+	cts->sub = NULL;
+	consumer_wait_for_completion(cts->consumer);
+
+	dump_consumer(test, cts);
+
+	ast_test_validate(test, 1 == cts->consumer->complete);
+	ast_test_validate(test, 5 == cts->consumer->messages_rxed_len);
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass2"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+	return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(formatter_filters)
+{
+	RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+	RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup)	;
+	int ix = 0;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category "filtering/";
+		info->summary = "Test message filtering by formatter";
+		info->description = "Test message filtering by formatter";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	types = create_message_types(test);
+	ast_test_validate(test, NULL != types);
+
+	cts = create_cts(test);
+	ast_test_validate(test, NULL != cts);
+
+	stasis_subscription_accept_formatters(cts->sub,
+		STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
+
+	/* We should get these */
+	ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
+	ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
+	ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
+
+	/* ... but not these */
+	ast_test_validate(test, send_msg(test, cts, types->none, "FAIL"));
+	ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
+	ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
+
+	/* Wait for change(subscribe) and the "Pass" messages */
+	consumer_wait_for(cts->consumer, 4);
+
+	/* Change the subscription to accept only event formatters */
+	stasis_subscription_accept_formatters(cts->sub,	STASIS_SUBSCRIPTION_FORMATTER_EVENT);
+
+	/* We should NOT get these now */
+	ast_test_validate(test, send_msg(test, cts, types->ami, "FAIL"));
+	ast_test_validate(test, send_msg(test, cts, types->json, "FAIL"));
+	/* ... but we should still get this one */
+	ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass2"));
+	/* ... and this one should be new */
+	ast_test_validate(test, send_msg(test, cts, types->event, "Pass"));
+
+	/* We should now have a second amievent */
+	consumer_wait_for(cts->consumer, 6);
+
+	stasis_unsubscribe(cts->sub);
+	cts->sub = NULL;
+	consumer_wait_for_completion(cts->consumer);
+
+	dump_consumer(test, cts);
+
+	ast_test_validate(test, 1 == cts->consumer->complete);
+	ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass2"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+	return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(combo_filters)
+{
+	RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+	RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
+	int ix = 0;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category "filtering/";
+		info->summary = "Test message filtering by type and formatter";
+		info->description = "Test message filtering by type and formatter";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	types = create_message_types(test);
+	ast_test_validate(test, NULL != types);
+
+	cts = create_cts(test);
+	ast_test_validate(test, NULL != cts);
+
+	ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
+	ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
+	ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
+	ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
+	stasis_subscription_accept_formatters(cts->sub,
+		STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
+
+	/* We should get these */
+	ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
+	ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
+	ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
+	ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
+	ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
+
+	/* ... but not these */
+	ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+	ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
+
+	/* Wait for change(subscribe) and the "Pass" messages */
+	consumer_wait_for(cts->consumer, 6);
+
+	stasis_unsubscribe(cts->sub);
+	cts->sub = NULL;
+	consumer_wait_for_completion(cts->consumer);
+
+	dump_consumer(test, cts);
+
+	ast_test_validate(test, 1 == cts->consumer->complete);
+	ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
+	ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+	return AST_TEST_PASS;
+}
+
 static int unload_module(void)
 {
 	AST_TEST_UNREGISTER(message_type);
@@ -2070,6 +2459,9 @@
 	AST_TEST_UNREGISTER(to_ami);
 	AST_TEST_UNREGISTER(dtor_order);
 	AST_TEST_UNREGISTER(caching_dtor_order);
+	AST_TEST_UNREGISTER(type_filters);
+	AST_TEST_UNREGISTER(formatter_filters);
+	AST_TEST_UNREGISTER(combo_filters);
 	return 0;
 }
 
@@ -2099,6 +2491,9 @@
 	AST_TEST_REGISTER(to_ami);
 	AST_TEST_REGISTER(dtor_order);
 	AST_TEST_REGISTER(caching_dtor_order);
+	AST_TEST_REGISTER(type_filters);
+	AST_TEST_REGISTER(formatter_filters);
+	AST_TEST_REGISTER(combo_filters);
 	return AST_MODULE_LOAD_SUCCESS;
 }
 

-- 
To view, visit https://gerrit.asterisk.org/10741
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: Ifdb7a222a73b6b56c6bb9e4ee93dc8a394a5494c
Gerrit-Change-Number: 10741
Gerrit-PatchSet: 2
Gerrit-Owner: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Friendly Automation (1000185)
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua C. Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20181212/5ee17288/attachment-0001.html>


More information about the asterisk-code-review mailing list