commit be7319c2857598f36545cf31addfb8e35f56834d Author: Justus Winter <4winter@informatik.uni-hamburg.de> Date: Thu Apr 17 15:44:12 2014 +0200 libpager: rework the request handling to use less threads Previously, libpager used multiple threads to receive messages from the pager bucket. It used sequence barriers to execute the requests to order requests to each object. The sequence barriers are implemented in seqnos.c. A server function uses _pager_wait_for_seqno to wait for its sequence number and _pager_release_seqno to release it, or it uses _pager_update_seqno to do both operations in one step. These sequence barriers divide each server function in three parts: A, B, and C. A_i happens "before" the sequence barrier i, B_i happens "in order", C_i happens "after" the sequence barrier. This partial order < has the following properties: * This order is *per object*. Requests to different objects are not ordered. * A_i < B_i, B_i < C_i (due to the structure of the code) * B_i < B_{i+1} (this is due to the sequence barriers) * Note that only the B parts are ordered by the sequence numbers, we are free to execute C_i and C_{i+1} in any possible order. The same argument applies to the A parts. If the server functions would never to block, a single threaded could do the job. Unfortunately, we sometimes call user-supplied functions like pager_write_pager, which may block for some time doing IO. Worse, ext2fs used to trigger a page fault during pager_write_pager, creating yet another request to the disk pager (and thus, libpager). Surveying the code revealed that most of the server functions only validate arguments in their A phase, do some non-blocking work in their B phase and then send the reply in their C phase. Three functions, however, do some potentially blocking and otherwise bad behaving stuff in their C phase. The three interesting functions are _pager_seqnos_memory_object_data_request, _pager_seqnos_memory_object_data_return, and quest. Most notably, the first calls pager_read_page and the second pager_write_page. The sequence barriers are implemented using a very simple ticket algorithm. Every request, even the invalid ones, is processed by a thread, and waits until the ticket count reaches its seqno, does some work in-order, then increments the ticket and awakes all threads that have piled up up to this moment. All of them except one will then discover that it's not their turn yet and go to sleep again. Creating one thread per request has proven to be problematic as memory_object requests often arrive in large batches. This patch does four things: * Use a single thread to receive messages from the port bucket. All incoming request are put into a queue. * Use a fixed-number of threads (though even one is actually enough) to execute the A and B parts of the server functions. If multiple threads are used, a work-delegation mechanism ensures that the per object order < is observed. See the comments in demuxer.c for a more detailed description and why having a small number of threads (I propose 3) is advantageous. * The code from the C parts is moved out of the server function X into X_deferred. An "interesting" function now puts a deferred function calls into a queue instead of executing that part directly. * A variable number of threads (though this number is kept close to one) is executing deferred functions from the queue of deferred functions. As memory_object requests often arrive in large batches, the key insight here is to rate-limit the creation of threads. See the comments in deferreds.c for details on the implementation and why a lower number of threads results in a better performance. For reference, I used the following command to create workloads that highlight the problem this patch is addressing: % settrans t .../ext2fs --sync=30 /dev/sd2s1 ... % /usr/bin/time zsh -c 'for ((i=0; i < 3500; i++)); do dd if=/dev/zero of=t/src/$i bs=4k count=290 2>/dev/null echo -n . if ((i % 100 == 0)) ; then echo -n $i; fi done' With this patch, the number of libpager threads tops at 12, and it is just 6 most of the time. * libpager/deferreds.c: New file. * libpager/deferreds.h: Likewise. * libpager/pool.c: Likewise. * libpager/pool.h: Likewise. * libpager/queue.h: Likewise. * libpager/demuxer.c: Manage a queue of requests received from the port bucket. (pager_demuxer): Just decode the server function and enqueue the request. (worker_func): New function that consumes and executes the requests from the queue. (service_paging_requests): New function. (pager_start_workers): Likewise. * libpager/data-request.c: Remove the seqno barriers. Move code from the 'C' phase into a deferred function. * libpager/data-return.c: Likewise. * libpager/data-unlock.c: Likewise. * libpager/chg-compl.c: Remove the seqno barriers. * libpager/lock-completed.c: Likewise. * libpager/no-senders.c: Likewise. * libpager/notify-stubs.c: Likewise. * libpager/object-init.c: Likewise. * libpager/object-terminate.c: Likewise. * libpager/seqnos.c: Remove file. * libpager/stubs.c: Likewise. * libpager/pager.h (pager_demuxer): Drop declaration. (pager_start_workers): New declaration. * libpager/priv.h: Remove the _pager_seqno declarations. * libpager/Makefile (SRCS): Drop seqnos.c, add deferreds.c, pool.c. * console/pager.c (user_pager_init): Call pager_start_workers. * libdiskfs/disk-pager.c: Likewise. * storeio/pager.c: Likewise. diff --git a/console/pager.c b/console/pager.c index 87c36f0..a08e46d 100644 --- a/console/pager.c +++ b/console/pager.c @@ -119,22 +119,6 @@ void pager_dropweak (struct user_pager_info *upi) { } - - -/* A top-level function for the paging thread that just services paging - requests. */ -static void * -service_paging_requests (void *arg) -{ - struct port_bucket *pager_bucket = arg; - for (;;) - ports_manage_port_operations_multithread (pager_bucket, - pager_demuxer, - 1000 * 60 * 2, - 1000 * 60 * 10, 0); - return NULL; -} - /* Initialize the pager for the display component. */ void @@ -148,15 +132,10 @@ user_pager_init (void) if (! pager_bucket) error (5, errno, "Cannot create pager bucket"); - /* Make a thread to service paging requests. */ - err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket); - if (!err) - pthread_detach (thread); - else - { - errno = err; - perror ("pthread_create"); - } + /* Start libpagers worker thread. */ + err = pager_start_workers (pager_bucket); + if (err) + error (5, err, "Cannot start pager worker threads"); } diff --git a/libdiskfs/disk-pager.c b/libdiskfs/disk-pager.c index 9a0d9d8..eb309b1 100644 --- a/libdiskfs/disk-pager.c +++ b/libdiskfs/disk-pager.c @@ -33,39 +33,19 @@ static struct hurd_signal_preemptor preemptor = handler: (sighandler_t) &fault_handler, }; -/* A top-level function for the paging thread that just services paging - requests. */ -static void * -service_paging_requests (void *arg) -{ - struct port_bucket *pager_bucket = arg; - for (;;) - ports_manage_port_operations_multithread (pager_bucket, - pager_demuxer, - 1000 * 60 * 2, - 1000 * 60 * 10, 0); - return NULL; -} - void diskfs_start_disk_pager (struct user_pager_info *upi, struct port_bucket *pager_bucket, int may_cache, int notify_on_evict, size_t size, void **image) { - pthread_t thread; error_t err; mach_port_t disk_pager_port; - /* Make a thread to service paging requests. */ - err = pthread_create (&thread, NULL, service_paging_requests, pager_bucket); - if (!err) - pthread_detach (thread); - else - { - errno = err; - perror ("pthread_create"); - } + /* Start libpagers worker thread. */ + err = pager_start_workers (pager_bucket); + if (err) + error (2, err, "creating pager worker threads failed"); /* Create the pager. */ diskfs_disk_pager = pager_create (upi, pager_bucket, diff --git a/libpager/Makefile b/libpager/Makefile index b622295..6e7f43f 100644 --- a/libpager/Makefile +++ b/libpager/Makefile @@ -22,9 +22,9 @@ SRCS = data-request.c data-return.c data-unlock.c pager-port.c \ inhibit-term.c lock-completed.c lock-object.c mark-error.c \ no-senders.c object-init.c object-terminate.c pagemap.c \ pager-create.c pager-flush.c pager-shutdown.c pager-sync.c \ - stubs.c seqnos.c demuxer.c chg-compl.c pager-attr.c clean.c \ + stubs.c demuxer.c chg-compl.c pager-attr.c clean.c \ dropweak.c notify-stubs.c get-upi.c pager-memcpy.c pager-return.c \ - offer-page.c + offer-page.c deferreds.c pool.c installhdrs = pager.h HURDLIBS= ports diff --git a/libpager/chg-compl.c b/libpager/chg-compl.c index d77c46c..89ccfc8 100644 --- a/libpager/chg-compl.c +++ b/libpager/chg-compl.c @@ -37,7 +37,6 @@ _pager_seqnos_memory_object_change_completed (struct pager *p, } pthread_mutex_lock (&p->interlock); - _pager_wait_for_seqno (p, seq); for (ar = p->attribute_requests; ar; ar = ar->next) if (ar->may_cache == maycache && ar->copy_strategy == strat) @@ -46,8 +45,7 @@ _pager_seqnos_memory_object_change_completed (struct pager *p, pthread_cond_broadcast (&p->wakeup); break; } - - _pager_release_seqno (p, seq); + pthread_mutex_unlock (&p->interlock); return 0; } diff --git a/libpager/data-request.c b/libpager/data-request.c index 82ce904..0bceac4 100644 --- a/libpager/data-request.c +++ b/libpager/data-request.c @@ -15,11 +15,21 @@ along with this program; if not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +#include "deferreds.h" #include "priv.h" #include "memory_object_S.h" #include #include +static void memory_object_data_request_deferred (void *arg); + +struct deferred_args +{ + struct pager *p; + vm_offset_t offset; + vm_size_t length; +}; + /* Implement pagein callback as described in . */ kern_return_t _pager_seqnos_memory_object_data_request (struct pager *p, @@ -31,9 +41,8 @@ _pager_seqnos_memory_object_data_request (struct pager *p, { short *pm_entry; int doread, doerror; + struct deferred_args *deferred; error_t err; - vm_address_t page; - int write_lock; if (!p || p->port.class != _pager_class) @@ -41,7 +50,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, /* Acquire the right to meddle with the pagemap */ pthread_mutex_lock (&p->interlock); - _pager_wait_for_seqno (p, seqno); /* sanity checks -- we don't do multi-page requests yet. */ if (control != p->memobjcntl) @@ -105,7 +113,6 @@ _pager_seqnos_memory_object_data_request (struct pager *p, } /* Let someone else in. */ - _pager_release_seqno (p, seqno); pthread_mutex_unlock (&p->interlock); if (!doread) @@ -113,18 +120,16 @@ _pager_seqnos_memory_object_data_request (struct pager *p, if (doerror) goto error_read; - err = pager_read_page (p->upi, offset, &page, &write_lock); - if (err) - goto error_read; + deferred = _pager_allocate_deferred (memory_object_data_request_deferred, + sizeof *deferred); + if (deferred == NULL) + return ENOMEM; - memory_object_data_supply (p->memobjcntl, offset, page, length, 1, - write_lock ? VM_PROT_WRITE : VM_PROT_NONE, - p->notify_on_evict ? 1 : 0, - MACH_PORT_NULL); - pthread_mutex_lock (&p->interlock); - _pager_mark_object_error (p, offset, length, 0); - _pager_allow_termination (p); - pthread_mutex_unlock (&p->interlock); + ports_port_ref (p); + deferred->p = p; + deferred->offset = offset; + deferred->length = length; + _pager_defer (deferred); return 0; error_read: @@ -139,7 +144,41 @@ _pager_seqnos_memory_object_data_request (struct pager *p, allow_release_out: _pager_allow_termination (p); release_out: - _pager_release_seqno (p, seqno); pthread_mutex_unlock (&p->interlock); return 0; } + +static void +memory_object_data_request_deferred (void *arg) +{ + struct deferred_args *args = arg; + vm_address_t page; + int write_lock; + error_t err; + + err = pager_read_page (args->p->upi, args->offset, &page, &write_lock); + if (err) + { + memory_object_data_error (args->p->memobjcntl, args->offset, + args->length, EIO); + _pager_mark_object_error (args->p, args->offset, args->length, EIO); + + pthread_mutex_lock (&args->p->interlock); + _pager_allow_termination (args->p); + pthread_mutex_unlock (&args->p->interlock); + goto out; + } + + memory_object_data_supply (args->p->memobjcntl, args->offset, page, + args->length, 1, + write_lock ? VM_PROT_WRITE : VM_PROT_NONE, + args->p->notify_on_evict ? 1 : 0, + MACH_PORT_NULL); + pthread_mutex_lock (&args->p->interlock); + _pager_mark_object_error (args->p, args->offset, args->length, 0); + _pager_allow_termination (args->p); + pthread_mutex_unlock (&args->p->interlock); + + out: + ports_port_deref (args->p); +} diff --git a/libpager/data-return.c b/libpager/data-return.c index ee6c6e8..70c2cc8 100644 --- a/libpager/data-return.c +++ b/libpager/data-return.c @@ -15,12 +15,36 @@ along with this program; if not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +#include "deferreds.h" #include "priv.h" #include "memory_object_S.h" #include #include #include +static void memory_object_data_return_deferred (void *arg); + +struct lock_list +{ + struct lock_request *lr; + struct lock_list *next; +}; + +struct deferred_args +{ + struct pager *p; + vm_offset_t offset; + pointer_t data; + vm_size_t length; + int kcopy; + int npages; + char *notified; + error_t *pagerrs; + struct lock_list *lock_list; + int omitdata; +}; + + /* Worker function used by _pager_seqnos_memory_object_data_return and _pager_seqnos_memory_object_data_initialize. All args are as for _pager_seqnos_memory_object_data_return; the additional @@ -38,13 +62,13 @@ _pager_do_write_request (struct pager *p, { short *pm_entries; int npages, i; - char *notified; - error_t *pagerrs; + char *notified = NULL; + error_t *pagerrs = NULL; struct lock_request *lr; - struct lock_list {struct lock_request *lr; - struct lock_list *next;} *lock_list, *ll; - int wakeup; + struct lock_list *lock_list = NULL, *ll; int omitdata = 0; + struct deferred_args *deferred; + error_t err; if (!p || p->port.class != _pager_class) @@ -52,7 +76,6 @@ _pager_do_write_request (struct pager *p, /* Acquire the right to meddle with the pagemap */ pthread_mutex_lock (&p->interlock); - _pager_wait_for_seqno (p, seqno); /* sanity checks -- we don't do multi-page requests yet. */ if (control != p->memobjcntl) @@ -78,9 +101,9 @@ _pager_do_write_request (struct pager *p, } npages = length / __vm_page_size; - pagerrs = alloca (npages * sizeof (error_t)); + pagerrs = malloc (npages * sizeof (error_t)); - notified = alloca (npages * (sizeof *notified)); + notified = malloc (npages * (sizeof *notified)); #ifndef NDEBUG memset (notified, -1, npages * (sizeof *notified)); #endif @@ -101,7 +124,6 @@ _pager_do_write_request (struct pager *p, notified[i] = (p->notify_on_evict && ! (pm_entries[i] & PM_PAGEINWAIT)); - _pager_release_seqno (p, seqno); goto notify; } else { @@ -150,7 +172,7 @@ _pager_do_write_request (struct pager *p, for (lr = p->lock_requests; lr; lr = lr->next) if (offset < lr->end && offset + length >= lr->start) { - ll = alloca (sizeof (struct lock_list)); + ll = malloc (sizeof (struct lock_list)); ll->lr = lr; ll->next = lock_list; lock_list = ll; @@ -158,36 +180,106 @@ _pager_do_write_request (struct pager *p, } /* Let someone else in. */ - _pager_release_seqno (p, seqno); pthread_mutex_unlock (&p->interlock); + deferred = _pager_allocate_deferred (memory_object_data_return_deferred, + sizeof *deferred); + if (deferred == NULL) + { + err = ENOMEM; + goto release_mem; + } + + ports_port_ref (p); + deferred->p = p; + deferred->offset = offset; + deferred->data = data; + deferred->length = length; + deferred->kcopy = kcopy; + deferred->npages = npages; + deferred->notified = notified; + deferred->pagerrs = pagerrs; + deferred->lock_list = lock_list; + deferred->omitdata = omitdata; + _pager_defer (deferred); + return 0; + + notify: + _pager_allow_termination (p); + pthread_mutex_unlock (&p->interlock); + + for (i = 0; i < npages; i++) + { + assert (notified[i] == 0 || notified[i] == 1); + if (notified[i]) + { + short *pm_entry = &pm_entries[i]; + + /* Do notify user. */ + pager_notify_evict (p->upi, offset + (i * vm_page_size)); + + /* Clear any error that is left. Notification on eviction + is used only to change association of page, so any + error may no longer be valid. */ + pthread_mutex_lock (&p->interlock); + *pm_entry = SET_PM_ERROR (SET_PM_NEXTERROR (*pm_entry, 0), 0); + pthread_mutex_unlock (&p->interlock); + } + } + goto release_mem; + + release_out: + pthread_mutex_unlock (&p->interlock); + + release_mem: + free (notified); + free (pagerrs); + for (ll = lock_list; ll; /* nop */) + { + struct lock_list *o = ll; + ll = ll->next; + free (o); + } + + return err; +} + +static void +memory_object_data_return_deferred (void *arg) +{ + struct deferred_args *a = arg; + int i; + struct lock_list *ll; + short *pm_entries; + int wakeup; + /* This is inefficient; we should send all the pages to the device at once but until the pager library interface is changed, this will have to do. */ - for (i = 0; i < npages; i++) - if (!(omitdata & (1 << i))) - pagerrs[i] = pager_write_page (p->upi, - offset + (vm_page_size * i), - data + (vm_page_size * i)); + for (i = 0; i < a->npages; i++) + if (!(a->omitdata & (1 << i))) + a->pagerrs[i] = pager_write_page (a->p->upi, + a->offset + (vm_page_size * i), + a->data + (vm_page_size * i)); /* Acquire the right to meddle with the pagemap */ - pthread_mutex_lock (&p->interlock); - _pager_pagemap_resize (p, offset + length); - pm_entries = &p->pagemap[offset / __vm_page_size]; + pthread_mutex_lock (&a->p->interlock); + _pager_pagemap_resize (a->p, a->offset + a->length); + pm_entries = &a->p->pagemap[a->offset / __vm_page_size]; wakeup = 0; - for (i = 0; i < npages; i++) + for (i = 0; i < a->npages; i++) { - if (omitdata & (1 << i)) + if (a->omitdata & (1 << i)) { - notified[i] = 0; + a->notified[i] = 0; continue; } if (pm_entries[i] & PM_WRITEWAIT) wakeup = 1; - if (pagerrs[i] && ! (pm_entries[i] & PM_PAGEINWAIT)) + if (a->pagerrs[i] && ! (pm_entries[i] & PM_PAGEINWAIT)) /* The only thing we can do here is mark the page, and give errors from now on when it is to be read. This is imperfect, because if all users go away, the pagemap will @@ -199,61 +291,63 @@ _pager_do_write_request (struct pager *p, if (pm_entries[i] & PM_PAGEINWAIT) { - memory_object_data_supply (p->memobjcntl, - offset + (vm_page_size * i), - data + (vm_page_size * i), + memory_object_data_supply (a->p->memobjcntl, + a->offset + (vm_page_size * i), + a->data + (vm_page_size * i), vm_page_size, 1, VM_PROT_NONE, 0, MACH_PORT_NULL); - notified[i] = 0; + a->notified[i] = 0; } else { - munmap ((void *) (data + (vm_page_size * i)), + munmap ((void *) (a->data + (vm_page_size * i)), vm_page_size); - notified[i] = (! kcopy && p->notify_on_evict); - if (! kcopy) + a->notified[i] = (! a->kcopy && a->p->notify_on_evict); + if (! a->kcopy) pm_entries[i] &= ~PM_INCORE; } pm_entries[i] &= ~(PM_PAGINGOUT | PM_PAGEINWAIT | PM_WRITEWAIT); } - for (ll = lock_list; ll; ll = ll->next) + for (ll = a->lock_list; ll; ll = ll->next) if (!--ll->lr->pending_writes && !ll->lr->locks_pending) wakeup = 1; if (wakeup) - pthread_cond_broadcast (&p->wakeup); + pthread_cond_broadcast (&a->p->wakeup); - notify: - _pager_allow_termination (p); - pthread_mutex_unlock (&p->interlock); + _pager_allow_termination (a->p); + pthread_mutex_unlock (&a->p->interlock); - for (i = 0; i < npages; i++) + for (i = 0; i < a->npages; i++) { - assert (notified[i] == 0 || notified[i] == 1); - if (notified[i]) + assert (a->notified[i] == 0 || a->notified[i] == 1); + if (a->notified[i]) { short *pm_entry = &pm_entries[i]; /* Do notify user. */ - pager_notify_evict (p->upi, offset + (i * vm_page_size)); + pager_notify_evict (a->p->upi, a->offset + (i * vm_page_size)); /* Clear any error that is left. Notification on eviction is used only to change association of page, so any error may no longer be valid. */ - pthread_mutex_lock (&p->interlock); + pthread_mutex_lock (&a->p->interlock); *pm_entry = SET_PM_ERROR (SET_PM_NEXTERROR (*pm_entry, 0), 0); - pthread_mutex_unlock (&p->interlock); + pthread_mutex_unlock (&a->p->interlock); } } - return 0; - - release_out: - _pager_release_seqno (p, seqno); - pthread_mutex_unlock (&p->interlock); - return 0; + free (a->notified); + free (a->pagerrs); + for (ll = a->lock_list; ll; /* nop */) + { + struct lock_list *o = ll; + ll = ll->next; + free (o); + } + ports_port_deref (a->p); } /* Implement pageout call back as described by . */ diff --git a/libpager/data-unlock.c b/libpager/data-unlock.c index 599237c..a969ca6 100644 --- a/libpager/data-unlock.c +++ b/libpager/data-unlock.c @@ -15,9 +15,21 @@ along with this program; if not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +#include "deferreds.h" #include "priv.h" #include "memory_object_S.h" #include +#include +#include + +static void memory_object_data_unlock_deferred (void *arg); + +struct deferred_args +{ + struct pager *p; + vm_offset_t offset; + vm_size_t length; +}; /* Implement kernel requests for access as described in . */ @@ -29,17 +41,13 @@ _pager_seqnos_memory_object_data_unlock (struct pager *p, vm_size_t length, vm_prot_t access) { - volatile int err; + error_t err = 0; + struct deferred_args *deferred; if (!p || p->port.class != _pager_class) return EOPNOTSUPP; - pthread_mutex_lock (&p->interlock); - _pager_wait_for_seqno (p, seqno); - _pager_release_seqno (p, seqno); - pthread_mutex_unlock (&p->interlock); - if (p->pager_state != NORMAL) { printf ("pager in wrong state for unlock\n"); @@ -68,20 +76,45 @@ _pager_seqnos_memory_object_data_unlock (struct pager *p, goto out; } - err = pager_unlock_page (p->upi, offset); + deferred = _pager_allocate_deferred (memory_object_data_unlock_deferred, + sizeof *deferred); + if (deferred == NULL) + { + err = ENOMEM; + goto out; + } + + ports_port_ref (p); + deferred->p = p; + deferred->offset = offset; + deferred->length = length; + _pager_defer (deferred); + out: + return err; +} + +static void +memory_object_data_unlock_deferred (void *arg) +{ + struct deferred_args *args = arg; + + error_t err = pager_unlock_page (args->p->upi, args->offset); if (!err) /* We can go ahead and release the lock. */ - _pager_lock_object (p, offset, length, MEMORY_OBJECT_RETURN_NONE, 0, - VM_PROT_NONE, 0); + _pager_lock_object (args->p, args->offset, args->length, + MEMORY_OBJECT_RETURN_NONE, 0, + VM_PROT_NONE, 0); else { /* Flush the page, and set a bit so that m_o_data_request knows - to issue an error. */ - _pager_lock_object (p, offset, length, MEMORY_OBJECT_RETURN_NONE, 1, - VM_PROT_WRITE, 1); - _pager_mark_next_request_error (p, offset, length, err); + to issue an error. */ + _pager_lock_object (args->p, args->offset, args->length, + MEMORY_OBJECT_RETURN_NONE, 1, + VM_PROT_WRITE, 1); + _pager_mark_next_request_error (args->p, args->offset, args->length, + err); } - out: - return 0; + + ports_port_deref (args->p); } diff --git a/libpager/deferreds.c b/libpager/deferreds.c new file mode 100644 index 0000000..25f2420 --- /dev/null +++ b/libpager/deferreds.c @@ -0,0 +1,246 @@ +/* Deferred function queue. + + Copyright (C) 2014 Free Software Foundation, Inc. + + Written by Justus Winter <4winter@informatik.uni-hamburg.de> + + This file is part of the GNU Hurd. + + The GNU Hurd is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + The GNU Hurd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with the GNU Hurd. If not, see . */ + +#include +#include +#include +#include +#include +#include + +#include "deferreds.h" +#include "pool.h" +#include "queue.h" + +/* + Worker pool for the deferred functions. + + Server functions may offload long-running functions to a pool of + workers. In theory, a single thread would suffice to service this + pool. However, ext2fs complicates this quite a bit. + + ext2fs uses libpager to implement a disk cache and a file cache. If + ext2fs services a memory_object request to an object in the file + cache, it can trigger an additional request by dereferencing a + pointer to memory backed by the disk cache. + + This is most likely the reason for thread-storms (or rather, the + reason why thread storms are that bad). It needs to be addressed in + ext2fs. Until it is, we can reasonably well with a simple thread + pool that creates threads on demand but limits the rate of the + thread creation. + + Rate-limiting the thread creation is crucial as memory_object + requests often arrive in large batches. The rate-limiting is + implemented using the token bucket algorithm. + + Creating more than one worker thread potentially decreases + performance. The operations offloaded to this pool are typically IO + requests. Using only one thread to perform such operations will + likely result in a more linear access pattern. + + XXX perform some tests on real hardware + + The following parameters are chosen based on these observations: +*/ + +#define DEFERREDS_MIN_IDLE 1 /* Create threads on demand so + that there is always an + idle thread. */ +#define DEFERREDS_MAX_IDLE 1 /* Keep at most one idle + thread around. */ +#define DEFERREDS_TIMEOUT 1 /* Superfluous idle threads + exit after one second. */ +/* XXX The next two values are somewhat magic but performed well. */ +#define DEFERREDS_BUCKET_SIZE 3. /* Limit creation of threads + to three threads... */ +#define DEFERREDS_BUCKET_PER 2. /* ... every two seconds. */ + +/* Set to 1 to see messages about thread creation and destruction. */ +#if 0 +#define TRACE(ARGS...) error (0, 0, ARGS) +#else +#define TRACE(ARGS...) +#endif + +/* Mapped time as fast time source. */ +static volatile struct mapped_time_value *mapped_time; + +/* Queue and worker pool for the deferred functions. */ +static struct +{ + struct queue queue; + struct pool pool; + struct timeval last_check; + float bucket; + pthread_cond_t wakeup; + pthread_mutex_t lock; +} deferreds; + +/* A deferred is a deferred function call. */ +struct deferred +{ + struct item item; + deferred_function_t function; + char args[0]; +}; + +void * +deferreds_worker_func (void *arg __attribute__ ((unused))) +{ + int idle; + adjust_priority (pool_thread_online (&deferreds.pool)); + + while (1) + { + struct deferred *d; + error_t err; + struct timespec ts; + + pthread_mutex_lock (&deferreds.lock); + + while ((d = queue_dequeue (&deferreds.queue)) == NULL) + { + clock_gettime (CLOCK_REALTIME, &ts); + ts.tv_sec += DEFERREDS_TIMEOUT; + + pool_thread_idle (&deferreds.pool); + err = pthread_cond_timedwait (&deferreds.wakeup, &deferreds.lock, + &ts); + idle = pool_thread_busy (&deferreds.pool); + + if (err == ETIMEDOUT && idle >= DEFERREDS_MAX_IDLE) + goto out; + } + + pthread_mutex_unlock (&deferreds.lock); + + (*d->function) (&d->args); + free (d); + } + + out: + TRACE ("deferred worker thread exiting, %d idle workers left", idle); + pthread_mutex_unlock (&deferreds.lock); + pool_thread_offline (&deferreds.pool); + return NULL; +} + +/* Allocate storage for a deferred function. A pointer to a buffer of + the given size is returned. */ +void * +_pager_allocate_deferred (deferred_function_t f, size_t size) +{ + struct deferred *d = malloc (sizeof *d + size); + if (d != NULL) + d->function = f; + return &d->args; +} + +/* Enqueue the given deferred function. Expects a pointer returned + from _pager_allocate_deferred. */ +void +_pager_defer (void *cookie) +{ + int idle; + struct deferred *d = (struct deferred *) + ((char *) cookie - offsetof (struct deferred, args)); + + pthread_mutex_lock (&deferreds.lock); + + queue_enqueue (&deferreds.queue, &d->item); + + idle = pool_idle_threads (&deferreds.pool); + if (idle > 0) + /* Awake worker. */ + pthread_cond_signal (&deferreds.wakeup); + else + { + /* Limit the creation of threads using the token bucket + algorithm. */ + struct timeval elapsed, last = deferreds.last_check; + maptime_read (mapped_time, &deferreds.last_check); + timersub (&deferreds.last_check, &last, &elapsed); + + deferreds.bucket += + ((float) elapsed.tv_sec + (float) elapsed.tv_sec / 1000000.) + * (DEFERREDS_BUCKET_SIZE / DEFERREDS_BUCKET_PER); + + if (deferreds.bucket > DEFERREDS_BUCKET_SIZE) + deferreds.bucket = DEFERREDS_BUCKET_SIZE; + + if (deferreds.bucket > 1.) + { + error_t err; + pthread_t t; + TRACE ("creating deferred worker thread"); + err = pthread_create (&t, &pool_thread_attr, + &deferreds_worker_func, NULL); + if (err) + /* This is tough. Let's hope for the best. */ + error (0, err, "pthread_create"); + else + { + pthread_detach (t); + deferreds.bucket -= 1.; + } + } + } + + pthread_mutex_unlock (&deferreds.lock); +} + +/* Initializes the queue and starts the deferred workers. */ +error_t +_pager_deferreds_init (void) +{ + error_t err = 0; + int i; + pthread_t t; + + /* Try the unprivileged method using /dev/time first. */ + err = maptime_map (0, NULL, &mapped_time); + if (err) + { + /* Fall back to using the time mach device. */ + err = maptime_map (1, NULL, &mapped_time); + if (err) + return err; + } + + queue_init (&deferreds.queue); + pthread_cond_init (&deferreds.wakeup, NULL); + pthread_mutex_init (&deferreds.lock, NULL); + + for (i = 0; i < DEFERREDS_MIN_IDLE; i++) + { + err = pthread_create (&t, &pool_thread_attr, + &deferreds_worker_func, NULL); + if (err) + return err; + + pthread_detach (t); + } + deferreds.bucket = DEFERREDS_BUCKET_SIZE; + maptime_read (mapped_time, &deferreds.last_check); + + return err; +} diff --git a/libpager/deferreds.h b/libpager/deferreds.h new file mode 100644 index 0000000..5cc9aed --- /dev/null +++ b/libpager/deferreds.h @@ -0,0 +1,42 @@ +/* Deferred function queue. + + Copyright (C) 2014 Free Software Foundation, Inc. + + Written by Justus Winter <4winter@informatik.uni-hamburg.de> + + This file is part of the GNU Hurd. + + The GNU Hurd is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + The GNU Hurd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with the GNU Hurd. If not, see . */ + +#ifndef _LIBPAGER_DEFERREDS_H +#define _LIBPAGER_DEFERREDS_H + +#include +#include + +/* Type for deferred functions. */ +typedef void (*deferred_function_t) (void *); + +/* Initializes the queue and starts the deferred workers. */ +error_t _pager_deferreds_init (void); + +/* Allocate storage for a deferred function. A pointer to a buffer of + the given size is returned. */ +void *_pager_allocate_deferred (deferred_function_t, size_t); + +/* Enqueue the given deferred function. Expects a pointer returned + from _pager_allocate_deferred. */ +void _pager_defer (void *); + +#endif /* _LIBPAGER_DEFERREDS_H */ diff --git a/libpager/demuxer.c b/libpager/demuxer.c index b4d4054..e3eb06c 100644 --- a/libpager/demuxer.c +++ b/libpager/demuxer.c @@ -1,5 +1,5 @@ /* Demuxer for pager library - Copyright (C) 1994, 1995, 2002, 2011 Free Software Foundation + Copyright (C) 1994, 1995, 2002, 2011, 2014 Free Software Foundation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -15,26 +15,286 @@ along with this program; if not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +#include +#include +#include +#include + #include "priv.h" #include "memory_object_S.h" #include "notify_S.h" +#include "deferreds.h" +#include "pool.h" +#include "queue.h" + +/* + Worker pool for the server functions. + + A single thread receives messages from the port bucket and puts them + into a queue. A fixed number of consumers actually execute the + server functions and send the reply. + + The requests to an object O have to be processed in the order they + were received. To this end, each worker has a local queue and a + tag. If a thread processes a request to O, it sets its tag to a + unique identifier representing O. If another thread now dequeues a + second request to O, it enqueues it to the first workers queue. + + This is an optimization based on the observation that multiple + requests to the same objects often come in quick succession. This + also makes sure that the second request is processed at the earliest + possible moment, by the same thread that processed the previous + request to that object. + + At least one worker thread is necessary. +*/ +#define WORKER_COUNT 3 + +/* An request contains the message received from the port set. */ +struct request +{ + struct item item; + mig_routine_t routine; + mach_msg_header_t *inp; + mach_msg_header_t *outp; +}; + +/* A worker. */ +struct worker +{ + struct queue queue; /* other workers may delegate requests to us */ + unsigned long tag; /* tag of the object we are working on */ +}; + +/* This is the queue for incoming requests. A single thread receives + messages from the port set, looks the service routine up, and + enqueues the request here. */ +static struct +{ + struct queue queue; + int asleep; + pthread_cond_t wakeup; + pthread_mutex_t lock; + struct worker workers[WORKER_COUNT]; +} requests; /* Demultiplex a single message directed at a pager port; INP is the message received; fill OUTP with the reply. */ -int +static int pager_demuxer (mach_msg_header_t *inp, mach_msg_header_t *outp) { + error_t err = MIG_NO_REPLY; + +#ifdef MACH_RCV_LARGE + const mach_msg_size_t max_size = 2 * __vm_page_size; /* Generic. Good? XXX */ +#else + const mach_msg_size_t max_size = 4 * __vm_page_size; /* XXX */ +#endif + mig_routine_t routine; - if ((routine = _pager_seqnos_memory_object_server_routine (inp)) || - (routine = _pager_seqnos_notify_server_routine (inp))) + if (! ((routine = _pager_seqnos_memory_object_server_routine (inp)) || + (routine = _pager_seqnos_notify_server_routine (inp)))) + return FALSE; + +#define MASK (8 - 1) + mach_msg_size_t padded_size = (inp->msgh_size + MASK) & ~MASK; +#undef MASK + + struct request *r = malloc (sizeof *r + padded_size + max_size); + if (r == NULL) { - (*routine) (inp, outp); - return TRUE; + err = ENOMEM; + goto out; + } + + r->routine = routine; + r->inp = (mach_msg_header_t *) ((char *) r + sizeof *r); + memcpy (r->inp, inp, inp->msgh_size); + + r->outp = (mach_msg_header_t *) ((char *) r + sizeof *r + padded_size); + memcpy (r->outp, outp, sizeof *outp); + + pthread_mutex_lock (&requests.lock); + + queue_enqueue (&requests.queue, &r->item); + + /* Awake worker. */ + if (requests.asleep > 0) + pthread_cond_signal (&requests.wakeup); + + pthread_mutex_unlock (&requests.lock); + + out: + ((mig_reply_header_t *) outp)->RetCode = err; + return TRUE; +} + +/* Consumes requests from the queue. */ +static void * +worker_func (void *arg) +{ + struct worker *self = (struct worker *) arg; + struct request *r = NULL; + + while (1) + { + int i; + mach_msg_return_t mr; + + /* Free previous message. */ + free (r); + + pthread_mutex_lock (&requests.lock); + + /* First, look in our queue for more requests to the object we + have been working on lately. Some other thread might have + delegated them to us. */ + r = queue_dequeue (&self->queue); + if (r != NULL) + goto got_one; + + /* Nope. Clear our tag and... */ + self->tag = 0; + + get_request_locked: + /* ... get a request from the global queue instead. */ + while ((r = queue_dequeue (&requests.queue)) == NULL) + { + requests.asleep += 1; + pthread_cond_wait (&requests.wakeup, &requests.lock); + requests.asleep -= 1; + } + + for (i = 0; i < WORKER_COUNT; i++) + if (requests.workers[i].tag + == (unsigned long) r->inp->msgh_local_port) + { + /* Some other thread is working on that object. Delegate + the request to that worker. */ + queue_enqueue (&requests.workers[i].queue, &r->item); + goto get_request_locked; + } + + /* Claim responsibility for this object by setting our tag. */ + self->tag = (unsigned long) r->inp->msgh_local_port; + + got_one: + pthread_mutex_unlock (&requests.lock); + + /* Call the server routine. */ + (*r->routine) (r->inp, r->outp); + + /* What follows is basically the second part of + mach_msg_server_timeout. */ + mig_reply_header_t *request = (mig_reply_header_t *) r->inp; + mig_reply_header_t *reply = (mig_reply_header_t *) r->outp; + + switch (reply->RetCode) + { + case KERN_SUCCESS: + /* Hunky dory. */ + break; + + case MIG_NO_REPLY: + /* The server function wanted no reply sent. + Loop for another request. */ + continue; + + default: + /* Some error; destroy the request message to release any + port rights or VM it holds. Don't destroy the reply port + right, so we can send an error message. */ + request->Head.msgh_remote_port = MACH_PORT_NULL; + mach_msg_destroy (&request->Head); + break; + } + + if (reply->Head.msgh_remote_port == MACH_PORT_NULL) + { + /* No reply port, so destroy the reply. */ + if (reply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) + mach_msg_destroy (&reply->Head); + continue; + } + + /* Send the reply. */ + mr = mach_msg (&reply->Head, + MACH_SEND_MSG, + reply->Head.msgh_size, + 0, + MACH_PORT_NULL, + 0, + MACH_PORT_NULL); + + switch (mr) + { + case MACH_SEND_INVALID_DEST: + /* The reply can't be delivered, so destroy it. This error + indicates only that the requester went away, so we + continue and get the next request. */ + mach_msg_destroy (&reply->Head); + break; + + default: + /* Some other form of lossage; there is not much we can + do here. */ + error (0, mr, "mach_msg"); + } + } + + /* Not reached. */ + return NULL; +} + +/* A top-level function for the paging thread that just services paging + requests. */ +static void * +service_paging_requests (void *arg) +{ + struct port_bucket *pager_bucket = arg; + ports_manage_port_operations_one_thread (pager_bucket, + pager_demuxer, + 0); + /* Not reached. */ + return NULL; +} + +/* Start the worker thread libpager uses to service requests. */ +error_t +pager_start_workers (struct port_bucket *pager_bucket) +{ + error_t err; + int i; + pthread_t t; + + pool_init (); + err = _pager_deferreds_init (); + if (err) + return err; + + queue_init (&requests.queue); + pthread_cond_init (&requests.wakeup, NULL); + pthread_mutex_init (&requests.lock, NULL); + + /* Make a thread to service paging requests. */ + err = pthread_create (&t, &pool_thread_attr, + service_paging_requests, pager_bucket); + if (err) + return err; + pthread_detach (t); + + for (i = 0; i < WORKER_COUNT; i++) + { + requests.workers[i].tag = 0; + queue_init (&requests.workers[i].queue); + + err = pthread_create (&t, &pool_thread_attr, + &worker_func, &requests.workers[i]); + if (err) + return err; + pthread_detach (t); } - /* Synchronize our bookkeeping of the port's seqno with the one - consumed by this bogus message. */ - _pager_update_seqno (inp->msgh_local_port, inp->msgh_seqno); - return FALSE; + return err; } diff --git a/libpager/lock-completed.c b/libpager/lock-completed.c index a3f3f16..30b1dd3 100644 --- a/libpager/lock-completed.c +++ b/libpager/lock-completed.c @@ -37,7 +37,6 @@ _pager_seqnos_memory_object_lock_completed (struct pager *p, return EOPNOTSUPP; pthread_mutex_lock (&p->interlock); - _pager_wait_for_seqno (p, seqno); if (control != p->memobjcntl) { @@ -59,7 +58,6 @@ _pager_seqnos_memory_object_lock_completed (struct pager *p, } out: - _pager_release_seqno (p, seqno); pthread_mutex_unlock (&p->interlock); return err; diff --git a/libpager/no-senders.c b/libpager/no-senders.c index c21dfc2..d0bbe27 100644 --- a/libpager/no-senders.c +++ b/libpager/no-senders.c @@ -29,7 +29,6 @@ _pager_do_seqnos_mach_notify_no_senders (struct port_info *pi, pi->class != _pager_class) return EOPNOTSUPP; - _pager_update_seqno_p ((struct pager *) pi, seqno); ports_no_senders (pi, mscount); return 0; diff --git a/libpager/notify-stubs.c b/libpager/notify-stubs.c index ba13882..a826420 100644 --- a/libpager/notify-stubs.c +++ b/libpager/notify-stubs.c @@ -28,8 +28,6 @@ _pager_do_seqnos_mach_notify_port_deleted (struct port_info *pi, mach_port_t name __attribute__ ((unused))) { - _pager_update_seqno_p ((struct pager *) pi, seqno); - return 0; } @@ -39,8 +37,6 @@ _pager_do_seqnos_mach_notify_msg_accepted (struct port_info *pi, mach_port_t name __attribute__ ((unused))) { - _pager_update_seqno_p ((struct pager *) pi, seqno); - return 0; } @@ -50,8 +46,6 @@ _pager_do_seqnos_mach_notify_port_destroyed (struct port_info *pi, mach_port_t name __attribute__ ((unused))) { - _pager_update_seqno_p ((struct pager *) pi, seqno); - return 0; } @@ -59,8 +53,6 @@ error_t _pager_do_seqnos_mach_notify_send_once (struct port_info *pi, mach_port_seqno_t seqno) { - _pager_update_seqno_p ((struct pager *) pi, seqno); - return 0; } @@ -70,7 +62,5 @@ _pager_do_seqnos_mach_notify_dead_name (struct port_info *pi, mach_port_t name __attribute__ ((unused))) { - _pager_update_seqno_p ((struct pager *) pi, seqno); - return 0; } diff --git a/libpager/object-init.c b/libpager/object-init.c index 6683e24..eb62c44 100644 --- a/libpager/object-init.c +++ b/libpager/object-init.c @@ -33,7 +33,6 @@ _pager_seqnos_memory_object_init (struct pager *p, return EOPNOTSUPP; pthread_mutex_lock (&p->interlock); - _pager_wait_for_seqno (p, seqno); if (pagesize != __vm_page_size) { @@ -69,7 +68,6 @@ _pager_seqnos_memory_object_init (struct pager *p, p->pager_state = NORMAL; out: - _pager_release_seqno (p, seqno); pthread_mutex_unlock (&p->interlock); return 0; diff --git a/libpager/object-terminate.c b/libpager/object-terminate.c index 365ba27..3f0482f 100644 --- a/libpager/object-terminate.c +++ b/libpager/object-terminate.c @@ -32,8 +32,7 @@ _pager_seqnos_memory_object_terminate (struct pager *p, return EOPNOTSUPP; pthread_mutex_lock (&p->interlock); - _pager_wait_for_seqno (p, seqno); - + if (control != p->memobjcntl) { printf ("incg terminate: wrong control port"); @@ -75,7 +74,6 @@ _pager_seqnos_memory_object_terminate (struct pager *p, #endif out: - _pager_release_seqno (p, seqno); pthread_mutex_unlock (&p->interlock); return 0; diff --git a/libpager/pager.h b/libpager/pager.h index d0572af..d209290 100644 --- a/libpager/pager.h +++ b/libpager/pager.h @@ -25,11 +25,8 @@ scope. */ struct user_pager_info; -/* This de-muxer function is for use within libports_demuxer. */ -/* INP is a message we've received; OUTP will be filled in with - a reply message. */ -int pager_demuxer (mach_msg_header_t *inp, - mach_msg_header_t *outp); +/* Start the worker thread libpager uses to service requests. */ +error_t pager_start_workers (struct port_bucket *pager_bucket); /* Create a new pager. The pager will have a port created for it (using libports, in BUCKET) and will be immediately ready diff --git a/libpager/pool.c b/libpager/pool.c new file mode 100644 index 0000000..c64b46e --- /dev/null +++ b/libpager/pool.c @@ -0,0 +1,103 @@ +/* A thread pool. + + Copyright (C) 2014 Free Software Foundation, Inc. + + Written by Justus Winter <4winter@informatik.uni-hamburg.de> + + This file is part of the GNU Hurd. + + The GNU Hurd is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + The GNU Hurd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with the GNU Hurd. If not, see . */ + +#include +#include + +/* XXX Copied from libports. */ +#include +#include +#include +#include + +#define STACK_SIZE (64 * 1024) + +#define THREAD_PRI 2 + +/* XXX To reduce starvation, the priority of new threads is initially + depressed. This helps already existing threads complete their job and be + recycled to handle new messages. The duration of this depression is made + a function of the total number of threads because more threads imply + more contention, and the priority of threads blocking on a contended spin + lock is also implicitely depressed. + + Then, if permitted, a greater priority is requested to further decrease + the need for additional threads. */ +void +adjust_priority (unsigned int totalthreads) +{ + mach_port_t host_priv, self, pset, pset_priv; + unsigned int t; + error_t err; + + t = 10 + (((totalthreads - 1) / 100) + 1) * 10; + thread_switch (MACH_PORT_NULL, SWITCH_OPTION_DEPRESS, t); + + err = get_privileged_ports (&host_priv, NULL); + if (err) + goto error_host_priv; + + self = mach_thread_self (); + err = thread_get_assignment (self, &pset); + if (err) + goto error_pset; + + err = host_processor_set_priv (host_priv, pset, &pset_priv); + if (err) + goto error_pset_priv; + + err = thread_max_priority (self, pset_priv, 0); + if (err) + goto error_max_priority; + + err = thread_priority (self, THREAD_PRI, 0); + if (err) + goto error_priority; + + mach_port_deallocate (mach_task_self (), pset_priv); + mach_port_deallocate (mach_task_self (), pset); + mach_port_deallocate (mach_task_self (), self); + mach_port_deallocate (mach_task_self (), host_priv); + return; + +error_priority: +error_max_priority: + mach_port_deallocate (mach_task_self (), pset_priv); +error_pset_priv: + mach_port_deallocate (mach_task_self (), pset); +error_pset: + mach_port_deallocate (mach_task_self (), self); + mach_port_deallocate (mach_task_self (), host_priv); +error_host_priv: + if (err != EPERM) + error (0, err, "unable to adjust libports thread priority"); +} + +/* Attributes for newly created threads. */ +pthread_attr_t pool_thread_attr; + +/* Initializes pool_thread_attr. */ +void +pool_init (void) +{ + pthread_attr_init (&pool_thread_attr); + pthread_attr_setstacksize (&pool_thread_attr, STACK_SIZE); +} diff --git a/libpager/pool.h b/libpager/pool.h new file mode 100644 index 0000000..6fa538d --- /dev/null +++ b/libpager/pool.h @@ -0,0 +1,79 @@ +/* A thread pool. + + Copyright (C) 2014 Free Software Foundation, Inc. + + Written by Justus Winter <4winter@informatik.uni-hamburg.de> + + This file is part of the GNU Hurd. + + The GNU Hurd is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + The GNU Hurd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with the GNU Hurd. If not, see . */ + +#ifndef _LIBPAGER_POOL_H +#define _LIBPAGER_POOL_H + +/* A thread pool. */ +struct pool +{ + int alive; + int idle; +}; + +static inline int +pool_thread_online (struct pool *p) +{ + return __atomic_add_fetch (&p->alive, 1, __ATOMIC_RELAXED); +} + +static inline int +pool_thread_offline (struct pool *p) +{ + return __atomic_sub_fetch (&p->alive, 1, __ATOMIC_RELAXED); +} + +static inline int +pool_idle_threads (struct pool *p) +{ + return __atomic_load_n (&p->idle, __ATOMIC_RELAXED); +} + +static inline int +pool_thread_idle (struct pool *p) +{ + return __atomic_add_fetch (&p->idle, 1, __ATOMIC_RELAXED); +} + +static inline int +pool_thread_busy (struct pool *p) +{ + return __atomic_sub_fetch (&p->idle, 1, __ATOMIC_RELAXED); +} + +/* XXX To reduce starvation, the priority of new threads is initially + depressed. This helps already existing threads complete their job and be + recycled to handle new messages. The duration of this depression is made + a function of the total number of threads because more threads imply + more contention, and the priority of threads blocking on a contended spin + lock is also implicitely depressed. + + Then, if permitted, a greater priority is requested to further decrease + the need for additional threads. */ +void adjust_priority (unsigned int totalthreads); + +/* Attributes for newly created threads. */ +extern pthread_attr_t pool_thread_attr; + +/* Initializes pool_thread_attr. */ +void pool_init (void); + +#endif /* _LIBPAGER_POOL_H */ diff --git a/libpager/priv.h b/libpager/priv.h index d49cbb9..b1d56b5 100644 --- a/libpager/priv.h +++ b/libpager/priv.h @@ -136,10 +136,6 @@ extern int _pager_page_errors[]; struct port_class *_pager_class; -void _pager_wait_for_seqno (struct pager *, mach_port_seqno_t); -void _pager_release_seqno (struct pager *, mach_port_seqno_t); -void _pager_update_seqno (mach_port_t, mach_port_seqno_t); -void _pager_update_seqno_p (struct pager *, mach_port_seqno_t); void _pager_block_termination (struct pager *); void _pager_allow_termination (struct pager *); error_t _pager_pagemap_resize (struct pager *, vm_address_t); diff --git a/libpager/queue.h b/libpager/queue.h new file mode 100644 index 0000000..d3cf738 --- /dev/null +++ b/libpager/queue.h @@ -0,0 +1,61 @@ +/* A FIFO queue with constant-time enqueue and dequeue operations. + + Copyright (C) 2014 Free Software Foundation, Inc. + + Written by Justus Winter <4winter@informatik.uni-hamburg.de> + + This file is part of the GNU Hurd. + + The GNU Hurd is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + The GNU Hurd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with the GNU Hurd. If not, see . */ + +/* A FIFO queue with constant-time enqueue and dequeue operations. */ +struct item { + struct item *next; +}; + +struct queue { + struct item *head; + struct item **tail; +}; + +static inline void +queue_init (struct queue *q) +{ + q->head = NULL; + q->tail = &q->head; +} + +static inline void +queue_enqueue (struct queue *q, struct item *r) +{ + *q->tail = r; + q->tail = &r->next; + r->next = NULL; +} + +static inline void * +queue_dequeue (struct queue *q) +{ + struct item *r = q->head; + if (r == NULL) + return NULL; + + /* Pop the first item off. */ + if ((q->head = q->head->next) == NULL) + /* The queue is empty, fix tail pointer. */ + q->tail = &q->head; + + r->next = NULL; + return r; +} diff --git a/libpager/seqnos.c b/libpager/seqnos.c deleted file mode 100644 index cab2f33..0000000 --- a/libpager/seqnos.c +++ /dev/null @@ -1,79 +0,0 @@ -/* Sequence number synchronization routines for pager library - Copyright (C) 1994, 2011 Free Software Foundation - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2, or (at - your option) any later version. - - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ - -#include "priv.h" -#include - -/* The message with seqno SEQNO has just been dequeued for pager P; - wait until all preceding messages have had a chance and then - return. */ -void -_pager_wait_for_seqno (struct pager *p, - mach_port_seqno_t seqno) -{ - while (seqno != p->seqno + 1) - { - p->waitingforseqno = 1; - pthread_cond_wait (&p->wakeup, &p->interlock); - } -} - - -/* Allow the next message for pager P (potentially blocked in - _pager_wait_for_seqno) to be handled. */ -void -_pager_release_seqno (struct pager *p, - mach_port_seqno_t seqno) -{ - assert (seqno == p->seqno + 1); - p->seqno = seqno; - if (p->waitingforseqno) - { - p->waitingforseqno = 0; - pthread_cond_broadcast (&p->wakeup); - } -} - - -/* Just update the seqno. */ -void -_pager_update_seqno (mach_port_t object, - mach_port_seqno_t seqno) -{ - struct pager *p; - - p = ports_lookup_port (0, object, _pager_class); - _pager_update_seqno_p (p, seqno); - if (p) - ports_port_deref (p); -} - - -/* Just update the seqno, pointer version. */ -void -_pager_update_seqno_p (struct pager *p, - mach_port_seqno_t seqno) -{ - if (p - && p->port.class == _pager_class) - { - pthread_mutex_lock (&p->interlock); - _pager_wait_for_seqno (p, seqno); - _pager_release_seqno (p, seqno); - pthread_mutex_unlock (&p->interlock); - } -} diff --git a/libpager/stubs.c b/libpager/stubs.c index 411f483..c7f1a5a 100644 --- a/libpager/stubs.c +++ b/libpager/stubs.c @@ -29,9 +29,6 @@ _pager_seqnos_memory_object_copy (struct pager *p, mach_port_t new) { printf ("m_o_copy called\n"); - - _pager_update_seqno_p (p, seq); - return EOPNOTSUPP; } @@ -44,9 +41,6 @@ _pager_seqnos_memory_object_data_write (struct pager *p, vm_size_t data_cnt) { printf ("m_o_data_write called\n"); - - _pager_update_seqno_p (p, seq); - return EOPNOTSUPP; } @@ -60,8 +54,5 @@ _pager_seqnos_memory_object_supply_completed (struct pager *p, vm_offset_t err_off) { printf ("m_o_supply_completed called\n"); - - _pager_update_seqno_p (p, seq); - return EOPNOTSUPP; } diff --git a/storeio/pager.c b/storeio/pager.c index 7d78711..e123ad1 100644 --- a/storeio/pager.c +++ b/storeio/pager.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -142,21 +143,6 @@ pager_clear_user_data (struct user_pager_info *upi) static struct port_bucket *pager_port_bucket = 0; -/* A top-level function for the paging thread that just services paging - requests. */ -static void * -service_paging_requests (void *arg) -{ - (void) arg; - - for (;;) - ports_manage_port_operations_multithread (pager_port_bucket, - pager_demuxer, - 1000 * 30, 1000 * 60 * 5, 0); - - return NULL; -} - /* Initialize paging for this device. */ static void init_dev_paging () @@ -173,14 +159,12 @@ init_dev_paging () pager_port_bucket = ports_create_bucket (); - /* Make a thread to service paging requests. */ - err = pthread_create (&thread, NULL, service_paging_requests, NULL); - if (!err) - pthread_detach (thread); - else + /* Start libpagers worker thread. */ + err = pager_start_workers (pager_port_bucket); + if (err) { errno = err; - perror ("pthread_create"); + error (0, err, "pager_start_workers"); } } pthread_mutex_unlock (&pager_global_lock);