mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-05-30 02:20:11 +00:00
1492 lines
45 KiB
C
1492 lines
45 KiB
C
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
|
/* ***** BEGIN LICENSE BLOCK *****
|
|
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
|
|
*
|
|
* The contents of this file are subject to the Mozilla Public License Version
|
|
* 1.1 (the "License"); you may not use this file except in compliance with
|
|
* the License. You may obtain a copy of the License at
|
|
* http://www.mozilla.org/MPL/
|
|
*
|
|
* Software distributed under the License is distributed on an "AS IS" basis,
|
|
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
|
|
* for the specific language governing rights and limitations under the
|
|
* License.
|
|
*
|
|
* The Original Code is the Netscape Portable Runtime (NSPR).
|
|
*
|
|
* The Initial Developer of the Original Code is
|
|
* Netscape Communications Corporation.
|
|
* Portions created by the Initial Developer are Copyright (C) 1998-2000
|
|
* the Initial Developer. All Rights Reserved.
|
|
*
|
|
* Contributor(s):
|
|
*
|
|
* Alternatively, the contents of this file may be used under the terms of
|
|
* either the GNU General Public License Version 2 or later (the "GPL"), or
|
|
* the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
|
|
* in which case the provisions of the GPL or the LGPL are applicable instead
|
|
* of those above. If you wish to allow use of your version of this file only
|
|
* under the terms of either the GPL or the LGPL, and not to allow others to
|
|
* use your version of this file under the terms of the MPL, indicate your
|
|
* decision by deleting the provisions above and replace them with the notice
|
|
* and other provisions required by the GPL or the LGPL. If you do not delete
|
|
* the provisions above, a recipient may use your version of this file under
|
|
* the terms of any one of the MPL, the GPL or the LGPL.
|
|
*
|
|
* ***** END LICENSE BLOCK ***** */
|
|
|
|
#include "primpl.h"
|
|
#include "pprmwait.h"
|
|
|
|
#define _MW_REHASH_MAX 11
|
|
|
|
static PRLock *mw_lock = NULL;
|
|
static _PRGlobalState *mw_state = NULL;
|
|
|
|
static PRIntervalTime max_polling_interval;
|
|
|
|
#ifdef WINNT
|
|
|
|
typedef struct TimerEvent {
|
|
PRIntervalTime absolute;
|
|
void (*func)(void *);
|
|
void *arg;
|
|
LONG ref_count;
|
|
PRCList links;
|
|
} TimerEvent;
|
|
|
|
#define TIMER_EVENT_PTR(_qp) \
|
|
((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
|
|
|
|
struct {
|
|
PRLock *ml;
|
|
PRCondVar *new_timer;
|
|
PRCondVar *cancel_timer;
|
|
PRThread *manager_thread;
|
|
PRCList timer_queue;
|
|
} tm_vars;
|
|
|
|
static PRStatus TimerInit(void);
|
|
static void TimerManager(void *arg);
|
|
static TimerEvent *CreateTimer(PRIntervalTime timeout,
|
|
void (*func)(void *), void *arg);
|
|
static PRBool CancelTimer(TimerEvent *timer);
|
|
|
|
static void TimerManager(void *arg)
|
|
{
|
|
PRIntervalTime now;
|
|
PRIntervalTime timeout;
|
|
PRCList *head;
|
|
TimerEvent *timer;
|
|
|
|
PR_Lock(tm_vars.ml);
|
|
while (1)
|
|
{
|
|
if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
|
|
{
|
|
PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
|
|
}
|
|
else
|
|
{
|
|
now = PR_IntervalNow();
|
|
head = PR_LIST_HEAD(&tm_vars.timer_queue);
|
|
timer = TIMER_EVENT_PTR(head);
|
|
if ((PRInt32) (now - timer->absolute) >= 0)
|
|
{
|
|
PR_REMOVE_LINK(head);
|
|
/*
|
|
* make its prev and next point to itself so that
|
|
* it's obvious that it's not on the timer_queue.
|
|
*/
|
|
PR_INIT_CLIST(head);
|
|
PR_ASSERT(2 == timer->ref_count);
|
|
PR_Unlock(tm_vars.ml);
|
|
timer->func(timer->arg);
|
|
PR_Lock(tm_vars.ml);
|
|
timer->ref_count -= 1;
|
|
if (0 == timer->ref_count)
|
|
{
|
|
PR_NotifyAllCondVar(tm_vars.cancel_timer);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
timeout = (PRIntervalTime)(timer->absolute - now);
|
|
PR_WaitCondVar(tm_vars.new_timer, timeout);
|
|
}
|
|
}
|
|
}
|
|
PR_Unlock(tm_vars.ml);
|
|
}
|
|
|
|
static TimerEvent *CreateTimer(
|
|
PRIntervalTime timeout,
|
|
void (*func)(void *),
|
|
void *arg)
|
|
{
|
|
TimerEvent *timer;
|
|
PRCList *links, *tail;
|
|
TimerEvent *elem;
|
|
|
|
timer = PR_NEW(TimerEvent);
|
|
if (NULL == timer)
|
|
{
|
|
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
|
|
return timer;
|
|
}
|
|
timer->absolute = PR_IntervalNow() + timeout;
|
|
timer->func = func;
|
|
timer->arg = arg;
|
|
timer->ref_count = 2;
|
|
PR_Lock(tm_vars.ml);
|
|
tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
|
|
while (links->prev != tail)
|
|
{
|
|
elem = TIMER_EVENT_PTR(links);
|
|
if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
|
|
{
|
|
break;
|
|
}
|
|
links = links->prev;
|
|
}
|
|
PR_INSERT_AFTER(&timer->links, links);
|
|
PR_NotifyCondVar(tm_vars.new_timer);
|
|
PR_Unlock(tm_vars.ml);
|
|
return timer;
|
|
}
|
|
|
|
static PRBool CancelTimer(TimerEvent *timer)
|
|
{
|
|
PRBool canceled = PR_FALSE;
|
|
|
|
PR_Lock(tm_vars.ml);
|
|
timer->ref_count -= 1;
|
|
if (timer->links.prev == &timer->links)
|
|
{
|
|
while (timer->ref_count == 1)
|
|
{
|
|
PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
PR_REMOVE_LINK(&timer->links);
|
|
canceled = PR_TRUE;
|
|
}
|
|
PR_Unlock(tm_vars.ml);
|
|
PR_DELETE(timer);
|
|
return canceled;
|
|
}
|
|
|
|
static PRStatus TimerInit(void)
|
|
{
|
|
tm_vars.ml = PR_NewLock();
|
|
if (NULL == tm_vars.ml)
|
|
{
|
|
goto failed;
|
|
}
|
|
tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
|
|
if (NULL == tm_vars.new_timer)
|
|
{
|
|
goto failed;
|
|
}
|
|
tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
|
|
if (NULL == tm_vars.cancel_timer)
|
|
{
|
|
goto failed;
|
|
}
|
|
PR_INIT_CLIST(&tm_vars.timer_queue);
|
|
tm_vars.manager_thread = PR_CreateThread(
|
|
PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
|
|
PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
|
|
if (NULL == tm_vars.manager_thread)
|
|
{
|
|
goto failed;
|
|
}
|
|
return PR_SUCCESS;
|
|
|
|
failed:
|
|
if (NULL != tm_vars.cancel_timer)
|
|
{
|
|
PR_DestroyCondVar(tm_vars.cancel_timer);
|
|
}
|
|
if (NULL != tm_vars.new_timer)
|
|
{
|
|
PR_DestroyCondVar(tm_vars.new_timer);
|
|
}
|
|
if (NULL != tm_vars.ml)
|
|
{
|
|
PR_DestroyLock(tm_vars.ml);
|
|
}
|
|
return PR_FAILURE;
|
|
}
|
|
|
|
#endif /* WINNT */
|
|
|
|
/******************************************************************/
|
|
/******************************************************************/
|
|
/************************ The private portion *********************/
|
|
/******************************************************************/
|
|
/******************************************************************/
|
|
void _PR_InitMW(void)
|
|
{
|
|
#ifdef WINNT
|
|
/*
|
|
* We use NT 4's InterlockedCompareExchange() to operate
|
|
* on PRMWStatus variables.
|
|
*/
|
|
//PR_ASSERT(sizeof(PVOID) == sizeof(PRMWStatus));
|
|
TimerInit();
|
|
#endif
|
|
mw_lock = PR_NewLock();
|
|
PR_ASSERT(NULL != mw_lock);
|
|
mw_state = PR_NEWZAP(_PRGlobalState);
|
|
PR_ASSERT(NULL != mw_state);
|
|
PR_INIT_CLIST(&mw_state->group_list);
|
|
max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
|
|
} /* _PR_InitMW */
|
|
|
|
void _PR_CleanupMW(void)
|
|
{
|
|
PR_DestroyLock(mw_lock);
|
|
mw_lock = NULL;
|
|
if (mw_state->group) {
|
|
PR_DestroyWaitGroup(mw_state->group);
|
|
/* mw_state->group is set to NULL as a side effect. */
|
|
}
|
|
PR_DELETE(mw_state);
|
|
} /* _PR_CleanupMW */
|
|
|
|
static PRWaitGroup *MW_Init2(void)
|
|
{
|
|
PRWaitGroup *group = mw_state->group; /* it's the null group */
|
|
if (NULL == group) /* there is this special case */
|
|
{
|
|
group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
|
|
if (NULL == group) goto failed_alloc;
|
|
PR_Lock(mw_lock);
|
|
if (NULL == mw_state->group)
|
|
{
|
|
mw_state->group = group;
|
|
group = NULL;
|
|
}
|
|
PR_Unlock(mw_lock);
|
|
if (group != NULL) (void)PR_DestroyWaitGroup(group);
|
|
group = mw_state->group; /* somebody beat us to it */
|
|
}
|
|
failed_alloc:
|
|
return group; /* whatever */
|
|
} /* MW_Init2 */
|
|
|
|
static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
|
|
{
|
|
/*
|
|
** The entries are put in the table using the fd (PRFileDesc*) of
|
|
** the receive descriptor as the key. This allows us to locate
|
|
** the appropriate entry aqain when the poll operation finishes.
|
|
**
|
|
** The pointer to the file descriptor object is first divided by
|
|
** the natural alignment of a pointer in the belief that object
|
|
** will have at least that many zeros in the low order bits.
|
|
** This may not be a good assuption.
|
|
**
|
|
** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
|
|
** that we declare defeat and force the table to be reconstructed.
|
|
** Since some fds might be added more than once, won't that cause
|
|
** collisions even in an empty table?
|
|
*/
|
|
PRIntn rehash = _MW_REHASH_MAX;
|
|
PRRecvWait **waiter;
|
|
PRUintn hidx = _MW_HASH(desc->fd, hash->length);
|
|
PRUintn hoffset = 0;
|
|
|
|
while (rehash-- > 0)
|
|
{
|
|
waiter = &hash->recv_wait;
|
|
if (NULL == waiter[hidx])
|
|
{
|
|
waiter[hidx] = desc;
|
|
hash->count += 1;
|
|
#if 0
|
|
printf("Adding 0x%x->0x%x ", desc, desc->fd);
|
|
printf(
|
|
"table[%u:%u:*%u]: 0x%x->0x%x\n",
|
|
hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
|
|
#endif
|
|
return _prmw_success;
|
|
}
|
|
if (desc == waiter[hidx])
|
|
{
|
|
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
|
|
return _prmw_error;
|
|
}
|
|
#if 0
|
|
printf("Failing 0x%x->0x%x ", desc, desc->fd);
|
|
printf(
|
|
"table[*%u:%u:%u]: 0x%x->0x%x\n",
|
|
hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
|
|
#endif
|
|
if (0 == hoffset)
|
|
{
|
|
hoffset = _MW_HASH2(desc->fd, hash->length);
|
|
PR_ASSERT(0 != hoffset);
|
|
}
|
|
hidx = (hidx + hoffset) % (hash->length);
|
|
}
|
|
return _prmw_rehash;
|
|
} /* MW_AddHashInternal */
|
|
|
|
static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
|
|
{
|
|
PRRecvWait **desc;
|
|
PRUint32 pidx, length;
|
|
_PRWaiterHash *newHash, *oldHash = group->waiter;
|
|
PRBool retry;
|
|
_PR_HashStory hrv;
|
|
|
|
static const PRInt32 prime_number[] = {
|
|
_PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
|
|
2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
|
|
PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
|
|
|
|
/* look up the next size we'd like to use for the hash table */
|
|
for (pidx = 0; pidx < primes; ++pidx)
|
|
{
|
|
if (prime_number[pidx] == oldHash->length)
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
/* table size must be one of the prime numbers */
|
|
PR_ASSERT(pidx < primes);
|
|
|
|
/* if pidx == primes - 1, we can't expand the table any more */
|
|
while (pidx < primes - 1)
|
|
{
|
|
/* next size */
|
|
++pidx;
|
|
length = prime_number[pidx];
|
|
|
|
/* allocate the new hash table and fill it in with the old */
|
|
newHash = (_PRWaiterHash*)PR_CALLOC(
|
|
sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
|
|
if (NULL == newHash)
|
|
{
|
|
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
|
|
return _prmw_error;
|
|
}
|
|
|
|
newHash->length = length;
|
|
retry = PR_FALSE;
|
|
for (desc = &oldHash->recv_wait;
|
|
newHash->count < oldHash->count; ++desc)
|
|
{
|
|
PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
|
|
if (NULL != *desc)
|
|
{
|
|
hrv = MW_AddHashInternal(*desc, newHash);
|
|
PR_ASSERT(_prmw_error != hrv);
|
|
if (_prmw_success != hrv)
|
|
{
|
|
PR_DELETE(newHash);
|
|
retry = PR_TRUE;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (retry) continue;
|
|
|
|
PR_DELETE(group->waiter);
|
|
group->waiter = newHash;
|
|
group->p_timestamp += 1;
|
|
return _prmw_success;
|
|
}
|
|
|
|
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
|
|
return _prmw_error; /* we're hosed */
|
|
} /* MW_ExpandHashInternal */
|
|
|
|
#ifndef WINNT
|
|
static void _MW_DoneInternal(
|
|
PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
|
|
{
|
|
/*
|
|
** Add this receive wait object to the list of finished I/O
|
|
** operations for this particular group. If there are other
|
|
** threads waiting on the group, notify one. If not, arrange
|
|
** for this thread to return.
|
|
*/
|
|
|
|
#if 0
|
|
printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
|
|
#endif
|
|
(*waiter)->outcome = outcome;
|
|
PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
|
|
PR_NotifyCondVar(group->io_complete);
|
|
PR_ASSERT(0 != group->waiter->count);
|
|
group->waiter->count -= 1;
|
|
*waiter = NULL;
|
|
} /* _MW_DoneInternal */
|
|
#endif /* WINNT */
|
|
|
|
static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
|
|
{
|
|
/*
|
|
** Find the receive wait object corresponding to the file descriptor.
|
|
** Only search the wait group specified.
|
|
*/
|
|
PRRecvWait **desc;
|
|
PRIntn rehash = _MW_REHASH_MAX;
|
|
_PRWaiterHash *hash = group->waiter;
|
|
PRUintn hidx = _MW_HASH(fd, hash->length);
|
|
PRUintn hoffset = 0;
|
|
|
|
while (rehash-- > 0)
|
|
{
|
|
desc = (&hash->recv_wait) + hidx;
|
|
if ((*desc != NULL) && ((*desc)->fd == fd)) return desc;
|
|
if (0 == hoffset)
|
|
{
|
|
hoffset = _MW_HASH2(fd, hash->length);
|
|
PR_ASSERT(0 != hoffset);
|
|
}
|
|
hidx = (hidx + hoffset) % (hash->length);
|
|
}
|
|
return NULL;
|
|
} /* _MW_LookupInternal */
|
|
|
|
#ifndef WINNT
|
|
static PRStatus _MW_PollInternal(PRWaitGroup *group)
|
|
{
|
|
PRRecvWait **waiter;
|
|
PRStatus rv = PR_FAILURE;
|
|
PRInt32 count, count_ready;
|
|
PRIntervalTime polling_interval;
|
|
|
|
group->poller = PR_GetCurrentThread();
|
|
|
|
while (PR_TRUE)
|
|
{
|
|
PRIntervalTime now, since_last_poll;
|
|
PRPollDesc *poll_list;
|
|
|
|
while (0 == group->waiter->count)
|
|
{
|
|
PRStatus st;
|
|
st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
|
|
if (_prmw_running != group->state)
|
|
{
|
|
PR_SetError(PR_INVALID_STATE_ERROR, 0);
|
|
goto aborted;
|
|
}
|
|
if (_MW_ABORTED(st)) goto aborted;
|
|
}
|
|
|
|
/*
|
|
** There's something to do. See if our existing polling list
|
|
** is large enough for what we have to do?
|
|
*/
|
|
|
|
while (group->polling_count < group->waiter->count)
|
|
{
|
|
PRUint32 old_count = group->waiter->count;
|
|
PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
|
|
PRSize new_size = sizeof(PRPollDesc) * new_count;
|
|
PRPollDesc *old_polling_list = group->polling_list;
|
|
|
|
PR_Unlock(group->ml);
|
|
poll_list = (PRPollDesc*)PR_CALLOC(new_size);
|
|
if (NULL == poll_list)
|
|
{
|
|
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
|
|
PR_Lock(group->ml);
|
|
goto failed_alloc;
|
|
}
|
|
if (NULL != old_polling_list)
|
|
PR_DELETE(old_polling_list);
|
|
PR_Lock(group->ml);
|
|
if (_prmw_running != group->state)
|
|
{
|
|
PR_SetError(PR_INVALID_STATE_ERROR, 0);
|
|
goto aborted;
|
|
}
|
|
group->polling_list = poll_list;
|
|
group->polling_count = new_count;
|
|
}
|
|
|
|
now = PR_IntervalNow();
|
|
polling_interval = max_polling_interval;
|
|
since_last_poll = now - group->last_poll;
|
|
|
|
waiter = &group->waiter->recv_wait;
|
|
poll_list = group->polling_list;
|
|
for (count = 0; count < group->waiter->count; ++waiter)
|
|
{
|
|
PR_ASSERT(waiter < &group->waiter->recv_wait
|
|
+ group->waiter->length);
|
|
if (NULL != *waiter) /* a live one! */
|
|
{
|
|
if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
|
|
&& (since_last_poll >= (*waiter)->timeout))
|
|
_MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
|
|
else
|
|
{
|
|
if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
|
|
{
|
|
(*waiter)->timeout -= since_last_poll;
|
|
if ((*waiter)->timeout < polling_interval)
|
|
polling_interval = (*waiter)->timeout;
|
|
}
|
|
PR_ASSERT(poll_list < group->polling_list
|
|
+ group->polling_count);
|
|
poll_list->fd = (*waiter)->fd;
|
|
poll_list->in_flags = PR_POLL_READ;
|
|
poll_list->out_flags = 0;
|
|
#if 0
|
|
printf(
|
|
"Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
|
|
poll_list, count, poll_list->fd, (*waiter)->timeout);
|
|
#endif
|
|
poll_list += 1;
|
|
count += 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
PR_ASSERT(count == group->waiter->count);
|
|
|
|
/*
|
|
** If there are no more threads waiting for completion,
|
|
** we need to return.
|
|
*/
|
|
if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
|
|
&& (1 == group->waiting_threads)) break;
|
|
|
|
if (0 == count) continue; /* wait for new business */
|
|
|
|
group->last_poll = now;
|
|
|
|
PR_Unlock(group->ml);
|
|
|
|
count_ready = PR_Poll(group->polling_list, count, polling_interval);
|
|
|
|
PR_Lock(group->ml);
|
|
|
|
if (_prmw_running != group->state)
|
|
{
|
|
PR_SetError(PR_INVALID_STATE_ERROR, 0);
|
|
goto aborted;
|
|
}
|
|
if (-1 == count_ready)
|
|
{
|
|
goto failed_poll; /* that's a shame */
|
|
}
|
|
else if (0 < count_ready)
|
|
{
|
|
for (poll_list = group->polling_list; count > 0;
|
|
poll_list++, count--)
|
|
{
|
|
PR_ASSERT(
|
|
poll_list < group->polling_list + group->polling_count);
|
|
if (poll_list->out_flags != 0)
|
|
{
|
|
waiter = _MW_LookupInternal(group, poll_list->fd);
|
|
/*
|
|
** If 'waiter' is NULL, that means the wait receive
|
|
** descriptor has been canceled.
|
|
*/
|
|
if (NULL != waiter)
|
|
_MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
|
|
}
|
|
}
|
|
}
|
|
/*
|
|
** If there are no more threads waiting for completion,
|
|
** we need to return.
|
|
** This thread was "borrowed" to do the polling, but it really
|
|
** belongs to the client.
|
|
*/
|
|
if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
|
|
&& (1 == group->waiting_threads)) break;
|
|
}
|
|
|
|
rv = PR_SUCCESS;
|
|
|
|
aborted:
|
|
failed_poll:
|
|
failed_alloc:
|
|
group->poller = NULL; /* we were that, not we ain't */
|
|
if ((_prmw_running == group->state) && (group->waiting_threads > 1))
|
|
{
|
|
/* Wake up one thread to become the new poller. */
|
|
PR_NotifyCondVar(group->io_complete);
|
|
}
|
|
return rv; /* we return with the lock held */
|
|
} /* _MW_PollInternal */
|
|
#endif /* !WINNT */
|
|
|
|
static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
|
|
{
|
|
PRMWGroupState rv = group->state;
|
|
/*
|
|
** Looking at the group's fields is safe because
|
|
** once the group's state is no longer running, it
|
|
** cannot revert and there is a safe check on entry
|
|
** to make sure no more threads are made to wait.
|
|
*/
|
|
if ((_prmw_stopping == rv)
|
|
&& (0 == group->waiting_threads))
|
|
{
|
|
rv = group->state = _prmw_stopped;
|
|
PR_NotifyCondVar(group->mw_manage);
|
|
}
|
|
return rv;
|
|
} /* MW_TestForShutdownInternal */
|
|
|
|
#ifndef WINNT
|
|
static void _MW_InitialRecv(PRCList *io_ready)
|
|
{
|
|
PRRecvWait *desc = (PRRecvWait*)io_ready;
|
|
if ((NULL == desc->buffer.start)
|
|
|| (0 == desc->buffer.length))
|
|
desc->bytesRecv = 0;
|
|
else
|
|
{
|
|
desc->bytesRecv = (desc->fd->methods->recv)(
|
|
desc->fd, desc->buffer.start,
|
|
desc->buffer.length, 0, desc->timeout);
|
|
if (desc->bytesRecv < 0) /* SetError should already be there */
|
|
desc->outcome = PR_MW_FAILURE;
|
|
}
|
|
} /* _MW_InitialRecv */
|
|
#endif
|
|
|
|
#ifdef WINNT
|
|
static void NT_TimeProc(void *arg)
|
|
{
|
|
_MDOverlapped *overlapped = (_MDOverlapped *)arg;
|
|
PRRecvWait *desc = overlapped->data.mw.desc;
|
|
PRFileDesc *bottom;
|
|
|
|
if (InterlockedCompareExchange((PVOID *)&desc->outcome,
|
|
(PVOID)PR_MW_TIMEOUT, (PVOID)PR_MW_PENDING) != (PVOID)PR_MW_PENDING)
|
|
{
|
|
/* This wait recv descriptor has already completed. */
|
|
return;
|
|
}
|
|
|
|
/* close the osfd to abort the outstanding async io request */
|
|
/* $$$$
|
|
** Little late to be checking if NSPR's on the bottom of stack,
|
|
** but if we don't check, we can't assert that the private data
|
|
** is what we think it is.
|
|
** $$$$
|
|
*/
|
|
bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
|
|
PR_ASSERT(NULL != bottom);
|
|
if (NULL != bottom) /* now what!?!?! */
|
|
{
|
|
bottom->secret->state = _PR_FILEDESC_CLOSED;
|
|
if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
|
|
{
|
|
fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
|
|
PR_ASSERT(!"What shall I do?");
|
|
}
|
|
}
|
|
return;
|
|
} /* NT_TimeProc */
|
|
|
|
static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
|
|
{
|
|
PRRecvWait **waiter;
|
|
|
|
_PR_MD_LOCK(&group->mdlock);
|
|
waiter = _MW_LookupInternal(group, fd);
|
|
if (NULL != waiter)
|
|
{
|
|
group->waiter->count -= 1;
|
|
*waiter = NULL;
|
|
}
|
|
_PR_MD_UNLOCK(&group->mdlock);
|
|
return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
|
|
}
|
|
|
|
PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
|
|
{
|
|
PRRecvWait **waiter;
|
|
|
|
waiter = _MW_LookupInternal(group, fd);
|
|
if (NULL != waiter)
|
|
{
|
|
group->waiter->count -= 1;
|
|
*waiter = NULL;
|
|
}
|
|
return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
|
|
}
|
|
#endif /* WINNT */
|
|
|
|
/******************************************************************/
|
|
/******************************************************************/
|
|
/********************** The public API portion ********************/
|
|
/******************************************************************/
|
|
/******************************************************************/
|
|
PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
|
|
PRWaitGroup *group, PRRecvWait *desc)
|
|
{
|
|
_PR_HashStory hrv;
|
|
PRStatus rv = PR_FAILURE;
|
|
#ifdef WINNT
|
|
_MDOverlapped *overlapped;
|
|
HANDLE hFile;
|
|
BOOL bResult;
|
|
DWORD dwError;
|
|
PRFileDesc *bottom;
|
|
#endif
|
|
|
|
if (!_pr_initialized) _PR_ImplicitInitialization();
|
|
if ((NULL == group) && (NULL == (group = MW_Init2())))
|
|
{
|
|
return rv;
|
|
}
|
|
|
|
PR_ASSERT(NULL != desc->fd);
|
|
|
|
desc->outcome = PR_MW_PENDING; /* nice, well known value */
|
|
desc->bytesRecv = 0; /* likewise, though this value is ambiguious */
|
|
|
|
PR_Lock(group->ml);
|
|
|
|
if (_prmw_running != group->state)
|
|
{
|
|
/* Not allowed to add after cancelling the group */
|
|
desc->outcome = PR_MW_INTERRUPT;
|
|
PR_SetError(PR_INVALID_STATE_ERROR, 0);
|
|
PR_Unlock(group->ml);
|
|
return rv;
|
|
}
|
|
|
|
#ifdef WINNT
|
|
_PR_MD_LOCK(&group->mdlock);
|
|
#endif
|
|
|
|
/*
|
|
** If the waiter count is zero at this point, there's no telling
|
|
** how long we've been idle. Therefore, initialize the beginning
|
|
** of the timing interval. As long as the list doesn't go empty,
|
|
** it will maintain itself.
|
|
*/
|
|
if (0 == group->waiter->count)
|
|
group->last_poll = PR_IntervalNow();
|
|
|
|
do
|
|
{
|
|
hrv = MW_AddHashInternal(desc, group->waiter);
|
|
if (_prmw_rehash != hrv) break;
|
|
hrv = MW_ExpandHashInternal(group); /* gruesome */
|
|
if (_prmw_success != hrv) break;
|
|
} while (PR_TRUE);
|
|
|
|
#ifdef WINNT
|
|
_PR_MD_UNLOCK(&group->mdlock);
|
|
#endif
|
|
|
|
PR_NotifyCondVar(group->new_business); /* tell the world */
|
|
rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
|
|
PR_Unlock(group->ml);
|
|
|
|
#ifdef WINNT
|
|
overlapped = PR_NEWZAP(_MDOverlapped);
|
|
if (NULL == overlapped)
|
|
{
|
|
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
|
|
NT_HashRemove(group, desc->fd);
|
|
return rv;
|
|
}
|
|
overlapped->ioModel = _MD_MultiWaitIO;
|
|
overlapped->data.mw.desc = desc;
|
|
overlapped->data.mw.group = group;
|
|
if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
|
|
{
|
|
overlapped->data.mw.timer = CreateTimer(
|
|
desc->timeout,
|
|
NT_TimeProc,
|
|
overlapped);
|
|
if (0 == overlapped->data.mw.timer)
|
|
{
|
|
NT_HashRemove(group, desc->fd);
|
|
PR_DELETE(overlapped);
|
|
/*
|
|
* XXX It appears that a maximum of 16 timer events can
|
|
* be outstanding. GetLastError() returns 0 when I try it.
|
|
*/
|
|
PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
|
|
return PR_FAILURE;
|
|
}
|
|
}
|
|
|
|
/* Reach to the bottom layer to get the OS fd */
|
|
bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
|
|
PR_ASSERT(NULL != bottom);
|
|
if (NULL == bottom)
|
|
{
|
|
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
|
|
return PR_FAILURE;
|
|
}
|
|
hFile = (HANDLE)bottom->secret->md.osfd;
|
|
if (!bottom->secret->md.io_model_committed)
|
|
{
|
|
PRInt32 st;
|
|
st = _md_Associate(hFile);
|
|
PR_ASSERT(0 != st);
|
|
bottom->secret->md.io_model_committed = PR_TRUE;
|
|
}
|
|
bResult = ReadFile(hFile,
|
|
desc->buffer.start,
|
|
(DWORD)desc->buffer.length,
|
|
NULL,
|
|
&overlapped->overlapped);
|
|
if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
|
|
{
|
|
if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
|
|
{
|
|
if (InterlockedCompareExchange((PVOID *)&desc->outcome,
|
|
(PVOID)PR_MW_FAILURE, (PVOID)PR_MW_PENDING)
|
|
== (PVOID)PR_MW_PENDING)
|
|
{
|
|
CancelTimer(overlapped->data.mw.timer);
|
|
}
|
|
NT_HashRemove(group, desc->fd);
|
|
PR_DELETE(overlapped);
|
|
}
|
|
_PR_MD_MAP_READ_ERROR(dwError);
|
|
rv = PR_FAILURE;
|
|
}
|
|
#endif
|
|
|
|
return rv;
|
|
} /* PR_AddWaitFileDesc */
|
|
|
|
PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
|
|
{
|
|
PRCList *io_ready = NULL;
|
|
#ifdef WINNT
|
|
PRThread *me = _PR_MD_CURRENT_THREAD();
|
|
_MDOverlapped *overlapped;
|
|
#endif
|
|
|
|
if (!_pr_initialized) _PR_ImplicitInitialization();
|
|
if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
|
|
|
|
PR_Lock(group->ml);
|
|
|
|
if (_prmw_running != group->state)
|
|
{
|
|
PR_SetError(PR_INVALID_STATE_ERROR, 0);
|
|
goto invalid_state;
|
|
}
|
|
|
|
group->waiting_threads += 1; /* the polling thread is counted */
|
|
|
|
#ifdef WINNT
|
|
_PR_MD_LOCK(&group->mdlock);
|
|
while (PR_CLIST_IS_EMPTY(&group->io_ready))
|
|
{
|
|
_PR_THREAD_LOCK(me);
|
|
me->state = _PR_IO_WAIT;
|
|
PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
|
|
if (!_PR_IS_NATIVE_THREAD(me))
|
|
{
|
|
_PR_SLEEPQ_LOCK(me->cpu);
|
|
_PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
|
|
_PR_SLEEPQ_UNLOCK(me->cpu);
|
|
}
|
|
_PR_THREAD_UNLOCK(me);
|
|
_PR_MD_UNLOCK(&group->mdlock);
|
|
PR_Unlock(group->ml);
|
|
_PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
|
|
me->state = _PR_RUNNING;
|
|
PR_Lock(group->ml);
|
|
_PR_MD_LOCK(&group->mdlock);
|
|
if (_PR_PENDING_INTERRUPT(me)) {
|
|
PR_REMOVE_LINK(&me->waitQLinks);
|
|
_PR_MD_UNLOCK(&group->mdlock);
|
|
me->flags &= ~_PR_INTERRUPT;
|
|
me->io_suspended = PR_FALSE;
|
|
PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
|
|
goto aborted;
|
|
}
|
|
}
|
|
io_ready = PR_LIST_HEAD(&group->io_ready);
|
|
PR_ASSERT(io_ready != NULL);
|
|
PR_REMOVE_LINK(io_ready);
|
|
_PR_MD_UNLOCK(&group->mdlock);
|
|
overlapped = (_MDOverlapped *)
|
|
((char *)io_ready - offsetof(_MDOverlapped, data));
|
|
io_ready = &overlapped->data.mw.desc->internal;
|
|
#else
|
|
do
|
|
{
|
|
/*
|
|
** If the I/O ready list isn't empty, have this thread
|
|
** return with the first receive wait object that's available.
|
|
*/
|
|
if (PR_CLIST_IS_EMPTY(&group->io_ready))
|
|
{
|
|
/*
|
|
** Is there a polling thread yet? If not, grab this thread
|
|
** and use it.
|
|
*/
|
|
if (NULL == group->poller)
|
|
{
|
|
/*
|
|
** This thread will stay do polling until it becomes the only one
|
|
** left to service a completion. Then it will return and there will
|
|
** be none left to actually poll or to run completions.
|
|
**
|
|
** The polling function should only return w/ failure or
|
|
** with some I/O ready.
|
|
*/
|
|
if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
** There are four reasons a thread can be awakened from
|
|
** a wait on the io_complete condition variable.
|
|
** 1. Some I/O has completed, i.e., the io_ready list
|
|
** is nonempty.
|
|
** 2. The wait group is canceled.
|
|
** 3. The thread is interrupted.
|
|
** 4. The current polling thread has to leave and needs
|
|
** a replacement.
|
|
** The logic to find a new polling thread is made more
|
|
** complicated by all the other possible events.
|
|
** I tried my best to write the logic clearly, but
|
|
** it is still full of if's with continue and goto.
|
|
*/
|
|
PRStatus st;
|
|
do
|
|
{
|
|
st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
|
|
if (_prmw_running != group->state)
|
|
{
|
|
PR_SetError(PR_INVALID_STATE_ERROR, 0);
|
|
goto aborted;
|
|
}
|
|
if (_MW_ABORTED(st) || (NULL == group->poller)) break;
|
|
} while (PR_CLIST_IS_EMPTY(&group->io_ready));
|
|
|
|
/*
|
|
** The thread is interrupted and has to leave. It might
|
|
** have also been awakened to process ready i/o or be the
|
|
** new poller. To be safe, if either condition is true,
|
|
** we awaken another thread to take its place.
|
|
*/
|
|
if (_MW_ABORTED(st))
|
|
{
|
|
if ((NULL == group->poller
|
|
|| !PR_CLIST_IS_EMPTY(&group->io_ready))
|
|
&& group->waiting_threads > 1)
|
|
PR_NotifyCondVar(group->io_complete);
|
|
goto aborted;
|
|
}
|
|
|
|
/*
|
|
** A new poller is needed, but can I be the new poller?
|
|
** If there is no i/o ready, sure. But if there is any
|
|
** i/o ready, it has a higher priority. I want to
|
|
** process the ready i/o first and wake up another
|
|
** thread to be the new poller.
|
|
*/
|
|
if (NULL == group->poller)
|
|
{
|
|
if (PR_CLIST_IS_EMPTY(&group->io_ready))
|
|
continue;
|
|
if (group->waiting_threads > 1)
|
|
PR_NotifyCondVar(group->io_complete);
|
|
}
|
|
}
|
|
PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
|
|
}
|
|
io_ready = PR_LIST_HEAD(&group->io_ready);
|
|
PR_NotifyCondVar(group->io_taken);
|
|
PR_ASSERT(io_ready != NULL);
|
|
PR_REMOVE_LINK(io_ready);
|
|
} while (NULL == io_ready);
|
|
|
|
failed_poll:
|
|
|
|
#endif
|
|
|
|
aborted:
|
|
|
|
group->waiting_threads -= 1;
|
|
invalid_state:
|
|
(void)MW_TestForShutdownInternal(group);
|
|
PR_Unlock(group->ml);
|
|
|
|
failed_init:
|
|
if (NULL != io_ready)
|
|
{
|
|
/* If the operation failed, record the reason why */
|
|
switch (((PRRecvWait*)io_ready)->outcome)
|
|
{
|
|
case PR_MW_PENDING:
|
|
PR_ASSERT(0);
|
|
break;
|
|
case PR_MW_SUCCESS:
|
|
#ifndef WINNT
|
|
_MW_InitialRecv(io_ready);
|
|
#endif
|
|
break;
|
|
#ifdef WINNT
|
|
case PR_MW_FAILURE:
|
|
_PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
|
|
break;
|
|
#endif
|
|
case PR_MW_TIMEOUT:
|
|
PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
|
|
break;
|
|
case PR_MW_INTERRUPT:
|
|
PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
|
|
break;
|
|
default: break;
|
|
}
|
|
#ifdef WINNT
|
|
if (NULL != overlapped->data.mw.timer)
|
|
{
|
|
PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
|
|
!= overlapped->data.mw.desc->timeout);
|
|
CancelTimer(overlapped->data.mw.timer);
|
|
}
|
|
else
|
|
{
|
|
PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
|
|
== overlapped->data.mw.desc->timeout);
|
|
}
|
|
PR_DELETE(overlapped);
|
|
#endif
|
|
}
|
|
return (PRRecvWait*)io_ready;
|
|
} /* PR_WaitRecvReady */
|
|
|
|
PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
|
|
{
|
|
#if !defined(WINNT)
|
|
PRRecvWait **recv_wait;
|
|
#endif
|
|
PRStatus rv = PR_SUCCESS;
|
|
if (NULL == group) group = mw_state->group;
|
|
PR_ASSERT(NULL != group);
|
|
if (NULL == group)
|
|
{
|
|
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
|
|
return PR_FAILURE;
|
|
}
|
|
|
|
PR_Lock(group->ml);
|
|
|
|
if (_prmw_running != group->state)
|
|
{
|
|
PR_SetError(PR_INVALID_STATE_ERROR, 0);
|
|
rv = PR_FAILURE;
|
|
goto unlock;
|
|
}
|
|
|
|
#ifdef WINNT
|
|
if (InterlockedCompareExchange((PVOID *)&desc->outcome,
|
|
(PVOID)PR_MW_INTERRUPT, (PVOID)PR_MW_PENDING) == (PVOID)PR_MW_PENDING)
|
|
{
|
|
PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
|
|
PR_ASSERT(NULL != bottom);
|
|
if (NULL == bottom)
|
|
{
|
|
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
|
|
goto unlock;
|
|
}
|
|
bottom->secret->state = _PR_FILEDESC_CLOSED;
|
|
#if 0
|
|
fprintf(stderr, "cancel wait recv: closing socket\n");
|
|
#endif
|
|
if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
|
|
{
|
|
fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
|
|
exit(1);
|
|
}
|
|
}
|
|
#else
|
|
if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
|
|
{
|
|
/* it was in the wait table */
|
|
_MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
|
|
goto unlock;
|
|
}
|
|
if (!PR_CLIST_IS_EMPTY(&group->io_ready))
|
|
{
|
|
/* is it already complete? */
|
|
PRCList *head = PR_LIST_HEAD(&group->io_ready);
|
|
do
|
|
{
|
|
PRRecvWait *done = (PRRecvWait*)head;
|
|
if (done == desc) goto unlock;
|
|
head = PR_NEXT_LINK(head);
|
|
} while (head != &group->io_ready);
|
|
}
|
|
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
|
|
rv = PR_FAILURE;
|
|
|
|
#endif
|
|
unlock:
|
|
PR_Unlock(group->ml);
|
|
return rv;
|
|
} /* PR_CancelWaitFileDesc */
|
|
|
|
PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
|
|
{
|
|
PRRecvWait **desc;
|
|
PRRecvWait *recv_wait = NULL;
|
|
#ifdef WINNT
|
|
_MDOverlapped *overlapped;
|
|
PRRecvWait **end;
|
|
PRThread *me = _PR_MD_CURRENT_THREAD();
|
|
#endif
|
|
|
|
if (NULL == group) group = mw_state->group;
|
|
PR_ASSERT(NULL != group);
|
|
if (NULL == group)
|
|
{
|
|
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
|
|
return NULL;
|
|
}
|
|
|
|
PR_Lock(group->ml);
|
|
if (_prmw_stopped != group->state)
|
|
{
|
|
if (_prmw_running == group->state)
|
|
group->state = _prmw_stopping; /* so nothing new comes in */
|
|
if (0 == group->waiting_threads) /* is there anybody else? */
|
|
group->state = _prmw_stopped; /* we can stop right now */
|
|
else
|
|
{
|
|
PR_NotifyAllCondVar(group->new_business);
|
|
PR_NotifyAllCondVar(group->io_complete);
|
|
}
|
|
while (_prmw_stopped != group->state)
|
|
(void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
|
|
}
|
|
|
|
#ifdef WINNT
|
|
_PR_MD_LOCK(&group->mdlock);
|
|
#endif
|
|
/* make all the existing descriptors look done/interrupted */
|
|
#ifdef WINNT
|
|
end = &group->waiter->recv_wait + group->waiter->length;
|
|
for (desc = &group->waiter->recv_wait; desc < end; ++desc)
|
|
{
|
|
if (NULL != *desc)
|
|
{
|
|
if (InterlockedCompareExchange((PVOID *)&(*desc)->outcome,
|
|
(PVOID)PR_MW_INTERRUPT, (PVOID)PR_MW_PENDING)
|
|
== (PVOID)PR_MW_PENDING)
|
|
{
|
|
PRFileDesc *bottom = PR_GetIdentitiesLayer(
|
|
(*desc)->fd, PR_NSPR_IO_LAYER);
|
|
PR_ASSERT(NULL != bottom);
|
|
if (NULL == bottom)
|
|
{
|
|
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
|
|
goto invalid_arg;
|
|
}
|
|
bottom->secret->state = _PR_FILEDESC_CLOSED;
|
|
#if 0
|
|
fprintf(stderr, "cancel wait group: closing socket\n");
|
|
#endif
|
|
if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
|
|
{
|
|
fprintf(stderr, "closesocket failed: %d\n",
|
|
WSAGetLastError());
|
|
exit(1);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
while (group->waiter->count > 0)
|
|
{
|
|
_PR_THREAD_LOCK(me);
|
|
me->state = _PR_IO_WAIT;
|
|
PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
|
|
if (!_PR_IS_NATIVE_THREAD(me))
|
|
{
|
|
_PR_SLEEPQ_LOCK(me->cpu);
|
|
_PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
|
|
_PR_SLEEPQ_UNLOCK(me->cpu);
|
|
}
|
|
_PR_THREAD_UNLOCK(me);
|
|
_PR_MD_UNLOCK(&group->mdlock);
|
|
PR_Unlock(group->ml);
|
|
_PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
|
|
me->state = _PR_RUNNING;
|
|
PR_Lock(group->ml);
|
|
_PR_MD_LOCK(&group->mdlock);
|
|
}
|
|
#else
|
|
for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
|
|
{
|
|
PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
|
|
if (NULL != *desc)
|
|
_MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
|
|
}
|
|
#endif
|
|
|
|
/* take first element of finished list and return it or NULL */
|
|
if (PR_CLIST_IS_EMPTY(&group->io_ready))
|
|
PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
|
|
else
|
|
{
|
|
PRCList *head = PR_LIST_HEAD(&group->io_ready);
|
|
PR_REMOVE_AND_INIT_LINK(head);
|
|
#ifdef WINNT
|
|
overlapped = (_MDOverlapped *)
|
|
((char *)head - offsetof(_MDOverlapped, data));
|
|
head = &overlapped->data.mw.desc->internal;
|
|
if (NULL != overlapped->data.mw.timer)
|
|
{
|
|
PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
|
|
!= overlapped->data.mw.desc->timeout);
|
|
CancelTimer(overlapped->data.mw.timer);
|
|
}
|
|
else
|
|
{
|
|
PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
|
|
== overlapped->data.mw.desc->timeout);
|
|
}
|
|
PR_DELETE(overlapped);
|
|
#endif
|
|
recv_wait = (PRRecvWait*)head;
|
|
}
|
|
#ifdef WINNT
|
|
invalid_arg:
|
|
_PR_MD_UNLOCK(&group->mdlock);
|
|
#endif
|
|
PR_Unlock(group->ml);
|
|
|
|
return recv_wait;
|
|
} /* PR_CancelWaitGroup */
|
|
|
|
PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
|
|
{
|
|
#ifdef XP_MAC
|
|
#pragma unused (size)
|
|
#endif
|
|
PRWaitGroup *wg;
|
|
|
|
if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
|
|
{
|
|
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
|
|
goto failed;
|
|
}
|
|
/* the wait group itself */
|
|
wg->ml = PR_NewLock();
|
|
if (NULL == wg->ml) goto failed_lock;
|
|
wg->io_taken = PR_NewCondVar(wg->ml);
|
|
if (NULL == wg->io_taken) goto failed_cvar0;
|
|
wg->io_complete = PR_NewCondVar(wg->ml);
|
|
if (NULL == wg->io_complete) goto failed_cvar1;
|
|
wg->new_business = PR_NewCondVar(wg->ml);
|
|
if (NULL == wg->new_business) goto failed_cvar2;
|
|
wg->mw_manage = PR_NewCondVar(wg->ml);
|
|
if (NULL == wg->mw_manage) goto failed_cvar3;
|
|
|
|
PR_INIT_CLIST(&wg->group_link);
|
|
PR_INIT_CLIST(&wg->io_ready);
|
|
|
|
/* the waiters sequence */
|
|
wg->waiter = (_PRWaiterHash*)PR_CALLOC(
|
|
sizeof(_PRWaiterHash) +
|
|
(_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
|
|
if (NULL == wg->waiter)
|
|
{
|
|
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
|
|
goto failed_waiter;
|
|
}
|
|
wg->waiter->count = 0;
|
|
wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
|
|
|
|
#ifdef WINNT
|
|
_PR_MD_NEW_LOCK(&wg->mdlock);
|
|
PR_INIT_CLIST(&wg->wait_list);
|
|
#endif /* WINNT */
|
|
|
|
PR_Lock(mw_lock);
|
|
PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
|
|
PR_Unlock(mw_lock);
|
|
return wg;
|
|
|
|
failed_waiter:
|
|
PR_DestroyCondVar(wg->mw_manage);
|
|
failed_cvar3:
|
|
PR_DestroyCondVar(wg->new_business);
|
|
failed_cvar2:
|
|
PR_DestroyCondVar(wg->io_complete);
|
|
failed_cvar1:
|
|
PR_DestroyCondVar(wg->io_taken);
|
|
failed_cvar0:
|
|
PR_DestroyLock(wg->ml);
|
|
failed_lock:
|
|
PR_DELETE(wg);
|
|
wg = NULL;
|
|
|
|
failed:
|
|
return wg;
|
|
} /* MW_CreateWaitGroup */
|
|
|
|
PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
|
|
{
|
|
PRStatus rv = PR_SUCCESS;
|
|
if (NULL == group) group = mw_state->group;
|
|
PR_ASSERT(NULL != group);
|
|
if (NULL != group)
|
|
{
|
|
PR_Lock(group->ml);
|
|
if ((group->waiting_threads == 0)
|
|
&& (group->waiter->count == 0)
|
|
&& PR_CLIST_IS_EMPTY(&group->io_ready))
|
|
{
|
|
group->state = _prmw_stopped;
|
|
}
|
|
else
|
|
{
|
|
PR_SetError(PR_INVALID_STATE_ERROR, 0);
|
|
rv = PR_FAILURE;
|
|
}
|
|
PR_Unlock(group->ml);
|
|
if (PR_FAILURE == rv) return rv;
|
|
|
|
PR_Lock(mw_lock);
|
|
PR_REMOVE_LINK(&group->group_link);
|
|
PR_Unlock(mw_lock);
|
|
|
|
#ifdef WINNT
|
|
/*
|
|
* XXX make sure wait_list is empty and waiter is empty.
|
|
* These must be checked while holding mdlock.
|
|
*/
|
|
_PR_MD_FREE_LOCK(&group->mdlock);
|
|
#endif
|
|
|
|
PR_DELETE(group->waiter);
|
|
PR_DELETE(group->polling_list);
|
|
PR_DestroyCondVar(group->mw_manage);
|
|
PR_DestroyCondVar(group->new_business);
|
|
PR_DestroyCondVar(group->io_complete);
|
|
PR_DestroyCondVar(group->io_taken);
|
|
PR_DestroyLock(group->ml);
|
|
if (group == mw_state->group) mw_state->group = NULL;
|
|
PR_DELETE(group);
|
|
}
|
|
else
|
|
{
|
|
/* The default wait group is not created yet. */
|
|
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
|
|
rv = PR_FAILURE;
|
|
}
|
|
return rv;
|
|
} /* PR_DestroyWaitGroup */
|
|
|
|
/**********************************************************************
|
|
***********************************************************************
|
|
******************** Wait group enumerations **************************
|
|
***********************************************************************
|
|
**********************************************************************/
|
|
|
|
PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
|
|
{
|
|
PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
|
|
if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
|
|
else
|
|
{
|
|
enumerator->group = group;
|
|
enumerator->seal = _PR_ENUM_SEALED;
|
|
}
|
|
return enumerator;
|
|
} /* PR_CreateMWaitEnumerator */
|
|
|
|
PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
|
|
{
|
|
PR_ASSERT(NULL != enumerator);
|
|
PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
|
|
if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
|
|
{
|
|
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
|
|
return PR_FAILURE;
|
|
}
|
|
enumerator->seal = _PR_ENUM_UNSEALED;
|
|
PR_Free(enumerator);
|
|
return PR_SUCCESS;
|
|
} /* PR_DestroyMWaitEnumerator */
|
|
|
|
PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
|
|
PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
|
|
{
|
|
PRRecvWait *result = NULL;
|
|
|
|
/* entry point sanity checking */
|
|
PR_ASSERT(NULL != enumerator);
|
|
PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
|
|
if ((NULL == enumerator)
|
|
|| (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument;
|
|
|
|
/* beginning of enumeration */
|
|
if (NULL == previous)
|
|
{
|
|
if (NULL == enumerator->group)
|
|
{
|
|
enumerator->group = mw_state->group;
|
|
if (NULL == enumerator->group)
|
|
{
|
|
PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
|
|
return NULL;
|
|
}
|
|
}
|
|
enumerator->waiter = &enumerator->group->waiter->recv_wait;
|
|
enumerator->p_timestamp = enumerator->group->p_timestamp;
|
|
enumerator->thread = PR_GetCurrentThread();
|
|
enumerator->index = 0;
|
|
}
|
|
/* continuing an enumeration */
|
|
else
|
|
{
|
|
PRThread *me = PR_GetCurrentThread();
|
|
PR_ASSERT(me == enumerator->thread);
|
|
if (me != enumerator->thread) goto bad_argument;
|
|
|
|
/* need to restart the enumeration */
|
|
if (enumerator->p_timestamp != enumerator->group->p_timestamp)
|
|
return PR_EnumerateWaitGroup(enumerator, NULL);
|
|
}
|
|
|
|
/* actually progress the enumeration */
|
|
#if defined(WINNT)
|
|
_PR_MD_LOCK(&enumerator->group->mdlock);
|
|
#else
|
|
PR_Lock(enumerator->group->ml);
|
|
#endif
|
|
while (enumerator->index++ < enumerator->group->waiter->length)
|
|
{
|
|
if (NULL != (result = *(enumerator->waiter)++)) break;
|
|
}
|
|
#if defined(WINNT)
|
|
_PR_MD_UNLOCK(&enumerator->group->mdlock);
|
|
#else
|
|
PR_Unlock(enumerator->group->ml);
|
|
#endif
|
|
|
|
return result; /* what we live for */
|
|
|
|
bad_argument:
|
|
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
|
|
return NULL; /* probably ambiguous */
|
|
} /* PR_EnumerateWaitGroup */
|
|
|
|
/* prmwait.c */
|