summaryrefslogtreecommitdiff
path: root/pflocal/connq.c
diff options
context:
space:
mode:
Diffstat (limited to 'pflocal/connq.c')
-rw-r--r--pflocal/connq.c349
1 files changed, 189 insertions, 160 deletions
diff --git a/pflocal/connq.c b/pflocal/connq.c
index d4103840..a4524804 100644
--- a/pflocal/connq.c
+++ b/pflocal/connq.c
@@ -1,6 +1,6 @@
/* Listen queue functions
- Copyright (C) 1995,96,2001 Free Software Foundation, Inc.
+ Copyright (C) 1995,96,2001, 2005 Free Software Foundation, Inc.
Written by Miles Bader <miles@gnu.org>
@@ -26,31 +26,22 @@
/* A queue for queueing incoming connections. */
struct connq
{
- /* True if all connection requests should be treated as non-blocking. */
- int noqueue;
-
/* The connection request queue. */
- struct connq_request **queue;
- unsigned length;
- /* Head is the position in QUEUE of the first request, and TAIL is the
- first free position in the queue. If HEAD == TAIL, then the queue is
- empty. Starting at HEAD, successive positions can be calculated by
- using qnext(). */
- unsigned head, tail;
+ struct connq_request *head;
+ struct connq_request **tail;
+ unsigned count;
+ unsigned max;
/* Threads that have done an accept on this queue wait on this condition. */
struct condition listeners;
unsigned num_listeners;
+ /* Threads that have done a connect on this queue wait on this condition. */
+ struct condition connectors;
+ unsigned num_connectors;
+
struct mutex lock;
};
-
-/* Returns the position CQ's queue after POS. */
-static inline unsigned
-qnext (struct connq *cq, unsigned pos)
-{
- return (pos + 1 == cq->length) ? 0 : pos + 1;
-}
/* ---------------------------------------------------------------- */
@@ -58,30 +49,50 @@ qnext (struct connq *cq, unsigned pos)
get information from and to the thread. */
struct connq_request
{
+ struct connq_request *next;
+
/* The socket that's waiting to connect. */
struct sock *sock;
-
- /* What the waiting thread blocks on. */
- struct condition signal;
- struct mutex lock;
-
- /* Set to true when this request has been dealt with, to guard against
- spurious conditions being signaled. */
- int completed;
-
- /* After the waiting thread is unblocked, this is the result, either 0 if
- SOCK has been connected, or an error. */
- error_t err;
};
static inline void
connq_request_init (struct connq_request *req, struct sock *sock)
{
- req->err = 0;
req->sock = sock;
- req->completed = 0;
- condition_init (&req->signal);
- mutex_init (&req->lock);
+}
+
+/* Enqueue connection request REQ onto CQ. CQ must be locked. */
+static void
+connq_request_enqueue (struct connq *cq, struct connq_request *req)
+{
+ assert (! mutex_try_lock (&cq->lock));
+
+ req->next = NULL;
+ *cq->tail = req;
+ cq->tail = &req->next;
+
+ cq->count ++;
+}
+
+/* Dequeue a pending request from CQ. CQ must be locked and must not
+ be empty. */
+static struct connq_request *
+connq_request_dequeue (struct connq *cq)
+{
+ struct connq_request *req;
+
+ assert (! mutex_try_lock (&cq->lock));
+ assert (cq->head);
+
+ req = cq->head;
+ cq->head = req->next;
+ if (! cq->head)
+ /* We just dequeued the last element. Fixup the tail pointer. */
+ cq->tail = &cq->head;
+
+ cq->count --;
+
+ return req;
}
/* ---------------------------------------------------------------- */
@@ -95,16 +106,20 @@ connq_create (struct connq **cq)
struct connq *new = malloc (sizeof (struct connq));
if (!new)
- return ENOMEM;
+ return ENOBUFS;
+
+ new->head = NULL;
+ new->tail = &new->head;
+ new->count = 0;
+ /* By default, don't queue requests. */
+ new->max = 0;
- new->noqueue = 1; /* By default, don't queue requests. */
- new->length = 0;
- new->head = new->tail = 0;
- new->queue = NULL;
new->num_listeners = 0;
+ new->num_connectors = 0;
mutex_init (&new->lock);
condition_init (&new->listeners);
+ condition_init (&new->connectors);
*cq = new;
return 0;
@@ -116,175 +131,189 @@ connq_destroy (struct connq *cq)
{
/* Everybody in the queue should hold a reference to the socket
containing the queue. */
- assert (cq->length == 0);
- /* Nevertheless, malloc(0) or realloc(0) might allocate some small
- space. */
- if (cq->queue)
- free (cq->queue);
+ assert (! cq->head);
+ assert (cq->count == 0);
+
free (cq);
}
/* ---------------------------------------------------------------- */
-/* Wait for a connection attempt to be made on CQ, and return the connecting
- socket in SOCK, and a request tag in REQ. If REQ is NULL, the request is
- left in the queue, otherwise connq_request_complete must be called on REQ
- to allow the requesting thread to continue. If NOBLOCK is true,
- EWOULDBLOCK is returned when there are no immediate connections
- available. */
+/* Return a connection request on CQ. If SOCK is NULL, the request is
+ left in the queue. If NOBLOCK is true, EWOULDBLOCK is returned
+ when there are no immediate connections available. */
error_t
-connq_listen (struct connq *cq, int noblock,
- struct connq_request **req, struct sock **sock)
+connq_listen (struct connq *cq, int noblock, struct sock **sock)
{
+ error_t err = 0;
+
mutex_lock (&cq->lock);
- if (noblock && cq->head == cq->tail)
+ if (noblock && cq->count == 0 && cq->num_connectors == 0)
{
mutex_unlock (&cq->lock);
return EWOULDBLOCK;
}
- cq->num_listeners++;
+ if (! sock && (cq->count > 0 || cq->num_connectors > 0))
+ /* The caller just wants to know if a connection ready. */
+ {
+ mutex_unlock (&cq->lock);
+ return 0;
+ }
- while (cq->head == cq->tail)
- if (hurd_condition_wait (&cq->listeners, &cq->lock))
- {
- cq->num_listeners--;
- mutex_unlock (&cq->lock);
- return EINTR;
- }
+ cq->num_listeners++;
- if (req != NULL)
- /* Dequeue the next request, if desired. */
+ if (cq->count == 0)
+ /* The request queue is empty. */
{
- *req = cq->queue[cq->head];
- cq->head = qnext (cq, cq->head);
- if (sock != NULL)
- *sock = (*req)->sock;
+ assert (! cq->head);
+
+ if (cq->num_connectors > 0)
+ /* Someone is waiting for an acceptor. Signal that we can
+ service their request. */
+ condition_signal (&cq->connectors);
+
+ do
+ if (hurd_condition_wait (&cq->listeners, &cq->lock))
+ {
+ cq->num_listeners--;
+ err = EINTR;
+ goto out;
+ }
+ while (cq->count == 0);
}
- cq->num_listeners--;
+ assert (cq->head);
+ if (sock)
+ /* Dequeue the next request, if desired. */
+ {
+ struct connq_request *req = connq_request_dequeue (cq);
+ *sock = req->sock;
+ free (req);
+ }
+ else if (cq->num_listeners > 0)
+ /* The caller will not actually process this request but someone
+ else could. (This case is rare but possible: it would require
+ one thread to do a select on the socket and a second to do an
+ accept.) */
+ condition_signal (&cq->listeners);
+ else
+ /* There is no one else to process the request and the connection
+ has now been initiated. This is not actually a problem as even
+ if the current queue limit is 0, the connector will queue the
+ request and another listener (should) eventually come along.
+ (In fact it is very probably as the caller has likely done a
+ select and will now follow up with an accept.) */
+ ;
+
+ out:
mutex_unlock (&cq->lock);
-
- return 0;
-}
-
-/* Return the error code ERR to the thread that made the listen request REQ,
- returned from a previous connq_listen. */
-void
-connq_request_complete (struct connq_request *req, error_t err)
-{
- mutex_lock (&req->lock);
- req->err = err;
- req->completed = 1;
- condition_signal (&req->signal);
- mutex_unlock (&req->lock);
+ return err;
}
-/* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is true,
- then return EWOULDBLOCK immediately when there are no immediate
- connections available. Neither SOCK nor CQ should be locked. */
+/* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is
+ true, then return EWOULDBLOCK if there are no connections
+ immediately available. On success, this call must be followed up
+ either connq_connect_complete or connq_connect_cancel. */
error_t
-connq_connect (struct connq *cq, int noblock, struct sock *sock)
+connq_connect (struct connq *cq, int noblock)
{
- error_t err = 0;
- unsigned next;
-
mutex_lock (&cq->lock);
/* Check for listeners after we've locked CQ for good. */
- if ((noblock || cq->noqueue) && cq->num_listeners == 0)
+
+ if (noblock
+ && cq->count + cq->num_connectors >= cq->max + cq->num_listeners)
+ /* We are in non-blocking mode and would have to wait to secure an
+ entry in the listen queue. */
{
mutex_unlock (&cq->lock);
return EWOULDBLOCK;
}
- next = qnext (cq, cq->tail);
- if (next == cq->tail)
- /* The queue is full. */
- err = ECONNREFUSED;
- else
- {
- struct connq_request req;
+ cq->num_connectors ++;
- connq_request_init (&req, sock);
+ while (cq->count + cq->num_connectors > cq->max + cq->num_listeners)
+ /* The queue is full and there is no immediate listener to service
+ us. Block until we can get a slot. */
+ if (hurd_condition_wait (&cq->connectors, &cq->lock))
+ {
+ cq->num_connectors --;
+ mutex_unlock (&cq->lock);
+ return EINTR;
+ }
- cq->queue[cq->tail] = &req;
- cq->tail = next;
+ mutex_unlock (&cq->lock);
- /* Hold REQ.LOCK before we signal the condition so that we're sure
- to be woken up. */
- mutex_lock (&req.lock);
- condition_signal (&cq->listeners);
- mutex_unlock (&cq->lock);
+ return 0;
+}
- while (!req.completed)
- condition_wait (&req.signal, &req.lock);
- err = req.err;
+/* Follow up to connq_connect. Completes the connect, SOCK is the new
+ server socket. */
+void
+connq_connect_complete (struct connq *cq, struct sock *sock)
+{
+ struct connq_request *req;
+
+ req = malloc (sizeof (struct connq_request));
+ if (! req)
+ abort ();
+
+ connq_request_init (req, sock);
+
+ mutex_lock (&cq->lock);
- mutex_unlock (&req.lock);
+ assert (cq->num_connectors > 0);
+ cq->num_connectors --;
+
+ connq_request_enqueue (cq, req);
+
+ if (cq->num_listeners > 0)
+ /* Wake a listener up. We must consume the listener ref here as
+ someone else might call this function before the listener
+ thread dequeues this request. */
+ {
+ cq->num_listeners --;
+ condition_signal (&cq->listeners);
}
- return err;
+ mutex_unlock (&cq->lock);
}
-
-#if 0
-/* `Compresses' CQ, by removing any NULL entries. CQ should be locked. */
-static void
-connq_compress (struct connq *cq)
+
+/* Follow up to connq_connect. Cancel the connect. */
+void
+connq_connect_cancel (struct connq *cq)
{
- unsigned pos;
- unsigned comp_tail = cq->head;
-
- /* Now compress the queue to remove any null entries we put in. */
- for (pos = cq->head; pos != cq->tail; pos = qnext (cq, pos))
- if (cq->queue[pos] != NULL)
- /* This position has a non-NULL request, so move it to the end of the
- compressed queue. */
- {
- cq->queue[comp_tail] = cq->queue[pos];
- comp_tail = qnext (cq, comp_tail);
- }
+ mutex_lock (&cq->lock);
+
+ assert (cq->num_connectors > 0);
+ cq->num_connectors --;
- /* Move back tail to only include what we kept in the queue. */
- cq->tail = comp_tail;
+ if (cq->count + cq->num_connectors >= cq->max + cq->num_listeners)
+ /* A connector is blocked and could use the spot we reserved. */
+ condition_signal (&cq->connectors);
+
+ mutex_unlock (&cq->lock);
}
-#endif
-/* Set CQ's queue length to LENGTH. Any sockets already waiting for a
- connections that are past the new length will fail with ECONNREFUSED. */
+/* Set CQ's queue length to LENGTH. */
error_t
-connq_set_length (struct connq *cq, int length)
+connq_set_length (struct connq *cq, int max)
{
- mutex_lock (&cq->lock);
+ int omax;
- if (length > cq->length)
- /* Growing the queue is simple... */
- cq->queue = realloc (cq->queue, sizeof (struct connq_request *) * length);
- else
- /* Shrinking it less so. */
- {
- int i;
- struct connq_request **new_queue =
- malloc (sizeof (struct connq_request *) * length);
-
- for (i = 0; i < cq->length && cq->head != cq->tail; i++)
- {
- if (i < length)
- /* Keep this connect request in the queue. */
- new_queue[length - i] = cq->queue[cq->head];
- else
- /* Punt this one. */
- connq_request_complete (cq->queue[cq->head], ECONNREFUSED);
- cq->head = qnext (cq, cq->head);
- }
-
- free (cq->queue);
- cq->queue = new_queue;
- }
-
- cq->noqueue = 0; /* Turn on queueing. */
+ mutex_lock (&cq->lock);
+ omax = cq->max;
+ cq->max = max;
+
+ if (max > omax && cq->count >= omax && cq->count < max
+ && cq->num_connectors >= cq->num_listeners)
+ /* This is an increase in the number of connection slots which has
+ made some slots available and there are waiting threads. Wake
+ them up. */
+ condition_broadcast (&cq->listeners);
mutex_unlock (&cq->lock);