summaryrefslogtreecommitdiff
path: root/debian/patches/libpager-fixthreads.patch
diff options
context:
space:
mode:
Diffstat (limited to 'debian/patches/libpager-fixthreads.patch')
-rw-r--r--debian/patches/libpager-fixthreads.patch1141
1 files changed, 1141 insertions, 0 deletions
diff --git a/debian/patches/libpager-fixthreads.patch b/debian/patches/libpager-fixthreads.patch
new file mode 100644
index 00000000..18ca3d2d
--- /dev/null
+++ b/debian/patches/libpager-fixthreads.patch
@@ -0,0 +1,1141 @@
+From 76bdbc27516e57f2071218361d9ae4940471a27f Mon Sep 17 00:00:00 2001
+From: Justus Winter <4winter@informatik.uni-hamburg.de>
+Date: Thu, 17 Apr 2014 15:44:12 +0200
+Subject: [PATCH hurd] libpager: use a fixed number of threads
+
+Previously, libpager used an unbounded number of threads to receive
+messages from the pager bucket. It used sequence barriers to execute
+the requests to order requests to each object.
+
+The sequence barriers are implemented in seqnos.c. A server function
+uses _pager_wait_for_seqno to wait for its sequence number and
+_pager_release_seqno to release it, or it uses _pager_update_seqno to
+do both operations in one step.
+
+These sequence barriers divide each server function in three parts: A,
+B, and C. A_i happens "before" the sequence barrier i, B_i happens
+"in order", C_i happens "after" the sequence barrier. This partial
+order < has the following properties:
+
+* This order is *per object*. Requests to different objects are not
+ ordered.
+
+* A_i < B_i, B_i < C_i (due to the structure of the code)
+
+* B_i < B_{i+1} (this is due to the sequence barriers)
+
+* Note that only the B parts are ordered by the sequence numbers, we
+ are free to execute C_i and C_{i+1} in any possible order. The same
+ argument applies to the A parts.
+
+The sequence barriers are implemented using a very simple ticket
+algorithm. Every request, even the invalid ones, is processed by a
+thread, and waits until the ticket count reaches its seqno, does some
+work in-order, then increments the ticket and awakes all threads that
+have piled up up to this moment. All of them except one will then
+discover that it's not their turn yet and go to sleep again.
+
+Creating one thread per request has proven to be problematic as
+memory_object requests often arrive in large batches.
+
+This patch does two things:
+
+* Use a single thread to receive messages from the port bucket. All
+ incoming request are put into a queue.
+
+* Use a fixed-number of threads (though even one is actually enough)
+ to execute the the server functions. If multiple threads are used,
+ a work-delegation mechanism ensures that the per object order < is
+ preserved.
+
+For reference, I used the following command to create workloads that
+highlight the problem this patch is addressing:
+
+% settrans t .../ext2fs --sync=30 /dev/sd2s1
+...
+% /usr/bin/time zsh -c 'for ((i=0; i < 3500; i++)); do
+ dd if=/dev/zero of=t/src/$i bs=4k count=290 2>/dev/null
+ echo -n .
+ if ((i % 100 == 0)) ; then echo -n $i; fi
+done'
+
+* libpager/queue.h: New file.
+* libpager/demuxer.c: Manage a queue of requests received from the
+port bucket.
+(pager_demuxer): Just decode the server function and enqueue the
+request.
+(worker_func): New function that consumes and executes the requests
+from the queue.
+(service_paging_requests): New function.
+(pager_start_workers): Likewise.
+* libpager/data-request.c: Remove the seqno barriers.
+* libpager/data-return.c: Likewise.
+* libpager/data-unlock.c: Likewise.
+* libpager/chg-compl.c: Likewise.
+* libpager/lock-completed.c: Likewise.
+* libpager/no-senders.c: Likewise.
+* libpager/notify-stubs.c: Likewise.
+* libpager/object-init.c: Likewise.
+* libpager/object-terminate.c: Likewise.
+* libpager/seqnos.c: Remove file.
+* libpager/stubs.c: Likewise.
+* libpager/pager.h (pager_demuxer): Drop declaration.
+(pager_start_workers): New declaration.
+* libpager/priv.h: Remove the _pager_seqno declarations.
+* libpager/Makefile (SRCS): Drop seqnos.c.
+* console/pager.c (user_pager_init): Call pager_start_workers.
+* libdiskfs/disk-pager.c: Likewise.
+* storeio/pager.c: Likewise.
+* ext2fs/pager.c (service_paging_requests): Remove function.
+(create_disk_pager): Start separate file pager using
+`pager_start_workers'.
+* fatfs/pager.c (service_paging_requests): Remove function.
+(create_fat_pager): Start separate file pager using
+`pager_start_workers'.
+---
+ console/pager.c | 29 +----
+ ext2fs/pager.c | 28 +----
+ fatfs/pager.c | 28 +----
+ libdiskfs/disk-pager.c | 28 +----
+ libpager/Makefile | 2 +-
+ libpager/chg-compl.c | 4 +-
+ libpager/data-request.c | 3 -
+ libpager/data-return.c | 4 -
+ libpager/data-unlock.c | 5 -
+ libpager/demuxer.c | 295 ++++++++++++++++++++++++++++++++++++++++++--
+ libpager/lock-completed.c | 2 -
+ libpager/no-senders.c | 1 -
+ libpager/notify-stubs.c | 10 --
+ libpager/object-init.c | 2 -
+ libpager/object-terminate.c | 4 +-
+ libpager/pager.h | 7 +-
+ libpager/priv.h | 4 -
+ libpager/queue.h | 61 +++++++++
+ libpager/seqnos.c | 79 ------------
+ libpager/stubs.c | 9 --
+ storeio/pager.c | 26 +---
+ 21 files changed, 369 insertions(+), 262 deletions(-)
+ create mode 100644 libpager/queue.h
+ delete mode 100644 libpager/seqnos.c
+
+diff --git a/console/pager.c b/console/pager.c
+index 87c36f0..3568211 100644
+--- a/console/pager.c
++++ b/console/pager.c
+@@ -119,22 +119,6 @@ void
+ pager_dropweak (struct user_pager_info *upi)
+ {
+ }
+-
+-
+-/* A top-level function for the paging thread that just services paging
+- requests. */
+-static void *
+-service_paging_requests (void *arg)
+-{
+- struct port_bucket *pager_bucket = arg;
+- for (;;)
+- ports_manage_port_operations_multithread (pager_bucket,
+- pager_demuxer,
+- 1000 * 60 * 2,
+- 1000 * 60 * 10, 0);
+- return NULL;
+-}
+-
+
+ /* Initialize the pager for the display component. */
+ void
+@@ -148,15 +132,10 @@ user_pager_init (void)
+ if (! pager_bucket)
+ error (5, errno, "Cannot create pager bucket");
+
+- /* Make a thread to service paging requests. */
+- err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket);
+- if (!err)
+- pthread_detach (thread);
+- else
+- {
+- errno = err;
+- perror ("pthread_create");
+- }
++ /* Start libpagers worker threads. */
++ err = pager_start_workers (pager_bucket);
++ if (err)
++ error (5, err, "Cannot start pager worker threads");
+ }
+
+
+diff --git a/ext2fs/pager.c b/ext2fs/pager.c
+index 39cf1c7..298dae7 100644
+--- a/ext2fs/pager.c
++++ b/ext2fs/pager.c
+@@ -1192,21 +1192,6 @@ disk_cache_block_is_ref (block_t block)
+ return ref;
+ }
+
+-/* A top-level function for the paging thread that just services paging
+- requests. */
+-static void *
+-service_paging_requests (void *arg)
+-{
+- struct port_bucket *pager_bucket = arg;
+- ports_manage_port_operations_multithread (pager_bucket,
+- pager_demuxer,
+- 1000,
+- 0,
+- NULL);
+- /* Not reached. */
+- return NULL;
+-}
+-
+ /* Create the disk pager, and the file pager. */
+ void
+ create_disk_pager (void)
+@@ -1231,17 +1216,10 @@ create_disk_pager (void)
+ /* The file pager. */
+ file_pager_bucket = ports_create_bucket ();
+
+-#define STACK_SIZE (64 * 1024)
+- pthread_attr_init (&attr);
+- pthread_attr_setstacksize (&attr, STACK_SIZE);
+-#undef STACK_SIZE
+-
+- /* Make a thread to service file paging requests. */
+- err = pthread_create (&thread, &attr,
+- service_paging_requests, file_pager_bucket);
++ /* Start libpagers worker threads. */
++ err = pager_start_workers (file_pager_bucket);
+ if (err)
+- error (2, err, "pthread_create");
+- pthread_detach (thread);
++ ext2_panic ("can't create libpager worker threads: %s", strerror (err));
+ }
+
+ /* Call this to create a FILE_DATA pager and return a send right.
+diff --git a/fatfs/pager.c b/fatfs/pager.c
+index f855ecf..0c59084 100644
+--- a/fatfs/pager.c
++++ b/fatfs/pager.c
+@@ -756,21 +756,6 @@ pager_dropweak (struct user_pager_info *p __attribute__ ((unused)))
+ {
+ }
+
+-/* A top-level function for the paging thread that just services paging
+- requests. */
+-static void *
+-service_paging_requests (void *arg)
+-{
+- struct port_bucket *pager_bucket = arg;
+- ports_manage_port_operations_multithread (pager_bucket,
+- pager_demuxer,
+- 1000,
+- 0,
+- NULL);
+- /* Not reached. */
+- return NULL;
+-}
+-
+ /* Create the disk pager. */
+ void
+ create_fat_pager (void)
+@@ -790,17 +775,10 @@ create_fat_pager (void)
+ /* The file pager. */
+ file_pager_bucket = ports_create_bucket ();
+
+-#define STACK_SIZE (64 * 1024)
+- pthread_attr_init (&attr);
+- pthread_attr_setstacksize (&attr, STACK_SIZE);
+-#undef STACK_SIZE
+-
+- /* Make a thread to service file paging requests. */
+- err = pthread_create (&thread, &attr,
+- service_paging_requests, file_pager_bucket);
++ /* Start libpagers worker threads. */
++ err = pager_start_workers (file_pager_bucket);
+ if (err)
+- error (2, err, "pthread_create");
+- pthread_detach (thread);
++ error (2, err, "can't create libpager worker threads");
+ }
+
+ /* Call this to create a FILE_DATA pager and return a send right.
+diff --git a/libdiskfs/disk-pager.c b/libdiskfs/disk-pager.c
+index 9a0d9d8..4083ef2 100644
+--- a/libdiskfs/disk-pager.c
++++ b/libdiskfs/disk-pager.c
+@@ -33,39 +33,19 @@ static struct hurd_signal_preemptor preemptor =
+ handler: (sighandler_t) &fault_handler,
+ };
+
+-/* A top-level function for the paging thread that just services paging
+- requests. */
+-static void *
+-service_paging_requests (void *arg)
+-{
+- struct port_bucket *pager_bucket = arg;
+- for (;;)
+- ports_manage_port_operations_multithread (pager_bucket,
+- pager_demuxer,
+- 1000 * 60 * 2,
+- 1000 * 60 * 10, 0);
+- return NULL;
+-}
+-
+ void
+ diskfs_start_disk_pager (struct user_pager_info *upi,
+ struct port_bucket *pager_bucket,
+ int may_cache, int notify_on_evict,
+ size_t size, void **image)
+ {
+- pthread_t thread;
+ error_t err;
+ mach_port_t disk_pager_port;
+
+- /* Make a thread to service paging requests. */
+- err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket);
+- if (!err)
+- pthread_detach (thread);
+- else
+- {
+- errno = err;
+- perror ("pthread_create");
+- }
++ /* Start libpagers worker threads. */
++ err = pager_start_workers (pager_bucket);
++ if (err)
++ error (2, err, "creating pager worker threads failed");
+
+ /* Create the pager. */
+ diskfs_disk_pager = pager_create (upi, pager_bucket,
+diff --git a/libpager/Makefile b/libpager/Makefile
+index b622295..a15a899 100644
+--- a/libpager/Makefile
++++ b/libpager/Makefile
+@@ -22,7 +22,7 @@ SRCS = data-request.c data-return.c data-unlock.c pager-port.c \
+ inhibit-term.c lock-completed.c lock-object.c mark-error.c \
+ no-senders.c object-init.c object-terminate.c pagemap.c \
+ pager-create.c pager-flush.c pager-shutdown.c pager-sync.c \
+- stubs.c seqnos.c demuxer.c chg-compl.c pager-attr.c clean.c \
++ stubs.c demuxer.c chg-compl.c pager-attr.c clean.c \
+ dropweak.c notify-stubs.c get-upi.c pager-memcpy.c pager-return.c \
+ offer-page.c
+ installhdrs = pager.h
+diff --git a/libpager/chg-compl.c b/libpager/chg-compl.c
+index d77c46c..89ccfc8 100644
+--- a/libpager/chg-compl.c
++++ b/libpager/chg-compl.c
+@@ -37,7 +37,6 @@ _pager_seqnos_memory_object_change_completed (struct pager *p,
+ }
+
+ pthread_mutex_lock (&p->interlock);
+- _pager_wait_for_seqno (p, seq);
+
+ for (ar = p->attribute_requests; ar; ar = ar->next)
+ if (ar->may_cache == maycache && ar->copy_strategy == strat)
+@@ -46,8 +45,7 @@ _pager_seqnos_memory_object_change_completed (struct pager *p,
+ pthread_cond_broadcast (&p->wakeup);
+ break;
+ }
+-
+- _pager_release_seqno (p, seq);
++
+ pthread_mutex_unlock (&p->interlock);
+ return 0;
+ }
+diff --git a/libpager/data-request.c b/libpager/data-request.c
+index 82ce904..18f3de6 100644
+--- a/libpager/data-request.c
++++ b/libpager/data-request.c
+@@ -41,7 +41,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p,
+
+ /* Acquire the right to meddle with the pagemap */
+ pthread_mutex_lock (&p->interlock);
+- _pager_wait_for_seqno (p, seqno);
+
+ /* sanity checks -- we don't do multi-page requests yet. */
+ if (control != p->memobjcntl)
+@@ -105,7 +104,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p,
+ }
+
+ /* Let someone else in. */
+- _pager_release_seqno (p, seqno);
+ pthread_mutex_unlock (&p->interlock);
+
+ if (!doread)
+@@ -139,7 +137,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p,
+ allow_release_out:
+ _pager_allow_termination (p);
+ release_out:
+- _pager_release_seqno (p, seqno);
+ pthread_mutex_unlock (&p->interlock);
+ return 0;
+ }
+diff --git a/libpager/data-return.c b/libpager/data-return.c
+index ee6c6e8..f16f323 100644
+--- a/libpager/data-return.c
++++ b/libpager/data-return.c
+@@ -52,7 +52,6 @@ _pager_do_write_request (struct pager *p,
+
+ /* Acquire the right to meddle with the pagemap */
+ pthread_mutex_lock (&p->interlock);
+- _pager_wait_for_seqno (p, seqno);
+
+ /* sanity checks -- we don't do multi-page requests yet. */
+ if (control != p->memobjcntl)
+@@ -101,7 +100,6 @@ _pager_do_write_request (struct pager *p,
+ notified[i] = (p->notify_on_evict
+ && ! (pm_entries[i] & PM_PAGEINWAIT));
+
+- _pager_release_seqno (p, seqno);
+ goto notify;
+ }
+ else {
+@@ -158,7 +156,6 @@ _pager_do_write_request (struct pager *p,
+ }
+
+ /* Let someone else in. */
+- _pager_release_seqno (p, seqno);
+ pthread_mutex_unlock (&p->interlock);
+
+ /* This is inefficient; we should send all the pages to the device at once
+@@ -251,7 +248,6 @@ _pager_do_write_request (struct pager *p,
+ return 0;
+
+ release_out:
+- _pager_release_seqno (p, seqno);
+ pthread_mutex_unlock (&p->interlock);
+ return 0;
+ }
+diff --git a/libpager/data-unlock.c b/libpager/data-unlock.c
+index 599237c..8c7c776 100644
+--- a/libpager/data-unlock.c
++++ b/libpager/data-unlock.c
+@@ -35,11 +35,6 @@ _pager_seqnos_memory_object_data_unlock (struct pager *p,
+ || p->port.class != _pager_class)
+ return EOPNOTSUPP;
+
+- pthread_mutex_lock (&p->interlock);
+- _pager_wait_for_seqno (p, seqno);
+- _pager_release_seqno (p, seqno);
+- pthread_mutex_unlock (&p->interlock);
+-
+ if (p->pager_state != NORMAL)
+ {
+ printf ("pager in wrong state for unlock\n");
+diff --git a/libpager/demuxer.c b/libpager/demuxer.c
+index b4d4054..ce71717 100644
+--- a/libpager/demuxer.c
++++ b/libpager/demuxer.c
+@@ -1,5 +1,5 @@
+ /* Demuxer for pager library
+- Copyright (C) 1994, 1995, 2002, 2011 Free Software Foundation
++ Copyright (C) 1994, 1995, 2002, 2011, 2014 Free Software Foundation
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+@@ -15,26 +15,299 @@
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+
++#include <error.h>
++#include <mach/mig_errors.h>
++#include <pthread.h>
++#include <string.h>
++
+ #include "priv.h"
+ #include "memory_object_S.h"
+ #include "notify_S.h"
++#include "queue.h"
++
++/*
++ Worker pool for the server functions.
++
++ A single thread receives messages from the port bucket and puts them
++ into a queue. A fixed number of consumers actually execute the
++ server functions and send the reply.
++
++ The requests to an object O have to be processed in the order they
++ were received. To this end, each worker has a local queue and a
++ tag. If a thread processes a request to O, it sets its tag to a
++ unique identifier representing O. If another thread now dequeues a
++ second request to O, it enqueues it to the first workers queue.
++
++ At least one worker thread is necessary.
++*/
++#define WORKER_COUNT 1
++
++/* An request contains the message received from the port set. */
++struct request
++{
++ struct item item;
++ mig_routine_t routine;
++ mach_msg_header_t *inp;
++ mach_msg_header_t *outp;
++};
++
++/* A worker. */
++struct worker
++{
++ struct requests *requests; /* our pagers request queue */
++ struct queue queue; /* other workers may delegate requests to us */
++ unsigned long tag; /* tag of the object we are working on */
++};
++
++/* This is the queue for incoming requests. A single thread receives
++ messages from the port set, looks the service routine up, and
++ enqueues the request here. */
++struct requests
++{
++ struct port_bucket *bucket;
++ struct queue queue;
++ int asleep;
++ pthread_cond_t wakeup;
++ pthread_mutex_t lock;
++ struct worker workers[WORKER_COUNT];
++};
+
+ /* Demultiplex a single message directed at a pager port; INP is the
+ message received; fill OUTP with the reply. */
+-int
+-pager_demuxer (mach_msg_header_t *inp,
++static int
++pager_demuxer (struct requests *requests,
++ mach_msg_header_t *inp,
+ mach_msg_header_t *outp)
+ {
++ error_t err = MIG_NO_REPLY;
++
++#ifdef MACH_RCV_LARGE
++ const mach_msg_size_t max_size = 2 * __vm_page_size; /* Generic. Good? XXX */
++#else
++ const mach_msg_size_t max_size = 4 * __vm_page_size; /* XXX */
++#endif
++
+ mig_routine_t routine;
+- if ((routine = _pager_seqnos_memory_object_server_routine (inp)) ||
+- (routine = _pager_seqnos_notify_server_routine (inp)))
++ if (! ((routine = _pager_seqnos_memory_object_server_routine (inp)) ||
++ (routine = _pager_seqnos_notify_server_routine (inp))))
++ return FALSE;
++
++#define MASK (8u - 1u)
++ mach_msg_size_t padded_size = (inp->msgh_size + MASK) & ~MASK;
++#undef MASK
++
++ struct request *r = malloc (sizeof *r + padded_size + max_size);
++ if (r == NULL)
+ {
+- (*routine) (inp, outp);
+- return TRUE;
++ err = ENOMEM;
++ goto out;
++ }
++
++ r->routine = routine;
++ r->inp = (mach_msg_header_t *) ((char *) r + sizeof *r);
++ memcpy (r->inp, inp, inp->msgh_size);
++
++ r->outp = (mach_msg_header_t *) ((char *) r + sizeof *r + padded_size);
++ memcpy (r->outp, outp, sizeof *outp);
++
++ pthread_mutex_lock (&requests->lock);
++
++ queue_enqueue (&requests->queue, &r->item);
++
++ /* Awake worker. */
++ if (requests->asleep > 0)
++ pthread_cond_signal (&requests->wakeup);
++
++ pthread_mutex_unlock (&requests->lock);
++
++ /* A worker thread will reply. */
++ err = MIG_NO_REPLY;
++
++ out:
++ ((mig_reply_header_t *) outp)->RetCode = err;
++ return TRUE;
++}
++
++/* Consumes requests from the queue. */
++static void *
++worker_func (void *arg)
++{
++ struct worker *self = (struct worker *) arg;
++ struct requests *requests = self->requests;
++ struct request *r = NULL;
++
++ while (1)
++ {
++ int i;
++ mach_msg_return_t mr;
++
++ /* Free previous message. */
++ free (r);
++
++ pthread_mutex_lock (&requests->lock);
++
++ /* First, look in our queue for more requests to the object we
++ have been working on lately. Some other thread might have
++ delegated them to us. */
++ r = queue_dequeue (&self->queue);
++ if (r != NULL)
++ goto got_one;
++
++ /* Nope. Clear our tag and... */
++ self->tag = 0;
++
++ get_request_locked:
++ /* ... get a request from the global queue instead. */
++ while ((r = queue_dequeue (&requests->queue)) == NULL)
++ {
++ requests->asleep += 1;
++ pthread_cond_wait (&requests->wakeup, &requests->lock);
++ requests->asleep -= 1;
++ }
++
++ for (i = 0; i < WORKER_COUNT; i++)
++ if (requests->workers[i].tag
++ == (unsigned long) r->inp->msgh_local_port)
++ {
++ /* Some other thread is working on that object. Delegate
++ the request to that worker. */
++ queue_enqueue (&requests->workers[i].queue, &r->item);
++ goto get_request_locked;
++ }
++
++ /* Claim responsibility for this object by setting our tag. */
++ self->tag = (unsigned long) r->inp->msgh_local_port;
++
++ got_one:
++ pthread_mutex_unlock (&requests->lock);
++
++ /* Call the server routine. */
++ (*r->routine) (r->inp, r->outp);
++
++ /* What follows is basically the second part of
++ mach_msg_server_timeout. */
++ mig_reply_header_t *request = (mig_reply_header_t *) r->inp;
++ mig_reply_header_t *reply = (mig_reply_header_t *) r->outp;
++
++ switch (reply->RetCode)
++ {
++ case KERN_SUCCESS:
++ /* Hunky dory. */
++ break;
++
++ case MIG_NO_REPLY:
++ /* The server function wanted no reply sent.
++ Loop for another request. */
++ continue;
++
++ default:
++ /* Some error; destroy the request message to release any
++ port rights or VM it holds. Don't destroy the reply port
++ right, so we can send an error message. */
++ request->Head.msgh_remote_port = MACH_PORT_NULL;
++ mach_msg_destroy (&request->Head);
++ break;
++ }
++
++ if (reply->Head.msgh_remote_port == MACH_PORT_NULL)
++ {
++ /* No reply port, so destroy the reply. */
++ if (reply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)
++ mach_msg_destroy (&reply->Head);
++ continue;
++ }
++
++ /* Send the reply. */
++ mr = mach_msg (&reply->Head,
++ MACH_SEND_MSG,
++ reply->Head.msgh_size,
++ 0,
++ MACH_PORT_NULL,
++ 0,
++ MACH_PORT_NULL);
++
++ switch (mr)
++ {
++ case MACH_SEND_INVALID_DEST:
++ /* The reply can't be delivered, so destroy it. This error
++ indicates only that the requester went away, so we
++ continue and get the next request. */
++ mach_msg_destroy (&reply->Head);
++ break;
++
++ default:
++ /* Some other form of lossage; there is not much we can
++ do here. */
++ error (0, mr, "mach_msg");
++ }
++ }
++
++ /* Not reached. */
++ return NULL;
++}
++
++/* A top-level function for the paging thread that just services paging
++ requests. */
++static void *
++service_paging_requests (void *arg)
++{
++ struct requests *requests = arg;
++
++ int demuxer (mach_msg_header_t *inp,
++ mach_msg_header_t *outp)
++ {
++ return pager_demuxer (requests, inp, outp);
++ }
++
++ ports_manage_port_operations_one_thread (requests->bucket,
++ demuxer,
++ 0);
++ /* Not reached. */
++ return NULL;
++}
++
++/* Start the worker threads libpager uses to service requests. */
++error_t
++pager_start_workers (struct port_bucket *pager_bucket)
++{
++ error_t err;
++ int i;
++ pthread_t t;
++ pthread_attr_t attr;
++ struct requests *requests;
++
++ requests = malloc (sizeof *requests);
++ if (requests == NULL)
++ return ENOMEM;
++
++ requests->bucket = pager_bucket;
++ requests->asleep = 0;
++ queue_init (&requests->queue);
++ pthread_cond_init (&requests->wakeup, NULL);
++ pthread_mutex_init (&requests->lock, NULL);
++
++ pthread_attr_init (&attr);
++#define STACK_SIZE (64 * 1024)
++ pthread_attr_setstacksize (&attr, STACK_SIZE);
++#undef STACK_SIZE
++
++ /* Make a thread to service paging requests. */
++ err = pthread_create (&t, &attr, service_paging_requests, requests);
++ if (err)
++ return err;
++ pthread_detach (t);
++
++ for (i = 0; i < WORKER_COUNT; i++)
++ {
++ requests->workers[i].requests = requests;
++ requests->workers[i].tag = 0;
++ queue_init (&requests->workers[i].queue);
++
++ err = pthread_create (&t, &attr, &worker_func, &requests->workers[i]);
++ if (err)
++ return err;
++ pthread_detach (t);
+ }
+
+- /* Synchronize our bookkeeping of the port's seqno with the one
+- consumed by this bogus message. */
+- _pager_update_seqno (inp->msgh_local_port, inp->msgh_seqno);
+- return FALSE;
++ return err;
+ }
+diff --git a/libpager/lock-completed.c b/libpager/lock-completed.c
+index a3f3f16..30b1dd3 100644
+--- a/libpager/lock-completed.c
++++ b/libpager/lock-completed.c
+@@ -37,7 +37,6 @@ _pager_seqnos_memory_object_lock_completed (struct pager *p,
+ return EOPNOTSUPP;
+
+ pthread_mutex_lock (&p->interlock);
+- _pager_wait_for_seqno (p, seqno);
+
+ if (control != p->memobjcntl)
+ {
+@@ -59,7 +58,6 @@ _pager_seqnos_memory_object_lock_completed (struct pager *p,
+ }
+
+ out:
+- _pager_release_seqno (p, seqno);
+ pthread_mutex_unlock (&p->interlock);
+
+ return err;
+diff --git a/libpager/no-senders.c b/libpager/no-senders.c
+index c21dfc2..d0bbe27 100644
+--- a/libpager/no-senders.c
++++ b/libpager/no-senders.c
+@@ -29,7 +29,6 @@ _pager_do_seqnos_mach_notify_no_senders (struct port_info *pi,
+ pi->class != _pager_class)
+ return EOPNOTSUPP;
+
+- _pager_update_seqno_p ((struct pager *) pi, seqno);
+ ports_no_senders (pi, mscount);
+
+ return 0;
+diff --git a/libpager/notify-stubs.c b/libpager/notify-stubs.c
+index ba13882..a826420 100644
+--- a/libpager/notify-stubs.c
++++ b/libpager/notify-stubs.c
+@@ -28,8 +28,6 @@ _pager_do_seqnos_mach_notify_port_deleted (struct port_info *pi,
+ mach_port_t name
+ __attribute__ ((unused)))
+ {
+- _pager_update_seqno_p ((struct pager *) pi, seqno);
+-
+ return 0;
+ }
+
+@@ -39,8 +37,6 @@ _pager_do_seqnos_mach_notify_msg_accepted (struct port_info *pi,
+ mach_port_t name
+ __attribute__ ((unused)))
+ {
+- _pager_update_seqno_p ((struct pager *) pi, seqno);
+-
+ return 0;
+ }
+
+@@ -50,8 +46,6 @@ _pager_do_seqnos_mach_notify_port_destroyed (struct port_info *pi,
+ mach_port_t name
+ __attribute__ ((unused)))
+ {
+- _pager_update_seqno_p ((struct pager *) pi, seqno);
+-
+ return 0;
+ }
+
+@@ -59,8 +53,6 @@ error_t
+ _pager_do_seqnos_mach_notify_send_once (struct port_info *pi,
+ mach_port_seqno_t seqno)
+ {
+- _pager_update_seqno_p ((struct pager *) pi, seqno);
+-
+ return 0;
+ }
+
+@@ -70,7 +62,5 @@ _pager_do_seqnos_mach_notify_dead_name (struct port_info *pi,
+ mach_port_t name
+ __attribute__ ((unused)))
+ {
+- _pager_update_seqno_p ((struct pager *) pi, seqno);
+-
+ return 0;
+ }
+diff --git a/libpager/object-init.c b/libpager/object-init.c
+index 6683e24..eb62c44 100644
+--- a/libpager/object-init.c
++++ b/libpager/object-init.c
+@@ -33,7 +33,6 @@ _pager_seqnos_memory_object_init (struct pager *p,
+ return EOPNOTSUPP;
+
+ pthread_mutex_lock (&p->interlock);
+- _pager_wait_for_seqno (p, seqno);
+
+ if (pagesize != __vm_page_size)
+ {
+@@ -69,7 +68,6 @@ _pager_seqnos_memory_object_init (struct pager *p,
+ p->pager_state = NORMAL;
+
+ out:
+- _pager_release_seqno (p, seqno);
+ pthread_mutex_unlock (&p->interlock);
+
+ return 0;
+diff --git a/libpager/object-terminate.c b/libpager/object-terminate.c
+index 332bcab..e8c6f38 100644
+--- a/libpager/object-terminate.c
++++ b/libpager/object-terminate.c
+@@ -32,8 +32,7 @@ _pager_seqnos_memory_object_terminate (struct pager *p,
+ return EOPNOTSUPP;
+
+ pthread_mutex_lock (&p->interlock);
+- _pager_wait_for_seqno (p, seqno);
+-
++
+ if (control != p->memobjcntl)
+ {
+ printf ("incg terminate: wrong control port\n");
+@@ -75,7 +74,6 @@ _pager_seqnos_memory_object_terminate (struct pager *p,
+ #endif
+
+ out:
+- _pager_release_seqno (p, seqno);
+ pthread_mutex_unlock (&p->interlock);
+
+ return 0;
+diff --git a/libpager/pager.h b/libpager/pager.h
+index d0572af..29ec833 100644
+--- a/libpager/pager.h
++++ b/libpager/pager.h
+@@ -25,11 +25,8 @@
+ scope. */
+ struct user_pager_info;
+
+-/* This de-muxer function is for use within libports_demuxer. */
+-/* INP is a message we've received; OUTP will be filled in with
+- a reply message. */
+-int pager_demuxer (mach_msg_header_t *inp,
+- mach_msg_header_t *outp);
++/* Start the worker threads libpager uses to service requests. */
++error_t pager_start_workers (struct port_bucket *pager_bucket);
+
+ /* Create a new pager. The pager will have a port created for it
+ (using libports, in BUCKET) and will be immediately ready
+diff --git a/libpager/priv.h b/libpager/priv.h
+index 1f8405a..4576e12 100644
+--- a/libpager/priv.h
++++ b/libpager/priv.h
+@@ -134,10 +134,6 @@ extern int _pager_page_errors[];
+ struct port_class *_pager_class;
+
+
+-void _pager_wait_for_seqno (struct pager *, mach_port_seqno_t);
+-void _pager_release_seqno (struct pager *, mach_port_seqno_t);
+-void _pager_update_seqno (mach_port_t, mach_port_seqno_t);
+-void _pager_update_seqno_p (struct pager *, mach_port_seqno_t);
+ void _pager_block_termination (struct pager *);
+ void _pager_allow_termination (struct pager *);
+ error_t _pager_pagemap_resize (struct pager *, vm_address_t);
+diff --git a/libpager/queue.h b/libpager/queue.h
+new file mode 100644
+index 0000000..d3cf738
+--- /dev/null
++++ b/libpager/queue.h
+@@ -0,0 +1,61 @@
++/* A FIFO queue with constant-time enqueue and dequeue operations.
++
++ Copyright (C) 2014 Free Software Foundation, Inc.
++
++ Written by Justus Winter <4winter@informatik.uni-hamburg.de>
++
++ This file is part of the GNU Hurd.
++
++ The GNU Hurd is free software; you can redistribute it and/or
++ modify it under the terms of the GNU General Public License as
++ published by the Free Software Foundation; either version 2, or (at
++ your option) any later version.
++
++ The GNU Hurd is distributed in the hope that it will be useful, but
++ WITHOUT ANY WARRANTY; without even the implied warranty of
++ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
++ General Public License for more details.
++
++ You should have received a copy of the GNU General Public License
++ along with the GNU Hurd. If not, see <http://www.gnu.org/licenses/>. */
++
++/* A FIFO queue with constant-time enqueue and dequeue operations. */
++struct item {
++ struct item *next;
++};
++
++struct queue {
++ struct item *head;
++ struct item **tail;
++};
++
++static inline void
++queue_init (struct queue *q)
++{
++ q->head = NULL;
++ q->tail = &q->head;
++}
++
++static inline void
++queue_enqueue (struct queue *q, struct item *r)
++{
++ *q->tail = r;
++ q->tail = &r->next;
++ r->next = NULL;
++}
++
++static inline void *
++queue_dequeue (struct queue *q)
++{
++ struct item *r = q->head;
++ if (r == NULL)
++ return NULL;
++
++ /* Pop the first item off. */
++ if ((q->head = q->head->next) == NULL)
++ /* The queue is empty, fix tail pointer. */
++ q->tail = &q->head;
++
++ r->next = NULL;
++ return r;
++}
+diff --git a/libpager/seqnos.c b/libpager/seqnos.c
+deleted file mode 100644
+index cab2f33..0000000
+--- a/libpager/seqnos.c
++++ /dev/null
+@@ -1,79 +0,0 @@
+-/* Sequence number synchronization routines for pager library
+- Copyright (C) 1994, 2011 Free Software Foundation
+-
+- This program is free software; you can redistribute it and/or
+- modify it under the terms of the GNU General Public License as
+- published by the Free Software Foundation; either version 2, or (at
+- your option) any later version.
+-
+- This program is distributed in the hope that it will be useful, but
+- WITHOUT ANY WARRANTY; without even the implied warranty of
+- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+- General Public License for more details.
+-
+- You should have received a copy of the GNU General Public License
+- along with this program; if not, write to the Free Software
+- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+-
+-#include "priv.h"
+-#include <assert.h>
+-
+-/* The message with seqno SEQNO has just been dequeued for pager P;
+- wait until all preceding messages have had a chance and then
+- return. */
+-void
+-_pager_wait_for_seqno (struct pager *p,
+- mach_port_seqno_t seqno)
+-{
+- while (seqno != p->seqno + 1)
+- {
+- p->waitingforseqno = 1;
+- pthread_cond_wait (&p->wakeup, &p->interlock);
+- }
+-}
+-
+-
+-/* Allow the next message for pager P (potentially blocked in
+- _pager_wait_for_seqno) to be handled. */
+-void
+-_pager_release_seqno (struct pager *p,
+- mach_port_seqno_t seqno)
+-{
+- assert (seqno == p->seqno + 1);
+- p->seqno = seqno;
+- if (p->waitingforseqno)
+- {
+- p->waitingforseqno = 0;
+- pthread_cond_broadcast (&p->wakeup);
+- }
+-}
+-
+-
+-/* Just update the seqno. */
+-void
+-_pager_update_seqno (mach_port_t object,
+- mach_port_seqno_t seqno)
+-{
+- struct pager *p;
+-
+- p = ports_lookup_port (0, object, _pager_class);
+- _pager_update_seqno_p (p, seqno);
+- if (p)
+- ports_port_deref (p);
+-}
+-
+-
+-/* Just update the seqno, pointer version. */
+-void
+-_pager_update_seqno_p (struct pager *p,
+- mach_port_seqno_t seqno)
+-{
+- if (p
+- && p->port.class == _pager_class)
+- {
+- pthread_mutex_lock (&p->interlock);
+- _pager_wait_for_seqno (p, seqno);
+- _pager_release_seqno (p, seqno);
+- pthread_mutex_unlock (&p->interlock);
+- }
+-}
+diff --git a/libpager/stubs.c b/libpager/stubs.c
+index 411f483..c7f1a5a 100644
+--- a/libpager/stubs.c
++++ b/libpager/stubs.c
+@@ -29,9 +29,6 @@ _pager_seqnos_memory_object_copy (struct pager *p,
+ mach_port_t new)
+ {
+ printf ("m_o_copy called\n");
+-
+- _pager_update_seqno_p (p, seq);
+-
+ return EOPNOTSUPP;
+ }
+
+@@ -44,9 +41,6 @@ _pager_seqnos_memory_object_data_write (struct pager *p,
+ vm_size_t data_cnt)
+ {
+ printf ("m_o_data_write called\n");
+-
+- _pager_update_seqno_p (p, seq);
+-
+ return EOPNOTSUPP;
+ }
+
+@@ -60,8 +54,5 @@ _pager_seqnos_memory_object_supply_completed (struct pager *p,
+ vm_offset_t err_off)
+ {
+ printf ("m_o_supply_completed called\n");
+-
+- _pager_update_seqno_p (p, seq);
+-
+ return EOPNOTSUPP;
+ }
+diff --git a/storeio/pager.c b/storeio/pager.c
+index 7d78711..c260d73 100644
+--- a/storeio/pager.c
++++ b/storeio/pager.c
+@@ -24,6 +24,7 @@
+ #include <strings.h>
+ #include <unistd.h>
+ #include <errno.h>
++#include <error.h>
+ #include <sys/mman.h>
+ #include <stdio.h>
+
+@@ -142,21 +143,6 @@ pager_clear_user_data (struct user_pager_info *upi)
+
+ static struct port_bucket *pager_port_bucket = 0;
+
+-/* A top-level function for the paging thread that just services paging
+- requests. */
+-static void *
+-service_paging_requests (void *arg)
+-{
+- (void) arg;
+-
+- for (;;)
+- ports_manage_port_operations_multithread (pager_port_bucket,
+- pager_demuxer,
+- 1000 * 30, 1000 * 60 * 5, 0);
+-
+- return NULL;
+-}
+-
+ /* Initialize paging for this device. */
+ static void
+ init_dev_paging ()
+@@ -173,14 +159,12 @@ init_dev_paging ()
+
+ pager_port_bucket = ports_create_bucket ();
+
+- /* Make a thread to service paging requests. */
+- err = pthread_create (&thread, NULL, service_paging_requests, NULL);
+- if (!err)
+- pthread_detach (thread);
+- else
++ /* Start libpagers worker threads. */
++ err = pager_start_workers (pager_port_bucket);
++ if (err)
+ {
+ errno = err;
+- perror ("pthread_create");
++ error (0, err, "pager_start_workers");
+ }
+ }
+ pthread_mutex_unlock (&pager_global_lock);
+--
+2.1.1
+