OSSP CVS Repository

ossp - ossp-pkg/pth/test_mp.c
Not logged in
[Honeypot]  [Browse]  [Directory]  [Home]  [Login
[Reports]  [Search]  [Ticket]  [Timeline
  [Raw

ossp-pkg/pth/test_mp.c
/*
**  GNU Pth - The GNU Portable Threads
**  Copyright (c) 1999-2007 Ralf S. Engelschall <rse@engelschall.com>
**
**  This file is part of GNU Pth, a non-preemptive thread scheduling
**  library which can be found at http://www.gnu.org/software/pth/.
**
**  This library is free software; you can redistribute it and/or
**  modify it under the terms of the GNU Lesser General Public
**  License as published by the Free Software Foundation; either
**  version 2.1 of the License, or (at your option) any later version.
**
**  This library is distributed in the hope that it will be useful,
**  but WITHOUT ANY WARRANTY; without even the implied warranty of
**  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
**  Lesser General Public License for more details.
**
**  You should have received a copy of the GNU Lesser General Public
**  License along with this library; if not, write to the Free Software
**  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
**  USA, or contact Ralf S. Engelschall <rse@engelschall.com>.
**
**  test_mp.c: Pth test program (message port handling)
*/
                             /* ``Failure is not an option.
                                It comes bundled with software.'' */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <unistd.h>

#include "pth.h"

#include "test_common.h"

/* our simple query structure */
struct query {
    pth_message_t head; /* the standard message header */
    char *string;       /* the query ingredients */
};

/* our worker thread which translates the string to upper case */
typedef struct {
    pth_msgport_t mp;
    pth_event_t ev;
} worker_cleanup_t;
static void worker_cleanup(void *arg)
{
    worker_cleanup_t *wc = (worker_cleanup_t *)arg;
    pth_event_free(wc->ev, PTH_FREE_THIS);
    pth_msgport_destroy(wc->mp);
    return;
}
static void *worker(void *_dummy)
{
    worker_cleanup_t wc;
    pth_msgport_t mp;
    pth_event_t ev;
    struct query *q;
    int i;

    fprintf(stderr, "worker: start\n");
    wc.mp = mp = pth_msgport_create("worker");
    wc.ev = ev = pth_event(PTH_EVENT_MSG, mp);
    pth_cleanup_push(worker_cleanup, &wc);
    for (;;) {
         if ((i = pth_wait(ev)) != 1)
             continue;
         while ((q = (struct query *)pth_msgport_get(mp)) != NULL) {
             fprintf(stderr, "worker: recv query <%s>\n", q->string);
             for (i = 0; q->string[i] != NUL; i++)
                 q->string[i] = toupper(q->string[i]);
             fprintf(stderr, "worker: send reply <%s>\n", q->string);
             pth_msgport_reply((pth_message_t *)q);
         }
    }
    return NULL;
}

/* a useless ticker thread */
static void *ticker(void *_arg)
{
    time_t now;
    fprintf(stderr, "ticker: start\n");
    for (;;) {
        pth_sleep(5);
        now = time(NULL);
        fprintf(stderr, "ticker was woken up on %s", ctime(&now));
    }
    /* NOTREACHED */
    return NULL;
}

#define MAXLINELEN 1024

int main(int argc, char *argv[])
{
    char caLine[MAXLINELEN];
    pth_event_t ev = NULL;
    pth_event_t evt = NULL;
    pth_t t_worker = NULL;
    pth_t t_ticker = NULL;
    pth_attr_t t_attr;
    pth_msgport_t mp = NULL;
    pth_msgport_t mp_worker = NULL;
    struct query *q = NULL;
    int n;

    if (!pth_init()) {
        perror("pth_init");
        exit(1);
    }

    fprintf(stderr, "This is TEST_MP, a Pth test using message ports.\n");
    fprintf(stderr, "\n");
    fprintf(stderr, "Lines on stdin are send to a worker thread via message\n");
    fprintf(stderr, "ports, translated to upper case by the worker thread and\n");
    fprintf(stderr, "send back to the main thread via message ports.\n");
    fprintf(stderr, "Additionally a useless ticker thread awakens every 5s.\n");
    fprintf(stderr, "Enter \"quit\" on stdin for stopping this test.\n");
    fprintf(stderr, "\n");

    t_attr = pth_attr_new();
    pth_attr_set(t_attr, PTH_ATTR_NAME, "worker");
    pth_attr_set(t_attr, PTH_ATTR_JOINABLE, TRUE);
    pth_attr_set(t_attr, PTH_ATTR_STACK_SIZE, 16*1024);
    t_worker = pth_spawn(t_attr, worker, NULL);
    pth_attr_set(t_attr, PTH_ATTR_NAME, "ticker");
    t_ticker = pth_spawn(t_attr, ticker, NULL);
    pth_attr_destroy(t_attr);
    pth_yield(NULL);

    mp_worker = pth_msgport_find("worker");
    mp = pth_msgport_create("main");
    q = (struct query *)malloc(sizeof(struct query));
    ev = pth_event(PTH_EVENT_MSG, mp);

    evt = NULL;
    for (;;) {
        if (evt == NULL)
            evt = pth_event(PTH_EVENT_TIME, pth_timeout(20,0));
        else
            evt = pth_event(PTH_EVENT_TIME|PTH_MODE_REUSE, evt, pth_timeout(20,0));
        n = pth_readline_ev(STDIN_FILENO, caLine, MAXLINELEN, evt);
        if (n == -1 && pth_event_status(evt) == PTH_STATUS_OCCURRED) {
            fprintf(stderr, "main: Hey, what are you waiting for? Type in something!\n");
            continue;
        }
        if (n < 0) {
            fprintf(stderr, "main: I/O read error on stdin\n");
            break;
        }
        if (n == 0) {
            fprintf(stderr, "main: EOF on stdin\n");
            break;
        }
        caLine[n-1] = NUL;
        if (strcmp(caLine, "quit") == 0) {
            fprintf(stderr, "main: quit\n");
            break;
        }
        fprintf(stderr, "main: out --> <%s>\n", caLine);
        q->string = caLine;
        q->head.m_replyport = mp;
        pth_msgport_put(mp_worker, (pth_message_t *)q);
        pth_wait(ev);
        q = (struct query *)pth_msgport_get(mp);
        fprintf(stderr, "main: in <-- <%s>\n", q->string);
    }

    free(q);
    pth_event_free(ev, PTH_FREE_THIS);
    pth_event_free(evt, PTH_FREE_THIS);
    pth_msgport_destroy(mp);
    pth_cancel(t_worker);
    pth_join(t_worker, NULL);
    pth_cancel(t_ticker);
    pth_join(t_ticker, NULL);
    pth_kill();
    return 0;
}


CVSTrac 2.0.1