diff options
Diffstat (limited to 'pflocal/connq.c')
-rw-r--r-- | pflocal/connq.c | 349 |
1 files changed, 189 insertions, 160 deletions
diff --git a/pflocal/connq.c b/pflocal/connq.c index d4103840..a4524804 100644 --- a/pflocal/connq.c +++ b/pflocal/connq.c @@ -1,6 +1,6 @@ /* Listen queue functions - Copyright (C) 1995,96,2001 Free Software Foundation, Inc. + Copyright (C) 1995,96,2001, 2005 Free Software Foundation, Inc. Written by Miles Bader <miles@gnu.org> @@ -26,31 +26,22 @@ /* A queue for queueing incoming connections. */ struct connq { - /* True if all connection requests should be treated as non-blocking. */ - int noqueue; - /* The connection request queue. */ - struct connq_request **queue; - unsigned length; - /* Head is the position in QUEUE of the first request, and TAIL is the - first free position in the queue. If HEAD == TAIL, then the queue is - empty. Starting at HEAD, successive positions can be calculated by - using qnext(). */ - unsigned head, tail; + struct connq_request *head; + struct connq_request **tail; + unsigned count; + unsigned max; /* Threads that have done an accept on this queue wait on this condition. */ struct condition listeners; unsigned num_listeners; + /* Threads that have done a connect on this queue wait on this condition. */ + struct condition connectors; + unsigned num_connectors; + struct mutex lock; }; - -/* Returns the position CQ's queue after POS. */ -static inline unsigned -qnext (struct connq *cq, unsigned pos) -{ - return (pos + 1 == cq->length) ? 0 : pos + 1; -} /* ---------------------------------------------------------------- */ @@ -58,30 +49,50 @@ qnext (struct connq *cq, unsigned pos) get information from and to the thread. */ struct connq_request { + struct connq_request *next; + /* The socket that's waiting to connect. */ struct sock *sock; - - /* What the waiting thread blocks on. */ - struct condition signal; - struct mutex lock; - - /* Set to true when this request has been dealt with, to guard against - spurious conditions being signaled. */ - int completed; - - /* After the waiting thread is unblocked, this is the result, either 0 if - SOCK has been connected, or an error. */ - error_t err; }; static inline void connq_request_init (struct connq_request *req, struct sock *sock) { - req->err = 0; req->sock = sock; - req->completed = 0; - condition_init (&req->signal); - mutex_init (&req->lock); +} + +/* Enqueue connection request REQ onto CQ. CQ must be locked. */ +static void +connq_request_enqueue (struct connq *cq, struct connq_request *req) +{ + assert (! mutex_try_lock (&cq->lock)); + + req->next = NULL; + *cq->tail = req; + cq->tail = &req->next; + + cq->count ++; +} + +/* Dequeue a pending request from CQ. CQ must be locked and must not + be empty. */ +static struct connq_request * +connq_request_dequeue (struct connq *cq) +{ + struct connq_request *req; + + assert (! mutex_try_lock (&cq->lock)); + assert (cq->head); + + req = cq->head; + cq->head = req->next; + if (! cq->head) + /* We just dequeued the last element. Fixup the tail pointer. */ + cq->tail = &cq->head; + + cq->count --; + + return req; } /* ---------------------------------------------------------------- */ @@ -95,16 +106,20 @@ connq_create (struct connq **cq) struct connq *new = malloc (sizeof (struct connq)); if (!new) - return ENOMEM; + return ENOBUFS; + + new->head = NULL; + new->tail = &new->head; + new->count = 0; + /* By default, don't queue requests. */ + new->max = 0; - new->noqueue = 1; /* By default, don't queue requests. */ - new->length = 0; - new->head = new->tail = 0; - new->queue = NULL; new->num_listeners = 0; + new->num_connectors = 0; mutex_init (&new->lock); condition_init (&new->listeners); + condition_init (&new->connectors); *cq = new; return 0; @@ -116,175 +131,189 @@ connq_destroy (struct connq *cq) { /* Everybody in the queue should hold a reference to the socket containing the queue. */ - assert (cq->length == 0); - /* Nevertheless, malloc(0) or realloc(0) might allocate some small - space. */ - if (cq->queue) - free (cq->queue); + assert (! cq->head); + assert (cq->count == 0); + free (cq); } /* ---------------------------------------------------------------- */ -/* Wait for a connection attempt to be made on CQ, and return the connecting - socket in SOCK, and a request tag in REQ. If REQ is NULL, the request is - left in the queue, otherwise connq_request_complete must be called on REQ - to allow the requesting thread to continue. If NOBLOCK is true, - EWOULDBLOCK is returned when there are no immediate connections - available. */ +/* Return a connection request on CQ. If SOCK is NULL, the request is + left in the queue. If NOBLOCK is true, EWOULDBLOCK is returned + when there are no immediate connections available. */ error_t -connq_listen (struct connq *cq, int noblock, - struct connq_request **req, struct sock **sock) +connq_listen (struct connq *cq, int noblock, struct sock **sock) { + error_t err = 0; + mutex_lock (&cq->lock); - if (noblock && cq->head == cq->tail) + if (noblock && cq->count == 0 && cq->num_connectors == 0) { mutex_unlock (&cq->lock); return EWOULDBLOCK; } - cq->num_listeners++; + if (! sock && (cq->count > 0 || cq->num_connectors > 0)) + /* The caller just wants to know if a connection ready. */ + { + mutex_unlock (&cq->lock); + return 0; + } - while (cq->head == cq->tail) - if (hurd_condition_wait (&cq->listeners, &cq->lock)) - { - cq->num_listeners--; - mutex_unlock (&cq->lock); - return EINTR; - } + cq->num_listeners++; - if (req != NULL) - /* Dequeue the next request, if desired. */ + if (cq->count == 0) + /* The request queue is empty. */ { - *req = cq->queue[cq->head]; - cq->head = qnext (cq, cq->head); - if (sock != NULL) - *sock = (*req)->sock; + assert (! cq->head); + + if (cq->num_connectors > 0) + /* Someone is waiting for an acceptor. Signal that we can + service their request. */ + condition_signal (&cq->connectors); + + do + if (hurd_condition_wait (&cq->listeners, &cq->lock)) + { + cq->num_listeners--; + err = EINTR; + goto out; + } + while (cq->count == 0); } - cq->num_listeners--; + assert (cq->head); + if (sock) + /* Dequeue the next request, if desired. */ + { + struct connq_request *req = connq_request_dequeue (cq); + *sock = req->sock; + free (req); + } + else if (cq->num_listeners > 0) + /* The caller will not actually process this request but someone + else could. (This case is rare but possible: it would require + one thread to do a select on the socket and a second to do an + accept.) */ + condition_signal (&cq->listeners); + else + /* There is no one else to process the request and the connection + has now been initiated. This is not actually a problem as even + if the current queue limit is 0, the connector will queue the + request and another listener (should) eventually come along. + (In fact it is very probably as the caller has likely done a + select and will now follow up with an accept.) */ + ; + + out: mutex_unlock (&cq->lock); - - return 0; -} - -/* Return the error code ERR to the thread that made the listen request REQ, - returned from a previous connq_listen. */ -void -connq_request_complete (struct connq_request *req, error_t err) -{ - mutex_lock (&req->lock); - req->err = err; - req->completed = 1; - condition_signal (&req->signal); - mutex_unlock (&req->lock); + return err; } -/* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is true, - then return EWOULDBLOCK immediately when there are no immediate - connections available. Neither SOCK nor CQ should be locked. */ +/* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is + true, then return EWOULDBLOCK if there are no connections + immediately available. On success, this call must be followed up + either connq_connect_complete or connq_connect_cancel. */ error_t -connq_connect (struct connq *cq, int noblock, struct sock *sock) +connq_connect (struct connq *cq, int noblock) { - error_t err = 0; - unsigned next; - mutex_lock (&cq->lock); /* Check for listeners after we've locked CQ for good. */ - if ((noblock || cq->noqueue) && cq->num_listeners == 0) + + if (noblock + && cq->count + cq->num_connectors >= cq->max + cq->num_listeners) + /* We are in non-blocking mode and would have to wait to secure an + entry in the listen queue. */ { mutex_unlock (&cq->lock); return EWOULDBLOCK; } - next = qnext (cq, cq->tail); - if (next == cq->tail) - /* The queue is full. */ - err = ECONNREFUSED; - else - { - struct connq_request req; + cq->num_connectors ++; - connq_request_init (&req, sock); + while (cq->count + cq->num_connectors > cq->max + cq->num_listeners) + /* The queue is full and there is no immediate listener to service + us. Block until we can get a slot. */ + if (hurd_condition_wait (&cq->connectors, &cq->lock)) + { + cq->num_connectors --; + mutex_unlock (&cq->lock); + return EINTR; + } - cq->queue[cq->tail] = &req; - cq->tail = next; + mutex_unlock (&cq->lock); - /* Hold REQ.LOCK before we signal the condition so that we're sure - to be woken up. */ - mutex_lock (&req.lock); - condition_signal (&cq->listeners); - mutex_unlock (&cq->lock); + return 0; +} - while (!req.completed) - condition_wait (&req.signal, &req.lock); - err = req.err; +/* Follow up to connq_connect. Completes the connect, SOCK is the new + server socket. */ +void +connq_connect_complete (struct connq *cq, struct sock *sock) +{ + struct connq_request *req; + + req = malloc (sizeof (struct connq_request)); + if (! req) + abort (); + + connq_request_init (req, sock); + + mutex_lock (&cq->lock); - mutex_unlock (&req.lock); + assert (cq->num_connectors > 0); + cq->num_connectors --; + + connq_request_enqueue (cq, req); + + if (cq->num_listeners > 0) + /* Wake a listener up. We must consume the listener ref here as + someone else might call this function before the listener + thread dequeues this request. */ + { + cq->num_listeners --; + condition_signal (&cq->listeners); } - return err; + mutex_unlock (&cq->lock); } - -#if 0 -/* `Compresses' CQ, by removing any NULL entries. CQ should be locked. */ -static void -connq_compress (struct connq *cq) + +/* Follow up to connq_connect. Cancel the connect. */ +void +connq_connect_cancel (struct connq *cq) { - unsigned pos; - unsigned comp_tail = cq->head; - - /* Now compress the queue to remove any null entries we put in. */ - for (pos = cq->head; pos != cq->tail; pos = qnext (cq, pos)) - if (cq->queue[pos] != NULL) - /* This position has a non-NULL request, so move it to the end of the - compressed queue. */ - { - cq->queue[comp_tail] = cq->queue[pos]; - comp_tail = qnext (cq, comp_tail); - } + mutex_lock (&cq->lock); + + assert (cq->num_connectors > 0); + cq->num_connectors --; - /* Move back tail to only include what we kept in the queue. */ - cq->tail = comp_tail; + if (cq->count + cq->num_connectors >= cq->max + cq->num_listeners) + /* A connector is blocked and could use the spot we reserved. */ + condition_signal (&cq->connectors); + + mutex_unlock (&cq->lock); } -#endif -/* Set CQ's queue length to LENGTH. Any sockets already waiting for a - connections that are past the new length will fail with ECONNREFUSED. */ +/* Set CQ's queue length to LENGTH. */ error_t -connq_set_length (struct connq *cq, int length) +connq_set_length (struct connq *cq, int max) { - mutex_lock (&cq->lock); + int omax; - if (length > cq->length) - /* Growing the queue is simple... */ - cq->queue = realloc (cq->queue, sizeof (struct connq_request *) * length); - else - /* Shrinking it less so. */ - { - int i; - struct connq_request **new_queue = - malloc (sizeof (struct connq_request *) * length); - - for (i = 0; i < cq->length && cq->head != cq->tail; i++) - { - if (i < length) - /* Keep this connect request in the queue. */ - new_queue[length - i] = cq->queue[cq->head]; - else - /* Punt this one. */ - connq_request_complete (cq->queue[cq->head], ECONNREFUSED); - cq->head = qnext (cq, cq->head); - } - - free (cq->queue); - cq->queue = new_queue; - } - - cq->noqueue = 0; /* Turn on queueing. */ + mutex_lock (&cq->lock); + omax = cq->max; + cq->max = max; + + if (max > omax && cq->count >= omax && cq->count < max + && cq->num_connectors >= cq->num_listeners) + /* This is an increase in the number of connection slots which has + made some slots available and there are waiting threads. Wake + them up. */ + condition_broadcast (&cq->listeners); mutex_unlock (&cq->lock); |