diff options
Diffstat (limited to 'pflocal')
-rw-r--r-- | pflocal/connq.c | 145 |
1 files changed, 121 insertions, 24 deletions
diff --git a/pflocal/connq.c b/pflocal/connq.c index 572c7ccf..ee326509 100644 --- a/pflocal/connq.c +++ b/pflocal/connq.c @@ -31,27 +31,36 @@ struct connq int noqueue; /* The connection request queue. */ + struct connq_request **queue; unsigned length; + /* Head is the position in QUEUE of the first request, and TAIL is the + first free position in the queue. If HEAD == TAIL, then the queue is + empty. Starting at HEAD, successive positions can be calculated by + using qnext(). */ unsigned head, tail; - struct connq_request **queue; /* Threads that have done an accept on this queue wait on this condition. */ struct condition listeners; unsigned num_listeners; + + /* When a connection queue receives an interrupt, we want to wake up all + listeners, and have them realize they've been interrupted; listeners + that happen after the interrupt shouldn't return EINTR. When a thread + waits on this pipe's LISTENERS condition, it remembers this sequence + number; any interrupt bumps this number and broadcasts on the condition. + A listening thread will try to accept a connection only if the sequence + number is the same as when it went to sleep. */ + unsigned long interrupt_seq_num; + struct mutex lock; }; +/* Returns the position CQ's queue after POS. */ static inline unsigned qnext (struct connq *cq, unsigned pos) { return (pos + 1 == cq->length) ? 0 : pos + 1; } - -static inline unsigned -qprev (struct connq *cq, unsigned pos) -{ - return (pos == 0) ? cq->length - 1 : pos - 1; -} /* ---------------------------------------------------------------- */ @@ -76,7 +85,7 @@ struct connq_request }; static inline void -connq_request_init (struct sock *sock, struct connq_request *req) +connq_request_init (struct connq_request *req, struct sock *sock) { req->err = 0; req->sock = sock; @@ -135,17 +144,26 @@ debug (cq, "lock"); cq->num_listeners++; while (cq->head == cq->tail) -{debug (cq, "wait listeners"); - condition_wait (&cq->listeners, &cq->lock); -} + { + unsigned seq_num = cq->interrupt_seq_num; +debug (cq, "wait listeners"); + condition_wait (&cq->listeners, &cq->lock); + if (seq_num != cq->interrupt_seq_num) + { +debug (cq, "eintr"); + cq->num_listeners--; +debug (cq, "unlock"); + mutex_unlock (&cq->lock); +debug (cq, "out"); + return EINTR; + } + } if (req != NULL) /* Dequeue the next request, if desired. */ { - *req = cq->queue[cq->tail]; - cq->tail = qnext (cq, cq->tail); -debug (*req, "(req) lock"); - mutex_lock (&(*req)->lock); + *req = cq->queue[cq->head]; + cq->head = qnext (cq, cq->head); if (sock != NULL) *sock = (*req)->sock; } @@ -176,18 +194,18 @@ debug (req, "unlock"); /* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is true, then return EWOULDBLOCK immediately when there are no immediate - connections available. */ + connections available. Neither SOCK nor CQ should be locked. */ error_t connq_connect (struct connq *cq, int noblock, struct sock *sock) { error_t err = 0; - struct connq_request req; unsigned next; debug (cq, "in"); debug (cq, "lock"); mutex_lock (&cq->lock); + /* Check for listeners after we've locked CQ for good. */ if ((noblock || cq->noqueue) && cq->num_listeners == 0) {debug (cq, "unlock"); mutex_unlock (&cq->lock); @@ -195,16 +213,21 @@ debug (cq, "ewouldblock"); return EWOULDBLOCK; } - next = qnext (cq, cq->head); + next = qnext (cq, cq->tail); if (next == cq->tail) + /* The queue is full. */ err = ECONNREFUSED; else { - cq->queue[cq->head] = &req; - cq->head = next; + struct connq_request req; + + connq_request_init (&req, sock); + + cq->queue[cq->tail] = &req; + cq->tail = next; - /* Hold REQ.LOCK before we signal the condition so that we're sure to be - woken up. */ + /* Hold REQ.LOCK before we signal the condition so that we're sure + to be woken up. */ debug (&req, "(req) lock"); mutex_lock (&req.lock); @@ -227,6 +250,77 @@ debug (&req, "(req) unlock"); return err; } +/* Interrupt any threads waiting on CQ, both listeners and connectors, and + make them return with EINTR. */ +void +connq_interrupt (struct connq *cq) +{ +debug (cq, "in"); +debug (cq, "lock"); + mutex_lock (&cq->lock); + +debug (cq, "interrupt connectors"); + /* Interrupt everyone trying to connect. */ + while (cq->head != cq->tail) + { +debug (cq->queue[cq->head], "(req) interrupting"); + connq_request_complete (cq->queue[cq->head], EINTR); + cq->head = qnext (cq, cq->head); + } + +debug (cq, "interrupt listeners"); + /* Interrupt anyone waiting for a connection. */ + if (cq->num_listeners > 0) + { + cq->interrupt_seq_num++; + condition_broadcast (&cq->listeners); + } + +debug (cq, "unlock"); + mutex_unlock (&cq->lock); +debug (cq, "out"); +} + +/* Interrupt any threads that are attempting to connect SOCK to CQ, and make + them return with EINTR. */ +void +connq_interrupt_sock (struct connq *cq, struct sock *sock) +{ + unsigned pos, comp_pos; + +debug (cq, "in"); +debug (cq, "lock"); + mutex_lock (&cq->lock); + +debug (cq, "interrupt connections from: %p", sock); + for (pos = cq->head; pos != cq->tail; pos = qnext (cq, pos)) + { + struct connq_request *req = cq->queue[pos]; + if (req->sock == sock) +{debug (req, "(req) interrupting"); + connq_request_complete (req, EINTR); +} + cq->queue[pos] = NULL; /* Mark REQ as being deleted. */ + } + +debug (cq, "compress queue"); + /* Now compress the queue to remove any null entries we put in. */ + for (pos = cq->head, comp_pos = cq->head; + pos != cq->tail; + pos = qnext (cq, pos)) + if (cq->queue[pos] != NULL) + /* This position has a non-NULL request, so move it to the end of the + compressed queue. */ + { + cq->queue[comp_pos] = cq->queue[pos]; + comp_pos = qnext (cq, comp_pos); + } + +debug (cq, "unlock"); + mutex_unlock (&cq->lock); +debug (cq, "out"); +} + /* Set CQ's queue length to LENGTH. Any sockets already waiting for a connections that are past the new length will fail with ECONNREFUSED. */ error_t @@ -246,16 +340,19 @@ debug (cq, "lock"); struct connq_request **new_queue = malloc (sizeof (struct connq_request *) * length); - for (i = 0; i < cq->length && cq->head != cq->tail;) + for (i = 0; i < cq->length && cq->head != cq->tail; i++) { - cq->head = qprev (cq, cq->head); if (i < length) /* Keep this connect request in the queue. */ new_queue[length - i] = cq->queue[cq->head]; else /* Punt this one. */ connq_request_complete (cq->queue[cq->head], ECONNREFUSED); + cq->head = qnext (cq, cq->head); } + + free (cq->queue); + cq->queue = new_queue; } cq->noqueue = 0; /* Turn on queueing. */ |