summaryrefslogtreecommitdiff
path: root/libpipe/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'libpipe/pipe.c')
-rw-r--r--libpipe/pipe.c190
1 files changed, 168 insertions, 22 deletions
diff --git a/libpipe/pipe.c b/libpipe/pipe.c
index 17f9cda7..358a27ee 100644
--- a/libpipe/pipe.c
+++ b/libpipe/pipe.c
@@ -24,6 +24,8 @@
#include <mach/time_value.h>
#include <mach/mach_host.h>
+#include <hurd/hurd_types.h>
+
#include "pipe.h"
static inline void
@@ -31,9 +33,14 @@ timestamp (time_value_t *stamp)
{
host_get_time (mach_host_self (), stamp);
}
+
+/* Hold this lock before attempting to lock multiple pipes. */
+struct mutex pipe_multiple_lock = MUTEX_INITIALIZER;
/* ---------------------------------------------------------------- */
+#define pipe_is_connless(p) ((p)->class->flags & PIPE_CLASS_CONNECTIONLESS)
+
/* Creates a new pipe of class CLASS and returns it in RESULT. */
error_t
pipe_create (struct pipe_class *class, struct pipe **pipe)
@@ -47,16 +54,23 @@ pipe_create (struct pipe_class *class, struct pipe **pipe)
new->writers = 0;
new->flags = 0;
new->class = class;
+ new->write_limit = 16*1024;
+ new->write_atomic = 16*1024;
bzero (&new->read_time, sizeof (new->read_time));
bzero (&new->write_time, sizeof (new->write_time));
condition_init (&new->pending_reads);
- condition_init (&new->pending_selects);
+ condition_init (&new->pending_read_selects);
+ condition_init (&new->pending_writes);
+ condition_init (&new->pending_write_selects);
mutex_init (&new->lock);
pq_create (&new->queue);
+ if (! pipe_is_connless (new))
+ new->flags |= PIPE_BROKEN;
+
*pipe = new;
return 0;
}
@@ -69,21 +83,17 @@ pipe_free (struct pipe *pipe)
free (pipe);
}
-/* Wake up all threads waiting on PIPE, which should be locked. */
-inline void
-pipe_kick (struct pipe *pipe)
+/* Take any actions necessary when PIPE acquires its first writer. */
+void _pipe_first_writer (struct pipe *pipe)
{
- /* Now wake them all up for the bad news... */
- condition_broadcast (&pipe->pending_reads);
- mutex_unlock (&pipe->lock);
- condition_broadcast (&pipe->pending_selects);
- mutex_lock (&pipe->lock); /* Get back the lock on PIPE. */
+ if (pipe->readers > 0)
+ pipe->flags &= ~PIPE_BROKEN;
}
-/* Take any actions necessary when PIPE acquires its first writer. */
-void _pipe_first_writer (struct pipe *pipe)
+/* Take any actions necessary when PIPE acquires its first reader. */
+void _pipe_first_reader (struct pipe *pipe)
{
- if (! (pipe->class->flags & PIPE_CLASS_CONNECTIONLESS))
+ if (pipe->writers > 0)
pipe->flags &= ~PIPE_BROKEN;
}
@@ -94,7 +104,16 @@ void _pipe_no_readers (struct pipe *pipe)
if (pipe->writers == 0)
pipe_free (pipe);
else
- mutex_unlock (&pipe->lock);
+ {
+ if (! pipe_is_connless (pipe))
+ {
+ pipe->flags |= PIPE_BROKEN;
+ if (pipe->readers)
+ /* Wake up writers for the bad news... */
+ condition_broadcast (&pipe->pending_writes);
+ }
+ mutex_unlock (&pipe->lock);
+ }
}
/* Take any actions necessary when PIPE's last writer has gone away. PIPE
@@ -105,23 +124,124 @@ void _pipe_no_writers (struct pipe *pipe)
pipe_free (pipe);
else
{
- if (! (pipe->class->flags & PIPE_CLASS_CONNECTIONLESS))
+ if (! pipe_is_connless (pipe))
{
pipe->flags |= PIPE_BROKEN;
if (pipe->readers)
- /* Wake up readers who might want to know about our new state. */
- pipe_kick (pipe);
+ /* Wake up readers for the bad news... */
+ {
+ condition_broadcast (&pipe->pending_reads);
+ condition_broadcast (&pipe->pending_read_selects);
+ }
}
mutex_unlock (&pipe->lock);
}
}
+/* Return when either RPIPE is available for reading (if SELECT_READ is set
+ in *SELECT_TYPE), or WPIPE is available for writing (if select_write is
+ set in *SELECT_TYPE). *SELECT_TYPE is modified to reflect which (or both)
+ is now available. DATA_ONLY should be true if only data packets should be
+ waited for on RPIPE. Neither RPIPE or WPIPE should be locked when calling
+ this function (unlike most pipe functions). */
+error_t
+pipe_pair_select_read_write (struct pipe *rpipe, struct pipe *wpipe,
+ int *select_type, int data_only)
+{
+ error_t err = 0;
+
+ *select_type &= SELECT_READ | SELECT_WRITE;
+
+ if (*select_type == SELECT_READ)
+ {
+ mutex_lock (&rpipe->lock);
+ err = pipe_select_read (rpipe, data_only);
+ mutex_unlock (&rpipe->lock);
+ }
+ else if (*select_type == SELECT_WRITE)
+ {
+ mutex_lock (&wpipe->lock);
+ err = pipe_select_write (wpipe);
+ mutex_unlock (&wpipe->lock);
+ }
+ else
+ /* ugh */
+ {
+ int rpipe_blocked, wpipe_blocked;
+ struct condition pending_read_write_select;
+ size_t wlimit = wpipe->write_limit;
+ struct mutex *lock =
+ (wpipe == rpipe ? &rpipe->lock : &pipe_multiple_lock);
+
+ condition_init (&pending_read_write_select);
+ condition_implies (&rpipe->pending_read_selects,
+ &pending_read_write_select);
+ condition_implies (&wpipe->pending_write_selects,
+ &pending_read_write_select);
+
+ mutex_lock (lock);
+ if (rpipe != wpipe)
+ {
+ mutex_lock (&rpipe->lock);
+ mutex_lock (&wpipe->lock);
+ }
+
+ rpipe_blocked =
+ ! ((rpipe->flags & PIPE_BROKEN) || pipe_is_readable (rpipe, data_only));
+ wpipe_blocked =
+ ! ((wpipe->flags & PIPE_BROKEN) || pipe_readable (wpipe, 1) < wlimit);
+ while (!err && rpipe_blocked && wpipe_blocked)
+ {
+ if (rpipe != wpipe)
+ {
+ mutex_unlock (&rpipe->lock);
+ mutex_unlock (&wpipe->lock);
+ }
+ if (hurd_condition_wait (&pending_read_write_select, lock))
+ err = EINTR;
+ if (rpipe != wpipe)
+ {
+ mutex_lock (&rpipe->lock);
+ mutex_lock (&wpipe->lock);
+ }
+ rpipe_blocked =
+ ! ((rpipe->flags & PIPE_BROKEN)
+ || pipe_is_readable (rpipe, data_only));
+ wpipe_blocked =
+ ! ((wpipe->flags & PIPE_BROKEN)
+ || pipe_readable (wpipe, 1) < wlimit);
+ }
+
+ if (!err)
+ {
+ if (rpipe_blocked)
+ *select_type &= ~SELECT_READ;
+ if (wpipe_blocked)
+ *select_type &= ~SELECT_WRITE;
+ }
+
+ if (rpipe != wpipe)
+ {
+ mutex_unlock (&rpipe->lock);
+ mutex_unlock (&wpipe->lock);
+ }
+ mutex_unlock (lock);
+
+ condition_unimplies (&rpipe->pending_read_selects,
+ &pending_read_write_select);
+ condition_unimplies (&wpipe->pending_write_selects,
+ &pending_read_write_select);
+ }
+
+ return err;
+}
+
/* Writes up to LEN bytes of DATA, to PIPE, which should be locked, and
returns the amount written in AMOUNT. If present, the information in
CONTROL & PORTS is written in a preceding control packet. If an error is
returned, nothing is done. */
error_t
-pipe_send (struct pipe *pipe, void *source,
+pipe_send (struct pipe *pipe, int noblock, void *source,
char *data, size_t data_len,
char *control, size_t control_len,
mach_port_t *ports, size_t num_ports,
@@ -129,8 +249,19 @@ pipe_send (struct pipe *pipe, void *source,
{
error_t err = 0;
- if (pipe->flags & PIPE_BROKEN)
- return EPIPE;
+ err = pipe_wait_writable (pipe, noblock);
+ if (err)
+ return err;
+
+ if (noblock)
+ {
+ size_t left = pipe->write_limit - pipe_readable (pipe, 1);
+ if (left > data_len)
+ if (data_len <= pipe->write_atomic)
+ return EWOULDBLOCK;
+ else
+ data_len = left;
+ }
if (control_len > 0 || num_ports > 0)
/* Write a control packet. */
@@ -169,7 +300,7 @@ pipe_send (struct pipe *pipe, void *source,
/* Only wakeup selects if there's still data available. */
if (pipe_is_readable (pipe, 0))
{
- condition_broadcast (&pipe->pending_selects);
+ condition_broadcast (&pipe->pending_read_selects);
/* We leave PIPE locked here, assuming the caller will soon unlock
it and allow others access. */
}
@@ -207,7 +338,7 @@ pipe_recv (struct pipe *pipe, int noblock, unsigned *flags, void **source,
/* True if the user isn't asking for any `control' data. */
int data_only = (control == NULL && ports == NULL);
- err = pipe_wait (pipe, noblock, data_only);
+ err = pipe_wait_readable (pipe, noblock, data_only);
if (err)
return err;
@@ -256,7 +387,22 @@ pipe_recv (struct pipe *pipe, int noblock, unsigned *flags, void **source,
*data_len = 0;
if (!err && packet)
- timestamp (&pipe->read_time);
+ {
+ timestamp (&pipe->read_time);
+
+ /* And wakeup anyone that might be interested in it. */
+ condition_broadcast (&pipe->pending_writes);
+ mutex_unlock (&pipe->lock);
+
+ mutex_lock (&pipe->lock); /* Get back the lock on PIPE. */
+ /* Only wakeup selects if there's still writing space available. */
+ if (pipe_readable (pipe, 1) < pipe->write_limit)
+ {
+ condition_broadcast (&pipe->pending_write_selects);
+ /* We leave PIPE locked here, assuming the caller will soon unlock
+ it and allow others access. */
+ }
+ }
return err;
}