--- sio_buffer.c 2002/10/23 17:05:10 1.1
+++ sio_buffer.c 2002/11/05 15:52:21 1.2
@@ -11,6 +11,7 @@
al_tx_t *outputtx;
size_t inputsize;
al_t *input, *output;
+ al_label_t data_label;
} private_t;
/*
@@ -21,16 +22,18 @@
static
sio_rc_t buffer_init(sio_t *sio, void **u)
{
- private_t *mydata;
+ private_t *my;
- mydata = (private_t *)malloc(sizeof(private_t));
- if (mydata == NULL)
+ my = (private_t *)malloc(sizeof(private_t));
+ if (my == NULL)
return SIO_ERR_MEM;
- mydata->inputsize = 0;
- mydata->outputsize = 0;
+ my->inputsize = 0;
+ my->outputsize = 0;
- *u = mydata;
+ sio_label(sio, SIO_LN_DATA, &my->data_label);
+
+ *u = my;
return SIO_OK;
}
@@ -43,13 +46,13 @@
static
sio_rc_t buffer_configure(sio_t *sio, void *u, void *obj, void *val)
{
- private_t *mydata = (private_t *)u;
+ private_t *my = (private_t *)u;
const char *name = (const char *)obj;
if (!strcmp(name, "inputsize")) {
- mydata->inputsize = *(size_t *)val;
+ my->inputsize = *(size_t *)val;
} else if (!strcmp(name, "outputsize")) {
- mydata->outputsize = *(size_t *)val;
+ my->outputsize = *(size_t *)val;
} else {
return SIO_ERR_ARG;
}
@@ -63,9 +66,9 @@
static
sio_rc_t buffer_cleanup(sio_t *sio, void *u)
{
- private_t *mydata = (private_t *)u;
+ private_t *my = (private_t *)u;
- free(mydata);
+ free(my);
return SIO_OK;
}
@@ -73,10 +76,10 @@
static
sio_rc_t buffer_openr(sio_t *sio, al_t *al, void *u)
{
- private_t *mydata = (private_t *)u;
+ private_t *my = (private_t *)u;
al_rc_t arc;
- arc = al_create(&mydata->input);
+ arc = al_create(&my->input);
if (arc != AL_OK)
return SIO_ERR_INT;
@@ -86,10 +89,10 @@
static
sio_rc_t buffer_closer(sio_t *sio, al_t *al, void *u)
{
- private_t *mydata = (private_t *)u;
+ private_t *my = (private_t *)u;
- al_destroy(mydata->input);
- mydata->input = NULL;
+ al_destroy(my->input);
+ my->input = NULL;
return SIO_OK;
}
@@ -97,16 +100,16 @@
static
sio_rc_t buffer_openw(sio_t *sio, al_t *al, void *u)
{
- private_t *mydata = (private_t *)u;
+ private_t *my = (private_t *)u;
al_rc_t arc;
- arc = al_txalloc(al, &mydata->outputtx);
+ arc = al_txalloc(al, &my->outputtx);
if (arc != AL_OK)
return SIO_ERR_INT;
- arc = al_create(&mydata->output);
+ arc = al_create(&my->output);
if (arc != AL_OK) {
- al_txfree(al, mydata->outputtx);
+ al_txfree(al, my->outputtx);
return SIO_ERR_INT;
}
@@ -116,18 +119,29 @@
static
sio_rc_t buffer_closew(sio_t *sio, al_t *al, void *u)
{
- private_t *mydata = (private_t *)u;
+ private_t *my = (private_t *)u;
- al_destroy(mydata->output);
- mydata->output = NULL;
- al_txfree(al, mydata->outputtx);
- mydata->outputtx = NULL;
+ al_destroy(my->output);
+ my->output = NULL;
+ al_txfree(al, my->outputtx);
+ my->outputtx = NULL;
return SIO_OK;
}
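+/*
+ * buffer logic
+ *
+ * gather data from the producer;
+ * once the buffer size is reached or the label changes, push the data to
+ * the consumer (read -> downstream, write -> upstream)
+ *
+ * the effective buffer size depends on the label type: only chunks carrying
+ * the data label are buffered up to the configured size, non-data is not buffered
+ */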
static
-sio_rc_t buffer_inout(sio_t *sio, al_t *al, al_t *buf, size_t size,
+sio_rc_t buffer_inout(sio_t *sio, al_t *al, private_t *my,
+ al_t *buf, size_t size,
sio_rc_t up, sio_rc_t down)
{
size_t avail, data, needed;
@@ -148,7 +162,13 @@
al_flatten(buf, 0, avail, AL_FORWARD_SPAN, label, NULL, &data);
- needed = size;
+ /* non-data is not buffered */
+ if (label == my->data_label)
+ needed = size;
+ else
+ needed = 1;
+
+ /* flush if there is extra data (that must have a different label) */
if (data < avail)
needed = 1;
@@ -165,18 +185,20 @@
static
sio_rc_t buffer_input(sio_t *sio, al_t *al, void *u)
{
- private_t *mydata = (private_t *)u;
+ private_t *my = (private_t *)u;
- return buffer_inout(sio, al, mydata->input, mydata->inputsize,
+ return buffer_inout(sio, al, my,
+ my->input, my->inputsize,
SIO_DOWNSTREAM, SIO_UPSTREAM);
}
static
sio_rc_t buffer_output(sio_t *sio, al_t *al, void *u)
{
- private_t *mydata = (private_t *)u;
+ private_t *my = (private_t *)u;
- return buffer_inout(sio, al, mydata->output, mydata->outputsize,
+ return buffer_inout(sio, al, my,
+ my->output, my->outputsize,
SIO_UPSTREAM, SIO_DOWNSTREAM);
}
|