Significantly improve scheduler performance under high load.

This patch changes the scheduler to use a max-heap to store pending scheduler
entries instead of a fully sorted doubly linked list.  When the number of
entries in the scheduler gets large, this will perform much better.  For much
more detailed information on this change, see the review request.

Review: http://reviewboard.digium.com/r/160/


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@176639 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Russell Bryant
2009-02-17 21:04:08 +00:00
parent 56b9180bd7
commit 044cf691fe
2 changed files with 200 additions and 80 deletions

View File

@@ -45,15 +45,17 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/linkedlists.h" #include "asterisk/linkedlists.h"
#include "asterisk/dlinkedlists.h" #include "asterisk/dlinkedlists.h"
#include "asterisk/hashtab.h" #include "asterisk/hashtab.h"
#include "asterisk/heap.h"
struct sched { struct sched {
AST_DLLIST_ENTRY(sched) list; AST_LIST_ENTRY(sched) list;
int id; /*!< ID number of event */ int id; /*!< ID number of event */
struct timeval when; /*!< Absolute time event should take place */ struct timeval when; /*!< Absolute time event should take place */
int resched; /*!< When to reschedule */ int resched; /*!< When to reschedule */
int variable; /*!< Use return value from callback to reschedule */ int variable; /*!< Use return value from callback to reschedule */
const void *data; /*!< Data */ const void *data; /*!< Data */
ast_sched_cb callback; /*!< Callback */ ast_sched_cb callback; /*!< Callback */
ssize_t __heap_index;
}; };
struct sched_context { struct sched_context {
@@ -61,8 +63,8 @@ struct sched_context {
unsigned int eventcnt; /*!< Number of events processed */ unsigned int eventcnt; /*!< Number of events processed */
unsigned int schedcnt; /*!< Number of outstanding schedule events */ unsigned int schedcnt; /*!< Number of outstanding schedule events */
unsigned int highwater; /*!< highest count so far */ unsigned int highwater; /*!< highest count so far */
AST_DLLIST_HEAD_NOLOCK(, sched) schedq; /*!< Schedule entry and main queue */
struct ast_hashtab *schedq_ht; /*!< hash table for fast searching */ struct ast_hashtab *schedq_ht; /*!< hash table for fast searching */
struct ast_heap *sched_heap;
#ifdef SCHED_MAX_CACHE #ifdef SCHED_MAX_CACHE
AST_LIST_HEAD_NOLOCK(, sched) schedc; /*!< Cache of unused schedule structures and how many */ AST_LIST_HEAD_NOLOCK(, sched) schedc; /*!< Cache of unused schedule structures and how many */
@@ -229,6 +231,11 @@ static unsigned int sched_hash(const void *obj)
return h; return h;
} }
static int sched_time_cmp(void *a, void *b)
{
return ast_tvcmp(((struct sched *) a)->when, ((struct sched *) b)->when);
}
struct sched_context *sched_context_create(void) struct sched_context *sched_context_create(void)
{ {
struct sched_context *tmp; struct sched_context *tmp;
@@ -241,6 +248,12 @@ struct sched_context *sched_context_create(void)
tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1); tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
offsetof(struct sched, __heap_index)))) {
sched_context_destroy(tmp);
return NULL;
}
return tmp; return tmp;
} }
@@ -256,9 +269,13 @@ void sched_context_destroy(struct sched_context *con)
ast_free(s); ast_free(s);
#endif #endif
/* And the queue */ if (con->sched_heap) {
while ((s = AST_DLLIST_REMOVE_HEAD(&con->schedq, list))) while ((s = ast_heap_pop(con->sched_heap))) {
ast_free(s); ast_free(s);
}
ast_heap_destroy(con->sched_heap);
con->sched_heap = NULL;
}
ast_hashtab_destroy(con->schedq_ht, NULL); ast_hashtab_destroy(con->schedq_ht, NULL);
con->schedq_ht = NULL; con->schedq_ht = NULL;
@@ -310,16 +327,18 @@ static void sched_release(struct sched_context *con, struct sched *tmp)
int ast_sched_wait(struct sched_context *con) int ast_sched_wait(struct sched_context *con)
{ {
int ms; int ms;
struct sched *s;
DEBUG(ast_debug(1, "ast_sched_wait()\n")); DEBUG(ast_debug(1, "ast_sched_wait()\n"));
ast_mutex_lock(&con->lock); ast_mutex_lock(&con->lock);
if (AST_DLLIST_EMPTY(&con->schedq)) { if ((s = ast_heap_peek(con->sched_heap, 1))) {
ms = -1; ms = ast_tvdiff_ms(s->when, ast_tvnow());
} else { if (ms < 0) {
ms = ast_tvdiff_ms(AST_DLLIST_FIRST(&con->schedq)->when, ast_tvnow());
if (ms < 0)
ms = 0; ms = 0;
}
} else {
ms = -1;
} }
ast_mutex_unlock(&con->lock); ast_mutex_unlock(&con->lock);
@@ -334,53 +353,17 @@ int ast_sched_wait(struct sched_context *con)
*/ */
static void schedule(struct sched_context *con, struct sched *s) static void schedule(struct sched_context *con, struct sched *s)
{ {
struct sched *cur = NULL; ast_heap_push(con->sched_heap, s);
int ret;
int df = 0;
int de = 0;
struct sched *first = AST_DLLIST_FIRST(&con->schedq);
struct sched *last = AST_DLLIST_LAST(&con->schedq);
if (first) if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
df = ast_tvdiff_us(s->when, first->when); ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n", s->id);
if (last)
de = ast_tvdiff_us(s->when, last->when);
if (df < 0)
df = -df;
if (de < 0)
de = -de;
if (df < de) {
AST_DLLIST_TRAVERSE(&con->schedq, cur, list) {
if (ast_tvcmp(s->when, cur->when) == -1) {
AST_DLLIST_INSERT_BEFORE(&con->schedq, cur, s, list);
break;
}
}
if (!cur) {
AST_DLLIST_INSERT_TAIL(&con->schedq, s, list);
}
} else {
AST_DLLIST_TRAVERSE_BACKWARDS(&con->schedq, cur, list) {
if (ast_tvcmp(s->when, cur->when) == 1) {
AST_DLLIST_INSERT_AFTER(&con->schedq, cur, s, list);
break;
}
}
if (!cur) {
AST_DLLIST_INSERT_HEAD(&con->schedq, s, list);
}
} }
ret = ast_hashtab_insert_safe(con->schedq_ht, s);
if (!ret)
ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n",s->id);
con->schedcnt++; con->schedcnt++;
if (con->schedcnt > con->highwater) if (con->schedcnt > con->highwater) {
con->highwater = con->schedcnt; con->highwater = con->schedcnt;
}
} }
/*! \brief /*! \brief
@@ -480,31 +463,25 @@ int ast_sched_del(struct sched_context *con, int id)
int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function) int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function)
#endif #endif
{ {
struct sched *s, tmp; struct sched *s, tmp = {
.id = id,
};
DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id)); DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
ast_mutex_lock(&con->lock); ast_mutex_lock(&con->lock);
/* OK, this is the heart of the sched performance upgrade.
If we have 4700 peers, we can have 4700+ entries in the
schedq list. searching this would take time. So, I add a
hashtab to the context to keep track of each entry, by id.
I also leave the linked list alone, almost, -- I implement
a doubly-linked list instead, because it would do little good
to look up the id in a hashtab, and then have to run thru
a couple thousand entries to remove it from the schedq list! */
tmp.id = id;
s = ast_hashtab_lookup(con->schedq_ht, &tmp); s = ast_hashtab_lookup(con->schedq_ht, &tmp);
if (s) { if (s) {
struct sched *x = AST_DLLIST_REMOVE(&con->schedq, s, list); if (!ast_heap_remove(con->sched_heap, s)) {
ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
}
if (!x) if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
ast_log(LOG_WARNING,"sched entry %d not in the schedq list?\n", s->id);
if (!ast_hashtab_remove_this_object(con->schedq_ht, s))
ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id); ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
}
con->schedcnt--; con->schedcnt--;
sched_release(con, s); sched_release(con, s);
} }
@@ -530,14 +507,18 @@ int _ast_sched_del(struct sched_context *con, int id, const char *file, int line
void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames) void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
{ {
int i; int i, x;
struct sched *cur; struct sched *cur;
int countlist[cbnames->numassocs + 1]; int countlist[cbnames->numassocs + 1];
size_t heap_size;
ast_str_set(buf, 0, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt); ast_str_set(buf, 0, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt);
ast_mutex_lock(&con->lock); ast_mutex_lock(&con->lock);
AST_DLLIST_TRAVERSE(&con->schedq, cur, list) {
heap_size = ast_heap_size(con->sched_heap);
for (x = 1; x <= heap_size; x++) {
cur = ast_heap_peek(con->sched_heap, x);
/* match the callback to the cblist */ /* match the callback to the cblist */
for (i = 0; i < cbnames->numassocs; i++) { for (i = 0; i < cbnames->numassocs; i++) {
if (cur->callback == cbnames->cblist[i]) { if (cur->callback == cbnames->cblist[i]) {
@@ -550,6 +531,7 @@ void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct as
countlist[cbnames->numassocs]++; countlist[cbnames->numassocs]++;
} }
} }
ast_mutex_unlock(&con->lock); ast_mutex_unlock(&con->lock);
for (i = 0; i < cbnames->numassocs; i++) { for (i = 0; i < cbnames->numassocs; i++) {
@@ -564,6 +546,8 @@ void ast_sched_dump(struct sched_context *con)
{ {
struct sched *q; struct sched *q;
struct timeval when = ast_tvnow(); struct timeval when = ast_tvnow();
int x;
size_t heap_size;
#ifdef SCHED_MAX_CACHE #ifdef SCHED_MAX_CACHE
ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater); ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
#else #else
@@ -574,9 +558,11 @@ void ast_sched_dump(struct sched_context *con)
ast_debug(1, "|ID Callback Data Time (sec:ms) |\n"); ast_debug(1, "|ID Callback Data Time (sec:ms) |\n");
ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n"); ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
ast_mutex_lock(&con->lock); ast_mutex_lock(&con->lock);
AST_DLLIST_TRAVERSE(&con->schedq, q, list) { heap_size = ast_heap_size(con->sched_heap);
struct timeval delta = ast_tvsub(q->when, when); for (x = 1; x <= heap_size; x++) {
struct timeval delta;
q = ast_heap_peek(con->sched_heap, x);
delta = ast_tvsub(q->when, when);
ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n", ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
q->id, q->id,
q->callback, q->callback,
@@ -602,19 +588,22 @@ int ast_sched_runq(struct sched_context *con)
ast_mutex_lock(&con->lock); ast_mutex_lock(&con->lock);
for (numevents = 0; !AST_DLLIST_EMPTY(&con->schedq); numevents++) { for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
/* schedule all events which are going to expire within 1ms. /* schedule all events which are going to expire within 1ms.
* We only care about millisecond accuracy anyway, so this will * We only care about millisecond accuracy anyway, so this will
* help us get more than one event at one time if they are very * help us get more than one event at one time if they are very
* close together. * close together.
*/ */
when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000)); when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
if (ast_tvcmp(AST_DLLIST_FIRST(&con->schedq)->when, when) != -1) if (ast_tvcmp(current->when, when) != -1) {
break; break;
}
current = AST_DLLIST_REMOVE_HEAD(&con->schedq, list); current = ast_heap_pop(con->sched_heap);
if (!ast_hashtab_remove_this_object(con->schedq_ht, current))
if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id); ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
}
con->schedcnt--; con->schedcnt--;
@@ -638,11 +627,12 @@ int ast_sched_runq(struct sched_context *con)
*/ */
if (sched_settime(&current->when, current->variable? res : current->resched)) { if (sched_settime(&current->when, current->variable? res : current->resched)) {
sched_release(con, current); sched_release(con, current);
} else } else {
schedule(con, current); schedule(con, current);
}
} else { } else {
/* No longer needed, so release it */ /* No longer needed, so release it */
sched_release(con, current); sched_release(con, current);
} }
} }

130
tests/test_sched.c Normal file
View File

@@ -0,0 +1,130 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2009, Digium, Inc.
*
* Russell Bryant <russell@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief ast_sched performance test module
*
* \author Russell Bryant <russell@digium.com>
*/
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/module.h"
#include "asterisk/cli.h"
#include "asterisk/utils.h"
#include "asterisk/sched.h"
/*!
 * \brief Scheduler callback used by the timing test; performs no work.
 *
 * \param data Unused.
 *
 * \return Always 0.
 */
static int sched_cb(const void *data)
{
	(void) data;

	return 0;
}
/*!
 * \brief CLI handler for "sched test <num>".
 *
 * Creates a private scheduler context, times how long it takes to add <num>
 * entries with random delays below 60000 ms, then times how long it takes to
 * delete all of them again.  Both durations are printed to the CLI.
 *
 * \param e   CLI entry; its command and usage strings are filled in on CLI_INIT
 * \param cmd CLI_INIT, CLI_GENERATE, or any other value to execute the command
 * \param a   CLI arguments; a->argv[e->args] is parsed as the entry count
 *
 * \retval NULL          for CLI_INIT and CLI_GENERATE
 * \retval CLI_SHOWUSAGE when the argument count is wrong or <num> is not a number
 * \retval CLI_FAILURE   when the context or id array cannot be allocated, or an
 *                       add/delete call reports an error
 * \retval CLI_SUCCESS   when both timing runs complete
 */
static char *handle_cli_sched_test(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
	struct sched_context *con;
	struct timeval start;
	unsigned int num, i;
	int *sched_ids = NULL;
	/* Pessimistic default: every early exit through return_cleanup is a failure. */
	char *res = CLI_FAILURE;

	switch (cmd) {
	case CLI_INIT:
		e->command = "sched test";
		e->usage = ""
			"Usage: sched test <num>\n"
			"";
		return NULL;
	case CLI_GENERATE:
		return NULL;
	default:
		break;
	}

	if (a->argc != e->args + 1) {
		return CLI_SHOWUSAGE;
	}

	if (sscanf(a->argv[e->args], "%u", &num) != 1) {
		return CLI_SHOWUSAGE;
	}

	if (!(con = sched_context_create())) {
		ast_cli(a->fd, "Test failed - could not create scheduler context\n");
		return CLI_FAILURE;
	}

	/*
	 * Skip the allocation for num == 0 so that an allocator returning NULL for
	 * a zero-sized request is not reported as a failure.  The calloc-style
	 * call is used so that "num * sizeof(int)" cannot silently wrap around on
	 * targets where size_t is no wider than unsigned int.
	 */
	if (num && !(sched_ids = ast_calloc(num, sizeof(*sched_ids)))) {
		ast_cli(a->fd, "Test failed - memory allocation failure\n");
		goto return_cleanup;
	}

	ast_cli(a->fd, "Testing ast_sched_add() performance - timing how long it takes "
			"to add %u entries at random time intervals from 0 to 60 seconds\n", num);

	start = ast_tvnow();

	for (i = 0; i < num; i++) {
		/*
		 * labs() rather than abs(): abs() takes an int and would narrow the
		 * random value first.  NOTE(review): assumes ast_random() returns a
		 * long -- confirm against its prototype.
		 */
		int when = (int) (labs(ast_random()) % 60000);

		if ((sched_ids[i] = ast_sched_add(con, when, sched_cb, NULL)) == -1) {
			ast_cli(a->fd, "Test failed - sched_add returned -1\n");
			goto return_cleanup;
		}
	}

	/* The cast keeps the argument in step with the format on every platform. */
	ast_cli(a->fd, "Test complete - %lld us\n", (long long) ast_tvdiff_us(ast_tvnow(), start));

	ast_cli(a->fd, "Testing ast_sched_del() performance - timing how long it takes "
			"to delete %u entries with random time intervals from 0 to 60 seconds\n", num);

	start = ast_tvnow();

	for (i = 0; i < num; i++) {
		if (ast_sched_del(con, sched_ids[i]) == -1) {
			ast_cli(a->fd, "Test failed - sched_del returned -1\n");
			goto return_cleanup;
		}
	}

	ast_cli(a->fd, "Test complete - %lld us\n", (long long) ast_tvdiff_us(ast_tvnow(), start));

	res = CLI_SUCCESS;

return_cleanup:
	/* Destroying the context also disposes of any entries still scheduled. */
	sched_context_destroy(con);
	if (sched_ids) {
		ast_free(sched_ids);
	}

	return res;
}
/*!
 * \brief CLI commands provided by this module.
 *
 * The table is registered in load_module() and unregistered in
 * unload_module(); its single entry routes to handle_cli_sched_test().
 */
static struct ast_cli_entry cli_sched[] = {
AST_CLI_DEFINE(handle_cli_sched_test, "Test ast_sched add/del performance"),
};
/*!
 * \brief Module unload hook: withdraw the CLI commands this module registered.
 *
 * \return Always 0.
 */
static int unload_module(void)
{
	struct ast_cli_entry *entries = cli_sched;

	ast_cli_unregister_multiple(entries, ARRAY_LEN(cli_sched));

	return 0;
}
/*!
 * \brief Module load hook: register the CLI commands listed in cli_sched.
 *
 * The result of the registration call is not inspected.
 *
 * \return Always AST_MODULE_LOAD_SUCCESS.
 */
static int load_module(void)
{
	struct ast_cli_entry *entries = cli_sched;

	ast_cli_register_multiple(entries, ARRAY_LEN(cli_sched));

	return AST_MODULE_LOAD_SUCCESS;
}
/*
 * Module declaration: supplies the GPL license key and the human-readable
 * module description.
 * NOTE(review): presumably this macro is what ties load_module() and
 * unload_module() above to the module loader -- confirm in asterisk/module.h.
 */
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "ast_sched performance test module");