diff options
author | Justus Winter <4winter@informatik.uni-hamburg.de> | 2014-04-23 02:19:34 +0200 |
---|---|---|
committer | Justus Winter <4winter@informatik.uni-hamburg.de> | 2014-04-23 02:19:34 +0200 |
commit | 20a6358d01242444c5bcf71627a221a17f8dd2d1 (patch) | |
tree | b7435f4ccf8a6eedfd026eea043e78490842a628 /debian | |
parent | 23e0a986b36dc1855a2d4912f30ba84c572a8451 (diff) |
add libpager-threadpool.patch
Diffstat (limited to 'debian')
-rw-r--r-- | debian/patches/libpager-threadpool.patch | 1274 | ||||
-rw-r--r-- | debian/patches/series | 1 |
2 files changed, 1275 insertions, 0 deletions
diff --git a/debian/patches/libpager-threadpool.patch b/debian/patches/libpager-threadpool.patch new file mode 100644 index 00000000..1deacbe9 --- /dev/null +++ b/debian/patches/libpager-threadpool.patch @@ -0,0 +1,1274 @@ +diff --git a/console/pager.c b/console/pager.c +index 87c36f0..a96f604 100644 +--- a/console/pager.c ++++ b/console/pager.c +@@ -127,11 +127,10 @@ static void * + service_paging_requests (void *arg) + { + struct port_bucket *pager_bucket = arg; +- for (;;) +- ports_manage_port_operations_multithread (pager_bucket, +- pager_demuxer, +- 1000 * 60 * 2, +- 1000 * 60 * 10, 0); ++ ports_manage_port_operations_one_thread (pager_bucket, ++ pager_demuxer, ++ 0); ++ /* Not reached. */ + return NULL; + } + +@@ -148,6 +147,11 @@ user_pager_init (void) + if (! pager_bucket) + error (5, errno, "Cannot create pager bucket"); + ++ /* Start libpagers worker thread. */ ++ err = pager_start_worker (); ++ if (err) ++ error (0, err, "pager_start_worker"); ++ + /* Make a thread to service paging requests. */ + err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket); + if (!err) +diff --git a/ext2fs/getblk.c b/ext2fs/getblk.c +index bde66e1..2b44ebb 100644 +--- a/ext2fs/getblk.c ++++ b/ext2fs/getblk.c +@@ -244,7 +244,9 @@ ext2_getblk (struct node *node, block_t block, int create, block_t *disk_block) + { + error_t err; + block_t indir, b; +- unsigned long addr_per_block = EXT2_ADDR_PER_BLOCK (sblock); ++ static unsigned long addr_per_block = 0; ++ if (addr_per_block == 0) ++ addr_per_block = EXT2_ADDR_PER_BLOCK (sblock); + + if (block > EXT2_NDIR_BLOCKS + addr_per_block + + addr_per_block * addr_per_block + +diff --git a/libdiskfs/disk-pager.c b/libdiskfs/disk-pager.c +index 9a0d9d8..d413f02 100644 +--- a/libdiskfs/disk-pager.c ++++ b/libdiskfs/disk-pager.c +@@ -39,11 +39,10 @@ static void * + service_paging_requests (void *arg) + { + struct port_bucket *pager_bucket = arg; +- for (;;) +- ports_manage_port_operations_multithread (pager_bucket, +- pager_demuxer, +- 1000 * 60 * 2, +- 1000 * 60 * 10, 0); ++ ports_manage_port_operations_one_thread (pager_bucket, ++ pager_demuxer, ++ 0); ++ /* Not reached. */ + return NULL; + } + +@@ -57,6 +56,11 @@ diskfs_start_disk_pager (struct user_pager_info *upi, + error_t err; + mach_port_t disk_pager_port; + ++ /* Start libpagers worker thread. */ ++ err = pager_start_worker (); ++ if (err) ++ error (0, err, "pager_start_worker"); ++ + /* Make a thread to service paging requests. */ + err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket); + if (!err) +diff --git a/libpager/data-request.c b/libpager/data-request.c +index 82ce904..90a9b09 100644 +--- a/libpager/data-request.c ++++ b/libpager/data-request.c +@@ -20,6 +20,15 @@ + #include <stdio.h> + #include <string.h> + ++static void memory_object_data_request_deferred (void *arg); ++ ++struct deferred_args ++{ ++ struct pager *p; ++ vm_offset_t offset; ++ vm_size_t length; ++}; ++ + /* Implement pagein callback as described in <mach/memory_object.defs>. */ + kern_return_t + _pager_seqnos_memory_object_data_request (struct pager *p, +@@ -31,9 +40,8 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + { + short *pm_entry; + int doread, doerror; ++ struct deferred_args *deferred; + error_t err; +- vm_address_t page; +- int write_lock; + + if (!p + || p->port.class != _pager_class) +@@ -41,7 +49,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + + /* Acquire the right to meddle with the pagemap */ + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); + + /* sanity checks -- we don't do multi-page requests yet. */ + if (control != p->memobjcntl) +@@ -105,7 +112,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + } + + /* Let someone else in. */ +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + + if (!doread) +@@ -113,18 +119,17 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + if (doerror) + goto error_read; + +- err = pager_read_page (p->upi, offset, &page, &write_lock); +- if (err) +- goto error_read; +- +- memory_object_data_supply (p->memobjcntl, offset, page, length, 1, +- write_lock ? VM_PROT_WRITE : VM_PROT_NONE, +- p->notify_on_evict ? 1 : 0, +- MACH_PORT_NULL); +- pthread_mutex_lock (&p->interlock); +- _pager_mark_object_error (p, offset, length, 0); +- _pager_allow_termination (p); +- pthread_mutex_unlock (&p->interlock); ++ //mark ++ deferred = _pager_allocate_deferred (memory_object_data_request_deferred, ++ sizeof *deferred); ++ if (deferred == NULL) ++ return ENOMEM; ++ ++ ports_port_ref (p); ++ deferred->p = p; ++ deferred->offset = offset; ++ deferred->length = length; ++ _pager_defer (deferred); + return 0; + + error_read: +@@ -143,3 +148,37 @@ _pager_seqnos_memory_object_data_request (struct pager *p, + pthread_mutex_unlock (&p->interlock); + return 0; + } ++ ++static void ++memory_object_data_request_deferred (void *arg) ++{ ++ struct deferred_args *args = arg; ++ vm_address_t page; ++ int write_lock; ++ error_t err; ++ ++ err = pager_read_page (args->p->upi, args->offset, &page, &write_lock); ++ if (err) ++ { ++ memory_object_data_error (args->p->memobjcntl, args->offset, ++ args->length, EIO); ++ _pager_mark_object_error (args->p, args->offset, args->length, EIO); ++ ++ pthread_mutex_lock (&args->p->interlock); ++ _pager_allow_termination (args->p); ++ pthread_mutex_unlock (&args->p->interlock); ++ return; ++ } ++ ++ memory_object_data_supply (args->p->memobjcntl, args->offset, page, ++ args->length, 1, ++ write_lock ? VM_PROT_WRITE : VM_PROT_NONE, ++ args->p->notify_on_evict ? 1 : 0, ++ MACH_PORT_NULL); ++ pthread_mutex_lock (&args->p->interlock); ++ _pager_mark_object_error (args->p, args->offset, args->length, 0); ++ _pager_allow_termination (args->p); ++ pthread_mutex_unlock (&args->p->interlock); ++ ++ ports_port_deref (args->p); ++} +diff --git a/libpager/data-return.c b/libpager/data-return.c +index ee6c6e8..62a5541 100644 +--- a/libpager/data-return.c ++++ b/libpager/data-return.c +@@ -21,6 +21,29 @@ + #include <string.h> + #include <assert.h> + ++static void memory_object_data_return_deferred (void *arg); ++ ++struct lock_list ++{ ++ struct lock_request *lr; ++ struct lock_list *next; ++}; ++ ++struct deferred_args ++{ ++ struct pager *p; ++ vm_offset_t offset; ++ pointer_t data; ++ vm_size_t length; ++ int kcopy; ++ int npages; ++ char *notified; ++ error_t *pagerrs; ++ struct lock_list *lock_list; ++ int omitdata; ++}; ++ ++ + /* Worker function used by _pager_seqnos_memory_object_data_return + and _pager_seqnos_memory_object_data_initialize. All args are + as for _pager_seqnos_memory_object_data_return; the additional +@@ -38,13 +61,13 @@ _pager_do_write_request (struct pager *p, + { + short *pm_entries; + int npages, i; +- char *notified; +- error_t *pagerrs; ++ char *notified = NULL; ++ error_t *pagerrs = NULL; + struct lock_request *lr; +- struct lock_list {struct lock_request *lr; +- struct lock_list *next;} *lock_list, *ll; +- int wakeup; ++ struct lock_list *lock_list = NULL, *ll; + int omitdata = 0; ++ struct deferred_args *deferred; ++ error_t err; + + if (!p + || p->port.class != _pager_class) +@@ -52,7 +75,6 @@ _pager_do_write_request (struct pager *p, + + /* Acquire the right to meddle with the pagemap */ + pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); + + /* sanity checks -- we don't do multi-page requests yet. */ + if (control != p->memobjcntl) +@@ -78,9 +100,9 @@ _pager_do_write_request (struct pager *p, + } + + npages = length / __vm_page_size; +- pagerrs = alloca (npages * sizeof (error_t)); ++ pagerrs = malloc (npages * sizeof (error_t)); + +- notified = alloca (npages * (sizeof *notified)); ++ notified = malloc (npages * (sizeof *notified)); + #ifndef NDEBUG + memset (notified, -1, npages * (sizeof *notified)); + #endif +@@ -150,7 +172,7 @@ _pager_do_write_request (struct pager *p, + for (lr = p->lock_requests; lr; lr = lr->next) + if (offset < lr->end && offset + length >= lr->start) + { +- ll = alloca (sizeof (struct lock_list)); ++ ll = malloc (sizeof (struct lock_list)); + ll->lr = lr; + ll->next = lock_list; + lock_list = ll; +@@ -158,36 +180,109 @@ _pager_do_write_request (struct pager *p, + } + + /* Let someone else in. */ +- _pager_release_seqno (p, seqno); + pthread_mutex_unlock (&p->interlock); + ++ //mark ++ deferred = _pager_allocate_deferred (memory_object_data_return_deferred, ++ sizeof *deferred); ++ if (deferred == NULL) ++ { ++ err = ENOMEM; ++ goto release_mem; ++ } ++ ++ ports_port_ref (p); ++ deferred->p = p; ++ deferred->offset = offset; ++ deferred->data = data; ++ deferred->length = length; ++ deferred->kcopy = kcopy; ++ deferred->npages = npages; ++ deferred->notified = notified; ++ deferred->pagerrs = pagerrs; ++ deferred->lock_list = lock_list; ++ deferred->omitdata = omitdata; ++ _pager_defer (deferred); ++ return 0; ++ ++ notify: ++ _pager_allow_termination (p); ++ pthread_mutex_unlock (&p->interlock); ++ ++ for (i = 0; i < npages; i++) ++ { ++ assert (notified[i] == 0 || notified[i] == 1); ++ if (notified[i]) ++ { ++ short *pm_entry = &pm_entries[i]; ++ ++ /* Do notify user. */ ++ //mark ++ pager_notify_evict (p->upi, offset + (i * vm_page_size)); ++ ++ /* Clear any error that is left. Notification on eviction ++ is used only to change association of page, so any ++ error may no longer be valid. */ ++ pthread_mutex_lock (&p->interlock); ++ *pm_entry = SET_PM_ERROR (SET_PM_NEXTERROR (*pm_entry, 0), 0); ++ pthread_mutex_unlock (&p->interlock); ++ } ++ } ++ goto release_mem; ++ ++ release_out: ++ pthread_mutex_unlock (&p->interlock); ++ ++ release_mem: ++ free (notified); ++ free (pagerrs); ++ for (ll = lock_list; ll; /* nop */) ++ { ++ struct lock_list *o = ll; ++ ll = ll->next; ++ free (o); ++ } ++ ++ return err; ++} ++ ++static void ++memory_object_data_return_deferred (void *arg) ++{ ++ struct deferred_args *a = arg; ++ int i; ++ struct lock_list *ll; ++ short *pm_entries; ++ int wakeup; ++ + /* This is inefficient; we should send all the pages to the device at once + but until the pager library interface is changed, this will have to do. */ + +- for (i = 0; i < npages; i++) +- if (!(omitdata & (1 << i))) +- pagerrs[i] = pager_write_page (p->upi, +- offset + (vm_page_size * i), +- data + (vm_page_size * i)); ++ for (i = 0; i < a->npages; i++) ++ if (!(a->omitdata & (1 << i))) ++ //mark ++ a->pagerrs[i] = pager_write_page (a->p->upi, ++ a->offset + (vm_page_size * i), ++ a->data + (vm_page_size * i)); + + /* Acquire the right to meddle with the pagemap */ +- pthread_mutex_lock (&p->interlock); +- _pager_pagemap_resize (p, offset + length); +- pm_entries = &p->pagemap[offset / __vm_page_size]; ++ pthread_mutex_lock (&a->p->interlock); ++ _pager_pagemap_resize (a->p, a->offset + a->length); ++ pm_entries = &a->p->pagemap[a->offset / __vm_page_size]; + + wakeup = 0; +- for (i = 0; i < npages; i++) ++ for (i = 0; i < a->npages; i++) + { +- if (omitdata & (1 << i)) ++ if (a->omitdata & (1 << i)) + { +- notified[i] = 0; ++ a->notified[i] = 0; + continue; + } + + if (pm_entries[i] & PM_WRITEWAIT) + wakeup = 1; + +- if (pagerrs[i] && ! (pm_entries[i] & PM_PAGEINWAIT)) ++ if (a->pagerrs[i] && ! (pm_entries[i] & PM_PAGEINWAIT)) + /* The only thing we can do here is mark the page, and give + errors from now on when it is to be read. This is + imperfect, because if all users go away, the pagemap will +@@ -199,61 +294,63 @@ _pager_do_write_request (struct pager *p, + + if (pm_entries[i] & PM_PAGEINWAIT) + { +- memory_object_data_supply (p->memobjcntl, +- offset + (vm_page_size * i), +- data + (vm_page_size * i), ++ memory_object_data_supply (a->p->memobjcntl, ++ a->offset + (vm_page_size * i), ++ a->data + (vm_page_size * i), + vm_page_size, 1, + VM_PROT_NONE, 0, MACH_PORT_NULL); +- notified[i] = 0; ++ a->notified[i] = 0; + } + else + { +- munmap ((void *) (data + (vm_page_size * i)), ++ munmap ((void *) (a->data + (vm_page_size * i)), + vm_page_size); +- notified[i] = (! kcopy && p->notify_on_evict); +- if (! kcopy) ++ a->notified[i] = (! a->kcopy && a->p->notify_on_evict); ++ if (! a->kcopy) + pm_entries[i] &= ~PM_INCORE; + } + + pm_entries[i] &= ~(PM_PAGINGOUT | PM_PAGEINWAIT | PM_WRITEWAIT); + } + +- for (ll = lock_list; ll; ll = ll->next) ++ for (ll = a->lock_list; ll; ll = ll->next) + if (!--ll->lr->pending_writes && !ll->lr->locks_pending) + wakeup = 1; + + if (wakeup) +- pthread_cond_broadcast (&p->wakeup); ++ pthread_cond_broadcast (&a->p->wakeup); + +- notify: +- _pager_allow_termination (p); +- pthread_mutex_unlock (&p->interlock); ++ _pager_allow_termination (a->p); ++ pthread_mutex_unlock (&a->p->interlock); + +- for (i = 0; i < npages; i++) ++ for (i = 0; i < a->npages; i++) + { +- assert (notified[i] == 0 || notified[i] == 1); +- if (notified[i]) ++ assert (a->notified[i] == 0 || a->notified[i] == 1); ++ if (a->notified[i]) + { + short *pm_entry = &pm_entries[i]; + + /* Do notify user. */ +- pager_notify_evict (p->upi, offset + (i * vm_page_size)); ++ pager_notify_evict (a->p->upi, a->offset + (i * vm_page_size)); + + /* Clear any error that is left. Notification on eviction + is used only to change association of page, so any + error may no longer be valid. */ +- pthread_mutex_lock (&p->interlock); ++ pthread_mutex_lock (&a->p->interlock); + *pm_entry = SET_PM_ERROR (SET_PM_NEXTERROR (*pm_entry, 0), 0); +- pthread_mutex_unlock (&p->interlock); ++ pthread_mutex_unlock (&a->p->interlock); + } + } + +- return 0; +- +- release_out: +- _pager_release_seqno (p, seqno); +- pthread_mutex_unlock (&p->interlock); +- return 0; ++ free (a->notified); ++ free (a->pagerrs); ++ for (ll = a->lock_list; ll; /* nop */) ++ { ++ struct lock_list *o = ll; ++ ll = ll->next; ++ free (o); ++ } ++ ports_port_deref (a->p); + } + + /* Implement pageout call back as described by <mach/memory_object.defs>. */ +diff --git a/libpager/data-unlock.c b/libpager/data-unlock.c +index 599237c..11ec396 100644 +--- a/libpager/data-unlock.c ++++ b/libpager/data-unlock.c +@@ -18,6 +18,17 @@ + #include "priv.h" + #include "memory_object_S.h" + #include <stdio.h> ++#include <stdlib.h> ++#include <error.h> ++ ++static void memory_object_data_unlock_deferred (void *arg); ++ ++struct deferred_args ++{ ++ struct pager *p; ++ vm_offset_t offset; ++ vm_size_t length; ++}; + + /* Implement kernel requests for access as described in + <mach/memory_object.defs>. */ +@@ -29,17 +40,13 @@ _pager_seqnos_memory_object_data_unlock (struct pager *p, + vm_size_t length, + vm_prot_t access) + { +- volatile int err; ++ error_t err = 0; ++ struct deferred_args *deferred; + + if (!p + || p->port.class != _pager_class) + return EOPNOTSUPP; + +- pthread_mutex_lock (&p->interlock); +- _pager_wait_for_seqno (p, seqno); +- _pager_release_seqno (p, seqno); +- pthread_mutex_unlock (&p->interlock); +- + if (p->pager_state != NORMAL) + { + printf ("pager in wrong state for unlock\n"); +@@ -68,20 +75,45 @@ _pager_seqnos_memory_object_data_unlock (struct pager *p, + goto out; + } + +- err = pager_unlock_page (p->upi, offset); ++ deferred = _pager_allocate_deferred (memory_object_data_unlock_deferred, ++ sizeof *deferred); ++ if (deferred == NULL) ++ { ++ err = ENOMEM; ++ goto out; ++ } ++ ++ ports_port_ref (p); ++ deferred->p = p; ++ deferred->offset = offset; ++ deferred->length = length; ++ _pager_defer (deferred); + ++ out: ++ return err; ++} ++ ++static void ++memory_object_data_unlock_deferred (void *arg) ++{ ++ struct deferred_args *args = arg; ++ ++ error_t err = pager_unlock_page (args->p->upi, args->offset); + if (!err) + /* We can go ahead and release the lock. */ +- _pager_lock_object (p, offset, length, MEMORY_OBJECT_RETURN_NONE, 0, +- VM_PROT_NONE, 0); ++ _pager_lock_object (args->p, args->offset, args->length, ++ MEMORY_OBJECT_RETURN_NONE, 0, ++ VM_PROT_NONE, 0); + else + { + /* Flush the page, and set a bit so that m_o_data_request knows +- to issue an error. */ +- _pager_lock_object (p, offset, length, MEMORY_OBJECT_RETURN_NONE, 1, +- VM_PROT_WRITE, 1); +- _pager_mark_next_request_error (p, offset, length, err); ++ to issue an error. */ ++ _pager_lock_object (args->p, args->offset, args->length, ++ MEMORY_OBJECT_RETURN_NONE, 1, ++ VM_PROT_WRITE, 1); ++ _pager_mark_next_request_error (args->p, args->offset, args->length, ++ err); + } +- out: +- return 0; ++ ++ ports_port_deref (args->p); + } +diff --git a/libpager/demuxer.c b/libpager/demuxer.c +index b4d4054..b0b3d56 100644 +--- a/libpager/demuxer.c ++++ b/libpager/demuxer.c +@@ -1,5 +1,5 @@ + /* Demuxer for pager library +- Copyright (C) 1994, 1995, 2002, 2011 Free Software Foundation ++ Copyright (C) 1994, 1995, 2002, 2011, 2014 Free Software Foundation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as +@@ -15,26 +15,549 @@ + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + ++#include <error.h> ++#include <mach/mig_errors.h> ++#include <pthread.h> ++#include <stddef.h> ++#include <string.h> ++#include <time.h> ++ + #include "priv.h" + #include "memory_object_S.h" + #include "notify_S.h" + ++#define error(...) ++ ++#define WORKER_COUNT 3 ++#define DEFERRED_MAX_IDLE 2 ++#define DEFERRED_TIMEOUT 4 ++#define DEFERRED_SLANT (4 - 1) ++ ++struct item { ++ struct item *next; ++}; ++ ++struct request { ++ struct item item; ++ mig_routine_t routine; ++ mach_msg_header_t *inp; ++ mach_msg_header_t *outp; ++}; ++ ++struct queue { ++ struct item *head; ++ struct item **tail; ++}; ++ ++static inline void ++queue_init (struct queue *q) ++{ ++ q->head = NULL; ++ q->tail = &q->head; ++} ++ ++static inline void ++queue_enqueue (struct queue *q, struct item *r) ++{ ++ *q->tail = r; ++ q->tail = &r->next; ++ r->next = NULL; ++} ++ ++static inline void * ++queue_dequeue (struct queue *q) ++{ ++ struct item *r = q->head; ++ if (r == NULL) ++ return NULL; ++ ++ /* Pop the first item off. */ ++ if ((q->head = q->head->next) == NULL) ++ /* The queue is empty, fix tail pointer. */ ++ q->tail = &q->head; ++ ++ r->next = NULL; ++ return r; ++} ++ ++struct worker { ++ struct queue queue; ++ unsigned long tag; ++}; ++ ++static struct { ++ struct queue queue; ++ int asleep; ++ pthread_cond_t wakeup; ++ pthread_mutex_t lock; ++ struct worker workers[WORKER_COUNT]; ++} requests; ++ + /* Demultiplex a single message directed at a pager port; INP is the + message received; fill OUTP with the reply. */ + int + pager_demuxer (mach_msg_header_t *inp, + mach_msg_header_t *outp) + { ++#ifdef MACH_RCV_LARGE ++ const mach_msg_size_t max_size = 2 * __vm_page_size; /* Generic. Good? XXX */ ++#else ++ const mach_msg_size_t max_size = 4 * __vm_page_size; /* XXX */ ++#endif ++ + mig_routine_t routine; +- if ((routine = _pager_seqnos_memory_object_server_routine (inp)) || +- (routine = _pager_seqnos_notify_server_routine (inp))) ++ if (! ((routine = _pager_seqnos_memory_object_server_routine (inp)) || ++ (routine = _pager_seqnos_notify_server_routine (inp)))) ++ return FALSE; ++ ++#define MASK (8 - 1) ++ mach_msg_size_t padded_size = (inp->msgh_size + MASK) & ~MASK; ++#undef MASK ++ ++ struct request *r = malloc (sizeof *r + padded_size + max_size); ++ if (r == NULL) ++ { ++ /* This is bad. */ ++ /* XXX reply */ ++ error (0, errno, "malloc"); ++ return 0; ++ } ++ ++ r->routine = routine; ++ r->inp = (mach_msg_header_t *) ((char *) r + sizeof *r); ++ memcpy (r->inp, inp, inp->msgh_size); ++ ++ r->outp = (mach_msg_header_t *) ((char *) r + sizeof *r + padded_size); ++ memcpy (r->outp, outp, sizeof *outp); ++ ++ pthread_mutex_lock (&requests.lock); ++ ++ queue_enqueue (&requests.queue, &r->item); ++ ++ if (0) ++ { ++ error (0, 0, "enqueuing %p (msgh_id: %d on obj %d), head is: %p", ++ r, ++ inp->msgh_id, ++ inp->msgh_local_port, ++ requests.queue.head); ++ } ++ ++ /* Awake worker. */ ++ if (requests.asleep > 0) ++ pthread_cond_signal (&requests.wakeup); ++ ++ pthread_mutex_unlock (&requests.lock); ++ ++ ((mig_reply_header_t *) outp)->RetCode = MIG_NO_REPLY; ++ ++ return TRUE; ++} ++ ++static void * ++worker_func (void *arg) ++{ ++ struct worker *self = (struct worker *) arg; ++ struct request *r = NULL; ++ ++ while (1) ++ { ++ int i; ++ mach_msg_return_t mr; ++ ++ /* Free previous message. */ ++ free (r); ++ ++ pthread_mutex_lock (&requests.lock); ++ ++ /* First, look in our queue for more requests to the object we ++ have been working on lately. Some other thread might have ++ delegated it to us. */ ++ r = queue_dequeue (&self->queue); ++ if (r != NULL) ++ goto gotone; ++ ++ /* Nope. Clear our tag and... */ ++ self->tag = 0; ++ ++ get_request_locked: ++ /* ... get a request from the global queue instead. */ ++ while ((r = queue_dequeue (&requests.queue)) == NULL) ++ { ++ requests.asleep += 1; ++ pthread_cond_wait (&requests.wakeup, &requests.lock); ++ requests.asleep -= 1; ++ } ++ ++ for (i = 0; i < WORKER_COUNT; i++) ++ if (requests.workers[i].tag ++ == (unsigned long) r->inp->msgh_local_port) ++ { ++ /* Some other thread is working on that object. Delegate ++ the request to that worker. */ ++ queue_enqueue (&requests.workers[i].queue, &r->item); ++ goto get_request_locked; ++ } ++ ++ /* Claim responsibility for this object by setting our tag. */ ++ self->tag = (unsigned long) r->inp->msgh_local_port; ++ ++ gotone: ++ pthread_mutex_unlock (&requests.lock); ++ ++ /* Call the server routine. */ ++ (*r->routine) (r->inp, r->outp); ++ ++ /* What follows is basically the second part of ++ mach_msg_server_timeout. */ ++ mig_reply_header_t *request = (mig_reply_header_t *) r->inp; ++ mig_reply_header_t *reply = (mig_reply_header_t *) r->outp; ++ ++ if (reply->RetCode) ++ { ++ error (0, reply->RetCode, "server routine said"); ++ } ++ ++ switch (reply->RetCode) ++ { ++ case KERN_SUCCESS: ++ /* Hunky dory. */ ++ break; ++ ++ case MIG_NO_REPLY: ++ /* The server function wanted no reply sent. ++ Loop for another request. */ ++ continue; ++ ++ default: ++ /* Some error; destroy the request message to release any ++ port rights or VM it holds. Don't destroy the reply port ++ right, so we can send an error message. */ ++ request->Head.msgh_remote_port = MACH_PORT_NULL; ++ mach_msg_destroy (&request->Head); ++ break; ++ } ++ ++ if (reply->Head.msgh_remote_port == MACH_PORT_NULL) ++ { ++ /* No reply port, so destroy the reply. */ ++ if (reply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) ++ mach_msg_destroy (&reply->Head); ++ continue; ++ } ++ ++ /* Send the reply. */ ++ mr = mach_msg (&reply->Head, ++ MACH_SEND_MSG, ++ reply->Head.msgh_size, ++ 0, ++ MACH_PORT_NULL, ++ 0, ++ MACH_PORT_NULL); ++ ++ switch (mr) ++ { ++ case MACH_SEND_INVALID_DEST: ++ /* The reply can't be delivered, so destroy it. This error ++ indicates only that the requester went away, so we ++ continue and get the next request. */ ++ mach_msg_destroy (&reply->Head); ++ break; ++ ++ default: ++ /* Some other form of lossage; there is not much we can ++ do here. */ ++ error (0, mr, "mach_msg"); ++ } ++ } ++ ++ /* Not reached. */ ++ return NULL; ++} ++ ++static struct ++{ ++ struct queue queue; ++ int alive; ++ int online; ++ int asleep; ++ float average; ++ pthread_cond_t wakeup; ++ pthread_mutex_t lock; ++} deferreds; ++ ++struct deferred ++{ ++ struct item item; ++ deferred_function_t function; ++ char args[0]; ++}; ++ ++void * ++_pager_allocate_deferred (deferred_function_t f, size_t size) ++{ ++ struct deferred *d = malloc (sizeof *d + size); ++ if (d != NULL) ++ d->function = f; ++ return &d->args; ++} ++ ++void ++_pager_defer (void *cookie) ++{ ++ struct deferred *d = (struct deferred *) ++ ((char *) cookie - offsetof (struct deferred, args)); ++ ++ pthread_mutex_lock (&deferreds.lock); ++ queue_enqueue (&deferreds.queue, &d->item); ++ ++ /* Awake worker. */ ++ if (deferreds.asleep > 0) ++ pthread_cond_signal (&deferreds.wakeup); ++ ++ pthread_mutex_unlock (&deferreds.lock); ++} ++ ++/* XXX copied from libports. */ ++#include <mach/thread_info.h> ++#include <mach/thread_switch.h> ++ ++#define STACK_SIZE (64 * 1024) ++ ++#define THREAD_PRI 2 ++ ++/* XXX To reduce starvation, the priority of new threads is initially ++ depressed. This helps already existing threads complete their job and be ++ recycled to handle new messages. The duration of this depression is made ++ a function of the total number of threads because more threads imply ++ more contention, and the priority of threads blocking on a contended spin ++ lock is also implicitely depressed. ++ ++ Then, if permitted, a greater priority is requested to further decrease ++ the need for additional threads. */ ++static void ++adjust_priority (unsigned int totalthreads) ++{ ++ mach_port_t host_priv, self, pset, pset_priv; ++ unsigned int t; ++ error_t err; ++ ++ t = 10 + (((totalthreads - 1) / 100) + 1) * 10; ++ thread_switch (MACH_PORT_NULL, SWITCH_OPTION_DEPRESS, t); ++ ++ err = get_privileged_ports (&host_priv, NULL); ++ if (err) ++ goto error_host_priv; ++ ++ self = mach_thread_self (); ++ err = thread_get_assignment (self, &pset); ++ if (err) ++ goto error_pset; ++ ++ err = host_processor_set_priv (host_priv, pset, &pset_priv); ++ if (err) ++ goto error_pset_priv; ++ ++ err = thread_max_priority (self, pset_priv, 0); ++ if (err) ++ goto error_max_priority; ++ ++ err = thread_priority (self, THREAD_PRI, 0); ++ if (err) ++ goto error_priority; ++ ++ mach_port_deallocate (mach_task_self (), pset_priv); ++ mach_port_deallocate (mach_task_self (), pset); ++ mach_port_deallocate (mach_task_self (), self); ++ mach_port_deallocate (mach_task_self (), host_priv); ++ return; ++ ++error_priority: ++error_max_priority: ++ mach_port_deallocate (mach_task_self (), pset_priv); ++error_pset_priv: ++ mach_port_deallocate (mach_task_self (), pset); ++error_pset: ++ mach_port_deallocate (mach_task_self (), self); ++ mach_port_deallocate (mach_task_self (), host_priv); ++error_host_priv: ++ if (err != EPERM) ++ error (0, err, "unable to adjust libports thread priority"); ++} ++ ++static pthread_attr_t tiny_stack_attr; ++ ++static void * ++deferreds_worker_func (void *arg __attribute__ ((unused))) ++{ ++ adjust_priority (__atomic_add_fetch (&deferreds.alive, 1, __ATOMIC_RELAXED)); ++ ++ boolean_t first_run = TRUE; ++ while (1) + { +- (*routine) (inp, outp); +- return TRUE; ++ struct deferred *d; ++ error_t err; ++ struct timespec ts; ++ ++ pthread_mutex_lock (&deferreds.lock); ++ ++ if (first_run) ++ { ++ /* A starting thread is considered asleep. */ ++ deferreds.asleep -= 1; ++ first_run = FALSE; ++ } ++ ++ while ((d = queue_dequeue (&deferreds.queue)) == NULL) ++ { ++ deferreds.asleep += 1; ++ clock_gettime (CLOCK_REALTIME, &ts); ++ ts.tv_sec += DEFERRED_TIMEOUT + (random () & DEFERRED_SLANT); ++ err = pthread_cond_timedwait (&deferreds.wakeup, &deferreds.lock, ++ &ts); ++ deferreds.asleep -= 1; ++ if (err == ETIMEDOUT && ++#if 0 ++deferreds.asleep >= (DEFERRED_MAX_IDLE ?: 1) ++#else ++ deferreds.alive - deferreds.asleep < (int) deferreds.average - (DEFERRED_MAX_IDLE ?: 1) ++#endif ++) ++ { ++ error (0, 0, "deferred worker thread exiting, %d idle workers left, average is %f", deferreds.asleep, deferreds.average); ++ __atomic_sub_fetch (&deferreds.alive, 1, __ATOMIC_RELAXED); ++ pthread_mutex_unlock (&deferreds.lock); ++ return NULL; ++ } ++ } ++ ++#if 0 ++ if (deferreds.asleep == 0) ++ { ++ pthread_t t; ++ error (0, 0, "creating deferred worker thread"); ++ /* A starting thread is considered asleep. */ ++ deferreds.asleep += 1; ++ err = pthread_create (&t, &tiny_stack_attr, ++ &deferreds_worker_func, NULL); ++ if (err) ++ { ++ /* This is tough. Let's hope for the best. */ ++ error (0, err, "pthread_create"); ++ deferreds.asleep -= 1; ++ } ++ else ++ pthread_detach (t); ++ } ++#endif ++ ++ pthread_mutex_unlock (&deferreds.lock); ++ ++ (*d->function) (&d->args); ++ free (d); ++ } ++ ++ /* Not reached. */ ++ return NULL; ++} ++ ++#define DEFERREDS_SMOOTHNESS (0.005) ++ ++struct timespec deferreds_delay = { 0, 100000000L }; ++ ++static void * ++deferreds_manager_func (void *arg __attribute__ ((unused))) ++{ ++ int i = 0; ++ float last = 0; ++ while (1) ++ { ++ pthread_mutex_lock (&deferreds.lock); ++ last = deferreds.average; ++ deferreds.average = ++ DEFERREDS_SMOOTHNESS * (deferreds.alive - deferreds.asleep) ++ + (1. - DEFERREDS_SMOOTHNESS) * last; ++ ++ if (i == 0) ++ error (0, 0, "alive: %d working: %d, average: %f", deferreds.alive, deferreds.alive - deferreds.asleep, deferreds.average); ++ ++ i = (i + 1) % 10; ++ ++ if (deferreds.asleep == 0 && i == 0) ++ { ++ error_t err; ++ pthread_t t; ++ error (0, 0, "creating deferred worker thread"); ++ /* A starting thread is considered asleep. */ ++ deferreds.asleep += 1; ++ err = pthread_create (&t, &tiny_stack_attr, ++ &deferreds_worker_func, NULL); ++ if (err) ++ { ++ /* This is tough. Let's hope for the best. */ ++ error (0, err, "pthread_create"); ++ deferreds.asleep -= 1; ++ } ++ else ++ pthread_detach (t); ++ } ++ ++ ++ pthread_mutex_unlock (&deferreds.lock); ++ nanosleep (&deferreds_delay, NULL); ++ } ++ return NULL; ++} ++ ++error_t ++pager_start_worker (void) ++{ ++ error_t err; ++ int i; ++ pthread_t t; ++ ++ pthread_attr_init (&tiny_stack_attr); ++ pthread_attr_setstacksize (&tiny_stack_attr, STACK_SIZE); ++ ++ queue_init (&requests.queue); ++ pthread_cond_init (&requests.wakeup, NULL); ++ pthread_mutex_init (&requests.lock, NULL); ++ ++ for (i = 0; i < WORKER_COUNT; i++) ++ { ++ requests.workers[i].tag = 0; ++ queue_init (&requests.workers[i].queue); ++ ++ err = pthread_create (&t, &tiny_stack_attr, ++ &worker_func, &requests.workers[i]); ++ if (err) ++ return err; ++ ++ pthread_detach (t); ++ } ++ ++ queue_init (&deferreds.queue); ++ pthread_cond_init (&deferreds.wakeup, NULL); ++ pthread_mutex_init (&deferreds.lock, NULL); ++ ++ for (i = 0; i < (DEFERRED_MAX_IDLE ?: 1); i++) ++ { ++ /* A starting thread is considered asleep. */ ++ deferreds.asleep += 1; ++ err = pthread_create (&t, &tiny_stack_attr, ++ &deferreds_worker_func, NULL); ++ if (err) ++ return err; ++ ++ pthread_detach (t); + } + +- /* Synchronize our bookkeeping of the port's seqno with the one +- consumed by this bogus message. */ +- _pager_update_seqno (inp->msgh_local_port, inp->msgh_seqno); +- return FALSE; ++ err = pthread_create (&t, &tiny_stack_attr, ++ &deferreds_manager_func, NULL); ++ if (err) ++ return err; ++ pthread_detach (t); ++ ++ ++ return err; + } +diff --git a/libpager/priv.h b/libpager/priv.h +index d49cbb9..59117a4 100644 +--- a/libpager/priv.h ++++ b/libpager/priv.h +@@ -152,4 +152,8 @@ void _pager_lock_object (struct pager *, vm_offset_t, vm_size_t, int, int, + void _pager_free_structure (struct pager *); + void _pager_clean (void *arg); + void _pager_real_dropweak (void *arg); ++ ++typedef void (*deferred_function_t) (void *); ++void *_pager_allocate_deferred (deferred_function_t, size_t); ++void _pager_defer (void *); + #endif +diff --git a/libpager/seqnos.c b/libpager/seqnos.c +index cab2f33..455378f 100644 +--- a/libpager/seqnos.c ++++ b/libpager/seqnos.c +@@ -25,6 +25,7 @@ void + _pager_wait_for_seqno (struct pager *p, + mach_port_seqno_t seqno) + { ++ return; + while (seqno != p->seqno + 1) + { + p->waitingforseqno = 1; +@@ -39,6 +40,7 @@ void + _pager_release_seqno (struct pager *p, + mach_port_seqno_t seqno) + { ++ return; + assert (seqno == p->seqno + 1); + p->seqno = seqno; + if (p->waitingforseqno) +@@ -54,6 +56,7 @@ void + _pager_update_seqno (mach_port_t object, + mach_port_seqno_t seqno) + { ++ return; + struct pager *p; + + p = ports_lookup_port (0, object, _pager_class); +@@ -68,6 +71,7 @@ void + _pager_update_seqno_p (struct pager *p, + mach_port_seqno_t seqno) + { ++ return; + if (p + && p->port.class == _pager_class) + { +diff --git a/libports/manage-one-thread.c b/libports/manage-one-thread.c +index 4ea740b..6f20942 100644 +--- a/libports/manage-one-thread.c ++++ b/libports/manage-one-thread.c +@@ -85,7 +85,14 @@ ports_manage_port_operations_one_thread (struct port_bucket *bucket, + + return status; + } +- ++ ++ /* It is currently unsafe for most servers to terminate based on ++ inactivity because a request may arrive after a server has ++ started shutting down, causing the client to receive an error. ++ Prevent the service loop from terminating by setting TIMEOUT to ++ zero. */ ++ timeout = 0; ++ + do + err = mach_msg_server_timeout (internal_demuxer, 0, bucket->portset, + timeout ? MACH_RCV_TIMEOUT : 0, timeout); +diff --git a/storeio/pager.c b/storeio/pager.c +index 7d78711..4c6fa4d 100644 +--- a/storeio/pager.c ++++ b/storeio/pager.c +@@ -24,6 +24,7 @@ + #include <strings.h> + #include <unistd.h> + #include <errno.h> ++#include <error.h> + #include <sys/mman.h> + #include <stdio.h> + +@@ -148,12 +149,10 @@ static void * + service_paging_requests (void *arg) + { + (void) arg; +- +- for (;;) +- ports_manage_port_operations_multithread (pager_port_bucket, +- pager_demuxer, +- 1000 * 30, 1000 * 60 * 5, 0); +- ++ ports_manage_port_operations_one_thread (pager_port_bucket, ++ pager_demuxer, ++ 0); ++ /* Not reached. */ + return NULL; + } + +@@ -173,6 +172,11 @@ init_dev_paging () + + pager_port_bucket = ports_create_bucket (); + ++ /* Start libpagers worker thread. */ ++ err = pager_start_worker (); ++ if (err) ++ error (0, err, "pager_start_worker"); ++ + /* Make a thread to service paging requests. */ + err = pthread_create (&thread, NULL, service_paging_requests, NULL); + if (!err) +diff --git a/tmpfs/pager-stubs.c b/tmpfs/pager-stubs.c +index 3cb264b..975800a 100644 +--- a/tmpfs/pager-stubs.c ++++ b/tmpfs/pager-stubs.c +@@ -92,5 +92,5 @@ pager_clear_user_data (struct user_pager_info *pager) + void + pager_dropweak (struct user_pager_info *p) + { +- abort(); ++ /* Do nothing. */ + } diff --git a/debian/patches/series b/debian/patches/series index 65769bfb..c672eed1 100644 --- a/debian/patches/series +++ b/debian/patches/series @@ -42,3 +42,4 @@ xkb-compat.patch xxx-fix-build.patch mach-defpager-protected-payload.patch #ext2fs-skip-unallocated-blocks.patch +libpager-threadpool.patch |