*** /dev/null Sat Nov 23 01:42:00 2024
--- - Sat Nov 23 01:42:45 2024
***************
*** 0 ****
--- 1,787 ----
+ /*
+ ** GNU Pth - The GNU Portable Threads
+ ** Copyright (c) 1999-2000 Ralf S. Engelschall <rse@engelschall.com>
+ **
+ ** This file is part of GNU Pth, a non-preemptive thread scheduling
+ ** library which can be found at http://www.gnu.org/software/pth/.
+ **
+ ** This library is free software; you can redistribute it and/or
+ ** modify it under the terms of the GNU Lesser General Public
+ ** License as published by the Free Software Foundation; either
+ ** version 2.1 of the License, or (at your option) any later version.
+ **
+ ** This library is distributed in the hope that it will be useful,
+ ** but WITHOUT ANY WARRANTY; without even the implied warranty of
+ ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ ** Lesser General Public License for more details.
+ **
+ ** You should have received a copy of the GNU Lesser General Public
+ ** License along with this library; if not, write to the Free Software
+ ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ** USA, or contact Ralf S. Engelschall <rse@engelschall.com>.
+ **
+ ** pth_sched.c: Pth thread scheduler, the real heart of Pth
+ */
+ /* ``Recursive, adj.;
+ see Recursive.''
+ -- Unknown */
+ #include "pth_p.h"
+
+ intern pth_t pth_main; /* the main thread */
+ intern pth_t pth_sched; /* the permanent scheduler thread */
+ intern pth_t pth_current; /* the currently running thread */
+ intern pth_pqueue_t pth_NQ; /* queue of new threads */
+ intern pth_pqueue_t pth_RQ; /* queue of threads ready to run */
+ intern pth_pqueue_t pth_WQ; /* queue of threads waiting for an event */
+ intern pth_pqueue_t pth_SQ; /* queue of suspended threads */
+ intern pth_pqueue_t pth_DQ; /* queue of terminated threads */
+ intern float pth_loadval; /* average scheduler load value */
+
+ static int pth_sigpipe[2]; /* internal signal occurrence pipe */
+ static sigset_t pth_sigpending; /* mask of pending signals */
+ static sigset_t pth_sigblock; /* mask of signals we block in scheduler */
+ static sigset_t pth_sigcatch; /* mask of signals we have to catch */
+ static sigset_t pth_sigraised; /* mask of raised signals */
+
+ static pth_time_t pth_loadticknext;
+ static pth_time_t pth_loadtickgap = PTH_TIME(1,0);
+
+ /* initialize the scheduler ingredients */
+ intern void pth_scheduler_init(void)
+ {
+ /* create the internal signal pipe */
+ if (pipe(pth_sigpipe) == -1) {
+ fprintf(stderr, "**Pth** INIT: Cannot create internal pipe: %s\n",
+ strerror(errno));
+ abort();
+ }
+ pth_fdmode(pth_sigpipe[0], PTH_FDMODE_NONBLOCK);
+ pth_fdmode(pth_sigpipe[1], PTH_FDMODE_NONBLOCK);
+
+ /* initialize the essential threads */
+ pth_sched = NULL;
+ pth_current = NULL;
+
+ /* initialize the thread queues */
+ pth_pqueue_init(&pth_NQ);
+ pth_pqueue_init(&pth_RQ);
+ pth_pqueue_init(&pth_WQ);
+ pth_pqueue_init(&pth_SQ);
+ pth_pqueue_init(&pth_DQ);
+
+ /* initialize load support */
+ pth_loadval = 1.0;
+ pth_time_set(&pth_loadticknext, PTH_TIME_NOW);
+ return;
+ }
+
+ /* drop all threads (except for the currently active one) */
+ intern void pth_scheduler_drop(void)
+ {
+ pth_t t;
+
+ /* clear the new queue */
+ while ((t = pth_pqueue_delmax(&pth_NQ)) != NULL)
+ pth_tcb_free(t);
+ pth_pqueue_init(&pth_NQ);
+
+ /* clear the ready queue */
+ while ((t = pth_pqueue_delmax(&pth_RQ)) != NULL)
+ pth_tcb_free(t);
+ pth_pqueue_init(&pth_RQ);
+
+ /* clear the waiting queue */
+ while ((t = pth_pqueue_delmax(&pth_WQ)) != NULL)
+ pth_tcb_free(t);
+ pth_pqueue_init(&pth_WQ);
+
+ /* clear the suspend queue */
+ while ((t = pth_pqueue_delmax(&pth_SQ)) != NULL)
+ pth_tcb_free(t);
+ pth_pqueue_init(&pth_SQ);
+
+ /* clear the dead queue */
+ while ((t = pth_pqueue_delmax(&pth_DQ)) != NULL)
+ pth_tcb_free(t);
+ pth_pqueue_init(&pth_DQ);
+ return;
+ }
+
+ /* kill the scheduler ingredients */
+ intern void pth_scheduler_kill(void)
+ {
+ /* drop all threads */
+ pth_scheduler_drop();
+
+ /* remove the internal signal pipe */
+ close(pth_sigpipe[0]);
+ close(pth_sigpipe[1]);
+ return;
+ }
+
+ /*
+ * Update the average scheduler load.
+ *
+ * This is called on every context switch, but we only have to adjust the
+ * average load value once per second. When we're called more than once
+ * per second we handle this by doing the calculation once and then doing
+ * nothing until the next tick is over. When the scheduler waited for more
+ * than one second (or a thread's CPU burst lasted for more than one
+ * second) we simulate the missing calculations. That's no problem,
+ * because we can assume that the number of ready threads didn't change
+ * dramatically in the meantime (or more context switches would have
+ * occurred and we would have been given more chances to operate). The
+ * actual average load is calculated through an exponential average
+ * formula (see the worked example after the macro below).
+ */
+ #define pth_scheduler_load(now) \
+ if (pth_time_cmp((now), &pth_loadticknext) >= 0) { \
+ pth_time_t ttmp; \
+ int numready; \
+ numready = pth_pqueue_elements(&pth_RQ); \
+ pth_time_set(&ttmp, (now)); \
+ do { \
+ pth_loadval = (numready*0.25) + (pth_loadval*0.75); \
+ pth_time_sub(&ttmp, &pth_loadtickgap); \
+ } while (pth_time_cmp(&ttmp, &pth_loadticknext) >= 0); \
+ pth_time_set(&pth_loadticknext, (now)); \
+ pth_time_add(&pth_loadticknext, &pth_loadtickgap); \
+ }
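+
+ /*
+ * A worked example of the exponential average above: with a previous
+ * load value of 1.0 and 4 threads in the ready queue, one tick gives
+ * 4*0.25 + 1.0*0.75 = 1.75; if the ready queue stays at 4 threads the
+ * value converges towards 4.0 over the following one-second ticks.
+ */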
+
+ /* the heart of this library: the thread scheduler */
+ intern void *pth_scheduler(void *dummy)
+ {
+ sigset_t sigs;
+ pth_time_t running;
+ pth_time_t snapshot;
+ struct sigaction sa;
+ sigset_t ss;
+ int sig;
+ pth_t t;
+
+ /*
+ * bootstrapping
+ */
+ pth_debug1("pth_scheduler: bootstrapping");
+
+ /* mark this thread as the special scheduler thread */
+ pth_sched->state = PTH_STATE_SCHEDULER;
+
+ /* block all signals in the scheduler thread */
+ sigfillset(&sigs);
+ pth_sc(sigprocmask)(SIG_SETMASK, &sigs, NULL);
+
+ /* initialize the snapshot time for bootstrapping the loop */
+ pth_time_set(&snapshot, PTH_TIME_NOW);
+
+ /*
+ * endless scheduler loop
+ */
+ for (;;) {
+ /*
+ * Move threads from new queue to ready queue and give
+ * them maximum priority so they start immediately
+ */
+ while ((t = pth_pqueue_tail(&pth_NQ)) != NULL) {
+ pth_pqueue_delete(&pth_NQ, t);
+ t->state = PTH_STATE_READY;
+ pth_pqueue_insert(&pth_RQ, pth_pqueue_favorite_prio(&pth_RQ), t);
+ pth_debug2("pth_scheduler: new thread \"%s\" moved to top of ready queue", t->name);
+ }
+
+ /*
+ * Update average scheduler load
+ */
+ pth_scheduler_load(&snapshot);
+
+ /*
+ * Find next thread in ready queue
+ */
+ pth_current = pth_pqueue_delmax(&pth_RQ);
+ if (pth_current == NULL) {
+ fprintf(stderr, "**Pth** SCHEDULER INTERNAL ERROR: "
+ "no more thread(s) available to schedule!?!?\n");
+ abort();
+ }
+ pth_debug4("pth_scheduler: thread \"%s\" selected (prio=%d, qprio=%d)",
+ pth_current->name, pth_current->prio, pth_current->q_prio);
+
+ /*
+ * Additionally raise thread-specific signals
+ * (they are delivered when we switch the context)
+ *
+ * Situation is ('#' = signal pending):
+ * process pending (pth_sigpending): ----####
+ * thread pending (pth_current->sigpending): --##--##
+ * Result has to be:
+ * process new pending: --######
+ */
+ if (pth_current->sigpendcnt > 0) {
+ sigpending(&pth_sigpending);
+ for (sig = 1; sig < PTH_NSIG; sig++)
+ if (sigismember(&pth_current->sigpending, sig))
+ if (!sigismember(&pth_sigpending, sig))
+ kill(getpid(), sig);
+ }
+
+ /*
+ * Set running start time for new thread
+ * and perform a context switch to it
+ */
+ pth_debug3("pth_scheduler: switching to thread 0x%lx (\"%s\")",
+ (unsigned long)pth_current, pth_current->name);
+
+ /* update thread times */
+ pth_time_set(&pth_current->lastran, PTH_TIME_NOW);
+
+ /* update scheduler times */
+ pth_time_set(&running, &pth_current->lastran);
+ pth_time_sub(&running, &snapshot);
+ pth_time_add(&pth_sched->running, &running);
+
+ /* ** ENTERING THREAD ** - by switching the machine context */
+ pth_mctx_switch(&pth_sched->mctx, &pth_current->mctx);
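+ /* (execution continues here only after the dispatched thread switches
+ back to pth_sched->mctx again, e.g. via pth_yield(), an event wait
+ or thread termination) */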
+
+ /* update scheduler times */
+ pth_time_set(&snapshot, PTH_TIME_NOW);
+ pth_debug3("pth_scheduler: came back from thread 0x%lx (\"%s\")",
+ (unsigned long)pth_current, pth_current->name);
+
+ /*
+ * Calculate and update the time the previous thread was running
+ */
+ pth_time_set(&running, &snapshot);
+ pth_time_sub(&running, &pth_current->lastran);
+ pth_time_add(&pth_current->running, &running);
+ pth_debug3("pth_scheduler: thread \"%s\" ran %.6f",
+ pth_current->name, pth_time_t2d(&running));
+
+ /*
+ * Remove still pending thread-specific signals
+ * (they are re-delivered next time)
+ *
+ * Situation is ('#' = signal pending):
+ * thread old pending (pth_current->sigpending): --##--##
+ * process old pending (pth_sigpending): ----####
+ * process still pending (sigstillpending): ---#-#-#
+ * Result has to be:
+ * process new pending: -----#-#
+ * thread new pending (pth_current->sigpending): ---#---#
+ */
+ if (pth_current->sigpendcnt > 0) {
+ sigset_t sigstillpending;
+ sigpending(&sigstillpending);
+ for (sig = 1; sig < PTH_NSIG; sig++) {
+ if (sigismember(&pth_current->sigpending, sig)) {
+ if (!sigismember(&sigstillpending, sig)) {
+ /* thread (and perhaps also process) signal delivered */
+ sigdelset(&pth_current->sigpending, sig);
+ pth_current->sigpendcnt--;
+ }
+ else if (!sigismember(&pth_sigpending, sig)) {
+ /* thread signal not delivered */
+ pth_util_sigdelete(sig);
+ }
+ }
+ }
+ }
+
+ /*
+ * Check for stack overflow
+ */
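+ /* (the guard word at the boundary of the thread stack is presumably
+ initialized to 0xDEAD when the stack is set up, so any other value
+ here means the thread ran over its stack) */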
+ if (pth_current->stackguard != NULL) {
+ if (*pth_current->stackguard != 0xDEAD) {
+ pth_debug3("pth_scheduler: stack overflow detected for thread 0x%lx (\"%s\")",
+ (unsigned long)pth_current, pth_current->name);
+ /*
+ * if the application doesn't catch SIGSEGVs, we terminate
+ * manually with a SIGSEGV now, but output a reasonable message.
+ */
+ if (sigaction(SIGSEGV, NULL, &sa) == 0) {
+ if (sa.sa_handler == SIG_DFL) {
+ fprintf(stderr, "**Pth** STACK OVERFLOW: thread pid_t=0x%lx, name=\"%s\"\n",
+ (unsigned long)pth_current, pth_current->name);
+ kill(getpid(), SIGSEGV);
+ sigfillset(&ss);
+ sigdelset(&ss, SIGSEGV);
+ sigsuspend(&ss);
+ abort();
+ }
+ }
+ /*
+ * else we terminate only the thread and send ourselves a SIGSEGV,
+ * which allows the application to handle the situation...
+ */
+ pth_current->join_arg = (void *)0xDEAD;
+ pth_current->state = PTH_STATE_DEAD;
+ kill(getpid(), SIGSEGV);
+ }
+ }
+
+ /*
+ * When the previous thread is now marked as dead, kick it out
+ */
+ if (pth_current->state == PTH_STATE_DEAD) {
+ pth_debug2("pth_scheduler: marking thread \"%s\" as dead", pth_current->name);
+ if (!pth_current->joinable)
+ pth_tcb_free(pth_current);
+ else
+ pth_pqueue_insert(&pth_DQ, PTH_PRIO_STD, pth_current);
+ pth_current = NULL;
+ }
+
+ /*
+ * When the thread wants to wait for an event,
+ * move it to the waiting queue now
+ */
+ if (pth_current != NULL && pth_current->state == PTH_STATE_WAITING) {
+ pth_debug2("pth_scheduler: moving thread \"%s\" to waiting queue",
+ pth_current->name);
+ pth_pqueue_insert(&pth_WQ, pth_current->prio, pth_current);
+ pth_current = NULL;
+ }
+
+ /*
+ * migrate old threads in the ready queue to higher
+ * priorities to avoid starvation, and insert the last running
+ * thread back into this queue, too.
+ */
+ pth_pqueue_increase(&pth_RQ);
+ if (pth_current != NULL)
+ pth_pqueue_insert(&pth_RQ, pth_current->prio, pth_current);
+
+ /*
+ * Manage the events in the waiting queue, i.e. decide whether any of
+ * their events occurred and move those threads to the ready queue. But
+ * block (wait) only if we have no new or ready threads at all.
+ */
+ if ( pth_pqueue_elements(&pth_RQ) == 0
+ && pth_pqueue_elements(&pth_NQ) == 0)
+ pth_sched_eventmanager(&snapshot, FALSE /* wait */);
+ else
+ pth_sched_eventmanager(&snapshot, TRUE /* poll */);
+ }
+
+ /* NOTREACHED */
+ return NULL;
+ }
+
+ /*
+ * Check whether some events have already occurred and move the
+ * corresponding threads from the waiting queue back to the ready queue.
+ */
+ intern void pth_sched_eventmanager(pth_time_t *now, int dopoll)
+ {
+ pth_t nexttimer_thread;
+ pth_event_t nexttimer_ev;
+ pth_time_t nexttimer_value;
+ pth_event_t evh;
+ pth_event_t ev;
+ pth_t t;
+ pth_t tlast;
+ int this_occurred;
+ int any_occurred;
+ fd_set rfds;
+ fd_set wfds;
+ fd_set efds;
+ struct timeval delay;
+ struct timeval *pdelay;
+ sigset_t oss;
+ struct sigaction sa;
+ struct sigaction osa[1+PTH_NSIG];
+ char minibuf[128];
+ int loop_repeat;
+ int fdmax;
+ int rc;
+ int sig;
+ int n;
+
+ pth_debug2("pth_sched_eventmanager: enter in %s mode",
+ dopoll ? "polling" : "waiting");
+
+ /* entry point for internal looping in event handling */
+ loop_entry:
+ loop_repeat = FALSE;
+
+ /* initialize fd sets */
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
+ FD_ZERO(&efds);
+ fdmax = -1;
+
+ /* initialize signal status */
+ sigpending(&pth_sigpending);
+ sigfillset(&pth_sigblock);
+ sigemptyset(&pth_sigcatch);
+ sigemptyset(&pth_sigraised);
+
+ /* initialize next timer */
+ pth_time_set(&nexttimer_value, PTH_TIME_ZERO);
+ nexttimer_thread = NULL;
+ nexttimer_ev = NULL;
+
+ /* for all threads in the waiting queue... */
+ any_occurred = FALSE;
+ for (t = pth_pqueue_head(&pth_WQ); t != NULL;
+ t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT)) {
+
+ /* determine signals we block */
+ for (sig = 1; sig < PTH_NSIG; sig++)
+ if (!sigismember(&(t->mctx.sigs), sig))
+ sigdelset(&pth_sigblock, sig);
+
+ /* cancellation support */
+ if (t->cancelreq == TRUE)
+ any_occurred = TRUE;
+
+ /* ... and all their events... */
+ if (t->events == NULL)
+ continue;
+ /* ...check whether events occurred */
+ ev = evh = t->events;
+ do {
+ if (!ev->ev_occurred) {
+ this_occurred = FALSE;
+
+ /* Filedescriptor I/O */
+ if (ev->ev_type == PTH_EVENT_FD) {
+ /* filedescriptors are checked later all at once.
+ Here we only assemble them in the fd sets */
+ if (ev->ev_goal & PTH_UNTIL_FD_READABLE)
+ FD_SET(ev->ev_args.FD.fd, &rfds);
+ if (ev->ev_goal & PTH_UNTIL_FD_WRITEABLE)
+ FD_SET(ev->ev_args.FD.fd, &wfds);
+ if (ev->ev_goal & PTH_UNTIL_FD_EXCEPTION)
+ FD_SET(ev->ev_args.FD.fd, &efds);
+ if (fdmax < ev->ev_args.FD.fd)
+ fdmax = ev->ev_args.FD.fd;
+ }
+ /* Filedescriptor Set Select I/O */
+ else if (ev->ev_type == PTH_EVENT_SELECT) {
+ /* filedescriptors are checked later all at once.
+ Here we only merge the fd sets. */
+ pth_util_fds_merge(ev->ev_args.SELECT.nfd,
+ ev->ev_args.SELECT.rfds, &rfds,
+ ev->ev_args.SELECT.wfds, &wfds,
+ ev->ev_args.SELECT.efds, &efds);
+ if (fdmax < ev->ev_args.SELECT.nfd-1)
+ fdmax = ev->ev_args.SELECT.nfd-1;
+ }
+ /* Signal Set */
+ else if (ev->ev_type == PTH_EVENT_SIGS) {
+ for (sig = 1; sig < PTH_NSIG; sig++) {
+ if (sigismember(ev->ev_args.SIGS.sigs, sig)) {
+ /* thread signal handling */
+ if (sigismember(&t->sigpending, sig)) {
+ if (ev->ev_args.SIGS.sig != NULL)
+ *(ev->ev_args.SIGS.sig) = sig;
+ sigdelset(&t->sigpending, sig);
+ t->sigpendcnt--;
+ this_occurred = TRUE;
+ }
+ /* process signal handling */
+ if (sigismember(&pth_sigpending, sig)) {
+ if (ev->ev_args.SIGS.sig != NULL)
+ *(ev->ev_args.SIGS.sig) = sig;
+ pth_util_sigdelete(sig);
+ sigdelset(&pth_sigpending, sig);
+ this_occurred = TRUE;
+ }
+ else {
+ sigdelset(&pth_sigblock, sig);
+ sigaddset(&pth_sigcatch, sig);
+ }
+ }
+ }
+ }
+ /* Timer */
+ else if (ev->ev_type == PTH_EVENT_TIME) {
+ if (pth_time_cmp(&(ev->ev_args.TIME.tv), now) < 0)
+ this_occurred = TRUE;
+ else {
+ /* remember the timer which will elapse next */
+ if ((nexttimer_thread == NULL && nexttimer_ev == NULL) ||
+ pth_time_cmp(&(ev->ev_args.TIME.tv), &nexttimer_value) < 0) {
+ nexttimer_thread = t;
+ nexttimer_ev = ev;
+ pth_time_set(&nexttimer_value, &(ev->ev_args.TIME.tv));
+ }
+ }
+ }
+ /* Message Port Arrivals */
+ else if (ev->ev_type == PTH_EVENT_MSG) {
+ if (!pth_ring_empty(&(ev->ev_args.MSG.mp->mp_queue)))
+ this_occurred = TRUE;
+ }
+ /* Mutex Release */
+ else if (ev->ev_type == PTH_EVENT_MUTEX) {
+ if (!(ev->ev_args.MUTEX.mutex->mx_state & PTH_MUTEX_LOCKED))
+ this_occurred = TRUE;
+ }
+ /* Condition Variable Signal */
+ else if (ev->ev_type == PTH_EVENT_COND) {
+ if (ev->ev_args.COND.cond->cn_state & PTH_COND_SIGNALED) {
+ if (ev->ev_args.COND.cond->cn_state & PTH_COND_BROADCAST)
+ this_occurred = TRUE;
+ else {
+ if (!(ev->ev_args.COND.cond->cn_state & PTH_COND_HANDLED)) {
+ ev->ev_args.COND.cond->cn_state |= PTH_COND_HANDLED;
+ this_occurred = TRUE;
+ }
+ }
+ }
+ }
+ /* Thread Termination */
+ else if (ev->ev_type == PTH_EVENT_TID) {
+ if ( ( ev->ev_args.TID.tid == NULL
+ && pth_pqueue_elements(&pth_DQ) > 0)
+ || ( ev->ev_args.TID.tid != NULL
+ && ev->ev_args.TID.tid->state == ev->ev_goal))
+ this_occurred = TRUE;
+ }
+ /* Custom Event Function */
+ else if (ev->ev_type == PTH_EVENT_FUNC) {
+ if (ev->ev_args.FUNC.func(ev->ev_args.FUNC.arg))
+ this_occurred = TRUE;
+ else {
+ pth_time_t tv;
+ pth_time_set(&tv, now);
+ pth_time_add(&tv, &(ev->ev_args.FUNC.tv));
+ if ((nexttimer_thread == NULL && nexttimer_ev == NULL) ||
+ pth_time_cmp(&tv, &nexttimer_value) < 0) {
+ nexttimer_thread = t;
+ nexttimer_ev = ev;
+ pth_time_set(&nexttimer_value, &tv);
+ }
+ }
+ }
+
+ /* tag event if it has occurred */
+ if (this_occurred) {
+ pth_debug2("pth_sched_eventmanager: [non-I/O] event occurred for thread \"%s\"", t->name);
+ ev->ev_occurred = TRUE;
+ any_occurred = TRUE;
+ }
+ }
+ } while ((ev = ev->ev_next) != evh);
+ }
+ if (any_occurred)
+ dopoll = TRUE;
+
+ /* now decide how to poll for fd I/O and timers */
+ if (dopoll) {
+ /* poll with an immediate timeout,
+ i.e. just check the fd sets without blocking */
+ pth_time_set(&delay, PTH_TIME_ZERO);
+ pdelay = &delay;
+ }
+ else if (nexttimer_ev != NULL) {
+ /* poll with a timeout set to the next timer,
+ i.e. wait for the fd sets or the next timer to elapse */
+ pth_time_set(&delay, &nexttimer_value);
+ pth_time_sub(&delay, now);
+ pdelay = &delay;
+ }
+ else {
+ /* poll without a timeout,
+ i.e. block and wait for the fd sets only */
+ pdelay = NULL;
+ }
+
+ /* clear pipe and let select() wait for the read-part of the pipe */
+ while (pth_sc(read)(pth_sigpipe[0], minibuf, sizeof(minibuf)) > 0) ;
+ FD_SET(pth_sigpipe[0], &rfds);
+ if (fdmax < pth_sigpipe[0])
+ fdmax = pth_sigpipe[0];
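+ /* (the event manager's signal handler at the bottom of this file writes
+ a byte into the write end of this pipe, so a signal raised while we
+ sit in the select(2) below wakes us up immediately) */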
+
+ /* replace signal actions for signals we have to catch for events */
+ for (sig = 1; sig < PTH_NSIG; sig++) {
+ if (sigismember(&pth_sigcatch, sig)) {
+ sa.sa_handler = pth_sched_eventmanager_sighandler;
+ sigfillset(&sa.sa_mask);
+ sa.sa_flags = 0;
+ sigaction(sig, &sa, &osa[sig]);
+ }
+ }
+
+ /* allow some signals to be delivered: either to our
+ catching handler or directly to the configured
+ handler for signals not caught by events */
+ pth_sc(sigprocmask)(SIG_SETMASK, &pth_sigblock, &oss);
+
+ /* now do the polling for filedescriptor I/O and timers
+ IF THE SCHEDULER SLEEPS AT ALL, THEN IT SLEEPS HERE!! */
+ rc = -1;
+ if (!(dopoll && fdmax == -1))
+ while ((rc = pth_sc(select)(fdmax+1, &rfds, &wfds, &efds, pdelay)) < 0
+ && errno == EINTR) ;
+
+ /* restore signal mask and actions and handle signals */
+ pth_sc(sigprocmask)(SIG_SETMASK, &oss, NULL);
+ for (sig = 1; sig < PTH_NSIG; sig++)
+ if (sigismember(&pth_sigcatch, sig))
+ sigaction(sig, &osa[sig], NULL);
+
+ /* if the timer elapsed, handle it */
+ if (!dopoll && rc == 0 && nexttimer_ev != NULL) {
+ if (nexttimer_ev->ev_type == PTH_EVENT_FUNC) {
+ /* it was an implicit timer event for a function event,
+ so repeat the event handling for rechecking the function */
+ loop_repeat = TRUE;
+ }
+ else {
+ /* it was an explicit timer event, standing on its own */
+ pth_debug2("pth_sched_eventmanager: [timeout] event occurred for thread \"%s\"",
+ nexttimer_thread->name);
+ nexttimer_ev->ev_occurred = TRUE;
+ }
+ }
+
+ /* if the internal signal pipe was used, adjust the select() results */
+ if (!dopoll && rc > 0 && FD_ISSET(pth_sigpipe[0], &rfds)) {
+ FD_CLR(pth_sigpipe[0], &rfds);
+ rc--;
+ }
+
+ /* if an error occurred, avoid confusion in the cleanup loop */
+ if (rc <= 0) {
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
+ FD_ZERO(&efds);
+ }
+
+ /* now comes the final cleanup loop where we have to
+ do two jobs: first, the late handling of the fd I/O events, and
+ additionally, if at least one event occurred for a thread, we move it from
+ the waiting queue to the ready queue */
+
+ /* for all threads in the waiting queue... */
+ t = pth_pqueue_head(&pth_WQ);
+ while (t != NULL) {
+
+ /* do the late handling of the fd I/O and signal
+ events in the waiting event ring */
+ any_occurred = FALSE;
+ if (t->events != NULL) {
+ ev = evh = t->events;
+ do {
+ /*
+ * Late handling for events which have not yet occurred
+ */
+ if (!ev->ev_occurred) {
+ /* Filedescriptor I/O */
+ if (ev->ev_type == PTH_EVENT_FD) {
+ if ( ( ev->ev_goal & PTH_UNTIL_FD_READABLE
+ && FD_ISSET(ev->ev_args.FD.fd, &rfds))
+ || ( ev->ev_goal & PTH_UNTIL_FD_WRITEABLE
+ && FD_ISSET(ev->ev_args.FD.fd, &wfds))
+ || ( ev->ev_goal & PTH_UNTIL_FD_EXCEPTION
+ && FD_ISSET(ev->ev_args.FD.fd, &efds)) ) {
+ pth_debug2("pth_sched_eventmanager: "
+ "[I/O] event occurred for thread \"%s\"", t->name);
+ ev->ev_occurred = TRUE;
+ }
+ }
+ /* Filedescriptor Set I/O */
+ else if (ev->ev_type == PTH_EVENT_SELECT) {
+ if (pth_util_fds_test(ev->ev_args.SELECT.nfd,
+ ev->ev_args.SELECT.rfds, &rfds,
+ ev->ev_args.SELECT.wfds, &wfds,
+ ev->ev_args.SELECT.efds, &efds)) {
+ n = pth_util_fds_select(ev->ev_args.SELECT.nfd,
+ ev->ev_args.SELECT.rfds, &rfds,
+ ev->ev_args.SELECT.wfds, &wfds,
+ ev->ev_args.SELECT.efds, &efds);
+ if (ev->ev_args.SELECT.n != NULL)
+ *(ev->ev_args.SELECT.n) = n;
+ ev->ev_occurred = TRUE;
+ pth_debug2("pth_sched_eventmanager: "
+ "[I/O] event occurred for thread \"%s\"", t->name);
+ }
+ }
+ /* Signal Set */
+ else if (ev->ev_type == PTH_EVENT_SIGS) {
+ for (sig = 1; sig < PTH_NSIG; sig++) {
+ if (sigismember(ev->ev_args.SIGS.sigs, sig)) {
+ if (sigismember(&pth_sigraised, sig)) {
+ if (ev->ev_args.SIGS.sig != NULL)
+ *(ev->ev_args.SIGS.sig) = sig;
+ pth_debug2("pth_sched_eventmanager: "
+ "[signal] event occurred for thread \"%s\"", t->name);
+ sigdelset(&pth_sigraised, sig);
+ ev->ev_occurred = TRUE;
+ }
+ }
+ }
+ }
+ }
+ /*
+ * post-processing for events which have already occurred
+ */
+ else {
+ /* Condition Variable Signal */
+ if (ev->ev_type == PTH_EVENT_COND) {
+ /* clean signal */
+ if (ev->ev_args.COND.cond->cn_state & PTH_COND_SIGNALED) {
+ ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_SIGNALED);
+ ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_BROADCAST);
+ ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_HANDLED);
+ }
+ }
+ }
+
+ /* local to global mapping */
+ if (ev->ev_occurred)
+ any_occurred = TRUE;
+ } while ((ev = ev->ev_next) != evh);
+ }
+
+ /* cancellation support */
+ if (t->cancelreq == TRUE) {
+ pth_debug2("pth_sched_eventmanager: cancellation request pending for thread \"%s\"", t->name);
+ any_occurred = TRUE;
+ }
+
+ /* walk to next thread in waiting queue */
+ tlast = t;
+ t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT);
+
+ /*
+ * move the last thread to the ready queue if any events occurred for it.
+ * we insert it with a slightly increased queue priority to give it a
+ * better chance to get scheduled immediately; else the last running
+ * thread might immediately get the CPU again, which is usually not
+ * what we want, because we often use pth_yield() calls to give others
+ * a chance.
+ */
+ if (any_occurred) {
+ pth_pqueue_delete(&pth_WQ, tlast);
+ tlast->state = PTH_STATE_READY;
+ pth_pqueue_insert(&pth_RQ, tlast->prio+1, tlast);
+ pth_debug2("pth_sched_eventmanager: thread \"%s\" moved from waiting "
+ "to ready queue", tlast->name);
+ }
+ }
+
+ /* perhaps we have to internally loop... */
+ if (loop_repeat) {
+ pth_time_set(now, PTH_TIME_NOW);
+ goto loop_entry;
+ }
+
+ pth_debug1("pth_sched_eventmanager: leaving");
+ return;
+ }
+
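+ /*
+ * Signal handler which pth_sched_eventmanager() temporarily installs for
+ * the signals it has to catch for events: it records the raised signal
+ * and pokes the internal pipe so that a select(2) sleeping in the event
+ * manager returns immediately (the classic self-pipe trick).
+ */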
+ intern void pth_sched_eventmanager_sighandler(int sig)
+ {
+ char c;
+
+ /* remember raised signal */
+ sigaddset(&pth_sigraised, sig);
+
+ /* write signal to signal pipe in order to wake up the select() */
+ c = (int)sig;
+ pth_sc(write)(pth_sigpipe[1], &c, sizeof(char));
+ return;
+ }
+