Files
asterisk/tests/test_stasis.c
David M. Lee b23e8e1950 stasis: Fixed message ordering issues when forwarding
This patch fixes an issue of message ordering that occurs when
multiple topics are forwarded to an aggregator topic (such as
ast_channel_topic_all()).

It is (very reasonably) expected that the rules governing message
dispatch order still apply, so long as the messages start from the
same thread, and are received by the same subscription. Because the
existing code had an additional layer of dispatching via the Stasis
thread pool for forwards, those promises couldn't be kept.

Forwarding subscriptions no longer have their own mailbox, and now
dispatch directly from the forwarding topic's stasis_publish()
call. This means that the topic's lock is held for the duration of not
only a message's dispatch, but the dispatch of all the forwards. This
shouldn't be a problem right now, but if an aggregator topic had many
subscribers, it could become a problem. But I figure we can write more
clever code when the time comes, if necessary.

Review: https://reviewboard.asterisk.org/r/2419/


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@384413 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-04-01 13:37:51 +00:00

938 lines
31 KiB
C

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*!
* \file \brief Test Stasis message bus.
*
* \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
*
* \ingroup tests
*/
/*** MODULEINFO
<depend>TEST_FRAMEWORK</depend>
<support_level>core</support_level>
***/
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/module.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/test.h"
static const char *test_category = "/stasis/core/";
AST_TEST_DEFINE(message_type)
{
RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test basic message_type functions";
info->description = "Test basic message_type functions";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, NULL == stasis_message_type_create(NULL));
uut = stasis_message_type_create("SomeMessage");
ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
return AST_TEST_PASS;
}
AST_TEST_DEFINE(message)
{
RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
RAII_VAR(char *, data, NULL, ao2_cleanup);
char *expected = "SomeData";
struct timeval expected_timestamp;
struct timeval time_diff;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test basic message functions";
info->description = "Test basic message functions";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
type = stasis_message_type_create("SomeMessage");
ast_test_validate(test, NULL == stasis_message_create(NULL, NULL));
ast_test_validate(test, NULL == stasis_message_create(type, NULL));
data = ao2_alloc(strlen(expected) + 1, NULL);
strcpy(data, expected);
expected_timestamp = ast_tvnow();
uut = stasis_message_create(type, data);
ast_test_validate(test, NULL != uut);
ast_test_validate(test, type == stasis_message_type(uut));
ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut)));
ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */
time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp);
/* 10ms is certainly long enough for the two calls to complete */
ast_test_validate(test, time_diff.tv_sec == 0);
ast_test_validate(test, time_diff.tv_usec < 10000);
ao2_ref(uut, -1);
uut = NULL;
ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */
return AST_TEST_PASS;
}
struct consumer {
ast_mutex_t lock;
ast_cond_t out;
struct stasis_message **messages_rxed;
size_t messages_rxed_len;
int ignore_subscriptions;
int complete;
};
static void consumer_dtor(void *obj) {
struct consumer *consumer = obj;
ast_mutex_destroy(&consumer->lock);
ast_cond_destroy(&consumer->out);
while (consumer->messages_rxed_len > 0) {
ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
}
ast_free(consumer->messages_rxed);
consumer->messages_rxed = NULL;
}
static struct consumer *consumer_create(int ignore_subscriptions) {
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
if (!consumer) {
return NULL;
}
consumer->ignore_subscriptions = ignore_subscriptions;
consumer->messages_rxed = ast_malloc(0);
if (!consumer->messages_rxed) {
return NULL;
}
ast_mutex_init(&consumer->lock);
ast_cond_init(&consumer->out, NULL);
ao2_ref(consumer, +1);
return consumer;
}
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
{
struct consumer *consumer = data;
RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
SCOPED_MUTEX(lock, &consumer->lock);
if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
++consumer->messages_rxed_len;
consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
ast_assert(consumer->messages_rxed != NULL);
consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
ao2_ref(message, +1);
}
if (stasis_subscription_final_message(sub, message)) {
consumer->complete = 1;
consumer_needs_cleanup = consumer;
}
ast_cond_signal(&consumer->out);
}
static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
{
struct timeval start = ast_tvnow();
struct timespec end = {
.tv_sec = start.tv_sec + 30,
.tv_nsec = start.tv_usec * 1000
};
SCOPED_MUTEX(lock, &consumer->lock);
while (consumer->messages_rxed_len < expected_len) {
int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
if (r == ETIMEDOUT) {
break;
}
ast_assert(r == 0); /* Not expecting any othet types of errors */
}
return consumer->messages_rxed_len;
}
static int consumer_wait_for_completion(struct consumer *consumer)
{
struct timeval start = ast_tvnow();
struct timespec end = {
.tv_sec = start.tv_sec + 30,
.tv_nsec = start.tv_usec * 1000
};
SCOPED_MUTEX(lock, &consumer->lock);
while (!consumer->complete) {
int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
if (r == ETIMEDOUT) {
break;
}
ast_assert(r == 0); /* Not expecting any othet types of errors */
}
return consumer->complete;
}
static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
{
struct timeval start = ast_tvnow();
struct timeval diff = {
.tv_sec = 0,
.tv_usec = 100000 /* wait for 100ms */
};
struct timeval end_tv = ast_tvadd(start, diff);
struct timespec end = {
.tv_sec = end_tv.tv_sec,
.tv_nsec = end_tv.tv_usec * 1000
};
SCOPED_MUTEX(lock, &consumer->lock);
while (consumer->messages_rxed_len == expected_len) {
int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
if (r == ETIMEDOUT) {
break;
}
ast_assert(r == 0); /* Not expecting any othet types of errors */
}
return consumer->messages_rxed_len;
}
AST_TEST_DEFINE(subscription_messages)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
int complete;
struct stasis_subscription_change *change;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test subscribe/unsubscribe messages";
info->description = "Test subscribe/unsubscribe messages";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer = consumer_create(0);
ast_test_validate(test, NULL != consumer);
uut = stasis_subscribe(topic, consumer_exec, consumer);
ast_test_validate(test, NULL != uut);
ao2_ref(consumer, +1);
expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
uut = stasis_unsubscribe(uut);
complete = consumer_wait_for_completion(consumer);
ast_test_validate(test, 1 == complete);
ast_test_validate(test, 2 == consumer->messages_rxed_len);
ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
change = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, topic == change->topic);
ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
change = stasis_message_data(consumer->messages_rxed[1]);
ast_test_validate(test, topic == change->topic);
ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
return AST_TEST_PASS;
}
AST_TEST_DEFINE(publish)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
int actual_len;
const char *actual;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test simple subscriptions";
info->description = "Test simple subscriptions";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
uut = stasis_subscribe(topic, consumer_exec, consumer);
ast_test_validate(test, NULL != uut);
ao2_ref(consumer, +1);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
test_message_type = stasis_message_type_create("TestMessage");
test_message = stasis_message_create(test_message_type, test_data);
stasis_publish(topic, test_message);
actual_len = consumer_wait_for(consumer, 1);
ast_test_validate(test, 1 == actual_len);
actual = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, test_data == actual);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(unsubscribe_stops_messages)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
int actual_len;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test simple subscriptions";
info->description = "Test simple subscriptions";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
uut = stasis_subscribe(topic, consumer_exec, consumer);
ast_test_validate(test, NULL != uut);
ao2_ref(consumer, +1);
uut = stasis_unsubscribe(uut);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
test_message_type = stasis_message_type_create("TestMessage");
test_message = stasis_message_create(test_message_type, test_data);
stasis_publish(topic, test_message);
actual_len = consumer_should_stay(consumer, 0);
ast_test_validate(test, 0 == actual_len);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(forward)
{
RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
int actual_len;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test sending events to a parent topic";
info->description = "Test sending events to a parent topic.\n"
"This test creates three topics (one parent, two children)\n"
"and publishes a message to one child, and verifies it's\n"
"only seen by that child and the parent";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
parent_topic = stasis_topic_create("ParentTestTopic");
ast_test_validate(test, NULL != parent_topic);
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
forward_sub = stasis_forward_all(topic, parent_topic);
ast_test_validate(test, NULL != forward_sub);
parent_consumer = consumer_create(1);
ast_test_validate(test, NULL != parent_consumer);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
ast_test_validate(test, NULL != parent_sub);
ao2_ref(parent_consumer, +1);
sub = stasis_subscribe(topic, consumer_exec, consumer);
ast_test_validate(test, NULL != sub);
ao2_ref(consumer, +1);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
test_message_type = stasis_message_type_create("TestMessage");
test_message = stasis_message_create(test_message_type, test_data);
stasis_publish(topic, test_message);
actual_len = consumer_wait_for(consumer, 1);
ast_test_validate(test, 1 == actual_len);
actual_len = consumer_wait_for(parent_consumer, 1);
ast_test_validate(test, 1 == actual_len);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(interleaving)
{
RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
int actual_len;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test sending interleaved events to a parent topic";
info->description = "Test sending events to a parent topic.\n"
"This test creates three topics (one parent, two children)\n"
"and publishes messages alternately between the children.\n"
"It verifies that the messages are received in the expected\n"
"order.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
test_message_type = stasis_message_type_create("test");
ast_test_validate(test, NULL != test_message_type);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
test_message1 = stasis_message_create(test_message_type, test_data);
ast_test_validate(test, NULL != test_message1);
test_message2 = stasis_message_create(test_message_type, test_data);
ast_test_validate(test, NULL != test_message2);
test_message3 = stasis_message_create(test_message_type, test_data);
ast_test_validate(test, NULL != test_message3);
parent_topic = stasis_topic_create("ParentTestTopic");
ast_test_validate(test, NULL != parent_topic);
topic1 = stasis_topic_create("Topic1");
ast_test_validate(test, NULL != topic1);
topic2 = stasis_topic_create("Topic2");
ast_test_validate(test, NULL != topic2);
forward_sub1 = stasis_forward_all(topic1, parent_topic);
ast_test_validate(test, NULL != forward_sub1);
forward_sub2 = stasis_forward_all(topic2, parent_topic);
ast_test_validate(test, NULL != forward_sub2);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
ast_test_validate(test, NULL != sub);
ao2_ref(consumer, +1);
stasis_publish(topic1, test_message1);
stasis_publish(topic2, test_message2);
stasis_publish(topic1, test_message3);
actual_len = consumer_wait_for(consumer, 3);
ast_test_validate(test, 3 == actual_len);
ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
return AST_TEST_PASS;
}
struct cache_test_data {
char *id;
char *value;
};
static void cache_test_data_dtor(void *obj)
{
struct cache_test_data *data = obj;
ast_free(data->id);
ast_free(data->value);
}
static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
{
RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
if (data == NULL) {
return NULL;
}
ast_assert(name != NULL);
ast_assert(value != NULL);
data->id = ast_strdup(name);
data->value = ast_strdup(value);
if (!data->id || !data->value) {
return NULL;
}
return stasis_message_create(type, data);
}
static const char *cache_test_data_id(struct stasis_message *message) {
struct cache_test_data *cachable = stasis_message_data(message);
if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
return NULL;
}
return cachable->id;
}
AST_TEST_DEFINE(cache_passthrough)
{
RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
int actual_len;
struct stasis_message_type *actual_type;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test passing messages through cache topic unscathed.";
info->description = "Test passing messages through cache topic unscathed.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
non_cache_type = stasis_message_type_create("NonCacheable");
ast_test_validate(test, NULL != non_cache_type);
topic = stasis_topic_create("SomeTopic");
ast_test_validate(test, NULL != topic);
caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
ast_test_validate(test, NULL != caching_topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
ast_test_validate(test, NULL != sub);
ao2_ref(consumer, +1);
test_message = cache_test_message_create(non_cache_type, "1", "1");
ast_test_validate(test, NULL != test_message);
stasis_publish(topic, test_message);
actual_len = consumer_wait_for(consumer, 1);
ast_test_validate(test, 1 == actual_len);
actual_type = stasis_message_type(consumer->messages_rxed[0]);
ast_test_validate(test, non_cache_type == actual_type);
ast_test_validate(test, test_message == consumer->messages_rxed[0]);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(cache)
{
RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
int actual_len;
struct stasis_cache_update *actual_update;
struct ao2_container *cache_dump;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test passing messages through cache topic unscathed.";
info->description = "Test passing messages through cache topic unscathed.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
cache_type = stasis_message_type_create("Cacheable");
ast_test_validate(test, NULL != cache_type);
topic = stasis_topic_create("SomeTopic");
ast_test_validate(test, NULL != topic);
caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
ast_test_validate(test, NULL != caching_topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
ast_test_validate(test, NULL != sub);
ao2_ref(consumer, +1);
test_message1_1 = cache_test_message_create(cache_type, "1", "1");
ast_test_validate(test, NULL != test_message1_1);
test_message2_1 = cache_test_message_create(cache_type, "2", "1");
ast_test_validate(test, NULL != test_message2_1);
/* Post a couple of snapshots */
stasis_publish(topic, test_message1_1);
stasis_publish(topic, test_message2_1);
actual_len = consumer_wait_for(consumer, 2);
ast_test_validate(test, 2 == actual_len);
/* Dump the cache to ensure that it has the correct number of items in it */
cache_dump = stasis_cache_dump(caching_topic, NULL);
ast_test_validate(test, 2 == ao2_container_count(cache_dump));
ao2_ref(cache_dump, -1);
cache_dump = NULL;
/* Check for new snapshot messages */
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
actual_update = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, NULL == actual_update->old_snapshot);
ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
ast_test_validate(test, test_message1_1 == stasis_cache_get(caching_topic, cache_type, "1"));
/* stasis_cache_get returned a ref, so unref test_message1_1 */
ao2_ref(test_message1_1, -1);
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
actual_update = stasis_message_data(consumer->messages_rxed[1]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, NULL == actual_update->old_snapshot);
ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
ast_test_validate(test, test_message2_1 == stasis_cache_get(caching_topic, cache_type, "2"));
/* stasis_cache_get returned a ref, so unref test_message2_1 */
ao2_ref(test_message2_1, -1);
/* Update snapshot 2 */
test_message2_2 = cache_test_message_create(cache_type, "2", "2");
ast_test_validate(test, NULL != test_message2_2);
stasis_publish(topic, test_message2_2);
actual_len = consumer_wait_for(consumer, 3);
ast_test_validate(test, 3 == actual_len);
actual_update = stasis_message_data(consumer->messages_rxed[2]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
ast_test_validate(test, test_message2_2 == stasis_cache_get(caching_topic, cache_type, "2"));
/* stasis_cache_get returned a ref, so unref test_message2_2 */
ao2_ref(test_message2_2, -1);
/* Dump the cache to ensure that it has the correct number of items in it */
cache_dump = stasis_cache_dump(caching_topic, NULL);
ast_test_validate(test, 2 == ao2_container_count(cache_dump));
ao2_ref(cache_dump, -1);
cache_dump = NULL;
/* Clear snapshot 1 */
test_message1_clear = stasis_cache_clear_create(cache_type, "1");
ast_test_validate(test, NULL != test_message1_clear);
stasis_publish(topic, test_message1_clear);
actual_len = consumer_wait_for(consumer, 4);
ast_test_validate(test, 4 == actual_len);
actual_update = stasis_message_data(consumer->messages_rxed[3]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
ast_test_validate(test, NULL == actual_update->new_snapshot);
ast_test_validate(test, NULL == stasis_cache_get(caching_topic, cache_type, "1"));
/* Dump the cache to ensure that it has the correct number of items in it */
cache_dump = stasis_cache_dump(caching_topic, NULL);
ast_test_validate(test, 1 == ao2_container_count(cache_dump));
ao2_ref(cache_dump, -1);
cache_dump = NULL;
/* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
cache_dump = stasis_cache_dump(caching_topic, stasis_subscription_change_type());
ast_test_validate(test, 0 == ao2_container_count(cache_dump));
ao2_ref(cache_dump, -1);
cache_dump = NULL;
return AST_TEST_PASS;
}
AST_TEST_DEFINE(route_conflicts)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
int ret;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary =
"Multiple routes to the same message_type should fail";
info->description =
"Multiple routes to the same message_type should fail";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer1 = consumer_create(1);
ast_test_validate(test, NULL != consumer1);
consumer2 = consumer_create(1);
ast_test_validate(test, NULL != consumer2);
test_message_type = stasis_message_type_create("TestMessage");
ast_test_validate(test, NULL != test_message_type);
uut = stasis_message_router_create(topic);
ast_test_validate(test, NULL != uut);
ret = stasis_message_router_add(
uut, test_message_type, consumer_exec, consumer1);
ast_test_validate(test, 0 == ret);
ret = stasis_message_router_add(
uut, test_message_type, consumer_exec, consumer2);
ast_test_validate(test, 0 != ret);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(router)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
int actual_len, ret;
struct stasis_message *actual;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test simple message routing";
info->description = "Test simple message routing";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer1 = consumer_create(1);
ast_test_validate(test, NULL != consumer1);
consumer2 = consumer_create(1);
ast_test_validate(test, NULL != consumer2);
consumer3 = consumer_create(1);
ast_test_validate(test, NULL != consumer3);
test_message_type1 = stasis_message_type_create("TestMessage1");
ast_test_validate(test, NULL != test_message_type1);
test_message_type2 = stasis_message_type_create("TestMessage2");
ast_test_validate(test, NULL != test_message_type2);
test_message_type3 = stasis_message_type_create("TestMessage3");
ast_test_validate(test, NULL != test_message_type3);
uut = stasis_message_router_create(topic);
ast_test_validate(test, NULL != uut);
ret = stasis_message_router_add(
uut, test_message_type1, consumer_exec, consumer1);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer1, +1);
ret = stasis_message_router_add(
uut, test_message_type2, consumer_exec, consumer2);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer2, +1);
ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer3, +1);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
test_message1 = stasis_message_create(test_message_type1, test_data);
ast_test_validate(test, NULL != test_message1);
test_message2 = stasis_message_create(test_message_type2, test_data);
ast_test_validate(test, NULL != test_message2);
test_message3 = stasis_message_create(test_message_type3, test_data);
ast_test_validate(test, NULL != test_message3);
stasis_publish(topic, test_message1);
stasis_publish(topic, test_message2);
stasis_publish(topic, test_message3);
actual_len = consumer_wait_for(consumer1, 1);
ast_test_validate(test, 1 == actual_len);
actual_len = consumer_wait_for(consumer2, 1);
ast_test_validate(test, 1 == actual_len);
actual_len = consumer_wait_for(consumer3, 1);
ast_test_validate(test, 1 == actual_len);
actual = consumer1->messages_rxed[0];
ast_test_validate(test, test_message1 == actual);
actual = consumer2->messages_rxed[0];
ast_test_validate(test, test_message2 == actual);
actual = consumer3->messages_rxed[0];
ast_test_validate(test, test_message3 == actual);
/* consumer1 and consumer2 do not get the final message. */
ao2_cleanup(consumer1);
ao2_cleanup(consumer2);
return AST_TEST_PASS;
}
static int unload_module(void)
{
AST_TEST_UNREGISTER(message_type);
AST_TEST_UNREGISTER(message);
AST_TEST_UNREGISTER(subscription_messages);
AST_TEST_UNREGISTER(publish);
AST_TEST_UNREGISTER(unsubscribe_stops_messages);
AST_TEST_UNREGISTER(forward);
AST_TEST_UNREGISTER(cache_passthrough);
AST_TEST_UNREGISTER(cache);
AST_TEST_UNREGISTER(route_conflicts);
AST_TEST_UNREGISTER(router);
AST_TEST_UNREGISTER(interleaving);
return 0;
}
static int load_module(void)
{
AST_TEST_REGISTER(message_type);
AST_TEST_REGISTER(message);
AST_TEST_REGISTER(subscription_messages);
AST_TEST_REGISTER(publish);
AST_TEST_REGISTER(unsubscribe_stops_messages);
AST_TEST_REGISTER(forward);
AST_TEST_REGISTER(cache_passthrough);
AST_TEST_REGISTER(cache);
AST_TEST_REGISTER(route_conflicts);
AST_TEST_REGISTER(router);
AST_TEST_REGISTER(interleaving);
return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
.load = load_module,
.unload = unload_module
);