diff options
-rw-r--r-- | debian/patches/ext2fs-cache-superblock.patch | 73 | ||||
-rw-r--r-- | debian/patches/ext2fs-fix-inum-type.patch | 27 | ||||
-rw-r--r-- | debian/patches/ext2fs-skip-unallocated-blocks.patch | 17 | ||||
-rw-r--r-- | debian/patches/libpager-threading-rework.patch | 1983 | ||||
-rw-r--r-- | debian/patches/libpager-threadpool.patch | 1274 |
5 files changed, 2083 insertions, 1291 deletions
diff --git a/debian/patches/ext2fs-cache-superblock.patch b/debian/patches/ext2fs-cache-superblock.patch new file mode 100644 index 00000000..27bf494e --- /dev/null +++ b/debian/patches/ext2fs-cache-superblock.patch @@ -0,0 +1,73 @@ +commit a3ccff628fcd9d9866bac415758be8d55d6cf13f +Author: Justus Winter <4winter@informatik.uni-hamburg.de> +Date: Thu Apr 24 17:44:14 2014 +0200 + + ext2fs: cache the superblock + + Previously, the superblock was mmaped and a pointer stored in sblock + by map_hypermetadata. This memory is backed by our disk pager. + + This is rather unfortunate, as this means that whenever we read a + value from that location, we might generate a request our disk pager. + This amplifies the so-called thread-storm problem. + + Rather than relying on a mmaped region of memory, just use the data + loaded by get_hypermetadata. + + * ext2fs/hyper.c (get_hypermetadata): Do not free sblock. + (mapped_sblock): New variable. + (map_hypermetadata): Map the superblock to mapped_sblock instead. + (diskfs_set_hypermetadata): Copy superblock into mapped_superblock. + +diff --git a/ext2fs/hyper.c b/ext2fs/hyper.c +index 5bcc2ab..5f288bf 100644 +--- a/ext2fs/hyper.c ++++ b/ext2fs/hyper.c +@@ -61,7 +61,9 @@ get_hypermetadata (void) + error_t err; + size_t read = 0; + +- assert (! sblock); ++ if (sblock != NULL) ++ munmap (sblock, SBLOCK_SIZE); ++ + err = store_read (store, SBLOCK_OFFS >> store->log2_block_size, + SBLOCK_SIZE, (void **)&sblock, &read); + if (err || read != SBLOCK_SIZE) +@@ -161,19 +163,19 @@ get_hypermetadata (void) + zeroblock = (vm_address_t) mmap (0, block_size, PROT_READ, MAP_ANON, 0, 0); + assert (zeroblock != (vm_address_t) MAP_FAILED); + } +- +- munmap (sblock, SBLOCK_SIZE); +- sblock = NULL; + } + ++static struct ext2_super_block *mapped_sblock; ++ + void + map_hypermetadata (void) + { +- sblock = (struct ext2_super_block *) boffs_ptr (SBLOCK_OFFS); ++ mapped_sblock = (struct ext2_super_block *) boffs_ptr (SBLOCK_OFFS); + + /* Cache a convenient pointer to the block group descriptors for allocation. + These are stored in the filesystem blocks following the superblock. */ +- group_desc_image = (struct ext2_group_desc *) bptr (bptr_block (sblock) + 1); ++ group_desc_image = ++ (struct ext2_group_desc *) bptr (bptr_block (mapped_sblock) + 1); + } + + error_t +@@ -196,8 +198,9 @@ diskfs_set_hypermetadata (int wait, int clean) + if (sblock_dirty) + { + sblock_dirty = 0; +- disk_cache_block_ref_ptr (sblock); +- record_global_poke (sblock); ++ memcpy (mapped_sblock, sblock, SBLOCK_SIZE); ++ disk_cache_block_ref_ptr (mapped_sblock); ++ record_global_poke (mapped_sblock); + } + + sync_global (wait); diff --git a/debian/patches/ext2fs-fix-inum-type.patch b/debian/patches/ext2fs-fix-inum-type.patch new file mode 100644 index 00000000..76d5854c --- /dev/null +++ b/debian/patches/ext2fs-fix-inum-type.patch @@ -0,0 +1,27 @@ +commit 2e154e16471763936be42874f464e41cd56a2f0c +Author: Justus Winter <4winter@informatik.uni-hamburg.de> +Date: Thu Apr 24 17:32:04 2014 +0200 + + ext2fs: fix type of inum + + Previously, inum was of type int, whereas dino_ref expects ino_t. On + Hurd/x86 the former is 32 bit wide, the latter 64. If dino_ref is + inlined, this does not seem to pose a problem, but if ext2fs is + compiled with -O0, this most likely results in an invalid memory access. + + * ext2fs/ialloc.c (ext2_alloc_inode): Use type ino_t for inum. + +diff --git a/ext2fs/ialloc.c b/ext2fs/ialloc.c +index 2d8e51e..52212d5 100644 +--- a/ext2fs/ialloc.c ++++ b/ext2fs/ialloc.c +@@ -115,7 +115,8 @@ ino_t + ext2_alloc_inode (ino_t dir_inum, mode_t mode) + { + char *bh = NULL; +- int i, j, inum, avefreei; ++ int i, j, avefreei; ++ ino_t inum; + struct ext2_group_desc *gdp; + struct ext2_group_desc *tmp; + diff --git a/debian/patches/ext2fs-skip-unallocated-blocks.patch b/debian/patches/ext2fs-skip-unallocated-blocks.patch deleted file mode 100644 index b98c1dc6..00000000 --- a/debian/patches/ext2fs-skip-unallocated-blocks.patch +++ /dev/null @@ -1,17 +0,0 @@ -diff --git a/ext2fs/pager.c b/ext2fs/pager.c -index 6e99c83..4ea4b74 100644 ---- a/ext2fs/pager.c -+++ b/ext2fs/pager.c -@@ -398,8 +398,10 @@ file_pager_write_page (struct node *node, vm_offset_t offset, void *buf) - err = find_block (node, offset, &block, &lock); - if (err) - break; -- assert (block); -- pending_blocks_add (&pb, block); -+ if (block) -+ pending_blocks_add (&pb, block); -+ else -+ pending_blocks_skip (&pb); - offset += block_size; - left -= block_size; - } diff --git a/debian/patches/libpager-threading-rework.patch b/debian/patches/libpager-threading-rework.patch new file mode 100644 index 00000000..2b8c1ac6 --- /dev/null +++ b/debian/patches/libpager-threading-rework.patch @@ -0,0 +1,1983 @@ +commit be7319c2857598f36545cf31addfb8e35f56834d +Author: Justus Winter <4winter@informatik.uni-hamburg.de> +Date: Thu Apr 17 15:44:12 2014 +0200 + + libpager: rework the request handling to use less threads + + Previously, libpager used multiple threads to receive messages from + the pager bucket. It used sequence barriers to execute the requests + to order requests to each object. + + The sequence barriers are implemented in seqnos.c. A server function + uses _pager_wait_for_seqno to wait for its sequence number and + _pager_release_seqno to release it, or it uses _pager_update_seqno to + do both operations in one step. + + These sequence barriers divide each server function in three parts: A, + B, and C. A_i happens "before" the sequence barrier i, B_i happens + "in order", C_i happens "after" the sequence barrier. This partial + order < has the following properties: + + * This order is *per object*. Requests to different objects are not + ordered. + + * A_i < B_i, B_i < C_i (due to the structure of the code) + + * B_i < B_{i+1} (this is due to the sequence barriers) + + * Note that only the B parts are ordered by the sequence numbers, we + are free to execute C_i and C_{i+1} in any possible order. The same + argument applies to the A parts. + + If the server functions would never to block, a single threaded could + do the job. Unfortunately, we sometimes call user-supplied functions + like pager_write_pager, which may block for some time doing IO. + Worse, ext2fs used to trigger a page fault during pager_write_pager, + creating yet another request to the disk pager (and thus, libpager). + + Surveying the code revealed that most of the server functions only + validate arguments in their A phase, do some non-blocking work in + their B phase and then send the reply in their C phase. Three + functions, however, do some potentially blocking and otherwise bad + behaving stuff in their C phase. The three interesting functions are + _pager_seqnos_memory_object_data_request, + _pager_seqnos_memory_object_data_return, and quest. Most notably, the + first calls pager_read_page and the second pager_write_page. + + The sequence barriers are implemented using a very simple ticket + algorithm. Every request, even the invalid ones, is processed by a + thread, and waits until the ticket count reaches its seqno, does some + work in-order, then increments the ticket and awakes all threads that + have piled up up to this moment. All of them except one will then + discover that it's not their turn yet and go to sleep again. + + Creating one thread per request has proven to be problematic as + memory_object requests often arrive in large batches. + + This patch does four things: + + * Use a single thread to receive messages from the port bucket. All + incoming request are put into a queue. + + * Use a fixed-number of threads (though even one is actually enough) + to execute the A and B parts of the server functions. If multiple + threads are used, a work-delegation mechanism ensures that the per + object order < is observed. See the comments in demuxer.c for a + more detailed description and why having a small number of threads + (I propose 3) is advantageous. + + * The code from the C parts is moved out of the server function X into + X_deferred. An "interesting" function now puts a deferred function + calls into a queue instead of executing that part directly. + + * A variable number of threads (though this number is kept close to + one) is executing deferred functions from the queue of deferred + functions. As memory_object requests often arrive in large batches, + the key insight here is to rate-limit the creation of threads. See + the comments in deferreds.c for details on the implementation and + why a lower number of threads results in a better performance. + + For reference, I used the following command to create workloads that + highlight the problem this patch is addressing: + + % settrans t .../ext2fs --sync=30 /dev/sd2s1 + ... + % /usr/bin/time zsh -c 'for ((i=0; i < 3500; i++)); do + dd if=/dev/zero of=t/src/$i bs=4k count=290 2>/dev/null + echo -n . + if ((i % 100 == 0)) ; then echo -n $i; fi + done' + + With this patch, the number of libpager threads tops at 12, and it is + just 6 most of the time. + + * libpager/deferreds.c: New file. + * libpager/deferreds.h: Likewise. + * libpager/pool.c: Likewise. + * libpager/pool.h: Likewise. + * libpager/queue.h: Likewise. + * libpager/demuxer.c: Manage a queue of requests received from the + port bucket. + (pager_demuxer): Just decode the server function and enqueue the + request. + (worker_func): New function that consumes and executes the requests + from the queue. + (service_paging_requests): New function. + (pager_start_workers): Likewise. + * libpager/data-request.c: Remove the seqno barriers. Move code from + the 'C' phase into a deferred function. + * libpager/data-return.c: Likewise. + * libpager/data-unlock.c: Likewise. + * libpager/chg-compl.c: Remove the seqno barriers. + * libpager/lock-completed.c: Likewise. + * libpager/no-senders.c: Likewise. + * libpager/notify-stubs.c: Likewise. + * libpager/object-init.c: Likewise. + * libpager/object-terminate.c: Likewise. + * libpager/seqnos.c: Remove file. + * libpager/stubs.c: Likewise. + * libpager/pager.h (pager_demuxer): Drop declaration. + (pager_start_workers): New declaration. + * libpager/priv.h: Remove the _pager_seqno declarations. + * libpager/Makefile (SRCS): Drop seqnos.c, add deferreds.c, pool.c. + * console/pager.c (user_pager_init): Call pager_start_workers. + * libdiskfs/disk-pager.c: Likewise. + * storeio/pager.c: Likewise. + +diff --git a/console/pager.c b/console/pager.c +index 87c36f0..a08e46d 100644 +--- a/console/pager.c ++++ b/console/pager.c +@@ -119,22 +119,6 @@ void + pager_dropweak (struct user_pager_info *upi) + { + } +- +- +-/* A top-level function for the paging thread that just services paging +- requests. */ +-static void * +-service_paging_requests (void *arg) +-{ +- struct port_bucket *pager_bucket = arg; +- for (;;) +- ports_manage_port_operations_multithread (pager_bucket, +- pager_demuxer, +- 1000 * 60 * 2, +- 1000 * 60 * 10, 0); +- return NULL; +-} +- + + /* Initialize the pager for the display component. */ + void +@@ -148,15 +132,10 @@ user_pager_init (void) + if (! pager_bucket) + error (5, errno, "Cannot create pager bucket"); + +- /* Make a thread to service paging requests. */ +- err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket); +- if (!err) +- pthread_detach (thread); +- else +- { +- errno = err; +- perror ("pthread_create"); +- } ++ /* Start libpagers worker thread. */ ++ err = pager_start_workers (pager_bucket); ++ if (err) ++ error (5, err, "Cannot start pager worker threads"); + } + + +diff --git a/libdiskfs/disk-pager.c b/libdiskfs/disk-pager.c +index 9a0d9d8..eb309b1 100644 +--- a/libdiskfs/disk-pager.c ++++ b/libdiskfs/disk-pager.c +@@ -33,39 +33,19 @@ static struct hurd_signal_preemptor preemptor = + handler: (sighandler_t) &fault_handler, + }; + +-/* A top-level function for the paging thread that just services paging +- requests. */ +-static void * +-service_paging_requests (void *arg) +-{ +- struct port_bucket *pager_bucket = arg; +- for (;;) +- ports_manage_port_operations_multithread (pager_bucket, +- pager_demuxer, +- 1000 * 60 * 2, +- 1000 * 60 * 10, 0); +- return NULL; +-} +- + void + diskfs_start_disk_pager (struct user_pager_info *upi, + struct port_bucket *pager_bucket, + int may_cache, int notify_on_evict, + size_t size, void **image) + { +- pthread_t thread; + error_t err; + mach_port_t disk_pager_port; + +- /* Make a thread to service paging requests. */ +- err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket); +- if (!err) +- pthread_detach (thread); +- else +- { +- errno = err; +- perror ("pthread_create"); +- } ++ /* Start libpagers worker thread. */ ++ err = pager_start_workers (pager_bucket); ++ if (err) ++ error (2, err, "creating pager worker threads failed"); + + /* Create the pager. */ + diskfs_disk_pager = pager_create (upi, pager_bucket, +diff --git a/libpager/Makefile b/libpager/Makefile +index b622295..6e7f43f 100644 +--- a/libpager/Makefile ++++ b/libpager/Makefile +@@ -22,9 +22,9 @@ SRCS = data-request.c data-return.c data-unlock.c pager-port.c \ + inhibit-term.c lock-completed.c lock-object.c mark-error.c \ + no-senders.c object-init.c object-terminate.c pagemap.c \ + pager-create.c pager-flush.c pager-shutdown.c pager-sync.c \ +- stubs.c seqnos.c demuxer.c chg-compl.c pager-attr.c clean.c \ ++ stubs.c demuxer.c chg-compl.c pager-attr.c clean.c \ + dropweak.c notify-stubs.c get-upi.c pager-memcpy.c pager-return.c \ +- offer-page.c ++ offer-page.c deferreds.c pool.c + installhdrs = pager.h + + HURDLIBS= ports +diff --git a/libpager/chg-compl.c b/libpager/chg-compl.c +index d77c46c..89ccfc8 100644 +--- a/libpager/chg-compl.c ++++ b/libpager/chg-compl.c +@@ -37,7 +37,6 @@ _pager_seqnos_memory_object_change_completed (struct pager *p, + } + + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seq); + + for (ar = p->attribute_requests; ar; ar = ar->next) + if (ar->may_cache == maycache && ar->copy_strategy == strat) +@@ -46,8 +45,7 @@ _pager_seqnos_memory_object_change_completed (struct pager *p, + pthread_cond_broadcast (&p->wakeup); + break; + } +- +- _pager_release_seqno (p, seq); ++ + pthread_mutex_unlock (&p->interlock); + return 0; + } +diff --git a/libpager/data-request.c b/libpager/data-request.c +index 82ce904..0bceac4 100644 +--- a/libpager/data-request.c ++++ b/libpager/data-request.c +@@ -15,11 +15,21 @@ + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + ++#include "deferreds.h" + #include "priv.h" + #include "memory_object_S.h" + #include <stdio.h> + #include <string.h> + ++static void memory_object_data_request_deferred (void *arg); ++ ++struct deferred_args ++{ ++ struct pager *p; ++ vm_offset_t offset; ++ vm_size_t length; ++}; ++ + /* Implement pagein callback as described in <mach/memory_object.defs>. */ + kern_return_t + _pager_seqnos_memory_object_data_request (struct pager *p, +@@ -31,9 +41,8 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + { + short *pm_entry; + int doread, doerror; ++ struct deferred_args *deferred; + error_t err; +- vm_address_t page; +- int write_lock; + + if (!p + || p->port.class != _pager_class) +@@ -41,7 +50,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + + /* Acquire the right to meddle with the pagemap */ + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); + + /* sanity checks -- we don't do multi-page requests yet. */ + if (control != p->memobjcntl) +@@ -105,7 +113,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + } + + /* Let someone else in. */ +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + + if (!doread) +@@ -113,18 +120,16 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + if (doerror) + goto error_read; + +- err = pager_read_page (p->upi, offset, &page, &write_lock); +- if (err) +- goto error_read; ++ deferred = _pager_allocate_deferred (memory_object_data_request_deferred, ++ sizeof *deferred); ++ if (deferred == NULL) ++ return ENOMEM; + +- memory_object_data_supply (p->memobjcntl, offset, page, length, 1, +- write_lock ? VM_PROT_WRITE : VM_PROT_NONE, +- p->notify_on_evict ? 1 : 0, +- MACH_PORT_NULL); +- pthread_mutex_lock (&p->interlock); +- _pager_mark_object_error (p, offset, length, 0); +- _pager_allow_termination (p); +- pthread_mutex_unlock (&p->interlock); ++ ports_port_ref (p); ++ deferred->p = p; ++ deferred->offset = offset; ++ deferred->length = length; ++ _pager_defer (deferred); + return 0; + + error_read: +@@ -139,7 +144,41 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + allow_release_out: + _pager_allow_termination (p); + release_out: +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + return 0; + } ++ ++static void ++memory_object_data_request_deferred (void *arg) ++{ ++ struct deferred_args *args = arg; ++ vm_address_t page; ++ int write_lock; ++ error_t err; ++ ++ err = pager_read_page (args->p->upi, args->offset, &page, &write_lock); ++ if (err) ++ { ++ memory_object_data_error (args->p->memobjcntl, args->offset, ++ args->length, EIO); ++ _pager_mark_object_error (args->p, args->offset, args->length, EIO); ++ ++ pthread_mutex_lock (&args->p->interlock); ++ _pager_allow_termination (args->p); ++ pthread_mutex_unlock (&args->p->interlock); ++ goto out; ++ } ++ ++ memory_object_data_supply (args->p->memobjcntl, args->offset, page, ++ args->length, 1, ++ write_lock ? VM_PROT_WRITE : VM_PROT_NONE, ++ args->p->notify_on_evict ? 1 : 0, ++ MACH_PORT_NULL); ++ pthread_mutex_lock (&args->p->interlock); ++ _pager_mark_object_error (args->p, args->offset, args->length, 0); ++ _pager_allow_termination (args->p); ++ pthread_mutex_unlock (&args->p->interlock); ++ ++ out: ++ ports_port_deref (args->p); ++} +diff --git a/libpager/data-return.c b/libpager/data-return.c +index ee6c6e8..70c2cc8 100644 +--- a/libpager/data-return.c ++++ b/libpager/data-return.c +@@ -15,12 +15,36 @@ + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + ++#include "deferreds.h" + #include "priv.h" + #include "memory_object_S.h" + #include <stdio.h> + #include <string.h> + #include <assert.h> + ++static void memory_object_data_return_deferred (void *arg); ++ ++struct lock_list ++{ ++ struct lock_request *lr; ++ struct lock_list *next; ++}; ++ ++struct deferred_args ++{ ++ struct pager *p; ++ vm_offset_t offset; ++ pointer_t data; ++ vm_size_t length; ++ int kcopy; ++ int npages; ++ char *notified; ++ error_t *pagerrs; ++ struct lock_list *lock_list; ++ int omitdata; ++}; ++ ++ + /* Worker function used by _pager_seqnos_memory_object_data_return + and _pager_seqnos_memory_object_data_initialize. All args are + as for _pager_seqnos_memory_object_data_return; the additional +@@ -38,13 +62,13 @@ _pager_do_write_request (struct pager *p, + { + short *pm_entries; + int npages, i; +- char *notified; +- error_t *pagerrs; ++ char *notified = NULL; ++ error_t *pagerrs = NULL; + struct lock_request *lr; +- struct lock_list {struct lock_request *lr; +- struct lock_list *next;} *lock_list, *ll; +- int wakeup; ++ struct lock_list *lock_list = NULL, *ll; + int omitdata = 0; ++ struct deferred_args *deferred; ++ error_t err; + + if (!p + || p->port.class != _pager_class) +@@ -52,7 +76,6 @@ _pager_do_write_request (struct pager *p, + + /* Acquire the right to meddle with the pagemap */ + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); + + /* sanity checks -- we don't do multi-page requests yet. */ + if (control != p->memobjcntl) +@@ -78,9 +101,9 @@ _pager_do_write_request (struct pager *p, + } + + npages = length / __vm_page_size; +- pagerrs = alloca (npages * sizeof (error_t)); ++ pagerrs = malloc (npages * sizeof (error_t)); + +- notified = alloca (npages * (sizeof *notified)); ++ notified = malloc (npages * (sizeof *notified)); + #ifndef NDEBUG + memset (notified, -1, npages * (sizeof *notified)); + #endif +@@ -101,7 +124,6 @@ _pager_do_write_request (struct pager *p, + notified[i] = (p->notify_on_evict + && ! (pm_entries[i] & PM_PAGEINWAIT)); + +- _pager_release_seqno (p, seqno); + goto notify; + } + else { +@@ -150,7 +172,7 @@ _pager_do_write_request (struct pager *p, + for (lr = p->lock_requests; lr; lr = lr->next) + if (offset < lr->end && offset + length >= lr->start) + { +- ll = alloca (sizeof (struct lock_list)); ++ ll = malloc (sizeof (struct lock_list)); + ll->lr = lr; + ll->next = lock_list; + lock_list = ll; +@@ -158,36 +180,106 @@ _pager_do_write_request (struct pager *p, + } + + /* Let someone else in. */ +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + ++ deferred = _pager_allocate_deferred (memory_object_data_return_deferred, ++ sizeof *deferred); ++ if (deferred == NULL) ++ { ++ err = ENOMEM; ++ goto release_mem; ++ } ++ ++ ports_port_ref (p); ++ deferred->p = p; ++ deferred->offset = offset; ++ deferred->data = data; ++ deferred->length = length; ++ deferred->kcopy = kcopy; ++ deferred->npages = npages; ++ deferred->notified = notified; ++ deferred->pagerrs = pagerrs; ++ deferred->lock_list = lock_list; ++ deferred->omitdata = omitdata; ++ _pager_defer (deferred); ++ return 0; ++ ++ notify: ++ _pager_allow_termination (p); ++ pthread_mutex_unlock (&p->interlock); ++ ++ for (i = 0; i < npages; i++) ++ { ++ assert (notified[i] == 0 || notified[i] == 1); ++ if (notified[i]) ++ { ++ short *pm_entry = &pm_entries[i]; ++ ++ /* Do notify user. */ ++ pager_notify_evict (p->upi, offset + (i * vm_page_size)); ++ ++ /* Clear any error that is left. Notification on eviction ++ is used only to change association of page, so any ++ error may no longer be valid. */ ++ pthread_mutex_lock (&p->interlock); ++ *pm_entry = SET_PM_ERROR (SET_PM_NEXTERROR (*pm_entry, 0), 0); ++ pthread_mutex_unlock (&p->interlock); ++ } ++ } ++ goto release_mem; ++ ++ release_out: ++ pthread_mutex_unlock (&p->interlock); ++ ++ release_mem: ++ free (notified); ++ free (pagerrs); ++ for (ll = lock_list; ll; /* nop */) ++ { ++ struct lock_list *o = ll; ++ ll = ll->next; ++ free (o); ++ } ++ ++ return err; ++} ++ ++static void ++memory_object_data_return_deferred (void *arg) ++{ ++ struct deferred_args *a = arg; ++ int i; ++ struct lock_list *ll; ++ short *pm_entries; ++ int wakeup; ++ + /* This is inefficient; we should send all the pages to the device at once + but until the pager library interface is changed, this will have to do. */ + +- for (i = 0; i < npages; i++) +- if (!(omitdata & (1 << i))) +- pagerrs[i] = pager_write_page (p->upi, +- offset + (vm_page_size * i), +- data + (vm_page_size * i)); ++ for (i = 0; i < a->npages; i++) ++ if (!(a->omitdata & (1 << i))) ++ a->pagerrs[i] = pager_write_page (a->p->upi, ++ a->offset + (vm_page_size * i), ++ a->data + (vm_page_size * i)); + + /* Acquire the right to meddle with the pagemap */ +- pthread_mutex_lock (&p->interlock); +- _pager_pagemap_resize (p, offset + length); +- pm_entries = &p->pagemap[offset / __vm_page_size]; ++ pthread_mutex_lock (&a->p->interlock); ++ _pager_pagemap_resize (a->p, a->offset + a->length); ++ pm_entries = &a->p->pagemap[a->offset / __vm_page_size]; + + wakeup = 0; +- for (i = 0; i < npages; i++) ++ for (i = 0; i < a->npages; i++) + { +- if (omitdata & (1 << i)) ++ if (a->omitdata & (1 << i)) + { +- notified[i] = 0; ++ a->notified[i] = 0; + continue; + } + + if (pm_entries[i] & PM_WRITEWAIT) + wakeup = 1; + +- if (pagerrs[i] && ! (pm_entries[i] & PM_PAGEINWAIT)) ++ if (a->pagerrs[i] && ! (pm_entries[i] & PM_PAGEINWAIT)) + /* The only thing we can do here is mark the page, and give + errors from now on when it is to be read. This is + imperfect, because if all users go away, the pagemap will +@@ -199,61 +291,63 @@ _pager_do_write_request (struct pager *p, + + if (pm_entries[i] & PM_PAGEINWAIT) + { +- memory_object_data_supply (p->memobjcntl, +- offset + (vm_page_size * i), +- data + (vm_page_size * i), ++ memory_object_data_supply (a->p->memobjcntl, ++ a->offset + (vm_page_size * i), ++ a->data + (vm_page_size * i), + vm_page_size, 1, + VM_PROT_NONE, 0, MACH_PORT_NULL); +- notified[i] = 0; ++ a->notified[i] = 0; + } + else + { +- munmap ((void *) (data + (vm_page_size * i)), ++ munmap ((void *) (a->data + (vm_page_size * i)), + vm_page_size); +- notified[i] = (! kcopy && p->notify_on_evict); +- if (! kcopy) ++ a->notified[i] = (! a->kcopy && a->p->notify_on_evict); ++ if (! a->kcopy) + pm_entries[i] &= ~PM_INCORE; + } + + pm_entries[i] &= ~(PM_PAGINGOUT | PM_PAGEINWAIT | PM_WRITEWAIT); + } + +- for (ll = lock_list; ll; ll = ll->next) ++ for (ll = a->lock_list; ll; ll = ll->next) + if (!--ll->lr->pending_writes && !ll->lr->locks_pending) + wakeup = 1; + + if (wakeup) +- pthread_cond_broadcast (&p->wakeup); ++ pthread_cond_broadcast (&a->p->wakeup); + +- notify: +- _pager_allow_termination (p); +- pthread_mutex_unlock (&p->interlock); ++ _pager_allow_termination (a->p); ++ pthread_mutex_unlock (&a->p->interlock); + +- for (i = 0; i < npages; i++) ++ for (i = 0; i < a->npages; i++) + { +- assert (notified[i] == 0 || notified[i] == 1); +- if (notified[i]) ++ assert (a->notified[i] == 0 || a->notified[i] == 1); ++ if (a->notified[i]) + { + short *pm_entry = &pm_entries[i]; + + /* Do notify user. */ +- pager_notify_evict (p->upi, offset + (i * vm_page_size)); ++ pager_notify_evict (a->p->upi, a->offset + (i * vm_page_size)); + + /* Clear any error that is left. Notification on eviction + is used only to change association of page, so any + error may no longer be valid. */ +- pthread_mutex_lock (&p->interlock); ++ pthread_mutex_lock (&a->p->interlock); + *pm_entry = SET_PM_ERROR (SET_PM_NEXTERROR (*pm_entry, 0), 0); +- pthread_mutex_unlock (&p->interlock); ++ pthread_mutex_unlock (&a->p->interlock); + } + } + +- return 0; +- +- release_out: +- _pager_release_seqno (p, seqno); +- pthread_mutex_unlock (&p->interlock); +- return 0; ++ free (a->notified); ++ free (a->pagerrs); ++ for (ll = a->lock_list; ll; /* nop */) ++ { ++ struct lock_list *o = ll; ++ ll = ll->next; ++ free (o); ++ } ++ ports_port_deref (a->p); + } + + /* Implement pageout call back as described by <mach/memory_object.defs>. */ +diff --git a/libpager/data-unlock.c b/libpager/data-unlock.c +index 599237c..a969ca6 100644 +--- a/libpager/data-unlock.c ++++ b/libpager/data-unlock.c +@@ -15,9 +15,21 @@ + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + ++#include "deferreds.h" + #include "priv.h" + #include "memory_object_S.h" + #include <stdio.h> ++#include <stdlib.h> ++#include <error.h> ++ ++static void memory_object_data_unlock_deferred (void *arg); ++ ++struct deferred_args ++{ ++ struct pager *p; ++ vm_offset_t offset; ++ vm_size_t length; ++}; + + /* Implement kernel requests for access as described in + <mach/memory_object.defs>. */ +@@ -29,17 +41,13 @@ _pager_seqnos_memory_object_data_unlock (struct pager *p, + vm_size_t length, + vm_prot_t access) + { +- volatile int err; ++ error_t err = 0; ++ struct deferred_args *deferred; + + if (!p + || p->port.class != _pager_class) + return EOPNOTSUPP; + +- pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); +- _pager_release_seqno (p, seqno); +- pthread_mutex_unlock (&p->interlock); +- + if (p->pager_state != NORMAL) + { + printf ("pager in wrong state for unlock\n"); +@@ -68,20 +76,45 @@ _pager_seqnos_memory_object_data_unlock (struct pager *p, + goto out; + } + +- err = pager_unlock_page (p->upi, offset); ++ deferred = _pager_allocate_deferred (memory_object_data_unlock_deferred, ++ sizeof *deferred); ++ if (deferred == NULL) ++ { ++ err = ENOMEM; ++ goto out; ++ } ++ ++ ports_port_ref (p); ++ deferred->p = p; ++ deferred->offset = offset; ++ deferred->length = length; ++ _pager_defer (deferred); + ++ out: ++ return err; ++} ++ ++static void ++memory_object_data_unlock_deferred (void *arg) ++{ ++ struct deferred_args *args = arg; ++ ++ error_t err = pager_unlock_page (args->p->upi, args->offset); + if (!err) + /* We can go ahead and release the lock. */ +- _pager_lock_object (p, offset, length, MEMORY_OBJECT_RETURN_NONE, 0, +- VM_PROT_NONE, 0); ++ _pager_lock_object (args->p, args->offset, args->length, ++ MEMORY_OBJECT_RETURN_NONE, 0, ++ VM_PROT_NONE, 0); + else + { + /* Flush the page, and set a bit so that m_o_data_request knows +- to issue an error. */ +- _pager_lock_object (p, offset, length, MEMORY_OBJECT_RETURN_NONE, 1, +- VM_PROT_WRITE, 1); +- _pager_mark_next_request_error (p, offset, length, err); ++ to issue an error. */ ++ _pager_lock_object (args->p, args->offset, args->length, ++ MEMORY_OBJECT_RETURN_NONE, 1, ++ VM_PROT_WRITE, 1); ++ _pager_mark_next_request_error (args->p, args->offset, args->length, ++ err); + } +- out: +- return 0; ++ ++ ports_port_deref (args->p); + } +diff --git a/libpager/deferreds.c b/libpager/deferreds.c +new file mode 100644 +index 0000000..25f2420 +--- /dev/null ++++ b/libpager/deferreds.c +@@ -0,0 +1,246 @@ ++/* Deferred function queue. ++ ++ Copyright (C) 2014 Free Software Foundation, Inc. ++ ++ Written by Justus Winter <4winter@informatik.uni-hamburg.de> ++ ++ This file is part of the GNU Hurd. ++ ++ The GNU Hurd is free software; you can redistribute it and/or ++ modify it under the terms of the GNU General Public License as ++ published by the Free Software Foundation; either version 2, or (at ++ your option) any later version. ++ ++ The GNU Hurd is distributed in the hope that it will be useful, but ++ WITHOUT ANY WARRANTY; without even the implied warranty of ++ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ++ General Public License for more details. ++ ++ You should have received a copy of the GNU General Public License ++ along with the GNU Hurd. If not, see <http://www.gnu.org/licenses/>. */ ++ ++#include <error.h> ++#include <maptime.h> ++#include <pthread.h> ++#include <stddef.h> ++#include <stdlib.h> ++#include <sys/time.h> ++ ++#include "deferreds.h" ++#include "pool.h" ++#include "queue.h" ++ ++/* ++ Worker pool for the deferred functions. ++ ++ Server functions may offload long-running functions to a pool of ++ workers. In theory, a single thread would suffice to service this ++ pool. However, ext2fs complicates this quite a bit. ++ ++ ext2fs uses libpager to implement a disk cache and a file cache. If ++ ext2fs services a memory_object request to an object in the file ++ cache, it can trigger an additional request by dereferencing a ++ pointer to memory backed by the disk cache. ++ ++ This is most likely the reason for thread-storms (or rather, the ++ reason why thread storms are that bad). It needs to be addressed in ++ ext2fs. Until it is, we can reasonably well with a simple thread ++ pool that creates threads on demand but limits the rate of the ++ thread creation. ++ ++ Rate-limiting the thread creation is crucial as memory_object ++ requests often arrive in large batches. The rate-limiting is ++ implemented using the token bucket algorithm. ++ ++ Creating more than one worker thread potentially decreases ++ performance. The operations offloaded to this pool are typically IO ++ requests. Using only one thread to perform such operations will ++ likely result in a more linear access pattern. ++ ++ XXX perform some tests on real hardware ++ ++ The following parameters are chosen based on these observations: ++*/ ++ ++#define DEFERREDS_MIN_IDLE 1 /* Create threads on demand so ++ that there is always an ++ idle thread. */ ++#define DEFERREDS_MAX_IDLE 1 /* Keep at most one idle ++ thread around. */ ++#define DEFERREDS_TIMEOUT 1 /* Superfluous idle threads ++ exit after one second. */ ++/* XXX The next two values are somewhat magic but performed well. */ ++#define DEFERREDS_BUCKET_SIZE 3. /* Limit creation of threads ++ to three threads... */ ++#define DEFERREDS_BUCKET_PER 2. /* ... every two seconds. */ ++ ++/* Set to 1 to see messages about thread creation and destruction. */ ++#if 0 ++#define TRACE(ARGS...) error (0, 0, ARGS) ++#else ++#define TRACE(ARGS...) ++#endif ++ ++/* Mapped time as fast time source. */ ++static volatile struct mapped_time_value *mapped_time; ++ ++/* Queue and worker pool for the deferred functions. */ ++static struct ++{ ++ struct queue queue; ++ struct pool pool; ++ struct timeval last_check; ++ float bucket; ++ pthread_cond_t wakeup; ++ pthread_mutex_t lock; ++} deferreds; ++ ++/* A deferred is a deferred function call. */ ++struct deferred ++{ ++ struct item item; ++ deferred_function_t function; ++ char args[0]; ++}; ++ ++void * ++deferreds_worker_func (void *arg __attribute__ ((unused))) ++{ ++ int idle; ++ adjust_priority (pool_thread_online (&deferreds.pool)); ++ ++ while (1) ++ { ++ struct deferred *d; ++ error_t err; ++ struct timespec ts; ++ ++ pthread_mutex_lock (&deferreds.lock); ++ ++ while ((d = queue_dequeue (&deferreds.queue)) == NULL) ++ { ++ clock_gettime (CLOCK_REALTIME, &ts); ++ ts.tv_sec += DEFERREDS_TIMEOUT; ++ ++ pool_thread_idle (&deferreds.pool); ++ err = pthread_cond_timedwait (&deferreds.wakeup, &deferreds.lock, ++ &ts); ++ idle = pool_thread_busy (&deferreds.pool); ++ ++ if (err == ETIMEDOUT && idle >= DEFERREDS_MAX_IDLE) ++ goto out; ++ } ++ ++ pthread_mutex_unlock (&deferreds.lock); ++ ++ (*d->function) (&d->args); ++ free (d); ++ } ++ ++ out: ++ TRACE ("deferred worker thread exiting, %d idle workers left", idle); ++ pthread_mutex_unlock (&deferreds.lock); ++ pool_thread_offline (&deferreds.pool); ++ return NULL; ++} ++ ++/* Allocate storage for a deferred function. A pointer to a buffer of ++ the given size is returned. */ ++void * ++_pager_allocate_deferred (deferred_function_t f, size_t size) ++{ ++ struct deferred *d = malloc (sizeof *d + size); ++ if (d != NULL) ++ d->function = f; ++ return &d->args; ++} ++ ++/* Enqueue the given deferred function. Expects a pointer returned ++ from _pager_allocate_deferred. */ ++void ++_pager_defer (void *cookie) ++{ ++ int idle; ++ struct deferred *d = (struct deferred *) ++ ((char *) cookie - offsetof (struct deferred, args)); ++ ++ pthread_mutex_lock (&deferreds.lock); ++ ++ queue_enqueue (&deferreds.queue, &d->item); ++ ++ idle = pool_idle_threads (&deferreds.pool); ++ if (idle > 0) ++ /* Awake worker. */ ++ pthread_cond_signal (&deferreds.wakeup); ++ else ++ { ++ /* Limit the creation of threads using the token bucket ++ algorithm. */ ++ struct timeval elapsed, last = deferreds.last_check; ++ maptime_read (mapped_time, &deferreds.last_check); ++ timersub (&deferreds.last_check, &last, &elapsed); ++ ++ deferreds.bucket += ++ ((float) elapsed.tv_sec + (float) elapsed.tv_sec / 1000000.) ++ * (DEFERREDS_BUCKET_SIZE / DEFERREDS_BUCKET_PER); ++ ++ if (deferreds.bucket > DEFERREDS_BUCKET_SIZE) ++ deferreds.bucket = DEFERREDS_BUCKET_SIZE; ++ ++ if (deferreds.bucket > 1.) ++ { ++ error_t err; ++ pthread_t t; ++ TRACE ("creating deferred worker thread"); ++ err = pthread_create (&t, &pool_thread_attr, ++ &deferreds_worker_func, NULL); ++ if (err) ++ /* This is tough. Let's hope for the best. */ ++ error (0, err, "pthread_create"); ++ else ++ { ++ pthread_detach (t); ++ deferreds.bucket -= 1.; ++ } ++ } ++ } ++ ++ pthread_mutex_unlock (&deferreds.lock); ++} ++ ++/* Initializes the queue and starts the deferred workers. */ ++error_t ++_pager_deferreds_init (void) ++{ ++ error_t err = 0; ++ int i; ++ pthread_t t; ++ ++ /* Try the unprivileged method using /dev/time first. */ ++ err = maptime_map (0, NULL, &mapped_time); ++ if (err) ++ { ++ /* Fall back to using the time mach device. */ ++ err = maptime_map (1, NULL, &mapped_time); ++ if (err) ++ return err; ++ } ++ ++ queue_init (&deferreds.queue); ++ pthread_cond_init (&deferreds.wakeup, NULL); ++ pthread_mutex_init (&deferreds.lock, NULL); ++ ++ for (i = 0; i < DEFERREDS_MIN_IDLE; i++) ++ { ++ err = pthread_create (&t, &pool_thread_attr, ++ &deferreds_worker_func, NULL); ++ if (err) ++ return err; ++ ++ pthread_detach (t); ++ } ++ deferreds.bucket = DEFERREDS_BUCKET_SIZE; ++ maptime_read (mapped_time, &deferreds.last_check); ++ ++ return err; ++} +diff --git a/libpager/deferreds.h b/libpager/deferreds.h +new file mode 100644 +index 0000000..5cc9aed +--- /dev/null ++++ b/libpager/deferreds.h +@@ -0,0 +1,42 @@ ++/* Deferred function queue. ++ ++ Copyright (C) 2014 Free Software Foundation, Inc. ++ ++ Written by Justus Winter <4winter@informatik.uni-hamburg.de> ++ ++ This file is part of the GNU Hurd. ++ ++ The GNU Hurd is free software; you can redistribute it and/or ++ modify it under the terms of the GNU General Public License as ++ published by the Free Software Foundation; either version 2, or (at ++ your option) any later version. ++ ++ The GNU Hurd is distributed in the hope that it will be useful, but ++ WITHOUT ANY WARRANTY; without even the implied warranty of ++ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ++ General Public License for more details. ++ ++ You should have received a copy of the GNU General Public License ++ along with the GNU Hurd. If not, see <http://www.gnu.org/licenses/>. */ ++ ++#ifndef _LIBPAGER_DEFERREDS_H ++#define _LIBPAGER_DEFERREDS_H ++ ++#include <errno.h> ++#include <stddef.h> ++ ++/* Type for deferred functions. */ ++typedef void (*deferred_function_t) (void *); ++ ++/* Initializes the queue and starts the deferred workers. */ ++error_t _pager_deferreds_init (void); ++ ++/* Allocate storage for a deferred function. A pointer to a buffer of ++ the given size is returned. */ ++void *_pager_allocate_deferred (deferred_function_t, size_t); ++ ++/* Enqueue the given deferred function. Expects a pointer returned ++ from _pager_allocate_deferred. */ ++void _pager_defer (void *); ++ ++#endif /* _LIBPAGER_DEFERREDS_H */ +diff --git a/libpager/demuxer.c b/libpager/demuxer.c +index b4d4054..e3eb06c 100644 +--- a/libpager/demuxer.c ++++ b/libpager/demuxer.c +@@ -1,5 +1,5 @@ + /* Demuxer for pager library +- Copyright (C) 1994, 1995, 2002, 2011 Free Software Foundation ++ Copyright (C) 1994, 1995, 2002, 2011, 2014 Free Software Foundation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as +@@ -15,26 +15,286 @@ + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + ++#include <error.h> ++#include <mach/mig_errors.h> ++#include <pthread.h> ++#include <string.h> ++ + #include "priv.h" + #include "memory_object_S.h" + #include "notify_S.h" ++#include "deferreds.h" ++#include "pool.h" ++#include "queue.h" ++ ++/* ++ Worker pool for the server functions. ++ ++ A single thread receives messages from the port bucket and puts them ++ into a queue. A fixed number of consumers actually execute the ++ server functions and send the reply. ++ ++ The requests to an object O have to be processed in the order they ++ were received. To this end, each worker has a local queue and a ++ tag. If a thread processes a request to O, it sets its tag to a ++ unique identifier representing O. If another thread now dequeues a ++ second request to O, it enqueues it to the first workers queue. ++ ++ This is an optimization based on the observation that multiple ++ requests to the same objects often come in quick succession. This ++ also makes sure that the second request is processed at the earliest ++ possible moment, by the same thread that processed the previous ++ request to that object. ++ ++ At least one worker thread is necessary. ++*/ ++#define WORKER_COUNT 3 ++ ++/* An request contains the message received from the port set. */ ++struct request ++{ ++ struct item item; ++ mig_routine_t routine; ++ mach_msg_header_t *inp; ++ mach_msg_header_t *outp; ++}; ++ ++/* A worker. */ ++struct worker ++{ ++ struct queue queue; /* other workers may delegate requests to us */ ++ unsigned long tag; /* tag of the object we are working on */ ++}; ++ ++/* This is the queue for incoming requests. A single thread receives ++ messages from the port set, looks the service routine up, and ++ enqueues the request here. */ ++static struct ++{ ++ struct queue queue; ++ int asleep; ++ pthread_cond_t wakeup; ++ pthread_mutex_t lock; ++ struct worker workers[WORKER_COUNT]; ++} requests; + + /* Demultiplex a single message directed at a pager port; INP is the + message received; fill OUTP with the reply. */ +-int ++static int + pager_demuxer (mach_msg_header_t *inp, + mach_msg_header_t *outp) + { ++ error_t err = MIG_NO_REPLY; ++ ++#ifdef MACH_RCV_LARGE ++ const mach_msg_size_t max_size = 2 * __vm_page_size; /* Generic. Good? XXX */ ++#else ++ const mach_msg_size_t max_size = 4 * __vm_page_size; /* XXX */ ++#endif ++ + mig_routine_t routine; +- if ((routine = _pager_seqnos_memory_object_server_routine (inp)) || +- (routine = _pager_seqnos_notify_server_routine (inp))) ++ if (! ((routine = _pager_seqnos_memory_object_server_routine (inp)) || ++ (routine = _pager_seqnos_notify_server_routine (inp)))) ++ return FALSE; ++ ++#define MASK (8 - 1) ++ mach_msg_size_t padded_size = (inp->msgh_size + MASK) & ~MASK; ++#undef MASK ++ ++ struct request *r = malloc (sizeof *r + padded_size + max_size); ++ if (r == NULL) + { +- (*routine) (inp, outp); +- return TRUE; ++ err = ENOMEM; ++ goto out; ++ } ++ ++ r->routine = routine; ++ r->inp = (mach_msg_header_t *) ((char *) r + sizeof *r); ++ memcpy (r->inp, inp, inp->msgh_size); ++ ++ r->outp = (mach_msg_header_t *) ((char *) r + sizeof *r + padded_size); ++ memcpy (r->outp, outp, sizeof *outp); ++ ++ pthread_mutex_lock (&requests.lock); ++ ++ queue_enqueue (&requests.queue, &r->item); ++ ++ /* Awake worker. */ ++ if (requests.asleep > 0) ++ pthread_cond_signal (&requests.wakeup); ++ ++ pthread_mutex_unlock (&requests.lock); ++ ++ out: ++ ((mig_reply_header_t *) outp)->RetCode = err; ++ return TRUE; ++} ++ ++/* Consumes requests from the queue. */ ++static void * ++worker_func (void *arg) ++{ ++ struct worker *self = (struct worker *) arg; ++ struct request *r = NULL; ++ ++ while (1) ++ { ++ int i; ++ mach_msg_return_t mr; ++ ++ /* Free previous message. */ ++ free (r); ++ ++ pthread_mutex_lock (&requests.lock); ++ ++ /* First, look in our queue for more requests to the object we ++ have been working on lately. Some other thread might have ++ delegated them to us. */ ++ r = queue_dequeue (&self->queue); ++ if (r != NULL) ++ goto got_one; ++ ++ /* Nope. Clear our tag and... */ ++ self->tag = 0; ++ ++ get_request_locked: ++ /* ... get a request from the global queue instead. */ ++ while ((r = queue_dequeue (&requests.queue)) == NULL) ++ { ++ requests.asleep += 1; ++ pthread_cond_wait (&requests.wakeup, &requests.lock); ++ requests.asleep -= 1; ++ } ++ ++ for (i = 0; i < WORKER_COUNT; i++) ++ if (requests.workers[i].tag ++ == (unsigned long) r->inp->msgh_local_port) ++ { ++ /* Some other thread is working on that object. Delegate ++ the request to that worker. */ ++ queue_enqueue (&requests.workers[i].queue, &r->item); ++ goto get_request_locked; ++ } ++ ++ /* Claim responsibility for this object by setting our tag. */ ++ self->tag = (unsigned long) r->inp->msgh_local_port; ++ ++ got_one: ++ pthread_mutex_unlock (&requests.lock); ++ ++ /* Call the server routine. */ ++ (*r->routine) (r->inp, r->outp); ++ ++ /* What follows is basically the second part of ++ mach_msg_server_timeout. */ ++ mig_reply_header_t *request = (mig_reply_header_t *) r->inp; ++ mig_reply_header_t *reply = (mig_reply_header_t *) r->outp; ++ ++ switch (reply->RetCode) ++ { ++ case KERN_SUCCESS: ++ /* Hunky dory. */ ++ break; ++ ++ case MIG_NO_REPLY: ++ /* The server function wanted no reply sent. ++ Loop for another request. */ ++ continue; ++ ++ default: ++ /* Some error; destroy the request message to release any ++ port rights or VM it holds. Don't destroy the reply port ++ right, so we can send an error message. */ ++ request->Head.msgh_remote_port = MACH_PORT_NULL; ++ mach_msg_destroy (&request->Head); ++ break; ++ } ++ ++ if (reply->Head.msgh_remote_port == MACH_PORT_NULL) ++ { ++ /* No reply port, so destroy the reply. */ ++ if (reply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) ++ mach_msg_destroy (&reply->Head); ++ continue; ++ } ++ ++ /* Send the reply. */ ++ mr = mach_msg (&reply->Head, ++ MACH_SEND_MSG, ++ reply->Head.msgh_size, ++ 0, ++ MACH_PORT_NULL, ++ 0, ++ MACH_PORT_NULL); ++ ++ switch (mr) ++ { ++ case MACH_SEND_INVALID_DEST: ++ /* The reply can't be delivered, so destroy it. This error ++ indicates only that the requester went away, so we ++ continue and get the next request. */ ++ mach_msg_destroy (&reply->Head); ++ break; ++ ++ default: ++ /* Some other form of lossage; there is not much we can ++ do here. */ ++ error (0, mr, "mach_msg"); ++ } ++ } ++ ++ /* Not reached. */ ++ return NULL; ++} ++ ++/* A top-level function for the paging thread that just services paging ++ requests. */ ++static void * ++service_paging_requests (void *arg) ++{ ++ struct port_bucket *pager_bucket = arg; ++ ports_manage_port_operations_one_thread (pager_bucket, ++ pager_demuxer, ++ 0); ++ /* Not reached. */ ++ return NULL; ++} ++ ++/* Start the worker thread libpager uses to service requests. */ ++error_t ++pager_start_workers (struct port_bucket *pager_bucket) ++{ ++ error_t err; ++ int i; ++ pthread_t t; ++ ++ pool_init (); ++ err = _pager_deferreds_init (); ++ if (err) ++ return err; ++ ++ queue_init (&requests.queue); ++ pthread_cond_init (&requests.wakeup, NULL); ++ pthread_mutex_init (&requests.lock, NULL); ++ ++ /* Make a thread to service paging requests. */ ++ err = pthread_create (&t, &pool_thread_attr, ++ service_paging_requests, pager_bucket); ++ if (err) ++ return err; ++ pthread_detach (t); ++ ++ for (i = 0; i < WORKER_COUNT; i++) ++ { ++ requests.workers[i].tag = 0; ++ queue_init (&requests.workers[i].queue); ++ ++ err = pthread_create (&t, &pool_thread_attr, ++ &worker_func, &requests.workers[i]); ++ if (err) ++ return err; ++ pthread_detach (t); + } + +- /* Synchronize our bookkeeping of the port's seqno with the one +- consumed by this bogus message. */ +- _pager_update_seqno (inp->msgh_local_port, inp->msgh_seqno); +- return FALSE; ++ return err; + } +diff --git a/libpager/lock-completed.c b/libpager/lock-completed.c +index a3f3f16..30b1dd3 100644 +--- a/libpager/lock-completed.c ++++ b/libpager/lock-completed.c +@@ -37,7 +37,6 @@ _pager_seqnos_memory_object_lock_completed (struct pager *p, + return EOPNOTSUPP; + + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); + + if (control != p->memobjcntl) + { +@@ -59,7 +58,6 @@ _pager_seqnos_memory_object_lock_completed (struct pager *p, + } + + out: +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + + return err; +diff --git a/libpager/no-senders.c b/libpager/no-senders.c +index c21dfc2..d0bbe27 100644 +--- a/libpager/no-senders.c ++++ b/libpager/no-senders.c +@@ -29,7 +29,6 @@ _pager_do_seqnos_mach_notify_no_senders (struct port_info *pi, + pi->class != _pager_class) + return EOPNOTSUPP; + +- _pager_update_seqno_p ((struct pager *) pi, seqno); + ports_no_senders (pi, mscount); + + return 0; +diff --git a/libpager/notify-stubs.c b/libpager/notify-stubs.c +index ba13882..a826420 100644 +--- a/libpager/notify-stubs.c ++++ b/libpager/notify-stubs.c +@@ -28,8 +28,6 @@ _pager_do_seqnos_mach_notify_port_deleted (struct port_info *pi, + mach_port_t name + __attribute__ ((unused))) + { +- _pager_update_seqno_p ((struct pager *) pi, seqno); +- + return 0; + } + +@@ -39,8 +37,6 @@ _pager_do_seqnos_mach_notify_msg_accepted (struct port_info *pi, + mach_port_t name + __attribute__ ((unused))) + { +- _pager_update_seqno_p ((struct pager *) pi, seqno); +- + return 0; + } + +@@ -50,8 +46,6 @@ _pager_do_seqnos_mach_notify_port_destroyed (struct port_info *pi, + mach_port_t name + __attribute__ ((unused))) + { +- _pager_update_seqno_p ((struct pager *) pi, seqno); +- + return 0; + } + +@@ -59,8 +53,6 @@ error_t + _pager_do_seqnos_mach_notify_send_once (struct port_info *pi, + mach_port_seqno_t seqno) + { +- _pager_update_seqno_p ((struct pager *) pi, seqno); +- + return 0; + } + +@@ -70,7 +62,5 @@ _pager_do_seqnos_mach_notify_dead_name (struct port_info *pi, + mach_port_t name + __attribute__ ((unused))) + { +- _pager_update_seqno_p ((struct pager *) pi, seqno); +- + return 0; + } +diff --git a/libpager/object-init.c b/libpager/object-init.c +index 6683e24..eb62c44 100644 +--- a/libpager/object-init.c ++++ b/libpager/object-init.c +@@ -33,7 +33,6 @@ _pager_seqnos_memory_object_init (struct pager *p, + return EOPNOTSUPP; + + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); + + if (pagesize != __vm_page_size) + { +@@ -69,7 +68,6 @@ _pager_seqnos_memory_object_init (struct pager *p, + p->pager_state = NORMAL; + + out: +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + + return 0; +diff --git a/libpager/object-terminate.c b/libpager/object-terminate.c +index 365ba27..3f0482f 100644 +--- a/libpager/object-terminate.c ++++ b/libpager/object-terminate.c +@@ -32,8 +32,7 @@ _pager_seqnos_memory_object_terminate (struct pager *p, + return EOPNOTSUPP; + + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); +- ++ + if (control != p->memobjcntl) + { + printf ("incg terminate: wrong control port"); +@@ -75,7 +74,6 @@ _pager_seqnos_memory_object_terminate (struct pager *p, + #endif + + out: +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + + return 0; +diff --git a/libpager/pager.h b/libpager/pager.h +index d0572af..d209290 100644 +--- a/libpager/pager.h ++++ b/libpager/pager.h +@@ -25,11 +25,8 @@ + scope. */ + struct user_pager_info; + +-/* This de-muxer function is for use within libports_demuxer. */ +-/* INP is a message we've received; OUTP will be filled in with +- a reply message. */ +-int pager_demuxer (mach_msg_header_t *inp, +- mach_msg_header_t *outp); ++/* Start the worker thread libpager uses to service requests. */ ++error_t pager_start_workers (struct port_bucket *pager_bucket); + + /* Create a new pager. The pager will have a port created for it + (using libports, in BUCKET) and will be immediately ready +diff --git a/libpager/pool.c b/libpager/pool.c +new file mode 100644 +index 0000000..c64b46e +--- /dev/null ++++ b/libpager/pool.c +@@ -0,0 +1,103 @@ ++/* A thread pool. ++ ++ Copyright (C) 2014 Free Software Foundation, Inc. ++ ++ Written by Justus Winter <4winter@informatik.uni-hamburg.de> ++ ++ This file is part of the GNU Hurd. ++ ++ The GNU Hurd is free software; you can redistribute it and/or ++ modify it under the terms of the GNU General Public License as ++ published by the Free Software Foundation; either version 2, or (at ++ your option) any later version. ++ ++ The GNU Hurd is distributed in the hope that it will be useful, but ++ WITHOUT ANY WARRANTY; without even the implied warranty of ++ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ++ General Public License for more details. ++ ++ You should have received a copy of the GNU General Public License ++ along with the GNU Hurd. If not, see <http://www.gnu.org/licenses/>. */ ++ ++#include <error.h> ++#include <pthread.h> ++ ++/* XXX Copied from libports. */ ++#include <mach.h> ++#include <hurd.h> ++#include <mach/thread_info.h> ++#include <mach/thread_switch.h> ++ ++#define STACK_SIZE (64 * 1024) ++ ++#define THREAD_PRI 2 ++ ++/* XXX To reduce starvation, the priority of new threads is initially ++ depressed. This helps already existing threads complete their job and be ++ recycled to handle new messages. The duration of this depression is made ++ a function of the total number of threads because more threads imply ++ more contention, and the priority of threads blocking on a contended spin ++ lock is also implicitely depressed. ++ ++ Then, if permitted, a greater priority is requested to further decrease ++ the need for additional threads. */ ++void ++adjust_priority (unsigned int totalthreads) ++{ ++ mach_port_t host_priv, self, pset, pset_priv; ++ unsigned int t; ++ error_t err; ++ ++ t = 10 + (((totalthreads - 1) / 100) + 1) * 10; ++ thread_switch (MACH_PORT_NULL, SWITCH_OPTION_DEPRESS, t); ++ ++ err = get_privileged_ports (&host_priv, NULL); ++ if (err) ++ goto error_host_priv; ++ ++ self = mach_thread_self (); ++ err = thread_get_assignment (self, &pset); ++ if (err) ++ goto error_pset; ++ ++ err = host_processor_set_priv (host_priv, pset, &pset_priv); ++ if (err) ++ goto error_pset_priv; ++ ++ err = thread_max_priority (self, pset_priv, 0); ++ if (err) ++ goto error_max_priority; ++ ++ err = thread_priority (self, THREAD_PRI, 0); ++ if (err) ++ goto error_priority; ++ ++ mach_port_deallocate (mach_task_self (), pset_priv); ++ mach_port_deallocate (mach_task_self (), pset); ++ mach_port_deallocate (mach_task_self (), self); ++ mach_port_deallocate (mach_task_self (), host_priv); ++ return; ++ ++error_priority: ++error_max_priority: ++ mach_port_deallocate (mach_task_self (), pset_priv); ++error_pset_priv: ++ mach_port_deallocate (mach_task_self (), pset); ++error_pset: ++ mach_port_deallocate (mach_task_self (), self); ++ mach_port_deallocate (mach_task_self (), host_priv); ++error_host_priv: ++ if (err != EPERM) ++ error (0, err, "unable to adjust libports thread priority"); ++} ++ ++/* Attributes for newly created threads. */ ++pthread_attr_t pool_thread_attr; ++ ++/* Initializes pool_thread_attr. */ ++void ++pool_init (void) ++{ ++ pthread_attr_init (&pool_thread_attr); ++ pthread_attr_setstacksize (&pool_thread_attr, STACK_SIZE); ++} +diff --git a/libpager/pool.h b/libpager/pool.h +new file mode 100644 +index 0000000..6fa538d +--- /dev/null ++++ b/libpager/pool.h +@@ -0,0 +1,79 @@ ++/* A thread pool. ++ ++ Copyright (C) 2014 Free Software Foundation, Inc. ++ ++ Written by Justus Winter <4winter@informatik.uni-hamburg.de> ++ ++ This file is part of the GNU Hurd. ++ ++ The GNU Hurd is free software; you can redistribute it and/or ++ modify it under the terms of the GNU General Public License as ++ published by the Free Software Foundation; either version 2, or (at ++ your option) any later version. ++ ++ The GNU Hurd is distributed in the hope that it will be useful, but ++ WITHOUT ANY WARRANTY; without even the implied warranty of ++ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ++ General Public License for more details. ++ ++ You should have received a copy of the GNU General Public License ++ along with the GNU Hurd. If not, see <http://www.gnu.org/licenses/>. */ ++ ++#ifndef _LIBPAGER_POOL_H ++#define _LIBPAGER_POOL_H ++ ++/* A thread pool. */ ++struct pool ++{ ++ int alive; ++ int idle; ++}; ++ ++static inline int ++pool_thread_online (struct pool *p) ++{ ++ return __atomic_add_fetch (&p->alive, 1, __ATOMIC_RELAXED); ++} ++ ++static inline int ++pool_thread_offline (struct pool *p) ++{ ++ return __atomic_sub_fetch (&p->alive, 1, __ATOMIC_RELAXED); ++} ++ ++static inline int ++pool_idle_threads (struct pool *p) ++{ ++ return __atomic_load_n (&p->idle, __ATOMIC_RELAXED); ++} ++ ++static inline int ++pool_thread_idle (struct pool *p) ++{ ++ return __atomic_add_fetch (&p->idle, 1, __ATOMIC_RELAXED); ++} ++ ++static inline int ++pool_thread_busy (struct pool *p) ++{ ++ return __atomic_sub_fetch (&p->idle, 1, __ATOMIC_RELAXED); ++} ++ ++/* XXX To reduce starvation, the priority of new threads is initially ++ depressed. This helps already existing threads complete their job and be ++ recycled to handle new messages. The duration of this depression is made ++ a function of the total number of threads because more threads imply ++ more contention, and the priority of threads blocking on a contended spin ++ lock is also implicitely depressed. ++ ++ Then, if permitted, a greater priority is requested to further decrease ++ the need for additional threads. */ ++void adjust_priority (unsigned int totalthreads); ++ ++/* Attributes for newly created threads. */ ++extern pthread_attr_t pool_thread_attr; ++ ++/* Initializes pool_thread_attr. */ ++void pool_init (void); ++ ++#endif /* _LIBPAGER_POOL_H */ +diff --git a/libpager/priv.h b/libpager/priv.h +index d49cbb9..b1d56b5 100644 +--- a/libpager/priv.h ++++ b/libpager/priv.h +@@ -136,10 +136,6 @@ extern int _pager_page_errors[]; + struct port_class *_pager_class; + + +-void _pager_wait_for_seqno (struct pager *, mach_port_seqno_t); +-void _pager_release_seqno (struct pager *, mach_port_seqno_t); +-void _pager_update_seqno (mach_port_t, mach_port_seqno_t); +-void _pager_update_seqno_p (struct pager *, mach_port_seqno_t); + void _pager_block_termination (struct pager *); + void _pager_allow_termination (struct pager *); + error_t _pager_pagemap_resize (struct pager *, vm_address_t); +diff --git a/libpager/queue.h b/libpager/queue.h +new file mode 100644 +index 0000000..d3cf738 +--- /dev/null ++++ b/libpager/queue.h +@@ -0,0 +1,61 @@ ++/* A FIFO queue with constant-time enqueue and dequeue operations. ++ ++ Copyright (C) 2014 Free Software Foundation, Inc. ++ ++ Written by Justus Winter <4winter@informatik.uni-hamburg.de> ++ ++ This file is part of the GNU Hurd. ++ ++ The GNU Hurd is free software; you can redistribute it and/or ++ modify it under the terms of the GNU General Public License as ++ published by the Free Software Foundation; either version 2, or (at ++ your option) any later version. ++ ++ The GNU Hurd is distributed in the hope that it will be useful, but ++ WITHOUT ANY WARRANTY; without even the implied warranty of ++ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ++ General Public License for more details. ++ ++ You should have received a copy of the GNU General Public License ++ along with the GNU Hurd. If not, see <http://www.gnu.org/licenses/>. */ ++ ++/* A FIFO queue with constant-time enqueue and dequeue operations. */ ++struct item { ++ struct item *next; ++}; ++ ++struct queue { ++ struct item *head; ++ struct item **tail; ++}; ++ ++static inline void ++queue_init (struct queue *q) ++{ ++ q->head = NULL; ++ q->tail = &q->head; ++} ++ ++static inline void ++queue_enqueue (struct queue *q, struct item *r) ++{ ++ *q->tail = r; ++ q->tail = &r->next; ++ r->next = NULL; ++} ++ ++static inline void * ++queue_dequeue (struct queue *q) ++{ ++ struct item *r = q->head; ++ if (r == NULL) ++ return NULL; ++ ++ /* Pop the first item off. */ ++ if ((q->head = q->head->next) == NULL) ++ /* The queue is empty, fix tail pointer. */ ++ q->tail = &q->head; ++ ++ r->next = NULL; ++ return r; ++} +diff --git a/libpager/seqnos.c b/libpager/seqnos.c +deleted file mode 100644 +index cab2f33..0000000 +--- a/libpager/seqnos.c ++++ /dev/null +@@ -1,79 +0,0 @@ +-/* Sequence number synchronization routines for pager library +- Copyright (C) 1994, 2011 Free Software Foundation +- +- This program is free software; you can redistribute it and/or +- modify it under the terms of the GNU General Public License as +- published by the Free Software Foundation; either version 2, or (at +- your option) any later version. +- +- This program is distributed in the hope that it will be useful, but +- WITHOUT ANY WARRANTY; without even the implied warranty of +- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +- General Public License for more details. +- +- You should have received a copy of the GNU General Public License +- along with this program; if not, write to the Free Software +- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +- +-#include "priv.h" +-#include <assert.h> +- +-/* The message with seqno SEQNO has just been dequeued for pager P; +- wait until all preceding messages have had a chance and then +- return. */ +-void +-_pager_wait_for_seqno (struct pager *p, +- mach_port_seqno_t seqno) +-{ +- while (seqno != p->seqno + 1) +- { +- p->waitingforseqno = 1; +- pthread_cond_wait (&p->wakeup, &p->interlock); +- } +-} +- +- +-/* Allow the next message for pager P (potentially blocked in +- _pager_wait_for_seqno) to be handled. */ +-void +-_pager_release_seqno (struct pager *p, +- mach_port_seqno_t seqno) +-{ +- assert (seqno == p->seqno + 1); +- p->seqno = seqno; +- if (p->waitingforseqno) +- { +- p->waitingforseqno = 0; +- pthread_cond_broadcast (&p->wakeup); +- } +-} +- +- +-/* Just update the seqno. */ +-void +-_pager_update_seqno (mach_port_t object, +- mach_port_seqno_t seqno) +-{ +- struct pager *p; +- +- p = ports_lookup_port (0, object, _pager_class); +- _pager_update_seqno_p (p, seqno); +- if (p) +- ports_port_deref (p); +-} +- +- +-/* Just update the seqno, pointer version. */ +-void +-_pager_update_seqno_p (struct pager *p, +- mach_port_seqno_t seqno) +-{ +- if (p +- && p->port.class == _pager_class) +- { +- pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); +- _pager_release_seqno (p, seqno); +- pthread_mutex_unlock (&p->interlock); +- } +-} +diff --git a/libpager/stubs.c b/libpager/stubs.c +index 411f483..c7f1a5a 100644 +--- a/libpager/stubs.c ++++ b/libpager/stubs.c +@@ -29,9 +29,6 @@ _pager_seqnos_memory_object_copy (struct pager *p, + mach_port_t new) + { + printf ("m_o_copy called\n"); +- +- _pager_update_seqno_p (p, seq); +- + return EOPNOTSUPP; + } + +@@ -44,9 +41,6 @@ _pager_seqnos_memory_object_data_write (struct pager *p, + vm_size_t data_cnt) + { + printf ("m_o_data_write called\n"); +- +- _pager_update_seqno_p (p, seq); +- + return EOPNOTSUPP; + } + +@@ -60,8 +54,5 @@ _pager_seqnos_memory_object_supply_completed (struct pager *p, + vm_offset_t err_off) + { + printf ("m_o_supply_completed called\n"); +- +- _pager_update_seqno_p (p, seq); +- + return EOPNOTSUPP; + } +diff --git a/storeio/pager.c b/storeio/pager.c +index 7d78711..e123ad1 100644 +--- a/storeio/pager.c ++++ b/storeio/pager.c +@@ -24,6 +24,7 @@ + #include <strings.h> + #include <unistd.h> + #include <errno.h> ++#include <error.h> + #include <sys/mman.h> + #include <stdio.h> + +@@ -142,21 +143,6 @@ pager_clear_user_data (struct user_pager_info *upi) + + static struct port_bucket *pager_port_bucket = 0; + +-/* A top-level function for the paging thread that just services paging +- requests. */ +-static void * +-service_paging_requests (void *arg) +-{ +- (void) arg; +- +- for (;;) +- ports_manage_port_operations_multithread (pager_port_bucket, +- pager_demuxer, +- 1000 * 30, 1000 * 60 * 5, 0); +- +- return NULL; +-} +- + /* Initialize paging for this device. */ + static void + init_dev_paging () +@@ -173,14 +159,12 @@ init_dev_paging () + + pager_port_bucket = ports_create_bucket (); + +- /* Make a thread to service paging requests. */ +- err = pthread_create (&thread, NULL, service_paging_requests, NULL); +- if (!err) +- pthread_detach (thread); +- else ++ /* Start libpagers worker thread. */ ++ err = pager_start_workers (pager_port_bucket); ++ if (err) + { + errno = err; +- perror ("pthread_create"); ++ error (0, err, "pager_start_workers"); + } + } + pthread_mutex_unlock (&pager_global_lock); diff --git a/debian/patches/libpager-threadpool.patch b/debian/patches/libpager-threadpool.patch deleted file mode 100644 index 1deacbe9..00000000 --- a/debian/patches/libpager-threadpool.patch +++ /dev/null @@ -1,1274 +0,0 @@ -diff --git a/console/pager.c b/console/pager.c -index 87c36f0..a96f604 100644 ---- a/console/pager.c -+++ b/console/pager.c -@@ -127,11 +127,10 @@ static void * - service_paging_requests (void *arg) - { - struct port_bucket *pager_bucket = arg; -- for (;;) -- ports_manage_port_operations_multithread (pager_bucket, -- pager_demuxer, -- 1000 * 60 * 2, -- 1000 * 60 * 10, 0); -+ ports_manage_port_operations_one_thread (pager_bucket, -+ pager_demuxer, -+ 0); -+ /* Not reached. */ - return NULL; - } - -@@ -148,6 +147,11 @@ user_pager_init (void) - if (! pager_bucket) - error (5, errno, "Cannot create pager bucket"); - -+ /* Start libpagers worker thread. */ -+ err = pager_start_worker (); -+ if (err) -+ error (0, err, "pager_start_worker"); -+ - /* Make a thread to service paging requests. */ - err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket); - if (!err) -diff --git a/ext2fs/getblk.c b/ext2fs/getblk.c -index bde66e1..2b44ebb 100644 ---- a/ext2fs/getblk.c -+++ b/ext2fs/getblk.c -@@ -244,7 +244,9 @@ ext2_getblk (struct node *node, block_t block, int create, block_t *disk_block) - { - error_t err; - block_t indir, b; -- unsigned long addr_per_block = EXT2_ADDR_PER_BLOCK (sblock); -+ static unsigned long addr_per_block = 0; -+ if (addr_per_block == 0) -+ addr_per_block = EXT2_ADDR_PER_BLOCK (sblock); - - if (block > EXT2_NDIR_BLOCKS + addr_per_block + - addr_per_block * addr_per_block + -diff --git a/libdiskfs/disk-pager.c b/libdiskfs/disk-pager.c -index 9a0d9d8..d413f02 100644 ---- a/libdiskfs/disk-pager.c -+++ b/libdiskfs/disk-pager.c -@@ -39,11 +39,10 @@ static void * - service_paging_requests (void *arg) - { - struct port_bucket *pager_bucket = arg; -- for (;;) -- ports_manage_port_operations_multithread (pager_bucket, -- pager_demuxer, -- 1000 * 60 * 2, -- 1000 * 60 * 10, 0); -+ ports_manage_port_operations_one_thread (pager_bucket, -+ pager_demuxer, -+ 0); -+ /* Not reached. */ - return NULL; - } - -@@ -57,6 +56,11 @@ diskfs_start_disk_pager (struct user_pager_info *upi, - error_t err; - mach_port_t disk_pager_port; - -+ /* Start libpagers worker thread. */ -+ err = pager_start_worker (); -+ if (err) -+ error (0, err, "pager_start_worker"); -+ - /* Make a thread to service paging requests. */ - err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket); - if (!err) -diff --git a/libpager/data-request.c b/libpager/data-request.c -index 82ce904..90a9b09 100644 ---- a/libpager/data-request.c -+++ b/libpager/data-request.c -@@ -20,6 +20,15 @@ - #include <stdio.h> - #include <string.h> - -+static void memory_object_data_request_deferred (void *arg); -+ -+struct deferred_args -+{ -+ struct pager *p; -+ vm_offset_t offset; -+ vm_size_t length; -+}; -+ - /* Implement pagein callback as described in <mach/memory_object.defs>. */ - kern_return_t - _pager_seqnos_memory_object_data_request (struct pager *p, -@@ -31,9 +40,8 @@ _pager_seqnos_memory_object_data_request (struct pager *p, - { - short *pm_entry; - int doread, doerror; -+ struct deferred_args *deferred; - error_t err; -- vm_address_t page; -- int write_lock; - - if (!p - || p->port.class != _pager_class) -@@ -41,7 +49,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, - - /* Acquire the right to meddle with the pagemap */ - pthread_mutex_lock (&p->interlock); -- _pager_wait_for_seqno (p, seqno); - - /* sanity checks -- we don't do multi-page requests yet. */ - if (control != p->memobjcntl) -@@ -105,7 +112,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, - } - - /* Let someone else in. */ -- _pager_release_seqno (p, seqno); - pthread_mutex_unlock (&p->interlock); - - if (!doread) -@@ -113,18 +119,17 @@ _pager_seqnos_memory_object_data_request (struct pager *p, - if (doerror) - goto error_read; - -- err = pager_read_page (p->upi, offset, &page, &write_lock); -- if (err) -- goto error_read; -- -- memory_object_data_supply (p->memobjcntl, offset, page, length, 1, -- write_lock ? VM_PROT_WRITE : VM_PROT_NONE, -- p->notify_on_evict ? 1 : 0, -- MACH_PORT_NULL); -- pthread_mutex_lock (&p->interlock); -- _pager_mark_object_error (p, offset, length, 0); -- _pager_allow_termination (p); -- pthread_mutex_unlock (&p->interlock); -+ //mark -+ deferred = _pager_allocate_deferred (memory_object_data_request_deferred, -+ sizeof *deferred); -+ if (deferred == NULL) -+ return ENOMEM; -+ -+ ports_port_ref (p); -+ deferred->p = p; -+ deferred->offset = offset; -+ deferred->length = length; -+ _pager_defer (deferred); - return 0; - - error_read: -@@ -143,3 +148,37 @@ _pager_seqnos_memory_object_data_request (struct pager *p, - pthread_mutex_unlock (&p->interlock); - return 0; - } -+ -+static void -+memory_object_data_request_deferred (void *arg) -+{ -+ struct deferred_args *args = arg; -+ vm_address_t page; -+ int write_lock; -+ error_t err; -+ -+ err = pager_read_page (args->p->upi, args->offset, &page, &write_lock); -+ if (err) -+ { -+ memory_object_data_error (args->p->memobjcntl, args->offset, -+ args->length, EIO); -+ _pager_mark_object_error (args->p, args->offset, args->length, EIO); -+ -+ pthread_mutex_lock (&args->p->interlock); -+ _pager_allow_termination (args->p); -+ pthread_mutex_unlock (&args->p->interlock); -+ return; -+ } -+ -+ memory_object_data_supply (args->p->memobjcntl, args->offset, page, -+ args->length, 1, -+ write_lock ? VM_PROT_WRITE : VM_PROT_NONE, -+ args->p->notify_on_evict ? 1 : 0, -+ MACH_PORT_NULL); -+ pthread_mutex_lock (&args->p->interlock); -+ _pager_mark_object_error (args->p, args->offset, args->length, 0); -+ _pager_allow_termination (args->p); -+ pthread_mutex_unlock (&args->p->interlock); -+ -+ ports_port_deref (args->p); -+} -diff --git a/libpager/data-return.c b/libpager/data-return.c -index ee6c6e8..62a5541 100644 ---- a/libpager/data-return.c -+++ b/libpager/data-return.c -@@ -21,6 +21,29 @@ - #include <string.h> - #include <assert.h> - -+static void memory_object_data_return_deferred (void *arg); -+ -+struct lock_list -+{ -+ struct lock_request *lr; -+ struct lock_list *next; -+}; -+ -+struct deferred_args -+{ -+ struct pager *p; -+ vm_offset_t offset; -+ pointer_t data; -+ vm_size_t length; -+ int kcopy; -+ int npages; -+ char *notified; -+ error_t *pagerrs; -+ struct lock_list *lock_list; -+ int omitdata; -+}; -+ -+ - /* Worker function used by _pager_seqnos_memory_object_data_return - and _pager_seqnos_memory_object_data_initialize. All args are - as for _pager_seqnos_memory_object_data_return; the additional -@@ -38,13 +61,13 @@ _pager_do_write_request (struct pager *p, - { - short *pm_entries; - int npages, i; -- char *notified; -- error_t *pagerrs; -+ char *notified = NULL; -+ error_t *pagerrs = NULL; - struct lock_request *lr; -- struct lock_list {struct lock_request *lr; -- struct lock_list *next;} *lock_list, *ll; -- int wakeup; -+ struct lock_list *lock_list = NULL, *ll; - int omitdata = 0; -+ struct deferred_args *deferred; -+ error_t err; - - if (!p - || p->port.class != _pager_class) -@@ -52,7 +75,6 @@ _pager_do_write_request (struct pager *p, - - /* Acquire the right to meddle with the pagemap */ - pthread_mutex_lock (&p->interlock); -- _pager_wait_for_seqno (p, seqno); - - /* sanity checks -- we don't do multi-page requests yet. */ - if (control != p->memobjcntl) -@@ -78,9 +100,9 @@ _pager_do_write_request (struct pager *p, - } - - npages = length / __vm_page_size; -- pagerrs = alloca (npages * sizeof (error_t)); -+ pagerrs = malloc (npages * sizeof (error_t)); - -- notified = alloca (npages * (sizeof *notified)); -+ notified = malloc (npages * (sizeof *notified)); - #ifndef NDEBUG - memset (notified, -1, npages * (sizeof *notified)); - #endif -@@ -150,7 +172,7 @@ _pager_do_write_request (struct pager *p, - for (lr = p->lock_requests; lr; lr = lr->next) - if (offset < lr->end && offset + length >= lr->start) - { -- ll = alloca (sizeof (struct lock_list)); -+ ll = malloc (sizeof (struct lock_list)); - ll->lr = lr; - ll->next = lock_list; - lock_list = ll; -@@ -158,36 +180,109 @@ _pager_do_write_request (struct pager *p, - } - - /* Let someone else in. */ -- _pager_release_seqno (p, seqno); - pthread_mutex_unlock (&p->interlock); - -+ //mark -+ deferred = _pager_allocate_deferred (memory_object_data_return_deferred, -+ sizeof *deferred); -+ if (deferred == NULL) -+ { -+ err = ENOMEM; -+ goto release_mem; -+ } -+ -+ ports_port_ref (p); -+ deferred->p = p; -+ deferred->offset = offset; -+ deferred->data = data; -+ deferred->length = length; -+ deferred->kcopy = kcopy; -+ deferred->npages = npages; -+ deferred->notified = notified; -+ deferred->pagerrs = pagerrs; -+ deferred->lock_list = lock_list; -+ deferred->omitdata = omitdata; -+ _pager_defer (deferred); -+ return 0; -+ -+ notify: -+ _pager_allow_termination (p); -+ pthread_mutex_unlock (&p->interlock); -+ -+ for (i = 0; i < npages; i++) -+ { -+ assert (notified[i] == 0 || notified[i] == 1); -+ if (notified[i]) -+ { -+ short *pm_entry = &pm_entries[i]; -+ -+ /* Do notify user. */ -+ //mark -+ pager_notify_evict (p->upi, offset + (i * vm_page_size)); -+ -+ /* Clear any error that is left. Notification on eviction -+ is used only to change association of page, so any -+ error may no longer be valid. */ -+ pthread_mutex_lock (&p->interlock); -+ *pm_entry = SET_PM_ERROR (SET_PM_NEXTERROR (*pm_entry, 0), 0); -+ pthread_mutex_unlock (&p->interlock); -+ } -+ } -+ goto release_mem; -+ -+ release_out: -+ pthread_mutex_unlock (&p->interlock); -+ -+ release_mem: -+ free (notified); -+ free (pagerrs); -+ for (ll = lock_list; ll; /* nop */) -+ { -+ struct lock_list *o = ll; -+ ll = ll->next; -+ free (o); -+ } -+ -+ return err; -+} -+ -+static void -+memory_object_data_return_deferred (void *arg) -+{ -+ struct deferred_args *a = arg; -+ int i; -+ struct lock_list *ll; -+ short *pm_entries; -+ int wakeup; -+ - /* This is inefficient; we should send all the pages to the device at once - but until the pager library interface is changed, this will have to do. */ - -- for (i = 0; i < npages; i++) -- if (!(omitdata & (1 << i))) -- pagerrs[i] = pager_write_page (p->upi, -- offset + (vm_page_size * i), -- data + (vm_page_size * i)); -+ for (i = 0; i < a->npages; i++) -+ if (!(a->omitdata & (1 << i))) -+ //mark -+ a->pagerrs[i] = pager_write_page (a->p->upi, -+ a->offset + (vm_page_size * i), -+ a->data + (vm_page_size * i)); - - /* Acquire the right to meddle with the pagemap */ -- pthread_mutex_lock (&p->interlock); -- _pager_pagemap_resize (p, offset + length); -- pm_entries = &p->pagemap[offset / __vm_page_size]; -+ pthread_mutex_lock (&a->p->interlock); -+ _pager_pagemap_resize (a->p, a->offset + a->length); -+ pm_entries = &a->p->pagemap[a->offset / __vm_page_size]; - - wakeup = 0; -- for (i = 0; i < npages; i++) -+ for (i = 0; i < a->npages; i++) - { -- if (omitdata & (1 << i)) -+ if (a->omitdata & (1 << i)) - { -- notified[i] = 0; -+ a->notified[i] = 0; - continue; - } - - if (pm_entries[i] & PM_WRITEWAIT) - wakeup = 1; - -- if (pagerrs[i] && ! (pm_entries[i] & PM_PAGEINWAIT)) -+ if (a->pagerrs[i] && ! (pm_entries[i] & PM_PAGEINWAIT)) - /* The only thing we can do here is mark the page, and give - errors from now on when it is to be read. This is - imperfect, because if all users go away, the pagemap will -@@ -199,61 +294,63 @@ _pager_do_write_request (struct pager *p, - - if (pm_entries[i] & PM_PAGEINWAIT) - { -- memory_object_data_supply (p->memobjcntl, -- offset + (vm_page_size * i), -- data + (vm_page_size * i), -+ memory_object_data_supply (a->p->memobjcntl, -+ a->offset + (vm_page_size * i), -+ a->data + (vm_page_size * i), - vm_page_size, 1, - VM_PROT_NONE, 0, MACH_PORT_NULL); -- notified[i] = 0; -+ a->notified[i] = 0; - } - else - { -- munmap ((void *) (data + (vm_page_size * i)), -+ munmap ((void *) (a->data + (vm_page_size * i)), - vm_page_size); -- notified[i] = (! kcopy && p->notify_on_evict); -- if (! kcopy) -+ a->notified[i] = (! a->kcopy && a->p->notify_on_evict); -+ if (! a->kcopy) - pm_entries[i] &= ~PM_INCORE; - } - - pm_entries[i] &= ~(PM_PAGINGOUT | PM_PAGEINWAIT | PM_WRITEWAIT); - } - -- for (ll = lock_list; ll; ll = ll->next) -+ for (ll = a->lock_list; ll; ll = ll->next) - if (!--ll->lr->pending_writes && !ll->lr->locks_pending) - wakeup = 1; - - if (wakeup) -- pthread_cond_broadcast (&p->wakeup); -+ pthread_cond_broadcast (&a->p->wakeup); - -- notify: -- _pager_allow_termination (p); -- pthread_mutex_unlock (&p->interlock); -+ _pager_allow_termination (a->p); -+ pthread_mutex_unlock (&a->p->interlock); - -- for (i = 0; i < npages; i++) -+ for (i = 0; i < a->npages; i++) - { -- assert (notified[i] == 0 || notified[i] == 1); -- if (notified[i]) -+ assert (a->notified[i] == 0 || a->notified[i] == 1); -+ if (a->notified[i]) - { - short *pm_entry = &pm_entries[i]; - - /* Do notify user. */ -- pager_notify_evict (p->upi, offset + (i * vm_page_size)); -+ pager_notify_evict (a->p->upi, a->offset + (i * vm_page_size)); - - /* Clear any error that is left. Notification on eviction - is used only to change association of page, so any - error may no longer be valid. */ -- pthread_mutex_lock (&p->interlock); -+ pthread_mutex_lock (&a->p->interlock); - *pm_entry = SET_PM_ERROR (SET_PM_NEXTERROR (*pm_entry, 0), 0); -- pthread_mutex_unlock (&p->interlock); -+ pthread_mutex_unlock (&a->p->interlock); - } - } - -- return 0; -- -- release_out: -- _pager_release_seqno (p, seqno); -- pthread_mutex_unlock (&p->interlock); -- return 0; -+ free (a->notified); -+ free (a->pagerrs); -+ for (ll = a->lock_list; ll; /* nop */) -+ { -+ struct lock_list *o = ll; -+ ll = ll->next; -+ free (o); -+ } -+ ports_port_deref (a->p); - } - - /* Implement pageout call back as described by <mach/memory_object.defs>. */ -diff --git a/libpager/data-unlock.c b/libpager/data-unlock.c -index 599237c..11ec396 100644 ---- a/libpager/data-unlock.c -+++ b/libpager/data-unlock.c -@@ -18,6 +18,17 @@ - #include "priv.h" - #include "memory_object_S.h" - #include <stdio.h> -+#include <stdlib.h> -+#include <error.h> -+ -+static void memory_object_data_unlock_deferred (void *arg); -+ -+struct deferred_args -+{ -+ struct pager *p; -+ vm_offset_t offset; -+ vm_size_t length; -+}; - - /* Implement kernel requests for access as described in - <mach/memory_object.defs>. */ -@@ -29,17 +40,13 @@ _pager_seqnos_memory_object_data_unlock (struct pager *p, - vm_size_t length, - vm_prot_t access) - { -- volatile int err; -+ error_t err = 0; -+ struct deferred_args *deferred; - - if (!p - || p->port.class != _pager_class) - return EOPNOTSUPP; - -- pthread_mutex_lock (&p->interlock); -- _pager_wait_for_seqno (p, seqno); -- _pager_release_seqno (p, seqno); -- pthread_mutex_unlock (&p->interlock); -- - if (p->pager_state != NORMAL) - { - printf ("pager in wrong state for unlock\n"); -@@ -68,20 +75,45 @@ _pager_seqnos_memory_object_data_unlock (struct pager *p, - goto out; - } - -- err = pager_unlock_page (p->upi, offset); -+ deferred = _pager_allocate_deferred (memory_object_data_unlock_deferred, -+ sizeof *deferred); -+ if (deferred == NULL) -+ { -+ err = ENOMEM; -+ goto out; -+ } -+ -+ ports_port_ref (p); -+ deferred->p = p; -+ deferred->offset = offset; -+ deferred->length = length; -+ _pager_defer (deferred); - -+ out: -+ return err; -+} -+ -+static void -+memory_object_data_unlock_deferred (void *arg) -+{ -+ struct deferred_args *args = arg; -+ -+ error_t err = pager_unlock_page (args->p->upi, args->offset); - if (!err) - /* We can go ahead and release the lock. */ -- _pager_lock_object (p, offset, length, MEMORY_OBJECT_RETURN_NONE, 0, -- VM_PROT_NONE, 0); -+ _pager_lock_object (args->p, args->offset, args->length, -+ MEMORY_OBJECT_RETURN_NONE, 0, -+ VM_PROT_NONE, 0); - else - { - /* Flush the page, and set a bit so that m_o_data_request knows -- to issue an error. */ -- _pager_lock_object (p, offset, length, MEMORY_OBJECT_RETURN_NONE, 1, -- VM_PROT_WRITE, 1); -- _pager_mark_next_request_error (p, offset, length, err); -+ to issue an error. */ -+ _pager_lock_object (args->p, args->offset, args->length, -+ MEMORY_OBJECT_RETURN_NONE, 1, -+ VM_PROT_WRITE, 1); -+ _pager_mark_next_request_error (args->p, args->offset, args->length, -+ err); - } -- out: -- return 0; -+ -+ ports_port_deref (args->p); - } -diff --git a/libpager/demuxer.c b/libpager/demuxer.c -index b4d4054..b0b3d56 100644 ---- a/libpager/demuxer.c -+++ b/libpager/demuxer.c -@@ -1,5 +1,5 @@ - /* Demuxer for pager library -- Copyright (C) 1994, 1995, 2002, 2011 Free Software Foundation -+ Copyright (C) 1994, 1995, 2002, 2011, 2014 Free Software Foundation - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as -@@ -15,26 +15,549 @@ - along with this program; if not, write to the Free Software - Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ - -+#include <error.h> -+#include <mach/mig_errors.h> -+#include <pthread.h> -+#include <stddef.h> -+#include <string.h> -+#include <time.h> -+ - #include "priv.h" - #include "memory_object_S.h" - #include "notify_S.h" - -+#define error(...) -+ -+#define WORKER_COUNT 3 -+#define DEFERRED_MAX_IDLE 2 -+#define DEFERRED_TIMEOUT 4 -+#define DEFERRED_SLANT (4 - 1) -+ -+struct item { -+ struct item *next; -+}; -+ -+struct request { -+ struct item item; -+ mig_routine_t routine; -+ mach_msg_header_t *inp; -+ mach_msg_header_t *outp; -+}; -+ -+struct queue { -+ struct item *head; -+ struct item **tail; -+}; -+ -+static inline void -+queue_init (struct queue *q) -+{ -+ q->head = NULL; -+ q->tail = &q->head; -+} -+ -+static inline void -+queue_enqueue (struct queue *q, struct item *r) -+{ -+ *q->tail = r; -+ q->tail = &r->next; -+ r->next = NULL; -+} -+ -+static inline void * -+queue_dequeue (struct queue *q) -+{ -+ struct item *r = q->head; -+ if (r == NULL) -+ return NULL; -+ -+ /* Pop the first item off. */ -+ if ((q->head = q->head->next) == NULL) -+ /* The queue is empty, fix tail pointer. */ -+ q->tail = &q->head; -+ -+ r->next = NULL; -+ return r; -+} -+ -+struct worker { -+ struct queue queue; -+ unsigned long tag; -+}; -+ -+static struct { -+ struct queue queue; -+ int asleep; -+ pthread_cond_t wakeup; -+ pthread_mutex_t lock; -+ struct worker workers[WORKER_COUNT]; -+} requests; -+ - /* Demultiplex a single message directed at a pager port; INP is the - message received; fill OUTP with the reply. */ - int - pager_demuxer (mach_msg_header_t *inp, - mach_msg_header_t *outp) - { -+#ifdef MACH_RCV_LARGE -+ const mach_msg_size_t max_size = 2 * __vm_page_size; /* Generic. Good? XXX */ -+#else -+ const mach_msg_size_t max_size = 4 * __vm_page_size; /* XXX */ -+#endif -+ - mig_routine_t routine; -- if ((routine = _pager_seqnos_memory_object_server_routine (inp)) || -- (routine = _pager_seqnos_notify_server_routine (inp))) -+ if (! ((routine = _pager_seqnos_memory_object_server_routine (inp)) || -+ (routine = _pager_seqnos_notify_server_routine (inp)))) -+ return FALSE; -+ -+#define MASK (8 - 1) -+ mach_msg_size_t padded_size = (inp->msgh_size + MASK) & ~MASK; -+#undef MASK -+ -+ struct request *r = malloc (sizeof *r + padded_size + max_size); -+ if (r == NULL) -+ { -+ /* This is bad. */ -+ /* XXX reply */ -+ error (0, errno, "malloc"); -+ return 0; -+ } -+ -+ r->routine = routine; -+ r->inp = (mach_msg_header_t *) ((char *) r + sizeof *r); -+ memcpy (r->inp, inp, inp->msgh_size); -+ -+ r->outp = (mach_msg_header_t *) ((char *) r + sizeof *r + padded_size); -+ memcpy (r->outp, outp, sizeof *outp); -+ -+ pthread_mutex_lock (&requests.lock); -+ -+ queue_enqueue (&requests.queue, &r->item); -+ -+ if (0) -+ { -+ error (0, 0, "enqueuing %p (msgh_id: %d on obj %d), head is: %p", -+ r, -+ inp->msgh_id, -+ inp->msgh_local_port, -+ requests.queue.head); -+ } -+ -+ /* Awake worker. */ -+ if (requests.asleep > 0) -+ pthread_cond_signal (&requests.wakeup); -+ -+ pthread_mutex_unlock (&requests.lock); -+ -+ ((mig_reply_header_t *) outp)->RetCode = MIG_NO_REPLY; -+ -+ return TRUE; -+} -+ -+static void * -+worker_func (void *arg) -+{ -+ struct worker *self = (struct worker *) arg; -+ struct request *r = NULL; -+ -+ while (1) -+ { -+ int i; -+ mach_msg_return_t mr; -+ -+ /* Free previous message. */ -+ free (r); -+ -+ pthread_mutex_lock (&requests.lock); -+ -+ /* First, look in our queue for more requests to the object we -+ have been working on lately. Some other thread might have -+ delegated it to us. */ -+ r = queue_dequeue (&self->queue); -+ if (r != NULL) -+ goto gotone; -+ -+ /* Nope. Clear our tag and... */ -+ self->tag = 0; -+ -+ get_request_locked: -+ /* ... get a request from the global queue instead. */ -+ while ((r = queue_dequeue (&requests.queue)) == NULL) -+ { -+ requests.asleep += 1; -+ pthread_cond_wait (&requests.wakeup, &requests.lock); -+ requests.asleep -= 1; -+ } -+ -+ for (i = 0; i < WORKER_COUNT; i++) -+ if (requests.workers[i].tag -+ == (unsigned long) r->inp->msgh_local_port) -+ { -+ /* Some other thread is working on that object. Delegate -+ the request to that worker. */ -+ queue_enqueue (&requests.workers[i].queue, &r->item); -+ goto get_request_locked; -+ } -+ -+ /* Claim responsibility for this object by setting our tag. */ -+ self->tag = (unsigned long) r->inp->msgh_local_port; -+ -+ gotone: -+ pthread_mutex_unlock (&requests.lock); -+ -+ /* Call the server routine. */ -+ (*r->routine) (r->inp, r->outp); -+ -+ /* What follows is basically the second part of -+ mach_msg_server_timeout. */ -+ mig_reply_header_t *request = (mig_reply_header_t *) r->inp; -+ mig_reply_header_t *reply = (mig_reply_header_t *) r->outp; -+ -+ if (reply->RetCode) -+ { -+ error (0, reply->RetCode, "server routine said"); -+ } -+ -+ switch (reply->RetCode) -+ { -+ case KERN_SUCCESS: -+ /* Hunky dory. */ -+ break; -+ -+ case MIG_NO_REPLY: -+ /* The server function wanted no reply sent. -+ Loop for another request. */ -+ continue; -+ -+ default: -+ /* Some error; destroy the request message to release any -+ port rights or VM it holds. Don't destroy the reply port -+ right, so we can send an error message. */ -+ request->Head.msgh_remote_port = MACH_PORT_NULL; -+ mach_msg_destroy (&request->Head); -+ break; -+ } -+ -+ if (reply->Head.msgh_remote_port == MACH_PORT_NULL) -+ { -+ /* No reply port, so destroy the reply. */ -+ if (reply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) -+ mach_msg_destroy (&reply->Head); -+ continue; -+ } -+ -+ /* Send the reply. */ -+ mr = mach_msg (&reply->Head, -+ MACH_SEND_MSG, -+ reply->Head.msgh_size, -+ 0, -+ MACH_PORT_NULL, -+ 0, -+ MACH_PORT_NULL); -+ -+ switch (mr) -+ { -+ case MACH_SEND_INVALID_DEST: -+ /* The reply can't be delivered, so destroy it. This error -+ indicates only that the requester went away, so we -+ continue and get the next request. */ -+ mach_msg_destroy (&reply->Head); -+ break; -+ -+ default: -+ /* Some other form of lossage; there is not much we can -+ do here. */ -+ error (0, mr, "mach_msg"); -+ } -+ } -+ -+ /* Not reached. */ -+ return NULL; -+} -+ -+static struct -+{ -+ struct queue queue; -+ int alive; -+ int online; -+ int asleep; -+ float average; -+ pthread_cond_t wakeup; -+ pthread_mutex_t lock; -+} deferreds; -+ -+struct deferred -+{ -+ struct item item; -+ deferred_function_t function; -+ char args[0]; -+}; -+ -+void * -+_pager_allocate_deferred (deferred_function_t f, size_t size) -+{ -+ struct deferred *d = malloc (sizeof *d + size); -+ if (d != NULL) -+ d->function = f; -+ return &d->args; -+} -+ -+void -+_pager_defer (void *cookie) -+{ -+ struct deferred *d = (struct deferred *) -+ ((char *) cookie - offsetof (struct deferred, args)); -+ -+ pthread_mutex_lock (&deferreds.lock); -+ queue_enqueue (&deferreds.queue, &d->item); -+ -+ /* Awake worker. */ -+ if (deferreds.asleep > 0) -+ pthread_cond_signal (&deferreds.wakeup); -+ -+ pthread_mutex_unlock (&deferreds.lock); -+} -+ -+/* XXX copied from libports. */ -+#include <mach/thread_info.h> -+#include <mach/thread_switch.h> -+ -+#define STACK_SIZE (64 * 1024) -+ -+#define THREAD_PRI 2 -+ -+/* XXX To reduce starvation, the priority of new threads is initially -+ depressed. This helps already existing threads complete their job and be -+ recycled to handle new messages. The duration of this depression is made -+ a function of the total number of threads because more threads imply -+ more contention, and the priority of threads blocking on a contended spin -+ lock is also implicitely depressed. -+ -+ Then, if permitted, a greater priority is requested to further decrease -+ the need for additional threads. */ -+static void -+adjust_priority (unsigned int totalthreads) -+{ -+ mach_port_t host_priv, self, pset, pset_priv; -+ unsigned int t; -+ error_t err; -+ -+ t = 10 + (((totalthreads - 1) / 100) + 1) * 10; -+ thread_switch (MACH_PORT_NULL, SWITCH_OPTION_DEPRESS, t); -+ -+ err = get_privileged_ports (&host_priv, NULL); -+ if (err) -+ goto error_host_priv; -+ -+ self = mach_thread_self (); -+ err = thread_get_assignment (self, &pset); -+ if (err) -+ goto error_pset; -+ -+ err = host_processor_set_priv (host_priv, pset, &pset_priv); -+ if (err) -+ goto error_pset_priv; -+ -+ err = thread_max_priority (self, pset_priv, 0); -+ if (err) -+ goto error_max_priority; -+ -+ err = thread_priority (self, THREAD_PRI, 0); -+ if (err) -+ goto error_priority; -+ -+ mach_port_deallocate (mach_task_self (), pset_priv); -+ mach_port_deallocate (mach_task_self (), pset); -+ mach_port_deallocate (mach_task_self (), self); -+ mach_port_deallocate (mach_task_self (), host_priv); -+ return; -+ -+error_priority: -+error_max_priority: -+ mach_port_deallocate (mach_task_self (), pset_priv); -+error_pset_priv: -+ mach_port_deallocate (mach_task_self (), pset); -+error_pset: -+ mach_port_deallocate (mach_task_self (), self); -+ mach_port_deallocate (mach_task_self (), host_priv); -+error_host_priv: -+ if (err != EPERM) -+ error (0, err, "unable to adjust libports thread priority"); -+} -+ -+static pthread_attr_t tiny_stack_attr; -+ -+static void * -+deferreds_worker_func (void *arg __attribute__ ((unused))) -+{ -+ adjust_priority (__atomic_add_fetch (&deferreds.alive, 1, __ATOMIC_RELAXED)); -+ -+ boolean_t first_run = TRUE; -+ while (1) - { -- (*routine) (inp, outp); -- return TRUE; -+ struct deferred *d; -+ error_t err; -+ struct timespec ts; -+ -+ pthread_mutex_lock (&deferreds.lock); -+ -+ if (first_run) -+ { -+ /* A starting thread is considered asleep. */ -+ deferreds.asleep -= 1; -+ first_run = FALSE; -+ } -+ -+ while ((d = queue_dequeue (&deferreds.queue)) == NULL) -+ { -+ deferreds.asleep += 1; -+ clock_gettime (CLOCK_REALTIME, &ts); -+ ts.tv_sec += DEFERRED_TIMEOUT + (random () & DEFERRED_SLANT); -+ err = pthread_cond_timedwait (&deferreds.wakeup, &deferreds.lock, -+ &ts); -+ deferreds.asleep -= 1; -+ if (err == ETIMEDOUT && -+#if 0 -+deferreds.asleep >= (DEFERRED_MAX_IDLE ?: 1) -+#else -+ deferreds.alive - deferreds.asleep < (int) deferreds.average - (DEFERRED_MAX_IDLE ?: 1) -+#endif -+) -+ { -+ error (0, 0, "deferred worker thread exiting, %d idle workers left, average is %f", deferreds.asleep, deferreds.average); -+ __atomic_sub_fetch (&deferreds.alive, 1, __ATOMIC_RELAXED); -+ pthread_mutex_unlock (&deferreds.lock); -+ return NULL; -+ } -+ } -+ -+#if 0 -+ if (deferreds.asleep == 0) -+ { -+ pthread_t t; -+ error (0, 0, "creating deferred worker thread"); -+ /* A starting thread is considered asleep. */ -+ deferreds.asleep += 1; -+ err = pthread_create (&t, &tiny_stack_attr, -+ &deferreds_worker_func, NULL); -+ if (err) -+ { -+ /* This is tough. Let's hope for the best. */ -+ error (0, err, "pthread_create"); -+ deferreds.asleep -= 1; -+ } -+ else -+ pthread_detach (t); -+ } -+#endif -+ -+ pthread_mutex_unlock (&deferreds.lock); -+ -+ (*d->function) (&d->args); -+ free (d); -+ } -+ -+ /* Not reached. */ -+ return NULL; -+} -+ -+#define DEFERREDS_SMOOTHNESS (0.005) -+ -+struct timespec deferreds_delay = { 0, 100000000L }; -+ -+static void * -+deferreds_manager_func (void *arg __attribute__ ((unused))) -+{ -+ int i = 0; -+ float last = 0; -+ while (1) -+ { -+ pthread_mutex_lock (&deferreds.lock); -+ last = deferreds.average; -+ deferreds.average = -+ DEFERREDS_SMOOTHNESS * (deferreds.alive - deferreds.asleep) -+ + (1. - DEFERREDS_SMOOTHNESS) * last; -+ -+ if (i == 0) -+ error (0, 0, "alive: %d working: %d, average: %f", deferreds.alive, deferreds.alive - deferreds.asleep, deferreds.average); -+ -+ i = (i + 1) % 10; -+ -+ if (deferreds.asleep == 0 && i == 0) -+ { -+ error_t err; -+ pthread_t t; -+ error (0, 0, "creating deferred worker thread"); -+ /* A starting thread is considered asleep. */ -+ deferreds.asleep += 1; -+ err = pthread_create (&t, &tiny_stack_attr, -+ &deferreds_worker_func, NULL); -+ if (err) -+ { -+ /* This is tough. Let's hope for the best. */ -+ error (0, err, "pthread_create"); -+ deferreds.asleep -= 1; -+ } -+ else -+ pthread_detach (t); -+ } -+ -+ -+ pthread_mutex_unlock (&deferreds.lock); -+ nanosleep (&deferreds_delay, NULL); -+ } -+ return NULL; -+} -+ -+error_t -+pager_start_worker (void) -+{ -+ error_t err; -+ int i; -+ pthread_t t; -+ -+ pthread_attr_init (&tiny_stack_attr); -+ pthread_attr_setstacksize (&tiny_stack_attr, STACK_SIZE); -+ -+ queue_init (&requests.queue); -+ pthread_cond_init (&requests.wakeup, NULL); -+ pthread_mutex_init (&requests.lock, NULL); -+ -+ for (i = 0; i < WORKER_COUNT; i++) -+ { -+ requests.workers[i].tag = 0; -+ queue_init (&requests.workers[i].queue); -+ -+ err = pthread_create (&t, &tiny_stack_attr, -+ &worker_func, &requests.workers[i]); -+ if (err) -+ return err; -+ -+ pthread_detach (t); -+ } -+ -+ queue_init (&deferreds.queue); -+ pthread_cond_init (&deferreds.wakeup, NULL); -+ pthread_mutex_init (&deferreds.lock, NULL); -+ -+ for (i = 0; i < (DEFERRED_MAX_IDLE ?: 1); i++) -+ { -+ /* A starting thread is considered asleep. */ -+ deferreds.asleep += 1; -+ err = pthread_create (&t, &tiny_stack_attr, -+ &deferreds_worker_func, NULL); -+ if (err) -+ return err; -+ -+ pthread_detach (t); - } - -- /* Synchronize our bookkeeping of the port's seqno with the one -- consumed by this bogus message. */ -- _pager_update_seqno (inp->msgh_local_port, inp->msgh_seqno); -- return FALSE; -+ err = pthread_create (&t, &tiny_stack_attr, -+ &deferreds_manager_func, NULL); -+ if (err) -+ return err; -+ pthread_detach (t); -+ -+ -+ return err; - } -diff --git a/libpager/priv.h b/libpager/priv.h -index d49cbb9..59117a4 100644 ---- a/libpager/priv.h -+++ b/libpager/priv.h -@@ -152,4 +152,8 @@ void _pager_lock_object (struct pager *, vm_offset_t, vm_size_t, int, int, - void _pager_free_structure (struct pager *); - void _pager_clean (void *arg); - void _pager_real_dropweak (void *arg); -+ -+typedef void (*deferred_function_t) (void *); -+void *_pager_allocate_deferred (deferred_function_t, size_t); -+void _pager_defer (void *); - #endif -diff --git a/libpager/seqnos.c b/libpager/seqnos.c -index cab2f33..455378f 100644 ---- a/libpager/seqnos.c -+++ b/libpager/seqnos.c -@@ -25,6 +25,7 @@ void - _pager_wait_for_seqno (struct pager *p, - mach_port_seqno_t seqno) - { -+ return; - while (seqno != p->seqno + 1) - { - p->waitingforseqno = 1; -@@ -39,6 +40,7 @@ void - _pager_release_seqno (struct pager *p, - mach_port_seqno_t seqno) - { -+ return; - assert (seqno == p->seqno + 1); - p->seqno = seqno; - if (p->waitingforseqno) -@@ -54,6 +56,7 @@ void - _pager_update_seqno (mach_port_t object, - mach_port_seqno_t seqno) - { -+ return; - struct pager *p; - - p = ports_lookup_port (0, object, _pager_class); -@@ -68,6 +71,7 @@ void - _pager_update_seqno_p (struct pager *p, - mach_port_seqno_t seqno) - { -+ return; - if (p - && p->port.class == _pager_class) - { -diff --git a/libports/manage-one-thread.c b/libports/manage-one-thread.c -index 4ea740b..6f20942 100644 ---- a/libports/manage-one-thread.c -+++ b/libports/manage-one-thread.c -@@ -85,7 +85,14 @@ ports_manage_port_operations_one_thread (struct port_bucket *bucket, - - return status; - } -- -+ -+ /* It is currently unsafe for most servers to terminate based on -+ inactivity because a request may arrive after a server has -+ started shutting down, causing the client to receive an error. -+ Prevent the service loop from terminating by setting TIMEOUT to -+ zero. */ -+ timeout = 0; -+ - do - err = mach_msg_server_timeout (internal_demuxer, 0, bucket->portset, - timeout ? MACH_RCV_TIMEOUT : 0, timeout); -diff --git a/storeio/pager.c b/storeio/pager.c -index 7d78711..4c6fa4d 100644 ---- a/storeio/pager.c -+++ b/storeio/pager.c -@@ -24,6 +24,7 @@ - #include <strings.h> - #include <unistd.h> - #include <errno.h> -+#include <error.h> - #include <sys/mman.h> - #include <stdio.h> - -@@ -148,12 +149,10 @@ static void * - service_paging_requests (void *arg) - { - (void) arg; -- -- for (;;) -- ports_manage_port_operations_multithread (pager_port_bucket, -- pager_demuxer, -- 1000 * 30, 1000 * 60 * 5, 0); -- -+ ports_manage_port_operations_one_thread (pager_port_bucket, -+ pager_demuxer, -+ 0); -+ /* Not reached. */ - return NULL; - } - -@@ -173,6 +172,11 @@ init_dev_paging () - - pager_port_bucket = ports_create_bucket (); - -+ /* Start libpagers worker thread. */ -+ err = pager_start_worker (); -+ if (err) -+ error (0, err, "pager_start_worker"); -+ - /* Make a thread to service paging requests. */ - err = pthread_create (&thread, NULL, service_paging_requests, NULL); - if (!err) -diff --git a/tmpfs/pager-stubs.c b/tmpfs/pager-stubs.c -index 3cb264b..975800a 100644 ---- a/tmpfs/pager-stubs.c -+++ b/tmpfs/pager-stubs.c -@@ -92,5 +92,5 @@ pager_clear_user_data (struct user_pager_info *pager) - void - pager_dropweak (struct user_pager_info *p) - { -- abort(); -+ /* Do nothing. */ - } |