[Asterisk-code-review] stasis: Allow filtering by formatter (asterisk[16])
Friendly Automation
asteriskteam at digium.com
Wed Dec 12 11:09:29 CST 2018
Friendly Automation has submitted this change and it was merged. ( https://gerrit.asterisk.org/10740 )
Change subject: stasis: Allow filtering by formatter
......................................................................
stasis: Allow filtering by formatter
A subscriber can now indicate that it only wants messages
that have formatters of a specific type. For instance,
manager can indicate that it only wants messages that have a
"to_ami" formatter. You can combine this with the existing
filter for message type to get only messages with specific
formatters or messages of specific types.
ASTERISK-28186
Change-Id: Ifdb7a222a73b6b56c6bb9e4ee93dc8a394a5494c
---
M include/asterisk/stasis.h
M include/asterisk/stasis_message_router.h
M main/stasis.c
M main/stasis_message.c
M main/stasis_message_router.c
M tests/test_stasis.c
6 files changed, 548 insertions(+), 10 deletions(-)
Approvals:
Joshua C. Colp: Looks good to me, but someone else must approve
Kevin Harwell: Looks good to me, approved
Friendly Automation: Approved for Submit
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 85e78dc..6d423d9 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -301,6 +301,21 @@
};
/*!
+ * \brief Stasis subscription formatter filters
+ *
+ * There should be an entry here for each member of \ref stasis_message_vtable
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+enum stasis_subscription_message_formatters {
+ STASIS_SUBSCRIPTION_FORMATTER_NONE = 0,
+ STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0, /*!< Allow messages with a to_json formatter */
+ STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1, /*!< Allow messages with a to_ami formatter */
+ STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2, /*!< Allow messages with a to_event formatter */
+};
+
+/*!
* \brief Create a new message type.
*
* \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
@@ -676,6 +691,30 @@
enum stasis_subscription_message_filter filter);
/*!
+ * \brief Indicate to a subscription that we are interested in messages with one or more formatters.
+ *
+ * \param subscription Subscription to alter.
+ * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
+ enum stasis_subscription_message_formatters formatters);
+
+/*!
+ * \brief Get a bitmap of available formatters for a message type
+ *
+ * \param message_type Message type
+ * \return A bitmap of \ref stasis_subscription_message_formatters
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+enum stasis_subscription_message_formatters stasis_message_type_available_formatters(
+ const struct stasis_message_type *message_type);
+
+/*!
* \brief Cancel a subscription.
*
* Note that in an asynchronous system, there may still be messages queued or
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index 8dcdfcc..9897d62 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -242,4 +242,23 @@
stasis_subscription_cb callback,
void *data);
+/*!
+ * \brief Indicate to a message router that we are interested in messages with one or more formatters.
+ *
+ * The formatters are passed on to the underlying subscription.
+ *
+ * \warning With direct subscriptions, adding a formatter filter is an OR operation
+ * with any message type filters. In the current implementation of message router however,
+ * it's an AND operation. Even when setting a default route, the callback will only get
+ * messages that have the formatters provided in this call.
+ *
+ * \param router Router to set the formatters of.
+ * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+void stasis_message_router_accept_formatters(struct stasis_message_router *router,
+ enum stasis_subscription_message_formatters formatters);
+
#endif /* _ASTERISK_STASIS_MESSAGE_ROUTER_H */
diff --git a/main/stasis.c b/main/stasis.c
index 0c60b13..69ec1a5 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -399,6 +399,8 @@
/*! The message types this subscription is accepting */
AST_VECTOR(, char) accepted_message_types;
+ /*! The message formatters this subscription is accepting */
+ enum stasis_subscription_message_formatters accepted_formatters;
/*! The message filter currently in use */
enum stasis_subscription_message_filter filter;
};
@@ -443,6 +445,10 @@
ao2_unlock(sub);
}
+ /*
+ * If filtering is turned on and this is a 'final' message, we only invoke the callback
+ * if the subscriber accepts subscription_change message types.
+ */
if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
(message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
/* Since sub is mostly immutable, no need to lock sub */
@@ -520,6 +526,7 @@
ast_cond_init(&sub->join_cond, NULL);
sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
AST_VECTOR_INIT(&sub->accepted_message_types, 0);
+ sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
if (topic_add_subscription(topic, sub) != 0) {
ao2_ref(sub, -1);
@@ -676,6 +683,18 @@
return 0;
}
+void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
+ enum stasis_subscription_message_formatters formatters)
+{
+ ast_assert(subscription != NULL);
+
+ ao2_lock(subscription->topic);
+ subscription->accepted_formatters = formatters;
+ ao2_unlock(subscription->topic);
+
+ return;
+}
+
void stasis_subscription_join(struct stasis_subscription *subscription)
{
if (subscription) {
@@ -871,17 +890,57 @@
struct stasis_message *message,
int synchronous)
{
- /* Determine if this subscription is interested in this message. Note that final
- * messages are special and are always invoked on the subscription.
+ int is_final = stasis_subscription_final_message(sub, message);
+
+ /*
+ * The 'do while' gives us an easy way to skip remaining logic once
+ * we determine the message should be accepted.
+ * The code looks more verbose than it needs to be but it optimizes
+ * down very nicely. It's just easier to understand and debug this way.
*/
- if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
- int message_type_id = stasis_message_type_id(stasis_message_type(message));
- if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
- !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
- !stasis_subscription_final_message(sub, message)) {
- return;
+ do {
+ struct stasis_message_type *message_type = stasis_message_type(message);
+ int type_id = stasis_message_type_id(message_type);
+ int type_filter_specified = 0;
+ int formatter_filter_specified = 0;
+ int type_filter_passed = 0;
+ int formatter_filter_passed = 0;
+
+ /* We always accept final messages so only run the filter logic if not final */
+ if (is_final) {
+ break;
}
- }
+
+ type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
+ formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
+
+ /* Accept if no filters of either type were specified */
+ if (!type_filter_specified && !formatter_filter_specified) {
+ break;
+ }
+
+ type_filter_passed = type_filter_specified
+ && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
+ && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
+
+ /*
+ * Since the type and formatter filters are OR'd, we can skip
+ * the formatter check if the type check passes.
+ */
+ if (type_filter_passed) {
+ break;
+ }
+
+ formatter_filter_passed = formatter_filter_specified
+ && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
+
+ if (formatter_filter_passed) {
+ break;
+ }
+
+ return;
+
+ } while (0);
if (!sub->mailbox) {
/* Dispatch directly */
diff --git a/main/stasis_message.c b/main/stasis_message.c
index 1fdbe85..d3f304c 100644
--- a/main/stasis_message.c
+++ b/main/stasis_message.c
@@ -40,6 +40,7 @@
char *name;
unsigned int hash;
int id;
+ enum stasis_subscription_message_formatters available_formatters;
};
static struct stasis_message_vtable null_vtable = {};
@@ -80,6 +81,15 @@
}
type->hash = ast_hashtab_hash_string(name);
type->vtable = vtable;
+ if (vtable->to_json) {
+ type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_JSON;
+ }
+ if (vtable->to_ami) {
+ type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_AMI;
+ }
+ if (vtable->to_event) {
+ type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_EVENT;
+ }
type->id = ast_atomic_fetchadd_int(&message_type_id, +1);
*result = type;
@@ -101,6 +111,12 @@
return type->id;
}
+enum stasis_subscription_message_formatters stasis_message_type_available_formatters(
+ const struct stasis_message_type *type)
+{
+ return type->available_formatters;
+}
+
/*! \internal */
struct stasis_message {
/*! Time the message was created */
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 41ebc7e..197f7f9 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -399,3 +399,13 @@
/* While this implementation can never fail, it used to be able to */
return 0;
}
+
+void stasis_message_router_accept_formatters(struct stasis_message_router *router,
+ enum stasis_subscription_message_formatters formatters)
+{
+ ast_assert(router != NULL);
+
+ stasis_subscription_accept_formatters(router->subscription, formatters);
+
+ return;
+}
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
index 5bc38c5..e620039 100644
--- a/tests/test_stasis.c
+++ b/tests/test_stasis.c
@@ -38,7 +38,13 @@
#include "asterisk/stasis_message_router.h"
#include "asterisk/test.h"
-static const char *test_category = "/stasis/core/";
+#define test_category "/stasis/core/"
+
+static struct ast_event *fake_event(struct stasis_message *message)
+{
+ return ast_event_new(AST_EVENT_CUSTOM,
+ AST_EVENT_IE_DESCRIPTION, AST_EVENT_IE_PLTYPE_STR, "Dummy", AST_EVENT_IE_END);
+}
static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
{
@@ -2044,6 +2050,389 @@
return AST_TEST_PASS;
}
+struct test_message_types {
+ struct stasis_message_type *none;
+ struct stasis_message_type *ami;
+ struct stasis_message_type *json;
+ struct stasis_message_type *event;
+ struct stasis_message_type *amievent;
+ struct stasis_message_type *type1;
+ struct stasis_message_type *type2;
+ struct stasis_message_type *type3;
+ struct stasis_message_type *change;
+};
+
+static void destroy_message_types(void *obj)
+{
+ struct test_message_types *types = obj;
+
+ ao2_cleanup(types->none);
+ ao2_cleanup(types->ami);
+ ao2_cleanup(types->json);
+ ao2_cleanup(types->event);
+ ao2_cleanup(types->amievent);
+ ao2_cleanup(types->type1);
+ ao2_cleanup(types->type2);
+ ao2_cleanup(types->type3);
+ /* N.B. Don't clean up types->change! It comes from stasis_subscription_change_type(), not from stasis_message_type_create(). */
+}
+
+static struct test_message_types *create_message_types(struct ast_test *test)
+{
+ struct stasis_message_vtable vtable = { 0 };
+ struct test_message_types *types;
+ enum ast_test_result_state __attribute__ ((unused)) rc;
+
+ types = ao2_alloc(sizeof(*types), destroy_message_types);
+ if (!types) {
+ return NULL;
+ }
+
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageNONE", &vtable, &types->none) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ vtable.to_ami = fake_ami;
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageAMI", &vtable, &types->ami) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ vtable.to_ami = NULL;
+ vtable.to_json = fake_json;
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageJSON", &vtable, &types->json) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ vtable.to_ami = NULL;
+ vtable.to_json = NULL;
+ vtable.to_event = fake_event;
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageEVENT", &vtable, &types->event) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ vtable.to_ami = fake_ami;
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageAMIEVENT", &vtable, &types->amievent) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageType1", NULL, &types->type1) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageType2", NULL, &types->type2) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageType3", NULL, &types->type3) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ types->change = stasis_subscription_change_type();
+
+ return types;
+
+cleanup:
+ ao2_cleanup(types);
+ return NULL;
+}
+
+struct cts {
+ struct consumer *consumer;
+ struct stasis_topic *topic;
+ struct stasis_subscription *sub;
+};
+
+static void destroy_cts(void *obj)
+{
+ struct cts *c = obj;
+
+ stasis_unsubscribe(c->sub);
+ ao2_cleanup(c->topic);
+ ao2_cleanup(c->consumer);
+}
+
+static struct cts *create_cts(struct ast_test *test)
+{
+ struct cts *cts = ao2_alloc(sizeof(*cts), destroy_cts);
+ enum ast_test_result_state __attribute__ ((unused)) rc;
+
+ ast_test_validate_cleanup(test, cts, rc, cleanup);
+
+ cts->topic = stasis_topic_create("TestTopic");
+ ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup);
+
+ cts->consumer = consumer_create(0);
+ ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup);
+
+ ao2_ref(cts->consumer, +1);
+ cts->sub = stasis_subscribe(cts->topic, consumer_exec, cts->consumer);
+ ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup);
+
+ return cts;
+
+cleanup:
+ ao2_cleanup(cts);
+ return NULL;
+}
+
+static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
+{
+ struct stasis_subscription_change *msg_data = stasis_message_data(msg);
+
+ if (stasis_message_type(msg) != mtype) {
+ return 0;
+ }
+
+ if (data) {
+ return (strcmp(data, msg_data->description) == 0);
+ }
+
+ return 1;
+}
+
+static void dump_consumer(struct ast_test *test, struct cts *cts)
+{
+ int i;
+ struct stasis_subscription_change *data;
+
+ ast_test_status_update(test, "Messages received: %ld Final? %s\n", cts->consumer->messages_rxed_len,
+ cts->consumer->complete ? "yes" : "no");
+ for (i = 0; i < cts->consumer->messages_rxed_len; i++) {
+ data = stasis_message_data(cts->consumer->messages_rxed[i]);
+ ast_test_status_update(test, "Message type received: %s %s\n",
+ stasis_message_type_name(stasis_message_type(cts->consumer->messages_rxed[i])),
+ data && data->description ? data->description : "no data");
+ }
+}
+
+static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type,
+ const char *data)
+{
+ struct stasis_message *msg;
+ struct stasis_subscription_change *test_data =
+ ao2_alloc(sizeof(*test_data) + (data ? strlen(data) : strlen("no data")) + 1, NULL);
+
+ if (!test_data) {
+ return 0;
+ }
+ strcpy(test_data->description, S_OR(data, "no data")); /* Safe */
+
+ msg = stasis_message_create(msg_type, test_data);
+ ao2_ref(test_data, -1);
+ if (!msg) {
+ ast_test_status_update(test, "Unable to create %s message\n",
+ stasis_message_type_name(msg_type));
+ return 0;
+ }
+
+ stasis_publish(cts->topic, msg);
+ ao2_ref(msg, -1);
+
+ return 1;
+}
+
+AST_TEST_DEFINE(type_filters)
+{
+ RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+ RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
+ int ix = 0;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category "filtering/";
+ info->summary = "Test message filtering by type";
+ info->description = "Test message filtering by type";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ types = create_message_types(test);
+ ast_test_validate(test, NULL != types);
+
+ cts = create_cts(test);
+ ast_test_validate(test, NULL != cts);
+
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
+ ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
+
+ /* We should get these */
+ ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
+ /* ... but not this one */
+ ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+
+ /* Wait for change(subscribe) and "Pass" messages */
+ consumer_wait_for(cts->consumer, 3);
+
+ /* Remove type 1 */
+ ast_test_validate(test, stasis_subscription_decline_message_type(cts->sub, types->type1) == 0);
+
+ /* We should now NOT get this one */
+ ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
+ /* We should get this one (again) */
+ ast_test_validate(test, send_msg(test, cts, types->type2, "Pass2"));
+ /* We still should NOT get this one */
+ ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+
+ /* We should now have a second type2 */
+ consumer_wait_for(cts->consumer, 4);
+
+ stasis_unsubscribe(cts->sub);
+ cts->sub = NULL;
+ consumer_wait_for_completion(cts->consumer);
+
+ dump_consumer(test, cts);
+
+ ast_test_validate(test, 1 == cts->consumer->complete);
+ ast_test_validate(test, 5 == cts->consumer->messages_rxed_len);
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass2"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(formatter_filters)
+{
+ RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+ RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup) ;
+ int ix = 0;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category "filtering/";
+ info->summary = "Test message filtering by formatter";
+ info->description = "Test message filtering by formatter";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ types = create_message_types(test);
+ ast_test_validate(test, NULL != types);
+
+ cts = create_cts(test);
+ ast_test_validate(test, NULL != cts);
+
+ stasis_subscription_accept_formatters(cts->sub,
+ STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
+
+ /* We should get these */
+ ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
+
+ /* ... but not these */
+ ast_test_validate(test, send_msg(test, cts, types->none, "FAIL"));
+ ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
+ ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
+
+ /* Wait for change(subscribe) and the "Pass" messages */
+ consumer_wait_for(cts->consumer, 4);
+
+ /* Change the subscription to accept only event formatters */
+ stasis_subscription_accept_formatters(cts->sub, STASIS_SUBSCRIPTION_FORMATTER_EVENT);
+
+ /* We should NOT get these now */
+ ast_test_validate(test, send_msg(test, cts, types->ami, "FAIL"));
+ ast_test_validate(test, send_msg(test, cts, types->json, "FAIL"));
+ /* ... but we should still get this one */
+ ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass2"));
+ /* ... and this one should be new */
+ ast_test_validate(test, send_msg(test, cts, types->event, "Pass"));
+
+ /* We should now have a second amievent and the first event */
+ consumer_wait_for(cts->consumer, 6);
+
+ stasis_unsubscribe(cts->sub);
+ cts->sub = NULL;
+ consumer_wait_for_completion(cts->consumer);
+
+ dump_consumer(test, cts);
+
+ ast_test_validate(test, 1 == cts->consumer->complete);
+ ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass2"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(combo_filters)
+{
+ RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+ RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
+ int ix = 0;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category "filtering/";
+ info->summary = "Test message filtering by type and formatter";
+ info->description = "Test message filtering by type and formatter";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ types = create_message_types(test);
+ ast_test_validate(test, NULL != types);
+
+ cts = create_cts(test);
+ ast_test_validate(test, NULL != cts);
+
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
+ ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
+ stasis_subscription_accept_formatters(cts->sub,
+ STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
+
+ /* We should get these */
+ ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
+
+ /* ... but not these */
+ ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+ ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
+
+ /* Wait for change(subscribe) and the "Pass" messages */
+ consumer_wait_for(cts->consumer, 6);
+
+ stasis_unsubscribe(cts->sub);
+ cts->sub = NULL;
+ consumer_wait_for_completion(cts->consumer);
+
+ dump_consumer(test, cts);
+
+ ast_test_validate(test, 1 == cts->consumer->complete);
+ ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+ return AST_TEST_PASS;
+}
+
static int unload_module(void)
{
AST_TEST_UNREGISTER(message_type);
@@ -2070,6 +2459,9 @@
AST_TEST_UNREGISTER(to_ami);
AST_TEST_UNREGISTER(dtor_order);
AST_TEST_UNREGISTER(caching_dtor_order);
+ AST_TEST_UNREGISTER(type_filters);
+ AST_TEST_UNREGISTER(formatter_filters);
+ AST_TEST_UNREGISTER(combo_filters);
return 0;
}
@@ -2099,6 +2491,9 @@
AST_TEST_REGISTER(to_ami);
AST_TEST_REGISTER(dtor_order);
AST_TEST_REGISTER(caching_dtor_order);
+ AST_TEST_REGISTER(type_filters);
+ AST_TEST_REGISTER(formatter_filters);
+ AST_TEST_REGISTER(combo_filters);
return AST_MODULE_LOAD_SUCCESS;
}
--
To view, visit https://gerrit.asterisk.org/10740
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings
Gerrit-Project: asterisk
Gerrit-Branch: 16
Gerrit-MessageType: merged
Gerrit-Change-Id: Ifdb7a222a73b6b56c6bb9e4ee93dc8a394a5494c
Gerrit-Change-Number: 10740
Gerrit-PatchSet: 2
Gerrit-Owner: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Friendly Automation (1000185)
Gerrit-Reviewer: Joshua C. Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20181212/da55c7b6/attachment-0001.html>
More information about the asterisk-code-review
mailing list