summaryrefslogtreecommitdiff
path: root/libpipe
diff options
context:
space:
mode:
Diffstat (limited to 'libpipe')
-rw-r--r--libpipe/pipe.c89
-rw-r--r--libpipe/pipe.h9
2 files changed, 84 insertions, 14 deletions
diff --git a/libpipe/pipe.c b/libpipe/pipe.c
index 914816bc..85aac0e6 100644
--- a/libpipe/pipe.c
+++ b/libpipe/pipe.c
@@ -64,6 +64,7 @@ pipe_create (struct pipe_class *class, struct pipe **pipe)
condition_init (&new->pending_read_selects);
condition_init (&new->pending_writes);
condition_init (&new->pending_write_selects);
+ new->pending_selects = NULL;
mutex_init (&new->lock);
pq_create (&new->queue);
@@ -83,6 +84,63 @@ pipe_free (struct pipe *pipe)
free (pipe);
}
+static void
+pipe_add_select_cond (struct pipe *pipe, struct pipe_select_cond *cond)
+{
+ struct pipe_select_cond *first, *last;
+
+ first = pipe->pending_selects;
+
+ if (first == NULL)
+ {
+ cond->next = cond;
+ cond->prev = cond;
+ pipe->pending_selects = cond;
+ return;
+ }
+
+ last = first->prev;
+ cond->next = first;
+ cond->prev = last;
+ first->prev = cond;
+ last->next = cond;
+}
+
+static void
+pipe_remove_select_cond (struct pipe *pipe, struct pipe_select_cond *cond)
+{
+ cond->prev->next = cond->next;
+ cond->next->prev = cond->prev;
+
+ if (pipe->pending_selects == cond)
+ {
+ if (cond->next == cond)
+ pipe->pending_selects = NULL;
+ else
+ pipe->pending_selects = cond->next;
+ }
+}
+
+static void
+pipe_select_cond_broadcast (struct pipe *pipe)
+{
+ struct pipe_select_cond *cond, *last;
+
+ cond = pipe->pending_selects;
+
+ if (cond == NULL)
+ return;
+
+ last = cond->prev;
+
+ do
+ {
+ condition_broadcast (&cond->cond);
+ cond = cond->next;
+ }
+ while (cond != last);
+}
+
/* Take any actions necessary when PIPE acquires its first writer. */
void _pipe_first_writer (struct pipe *pipe)
{
@@ -113,6 +171,7 @@ void _pipe_no_readers (struct pipe *pipe)
{
condition_broadcast (&pipe->pending_writes);
condition_broadcast (&pipe->pending_write_selects);
+ pipe_select_cond_broadcast (pipe);
}
}
mutex_unlock (&pipe->lock);
@@ -135,6 +194,7 @@ void _pipe_no_writers (struct pipe *pipe)
{
condition_broadcast (&pipe->pending_reads);
condition_broadcast (&pipe->pending_read_selects);
+ pipe_select_cond_broadcast (pipe);
}
}
mutex_unlock (&pipe->lock);
@@ -171,22 +231,22 @@ pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe,
/* ugh */
{
int rpipe_blocked, wpipe_blocked;
- struct condition pending_read_write_select;
+ struct pipe_select_cond pending_select;
size_t wlimit = wpipe->write_limit;
struct mutex *lock =
(wpipe == rpipe ? &rpipe->lock : &pipe_multiple_lock);
- condition_init (&pending_read_write_select);
- condition_implies (&rpipe->pending_read_selects,
- &pending_read_write_select);
- condition_implies (&wpipe->pending_write_selects,
- &pending_read_write_select);
+ condition_init (&pending_select.cond);
mutex_lock (lock);
- if (rpipe != wpipe)
+ if (rpipe == wpipe)
+ pipe_add_select_cond (rpipe, &pending_select);
+ else
{
mutex_lock (&rpipe->lock);
mutex_lock (&wpipe->lock);
+ pipe_add_select_cond (rpipe, &pending_select);
+ pipe_add_select_cond (wpipe, &pending_select);
}
rpipe_blocked =
@@ -200,7 +260,7 @@ pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe,
mutex_unlock (&rpipe->lock);
mutex_unlock (&wpipe->lock);
}
- if (hurd_condition_wait (&pending_read_write_select, lock))
+ if (hurd_condition_wait (&pending_select.cond, lock))
err = EINTR;
if (rpipe != wpipe)
{
@@ -223,17 +283,16 @@ pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe,
*select_type &= ~SELECT_WRITE;
}
- if (rpipe != wpipe)
+ if (rpipe == wpipe)
+ pipe_remove_select_cond (rpipe, &pending_select);
+ else
{
+ pipe_remove_select_cond (rpipe, &pending_select);
+ pipe_remove_select_cond (wpipe, &pending_select);
mutex_unlock (&rpipe->lock);
mutex_unlock (&wpipe->lock);
}
mutex_unlock (lock);
-
- condition_unimplies (&rpipe->pending_read_selects,
- &pending_read_write_select);
- condition_unimplies (&wpipe->pending_write_selects,
- &pending_read_write_select);
}
return err;
@@ -306,6 +365,7 @@ pipe_send (struct pipe *pipe, int noblock, void *source,
if (pipe_is_readable (pipe, 0))
{
condition_broadcast (&pipe->pending_read_selects);
+ pipe_select_cond_broadcast (pipe);
/* We leave PIPE locked here, assuming the caller will soon unlock
it and allow others access. */
}
@@ -407,6 +467,7 @@ pipe_recv (struct pipe *pipe, int noblock, unsigned *flags, void **source,
if (pipe_readable (pipe, 1) < pipe->write_limit)
{
condition_broadcast (&pipe->pending_write_selects);
+ pipe_select_cond_broadcast (pipe);
/* We leave PIPE locked here, assuming the caller will soon unlock
it and allow others access. */
}
diff --git a/libpipe/pipe.h b/libpipe/pipe.h
index 96432990..a3590fc4 100644
--- a/libpipe/pipe.h
+++ b/libpipe/pipe.h
@@ -62,6 +62,13 @@ extern struct pipe_class *stream_pipe_class;
extern struct pipe_class *dgram_pipe_class;
extern struct pipe_class *seqpack_pipe_class;
+struct pipe_select_cond
+{
+ struct pipe_select_cond *next;
+ struct pipe_select_cond *prev;
+ struct condition cond;
+};
+
/* A unidirectional data pipe; it transfers data from READER to WRITER. */
struct pipe
{
@@ -88,6 +95,8 @@ struct pipe
struct condition pending_writes;
struct condition pending_write_selects;
+ struct pipe_select_cond *pending_selects;
+
/* The maximum number of characters that this pipe will hold without
further writes blocking. */
size_t write_limit;