summaryrefslogtreecommitdiff
path: root/pflocal
diff options
context:
space:
mode:
Diffstat (limited to 'pflocal')
-rw-r--r--pflocal/connq.c145
1 files changed, 121 insertions, 24 deletions
diff --git a/pflocal/connq.c b/pflocal/connq.c
index 572c7ccf..ee326509 100644
--- a/pflocal/connq.c
+++ b/pflocal/connq.c
@@ -31,27 +31,36 @@ struct connq
int noqueue;
/* The connection request queue. */
+ struct connq_request **queue;
unsigned length;
+ /* Head is the position in QUEUE of the first request, and TAIL is the
+ first free position in the queue. If HEAD == TAIL, then the queue is
+ empty. Starting at HEAD, successive positions can be calculated by
+ using qnext(). */
unsigned head, tail;
- struct connq_request **queue;
/* Threads that have done an accept on this queue wait on this condition. */
struct condition listeners;
unsigned num_listeners;
+
+ /* When a connection queue receives an interrupt, we want to wake up all
+ listeners, and have them realize they've been interrupted; listeners
+ that happen after the interrupt shouldn't return EINTR. When a thread
+ waits on this pipe's LISTENERS condition, it remembers this sequence
+ number; any interrupt bumps this number and broadcasts on the condition.
+ A listening thread will try to accept a connection only if the sequence
+ number is the same as when it went to sleep. */
+ unsigned long interrupt_seq_num;
+
struct mutex lock;
};
+/* Returns the position CQ's queue after POS. */
static inline unsigned
qnext (struct connq *cq, unsigned pos)
{
return (pos + 1 == cq->length) ? 0 : pos + 1;
}
-
-static inline unsigned
-qprev (struct connq *cq, unsigned pos)
-{
- return (pos == 0) ? cq->length - 1 : pos - 1;
-}
/* ---------------------------------------------------------------- */
@@ -76,7 +85,7 @@ struct connq_request
};
static inline void
-connq_request_init (struct sock *sock, struct connq_request *req)
+connq_request_init (struct connq_request *req, struct sock *sock)
{
req->err = 0;
req->sock = sock;
@@ -135,17 +144,26 @@ debug (cq, "lock");
cq->num_listeners++;
while (cq->head == cq->tail)
-{debug (cq, "wait listeners");
- condition_wait (&cq->listeners, &cq->lock);
-}
+ {
+ unsigned seq_num = cq->interrupt_seq_num;
+debug (cq, "wait listeners");
+ condition_wait (&cq->listeners, &cq->lock);
+ if (seq_num != cq->interrupt_seq_num)
+ {
+debug (cq, "eintr");
+ cq->num_listeners--;
+debug (cq, "unlock");
+ mutex_unlock (&cq->lock);
+debug (cq, "out");
+ return EINTR;
+ }
+ }
if (req != NULL)
/* Dequeue the next request, if desired. */
{
- *req = cq->queue[cq->tail];
- cq->tail = qnext (cq, cq->tail);
-debug (*req, "(req) lock");
- mutex_lock (&(*req)->lock);
+ *req = cq->queue[cq->head];
+ cq->head = qnext (cq, cq->head);
if (sock != NULL)
*sock = (*req)->sock;
}
@@ -176,18 +194,18 @@ debug (req, "unlock");
/* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is true,
then return EWOULDBLOCK immediately when there are no immediate
- connections available. */
+ connections available. Neither SOCK nor CQ should be locked. */
error_t
connq_connect (struct connq *cq, int noblock, struct sock *sock)
{
error_t err = 0;
- struct connq_request req;
unsigned next;
debug (cq, "in");
debug (cq, "lock");
mutex_lock (&cq->lock);
+ /* Check for listeners after we've locked CQ for good. */
if ((noblock || cq->noqueue) && cq->num_listeners == 0)
{debug (cq, "unlock");
mutex_unlock (&cq->lock);
@@ -195,16 +213,21 @@ debug (cq, "ewouldblock");
return EWOULDBLOCK;
}
- next = qnext (cq, cq->head);
+ next = qnext (cq, cq->tail);
if (next == cq->tail)
+ /* The queue is full. */
err = ECONNREFUSED;
else
{
- cq->queue[cq->head] = &req;
- cq->head = next;
+ struct connq_request req;
+
+ connq_request_init (&req, sock);
+
+ cq->queue[cq->tail] = &req;
+ cq->tail = next;
- /* Hold REQ.LOCK before we signal the condition so that we're sure to be
- woken up. */
+ /* Hold REQ.LOCK before we signal the condition so that we're sure
+ to be woken up. */
debug (&req, "(req) lock");
mutex_lock (&req.lock);
@@ -227,6 +250,77 @@ debug (&req, "(req) unlock");
return err;
}
+/* Interrupt any threads waiting on CQ, both listeners and connectors, and
+ make them return with EINTR. */
+void
+connq_interrupt (struct connq *cq)
+{
+debug (cq, "in");
+debug (cq, "lock");
+ mutex_lock (&cq->lock);
+
+debug (cq, "interrupt connectors");
+ /* Interrupt everyone trying to connect. */
+ while (cq->head != cq->tail)
+ {
+debug (cq->queue[cq->head], "(req) interrupting");
+ connq_request_complete (cq->queue[cq->head], EINTR);
+ cq->head = qnext (cq, cq->head);
+ }
+
+debug (cq, "interrupt listeners");
+ /* Interrupt anyone waiting for a connection. */
+ if (cq->num_listeners > 0)
+ {
+ cq->interrupt_seq_num++;
+ condition_broadcast (&cq->listeners);
+ }
+
+debug (cq, "unlock");
+ mutex_unlock (&cq->lock);
+debug (cq, "out");
+}
+
+/* Interrupt any threads that are attempting to connect SOCK to CQ, and make
+ them return with EINTR. */
+void
+connq_interrupt_sock (struct connq *cq, struct sock *sock)
+{
+ unsigned pos, comp_pos;
+
+debug (cq, "in");
+debug (cq, "lock");
+ mutex_lock (&cq->lock);
+
+debug (cq, "interrupt connections from: %p", sock);
+ for (pos = cq->head; pos != cq->tail; pos = qnext (cq, pos))
+ {
+ struct connq_request *req = cq->queue[pos];
+ if (req->sock == sock)
+{debug (req, "(req) interrupting");
+ connq_request_complete (req, EINTR);
+}
+ cq->queue[pos] = NULL; /* Mark REQ as being deleted. */
+ }
+
+debug (cq, "compress queue");
+ /* Now compress the queue to remove any null entries we put in. */
+ for (pos = cq->head, comp_pos = cq->head;
+ pos != cq->tail;
+ pos = qnext (cq, pos))
+ if (cq->queue[pos] != NULL)
+ /* This position has a non-NULL request, so move it to the end of the
+ compressed queue. */
+ {
+ cq->queue[comp_pos] = cq->queue[pos];
+ comp_pos = qnext (cq, comp_pos);
+ }
+
+debug (cq, "unlock");
+ mutex_unlock (&cq->lock);
+debug (cq, "out");
+}
+
/* Set CQ's queue length to LENGTH. Any sockets already waiting for a
connections that are past the new length will fail with ECONNREFUSED. */
error_t
@@ -246,16 +340,19 @@ debug (cq, "lock");
struct connq_request **new_queue =
malloc (sizeof (struct connq_request *) * length);
- for (i = 0; i < cq->length && cq->head != cq->tail;)
+ for (i = 0; i < cq->length && cq->head != cq->tail; i++)
{
- cq->head = qprev (cq, cq->head);
if (i < length)
/* Keep this connect request in the queue. */
new_queue[length - i] = cq->queue[cq->head];
else
/* Punt this one. */
connq_request_complete (cq->queue[cq->head], ECONNREFUSED);
+ cq->head = qnext (cq, cq->head);
}
+
+ free (cq->queue);
+ cq->queue = new_queue;
}
cq->noqueue = 0; /* Turn on queueing. */