ossp-pkg/pth/pth_sync.c
/*
** GNU Pth - The GNU Portable Threads
** Copyright (c) 1999-2007 Ralf S. Engelschall <rse@engelschall.com>
**
** This file is part of GNU Pth, a non-preemptive thread scheduling
** library which can be found at http://www.gnu.org/software/pth/.
**
** This library is free software; you can redistribute it and/or
** modify it under the terms of the GNU Lesser General Public
** License as published by the Free Software Foundation; either
** version 2.1 of the License, or (at your option) any later version.
**
** This library is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
** Lesser General Public License for more details.
**
** You should have received a copy of the GNU Lesser General Public
** License along with this library; if not, write to the Free Software
** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
** USA, or contact Ralf S. Engelschall <rse@engelschall.com>.
**
** pth_sync.c: Pth synchronization facilities
*/
/* ``It is hard to fly with
the eagles when you work
with the turkeys.''
-- Unknown */
#include "pth_p.h"
/*
** Mutual Exclusion Locks
*/
int pth_mutex_init(pth_mutex_t *mutex)
{
if (mutex == NULL)
return pth_error(FALSE, EINVAL);
mutex->mx_state = PTH_MUTEX_INITIALIZED;
mutex->mx_owner = NULL;
mutex->mx_count = 0;
return TRUE;
}
int pth_mutex_acquire(pth_mutex_t *mutex, int tryonly, pth_event_t ev_extra)
{
static pth_key_t ev_key = PTH_KEY_INIT;
pth_event_t ev;
pth_debug2("pth_mutex_acquire: called from thread \"%s\"", pth_current->name);
/* consistency checks */
if (mutex == NULL)
return pth_error(FALSE, EINVAL);
if (!(mutex->mx_state & PTH_MUTEX_INITIALIZED))
return pth_error(FALSE, EDEADLK);
/* still not locked, so simply acquire mutex? */
if (!(mutex->mx_state & PTH_MUTEX_LOCKED)) {
mutex->mx_state |= PTH_MUTEX_LOCKED;
mutex->mx_owner = pth_current;
mutex->mx_count = 1;
pth_ring_append(&(pth_current->mutexring), &(mutex->mx_node));
pth_debug1("pth_mutex_acquire: immediately locking mutex");
return TRUE;
}
/* already locked by caller? */
if (mutex->mx_count >= 1 && mutex->mx_owner == pth_current) {
/* recursive lock */
mutex->mx_count++;
pth_debug1("pth_mutex_acquire: recursive locking");
return TRUE;
}
/* should we just tryonly? */
if (tryonly)
return pth_error(FALSE, EBUSY);
/* else wait for mutex to become unlocked.. */
pth_debug1("pth_mutex_acquire: wait until mutex is unlocked");
for (;;) {
ev = pth_event(PTH_EVENT_MUTEX|PTH_MODE_STATIC, &ev_key, mutex);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
pth_wait(ev);
if (ev_extra != NULL) {
pth_event_isolate(ev);
if (pth_event_status(ev) == PTH_STATUS_PENDING)
return pth_error(FALSE, EINTR);
}
if (!(mutex->mx_state & PTH_MUTEX_LOCKED))
break;
}
/* now it's again unlocked, so acquire mutex */
pth_debug1("pth_mutex_acquire: locking mutex");
mutex->mx_state |= PTH_MUTEX_LOCKED;
mutex->mx_owner = pth_current;
mutex->mx_count = 1;
pth_ring_append(&(pth_current->mutexring), &(mutex->mx_node));
return TRUE;
}
int pth_mutex_release(pth_mutex_t *mutex)
{
/* consistency checks */
if (mutex == NULL)
return pth_error(FALSE, EINVAL);
if (!(mutex->mx_state & PTH_MUTEX_INITIALIZED))
return pth_error(FALSE, EDEADLK);
if (!(mutex->mx_state & PTH_MUTEX_LOCKED))
return pth_error(FALSE, EDEADLK);
if (mutex->mx_owner != pth_current)
return pth_error(FALSE, EACCES);
/* decrement recursion counter and release mutex */
mutex->mx_count--;
if (mutex->mx_count <= 0) {
mutex->mx_state &= ~(PTH_MUTEX_LOCKED);
mutex->mx_owner = NULL;
mutex->mx_count = 0;
pth_ring_delete(&(pth_current->mutexring), &(mutex->mx_node));
}
return TRUE;
}
intern void pth_mutex_releaseall(pth_t thread)
{
pth_ringnode_t *rn, *rnf;
if (thread == NULL)
return;
/* iterate over all mutexes of thread */
rn = rnf = pth_ring_first(&(thread->mutexring));
while (rn != NULL) {
pth_mutex_release((pth_mutex_t *)rn);
rn = pth_ring_next(&(thread->mutexring), rn);
if (rn == rnf)
break;
}
return;
}
/*
** Read-Write Locks
*/
int pth_rwlock_init(pth_rwlock_t *rwlock)
{
if (rwlock == NULL)
return pth_error(FALSE, EINVAL);
rwlock->rw_state = PTH_RWLOCK_INITIALIZED;
rwlock->rw_readers = 0;
pth_mutex_init(&(rwlock->rw_mutex_rd));
pth_mutex_init(&(rwlock->rw_mutex_rw));
return TRUE;
}
int pth_rwlock_acquire(pth_rwlock_t *rwlock, int op, int tryonly, pth_event_t ev_extra)
{
/* consistency checks */
if (rwlock == NULL)
return pth_error(FALSE, EINVAL);
if (!(rwlock->rw_state & PTH_RWLOCK_INITIALIZED))
return pth_error(FALSE, EDEADLK);
/* acquire lock */
if (op == PTH_RWLOCK_RW) {
/* read-write lock is simple */
if (!pth_mutex_acquire(&(rwlock->rw_mutex_rw), tryonly, ev_extra))
return FALSE;
rwlock->rw_mode = PTH_RWLOCK_RW;
}
else {
/* read-only lock is more complicated to get right */
if (!pth_mutex_acquire(&(rwlock->rw_mutex_rd), tryonly, ev_extra))
return FALSE;
rwlock->rw_readers++;
if (rwlock->rw_readers == 1) {
if (!pth_mutex_acquire(&(rwlock->rw_mutex_rw), tryonly, ev_extra)) {
rwlock->rw_readers--;
pth_shield { pth_mutex_release(&(rwlock->rw_mutex_rd)); }
return FALSE;
}
}
rwlock->rw_mode = PTH_RWLOCK_RD;
pth_mutex_release(&(rwlock->rw_mutex_rd));
}
return TRUE;
}
int pth_rwlock_release(pth_rwlock_t *rwlock)
{
/* consistency checks */
if (rwlock == NULL)
return pth_error(FALSE, EINVAL);
if (!(rwlock->rw_state & PTH_RWLOCK_INITIALIZED))
return pth_error(FALSE, EDEADLK);
/* release lock */
if (rwlock->rw_mode == PTH_RWLOCK_RW) {
/* read-write unlock is simple */
if (!pth_mutex_release(&(rwlock->rw_mutex_rw)))
return FALSE;
}
else {
/* read-only unlock is more complicated to get right */
if (!pth_mutex_acquire(&(rwlock->rw_mutex_rd), FALSE, NULL))
return FALSE;
rwlock->rw_readers--;
if (rwlock->rw_readers == 0) {
if (!pth_mutex_release(&(rwlock->rw_mutex_rw))) {
rwlock->rw_readers++;
pth_shield { pth_mutex_release(&(rwlock->rw_mutex_rd)); }
return FALSE;
}
}
rwlock->rw_mode = PTH_RWLOCK_RD;
pth_mutex_release(&(rwlock->rw_mutex_rd));
}
return TRUE;
}
/*
** Condition Variables
*/
int pth_cond_init(pth_cond_t *cond)
{
if (cond == NULL)
return pth_error(FALSE, EINVAL);
cond->cn_state = PTH_COND_INITIALIZED;
cond->cn_waiters = 0;
return TRUE;
}
static void pth_cond_cleanup_handler(void *_cleanvec)
{
pth_mutex_t *mutex = (pth_mutex_t *)(((void **)_cleanvec)[0]);
pth_cond_t *cond = (pth_cond_t *)(((void **)_cleanvec)[1]);
/* re-acquire mutex when pth_cond_await() is cancelled
in order to restore the condition variable semantics */
pth_mutex_acquire(mutex, FALSE, NULL);
/* fix number of waiters */
cond->cn_waiters--;
return;
}
int pth_cond_await(pth_cond_t *cond, pth_mutex_t *mutex, pth_event_t ev_extra)
{
static pth_key_t ev_key = PTH_KEY_INIT;
void *cleanvec[2];
pth_event_t ev;
/* consistency checks */
if (cond == NULL || mutex == NULL)
return pth_error(FALSE, EINVAL);
if (!(cond->cn_state & PTH_COND_INITIALIZED))
return pth_error(FALSE, EDEADLK);
/* check whether we can do a short-circuit wait */
if ( (cond->cn_state & PTH_COND_SIGNALED)
&& !(cond->cn_state & PTH_COND_BROADCAST)) {
cond->cn_state &= ~(PTH_COND_SIGNALED);
cond->cn_state &= ~(PTH_COND_BROADCAST);
cond->cn_state &= ~(PTH_COND_HANDLED);
return TRUE;
}
/* add us to the number of waiters */
cond->cn_waiters++;
/* release mutex (caller had to acquire it first) */
pth_mutex_release(mutex);
/* wait until the condition is signaled */
ev = pth_event(PTH_EVENT_COND|PTH_MODE_STATIC, &ev_key, cond);
if (ev_extra != NULL)
pth_event_concat(ev, ev_extra, NULL);
cleanvec[0] = mutex;
cleanvec[1] = cond;
pth_cleanup_push(pth_cond_cleanup_handler, cleanvec);
pth_wait(ev);
pth_cleanup_pop(FALSE);
if (ev_extra != NULL)
pth_event_isolate(ev);
/* reacquire mutex */
pth_mutex_acquire(mutex, FALSE, NULL);
/* remove us from the number of waiters */
cond->cn_waiters--;
/* release mutex (caller had to acquire it first) */
return TRUE;
}
int pth_cond_notify(pth_cond_t *cond, int broadcast)
{
/* consistency checks */
if (cond == NULL)
return pth_error(FALSE, EINVAL);
if (!(cond->cn_state & PTH_COND_INITIALIZED))
return pth_error(FALSE, EDEADLK);
/* do something only if there is at least one waiters (POSIX semantics) */
if (cond->cn_waiters > 0) {
/* signal the condition */
cond->cn_state |= PTH_COND_SIGNALED;
if (broadcast)
cond->cn_state |= PTH_COND_BROADCAST;
else
cond->cn_state &= ~(PTH_COND_BROADCAST);
cond->cn_state &= ~(PTH_COND_HANDLED);
/* and give other threads a chance to awake */
pth_yield(NULL);
}
/* return to caller */
return TRUE;
}
/*
** Barriers
*/
int pth_barrier_init(pth_barrier_t *barrier, int threshold)
{
if (barrier == NULL || threshold <= 0)
return pth_error(FALSE, EINVAL);
if (!pth_mutex_init(&(barrier->br_mutex)))
return FALSE;
if (!pth_cond_init(&(barrier->br_cond)))
return FALSE;
barrier->br_state = PTH_BARRIER_INITIALIZED;
barrier->br_threshold = threshold;
barrier->br_count = threshold;
barrier->br_cycle = FALSE;
return TRUE;
}
int pth_barrier_reach(pth_barrier_t *barrier)
{
int cancel, cycle;
int rv;
if (barrier == NULL)
return pth_error(FALSE, EINVAL);
if (!(barrier->br_state & PTH_BARRIER_INITIALIZED))
return pth_error(FALSE, EINVAL);
if (!pth_mutex_acquire(&(barrier->br_mutex), FALSE, NULL))
return FALSE;
cycle = barrier->br_cycle;
if (--(barrier->br_count) == 0) {
/* last thread reached the barrier */
barrier->br_cycle = !(barrier->br_cycle);
barrier->br_count = barrier->br_threshold;
if ((rv = pth_cond_notify(&(barrier->br_cond), TRUE)))
rv = PTH_BARRIER_TAILLIGHT;
}
else {
/* wait until remaining threads have reached the barrier, too */
pth_cancel_state(PTH_CANCEL_DISABLE, &cancel);
if (barrier->br_threshold == barrier->br_count)
rv = PTH_BARRIER_HEADLIGHT;
else
rv = TRUE;
while (cycle == barrier->br_cycle) {
if (!(rv = pth_cond_await(&(barrier->br_cond), &(barrier->br_mutex), NULL)))
break;
}
pth_cancel_state(cancel, NULL);
}
pth_mutex_release(&(barrier->br_mutex));
return rv;
}