/* ** OSSP sio - Stream I/O ** Copyright (c) 2002-2005 Cable & Wireless ** Copyright (c) 2002-2005 The OSSP Project ** Copyright (c) 2002-2005 Ralf S. Engelschall ** ** This file is part of OSSP sio, a layered stream I/O library ** which can be found at http://www.ossp.org/pkg/lib/sio/. ** ** Permission to use, copy, modify, and distribute this software for ** any purpose with or without fee is hereby granted, provided that ** the above copyright notice and this permission notice appear in all ** copies. ** ** THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED ** WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF ** MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. ** IN NO EVENT SHALL THE AUTHORS AND COPYRIGHT HOLDERS AND THEIR ** CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, ** SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT ** LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF ** USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ** ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, ** OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT ** OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF ** SUCH DAMAGE. ** ** sio.c: stream I/O library implementation */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include #include #include #include "al.h" #include "sio.h" #include "list.h" /****************************************************************************/ /* unique library identifier */ const char sio_id[] = "OSSP sio"; /* support for OSSP ex based exception throwing */ #ifdef WITH_EX #define __EX_NS_UCCXX__ #include "ex.h" #define SIO_RC(rv) \ ( (rv) != SIO_OK && (ex_catching && !ex_shielding) \ ? (ex_throw(sio_id, NULL, (rv)), (rv)) : (rv) ) #else #define SIO_RC(rv) (rv) #endif /* WITH_EX */ /* * node representing either input or output * direction of a processing node */ struct sio_halfduplex_st; typedef struct sio_halfduplex_st sio_halfduplex_t; struct sio_halfduplex_st { NODE(sio_halfduplex_t) hd; /* link to reader/writer chain */ sio_stage_t *stage; /* back to its stage structure */ sio_halfduplex_t *cross; /* reader <-> writer sibling */ const char *tag; /* debugging help */ sio_rc_t rc_with_data; /* default rc to avoid */ sio_rc_t rc_no_data; /* decision in strategy() */ al_t *al; /* reader/writer assembly line */ sio_rc_t (*func)(sio_t *, al_t *, void *, sio_rc_t); /* input() or output() */ }; /* * processing node */ struct sio_stage_st { sio_halfduplex_t reader; /* reader node, linked into sio_t */ sio_halfduplex_t writer; /* writer node, linked into sio_t */ void *userdata; /* module private per-instance data */ sio_module_t *module; /* link to module methods */ sio_mode_t rw; /* state of attachment */ }; /* * represent the whole full-duplex pipe */ struct sio_st { struct { LIST(sio_halfduplex_t) hd; /* link reader halfduplex nodes */ al_t *al; /* the reader assembly line */ } readers; struct { LIST(sio_halfduplex_t) hd; /* link writer halfduplex nodes */ al_t *al; /* the writer assembly line */ } writers; sio_labelnum_t label_data; /* unique al_label object */ sio_labelnum_t label_error; /* to tag data and signals */ sio_labelnum_t label_eof; int eof_flag; /* accumulating flags */ int error_flag; }; /* * AL tags data with unique pointers. Each (!) sio structure has * unique (!) labels to tag data, error information and eof * information on the global assembly lines */ #define SIO_LABEL_DATA(sio) ((al_label_t)&(sio)->label_data) #define SIO_LABEL_ERROR(sio) ((al_label_t)&(sio)->label_error) #define SIO_LABEL_EOF(sio) ((al_label_t)&(sio)->label_eof) /****************************************************************************/ /* * schedule stages on chain of halfduplex nodes */ static sio_rc_t sio_strategy(sio_t *sio, sio_halfduplex_t *chain) { sio_rc_t rc; sio_halfduplex_t *h; /* * call stage and direct data upstream/downstream * according to response code * * if stage directs SIO_OK, chose default direction * depending on data in assembly line * * if we the stage does not return a direction, * simply end the code * * if we drop off the chain, simply result SIO_OK * */ rc = SIO_SCHED_UP; h = chain; while (h != NULL) { rc = h->func(sio, h->al, h->stage->userdata, rc); /* chose default direction */ if (rc == SIO_OK) { if (al_bytes(h->al) > 0) rc = h->rc_with_data; else rc = h->rc_no_data; } if (rc == SIO_SCHED_UP) h = NEXT(h,hd); else if (rc == SIO_SCHED_DOWN) h = PREV(h,hd); else if (rc == SIO_SCHED_CROSS) h = h->cross; else if (rc == SIO_SCHED_LOOP) h = h; else break; } if (h == NULL) rc = SIO_OK; return rc; } /**************************************************************************/ /* * allocate and intialize sio_t data structure * */ sio_rc_t sio_create(sio_t **siop) { sio_t *sio; /* argument sanity check(s) */ if (siop == NULL) return SIO_RC(SIO_ERR_ARG); sio = (sio_t *)malloc(sizeof(sio_t)); if (sio == NULL) return SIO_RC(SIO_ERR_MEM); LISTINIT(&sio->readers,hd); LISTINIT(&sio->writers,hd); /* * we only need unique pointers for the labels, but * we point the pointers also to symbolic constants */ sio->label_data = SIO_LN_DATA; sio->label_error = SIO_LN_ERROR; sio->label_eof = SIO_LN_EOF; sio->eof_flag = 0; sio->error_flag = 0; *siop = sio; return SIO_OK; } /* * destroy sio_t data structure. * * no deinitialization is done. */ sio_rc_t sio_destroy(sio_t *sio) { /* argument sanity check(s) */ if (sio == NULL) return SIO_RC(SIO_ERR_ARG); /* see wether all stages are detached */ if (!ISEMPTY(&sio->readers,hd) || !ISEMPTY(&sio->writers,hd)) return SIO_RC(SIO_ERR_ARG); free(sio); return SIO_OK; } /* * create pair of halfduplex nodes that use methods * from module siom. */ sio_rc_t sio_create_stage(sio_t *sio, sio_module_t *siom, sio_stage_t **siosp) { sio_rc_t rc; sio_stage_t *sios; /* argument sanity check(s) */ if (sio == NULL || siom == NULL || siosp == NULL) return SIO_RC(SIO_ERR_ARG); sios = (sio_stage_t *)malloc(sizeof(sio_stage_t)); if (sios == NULL) return SIO_RC(SIO_ERR_MEM); NODEINIT(&sios->reader,hd); NODEINIT(&sios->writer,hd); sios->module = siom; sios->userdata = NULL; sios->rw = SIO_MODE_INVALID; sios->reader.func = sios->module->input; sios->reader.stage = sios; sios->writer.func = sios->module->output; sios->writer.stage = sios; sios->reader.cross = NULL; sios->writer.cross = NULL; sios->reader.tag = "reader"; sios->writer.tag = "writer"; /* default rules */ sios->reader.rc_with_data = SIO_SCHED_DOWN; sios->reader.rc_no_data = SIO_SCHED_UP; sios->writer.rc_with_data = SIO_SCHED_UP; sios->writer.rc_no_data = SIO_SCHED_DOWN; rc = sios->module->init(sio, &sios->userdata); if (rc != SIO_OK) { free(sios); return SIO_RC(rc); } *siosp = sios; return SIO_RC(rc); } /* * pass parameters to the configure method of a stage */ sio_rc_t sio_configure_stage(sio_t *sio, sio_stage_t *sios, void *obj, void *value) { sio_rc_t rc; /* argument sanity check(s) */ if (sio == NULL || sios == NULL) return SIO_RC(SIO_ERR_ARG); rc = sios->module->configure(sio, sios->userdata, obj, value); return SIO_RC(rc); } /* * */ sio_rc_t sio_destroy_stage(sio_t *sio, sio_stage_t *sios) { sio_rc_t rc; /* argument sanity check(s) */ if (sio == NULL || sios == NULL) return SIO_RC(SIO_ERR_ARG); /* more sanity checking */ if (sios->rw != SIO_MODE_INVALID) return SIO_RC(SIO_ERR_ARG); rc = sios->module->cleanup(sio, sios->userdata); free(sios); return SIO_OK; } /* * allocate global assembly lines * * this is called before a module gets attached * * the first module attached as a reader allocates * the read assembly line * * the first module attached as a writer allocates * the write assembly line */ static sio_rc_t sio_create_al(sio_t *sio, sio_mode_t rw) { al_rc_t arc; int freereader = 0; if (rw == SIO_MODE_READ || rw == SIO_MODE_READWRITE) { if (ISEMPTY(&sio->readers,hd)) { arc = al_create(&sio->readers.al); if (arc != AL_OK) return SIO_ERR_INT; freereader = 1; } } if (rw == SIO_MODE_WRITE || rw == SIO_MODE_READWRITE) { if (ISEMPTY(&sio->writers,hd)) { arc = al_create(&sio->writers.al); if (arc != AL_OK) { if (freereader) al_destroy(sio->readers.al); return SIO_ERR_INT; } } } return SIO_OK; } /* * free global assembly lines * * this is called after a module has been detached * * if the detached module was a reader and there are no more * readers then drop read assembly line * * if the detached module was a writer and there are no more * writers then drop write assembly line * */ static sio_rc_t sio_destroy_al(sio_t *sio, sio_mode_t rw) { if (rw == SIO_MODE_READ || rw == SIO_MODE_READWRITE) { if (ISEMPTY(&sio->readers,hd)) { al_destroy(sio->readers.al); sio->readers.al = NULL; } } if (rw == SIO_MODE_WRITE || rw == SIO_MODE_READWRITE) { if (ISEMPTY(&sio->writers,hd)) { al_destroy(sio->writers.al); sio->writers.al = NULL; } } return SIO_OK; } /* * attach a stage to the read and/or write side of the pipe * * the stage is attached to the head of the pipe, you * have to create your pipes "backwards". * * prepare assembly lines * * stages that are reader and writer get a pointer to the * sibling side so that the scheduler can cross between * reading and writing * * when a stage is attached to either side its openr and * openw methods are called respectively * */ sio_rc_t sio_attach(sio_t *sio, sio_stage_t *sios, sio_mode_t rw) { sio_rc_t rc; int freereader = 0; /* argument sanity check(s) */ if (sio == NULL || sios == NULL) return SIO_RC(SIO_ERR_ARG); switch (rw) { case SIO_MODE_READ: case SIO_MODE_WRITE: case SIO_MODE_READWRITE: break; default: return SIO_RC(SIO_ERR_ARG); } /* is module already attached ? */ if (sios->rw != SIO_MODE_INVALID) return SIO_RC(SIO_ERR_ARG); /* create assembly lines (if aready existing) */ rc = sio_create_al(sio, rw); if (rc != SIO_OK) return SIO_RC(rc); if (rw == SIO_MODE_READ || rw == SIO_MODE_READWRITE) { rc = sios->module->openr(sio, sio->readers.al, sios->userdata); if (rc != SIO_OK) { sio_destroy_al(sio, rw); return SIO_RC(rc); } ADDHEAD(&sio->readers,hd,&sios->reader); freereader = 1; } if (rw == SIO_MODE_WRITE || rw == SIO_MODE_READWRITE) { rc = sios->module->openw(sio, sio->writers.al, sios->userdata); if (rc != SIO_OK) { if (freereader) { REMOVE(&sio->readers,hd,&sios->reader); sios->module->closer(sio, sio->readers.al, sios->userdata); } sio_destroy_al(sio, rw); return SIO_RC(rc); } ADDHEAD(&sio->writers,hd,&sios->writer); } if (rw == SIO_MODE_READWRITE) { sios->reader.cross = &sios->writer; sios->writer.cross = &sios->reader; } sios->reader.al = sio->readers.al; sios->writer.al = sio->writers.al; sios->rw = rw; return SIO_OK; } /* * detach a stage to the read and/or write side of the pipe * * when a stage is detached from either side its closer and * closew methods are called respectively * * drop assembly lines when possible * */ sio_rc_t sio_detach(sio_t *sio, sio_stage_t *sios) { sio_rc_t rc, rc2; /* argument sanity check(s) */ if (sio == NULL || sios == NULL) return SIO_RC(SIO_ERR_ARG); switch (sios->rw) { case SIO_MODE_READ: case SIO_MODE_WRITE: case SIO_MODE_READWRITE: break; default: return SIO_RC(SIO_ERR_ARG); } rc = SIO_OK; rc2 = SIO_OK; if (sios->module->shutdown != NULL && sios->module->shutdown(sio, sios->userdata) == SIO_OK) { if (sios->rw == SIO_MODE_WRITE || sios->rw == SIO_MODE_READWRITE) { rc = sio_strategy(sio, HEAD(&sio->writers,hd)); if (rc != SIO_OK) return SIO_RC(rc); } if (sios->rw == SIO_MODE_READ || sios->rw == SIO_MODE_READWRITE) { sio_strategy(sio, HEAD(&sio->readers,hd)); if (rc != SIO_OK) return SIO_RC(rc); } } if (sios->rw == SIO_MODE_WRITE || sios->rw == SIO_MODE_READWRITE) { REMOVE(&sio->writers,hd,&sios->writer); rc = sios->module->closew(sio, sio->writers.al, sios->userdata); } if (sios->rw == SIO_MODE_READ || sios->rw == SIO_MODE_READWRITE) { REMOVE(&sio->readers,hd,&sios->reader); rc2 = sios->module->closer(sio, sio->readers.al, sios->userdata); } /* XXX double error handling ? */ if (rc == SIO_OK) rc = rc2; if (sios->rw == SIO_MODE_READWRITE) { sios->reader.cross = NULL; sios->writer.cross = NULL; } sios->writer.al = NULL; sios->reader.al = NULL; sio_destroy_al(sio, sios->rw); sios->rw = SIO_MODE_INVALID; return SIO_RC(rc); } /* * retrieve data from the input side * * if there is no data in the reader assembly line * then schedule the input side of the pipe once * if this still doesn't retrieve data then raise * a SIO_ERR_EOF error. * * retrieve data from the reader assembly line up to * the specified byte limit for the first span of * the specified label or any data if label == NULL * */ sio_rc_t sio_input(sio_t *sio, al_t *al, size_t limit, al_label_t label) { sio_rc_t rc; sio_halfduplex_t *h; al_t *src; size_t n; size_t datastart, datasize; /* argument sanity check(s) */ if (sio == NULL || al == NULL) return SIO_RC(SIO_ERR_ARG); h = HEAD(&sio->readers,hd); if (h == NULL) return SIO_RC(SIO_ERR_ARG); src = h->al; n = al_bytes(src); if (n == 0) { rc = sio_strategy(sio, h); if (rc != SIO_OK) return SIO_RC(rc); n = al_bytes(src); if (n == 0) return SIO_RC(SIO_ERR_EOF); } /* * clamp to requested size */ if (n > limit) n = limit; while (n > 0) { if (label == NULL) { datastart = 0; datasize = n; } else if (al_spanlabel(src, 0, n, label, &datastart, &datasize) != AL_OK) break; /* * clamp to requested size */ if (datasize > n) datasize = n; /* XXX - error handling ? */ (void) al_splice(src, datastart, datasize, NULL, al); n -= datasize; } return SIO_OK; } /* * pass data to the output side * * append data to the writer assembly line * * schedule the output side of the pipe * */ sio_rc_t sio_output(sio_t *sio, al_t *al) { sio_rc_t rc; al_rc_t arc; sio_halfduplex_t *h; al_t *dst; size_t n; /* argument sanity check(s) */ if (sio == NULL || al == NULL) return SIO_RC(SIO_ERR_ARG); h = HEAD(&sio->writers,hd); if (h == NULL) return SIO_RC(SIO_ERR_ARG); dst = h->al; n = al_bytes(dst); arc = al_splice(dst, n, 0, al, NULL); if (arc != AL_OK) return SIO_RC(SIO_ERR_INT); rc = sio_strategy(sio, h); return SIO_RC(rc); } /* * schedule the output side of the pipe and * signal to flush data buffers * * current the signalling is done by sending * an EOF data chunk. Convention for all * buffering modules is to flush data buffers * on label boundaries. * */ sio_rc_t sio_push(sio_t *sio) { sio_rc_t rc; al_rc_t arc; sio_halfduplex_t *h; al_t *dst; char eof = '\0'; /* argument sanity check(s) */ if (sio == NULL) return SIO_RC(SIO_ERR_ARG); h = HEAD(&sio->writers,hd); if (h == NULL) return SIO_RC(SIO_ERR_ARG); dst = h->al; arc = al_append_bytes(dst, &eof, 1, SIO_LABEL_EOF(sio)); if (arc != AL_OK) return SIO_RC(SIO_ERR_INT); rc = sio_strategy(sio, h); return SIO_RC(rc); } /* * retrieve data from the pipe into a buffer much like read() * * handles error and eof signals * */ sio_rc_t sio_read(sio_t *sio, char *dst, size_t n, size_t *actualp) { al_rc_t arc; sio_rc_t rc; al_t *al; *actualp = 0; if (n == 0) return SIO_OK; arc = al_create(&al); if (arc != AL_OK) return SIO_RC(SIO_ERR_INT); rc = sio_input(sio, al, n, SIO_LABEL_DATA(sio)); if (rc == SIO_OK) { if (al_bytes(al) > 0) al_flatten(al, 0, n, AL_FORWARD, NULL, dst, actualp); else { rc = sio_input(sio, al, n, SIO_LABEL_ERROR(sio)); if (rc == SIO_OK) { if (al_bytes(al) > 0) sio->error_flag = 1; else { rc = sio_input(sio, al, n, SIO_LABEL_EOF(sio)); if (rc == SIO_OK) { if (al_bytes(al) > 0) sio->eof_flag = 1; } } } } } arc = al_destroy(al); if (arc != AL_OK) return SIO_RC(SIO_ERR_INT); return SIO_RC(rc); } /* * send data to the pipe from a buffer much like write() */ sio_rc_t sio_write(sio_t *sio, char *src, size_t n, size_t *actualp) { al_rc_t arc; sio_rc_t rc; al_t *al; if (n == 0) return SIO_OK; arc = al_create(&al); if (arc != AL_OK) return SIO_RC(SIO_ERR_INT); arc = al_append_bytes(al, src, n, SIO_LABEL_DATA(sio)); if (arc != AL_OK) rc = SIO_ERR_INT; else rc = sio_output(sio, al); *actualp = n - al_bytes(al); arc = al_destroy(al); if (arc != AL_OK) return SIO_RC(SIO_ERR_INT); return SIO_RC(rc); } /* * query a SIO flag * * currently this is SIO_FLAG_ERROR and SIO_FLAG_EOF * which are set by sio_read() * */ sio_rc_t sio_flag(sio_t *sio, sio_flag_t fl) { int rc; switch (fl) { case SIO_FLAG_ERROR: rc = sio->error_flag; break; case SIO_FLAG_EOF: rc = sio->eof_flag; break; default: rc = 0; break; } return rc ? SIO_TRUE : SIO_FALSE; } /* * query and clear a SIO flag */ sio_rc_t sio_clearflag(sio_t *sio, sio_flag_t fl) { int rc; switch (fl) { case SIO_FLAG_ERROR: rc = sio->error_flag; sio->error_flag = 0; break; case SIO_FLAG_EOF: rc = sio->eof_flag; sio->eof_flag = 0; break; default: rc = 0; break; } return rc ? SIO_TRUE : SIO_FALSE; } const char *sio_error(sio_rc_t rc) { const char *mess; switch (rc) { case SIO_OK: mess = "Everything Ok"; break; case SIO_ERR_ARG: mess = "Invalid Argument"; break; case SIO_ERR_MEM: mess = "Not Enough Memory"; break; case SIO_ERR_EOF: mess = "End Of Data"; break; case SIO_ERR_SYS: mess = "Operating System Error"; break; case SIO_ERR_INT: mess = "Internal Error"; break; case SIO_SCHED_UP: mess = "Invoke Upstream Stage"; break; case SIO_SCHED_DOWN: mess = "Invoke Downstream Stage"; break; case SIO_SCHED_CROSS: mess = "Invoke Crossstream Stage"; break; case SIO_SCHED_LOOP: mess = "Loop through current Stage"; break; default: mess = "Invalid Result Code"; break; } return mess; } /* * stages need to share the labels that distinguish between * data and signals on the pipe * * this function returns OSSP al labels for SIO specific * label numbers defined in sio.h * */ sio_rc_t sio_label(sio_t *sio, sio_labelnum_t ln, al_label_t *p) { void *label; switch (ln) { case SIO_LN_DATA: label = SIO_LABEL_DATA(sio); break; case SIO_LN_ERROR: label = SIO_LABEL_ERROR(sio); break; case SIO_LN_EOF: label = SIO_LABEL_EOF(sio); break; default: return SIO_ERR_ARG; } *p = label; return SIO_OK; }