summaryrefslogtreecommitdiff
path: root/libpager
diff options
context:
space:
mode:
Diffstat (limited to 'libpager')
-rw-r--r--libpager/demuxer.c119
-rw-r--r--libpager/pager.h28
-rw-r--r--libpager/queue.h8
3 files changed, 137 insertions, 18 deletions
diff --git a/libpager/demuxer.c b/libpager/demuxer.c
index 4dd3cd87..59dd1c59 100644
--- a/libpager/demuxer.c
+++ b/libpager/demuxer.c
@@ -60,7 +60,7 @@ request_inp (const struct request *r)
/* A worker. */
struct worker
{
- struct requests *requests; /* our pagers request queue */
+ struct pager_requests *requests; /* our pagers request queue */
struct queue queue; /* other workers may delegate requests to us */
unsigned long tag; /* tag of the object we are working on */
};
@@ -68,12 +68,18 @@ struct worker
/* This is the queue for incoming requests. A single thread receives
messages from the port set, looks the service routine up, and
enqueues the request here. */
-struct requests
+struct pager_requests
{
struct port_bucket *bucket;
- struct queue queue;
+ /* Normally, both queues are the same. However, when the workers are
+ inhibited, a new queue_in is created, but queue_out is left as the
+ old value, so the workers drain queue_out but do not receive new
+ requests. */
+ struct queue *queue_in; /* the queue to add to */
+ struct queue *queue_out; /* the queue to take from */
int asleep;
pthread_cond_t wakeup;
+ pthread_cond_t inhibit_wakeup;
pthread_mutex_t lock;
struct worker workers[WORKER_COUNT];
};
@@ -81,7 +87,7 @@ struct requests
/* Demultiplex a single message directed at a pager port; INP is the
message received; fill OUTP with the reply. */
static int
-pager_demuxer (struct requests *requests,
+pager_demuxer (struct pager_requests *requests,
mach_msg_header_t *inp,
mach_msg_header_t *outp)
{
@@ -108,10 +114,10 @@ pager_demuxer (struct requests *requests,
pthread_mutex_lock (&requests->lock);
- queue_enqueue (&requests->queue, &r->item);
+ queue_enqueue (requests->queue_in, &r->item);
- /* Awake worker. */
- if (requests->asleep > 0)
+ /* Awake worker, but only if not inhibited. */
+ if (requests->asleep > 0 && requests->queue_in == requests->queue_out)
pthread_cond_signal (&requests->wakeup);
pthread_mutex_unlock (&requests->lock);
@@ -160,7 +166,7 @@ static void *
worker_func (void *arg)
{
struct worker *self = (struct worker *) arg;
- struct requests *requests = self->requests;
+ struct pager_requests *requests = self->requests;
struct request *r = NULL;
mig_reply_header_t reply_msg;
@@ -186,9 +192,11 @@ worker_func (void *arg)
get_request_locked:
/* ... get a request from the global queue instead. */
- while ((r = queue_dequeue (&requests->queue)) == NULL)
+ while ((r = queue_dequeue (requests->queue_out)) == NULL)
{
requests->asleep += 1;
+ if (requests->asleep == WORKER_COUNT)
+ pthread_cond_broadcast (&requests->inhibit_wakeup);
pthread_cond_wait (&requests->wakeup, &requests->lock);
requests->asleep -= 1;
}
@@ -281,7 +289,7 @@ worker_func (void *arg)
static void *
service_paging_requests (void *arg)
{
- struct requests *requests = arg;
+ struct pager_requests *requests = arg;
int demuxer (mach_msg_header_t *inp,
mach_msg_header_t *outp)
@@ -298,27 +306,44 @@ service_paging_requests (void *arg)
/* Start the worker threads libpager uses to service requests. */
error_t
-pager_start_workers (struct port_bucket *pager_bucket)
+pager_start_workers (struct port_bucket *pager_bucket,
+ struct pager_requests **out_requests)
{
error_t err;
int i;
pthread_t t;
- struct requests *requests;
+ struct pager_requests *requests;
+
+ assert (out_requests != NULL);
requests = malloc (sizeof *requests);
if (requests == NULL)
- return ENOMEM;
+ {
+ err = ENOMEM;
+ goto done;
+ }
requests->bucket = pager_bucket;
requests->asleep = 0;
- queue_init (&requests->queue);
+
+ requests->queue_in = malloc (sizeof *requests->queue_in);
+ if (requests->queue_in == NULL)
+ {
+ err = ENOMEM;
+ goto done;
+ }
+ queue_init (requests->queue_in);
+ /* Until the workers are inhibited, both queues are the same. */
+ requests->queue_out = requests->queue_in;
+
pthread_cond_init (&requests->wakeup, NULL);
+ pthread_cond_init (&requests->inhibit_wakeup, NULL);
pthread_mutex_init (&requests->lock, NULL);
/* Make a thread to service paging requests. */
err = pthread_create (&t, NULL, service_paging_requests, requests);
if (err)
- return err;
+ goto done;
pthread_detach (t);
for (i = 0; i < WORKER_COUNT; i++)
@@ -329,9 +354,71 @@ pager_start_workers (struct port_bucket *pager_bucket)
err = pthread_create (&t, NULL, &worker_func, &requests->workers[i]);
if (err)
- return err;
+ goto done;
pthread_detach (t);
}
+done:
+ if (err)
+ *out_requests = NULL;
+ else
+ *out_requests = requests;
+
return err;
}
+
+error_t
+pager_inhibit_workers (struct pager_requests *requests)
+{
+ error_t err = 0;
+
+ pthread_mutex_lock (&requests->lock);
+
+ /* Check the workers are not already inhibited. */
+ assert (requests->queue_out == requests->queue_in);
+
+ /* Any new paging requests will go into a new queue. */
+ struct queue *new_queue = malloc (sizeof *new_queue);
+ if (new_queue == NULL)
+ {
+ err = ENOMEM;
+ goto done_locked;
+ }
+ queue_init (new_queue);
+ requests->queue_in = new_queue;
+
+ /* Wait until all the workers are asleep and the queue has been
+ drained. All individual worker queues must have been drained, as
+ they are populated while the relevant worker is still running, and
+ it will always drain its personal queue before sleeping.
+ Check that the queue is empty, since it's possible that a request
+ came in, was queued and a worker was signalled but the lock was
+ acquired here before the worker woke up. */
+ while (requests->asleep < WORKER_COUNT || !queue_empty(requests->queue_out))
+ pthread_cond_wait (&requests->inhibit_wakeup, &requests->lock);
+
+done_locked:
+ pthread_mutex_unlock (&requests->lock);
+ return err;
+}
+
+void
+pager_resume_workers (struct pager_requests *requests)
+{
+ pthread_mutex_lock (&requests->lock);
+
+ /* Check the workers are inhibited. */
+ assert (requests->queue_out != requests->queue_in);
+ assert (requests->asleep == WORKER_COUNT);
+ assert (queue_empty(requests->queue_out));
+
+ /* The queue has been drained and will no longer be used. */
+ free (requests->queue_out);
+ requests->queue_out = requests->queue_in;
+
+ /* We need to wake up all workers, as there could be multiple requests
+ in the new queue. */
+ pthread_cond_broadcast (&requests->wakeup);
+
+ pthread_mutex_unlock (&requests->lock);
+}
diff --git a/libpager/pager.h b/libpager/pager.h
index fe342388..df4db686 100644
--- a/libpager/pager.h
+++ b/libpager/pager.h
@@ -25,8 +25,32 @@
scope. */
struct user_pager_info;
-/* Start the worker threads libpager uses to service requests. */
-error_t pager_start_workers (struct port_bucket *pager_bucket);
+struct pager_requests;
+
+/* Start the worker threads libpager uses to service requests. If no
+ error is returned, *requests will be a valid pointer, else it will be
+ set to NULL. */
+error_t
+pager_start_workers (struct port_bucket *pager_bucket,
+ struct pager_requests **requests);
+
+/* Inhibit the worker threads libpager uses to service requests,
+ blocking until all requests sent before this function is called have
+ finished.
+ Note that RPCs will not be inhibited, so new requests will
+ queue up, but will not be handled until the workers are resumed. If
+ RPCs should be inhibited as well, call ports_inhibit_bucket_rpcs with
+ the bucket used to create the workers before calling this. However,
+ inhibiting RPCs and not calling this is generally insufficient, as
+ libports is unaware of our internal worker pool, and will return once
+ all the RPCs have been queued, before they have been handled by a
+ worker thread. */
+error_t
+pager_inhibit_workers (struct pager_requests *requests);
+
+/* Resume the worker threads libpager uses to service requests. */
+void
+pager_resume_workers (struct pager_requests *requests);
/* Create a new pager. The pager will have a port created for it
(using libports, in BUCKET) and will be immediately ready
diff --git a/libpager/queue.h b/libpager/queue.h
index d3cf738e..abcd3b98 100644
--- a/libpager/queue.h
+++ b/libpager/queue.h
@@ -19,6 +19,8 @@
You should have received a copy of the GNU General Public License
along with the GNU Hurd. If not, see <http://www.gnu.org/licenses/>. */
+#include <stdbool.h>
+
/* A FIFO queue with constant-time enqueue and dequeue operations. */
struct item {
struct item *next;
@@ -59,3 +61,9 @@ queue_dequeue (struct queue *q)
r->next = NULL;
return r;
}
+
+static inline bool
+queue_empty (struct queue *q)
+{
+ return q->head == NULL;
+}