Index: ossp-pkg/pth/ChangeLog RCS File: /v/ossp/cvs/ossp-pkg/pth/ChangeLog,v rcsdiff -q -kk '-r1.535' '-r1.536' -u '/v/ossp/cvs/ossp-pkg/pth/ChangeLog,v' 2>/dev/null --- ChangeLog 2001/01/01 12:51:08 1.535 +++ ChangeLog 2001/02/25 17:08:07 1.536 @@ -19,7 +19,11 @@ | ||__ _| __|_(_) |_|_____________________________________________________________ - Changes between 1.4a3 and 1.4a4 (29-Jul-2000 to xx-Oct-2000) + Changes between 1.4a3 and 1.4a4 (29-Jul-2000 to 26-Feb-2001) + + *) Fixed an even-manager bug which causes a thread that calls + pth_nap() to never woke up if the only elapsed event was a timer. + [Archie Cobbs ] *) Added `#define _BITS_SIGTHREAD_H' to pthread.h to avoid inclusion of bits/sigthread.h (from signal.h) on Linux running glibc6 2.2. Index: ossp-pkg/pth/pth_sched.c RCS File: /v/ossp/cvs/ossp-pkg/pth/pth_sched.c,v co -q -kk -p'1.79' '/v/ossp/cvs/ossp-pkg/pth/pth_sched.c,v' | diff -u /dev/null - -L'ossp-pkg/pth/pth_sched.c' 2>/dev/null --- ossp-pkg/pth/pth_sched.c +++ - 2024-05-17 09:02:17.848402197 +0200 @@ -0,0 +1,787 @@ +/* +** GNU Pth - The GNU Portable Threads +** Copyright (c) 1999-2000 Ralf S. Engelschall +** +** This file is part of GNU Pth, a non-preemptive thread scheduling +** library which can be found at http://www.gnu.org/software/pth/. +** +** This library is free software; you can redistribute it and/or +** modify it under the terms of the GNU Lesser General Public +** License as published by the Free Software Foundation; either +** version 2.1 of the License, or (at your option) any later version. +** +** This library is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +** Lesser General Public License for more details. +** +** You should have received a copy of the GNU Lesser General Public +** License along with this library; if not, write to the Free Software +** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +** USA, or contact Ralf S. Engelschall . +** +** pth_sched.c: Pth thread scheduler, the real heart of Pth +*/ + /* ``Recursive, adj.; + see Recursive.'' + -- Unknown */ +#include "pth_p.h" + +intern pth_t pth_main; /* the main thread */ +intern pth_t pth_sched; /* the permanent scheduler thread */ +intern pth_t pth_current; /* the currently running thread */ +intern pth_pqueue_t pth_NQ; /* queue of new threads */ +intern pth_pqueue_t pth_RQ; /* queue of threads ready to run */ +intern pth_pqueue_t pth_WQ; /* queue of threads waiting for an event */ +intern pth_pqueue_t pth_SQ; /* queue of suspended threads */ +intern pth_pqueue_t pth_DQ; /* queue of terminated threads */ +intern float pth_loadval; /* average scheduler load value */ + +static int pth_sigpipe[2]; /* internal signal occurrence pipe */ +static sigset_t pth_sigpending; /* mask of pending signals */ +static sigset_t pth_sigblock; /* mask of signals we block in scheduler */ +static sigset_t pth_sigcatch; /* mask of signals we have to catch */ +static sigset_t pth_sigraised; /* mask of raised signals */ + +static pth_time_t pth_loadticknext; +static pth_time_t pth_loadtickgap = PTH_TIME(1,0); + +/* initialize the scheduler ingredients */ +intern void pth_scheduler_init(void) +{ + /* create the internal signal pipe */ + if (pipe(pth_sigpipe) == -1) { + fprintf(stderr, "**Pth** INIT: Cannot create internal pipe: %s\n", + strerror(errno)); + abort(); + } + pth_fdmode(pth_sigpipe[0], PTH_FDMODE_NONBLOCK); + pth_fdmode(pth_sigpipe[1], PTH_FDMODE_NONBLOCK); + + /* initialize the essential threads */ + pth_sched = NULL; + pth_current = NULL; + + /* initalize the thread queues */ + pth_pqueue_init(&pth_NQ); + pth_pqueue_init(&pth_RQ); + pth_pqueue_init(&pth_WQ); + pth_pqueue_init(&pth_SQ); + pth_pqueue_init(&pth_DQ); + + /* initialize load support */ + pth_loadval = 1.0; + pth_time_set(&pth_loadticknext, PTH_TIME_NOW); + return; +} + +/* drop all threads (except for the currently active one) */ +intern void pth_scheduler_drop(void) +{ + pth_t t; + + /* clear the new queue */ + while ((t = pth_pqueue_delmax(&pth_NQ)) != NULL); + pth_tcb_free(t); + pth_pqueue_init(&pth_NQ); + + /* clear the ready queue */ + while ((t = pth_pqueue_delmax(&pth_RQ)) != NULL); + pth_tcb_free(t); + pth_pqueue_init(&pth_RQ); + + /* clear the waiting queue */ + while ((t = pth_pqueue_delmax(&pth_WQ)) != NULL); + pth_tcb_free(t); + pth_pqueue_init(&pth_WQ); + + /* clear the suspend queue */ + while ((t = pth_pqueue_delmax(&pth_SQ)) != NULL); + pth_tcb_free(t); + pth_pqueue_init(&pth_SQ); + + /* clear the dead queue */ + while ((t = pth_pqueue_delmax(&pth_DQ)) != NULL); + pth_tcb_free(t); + pth_pqueue_init(&pth_DQ); + return; +} + +/* kill the scheduler ingredients */ +intern void pth_scheduler_kill(void) +{ + /* drop all threads */ + pth_scheduler_drop(); + + /* remove the internal signal pipe */ + close(pth_sigpipe[0]); + close(pth_sigpipe[1]); + return; +} + +/* + * Update the average scheduler load. + * + * This is called on every context switch, but we have to adjust the + * average load value every second, only. When we're called more than + * once per second we handle this by just calculating anything once + * and then do NOPs until the next ticks is over. When the scheduler + * waited for more than once second (or a thread CPU burst lasted for + * more than once second) we simulate the missing calculations. That's + * no problem because we can assume that the number of ready threads + * then wasn't changed dramatically (or more context switched would have + * been occurred and we would have been given more chances to operate). + * The actual average load is calculated through an exponential average + * formula. + */ +#define pth_scheduler_load(now) \ + if (pth_time_cmp((now), &pth_loadticknext) >= 0) { \ + pth_time_t ttmp; \ + int numready; \ + numready = pth_pqueue_elements(&pth_RQ); \ + pth_time_set(&ttmp, (now)); \ + do { \ + pth_loadval = (numready*0.25) + (pth_loadval*0.75); \ + pth_time_sub(&ttmp, &pth_loadtickgap); \ + } while (pth_time_cmp(&ttmp, &pth_loadticknext) >= 0); \ + pth_time_set(&pth_loadticknext, (now)); \ + pth_time_add(&pth_loadticknext, &pth_loadtickgap); \ + } + +/* the heart of this library: the thread scheduler */ +intern void *pth_scheduler(void *dummy) +{ + sigset_t sigs; + pth_time_t running; + pth_time_t snapshot; + struct sigaction sa; + sigset_t ss; + int sig; + pth_t t; + + /* + * bootstrapping + */ + pth_debug1("pth_scheduler: bootstrapping"); + + /* mark this thread as the special scheduler thread */ + pth_sched->state = PTH_STATE_SCHEDULER; + + /* block all signals in the scheduler thread */ + sigfillset(&sigs); + pth_sc(sigprocmask)(SIG_SETMASK, &sigs, NULL); + + /* initialize the snapshot time for bootstrapping the loop */ + pth_time_set(&snapshot, PTH_TIME_NOW); + + /* + * endless scheduler loop + */ + for (;;) { + /* + * Move threads from new queue to ready queue and give + * them maximum priority so they start immediately + */ + while ((t = pth_pqueue_tail(&pth_NQ)) != NULL) { + pth_pqueue_delete(&pth_NQ, t); + t->state = PTH_STATE_READY; + pth_pqueue_insert(&pth_RQ, pth_pqueue_favorite_prio(&pth_RQ), t); + pth_debug2("pth_scheduler: new thread \"%s\" moved to top of ready queue", t->name); + } + + /* + * Update average scheduler load + */ + pth_scheduler_load(&snapshot); + + /* + * Find next thread in ready queue + */ + pth_current = pth_pqueue_delmax(&pth_RQ); + if (pth_current == NULL) { + fprintf(stderr, "**Pth** SCHEDULER INTERNAL ERROR: " + "no more thread(s) available to schedule!?!?\n"); + abort(); + } + pth_debug4("pth_scheduler: thread \"%s\" selected (prio=%d, qprio=%d)", + pth_current->name, pth_current->prio, pth_current->q_prio); + + /* + * Raise additionally thread-specific signals + * (they are delivered when we switch the context) + * + * Situation is ('#' = signal pending): + * process pending (pth_sigpending): ----#### + * thread pending (pth_current->sigpending): --##--## + * Result has to be: + * process new pending: --###### + */ + if (pth_current->sigpendcnt > 0) { + sigpending(&pth_sigpending); + for (sig = 1; sig < PTH_NSIG; sig++) + if (sigismember(&pth_current->sigpending, sig)) + if (!sigismember(&pth_sigpending, sig)) + kill(getpid(), sig); + } + + /* + * Set running start time for new thread + * and perform a context switch to it + */ + pth_debug3("pth_scheduler: switching to thread 0x%lx (\"%s\")", + (unsigned long)pth_current, pth_current->name); + + /* update thread times */ + pth_time_set(&pth_current->lastran, PTH_TIME_NOW); + + /* update scheduler times */ + pth_time_set(&running, &pth_current->lastran); + pth_time_sub(&running, &snapshot); + pth_time_add(&pth_sched->running, &running); + + /* ** ENTERING THREAD ** - by switching the machine context */ + pth_mctx_switch(&pth_sched->mctx, &pth_current->mctx); + + /* update scheduler times */ + pth_time_set(&snapshot, PTH_TIME_NOW); + pth_debug3("pth_scheduler: cameback from thread 0x%lx (\"%s\")", + (unsigned long)pth_current, pth_current->name); + + /* + * Calculate and update the time the previous thread was running + */ + pth_time_set(&running, &snapshot); + pth_time_sub(&running, &pth_current->lastran); + pth_time_add(&pth_current->running, &running); + pth_debug3("pth_scheduler: thread \"%s\" ran %.6f", + pth_current->name, pth_time_t2d(&running)); + + /* + * Remove still pending thread-specific signals + * (they are re-delivered next time) + * + * Situation is ('#' = signal pending): + * thread old pending (pth_current->sigpending): --##--## + * process old pending (pth_sigpending): ----#### + * process still pending (sigstillpending): ---#-#-# + * Result has to be: + * process new pending: -----#-# + * thread new pending (pth_current->sigpending): ---#---# + */ + if (pth_current->sigpendcnt > 0) { + sigset_t sigstillpending; + sigpending(&sigstillpending); + for (sig = 1; sig < PTH_NSIG; sig++) { + if (sigismember(&pth_current->sigpending, sig)) { + if (!sigismember(&sigstillpending, sig)) { + /* thread (and perhaps also process) signal delivered */ + sigdelset(&pth_current->sigpending, sig); + pth_current->sigpendcnt--; + } + else if (!sigismember(&pth_sigpending, sig)) { + /* thread signal not delivered */ + pth_util_sigdelete(sig); + } + } + } + } + + /* + * Check for stack overflow + */ + if (pth_current->stackguard != NULL) { + if (*pth_current->stackguard != 0xDEAD) { + pth_debug3("pth_scheduler: stack overflow detected for thread 0x%lx (\"%s\")", + (unsigned long)pth_current, pth_current->name); + /* + * if the application doesn't catch SIGSEGVs, we terminate + * manually with a SIGSEGV now, but output a reasonable message. + */ + if (sigaction(SIGSEGV, NULL, &sa) == 0) { + if (sa.sa_handler == SIG_DFL) { + fprintf(stderr, "**Pth** STACK OVERFLOW: thread pid_t=0x%lx, name=\"%s\"\n", + (unsigned long)pth_current, pth_current->name); + kill(getpid(), SIGSEGV); + sigfillset(&ss); + sigdelset(&ss, SIGSEGV); + sigsuspend(&ss); + abort(); + } + } + /* + * else we terminate the thread only and send us a SIGSEGV + * which allows the application to handle the situation... + */ + pth_current->join_arg = (void *)0xDEAD; + pth_current->state = PTH_STATE_DEAD; + kill(getpid(), SIGSEGV); + } + } + + /* + * When previous thread is now marked as dead, kick it out + */ + if (pth_current->state == PTH_STATE_DEAD) { + pth_debug2("pth_scheduler: marking thread \"%s\" as dead", pth_current->name); + if (!pth_current->joinable) + pth_tcb_free(pth_current); + else + pth_pqueue_insert(&pth_DQ, PTH_PRIO_STD, pth_current); + pth_current = NULL; + } + + /* + * When thread wants to wait for an event + * move it to waiting queue now + */ + if (pth_current != NULL && pth_current->state == PTH_STATE_WAITING) { + pth_debug2("pth_scheduler: moving thread \"%s\" to waiting queue", + pth_current->name); + pth_pqueue_insert(&pth_WQ, pth_current->prio, pth_current); + pth_current = NULL; + } + + /* + * migrate old treads in ready queue into higher + * priorities to avoid starvation and insert last running + * thread back into this queue, too. + */ + pth_pqueue_increase(&pth_RQ); + if (pth_current != NULL) + pth_pqueue_insert(&pth_RQ, pth_current->prio, pth_current); + + /* + * Manage the events in the waiting queue, i.e. decide whether their + * events occurred and move them to the ready queue. But wait only if + * we have already no new or ready threads. + */ + if ( pth_pqueue_elements(&pth_RQ) == 0 + && pth_pqueue_elements(&pth_NQ) == 0) + pth_sched_eventmanager(&snapshot, FALSE /* wait */); + else + pth_sched_eventmanager(&snapshot, TRUE /* poll */); + } + + /* NOTREACHED */ + return NULL; +} + +/* + * Look whether some events already occurred and move + * corresponding threads from waiting queue back to ready queue. + */ +intern void pth_sched_eventmanager(pth_time_t *now, int dopoll) +{ + pth_t nexttimer_thread; + pth_event_t nexttimer_ev; + pth_time_t nexttimer_value; + pth_event_t evh; + pth_event_t ev; + pth_t t; + pth_t tlast; + int this_occurred; + int any_occurred; + fd_set rfds; + fd_set wfds; + fd_set efds; + struct timeval delay; + struct timeval *pdelay; + sigset_t oss; + struct sigaction sa; + struct sigaction osa[1+PTH_NSIG]; + char minibuf[128]; + int loop_repeat; + int fdmax; + int rc; + int sig; + int n; + + pth_debug2("pth_sched_eventmanager: enter in %s mode", + dopoll ? "polling" : "waiting"); + + /* entry point for internal looping in event handling */ + loop_entry: + loop_repeat = FALSE; + + /* initialize fd sets */ + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + fdmax = -1; + + /* initialize signal status */ + sigpending(&pth_sigpending); + sigfillset(&pth_sigblock); + sigemptyset(&pth_sigcatch); + sigemptyset(&pth_sigraised); + + /* initialize next timer */ + pth_time_set(&nexttimer_value, PTH_TIME_ZERO); + nexttimer_thread = NULL; + nexttimer_ev = NULL; + + /* for all threads in the waiting queue... */ + any_occurred = FALSE; + for (t = pth_pqueue_head(&pth_WQ); t != NULL; + t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT)) { + + /* determine signals we block */ + for (sig = 1; sig < PTH_NSIG; sig++) + if (!sigismember(&(t->mctx.sigs), sig)) + sigdelset(&pth_sigblock, sig); + + /* cancellation support */ + if (t->cancelreq == TRUE) + any_occurred = TRUE; + + /* ... and all their events... */ + if (t->events == NULL) + continue; + /* ...check whether events occurred */ + ev = evh = t->events; + do { + if (!ev->ev_occurred) { + this_occurred = FALSE; + + /* Filedescriptor I/O */ + if (ev->ev_type == PTH_EVENT_FD) { + /* filedescriptors are checked later all at once. + Here we only assemble them in the fd sets */ + if (ev->ev_goal & PTH_UNTIL_FD_READABLE) + FD_SET(ev->ev_args.FD.fd, &rfds); + if (ev->ev_goal & PTH_UNTIL_FD_WRITEABLE) + FD_SET(ev->ev_args.FD.fd, &wfds); + if (ev->ev_goal & PTH_UNTIL_FD_EXCEPTION) + FD_SET(ev->ev_args.FD.fd, &efds); + if (fdmax < ev->ev_args.FD.fd) + fdmax = ev->ev_args.FD.fd; + } + /* Filedescriptor Set Select I/O */ + else if (ev->ev_type == PTH_EVENT_SELECT) { + /* filedescriptors are checked later all at once. + Here we only merge the fd sets. */ + pth_util_fds_merge(ev->ev_args.SELECT.nfd, + ev->ev_args.SELECT.rfds, &rfds, + ev->ev_args.SELECT.wfds, &wfds, + ev->ev_args.SELECT.efds, &efds); + if (fdmax < ev->ev_args.SELECT.nfd-1) + fdmax = ev->ev_args.SELECT.nfd-1; + } + /* Signal Set */ + else if (ev->ev_type == PTH_EVENT_SIGS) { + for (sig = 1; sig < PTH_NSIG; sig++) { + if (sigismember(ev->ev_args.SIGS.sigs, sig)) { + /* thread signal handling */ + if (sigismember(&t->sigpending, sig)) { + *(ev->ev_args.SIGS.sig) = sig; + sigdelset(&t->sigpending, sig); + t->sigpendcnt--; + this_occurred = TRUE; + } + /* process signal handling */ + if (sigismember(&pth_sigpending, sig)) { + if (ev->ev_args.SIGS.sig != NULL) + *(ev->ev_args.SIGS.sig) = sig; + pth_util_sigdelete(sig); + sigdelset(&pth_sigpending, sig); + this_occurred = TRUE; + } + else { + sigdelset(&pth_sigblock, sig); + sigaddset(&pth_sigcatch, sig); + } + } + } + } + /* Timer */ + else if (ev->ev_type == PTH_EVENT_TIME) { + if (pth_time_cmp(&(ev->ev_args.TIME.tv), now) < 0) + this_occurred = TRUE; + else { + /* remember the timer which will be elapsed next */ + if ((nexttimer_thread == NULL && nexttimer_ev == NULL) || + pth_time_cmp(&(ev->ev_args.TIME.tv), &nexttimer_value) < 0) { + nexttimer_thread = t; + nexttimer_ev = ev; + pth_time_set(&nexttimer_value, &(ev->ev_args.TIME.tv)); + } + } + } + /* Message Port Arrivals */ + else if (ev->ev_type == PTH_EVENT_MSG) { + if (pth_ring_elements(&(ev->ev_args.MSG.mp->mp_queue)) > 0) + this_occurred = TRUE; + } + /* Mutex Release */ + else if (ev->ev_type == PTH_EVENT_MUTEX) { + if (!(ev->ev_args.MUTEX.mutex->mx_state & PTH_MUTEX_LOCKED)) + this_occurred = TRUE; + } + /* Condition Variable Signal */ + else if (ev->ev_type == PTH_EVENT_COND) { + if (ev->ev_args.COND.cond->cn_state & PTH_COND_SIGNALED) { + if (ev->ev_args.COND.cond->cn_state & PTH_COND_BROADCAST) + this_occurred = TRUE; + else { + if (!(ev->ev_args.COND.cond->cn_state & PTH_COND_HANDLED)) { + ev->ev_args.COND.cond->cn_state |= PTH_COND_HANDLED; + this_occurred = TRUE; + } + } + } + } + /* Thread Termination */ + else if (ev->ev_type == PTH_EVENT_TID) { + if ( ( ev->ev_args.TID.tid == NULL + && pth_pqueue_elements(&pth_DQ) > 0) + || ( ev->ev_args.TID.tid != NULL + && ev->ev_args.TID.tid->state == ev->ev_goal)) + this_occurred = TRUE; + } + /* Custom Event Function */ + else if (ev->ev_type == PTH_EVENT_FUNC) { + if (ev->ev_args.FUNC.func(ev->ev_args.FUNC.arg)) + this_occurred = TRUE; + else { + pth_time_t tv; + pth_time_set(&tv, now); + pth_time_add(&tv, &(ev->ev_args.FUNC.tv)); + if ((nexttimer_thread == NULL && nexttimer_ev == NULL) || + pth_time_cmp(&tv, &nexttimer_value) < 0) { + nexttimer_thread = t; + nexttimer_ev = ev; + pth_time_set(&nexttimer_value, &tv); + } + } + } + + /* tag event if it has occurred */ + if (this_occurred) { + pth_debug2("pth_sched_eventmanager: [non-I/O] event occurred for thread \"%s\"", t->name); + ev->ev_occurred = TRUE; + any_occurred = TRUE; + } + } + } while ((ev = ev->ev_next) != evh); + } + if (any_occurred) + dopoll = TRUE; + + /* now decide how to poll for fd I/O and timers */ + if (dopoll) { + /* do a polling with immediate timeout, + i.e. check the fd sets only without blocking */ + pth_time_set(&delay, PTH_TIME_ZERO); + pdelay = &delay; + } + else if (nexttimer_ev != NULL) { + /* do a polling with a timeout set to the next timer, + i.e. wait for the fd sets or the next timer */ + pth_time_set(&delay, &nexttimer_value); + pth_time_sub(&delay, now); + pdelay = &delay; + } + else { + /* do a polling without a timeout, + i.e. wait for the fd sets only with blocking */ + pdelay = NULL; + } + + /* clear pipe and let select() wait for the read-part of the pipe */ + while (pth_sc(read)(pth_sigpipe[0], minibuf, sizeof(minibuf)) > 0) ; + FD_SET(pth_sigpipe[0], &rfds); + if (fdmax < pth_sigpipe[0]) + fdmax = pth_sigpipe[0]; + + /* replace signal actions for signals we've to catch for events */ + for (sig = 1; sig < PTH_NSIG; sig++) { + if (sigismember(&pth_sigcatch, sig)) { + sa.sa_handler = pth_sched_eventmanager_sighandler; + sigfillset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(sig, &sa, &osa[sig]); + } + } + + /* allow some signals to be delivered: Either to our + catching handler or directly to the configured + handler for signals not catched by events */ + pth_sc(sigprocmask)(SIG_SETMASK, &pth_sigblock, &oss); + + /* now do the polling for filedescriptor I/O and timers + WHEN THE SCHEDULER SLEEPS AT ALL, THEN HERE!! */ + rc = -1; + if (!(dopoll && fdmax == -1)) + while ((rc = pth_sc(select)(fdmax+1, &rfds, &wfds, &efds, pdelay)) < 0 + && errno == EINTR) ; + + /* restore signal mask and actions and handle signals */ + pth_sc(sigprocmask)(SIG_SETMASK, &oss, NULL); + for (sig = 1; sig < PTH_NSIG; sig++) + if (sigismember(&pth_sigcatch, sig)) + sigaction(sig, &osa[sig], NULL); + + /* if the timer elapsed, handle it */ + if (!dopoll && rc == 0 && nexttimer_ev != NULL) { + if (nexttimer_ev->ev_type == PTH_EVENT_FUNC) { + /* it was an implicit timer event for a function event, + so repeat the event handling for rechecking the function */ + loop_repeat = TRUE; + } + else { + /* it was an explicit timer event, standing for its own */ + pth_debug2("pth_sched_eventmanager: [timeout] event occurred for thread \"%s\"", + nexttimer_thread->name); + nexttimer_ev->ev_occurred = TRUE; + } + } + + /* if the internal signal pipe was used, adjust the select() results */ + if (!dopoll && rc > 0 && FD_ISSET(pth_sigpipe[0], &rfds)) { + FD_CLR(pth_sigpipe[0], &rfds); + rc--; + } + + /* if an error occurred, avoid confusion in the cleanup loop */ + if (rc <= 0) { + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + } + + /* now comes the final cleanup loop where we've to + do two jobs: first we've to the late handling of the fd I/O events and + additionally if a thread has one occurred event, we move it from the + waiting queue to the ready queue */ + + /* for all threads in the waiting queue... */ + t = pth_pqueue_head(&pth_WQ); + while (t != NULL) { + + /* do the late handling of the fd I/O and signal + events in the waiting event ring */ + any_occurred = FALSE; + if (t->events != NULL) { + ev = evh = t->events; + do { + /* + * Late handling for still not occured events + */ + if (!ev->ev_occurred) { + /* Filedescriptor I/O */ + if (ev->ev_type == PTH_EVENT_FD) { + if ( ( ev->ev_goal & PTH_UNTIL_FD_READABLE + && FD_ISSET(ev->ev_args.FD.fd, &rfds)) + || ( ev->ev_goal & PTH_UNTIL_FD_WRITEABLE + && FD_ISSET(ev->ev_args.FD.fd, &wfds)) + || ( ev->ev_goal & PTH_UNTIL_FD_EXCEPTION + && FD_ISSET(ev->ev_args.FD.fd, &efds)) ) { + pth_debug2("pth_sched_eventmanager: " + "[I/O] event occurred for thread \"%s\"", t->name); + ev->ev_occurred = TRUE; + } + } + /* Filedescriptor Set I/O */ + else if (ev->ev_type == PTH_EVENT_SELECT) { + if (pth_util_fds_test(ev->ev_args.SELECT.nfd, + ev->ev_args.SELECT.rfds, &rfds, + ev->ev_args.SELECT.wfds, &wfds, + ev->ev_args.SELECT.efds, &efds)) { + n = pth_util_fds_select(ev->ev_args.SELECT.nfd, + ev->ev_args.SELECT.rfds, &rfds, + ev->ev_args.SELECT.wfds, &wfds, + ev->ev_args.SELECT.efds, &efds); + if (ev->ev_args.SELECT.n != NULL) + *(ev->ev_args.SELECT.n) = n; + ev->ev_occurred = TRUE; + pth_debug2("pth_sched_eventmanager: " + "[I/O] event occurred for thread \"%s\"", t->name); + } + } + /* Signal Set */ + else if (ev->ev_type == PTH_EVENT_SIGS) { + for (sig = 1; sig < PTH_NSIG; sig++) { + if (sigismember(ev->ev_args.SIGS.sigs, sig)) { + if (sigismember(&pth_sigraised, sig)) { + if (ev->ev_args.SIGS.sig != NULL) + *(ev->ev_args.SIGS.sig) = sig; + pth_debug2("pth_sched_eventmanager: " + "[signal] event occurred for thread \"%s\"", t->name); + sigdelset(&pth_sigraised, sig); + ev->ev_occurred = TRUE; + } + } + } + } + } + /* + * post-processing for already occured events + */ + else { + /* Condition Variable Signal */ + if (ev->ev_type == PTH_EVENT_COND) { + /* clean signal */ + if (ev->ev_args.COND.cond->cn_state & PTH_COND_SIGNALED) { + ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_SIGNALED); + ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_BROADCAST); + ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_HANDLED); + } + } + } + + /* local to global mapping */ + if (ev->ev_occurred) + any_occurred = TRUE; + } while ((ev = ev->ev_next) != evh); + } + + /* cancellation support */ + if (t->cancelreq == TRUE) { + pth_debug2("pth_sched_eventmanager: cancellation request pending for thread \"%s\"", t->name); + any_occurred = TRUE; + } + + /* walk to next thread in waiting queue */ + tlast = t; + t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT); + + /* + * move last thread to ready queue if any events occurred for it. + * we insert it with a slightly increased queue priority to it a + * better chance to immediately get scheduled, else the last running + * thread might immediately get again the CPU which is usually not + * what we want, because we oven use pth_yield() calls to give others + * a chance. + */ + if (any_occurred) { + pth_pqueue_delete(&pth_WQ, tlast); + tlast->state = PTH_STATE_READY; + pth_pqueue_insert(&pth_RQ, tlast->prio+1, tlast); + pth_debug2("pth_sched_eventmanager: thread \"%s\" moved from waiting " + "to ready queue", tlast->name); + } + } + + /* perhaps we have to internally loop... */ + if (loop_repeat) { + pth_time_set(now, PTH_TIME_NOW); + goto loop_entry; + } + + pth_debug1("pth_sched_eventmanager: leaving"); + return; +} + +intern void pth_sched_eventmanager_sighandler(int sig) +{ + char c; + + /* remember raised signal */ + sigaddset(&pth_sigraised, sig); + + /* write signal to signal pipe in order to awake the select() */ + c = (int)sig; + pth_sc(write)(pth_sigpipe[1], &c, sizeof(char)); + return; +} +