diff options
Diffstat (limited to 'debian/patches/libpager-fixthreads.patch')
-rw-r--r-- | debian/patches/libpager-fixthreads.patch | 1141 |
1 files changed, 1141 insertions, 0 deletions
diff --git a/debian/patches/libpager-fixthreads.patch b/debian/patches/libpager-fixthreads.patch new file mode 100644 index 00000000..18ca3d2d --- /dev/null +++ b/debian/patches/libpager-fixthreads.patch @@ -0,0 +1,1141 @@ +From 76bdbc27516e57f2071218361d9ae4940471a27f Mon Sep 17 00:00:00 2001 +From: Justus Winter <4winter@informatik.uni-hamburg.de> +Date: Thu, 17 Apr 2014 15:44:12 +0200 +Subject: [PATCH hurd] libpager: use a fixed number of threads + +Previously, libpager used an unbounded number of threads to receive +messages from the pager bucket. It used sequence barriers to execute +the requests to order requests to each object. + +The sequence barriers are implemented in seqnos.c. A server function +uses _pager_wait_for_seqno to wait for its sequence number and +_pager_release_seqno to release it, or it uses _pager_update_seqno to +do both operations in one step. + +These sequence barriers divide each server function in three parts: A, +B, and C. A_i happens "before" the sequence barrier i, B_i happens +"in order", C_i happens "after" the sequence barrier. This partial +order < has the following properties: + +* This order is *per object*. Requests to different objects are not + ordered. + +* A_i < B_i, B_i < C_i (due to the structure of the code) + +* B_i < B_{i+1} (this is due to the sequence barriers) + +* Note that only the B parts are ordered by the sequence numbers, we + are free to execute C_i and C_{i+1} in any possible order. The same + argument applies to the A parts. + +The sequence barriers are implemented using a very simple ticket +algorithm. Every request, even the invalid ones, is processed by a +thread, and waits until the ticket count reaches its seqno, does some +work in-order, then increments the ticket and awakes all threads that +have piled up up to this moment. All of them except one will then +discover that it's not their turn yet and go to sleep again. + +Creating one thread per request has proven to be problematic as +memory_object requests often arrive in large batches. + +This patch does two things: + +* Use a single thread to receive messages from the port bucket. All + incoming request are put into a queue. + +* Use a fixed-number of threads (though even one is actually enough) + to execute the the server functions. If multiple threads are used, + a work-delegation mechanism ensures that the per object order < is + preserved. + +For reference, I used the following command to create workloads that +highlight the problem this patch is addressing: + +% settrans t .../ext2fs --sync=30 /dev/sd2s1 +... +% /usr/bin/time zsh -c 'for ((i=0; i < 3500; i++)); do + dd if=/dev/zero of=t/src/$i bs=4k count=290 2>/dev/null + echo -n . + if ((i % 100 == 0)) ; then echo -n $i; fi +done' + +* libpager/queue.h: New file. +* libpager/demuxer.c: Manage a queue of requests received from the +port bucket. +(pager_demuxer): Just decode the server function and enqueue the +request. +(worker_func): New function that consumes and executes the requests +from the queue. +(service_paging_requests): New function. +(pager_start_workers): Likewise. +* libpager/data-request.c: Remove the seqno barriers. +* libpager/data-return.c: Likewise. +* libpager/data-unlock.c: Likewise. +* libpager/chg-compl.c: Likewise. +* libpager/lock-completed.c: Likewise. +* libpager/no-senders.c: Likewise. +* libpager/notify-stubs.c: Likewise. +* libpager/object-init.c: Likewise. +* libpager/object-terminate.c: Likewise. +* libpager/seqnos.c: Remove file. +* libpager/stubs.c: Likewise. +* libpager/pager.h (pager_demuxer): Drop declaration. +(pager_start_workers): New declaration. +* libpager/priv.h: Remove the _pager_seqno declarations. +* libpager/Makefile (SRCS): Drop seqnos.c. +* console/pager.c (user_pager_init): Call pager_start_workers. +* libdiskfs/disk-pager.c: Likewise. +* storeio/pager.c: Likewise. +* ext2fs/pager.c (service_paging_requests): Remove function. +(create_disk_pager): Start separate file pager using +`pager_start_workers'. +* fatfs/pager.c (service_paging_requests): Remove function. +(create_fat_pager): Start separate file pager using +`pager_start_workers'. +--- + console/pager.c | 29 +---- + ext2fs/pager.c | 28 +---- + fatfs/pager.c | 28 +---- + libdiskfs/disk-pager.c | 28 +---- + libpager/Makefile | 2 +- + libpager/chg-compl.c | 4 +- + libpager/data-request.c | 3 - + libpager/data-return.c | 4 - + libpager/data-unlock.c | 5 - + libpager/demuxer.c | 295 ++++++++++++++++++++++++++++++++++++++++++-- + libpager/lock-completed.c | 2 - + libpager/no-senders.c | 1 - + libpager/notify-stubs.c | 10 -- + libpager/object-init.c | 2 - + libpager/object-terminate.c | 4 +- + libpager/pager.h | 7 +- + libpager/priv.h | 4 - + libpager/queue.h | 61 +++++++++ + libpager/seqnos.c | 79 ------------ + libpager/stubs.c | 9 -- + storeio/pager.c | 26 +--- + 21 files changed, 369 insertions(+), 262 deletions(-) + create mode 100644 libpager/queue.h + delete mode 100644 libpager/seqnos.c + +diff --git a/console/pager.c b/console/pager.c +index 87c36f0..3568211 100644 +--- a/console/pager.c ++++ b/console/pager.c +@@ -119,22 +119,6 @@ void + pager_dropweak (struct user_pager_info *upi) + { + } +- +- +-/* A top-level function for the paging thread that just services paging +- requests. */ +-static void * +-service_paging_requests (void *arg) +-{ +- struct port_bucket *pager_bucket = arg; +- for (;;) +- ports_manage_port_operations_multithread (pager_bucket, +- pager_demuxer, +- 1000 * 60 * 2, +- 1000 * 60 * 10, 0); +- return NULL; +-} +- + + /* Initialize the pager for the display component. */ + void +@@ -148,15 +132,10 @@ user_pager_init (void) + if (! pager_bucket) + error (5, errno, "Cannot create pager bucket"); + +- /* Make a thread to service paging requests. */ +- err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket); +- if (!err) +- pthread_detach (thread); +- else +- { +- errno = err; +- perror ("pthread_create"); +- } ++ /* Start libpagers worker threads. */ ++ err = pager_start_workers (pager_bucket); ++ if (err) ++ error (5, err, "Cannot start pager worker threads"); + } + + +diff --git a/ext2fs/pager.c b/ext2fs/pager.c +index 39cf1c7..298dae7 100644 +--- a/ext2fs/pager.c ++++ b/ext2fs/pager.c +@@ -1192,21 +1192,6 @@ disk_cache_block_is_ref (block_t block) + return ref; + } + +-/* A top-level function for the paging thread that just services paging +- requests. */ +-static void * +-service_paging_requests (void *arg) +-{ +- struct port_bucket *pager_bucket = arg; +- ports_manage_port_operations_multithread (pager_bucket, +- pager_demuxer, +- 1000, +- 0, +- NULL); +- /* Not reached. */ +- return NULL; +-} +- + /* Create the disk pager, and the file pager. */ + void + create_disk_pager (void) +@@ -1231,17 +1216,10 @@ create_disk_pager (void) + /* The file pager. */ + file_pager_bucket = ports_create_bucket (); + +-#define STACK_SIZE (64 * 1024) +- pthread_attr_init (&attr); +- pthread_attr_setstacksize (&attr, STACK_SIZE); +-#undef STACK_SIZE +- +- /* Make a thread to service file paging requests. */ +- err = pthread_create (&thread, &attr, +- service_paging_requests, file_pager_bucket); ++ /* Start libpagers worker threads. */ ++ err = pager_start_workers (file_pager_bucket); + if (err) +- error (2, err, "pthread_create"); +- pthread_detach (thread); ++ ext2_panic ("can't create libpager worker threads: %s", strerror (err)); + } + + /* Call this to create a FILE_DATA pager and return a send right. +diff --git a/fatfs/pager.c b/fatfs/pager.c +index f855ecf..0c59084 100644 +--- a/fatfs/pager.c ++++ b/fatfs/pager.c +@@ -756,21 +756,6 @@ pager_dropweak (struct user_pager_info *p __attribute__ ((unused))) + { + } + +-/* A top-level function for the paging thread that just services paging +- requests. */ +-static void * +-service_paging_requests (void *arg) +-{ +- struct port_bucket *pager_bucket = arg; +- ports_manage_port_operations_multithread (pager_bucket, +- pager_demuxer, +- 1000, +- 0, +- NULL); +- /* Not reached. */ +- return NULL; +-} +- + /* Create the disk pager. */ + void + create_fat_pager (void) +@@ -790,17 +775,10 @@ create_fat_pager (void) + /* The file pager. */ + file_pager_bucket = ports_create_bucket (); + +-#define STACK_SIZE (64 * 1024) +- pthread_attr_init (&attr); +- pthread_attr_setstacksize (&attr, STACK_SIZE); +-#undef STACK_SIZE +- +- /* Make a thread to service file paging requests. */ +- err = pthread_create (&thread, &attr, +- service_paging_requests, file_pager_bucket); ++ /* Start libpagers worker threads. */ ++ err = pager_start_workers (file_pager_bucket); + if (err) +- error (2, err, "pthread_create"); +- pthread_detach (thread); ++ error (2, err, "can't create libpager worker threads"); + } + + /* Call this to create a FILE_DATA pager and return a send right. +diff --git a/libdiskfs/disk-pager.c b/libdiskfs/disk-pager.c +index 9a0d9d8..4083ef2 100644 +--- a/libdiskfs/disk-pager.c ++++ b/libdiskfs/disk-pager.c +@@ -33,39 +33,19 @@ static struct hurd_signal_preemptor preemptor = + handler: (sighandler_t) &fault_handler, + }; + +-/* A top-level function for the paging thread that just services paging +- requests. */ +-static void * +-service_paging_requests (void *arg) +-{ +- struct port_bucket *pager_bucket = arg; +- for (;;) +- ports_manage_port_operations_multithread (pager_bucket, +- pager_demuxer, +- 1000 * 60 * 2, +- 1000 * 60 * 10, 0); +- return NULL; +-} +- + void + diskfs_start_disk_pager (struct user_pager_info *upi, + struct port_bucket *pager_bucket, + int may_cache, int notify_on_evict, + size_t size, void **image) + { +- pthread_t thread; + error_t err; + mach_port_t disk_pager_port; + +- /* Make a thread to service paging requests. */ +- err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket); +- if (!err) +- pthread_detach (thread); +- else +- { +- errno = err; +- perror ("pthread_create"); +- } ++ /* Start libpagers worker threads. */ ++ err = pager_start_workers (pager_bucket); ++ if (err) ++ error (2, err, "creating pager worker threads failed"); + + /* Create the pager. */ + diskfs_disk_pager = pager_create (upi, pager_bucket, +diff --git a/libpager/Makefile b/libpager/Makefile +index b622295..a15a899 100644 +--- a/libpager/Makefile ++++ b/libpager/Makefile +@@ -22,7 +22,7 @@ SRCS = data-request.c data-return.c data-unlock.c pager-port.c \ + inhibit-term.c lock-completed.c lock-object.c mark-error.c \ + no-senders.c object-init.c object-terminate.c pagemap.c \ + pager-create.c pager-flush.c pager-shutdown.c pager-sync.c \ +- stubs.c seqnos.c demuxer.c chg-compl.c pager-attr.c clean.c \ ++ stubs.c demuxer.c chg-compl.c pager-attr.c clean.c \ + dropweak.c notify-stubs.c get-upi.c pager-memcpy.c pager-return.c \ + offer-page.c + installhdrs = pager.h +diff --git a/libpager/chg-compl.c b/libpager/chg-compl.c +index d77c46c..89ccfc8 100644 +--- a/libpager/chg-compl.c ++++ b/libpager/chg-compl.c +@@ -37,7 +37,6 @@ _pager_seqnos_memory_object_change_completed (struct pager *p, + } + + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seq); + + for (ar = p->attribute_requests; ar; ar = ar->next) + if (ar->may_cache == maycache && ar->copy_strategy == strat) +@@ -46,8 +45,7 @@ _pager_seqnos_memory_object_change_completed (struct pager *p, + pthread_cond_broadcast (&p->wakeup); + break; + } +- +- _pager_release_seqno (p, seq); ++ + pthread_mutex_unlock (&p->interlock); + return 0; + } +diff --git a/libpager/data-request.c b/libpager/data-request.c +index 82ce904..18f3de6 100644 +--- a/libpager/data-request.c ++++ b/libpager/data-request.c +@@ -41,7 +41,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + + /* Acquire the right to meddle with the pagemap */ + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); + + /* sanity checks -- we don't do multi-page requests yet. */ + if (control != p->memobjcntl) +@@ -105,7 +104,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + } + + /* Let someone else in. */ +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + + if (!doread) +@@ -139,7 +137,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + allow_release_out: + _pager_allow_termination (p); + release_out: +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + return 0; + } +diff --git a/libpager/data-return.c b/libpager/data-return.c +index ee6c6e8..f16f323 100644 +--- a/libpager/data-return.c ++++ b/libpager/data-return.c +@@ -52,7 +52,6 @@ _pager_do_write_request (struct pager *p, + + /* Acquire the right to meddle with the pagemap */ + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); + + /* sanity checks -- we don't do multi-page requests yet. */ + if (control != p->memobjcntl) +@@ -101,7 +100,6 @@ _pager_do_write_request (struct pager *p, + notified[i] = (p->notify_on_evict + && ! (pm_entries[i] & PM_PAGEINWAIT)); + +- _pager_release_seqno (p, seqno); + goto notify; + } + else { +@@ -158,7 +156,6 @@ _pager_do_write_request (struct pager *p, + } + + /* Let someone else in. */ +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + + /* This is inefficient; we should send all the pages to the device at once +@@ -251,7 +248,6 @@ _pager_do_write_request (struct pager *p, + return 0; + + release_out: +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + return 0; + } +diff --git a/libpager/data-unlock.c b/libpager/data-unlock.c +index 599237c..8c7c776 100644 +--- a/libpager/data-unlock.c ++++ b/libpager/data-unlock.c +@@ -35,11 +35,6 @@ _pager_seqnos_memory_object_data_unlock (struct pager *p, + || p->port.class != _pager_class) + return EOPNOTSUPP; + +- pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); +- _pager_release_seqno (p, seqno); +- pthread_mutex_unlock (&p->interlock); +- + if (p->pager_state != NORMAL) + { + printf ("pager in wrong state for unlock\n"); +diff --git a/libpager/demuxer.c b/libpager/demuxer.c +index b4d4054..ce71717 100644 +--- a/libpager/demuxer.c ++++ b/libpager/demuxer.c +@@ -1,5 +1,5 @@ + /* Demuxer for pager library +- Copyright (C) 1994, 1995, 2002, 2011 Free Software Foundation ++ Copyright (C) 1994, 1995, 2002, 2011, 2014 Free Software Foundation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as +@@ -15,26 +15,299 @@ + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + ++#include <error.h> ++#include <mach/mig_errors.h> ++#include <pthread.h> ++#include <string.h> ++ + #include "priv.h" + #include "memory_object_S.h" + #include "notify_S.h" ++#include "queue.h" ++ ++/* ++ Worker pool for the server functions. ++ ++ A single thread receives messages from the port bucket and puts them ++ into a queue. A fixed number of consumers actually execute the ++ server functions and send the reply. ++ ++ The requests to an object O have to be processed in the order they ++ were received. To this end, each worker has a local queue and a ++ tag. If a thread processes a request to O, it sets its tag to a ++ unique identifier representing O. If another thread now dequeues a ++ second request to O, it enqueues it to the first workers queue. ++ ++ At least one worker thread is necessary. ++*/ ++#define WORKER_COUNT 1 ++ ++/* An request contains the message received from the port set. */ ++struct request ++{ ++ struct item item; ++ mig_routine_t routine; ++ mach_msg_header_t *inp; ++ mach_msg_header_t *outp; ++}; ++ ++/* A worker. */ ++struct worker ++{ ++ struct requests *requests; /* our pagers request queue */ ++ struct queue queue; /* other workers may delegate requests to us */ ++ unsigned long tag; /* tag of the object we are working on */ ++}; ++ ++/* This is the queue for incoming requests. A single thread receives ++ messages from the port set, looks the service routine up, and ++ enqueues the request here. */ ++struct requests ++{ ++ struct port_bucket *bucket; ++ struct queue queue; ++ int asleep; ++ pthread_cond_t wakeup; ++ pthread_mutex_t lock; ++ struct worker workers[WORKER_COUNT]; ++}; + + /* Demultiplex a single message directed at a pager port; INP is the + message received; fill OUTP with the reply. */ +-int +-pager_demuxer (mach_msg_header_t *inp, ++static int ++pager_demuxer (struct requests *requests, ++ mach_msg_header_t *inp, + mach_msg_header_t *outp) + { ++ error_t err = MIG_NO_REPLY; ++ ++#ifdef MACH_RCV_LARGE ++ const mach_msg_size_t max_size = 2 * __vm_page_size; /* Generic. Good? XXX */ ++#else ++ const mach_msg_size_t max_size = 4 * __vm_page_size; /* XXX */ ++#endif ++ + mig_routine_t routine; +- if ((routine = _pager_seqnos_memory_object_server_routine (inp)) || +- (routine = _pager_seqnos_notify_server_routine (inp))) ++ if (! ((routine = _pager_seqnos_memory_object_server_routine (inp)) || ++ (routine = _pager_seqnos_notify_server_routine (inp)))) ++ return FALSE; ++ ++#define MASK (8u - 1u) ++ mach_msg_size_t padded_size = (inp->msgh_size + MASK) & ~MASK; ++#undef MASK ++ ++ struct request *r = malloc (sizeof *r + padded_size + max_size); ++ if (r == NULL) + { +- (*routine) (inp, outp); +- return TRUE; ++ err = ENOMEM; ++ goto out; ++ } ++ ++ r->routine = routine; ++ r->inp = (mach_msg_header_t *) ((char *) r + sizeof *r); ++ memcpy (r->inp, inp, inp->msgh_size); ++ ++ r->outp = (mach_msg_header_t *) ((char *) r + sizeof *r + padded_size); ++ memcpy (r->outp, outp, sizeof *outp); ++ ++ pthread_mutex_lock (&requests->lock); ++ ++ queue_enqueue (&requests->queue, &r->item); ++ ++ /* Awake worker. */ ++ if (requests->asleep > 0) ++ pthread_cond_signal (&requests->wakeup); ++ ++ pthread_mutex_unlock (&requests->lock); ++ ++ /* A worker thread will reply. */ ++ err = MIG_NO_REPLY; ++ ++ out: ++ ((mig_reply_header_t *) outp)->RetCode = err; ++ return TRUE; ++} ++ ++/* Consumes requests from the queue. */ ++static void * ++worker_func (void *arg) ++{ ++ struct worker *self = (struct worker *) arg; ++ struct requests *requests = self->requests; ++ struct request *r = NULL; ++ ++ while (1) ++ { ++ int i; ++ mach_msg_return_t mr; ++ ++ /* Free previous message. */ ++ free (r); ++ ++ pthread_mutex_lock (&requests->lock); ++ ++ /* First, look in our queue for more requests to the object we ++ have been working on lately. Some other thread might have ++ delegated them to us. */ ++ r = queue_dequeue (&self->queue); ++ if (r != NULL) ++ goto got_one; ++ ++ /* Nope. Clear our tag and... */ ++ self->tag = 0; ++ ++ get_request_locked: ++ /* ... get a request from the global queue instead. */ ++ while ((r = queue_dequeue (&requests->queue)) == NULL) ++ { ++ requests->asleep += 1; ++ pthread_cond_wait (&requests->wakeup, &requests->lock); ++ requests->asleep -= 1; ++ } ++ ++ for (i = 0; i < WORKER_COUNT; i++) ++ if (requests->workers[i].tag ++ == (unsigned long) r->inp->msgh_local_port) ++ { ++ /* Some other thread is working on that object. Delegate ++ the request to that worker. */ ++ queue_enqueue (&requests->workers[i].queue, &r->item); ++ goto get_request_locked; ++ } ++ ++ /* Claim responsibility for this object by setting our tag. */ ++ self->tag = (unsigned long) r->inp->msgh_local_port; ++ ++ got_one: ++ pthread_mutex_unlock (&requests->lock); ++ ++ /* Call the server routine. */ ++ (*r->routine) (r->inp, r->outp); ++ ++ /* What follows is basically the second part of ++ mach_msg_server_timeout. */ ++ mig_reply_header_t *request = (mig_reply_header_t *) r->inp; ++ mig_reply_header_t *reply = (mig_reply_header_t *) r->outp; ++ ++ switch (reply->RetCode) ++ { ++ case KERN_SUCCESS: ++ /* Hunky dory. */ ++ break; ++ ++ case MIG_NO_REPLY: ++ /* The server function wanted no reply sent. ++ Loop for another request. */ ++ continue; ++ ++ default: ++ /* Some error; destroy the request message to release any ++ port rights or VM it holds. Don't destroy the reply port ++ right, so we can send an error message. */ ++ request->Head.msgh_remote_port = MACH_PORT_NULL; ++ mach_msg_destroy (&request->Head); ++ break; ++ } ++ ++ if (reply->Head.msgh_remote_port == MACH_PORT_NULL) ++ { ++ /* No reply port, so destroy the reply. */ ++ if (reply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) ++ mach_msg_destroy (&reply->Head); ++ continue; ++ } ++ ++ /* Send the reply. */ ++ mr = mach_msg (&reply->Head, ++ MACH_SEND_MSG, ++ reply->Head.msgh_size, ++ 0, ++ MACH_PORT_NULL, ++ 0, ++ MACH_PORT_NULL); ++ ++ switch (mr) ++ { ++ case MACH_SEND_INVALID_DEST: ++ /* The reply can't be delivered, so destroy it. This error ++ indicates only that the requester went away, so we ++ continue and get the next request. */ ++ mach_msg_destroy (&reply->Head); ++ break; ++ ++ default: ++ /* Some other form of lossage; there is not much we can ++ do here. */ ++ error (0, mr, "mach_msg"); ++ } ++ } ++ ++ /* Not reached. */ ++ return NULL; ++} ++ ++/* A top-level function for the paging thread that just services paging ++ requests. */ ++static void * ++service_paging_requests (void *arg) ++{ ++ struct requests *requests = arg; ++ ++ int demuxer (mach_msg_header_t *inp, ++ mach_msg_header_t *outp) ++ { ++ return pager_demuxer (requests, inp, outp); ++ } ++ ++ ports_manage_port_operations_one_thread (requests->bucket, ++ demuxer, ++ 0); ++ /* Not reached. */ ++ return NULL; ++} ++ ++/* Start the worker threads libpager uses to service requests. */ ++error_t ++pager_start_workers (struct port_bucket *pager_bucket) ++{ ++ error_t err; ++ int i; ++ pthread_t t; ++ pthread_attr_t attr; ++ struct requests *requests; ++ ++ requests = malloc (sizeof *requests); ++ if (requests == NULL) ++ return ENOMEM; ++ ++ requests->bucket = pager_bucket; ++ requests->asleep = 0; ++ queue_init (&requests->queue); ++ pthread_cond_init (&requests->wakeup, NULL); ++ pthread_mutex_init (&requests->lock, NULL); ++ ++ pthread_attr_init (&attr); ++#define STACK_SIZE (64 * 1024) ++ pthread_attr_setstacksize (&attr, STACK_SIZE); ++#undef STACK_SIZE ++ ++ /* Make a thread to service paging requests. */ ++ err = pthread_create (&t, &attr, service_paging_requests, requests); ++ if (err) ++ return err; ++ pthread_detach (t); ++ ++ for (i = 0; i < WORKER_COUNT; i++) ++ { ++ requests->workers[i].requests = requests; ++ requests->workers[i].tag = 0; ++ queue_init (&requests->workers[i].queue); ++ ++ err = pthread_create (&t, &attr, &worker_func, &requests->workers[i]); ++ if (err) ++ return err; ++ pthread_detach (t); + } + +- /* Synchronize our bookkeeping of the port's seqno with the one +- consumed by this bogus message. */ +- _pager_update_seqno (inp->msgh_local_port, inp->msgh_seqno); +- return FALSE; ++ return err; + } +diff --git a/libpager/lock-completed.c b/libpager/lock-completed.c +index a3f3f16..30b1dd3 100644 +--- a/libpager/lock-completed.c ++++ b/libpager/lock-completed.c +@@ -37,7 +37,6 @@ _pager_seqnos_memory_object_lock_completed (struct pager *p, + return EOPNOTSUPP; + + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); + + if (control != p->memobjcntl) + { +@@ -59,7 +58,6 @@ _pager_seqnos_memory_object_lock_completed (struct pager *p, + } + + out: +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + + return err; +diff --git a/libpager/no-senders.c b/libpager/no-senders.c +index c21dfc2..d0bbe27 100644 +--- a/libpager/no-senders.c ++++ b/libpager/no-senders.c +@@ -29,7 +29,6 @@ _pager_do_seqnos_mach_notify_no_senders (struct port_info *pi, + pi->class != _pager_class) + return EOPNOTSUPP; + +- _pager_update_seqno_p ((struct pager *) pi, seqno); + ports_no_senders (pi, mscount); + + return 0; +diff --git a/libpager/notify-stubs.c b/libpager/notify-stubs.c +index ba13882..a826420 100644 +--- a/libpager/notify-stubs.c ++++ b/libpager/notify-stubs.c +@@ -28,8 +28,6 @@ _pager_do_seqnos_mach_notify_port_deleted (struct port_info *pi, + mach_port_t name + __attribute__ ((unused))) + { +- _pager_update_seqno_p ((struct pager *) pi, seqno); +- + return 0; + } + +@@ -39,8 +37,6 @@ _pager_do_seqnos_mach_notify_msg_accepted (struct port_info *pi, + mach_port_t name + __attribute__ ((unused))) + { +- _pager_update_seqno_p ((struct pager *) pi, seqno); +- + return 0; + } + +@@ -50,8 +46,6 @@ _pager_do_seqnos_mach_notify_port_destroyed (struct port_info *pi, + mach_port_t name + __attribute__ ((unused))) + { +- _pager_update_seqno_p ((struct pager *) pi, seqno); +- + return 0; + } + +@@ -59,8 +53,6 @@ error_t + _pager_do_seqnos_mach_notify_send_once (struct port_info *pi, + mach_port_seqno_t seqno) + { +- _pager_update_seqno_p ((struct pager *) pi, seqno); +- + return 0; + } + +@@ -70,7 +62,5 @@ _pager_do_seqnos_mach_notify_dead_name (struct port_info *pi, + mach_port_t name + __attribute__ ((unused))) + { +- _pager_update_seqno_p ((struct pager *) pi, seqno); +- + return 0; + } +diff --git a/libpager/object-init.c b/libpager/object-init.c +index 6683e24..eb62c44 100644 +--- a/libpager/object-init.c ++++ b/libpager/object-init.c +@@ -33,7 +33,6 @@ _pager_seqnos_memory_object_init (struct pager *p, + return EOPNOTSUPP; + + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); + + if (pagesize != __vm_page_size) + { +@@ -69,7 +68,6 @@ _pager_seqnos_memory_object_init (struct pager *p, + p->pager_state = NORMAL; + + out: +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + + return 0; +diff --git a/libpager/object-terminate.c b/libpager/object-terminate.c +index 332bcab..e8c6f38 100644 +--- a/libpager/object-terminate.c ++++ b/libpager/object-terminate.c +@@ -32,8 +32,7 @@ _pager_seqnos_memory_object_terminate (struct pager *p, + return EOPNOTSUPP; + + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); +- ++ + if (control != p->memobjcntl) + { + printf ("incg terminate: wrong control port\n"); +@@ -75,7 +74,6 @@ _pager_seqnos_memory_object_terminate (struct pager *p, + #endif + + out: +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + + return 0; +diff --git a/libpager/pager.h b/libpager/pager.h +index d0572af..29ec833 100644 +--- a/libpager/pager.h ++++ b/libpager/pager.h +@@ -25,11 +25,8 @@ + scope. */ + struct user_pager_info; + +-/* This de-muxer function is for use within libports_demuxer. */ +-/* INP is a message we've received; OUTP will be filled in with +- a reply message. */ +-int pager_demuxer (mach_msg_header_t *inp, +- mach_msg_header_t *outp); ++/* Start the worker threads libpager uses to service requests. */ ++error_t pager_start_workers (struct port_bucket *pager_bucket); + + /* Create a new pager. The pager will have a port created for it + (using libports, in BUCKET) and will be immediately ready +diff --git a/libpager/priv.h b/libpager/priv.h +index 1f8405a..4576e12 100644 +--- a/libpager/priv.h ++++ b/libpager/priv.h +@@ -134,10 +134,6 @@ extern int _pager_page_errors[]; + struct port_class *_pager_class; + + +-void _pager_wait_for_seqno (struct pager *, mach_port_seqno_t); +-void _pager_release_seqno (struct pager *, mach_port_seqno_t); +-void _pager_update_seqno (mach_port_t, mach_port_seqno_t); +-void _pager_update_seqno_p (struct pager *, mach_port_seqno_t); + void _pager_block_termination (struct pager *); + void _pager_allow_termination (struct pager *); + error_t _pager_pagemap_resize (struct pager *, vm_address_t); +diff --git a/libpager/queue.h b/libpager/queue.h +new file mode 100644 +index 0000000..d3cf738 +--- /dev/null ++++ b/libpager/queue.h +@@ -0,0 +1,61 @@ ++/* A FIFO queue with constant-time enqueue and dequeue operations. ++ ++ Copyright (C) 2014 Free Software Foundation, Inc. ++ ++ Written by Justus Winter <4winter@informatik.uni-hamburg.de> ++ ++ This file is part of the GNU Hurd. ++ ++ The GNU Hurd is free software; you can redistribute it and/or ++ modify it under the terms of the GNU General Public License as ++ published by the Free Software Foundation; either version 2, or (at ++ your option) any later version. ++ ++ The GNU Hurd is distributed in the hope that it will be useful, but ++ WITHOUT ANY WARRANTY; without even the implied warranty of ++ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ++ General Public License for more details. ++ ++ You should have received a copy of the GNU General Public License ++ along with the GNU Hurd. If not, see <http://www.gnu.org/licenses/>. */ ++ ++/* A FIFO queue with constant-time enqueue and dequeue operations. */ ++struct item { ++ struct item *next; ++}; ++ ++struct queue { ++ struct item *head; ++ struct item **tail; ++}; ++ ++static inline void ++queue_init (struct queue *q) ++{ ++ q->head = NULL; ++ q->tail = &q->head; ++} ++ ++static inline void ++queue_enqueue (struct queue *q, struct item *r) ++{ ++ *q->tail = r; ++ q->tail = &r->next; ++ r->next = NULL; ++} ++ ++static inline void * ++queue_dequeue (struct queue *q) ++{ ++ struct item *r = q->head; ++ if (r == NULL) ++ return NULL; ++ ++ /* Pop the first item off. */ ++ if ((q->head = q->head->next) == NULL) ++ /* The queue is empty, fix tail pointer. */ ++ q->tail = &q->head; ++ ++ r->next = NULL; ++ return r; ++} +diff --git a/libpager/seqnos.c b/libpager/seqnos.c +deleted file mode 100644 +index cab2f33..0000000 +--- a/libpager/seqnos.c ++++ /dev/null +@@ -1,79 +0,0 @@ +-/* Sequence number synchronization routines for pager library +- Copyright (C) 1994, 2011 Free Software Foundation +- +- This program is free software; you can redistribute it and/or +- modify it under the terms of the GNU General Public License as +- published by the Free Software Foundation; either version 2, or (at +- your option) any later version. +- +- This program is distributed in the hope that it will be useful, but +- WITHOUT ANY WARRANTY; without even the implied warranty of +- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +- General Public License for more details. +- +- You should have received a copy of the GNU General Public License +- along with this program; if not, write to the Free Software +- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +- +-#include "priv.h" +-#include <assert.h> +- +-/* The message with seqno SEQNO has just been dequeued for pager P; +- wait until all preceding messages have had a chance and then +- return. */ +-void +-_pager_wait_for_seqno (struct pager *p, +- mach_port_seqno_t seqno) +-{ +- while (seqno != p->seqno + 1) +- { +- p->waitingforseqno = 1; +- pthread_cond_wait (&p->wakeup, &p->interlock); +- } +-} +- +- +-/* Allow the next message for pager P (potentially blocked in +- _pager_wait_for_seqno) to be handled. */ +-void +-_pager_release_seqno (struct pager *p, +- mach_port_seqno_t seqno) +-{ +- assert (seqno == p->seqno + 1); +- p->seqno = seqno; +- if (p->waitingforseqno) +- { +- p->waitingforseqno = 0; +- pthread_cond_broadcast (&p->wakeup); +- } +-} +- +- +-/* Just update the seqno. */ +-void +-_pager_update_seqno (mach_port_t object, +- mach_port_seqno_t seqno) +-{ +- struct pager *p; +- +- p = ports_lookup_port (0, object, _pager_class); +- _pager_update_seqno_p (p, seqno); +- if (p) +- ports_port_deref (p); +-} +- +- +-/* Just update the seqno, pointer version. */ +-void +-_pager_update_seqno_p (struct pager *p, +- mach_port_seqno_t seqno) +-{ +- if (p +- && p->port.class == _pager_class) +- { +- pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); +- _pager_release_seqno (p, seqno); +- pthread_mutex_unlock (&p->interlock); +- } +-} +diff --git a/libpager/stubs.c b/libpager/stubs.c +index 411f483..c7f1a5a 100644 +--- a/libpager/stubs.c ++++ b/libpager/stubs.c +@@ -29,9 +29,6 @@ _pager_seqnos_memory_object_copy (struct pager *p, + mach_port_t new) + { + printf ("m_o_copy called\n"); +- +- _pager_update_seqno_p (p, seq); +- + return EOPNOTSUPP; + } + +@@ -44,9 +41,6 @@ _pager_seqnos_memory_object_data_write (struct pager *p, + vm_size_t data_cnt) + { + printf ("m_o_data_write called\n"); +- +- _pager_update_seqno_p (p, seq); +- + return EOPNOTSUPP; + } + +@@ -60,8 +54,5 @@ _pager_seqnos_memory_object_supply_completed (struct pager *p, + vm_offset_t err_off) + { + printf ("m_o_supply_completed called\n"); +- +- _pager_update_seqno_p (p, seq); +- + return EOPNOTSUPP; + } +diff --git a/storeio/pager.c b/storeio/pager.c +index 7d78711..c260d73 100644 +--- a/storeio/pager.c ++++ b/storeio/pager.c +@@ -24,6 +24,7 @@ + #include <strings.h> + #include <unistd.h> + #include <errno.h> ++#include <error.h> + #include <sys/mman.h> + #include <stdio.h> + +@@ -142,21 +143,6 @@ pager_clear_user_data (struct user_pager_info *upi) + + static struct port_bucket *pager_port_bucket = 0; + +-/* A top-level function for the paging thread that just services paging +- requests. */ +-static void * +-service_paging_requests (void *arg) +-{ +- (void) arg; +- +- for (;;) +- ports_manage_port_operations_multithread (pager_port_bucket, +- pager_demuxer, +- 1000 * 30, 1000 * 60 * 5, 0); +- +- return NULL; +-} +- + /* Initialize paging for this device. */ + static void + init_dev_paging () +@@ -173,14 +159,12 @@ init_dev_paging () + + pager_port_bucket = ports_create_bucket (); + +- /* Make a thread to service paging requests. */ +- err = pthread_create (&thread, NULL, service_paging_requests, NULL); +- if (!err) +- pthread_detach (thread); +- else ++ /* Start libpagers worker threads. */ ++ err = pager_start_workers (pager_port_bucket); ++ if (err) + { + errno = err; +- perror ("pthread_create"); ++ error (0, err, "pager_start_workers"); + } + } + pthread_mutex_unlock (&pager_global_lock); +-- +2.1.1 + |