diff options
Diffstat (limited to 'libpager')
-rw-r--r-- | libpager/demuxer.c | 119 | ||||
-rw-r--r-- | libpager/pager.h | 28 | ||||
-rw-r--r-- | libpager/queue.h | 8 |
3 files changed, 137 insertions, 18 deletions
diff --git a/libpager/demuxer.c b/libpager/demuxer.c index 4dd3cd87..59dd1c59 100644 --- a/libpager/demuxer.c +++ b/libpager/demuxer.c @@ -60,7 +60,7 @@ request_inp (const struct request *r) /* A worker. */ struct worker { - struct requests *requests; /* our pagers request queue */ + struct pager_requests *requests; /* our pagers request queue */ struct queue queue; /* other workers may delegate requests to us */ unsigned long tag; /* tag of the object we are working on */ }; @@ -68,12 +68,18 @@ struct worker /* This is the queue for incoming requests. A single thread receives messages from the port set, looks the service routine up, and enqueues the request here. */ -struct requests +struct pager_requests { struct port_bucket *bucket; - struct queue queue; + /* Normally, both queues are the same. However, when the workers are + inhibited, a new queue_in is created, but queue_out is left as the + old value, so the workers drain queue_out but do not receive new + requests. */ + struct queue *queue_in; /* the queue to add to */ + struct queue *queue_out; /* the queue to take from */ int asleep; pthread_cond_t wakeup; + pthread_cond_t inhibit_wakeup; pthread_mutex_t lock; struct worker workers[WORKER_COUNT]; }; @@ -81,7 +87,7 @@ struct requests /* Demultiplex a single message directed at a pager port; INP is the message received; fill OUTP with the reply. */ static int -pager_demuxer (struct requests *requests, +pager_demuxer (struct pager_requests *requests, mach_msg_header_t *inp, mach_msg_header_t *outp) { @@ -108,10 +114,10 @@ pager_demuxer (struct requests *requests, pthread_mutex_lock (&requests->lock); - queue_enqueue (&requests->queue, &r->item); + queue_enqueue (requests->queue_in, &r->item); - /* Awake worker. */ - if (requests->asleep > 0) + /* Awake worker, but only if not inhibited. */ + if (requests->asleep > 0 && requests->queue_in == requests->queue_out) pthread_cond_signal (&requests->wakeup); pthread_mutex_unlock (&requests->lock); @@ -160,7 +166,7 @@ static void * worker_func (void *arg) { struct worker *self = (struct worker *) arg; - struct requests *requests = self->requests; + struct pager_requests *requests = self->requests; struct request *r = NULL; mig_reply_header_t reply_msg; @@ -186,9 +192,11 @@ worker_func (void *arg) get_request_locked: /* ... get a request from the global queue instead. */ - while ((r = queue_dequeue (&requests->queue)) == NULL) + while ((r = queue_dequeue (requests->queue_out)) == NULL) { requests->asleep += 1; + if (requests->asleep == WORKER_COUNT) + pthread_cond_broadcast (&requests->inhibit_wakeup); pthread_cond_wait (&requests->wakeup, &requests->lock); requests->asleep -= 1; } @@ -281,7 +289,7 @@ worker_func (void *arg) static void * service_paging_requests (void *arg) { - struct requests *requests = arg; + struct pager_requests *requests = arg; int demuxer (mach_msg_header_t *inp, mach_msg_header_t *outp) @@ -298,27 +306,44 @@ service_paging_requests (void *arg) /* Start the worker threads libpager uses to service requests. */ error_t -pager_start_workers (struct port_bucket *pager_bucket) +pager_start_workers (struct port_bucket *pager_bucket, + struct pager_requests **out_requests) { error_t err; int i; pthread_t t; - struct requests *requests; + struct pager_requests *requests; + + assert (out_requests != NULL); requests = malloc (sizeof *requests); if (requests == NULL) - return ENOMEM; + { + err = ENOMEM; + goto done; + } requests->bucket = pager_bucket; requests->asleep = 0; - queue_init (&requests->queue); + + requests->queue_in = malloc (sizeof *requests->queue_in); + if (requests->queue_in == NULL) + { + err = ENOMEM; + goto done; + } + queue_init (requests->queue_in); + /* Until the workers are inhibited, both queues are the same. */ + requests->queue_out = requests->queue_in; + pthread_cond_init (&requests->wakeup, NULL); + pthread_cond_init (&requests->inhibit_wakeup, NULL); pthread_mutex_init (&requests->lock, NULL); /* Make a thread to service paging requests. */ err = pthread_create (&t, NULL, service_paging_requests, requests); if (err) - return err; + goto done; pthread_detach (t); for (i = 0; i < WORKER_COUNT; i++) @@ -329,9 +354,71 @@ pager_start_workers (struct port_bucket *pager_bucket) err = pthread_create (&t, NULL, &worker_func, &requests->workers[i]); if (err) - return err; + goto done; pthread_detach (t); } +done: + if (err) + *out_requests = NULL; + else + *out_requests = requests; + return err; } + +error_t +pager_inhibit_workers (struct pager_requests *requests) +{ + error_t err = 0; + + pthread_mutex_lock (&requests->lock); + + /* Check the workers are not already inhibited. */ + assert (requests->queue_out == requests->queue_in); + + /* Any new paging requests will go into a new queue. */ + struct queue *new_queue = malloc (sizeof *new_queue); + if (new_queue == NULL) + { + err = ENOMEM; + goto done_locked; + } + queue_init (new_queue); + requests->queue_in = new_queue; + + /* Wait until all the workers are asleep and the queue has been + drained. All individual worker queues must have been drained, as + they are populated while the relevant worker is still running, and + it will always drain its personal queue before sleeping. + Check that the queue is empty, since it's possible that a request + came in, was queued and a worker was signalled but the lock was + acquired here before the worker woke up. */ + while (requests->asleep < WORKER_COUNT || !queue_empty(requests->queue_out)) + pthread_cond_wait (&requests->inhibit_wakeup, &requests->lock); + +done_locked: + pthread_mutex_unlock (&requests->lock); + return err; +} + +void +pager_resume_workers (struct pager_requests *requests) +{ + pthread_mutex_lock (&requests->lock); + + /* Check the workers are inhibited. */ + assert (requests->queue_out != requests->queue_in); + assert (requests->asleep == WORKER_COUNT); + assert (queue_empty(requests->queue_out)); + + /* The queue has been drained and will no longer be used. */ + free (requests->queue_out); + requests->queue_out = requests->queue_in; + + /* We need to wake up all workers, as there could be multiple requests + in the new queue. */ + pthread_cond_broadcast (&requests->wakeup); + + pthread_mutex_unlock (&requests->lock); +} diff --git a/libpager/pager.h b/libpager/pager.h index fe342388..df4db686 100644 --- a/libpager/pager.h +++ b/libpager/pager.h @@ -25,8 +25,32 @@ scope. */ struct user_pager_info; -/* Start the worker threads libpager uses to service requests. */ -error_t pager_start_workers (struct port_bucket *pager_bucket); +struct pager_requests; + +/* Start the worker threads libpager uses to service requests. If no + error is returned, *requests will be a valid pointer, else it will be + set to NULL. */ +error_t +pager_start_workers (struct port_bucket *pager_bucket, + struct pager_requests **requests); + +/* Inhibit the worker threads libpager uses to service requests, + blocking until all requests sent before this function is called have + finished. + Note that RPCs will not be inhibited, so new requests will + queue up, but will not be handled until the workers are resumed. If + RPCs should be inhibited as well, call ports_inhibit_bucket_rpcs with + the bucket used to create the workers before calling this. However, + inhibiting RPCs and not calling this is generally insufficient, as + libports is unaware of our internal worker pool, and will return once + all the RPCs have been queued, before they have been handled by a + worker thread. */ +error_t +pager_inhibit_workers (struct pager_requests *requests); + +/* Resume the worker threads libpager uses to service requests. */ +void +pager_resume_workers (struct pager_requests *requests); /* Create a new pager. The pager will have a port created for it (using libports, in BUCKET) and will be immediately ready diff --git a/libpager/queue.h b/libpager/queue.h index d3cf738e..abcd3b98 100644 --- a/libpager/queue.h +++ b/libpager/queue.h @@ -19,6 +19,8 @@ You should have received a copy of the GNU General Public License along with the GNU Hurd. If not, see <http://www.gnu.org/licenses/>. */ +#include <stdbool.h> + /* A FIFO queue with constant-time enqueue and dequeue operations. */ struct item { struct item *next; @@ -59,3 +61,9 @@ queue_dequeue (struct queue *q) r->next = NULL; return r; } + +static inline bool +queue_empty (struct queue *q) +{ + return q->head == NULL; +} |