summaryrefslogtreecommitdiff
path: root/libpager
diff options
context:
space:
mode:
authorJames Clarke <jrtc27@jrtc27.com>2015-08-27 17:22:11 +0100
committerSamuel Thibault <samuel.thibault@ens-lyon.org>2015-09-06 21:40:20 +0200
commit7a2c17fcbb789997421725d726340301ef35d84c (patch)
treeea51083985eaf4c5aad3d21716a77863dd241e0a /libpager
parent918a8c2ce462cea65e3e7a614f19b4f5ae4ff1e1 (diff)
Fix race condition in ext2fs when remounting
On some systems, ext2fs.static would regularly hang at startup, as a race condition meant it would process paging requests while remounting. To fix this, libpager has been altered to allow inhibiting and resuming its worker threads, and ext2fs uses this to inhibit paging while remounting. * console/pager.c (pager_requests): New variable. (user_pager_init): Updated call to pager_start_workers to use new pager_requests variable. * daemons/runsystem.sh: Removed artificial delay working around the race condition. * ext2fs/ext2fs.c (diskfs_reload_global_state): Call new inhibit_ext2_pager and resume_ext2_pager functions, and leave sblock as non-NULL so it will be munmapped. * ext2fs/ext2fs.h (inhibit_ext2_pager,resume_ext2_pager): New functions. * ext2fs/pager.c (file_pager_requests): New variable. (create_disk_pager): Updated call to pager_start_workers to use new file_pager_requests variable. (inhibit_ext2_pager,resume_ext2_pager): New functions. * fatfs/fatfs.h (inhibit_fat_pager,resume_fat_pager): New functions. * fatfs/pager.c (file_pager_requests): New variable. (create_fat_pager): Updated call to pager_start_workers to use new file_pager_requests variable. (inhibit_fat_pager,resume_fat_pager): New functions. * libdiskfs/disk-pager.c (diskfs_disk_pager_requests): New variable. (diskfs_start_disk_pager): Updated call to pager_start_workers to use new diskfs_disk_pager_requests variable. * libdiskfs/diskfs-pager.h (diskfs_disk_pager_requests): New variable. * libpager/demuxer.c (struct pager_requests): Renamed struct requests to struct pager_requests. Replaced queue with queue_in and queue_out pointers. Added inhibit_wakeup field. (pager_demuxer): Updated to use new queue_in/queue_out pointers. Only wake up workers if not inhibited. (worker_func): Updated to use new queue_in/queue_out pointers. Final worker thread to sleep notifies the inhibit_wakeup condition variable. (pager_start_workers): Added out parameter for the requests instance. Allocate heap space shared by both queues. Initialise new inhibit_wakeup condition. (pager_inhibit_workers,pager_resume_workers): New functions. * libpager/pager.h (struct pager_requests): Public forward definition. (pager_start_workers): Added out parameter for the requests instance. (pager_inhibit_workers,pager_resume_workers): New functions. * libpager/queue.h (queue_empty): New function. * storeio/pager.c (pager_requests): New variable. (init_dev_paging): Updated call to pager_start_workers to use new pager_requests variable.
Diffstat (limited to 'libpager')
-rw-r--r--libpager/demuxer.c119
-rw-r--r--libpager/pager.h28
-rw-r--r--libpager/queue.h8
3 files changed, 137 insertions, 18 deletions
diff --git a/libpager/demuxer.c b/libpager/demuxer.c
index 4dd3cd87..59dd1c59 100644
--- a/libpager/demuxer.c
+++ b/libpager/demuxer.c
@@ -60,7 +60,7 @@ request_inp (const struct request *r)
/* A worker. */
struct worker
{
- struct requests *requests; /* our pagers request queue */
+ struct pager_requests *requests; /* our pagers request queue */
struct queue queue; /* other workers may delegate requests to us */
unsigned long tag; /* tag of the object we are working on */
};
@@ -68,12 +68,18 @@ struct worker
/* This is the queue for incoming requests. A single thread receives
messages from the port set, looks the service routine up, and
enqueues the request here. */
-struct requests
+struct pager_requests
{
struct port_bucket *bucket;
- struct queue queue;
+ /* Normally, both queues are the same. However, when the workers are
+ inhibited, a new queue_in is created, but queue_out is left as the
+ old value, so the workers drain queue_out but do not receive new
+ requests. */
+ struct queue *queue_in; /* the queue to add to */
+ struct queue *queue_out; /* the queue to take from */
int asleep;
pthread_cond_t wakeup;
+ pthread_cond_t inhibit_wakeup;
pthread_mutex_t lock;
struct worker workers[WORKER_COUNT];
};
@@ -81,7 +87,7 @@ struct requests
/* Demultiplex a single message directed at a pager port; INP is the
message received; fill OUTP with the reply. */
static int
-pager_demuxer (struct requests *requests,
+pager_demuxer (struct pager_requests *requests,
mach_msg_header_t *inp,
mach_msg_header_t *outp)
{
@@ -108,10 +114,10 @@ pager_demuxer (struct requests *requests,
pthread_mutex_lock (&requests->lock);
- queue_enqueue (&requests->queue, &r->item);
+ queue_enqueue (requests->queue_in, &r->item);
- /* Awake worker. */
- if (requests->asleep > 0)
+ /* Awake worker, but only if not inhibited. */
+ if (requests->asleep > 0 && requests->queue_in == requests->queue_out)
pthread_cond_signal (&requests->wakeup);
pthread_mutex_unlock (&requests->lock);
@@ -160,7 +166,7 @@ static void *
worker_func (void *arg)
{
struct worker *self = (struct worker *) arg;
- struct requests *requests = self->requests;
+ struct pager_requests *requests = self->requests;
struct request *r = NULL;
mig_reply_header_t reply_msg;
@@ -186,9 +192,11 @@ worker_func (void *arg)
get_request_locked:
/* ... get a request from the global queue instead. */
- while ((r = queue_dequeue (&requests->queue)) == NULL)
+ while ((r = queue_dequeue (requests->queue_out)) == NULL)
{
requests->asleep += 1;
+ if (requests->asleep == WORKER_COUNT)
+ pthread_cond_broadcast (&requests->inhibit_wakeup);
pthread_cond_wait (&requests->wakeup, &requests->lock);
requests->asleep -= 1;
}
@@ -281,7 +289,7 @@ worker_func (void *arg)
static void *
service_paging_requests (void *arg)
{
- struct requests *requests = arg;
+ struct pager_requests *requests = arg;
int demuxer (mach_msg_header_t *inp,
mach_msg_header_t *outp)
@@ -298,27 +306,44 @@ service_paging_requests (void *arg)
/* Start the worker threads libpager uses to service requests. */
error_t
-pager_start_workers (struct port_bucket *pager_bucket)
+pager_start_workers (struct port_bucket *pager_bucket,
+ struct pager_requests **out_requests)
{
error_t err;
int i;
pthread_t t;
- struct requests *requests;
+ struct pager_requests *requests;
+
+ assert (out_requests != NULL);
requests = malloc (sizeof *requests);
if (requests == NULL)
- return ENOMEM;
+ {
+ err = ENOMEM;
+ goto done;
+ }
requests->bucket = pager_bucket;
requests->asleep = 0;
- queue_init (&requests->queue);
+
+ requests->queue_in = malloc (sizeof *requests->queue_in);
+ if (requests->queue_in == NULL)
+ {
+ err = ENOMEM;
+ goto done;
+ }
+ queue_init (requests->queue_in);
+ /* Until the workers are inhibited, both queues are the same. */
+ requests->queue_out = requests->queue_in;
+
pthread_cond_init (&requests->wakeup, NULL);
+ pthread_cond_init (&requests->inhibit_wakeup, NULL);
pthread_mutex_init (&requests->lock, NULL);
/* Make a thread to service paging requests. */
err = pthread_create (&t, NULL, service_paging_requests, requests);
if (err)
- return err;
+ goto done;
pthread_detach (t);
for (i = 0; i < WORKER_COUNT; i++)
@@ -329,9 +354,71 @@ pager_start_workers (struct port_bucket *pager_bucket)
err = pthread_create (&t, NULL, &worker_func, &requests->workers[i]);
if (err)
- return err;
+ goto done;
pthread_detach (t);
}
+done:
+ if (err)
+ *out_requests = NULL;
+ else
+ *out_requests = requests;
+
return err;
}
+
+error_t
+pager_inhibit_workers (struct pager_requests *requests)
+{
+ error_t err = 0;
+
+ pthread_mutex_lock (&requests->lock);
+
+ /* Check the workers are not already inhibited. */
+ assert (requests->queue_out == requests->queue_in);
+
+ /* Any new paging requests will go into a new queue. */
+ struct queue *new_queue = malloc (sizeof *new_queue);
+ if (new_queue == NULL)
+ {
+ err = ENOMEM;
+ goto done_locked;
+ }
+ queue_init (new_queue);
+ requests->queue_in = new_queue;
+
+ /* Wait until all the workers are asleep and the queue has been
+ drained. All individual worker queues must have been drained, as
+ they are populated while the relevant worker is still running, and
+ it will always drain its personal queue before sleeping.
+ Check that the queue is empty, since it's possible that a request
+ came in, was queued and a worker was signalled but the lock was
+ acquired here before the worker woke up. */
+ while (requests->asleep < WORKER_COUNT || !queue_empty(requests->queue_out))
+ pthread_cond_wait (&requests->inhibit_wakeup, &requests->lock);
+
+done_locked:
+ pthread_mutex_unlock (&requests->lock);
+ return err;
+}
+
+void
+pager_resume_workers (struct pager_requests *requests)
+{
+ pthread_mutex_lock (&requests->lock);
+
+ /* Check the workers are inhibited. */
+ assert (requests->queue_out != requests->queue_in);
+ assert (requests->asleep == WORKER_COUNT);
+ assert (queue_empty(requests->queue_out));
+
+ /* The queue has been drained and will no longer be used. */
+ free (requests->queue_out);
+ requests->queue_out = requests->queue_in;
+
+ /* We need to wake up all workers, as there could be multiple requests
+ in the new queue. */
+ pthread_cond_broadcast (&requests->wakeup);
+
+ pthread_mutex_unlock (&requests->lock);
+}
diff --git a/libpager/pager.h b/libpager/pager.h
index fe342388..df4db686 100644
--- a/libpager/pager.h
+++ b/libpager/pager.h
@@ -25,8 +25,32 @@
scope. */
struct user_pager_info;
-/* Start the worker threads libpager uses to service requests. */
-error_t pager_start_workers (struct port_bucket *pager_bucket);
+struct pager_requests;
+
+/* Start the worker threads libpager uses to service requests. If no
+ error is returned, *requests will be a valid pointer, else it will be
+ set to NULL. */
+error_t
+pager_start_workers (struct port_bucket *pager_bucket,
+ struct pager_requests **requests);
+
+/* Inhibit the worker threads libpager uses to service requests,
+ blocking until all requests sent before this function is called have
+ finished.
+ Note that RPCs will not be inhibited, so new requests will
+ queue up, but will not be handled until the workers are resumed. If
+ RPCs should be inhibited as well, call ports_inhibit_bucket_rpcs with
+ the bucket used to create the workers before calling this. However,
+ inhibiting RPCs and not calling this is generally insufficient, as
+ libports is unaware of our internal worker pool, and will return once
+ all the RPCs have been queued, before they have been handled by a
+ worker thread. */
+error_t
+pager_inhibit_workers (struct pager_requests *requests);
+
+/* Resume the worker threads libpager uses to service requests. */
+void
+pager_resume_workers (struct pager_requests *requests);
/* Create a new pager. The pager will have a port created for it
(using libports, in BUCKET) and will be immediately ready
diff --git a/libpager/queue.h b/libpager/queue.h
index d3cf738e..abcd3b98 100644
--- a/libpager/queue.h
+++ b/libpager/queue.h
@@ -19,6 +19,8 @@
You should have received a copy of the GNU General Public License
along with the GNU Hurd. If not, see <http://www.gnu.org/licenses/>. */
+#include <stdbool.h>
+
/* A FIFO queue with constant-time enqueue and dequeue operations. */
struct item {
struct item *next;
@@ -59,3 +61,9 @@ queue_dequeue (struct queue *q)
r->next = NULL;
return r;
}
+
+static inline bool
+queue_empty (struct queue *q)
+{
+ return q->head == NULL;
+}