mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-03 03:20:57 +00:00
stasis: Allow filtering by formatter
A subscriber can now indicate that it only wants messages that have formatters of a specific type. For instance, manager can indicate that it only wants messages that have a "to_ami" formatter. You can combine this with the existing filter for message type to get only messages with specific formatters or messages of specific types. ASTERISK-28186 Change-Id: Ifdb7a222a73b6b56c6bb9e4ee93dc8a394a5494c
This commit is contained in:
@@ -300,6 +300,21 @@ enum stasis_subscription_message_filter {
|
|||||||
STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */
|
STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Stasis subscription formatter filters
|
||||||
|
*
|
||||||
|
* There should be an entry here for each member of \ref stasis_message_vtable
|
||||||
|
*
|
||||||
|
* \since 13.25.0
|
||||||
|
* \since 16.2.0
|
||||||
|
*/
|
||||||
|
enum stasis_subscription_message_formatters {
|
||||||
|
STASIS_SUBSCRIPTION_FORMATTER_NONE = 0,
|
||||||
|
STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0, /*!< Allow messages with a to_json formatter */
|
||||||
|
STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1, /*!< Allow messages with a to_ami formatter */
|
||||||
|
STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2, /*!< Allow messages with a to_event formatter */
|
||||||
|
};
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Create a new message type.
|
* \brief Create a new message type.
|
||||||
*
|
*
|
||||||
@@ -675,6 +690,30 @@ int stasis_subscription_decline_message_type(struct stasis_subscription *subscri
|
|||||||
int stasis_subscription_set_filter(struct stasis_subscription *subscription,
|
int stasis_subscription_set_filter(struct stasis_subscription *subscription,
|
||||||
enum stasis_subscription_message_filter filter);
|
enum stasis_subscription_message_filter filter);
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Indicate to a subscription that we are interested in messages with one or more formatters.
|
||||||
|
*
|
||||||
|
* \param subscription Subscription to alter.
|
||||||
|
* \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
|
||||||
|
*
|
||||||
|
* \since 13.25.0
|
||||||
|
* \since 16.2.0
|
||||||
|
*/
|
||||||
|
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
|
||||||
|
enum stasis_subscription_message_formatters formatters);
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Get a bitmap of available formatters for a message type
|
||||||
|
*
|
||||||
|
* \param message_type Message type
|
||||||
|
* \return A bitmap of \ref stasis_subscription_message_formatters
|
||||||
|
*
|
||||||
|
* \since 13.25.0
|
||||||
|
* \since 16.2.0
|
||||||
|
*/
|
||||||
|
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(
|
||||||
|
const struct stasis_message_type *message_type);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Cancel a subscription.
|
* \brief Cancel a subscription.
|
||||||
*
|
*
|
||||||
|
@@ -242,4 +242,23 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
|
|||||||
stasis_subscription_cb callback,
|
stasis_subscription_cb callback,
|
||||||
void *data);
|
void *data);
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Indicate to a message router that we are interested in messages with one or more formatters.
|
||||||
|
*
|
||||||
|
* The formatters are passed on to the underlying subscription.
|
||||||
|
*
|
||||||
|
* \warning With direct subscriptions, adding a formatter filter is an OR operation
|
||||||
|
* with any message type filters. In the current implementation of message router however,
|
||||||
|
* it's an AND operation. Even when setting a default route, the callback will only get
|
||||||
|
* messages that have the formatters provides in this call.
|
||||||
|
*
|
||||||
|
* \param router Router to set the formatters of.
|
||||||
|
* \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
|
||||||
|
*
|
||||||
|
* \since 13.25.0
|
||||||
|
* \since 16.2.0
|
||||||
|
*/
|
||||||
|
void stasis_message_router_accept_formatters(struct stasis_message_router *router,
|
||||||
|
enum stasis_subscription_message_formatters formatters);
|
||||||
|
|
||||||
#endif /* _ASTERISK_STASIS_MESSAGE_ROUTER_H */
|
#endif /* _ASTERISK_STASIS_MESSAGE_ROUTER_H */
|
||||||
|
@@ -399,6 +399,8 @@ struct stasis_subscription {
|
|||||||
|
|
||||||
/*! The message types this subscription is accepting */
|
/*! The message types this subscription is accepting */
|
||||||
AST_VECTOR(, char) accepted_message_types;
|
AST_VECTOR(, char) accepted_message_types;
|
||||||
|
/*! The message formatters this subscription is accepting */
|
||||||
|
enum stasis_subscription_message_formatters accepted_formatters;
|
||||||
/*! The message filter currently in use */
|
/*! The message filter currently in use */
|
||||||
enum stasis_subscription_message_filter filter;
|
enum stasis_subscription_message_filter filter;
|
||||||
};
|
};
|
||||||
@@ -443,6 +445,10 @@ static void subscription_invoke(struct stasis_subscription *sub,
|
|||||||
ao2_unlock(sub);
|
ao2_unlock(sub);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If filtering is turned on and this is a 'final' message, we only invoke the callback
|
||||||
|
* if the subscriber accepts subscription_change message types.
|
||||||
|
*/
|
||||||
if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
|
if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
|
||||||
(message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
|
(message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
|
||||||
/* Since sub is mostly immutable, no need to lock sub */
|
/* Since sub is mostly immutable, no need to lock sub */
|
||||||
@@ -520,6 +526,7 @@ struct stasis_subscription *internal_stasis_subscribe(
|
|||||||
ast_cond_init(&sub->join_cond, NULL);
|
ast_cond_init(&sub->join_cond, NULL);
|
||||||
sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
|
sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
|
||||||
AST_VECTOR_INIT(&sub->accepted_message_types, 0);
|
AST_VECTOR_INIT(&sub->accepted_message_types, 0);
|
||||||
|
sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
|
||||||
|
|
||||||
if (topic_add_subscription(topic, sub) != 0) {
|
if (topic_add_subscription(topic, sub) != 0) {
|
||||||
ao2_ref(sub, -1);
|
ao2_ref(sub, -1);
|
||||||
@@ -676,6 +683,18 @@ int stasis_subscription_set_filter(struct stasis_subscription *subscription,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
|
||||||
|
enum stasis_subscription_message_formatters formatters)
|
||||||
|
{
|
||||||
|
ast_assert(subscription != NULL);
|
||||||
|
|
||||||
|
ao2_lock(subscription->topic);
|
||||||
|
subscription->accepted_formatters = formatters;
|
||||||
|
ao2_unlock(subscription->topic);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
void stasis_subscription_join(struct stasis_subscription *subscription)
|
void stasis_subscription_join(struct stasis_subscription *subscription)
|
||||||
{
|
{
|
||||||
if (subscription) {
|
if (subscription) {
|
||||||
@@ -871,17 +890,57 @@ static void dispatch_message(struct stasis_subscription *sub,
|
|||||||
struct stasis_message *message,
|
struct stasis_message *message,
|
||||||
int synchronous)
|
int synchronous)
|
||||||
{
|
{
|
||||||
/* Determine if this subscription is interested in this message. Note that final
|
int is_final = stasis_subscription_final_message(sub, message);
|
||||||
* messages are special and are always invoked on the subscription.
|
|
||||||
|
/*
|
||||||
|
* The 'do while' gives us an easy way to skip remaining logic once
|
||||||
|
* we determine the message should be accepted.
|
||||||
|
* The code looks more verbose than it needs to be but it optimizes
|
||||||
|
* down very nicely. It's just easier to understand and debug this way.
|
||||||
*/
|
*/
|
||||||
if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
|
do {
|
||||||
int message_type_id = stasis_message_type_id(stasis_message_type(message));
|
struct stasis_message_type *message_type = stasis_message_type(message);
|
||||||
if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
|
int type_id = stasis_message_type_id(message_type);
|
||||||
!AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
|
int type_filter_specified = 0;
|
||||||
!stasis_subscription_final_message(sub, message)) {
|
int formatter_filter_specified = 0;
|
||||||
return;
|
int type_filter_passed = 0;
|
||||||
|
int formatter_filter_passed = 0;
|
||||||
|
|
||||||
|
/* We always accept final messages so only run the filter logic if not final */
|
||||||
|
if (is_final) {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
|
||||||
|
formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
|
||||||
|
|
||||||
|
/* Accept if no filters of either type were specified */
|
||||||
|
if (!type_filter_specified && !formatter_filter_specified) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
type_filter_passed = type_filter_specified
|
||||||
|
&& type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
|
||||||
|
&& AST_VECTOR_GET(&sub->accepted_message_types, type_id);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Since the type and formatter filters are OR'd, we can skip
|
||||||
|
* the formatter check if the type check passes.
|
||||||
|
*/
|
||||||
|
if (type_filter_passed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
formatter_filter_passed = formatter_filter_specified
|
||||||
|
&& (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
|
||||||
|
|
||||||
|
if (formatter_filter_passed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
|
||||||
|
} while (0);
|
||||||
|
|
||||||
if (!sub->mailbox) {
|
if (!sub->mailbox) {
|
||||||
/* Dispatch directly */
|
/* Dispatch directly */
|
||||||
|
@@ -40,6 +40,7 @@ struct stasis_message_type {
|
|||||||
char *name;
|
char *name;
|
||||||
unsigned int hash;
|
unsigned int hash;
|
||||||
int id;
|
int id;
|
||||||
|
enum stasis_subscription_message_formatters available_formatters;
|
||||||
};
|
};
|
||||||
|
|
||||||
static struct stasis_message_vtable null_vtable = {};
|
static struct stasis_message_vtable null_vtable = {};
|
||||||
@@ -80,6 +81,15 @@ int stasis_message_type_create(const char *name,
|
|||||||
}
|
}
|
||||||
type->hash = ast_hashtab_hash_string(name);
|
type->hash = ast_hashtab_hash_string(name);
|
||||||
type->vtable = vtable;
|
type->vtable = vtable;
|
||||||
|
if (vtable->to_json) {
|
||||||
|
type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_JSON;
|
||||||
|
}
|
||||||
|
if (vtable->to_ami) {
|
||||||
|
type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_AMI;
|
||||||
|
}
|
||||||
|
if (vtable->to_event) {
|
||||||
|
type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_EVENT;
|
||||||
|
}
|
||||||
type->id = ast_atomic_fetchadd_int(&message_type_id, +1);
|
type->id = ast_atomic_fetchadd_int(&message_type_id, +1);
|
||||||
*result = type;
|
*result = type;
|
||||||
|
|
||||||
@@ -101,6 +111,12 @@ int stasis_message_type_id(const struct stasis_message_type *type)
|
|||||||
return type->id;
|
return type->id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(
|
||||||
|
const struct stasis_message_type *type)
|
||||||
|
{
|
||||||
|
return type->available_formatters;
|
||||||
|
}
|
||||||
|
|
||||||
/*! \internal */
|
/*! \internal */
|
||||||
struct stasis_message {
|
struct stasis_message {
|
||||||
/*! Time the message was created */
|
/*! Time the message was created */
|
||||||
|
@@ -399,3 +399,13 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
|
|||||||
/* While this implementation can never fail, it used to be able to */
|
/* While this implementation can never fail, it used to be able to */
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void stasis_message_router_accept_formatters(struct stasis_message_router *router,
|
||||||
|
enum stasis_subscription_message_formatters formatters)
|
||||||
|
{
|
||||||
|
ast_assert(router != NULL);
|
||||||
|
|
||||||
|
stasis_subscription_accept_formatters(router->subscription, formatters);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
@@ -38,7 +38,13 @@
|
|||||||
#include "asterisk/stasis_message_router.h"
|
#include "asterisk/stasis_message_router.h"
|
||||||
#include "asterisk/test.h"
|
#include "asterisk/test.h"
|
||||||
|
|
||||||
static const char *test_category = "/stasis/core/";
|
#define test_category "/stasis/core/"
|
||||||
|
|
||||||
|
static struct ast_event *fake_event(struct stasis_message *message)
|
||||||
|
{
|
||||||
|
return ast_event_new(AST_EVENT_CUSTOM,
|
||||||
|
AST_EVENT_IE_DESCRIPTION, AST_EVENT_IE_PLTYPE_STR, "Dummy", AST_EVENT_IE_END);
|
||||||
|
}
|
||||||
|
|
||||||
static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
|
static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
|
||||||
{
|
{
|
||||||
@@ -2044,6 +2050,389 @@ AST_TEST_DEFINE(caching_dtor_order)
|
|||||||
return AST_TEST_PASS;
|
return AST_TEST_PASS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct test_message_types {
|
||||||
|
struct stasis_message_type *none;
|
||||||
|
struct stasis_message_type *ami;
|
||||||
|
struct stasis_message_type *json;
|
||||||
|
struct stasis_message_type *event;
|
||||||
|
struct stasis_message_type *amievent;
|
||||||
|
struct stasis_message_type *type1;
|
||||||
|
struct stasis_message_type *type2;
|
||||||
|
struct stasis_message_type *type3;
|
||||||
|
struct stasis_message_type *change;
|
||||||
|
};
|
||||||
|
|
||||||
|
static void destroy_message_types(void *obj)
|
||||||
|
{
|
||||||
|
struct test_message_types *types = obj;
|
||||||
|
|
||||||
|
ao2_cleanup(types->none);
|
||||||
|
ao2_cleanup(types->ami);
|
||||||
|
ao2_cleanup(types->json);
|
||||||
|
ao2_cleanup(types->event);
|
||||||
|
ao2_cleanup(types->amievent);
|
||||||
|
ao2_cleanup(types->type1);
|
||||||
|
ao2_cleanup(types->type2);
|
||||||
|
ao2_cleanup(types->type3);
|
||||||
|
/* N.B. Don't cleanup types->change! */
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct test_message_types *create_message_types(struct ast_test *test)
|
||||||
|
{
|
||||||
|
struct stasis_message_vtable vtable = { 0 };
|
||||||
|
struct test_message_types *types;
|
||||||
|
enum ast_test_result_state __attribute__ ((unused)) rc;
|
||||||
|
|
||||||
|
types = ao2_alloc(sizeof(*types), destroy_message_types);
|
||||||
|
if (!types) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
ast_test_validate_cleanup(test,
|
||||||
|
stasis_message_type_create("TestMessageNONE", &vtable, &types->none) == STASIS_MESSAGE_TYPE_SUCCESS,
|
||||||
|
rc, cleanup);
|
||||||
|
|
||||||
|
vtable.to_ami = fake_ami;
|
||||||
|
ast_test_validate_cleanup(test,
|
||||||
|
stasis_message_type_create("TestMessageAMI", &vtable, &types->ami) == STASIS_MESSAGE_TYPE_SUCCESS,
|
||||||
|
rc, cleanup);
|
||||||
|
|
||||||
|
vtable.to_ami = NULL;
|
||||||
|
vtable.to_json = fake_json;
|
||||||
|
ast_test_validate_cleanup(test,
|
||||||
|
stasis_message_type_create("TestMessageJSON", &vtable, &types->json) == STASIS_MESSAGE_TYPE_SUCCESS,
|
||||||
|
rc, cleanup);
|
||||||
|
|
||||||
|
vtable.to_ami = NULL;
|
||||||
|
vtable.to_json = NULL;
|
||||||
|
vtable.to_event = fake_event;
|
||||||
|
ast_test_validate_cleanup(test,
|
||||||
|
stasis_message_type_create("TestMessageEVENT", &vtable, &types->event) == STASIS_MESSAGE_TYPE_SUCCESS,
|
||||||
|
rc, cleanup);
|
||||||
|
|
||||||
|
vtable.to_ami = fake_ami;
|
||||||
|
ast_test_validate_cleanup(test,
|
||||||
|
stasis_message_type_create("TestMessageAMIEVENT", &vtable, &types->amievent) == STASIS_MESSAGE_TYPE_SUCCESS,
|
||||||
|
rc, cleanup);
|
||||||
|
|
||||||
|
ast_test_validate_cleanup(test,
|
||||||
|
stasis_message_type_create("TestMessageType1", NULL, &types->type1) == STASIS_MESSAGE_TYPE_SUCCESS,
|
||||||
|
rc, cleanup);
|
||||||
|
|
||||||
|
ast_test_validate_cleanup(test,
|
||||||
|
stasis_message_type_create("TestMessageType2", NULL, &types->type2) == STASIS_MESSAGE_TYPE_SUCCESS,
|
||||||
|
rc, cleanup);
|
||||||
|
|
||||||
|
ast_test_validate_cleanup(test,
|
||||||
|
stasis_message_type_create("TestMessageType3", NULL, &types->type3) == STASIS_MESSAGE_TYPE_SUCCESS,
|
||||||
|
rc, cleanup);
|
||||||
|
|
||||||
|
types->change = stasis_subscription_change_type();
|
||||||
|
|
||||||
|
return types;
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
ao2_cleanup(types);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct cts {
|
||||||
|
struct consumer *consumer;
|
||||||
|
struct stasis_topic *topic;
|
||||||
|
struct stasis_subscription *sub;
|
||||||
|
};
|
||||||
|
|
||||||
|
static void destroy_cts(void *obj)
|
||||||
|
{
|
||||||
|
struct cts *c = obj;
|
||||||
|
|
||||||
|
stasis_unsubscribe(c->sub);
|
||||||
|
ao2_cleanup(c->topic);
|
||||||
|
ao2_cleanup(c->consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct cts *create_cts(struct ast_test *test)
|
||||||
|
{
|
||||||
|
struct cts *cts = ao2_alloc(sizeof(*cts), destroy_cts);
|
||||||
|
enum ast_test_result_state __attribute__ ((unused)) rc;
|
||||||
|
|
||||||
|
ast_test_validate_cleanup(test, cts, rc, cleanup);
|
||||||
|
|
||||||
|
cts->topic = stasis_topic_create("TestTopic");
|
||||||
|
ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup);
|
||||||
|
|
||||||
|
cts->consumer = consumer_create(0);
|
||||||
|
ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup);
|
||||||
|
|
||||||
|
ao2_ref(cts->consumer, +1);
|
||||||
|
cts->sub = stasis_subscribe(cts->topic, consumer_exec, cts->consumer);
|
||||||
|
ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup);
|
||||||
|
|
||||||
|
return cts;
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
ao2_cleanup(cts);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
|
||||||
|
{
|
||||||
|
struct stasis_subscription_change *msg_data = stasis_message_data(msg);
|
||||||
|
|
||||||
|
if (stasis_message_type(msg) != mtype) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data) {
|
||||||
|
return (strcmp(data, msg_data->description) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dump_consumer(struct ast_test *test, struct cts *cts)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
struct stasis_subscription_change *data;
|
||||||
|
|
||||||
|
ast_test_status_update(test, "Messages received: %ld Final? %s\n", cts->consumer->messages_rxed_len,
|
||||||
|
cts->consumer->complete ? "yes" : "no");
|
||||||
|
for (i = 0; i < cts->consumer->messages_rxed_len; i++) {
|
||||||
|
data = stasis_message_data(cts->consumer->messages_rxed[i]);
|
||||||
|
ast_test_status_update(test, "Message type received: %s %s\n",
|
||||||
|
stasis_message_type_name(stasis_message_type(cts->consumer->messages_rxed[i])),
|
||||||
|
data && data->description ? data->description : "no data");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type,
|
||||||
|
const char *data)
|
||||||
|
{
|
||||||
|
struct stasis_message *msg;
|
||||||
|
struct stasis_subscription_change *test_data =
|
||||||
|
ao2_alloc(sizeof(*test_data) + (data ? strlen(data) : strlen("no data")) + 1, NULL);
|
||||||
|
|
||||||
|
if (!test_data) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
strcpy(test_data->description, S_OR(data, "no data")); /* Safe */
|
||||||
|
|
||||||
|
msg = stasis_message_create(msg_type, test_data);
|
||||||
|
ao2_ref(test_data, -1);
|
||||||
|
if (!msg) {
|
||||||
|
ast_test_status_update(test, "Unable to create %s message\n",
|
||||||
|
stasis_message_type_name(msg_type));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
stasis_publish(cts->topic, msg);
|
||||||
|
ao2_ref(msg, -1);
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
AST_TEST_DEFINE(type_filters)
|
||||||
|
{
|
||||||
|
RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
|
||||||
|
RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
|
||||||
|
int ix = 0;
|
||||||
|
|
||||||
|
switch (cmd) {
|
||||||
|
case TEST_INIT:
|
||||||
|
info->name = __func__;
|
||||||
|
info->category = test_category "filtering/";
|
||||||
|
info->summary = "Test message filtering by type";
|
||||||
|
info->description = "Test message filtering by type";
|
||||||
|
return AST_TEST_NOT_RUN;
|
||||||
|
case TEST_EXECUTE:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
types = create_message_types(test);
|
||||||
|
ast_test_validate(test, NULL != types);
|
||||||
|
|
||||||
|
cts = create_cts(test);
|
||||||
|
ast_test_validate(test, NULL != cts);
|
||||||
|
|
||||||
|
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
|
||||||
|
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
|
||||||
|
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
|
||||||
|
ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
|
||||||
|
|
||||||
|
/* We should get these */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
|
||||||
|
/* ... but not this one */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
|
||||||
|
|
||||||
|
/* Wait for change(subscribe) and "Pass" messages */
|
||||||
|
consumer_wait_for(cts->consumer, 3);
|
||||||
|
|
||||||
|
/* Remove type 1 */
|
||||||
|
ast_test_validate(test, stasis_subscription_decline_message_type(cts->sub, types->type1) == 0);
|
||||||
|
|
||||||
|
/* We should now NOT get this one */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
|
||||||
|
/* We should get this one (again) */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->type2, "Pass2"));
|
||||||
|
/* We still should NOT get this one */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
|
||||||
|
|
||||||
|
/* We should now have a second type2 */
|
||||||
|
consumer_wait_for(cts->consumer, 4);
|
||||||
|
|
||||||
|
stasis_unsubscribe(cts->sub);
|
||||||
|
cts->sub = NULL;
|
||||||
|
consumer_wait_for_completion(cts->consumer);
|
||||||
|
|
||||||
|
dump_consumer(test, cts);
|
||||||
|
|
||||||
|
ast_test_validate(test, 1 == cts->consumer->complete);
|
||||||
|
ast_test_validate(test, 5 == cts->consumer->messages_rxed_len);
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass2"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
|
||||||
|
|
||||||
|
return AST_TEST_PASS;
|
||||||
|
}
|
||||||
|
|
||||||
|
AST_TEST_DEFINE(formatter_filters)
|
||||||
|
{
|
||||||
|
RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
|
||||||
|
RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup) ;
|
||||||
|
int ix = 0;
|
||||||
|
|
||||||
|
switch (cmd) {
|
||||||
|
case TEST_INIT:
|
||||||
|
info->name = __func__;
|
||||||
|
info->category = test_category "filtering/";
|
||||||
|
info->summary = "Test message filtering by formatter";
|
||||||
|
info->description = "Test message filtering by formatter";
|
||||||
|
return AST_TEST_NOT_RUN;
|
||||||
|
case TEST_EXECUTE:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
types = create_message_types(test);
|
||||||
|
ast_test_validate(test, NULL != types);
|
||||||
|
|
||||||
|
cts = create_cts(test);
|
||||||
|
ast_test_validate(test, NULL != cts);
|
||||||
|
|
||||||
|
stasis_subscription_accept_formatters(cts->sub,
|
||||||
|
STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
|
||||||
|
|
||||||
|
/* We should get these */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
|
||||||
|
|
||||||
|
/* ... but not these */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->none, "FAIL"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
|
||||||
|
|
||||||
|
/* Wait for change(subscribe) and the "Pass" messages */
|
||||||
|
consumer_wait_for(cts->consumer, 4);
|
||||||
|
|
||||||
|
/* Change the subscription to accept only event formatters */
|
||||||
|
stasis_subscription_accept_formatters(cts->sub, STASIS_SUBSCRIPTION_FORMATTER_EVENT);
|
||||||
|
|
||||||
|
/* We should NOT get these now */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->ami, "FAIL"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->json, "FAIL"));
|
||||||
|
/* ... but we should still get this one */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass2"));
|
||||||
|
/* ... and this one should be new */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->event, "Pass"));
|
||||||
|
|
||||||
|
/* We should now have a second amievent */
|
||||||
|
consumer_wait_for(cts->consumer, 6);
|
||||||
|
|
||||||
|
stasis_unsubscribe(cts->sub);
|
||||||
|
cts->sub = NULL;
|
||||||
|
consumer_wait_for_completion(cts->consumer);
|
||||||
|
|
||||||
|
dump_consumer(test, cts);
|
||||||
|
|
||||||
|
ast_test_validate(test, 1 == cts->consumer->complete);
|
||||||
|
ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass2"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
|
||||||
|
|
||||||
|
return AST_TEST_PASS;
|
||||||
|
}
|
||||||
|
|
||||||
|
AST_TEST_DEFINE(combo_filters)
|
||||||
|
{
|
||||||
|
RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
|
||||||
|
RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
|
||||||
|
int ix = 0;
|
||||||
|
|
||||||
|
switch (cmd) {
|
||||||
|
case TEST_INIT:
|
||||||
|
info->name = __func__;
|
||||||
|
info->category = test_category "filtering/";
|
||||||
|
info->summary = "Test message filtering by type and formatter";
|
||||||
|
info->description = "Test message filtering by type and formatter";
|
||||||
|
return AST_TEST_NOT_RUN;
|
||||||
|
case TEST_EXECUTE:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
types = create_message_types(test);
|
||||||
|
ast_test_validate(test, NULL != types);
|
||||||
|
|
||||||
|
cts = create_cts(test);
|
||||||
|
ast_test_validate(test, NULL != cts);
|
||||||
|
|
||||||
|
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
|
||||||
|
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
|
||||||
|
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
|
||||||
|
ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
|
||||||
|
stasis_subscription_accept_formatters(cts->sub,
|
||||||
|
STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
|
||||||
|
|
||||||
|
/* We should get these */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
|
||||||
|
|
||||||
|
/* ... but not these */
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
|
||||||
|
ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
|
||||||
|
|
||||||
|
/* Wait for change(subscribe) and the "Pass" messages */
|
||||||
|
consumer_wait_for(cts->consumer, 6);
|
||||||
|
|
||||||
|
stasis_unsubscribe(cts->sub);
|
||||||
|
cts->sub = NULL;
|
||||||
|
consumer_wait_for_completion(cts->consumer);
|
||||||
|
|
||||||
|
dump_consumer(test, cts);
|
||||||
|
|
||||||
|
ast_test_validate(test, 1 == cts->consumer->complete);
|
||||||
|
ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
|
||||||
|
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
|
||||||
|
|
||||||
|
return AST_TEST_PASS;
|
||||||
|
}
|
||||||
|
|
||||||
static int unload_module(void)
|
static int unload_module(void)
|
||||||
{
|
{
|
||||||
AST_TEST_UNREGISTER(message_type);
|
AST_TEST_UNREGISTER(message_type);
|
||||||
@@ -2070,6 +2459,9 @@ static int unload_module(void)
|
|||||||
AST_TEST_UNREGISTER(to_ami);
|
AST_TEST_UNREGISTER(to_ami);
|
||||||
AST_TEST_UNREGISTER(dtor_order);
|
AST_TEST_UNREGISTER(dtor_order);
|
||||||
AST_TEST_UNREGISTER(caching_dtor_order);
|
AST_TEST_UNREGISTER(caching_dtor_order);
|
||||||
|
AST_TEST_UNREGISTER(type_filters);
|
||||||
|
AST_TEST_UNREGISTER(formatter_filters);
|
||||||
|
AST_TEST_UNREGISTER(combo_filters);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2099,6 +2491,9 @@ static int load_module(void)
|
|||||||
AST_TEST_REGISTER(to_ami);
|
AST_TEST_REGISTER(to_ami);
|
||||||
AST_TEST_REGISTER(dtor_order);
|
AST_TEST_REGISTER(dtor_order);
|
||||||
AST_TEST_REGISTER(caching_dtor_order);
|
AST_TEST_REGISTER(caching_dtor_order);
|
||||||
|
AST_TEST_REGISTER(type_filters);
|
||||||
|
AST_TEST_REGISTER(formatter_filters);
|
||||||
|
AST_TEST_REGISTER(combo_filters);
|
||||||
return AST_MODULE_LOAD_SUCCESS;
|
return AST_MODULE_LOAD_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user