mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-04 20:04:50 +00:00
stasis: Add methods to allow for synchronous publishing to subscriber
This patch adds an API call to Stasis that allows a publisher to publish a stasis message that will not return until a specific subscriber handles the message. Since a subscriber can have their own forwarding topic which orders messages from many topics, this allows a publisher who knows of that subscriber to synchronize to that subscriber regardless of the forwarding relationships between topics. This is of particular use for dialplan applications that need to synchronize on a particular subscriber's handling of a message. (issue ASTERISK-22884) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/3099/ ........ Merged revisions 405311 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@405313 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
@@ -206,6 +206,27 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st
|
||||
ast_cond_signal(&consumer->out);
|
||||
}
|
||||
|
||||
static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
|
||||
{
|
||||
struct consumer *consumer = data;
|
||||
RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
|
||||
SCOPED_MUTEX(lock, &consumer->lock);
|
||||
|
||||
if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
|
||||
|
||||
++consumer->messages_rxed_len;
|
||||
consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
|
||||
ast_assert(consumer->messages_rxed != NULL);
|
||||
consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
|
||||
ao2_ref(message, +1);
|
||||
}
|
||||
|
||||
if (stasis_subscription_final_message(sub, message)) {
|
||||
consumer->complete = 1;
|
||||
consumer_needs_cleanup = consumer;
|
||||
}
|
||||
}
|
||||
|
||||
static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
|
||||
{
|
||||
struct timeval start = ast_tvnow();
|
||||
@@ -341,8 +362,8 @@ AST_TEST_DEFINE(publish)
|
||||
case TEST_INIT:
|
||||
info->name = __func__;
|
||||
info->category = test_category;
|
||||
info->summary = "Test simple subscriptions";
|
||||
info->description = "Test simple subscriptions";
|
||||
info->summary = "Test publishing";
|
||||
info->description = "Test publishing";
|
||||
return AST_TEST_NOT_RUN;
|
||||
case TEST_EXECUTE:
|
||||
break;
|
||||
@@ -373,6 +394,53 @@ AST_TEST_DEFINE(publish)
|
||||
return AST_TEST_PASS;
|
||||
}
|
||||
|
||||
AST_TEST_DEFINE(publish_sync)
|
||||
{
|
||||
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
|
||||
RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
|
||||
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
|
||||
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
|
||||
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
|
||||
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
|
||||
int actual_len;
|
||||
const char *actual;
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
info->name = __func__;
|
||||
info->category = test_category;
|
||||
info->summary = "Test synchronous publishing";
|
||||
info->description = "Test synchronous publishing";
|
||||
return AST_TEST_NOT_RUN;
|
||||
case TEST_EXECUTE:
|
||||
break;
|
||||
}
|
||||
|
||||
topic = stasis_topic_create("TestTopic");
|
||||
ast_test_validate(test, NULL != topic);
|
||||
|
||||
consumer = consumer_create(1);
|
||||
ast_test_validate(test, NULL != consumer);
|
||||
|
||||
uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
|
||||
ast_test_validate(test, NULL != uut);
|
||||
ao2_ref(consumer, +1);
|
||||
|
||||
test_data = ao2_alloc(1, NULL);
|
||||
ast_test_validate(test, NULL != test_data);
|
||||
test_message_type = stasis_message_type_create("TestMessage", NULL);
|
||||
test_message = stasis_message_create(test_message_type, test_data);
|
||||
|
||||
stasis_publish_sync(uut, test_message);
|
||||
|
||||
actual_len = consumer->messages_rxed_len;
|
||||
ast_test_validate(test, 1 == actual_len);
|
||||
actual = stasis_message_data(consumer->messages_rxed[0]);
|
||||
ast_test_validate(test, test_data == actual);
|
||||
|
||||
return AST_TEST_PASS;
|
||||
}
|
||||
|
||||
AST_TEST_DEFINE(unsubscribe_stops_messages)
|
||||
{
|
||||
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
|
||||
@@ -1324,6 +1392,7 @@ static int unload_module(void)
|
||||
AST_TEST_UNREGISTER(message);
|
||||
AST_TEST_UNREGISTER(subscription_messages);
|
||||
AST_TEST_UNREGISTER(publish);
|
||||
AST_TEST_UNREGISTER(publish_sync);
|
||||
AST_TEST_UNREGISTER(unsubscribe_stops_messages);
|
||||
AST_TEST_UNREGISTER(forward);
|
||||
AST_TEST_UNREGISTER(cache_filter);
|
||||
@@ -1347,6 +1416,7 @@ static int load_module(void)
|
||||
AST_TEST_REGISTER(message);
|
||||
AST_TEST_REGISTER(subscription_messages);
|
||||
AST_TEST_REGISTER(publish);
|
||||
AST_TEST_REGISTER(publish_sync);
|
||||
AST_TEST_REGISTER(unsubscribe_stops_messages);
|
||||
AST_TEST_REGISTER(forward);
|
||||
AST_TEST_REGISTER(cache_filter);
|
||||
|
Reference in New Issue
Block a user