diff options
Diffstat (limited to 'libpipe/pipe.c')
-rw-r--r-- | libpipe/pipe.c | 190 |
1 files changed, 168 insertions, 22 deletions
diff --git a/libpipe/pipe.c b/libpipe/pipe.c index 17f9cda7..358a27ee 100644 --- a/libpipe/pipe.c +++ b/libpipe/pipe.c @@ -24,6 +24,8 @@ #include <mach/time_value.h> #include <mach/mach_host.h> +#include <hurd/hurd_types.h> + #include "pipe.h" static inline void @@ -31,9 +33,14 @@ timestamp (time_value_t *stamp) { host_get_time (mach_host_self (), stamp); } + +/* Hold this lock before attempting to lock multiple pipes. */ +struct mutex pipe_multiple_lock = MUTEX_INITIALIZER; /* ---------------------------------------------------------------- */ +#define pipe_is_connless(p) ((p)->class->flags & PIPE_CLASS_CONNECTIONLESS) + /* Creates a new pipe of class CLASS and returns it in RESULT. */ error_t pipe_create (struct pipe_class *class, struct pipe **pipe) @@ -47,16 +54,23 @@ pipe_create (struct pipe_class *class, struct pipe **pipe) new->writers = 0; new->flags = 0; new->class = class; + new->write_limit = 16*1024; + new->write_atomic = 16*1024; bzero (&new->read_time, sizeof (new->read_time)); bzero (&new->write_time, sizeof (new->write_time)); condition_init (&new->pending_reads); - condition_init (&new->pending_selects); + condition_init (&new->pending_read_selects); + condition_init (&new->pending_writes); + condition_init (&new->pending_write_selects); mutex_init (&new->lock); pq_create (&new->queue); + if (! pipe_is_connless (new)) + new->flags |= PIPE_BROKEN; + *pipe = new; return 0; } @@ -69,21 +83,17 @@ pipe_free (struct pipe *pipe) free (pipe); } -/* Wake up all threads waiting on PIPE, which should be locked. */ -inline void -pipe_kick (struct pipe *pipe) +/* Take any actions necessary when PIPE acquires its first writer. */ +void _pipe_first_writer (struct pipe *pipe) { - /* Now wake them all up for the bad news... */ - condition_broadcast (&pipe->pending_reads); - mutex_unlock (&pipe->lock); - condition_broadcast (&pipe->pending_selects); - mutex_lock (&pipe->lock); /* Get back the lock on PIPE. */ + if (pipe->readers > 0) + pipe->flags &= ~PIPE_BROKEN; } -/* Take any actions necessary when PIPE acquires its first writer. */ -void _pipe_first_writer (struct pipe *pipe) +/* Take any actions necessary when PIPE acquires its first reader. */ +void _pipe_first_reader (struct pipe *pipe) { - if (! (pipe->class->flags & PIPE_CLASS_CONNECTIONLESS)) + if (pipe->writers > 0) pipe->flags &= ~PIPE_BROKEN; } @@ -94,7 +104,16 @@ void _pipe_no_readers (struct pipe *pipe) if (pipe->writers == 0) pipe_free (pipe); else - mutex_unlock (&pipe->lock); + { + if (! pipe_is_connless (pipe)) + { + pipe->flags |= PIPE_BROKEN; + if (pipe->readers) + /* Wake up writers for the bad news... */ + condition_broadcast (&pipe->pending_writes); + } + mutex_unlock (&pipe->lock); + } } /* Take any actions necessary when PIPE's last writer has gone away. PIPE @@ -105,23 +124,124 @@ void _pipe_no_writers (struct pipe *pipe) pipe_free (pipe); else { - if (! (pipe->class->flags & PIPE_CLASS_CONNECTIONLESS)) + if (! pipe_is_connless (pipe)) { pipe->flags |= PIPE_BROKEN; if (pipe->readers) - /* Wake up readers who might want to know about our new state. */ - pipe_kick (pipe); + /* Wake up readers for the bad news... */ + { + condition_broadcast (&pipe->pending_reads); + condition_broadcast (&pipe->pending_read_selects); + } } mutex_unlock (&pipe->lock); } } +/* Return when either RPIPE is available for reading (if SELECT_READ is set + in *SELECT_TYPE), or WPIPE is available for writing (if select_write is + set in *SELECT_TYPE). *SELECT_TYPE is modified to reflect which (or both) + is now available. DATA_ONLY should be true if only data packets should be + waited for on RPIPE. Neither RPIPE or WPIPE should be locked when calling + this function (unlike most pipe functions). */ +error_t +pipe_pair_select_read_write (struct pipe *rpipe, struct pipe *wpipe, + int *select_type, int data_only) +{ + error_t err = 0; + + *select_type &= SELECT_READ | SELECT_WRITE; + + if (*select_type == SELECT_READ) + { + mutex_lock (&rpipe->lock); + err = pipe_select_read (rpipe, data_only); + mutex_unlock (&rpipe->lock); + } + else if (*select_type == SELECT_WRITE) + { + mutex_lock (&wpipe->lock); + err = pipe_select_write (wpipe); + mutex_unlock (&wpipe->lock); + } + else + /* ugh */ + { + int rpipe_blocked, wpipe_blocked; + struct condition pending_read_write_select; + size_t wlimit = wpipe->write_limit; + struct mutex *lock = + (wpipe == rpipe ? &rpipe->lock : &pipe_multiple_lock); + + condition_init (&pending_read_write_select); + condition_implies (&rpipe->pending_read_selects, + &pending_read_write_select); + condition_implies (&wpipe->pending_write_selects, + &pending_read_write_select); + + mutex_lock (lock); + if (rpipe != wpipe) + { + mutex_lock (&rpipe->lock); + mutex_lock (&wpipe->lock); + } + + rpipe_blocked = + ! ((rpipe->flags & PIPE_BROKEN) || pipe_is_readable (rpipe, data_only)); + wpipe_blocked = + ! ((wpipe->flags & PIPE_BROKEN) || pipe_readable (wpipe, 1) < wlimit); + while (!err && rpipe_blocked && wpipe_blocked) + { + if (rpipe != wpipe) + { + mutex_unlock (&rpipe->lock); + mutex_unlock (&wpipe->lock); + } + if (hurd_condition_wait (&pending_read_write_select, lock)) + err = EINTR; + if (rpipe != wpipe) + { + mutex_lock (&rpipe->lock); + mutex_lock (&wpipe->lock); + } + rpipe_blocked = + ! ((rpipe->flags & PIPE_BROKEN) + || pipe_is_readable (rpipe, data_only)); + wpipe_blocked = + ! ((wpipe->flags & PIPE_BROKEN) + || pipe_readable (wpipe, 1) < wlimit); + } + + if (!err) + { + if (rpipe_blocked) + *select_type &= ~SELECT_READ; + if (wpipe_blocked) + *select_type &= ~SELECT_WRITE; + } + + if (rpipe != wpipe) + { + mutex_unlock (&rpipe->lock); + mutex_unlock (&wpipe->lock); + } + mutex_unlock (lock); + + condition_unimplies (&rpipe->pending_read_selects, + &pending_read_write_select); + condition_unimplies (&wpipe->pending_write_selects, + &pending_read_write_select); + } + + return err; +} + /* Writes up to LEN bytes of DATA, to PIPE, which should be locked, and returns the amount written in AMOUNT. If present, the information in CONTROL & PORTS is written in a preceding control packet. If an error is returned, nothing is done. */ error_t -pipe_send (struct pipe *pipe, void *source, +pipe_send (struct pipe *pipe, int noblock, void *source, char *data, size_t data_len, char *control, size_t control_len, mach_port_t *ports, size_t num_ports, @@ -129,8 +249,19 @@ pipe_send (struct pipe *pipe, void *source, { error_t err = 0; - if (pipe->flags & PIPE_BROKEN) - return EPIPE; + err = pipe_wait_writable (pipe, noblock); + if (err) + return err; + + if (noblock) + { + size_t left = pipe->write_limit - pipe_readable (pipe, 1); + if (left > data_len) + if (data_len <= pipe->write_atomic) + return EWOULDBLOCK; + else + data_len = left; + } if (control_len > 0 || num_ports > 0) /* Write a control packet. */ @@ -169,7 +300,7 @@ pipe_send (struct pipe *pipe, void *source, /* Only wakeup selects if there's still data available. */ if (pipe_is_readable (pipe, 0)) { - condition_broadcast (&pipe->pending_selects); + condition_broadcast (&pipe->pending_read_selects); /* We leave PIPE locked here, assuming the caller will soon unlock it and allow others access. */ } @@ -207,7 +338,7 @@ pipe_recv (struct pipe *pipe, int noblock, unsigned *flags, void **source, /* True if the user isn't asking for any `control' data. */ int data_only = (control == NULL && ports == NULL); - err = pipe_wait (pipe, noblock, data_only); + err = pipe_wait_readable (pipe, noblock, data_only); if (err) return err; @@ -256,7 +387,22 @@ pipe_recv (struct pipe *pipe, int noblock, unsigned *flags, void **source, *data_len = 0; if (!err && packet) - timestamp (&pipe->read_time); + { + timestamp (&pipe->read_time); + + /* And wakeup anyone that might be interested in it. */ + condition_broadcast (&pipe->pending_writes); + mutex_unlock (&pipe->lock); + + mutex_lock (&pipe->lock); /* Get back the lock on PIPE. */ + /* Only wakeup selects if there's still writing space available. */ + if (pipe_readable (pipe, 1) < pipe->write_limit) + { + condition_broadcast (&pipe->pending_write_selects); + /* We leave PIPE locked here, assuming the caller will soon unlock + it and allow others access. */ + } + } return err; } |