--- sio.c 2002/11/14 09:22:42 1.11
+++ sio.c 2002/11/14 15:56:10 1.12
@@ -34,7 +34,6 @@
#include "al.h"
#include "sio.h"
-#include "sio_module.h"
#include "list.h"
/****************************************************************************/
@@ -52,49 +51,68 @@
#define SIO_RC(rv) (rv)
#endif /* WITH_EX */
-
+/*
+ * node representing either input or output
+ * direction of a processing node
+ */
struct sio_halfduplex_st;
typedef struct sio_halfduplex_st sio_halfduplex_t;
struct sio_halfduplex_st {
- NODE(sio_halfduplex_t) hd;
- sio_stage_t *stage;
- sio_halfduplex_t *cross;
- const char *tag;
- sio_rc_t rc_with_data, rc_no_data;
- al_t *al;
- sio_rc_t (*func)(sio_t *, al_t *, void *);
+ NODE(sio_halfduplex_t) hd; /* link to reader/writer chain */
+ sio_stage_t *stage; /* back to its stage structure */
+ sio_halfduplex_t *cross; /* reader <-> writer sibling */
+ const char *tag; /* debugging help */
+ sio_rc_t rc_with_data; /* default rc to avoid */
+ sio_rc_t rc_no_data; /* decision in strategy() */
+ al_t *al; /* reader/writer assembly line */
+ sio_rc_t (*func)(sio_t *, al_t *, void *); /* input() or output() */
+};
+
+/*
+ * processing node
+ */
+struct sio_stage_st {
+ sio_halfduplex_t reader; /* reader node, linked into sio_t */
+ sio_halfduplex_t writer; /* writer node, linked into sio_t */
+ void *userdata; /* module private per-instance data */
+ sio_module_t *module; /* link to module methods */
+ sio_mode_t rw; /* state of attachment */
};
+/*
+ * represent the whole full-duplex pipe
+ */
struct sio_st {
struct {
- LIST(sio_halfduplex_t) hd;
- al_t *al;
+ LIST(sio_halfduplex_t) hd; /* link reader halfduplex nodes */
+ al_t *al; /* the reader assembly line */
} readers;
struct {
- LIST(sio_halfduplex_t) hd;
- al_t *al;
+ LIST(sio_halfduplex_t) hd; /* link writer halfduplex nodes */
+ al_t *al; /* the writer assembly line */
} writers;
- sio_labelnum_t label_data;
- sio_labelnum_t label_error;
+ sio_labelnum_t label_data; /* unique al_label object */
+ sio_labelnum_t label_error; /* to tag data and signals */
sio_labelnum_t label_eof;
- int eof_flag;
+ int eof_flag; /* accumulating flags */
int error_flag;
- char *dst;
+ char *dst; /* write pointer for sio_read */
};
+
+/*
+ * AL tags data with unique pointers. Each (!) sio structure has
+ * unique (!) labels to tag data, error information and eof
+ * information on the global assembly lines
+ */
#define SIO_LABEL_DATA(sio) ((al_label_t)&(sio)->label_data)
#define SIO_LABEL_ERROR(sio) ((al_label_t)&(sio)->label_error)
#define SIO_LABEL_EOF(sio) ((al_label_t)&(sio)->label_eof)
-struct sio_stage_st {
- sio_halfduplex_t reader;
- sio_halfduplex_t writer;
- void *userdata;
- sio_module_t *module;
- sio_mode_t rw;
-};
-
/****************************************************************************/
+/*
+ * schedule stages on chain of halfduplex nodes
+ */
static
sio_rc_t sio_strategy(sio_t *sio, sio_halfduplex_t *chain)
{
@@ -114,7 +132,7 @@
* if we drop off the chain, simply result SIO_OK
*
*/
- rc = SIO_UPSTREAM;
+ rc = SIO_SCHED_UP;
h = chain;
while (h != NULL) {
rc = h->func(sio, h->al, h->stage->userdata);
@@ -127,13 +145,13 @@
rc = h->rc_no_data;
}
- if (rc == SIO_UPSTREAM)
+ if (rc == SIO_SCHED_UP)
h = NEXT(h,hd);
- else if (rc == SIO_DOWNSTREAM)
+ else if (rc == SIO_SCHED_DOWN)
h = PREV(h,hd);
- else if (rc == SIO_XSTREAM)
+ else if (rc == SIO_SCHED_CROSS)
h = h->cross;
- else if (rc == SIO_LOOP)
+ else if (rc == SIO_SCHED_LOOP)
h = h;
else
break;
@@ -147,6 +165,10 @@
/**************************************************************************/
+/*
+ * allocate and initialize sio_t data structure
+ *
+ */
sio_rc_t sio_create(sio_t **siop)
{
sio_t *sio;
@@ -162,6 +184,10 @@
LISTINIT(&sio->readers,hd);
LISTINIT(&sio->writers,hd);
+ /*
+ * we only need unique pointers for the labels, but the
+ * objects they point to also hold the symbolic constants
+ */
sio->label_data = SIO_LN_DATA;
sio->label_error = SIO_LN_ERROR;
sio->label_eof = SIO_LN_EOF;
@@ -171,17 +197,31 @@
return SIO_OK;
}
+/*
+ * destroy sio_t data structure.
+ *
+ * no deinitialization is done.
+ */
sio_rc_t sio_destroy(sio_t *sio)
{
/* argument sanity check(s) */
if (sio == NULL)
return SIO_RC(SIO_ERR_ARG);
+ /* see whether all stages are detached */
+ if (!ISEMPTY(&sio->readers,hd) ||
+ !ISEMPTY(&sio->writers,hd))
+ return SIO_RC(SIO_ERR_ARG);
+
free(sio);
return SIO_OK;
}
+/*
+ * create pair of halfduplex nodes that use methods
+ * from module siom.
+ */
sio_rc_t sio_create_stage(sio_t *sio, sio_module_t *siom, sio_stage_t **siosp)
{
sio_rc_t rc;
@@ -214,10 +254,10 @@
sios->writer.tag = "writer";
/* default rules */
- sios->reader.rc_with_data = SIO_DOWNSTREAM;
- sios->reader.rc_no_data = SIO_UPSTREAM;
- sios->writer.rc_with_data = SIO_UPSTREAM;
- sios->writer.rc_no_data = SIO_DOWNSTREAM;
+ sios->reader.rc_with_data = SIO_SCHED_DOWN;
+ sios->reader.rc_no_data = SIO_SCHED_UP;
+ sios->writer.rc_with_data = SIO_SCHED_UP;
+ sios->writer.rc_no_data = SIO_SCHED_DOWN;
rc = sios->module->init(sio, &sios->userdata);
if (rc != SIO_OK) {
@@ -230,6 +270,9 @@
return SIO_RC(rc);
}
+/*
+ * pass parameters to the configure method of a stage
+ */
sio_rc_t sio_configure_stage(sio_t *sio, sio_stage_t *sios,
void *obj, void *value)
{
@@ -244,6 +287,9 @@
return SIO_RC(rc);
}
+/*
+ * run the module cleanup method and free a stage that is not attached
+ */
sio_rc_t sio_destroy_stage(sio_t *sio, sio_stage_t *sios)
{
sio_rc_t rc;
@@ -252,12 +298,27 @@
if (sio == NULL || sios == NULL)
return SIO_RC(SIO_ERR_ARG);
+ /* more sanity checking */
+ if (sios->rw != SIO_MODE_INVALID)
+ return SIO_RC(SIO_ERR_ARG);
+
rc = sios->module->cleanup(sio, sios->userdata);
free(sios);
return SIO_OK;
}
+/*
+ * allocate global assembly lines
+ *
+ * this is called before a module gets attached
+ *
+ * the first module attached as a reader allocates
+ * the read assembly line
+ *
+ * the first module attached as a writer allocates
+ * the write assembly line
+ */
static
sio_rc_t sio_create_al(sio_t *sio, sio_mode_t rw)
{
@@ -286,6 +347,18 @@
return SIO_OK;
}
+/*
+ * free global assembly lines
+ *
+ * this is called after a module has been detached
+ *
+ * if the detached module was a reader and there are no more
+ * readers then drop read assembly line
+ *
+ * if the detached module was a writer and there are no more
+ * writers then drop write assembly line
+ *
+ */
static
sio_rc_t sio_destroy_al(sio_t *sio, sio_mode_t rw)
{
@@ -305,6 +378,19 @@
return SIO_OK;
}
+/*
+ * attach a stage to the read and/or write side of the pipe
+ *
+ * prepare assembly lines
+ *
+ * stages that are reader and writer get a pointer to the
+ * sibling side so that the scheduler can cross between
+ * reading and writing
+ *
+ * when a stage is attached to either side its openr and
+ * openw methods are called respectively
+ *
+ */
sio_rc_t sio_attach(sio_t *sio, sio_stage_t *sios, sio_mode_t rw)
{
sio_rc_t rc;
@@ -365,6 +451,15 @@
return SIO_OK;
}
+/*
+ * detach a stage from the read and/or write side of the pipe
+ *
+ * when a stage is detached from either side its closer and
+ * closew methods are called respectively
+ *
+ * drop assembly lines when possible
+ *
+ */
sio_rc_t sio_detach(sio_t *sio, sio_stage_t *sios)
{
sio_rc_t rc;
@@ -411,6 +506,16 @@
return SIO_RC(rc);
}
+/*
+ * retrieve data from the input side
+ *
+ * if there is no data in the reader assembly line
+ * then schedule the input side of the pipe once
+ *
+ * retrieve data from the reader assembly line up to
+ * the specified byte limit
+ *
+ */
sio_rc_t sio_input(sio_t *sio, al_t *al, size_t limit)
{
sio_rc_t rc;
@@ -447,6 +552,14 @@
return SIO_OK;
}
+/*
+ * pass data to the output side
+ *
+ * append data to the writer assembly line
+ *
+ * schedule the output side of the pipe
+ *
+ */
sio_rc_t sio_output(sio_t *sio, al_t *al)
{
sio_rc_t rc;
@@ -474,6 +587,16 @@
return SIO_RC(rc);
}
+/*
+ * schedule the output side of the pipe and
+ * signal to flush data buffers
+ *
+ * currently the signalling is done by sending
+ * an EOF data chunk. Convention for all
+ * buffering modules is to flush data buffers
+ * on label boundaries.
+ *
+ */
sio_rc_t sio_push(sio_t *sio)
{
sio_rc_t rc;
@@ -500,6 +623,14 @@
return SIO_RC(rc);
}
+/*
+ * callback used by sio_read to scan through
+ * reader assembly line
+ *
+ * data chunks are copied to the destination buffer
+ * error chunks set the error flag
+ * eof chunks set the eof flag
+ */
static al_rc_t sio_readchunk(al_chunk_t *alc, void *u)
{
sio_t *sio = (sio_t *)u;
@@ -518,6 +649,12 @@
return AL_OK;
}
+/*
+ * retrieve data from the pipe into a buffer much like read()
+ *
+ * handles error and eof signals
+ *
+ */
sio_rc_t sio_read(sio_t *sio, char *dst, size_t n, size_t *actualp)
{
al_rc_t arc;
@@ -543,6 +680,9 @@
return SIO_RC(rc);
}
+/*
+ * send data to the pipe from a buffer much like write()
+ */
sio_rc_t sio_write(sio_t *sio, char *src, size_t n, size_t *actualp)
{
al_rc_t arc;
@@ -567,7 +707,14 @@
return SIO_RC(rc);
}
-int sio_flag(sio_t *sio, sio_flag_t fl)
+/*
+ * query a SIO flag
+ *
+ * currently this is SIO_FLAG_ERROR and SIO_FLAG_EOF
+ * which are set by sio_read()
+ *
+ */
+sio_rc_t sio_flag(sio_t *sio, sio_flag_t fl)
{
int rc;
@@ -583,10 +730,13 @@
break;
}
- return rc;
+ return rc ? SIO_TRUE : SIO_FALSE;
}
-int sio_clearflag(sio_t *sio, sio_flag_t fl)
+/*
+ * query and clear a SIO flag
+ */
+sio_rc_t sio_clearflag(sio_t *sio, sio_flag_t fl)
{
int rc;
@@ -604,7 +754,7 @@
break;
}
- return rc;
+ return rc ? SIO_TRUE : SIO_FALSE;
}
const char *sio_error(sio_rc_t rc)
@@ -612,22 +762,30 @@
const char *mess;
switch (rc) {
- case SIO_OK: mess = "Everything Ok"; break;
- case SIO_ERR_ARG: mess = "Invalid Argument"; break;
- case SIO_ERR_MEM: mess = "Not Enough Memory"; break;
- case SIO_ERR_EOF: mess = "End Of Data"; break;
- case SIO_ERR_SYS: mess = "Operating System Error"; break;
- case SIO_ERR_INT: mess = "Internal Error"; break;
- case SIO_UPSTREAM: mess = "Invoke Upstream Stage"; break;
- case SIO_DOWNSTREAM: mess = "Invoke Downstream Stage"; break;
- case SIO_XSTREAM: mess = "Invoke Crossstream Stage"; break;
- case SIO_LOOP: mess = "Loop through current Stage"; break;
- default: mess = "Invalid Result Code"; break;
+ case SIO_OK: mess = "Everything Ok"; break;
+ case SIO_ERR_ARG: mess = "Invalid Argument"; break;
+ case SIO_ERR_MEM: mess = "Not Enough Memory"; break;
+ case SIO_ERR_EOF: mess = "End Of Data"; break;
+ case SIO_ERR_SYS: mess = "Operating System Error"; break;
+ case SIO_ERR_INT: mess = "Internal Error"; break;
+ case SIO_SCHED_UP: mess = "Invoke Upstream Stage"; break;
+ case SIO_SCHED_DOWN: mess = "Invoke Downstream Stage"; break;
+ case SIO_SCHED_CROSS: mess = "Invoke Crossstream Stage"; break;
+ case SIO_SCHED_LOOP: mess = "Loop through current Stage"; break;
+ default: mess = "Invalid Result Code"; break;
}
return mess;
}
+/*
+ * stages need to share the labels that distinguish between
+ * data and signals on the pipe
+ *
+ * this function returns OSSP al labels for SIO specific
+ * label numbers defined in sio.h
+ *
+ */
sio_rc_t sio_label(sio_t *sio, sio_labelnum_t ln, al_label_t *p)
{
void *label;
|