diff options
Diffstat (limited to 'libpipe')
-rw-r--r-- | libpipe/pipe.c | 89 | ||||
-rw-r--r-- | libpipe/pipe.h | 9 |
2 files changed, 84 insertions, 14 deletions
diff --git a/libpipe/pipe.c b/libpipe/pipe.c index 914816bc..85aac0e6 100644 --- a/libpipe/pipe.c +++ b/libpipe/pipe.c @@ -64,6 +64,7 @@ pipe_create (struct pipe_class *class, struct pipe **pipe) condition_init (&new->pending_read_selects); condition_init (&new->pending_writes); condition_init (&new->pending_write_selects); + new->pending_selects = NULL; mutex_init (&new->lock); pq_create (&new->queue); @@ -83,6 +84,63 @@ pipe_free (struct pipe *pipe) free (pipe); } +static void +pipe_add_select_cond (struct pipe *pipe, struct pipe_select_cond *cond) +{ + struct pipe_select_cond *first, *last; + + first = pipe->pending_selects; + + if (first == NULL) + { + cond->next = cond; + cond->prev = cond; + pipe->pending_selects = cond; + return; + } + + last = first->prev; + cond->next = first; + cond->prev = last; + first->prev = cond; + last->next = cond; +} + +static void +pipe_remove_select_cond (struct pipe *pipe, struct pipe_select_cond *cond) +{ + cond->prev->next = cond->next; + cond->next->prev = cond->prev; + + if (pipe->pending_selects == cond) + { + if (cond->next == cond) + pipe->pending_selects = NULL; + else + pipe->pending_selects = cond->next; + } +} + +static void +pipe_select_cond_broadcast (struct pipe *pipe) +{ + struct pipe_select_cond *cond, *last; + + cond = pipe->pending_selects; + + if (cond == NULL) + return; + + last = cond->prev; + + do + { + condition_broadcast (&cond->cond); + cond = cond->next; + } + while (cond != last); +} + /* Take any actions necessary when PIPE acquires its first writer. */ void _pipe_first_writer (struct pipe *pipe) { @@ -113,6 +171,7 @@ void _pipe_no_readers (struct pipe *pipe) { condition_broadcast (&pipe->pending_writes); condition_broadcast (&pipe->pending_write_selects); + pipe_select_cond_broadcast (pipe); } } mutex_unlock (&pipe->lock); @@ -135,6 +194,7 @@ void _pipe_no_writers (struct pipe *pipe) { condition_broadcast (&pipe->pending_reads); condition_broadcast (&pipe->pending_read_selects); + pipe_select_cond_broadcast (pipe); } } mutex_unlock (&pipe->lock); @@ -171,22 +231,22 @@ pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe, /* ugh */ { int rpipe_blocked, wpipe_blocked; - struct condition pending_read_write_select; + struct pipe_select_cond pending_select; size_t wlimit = wpipe->write_limit; struct mutex *lock = (wpipe == rpipe ? &rpipe->lock : &pipe_multiple_lock); - condition_init (&pending_read_write_select); - condition_implies (&rpipe->pending_read_selects, - &pending_read_write_select); - condition_implies (&wpipe->pending_write_selects, - &pending_read_write_select); + condition_init (&pending_select.cond); mutex_lock (lock); - if (rpipe != wpipe) + if (rpipe == wpipe) + pipe_add_select_cond (rpipe, &pending_select); + else { mutex_lock (&rpipe->lock); mutex_lock (&wpipe->lock); + pipe_add_select_cond (rpipe, &pending_select); + pipe_add_select_cond (wpipe, &pending_select); } rpipe_blocked = @@ -200,7 +260,7 @@ pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe, mutex_unlock (&rpipe->lock); mutex_unlock (&wpipe->lock); } - if (hurd_condition_wait (&pending_read_write_select, lock)) + if (hurd_condition_wait (&pending_select.cond, lock)) err = EINTR; if (rpipe != wpipe) { @@ -223,17 +283,16 @@ pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe, *select_type &= ~SELECT_WRITE; } - if (rpipe != wpipe) + if (rpipe == wpipe) + pipe_remove_select_cond (rpipe, &pending_select); + else { + pipe_remove_select_cond (rpipe, &pending_select); + pipe_remove_select_cond (wpipe, &pending_select); mutex_unlock (&rpipe->lock); mutex_unlock (&wpipe->lock); } mutex_unlock (lock); - - condition_unimplies (&rpipe->pending_read_selects, - &pending_read_write_select); - condition_unimplies (&wpipe->pending_write_selects, - &pending_read_write_select); } return err; @@ -306,6 +365,7 @@ pipe_send (struct pipe *pipe, int noblock, void *source, if (pipe_is_readable (pipe, 0)) { condition_broadcast (&pipe->pending_read_selects); + pipe_select_cond_broadcast (pipe); /* We leave PIPE locked here, assuming the caller will soon unlock it and allow others access. */ } @@ -407,6 +467,7 @@ pipe_recv (struct pipe *pipe, int noblock, unsigned *flags, void **source, if (pipe_readable (pipe, 1) < pipe->write_limit) { condition_broadcast (&pipe->pending_write_selects); + pipe_select_cond_broadcast (pipe); /* We leave PIPE locked here, assuming the caller will soon unlock it and allow others access. */ } diff --git a/libpipe/pipe.h b/libpipe/pipe.h index 96432990..a3590fc4 100644 --- a/libpipe/pipe.h +++ b/libpipe/pipe.h @@ -62,6 +62,13 @@ extern struct pipe_class *stream_pipe_class; extern struct pipe_class *dgram_pipe_class; extern struct pipe_class *seqpack_pipe_class; +struct pipe_select_cond +{ + struct pipe_select_cond *next; + struct pipe_select_cond *prev; + struct condition cond; +}; + /* A unidirectional data pipe; it transfers data from READER to WRITER. */ struct pipe { @@ -88,6 +95,8 @@ struct pipe struct condition pending_writes; struct condition pending_write_selects; + struct pipe_select_cond *pending_selects; + /* The maximum number of characters that this pipe will hold without further writes blocking. */ size_t write_limit; |