summaryrefslogtreecommitdiff
path: root/libddekit/thread.c
diff options
context:
space:
mode:
authorZheng Da <zhengda1936@gmail.com>2009-11-21 12:52:14 +0100
committerZheng Da <zhengda1936@gmail.com>2009-11-21 12:52:14 +0100
commit1467ade7d8fd55511e6f94abd41078eab90b57b0 (patch)
treed90abf6b03c809e6dc22c699c625870cd73710fc /libddekit/thread.c
parent7c5d267799af896d4bc9e7a03ece141a6764c12c (diff)
Implement the semaphore (in thread.c).
Diffstat (limited to 'libddekit/thread.c')
-rw-r--r--libddekit/thread.c294
1 files changed, 274 insertions, 20 deletions
diff --git a/libddekit/thread.c b/libddekit/thread.c
index ddd5c7ef..a095db2f 100644
--- a/libddekit/thread.c
+++ b/libddekit/thread.c
@@ -4,17 +4,70 @@
#include <time.h>
#include <error.h>
+#include "ddekit/semaphore.h"
+#include "list.h"
#include "ddekit/thread.h"
#define DDEKIT_THREAD_STACK_SIZE 0x2000 /* 8 KB */
static struct ddekit_slab *ddekit_stack_slab = NULL;
+struct _ddekit_private_data {
+ struct list list;
+ condition_t sleep_cond;
+ /* point to the thread who has the private data. */
+ struct ddekit_thread *thread;
+ mach_msg_header_t wakeupmsg;
+
+}
+
struct ddekit_thread {
struct cthread thread;
};
+struct ddekit_sem
+{
+ spin_lock_t lock;
+ /* A list of thread waiting for the semaphore. */
+ struct list head;
+ int value;
+};
+
+/* Prepare a wakeup message. */
+static error_t _create_wakeupmsg (struct _ddekit_private_data *data)
+{
+ kern_return_t err;
+
+ /* Build wakeup message. */
+ data->wakeupmsg.msgh_bits = MACH_MSGH_BITS (MACH_MSG_TYPE_COPY_SEND, 0);
+ data->wakeupmsg.msgh_size = 0;
+
+ err = mach_port_allocate (mach_task_self (), MACH_PORT_RIGHT_RECEIVE,
+ &data->wakeupmsg.msgh_remote_port);
+ if (err)
+ return EAGAIN;
+
+ data->wakeupmsg.msgh_local_port = MACH_PORT_NULL;
+ data->wakeupmsg.msgh_seqno = 0;
+ data->wakeupmsg.msgh_id = 0;
+
+ err = mach_port_insert_right (mach_task_self (),
+ data->wakeupmsg.msgh_remote_port,
+ data->wakeupmsg.msgh_remote_port,
+ MACH_MSG_TYPE_MAKE_SEND);
+ if (err) {
+ mach_port_destroy (mach_task_self (),
+ data->wakeupmsg.msgh_remote_port);
+ return EAGAIN;
+ }
+
+ return 0;
+}
+
static void setup_thread (cthread_t *t, const char *name) {
+ error_t err;
+ struct _ddekit_private_data *private_data;
+
if (name) {
const char *cpy = NULL;
@@ -32,9 +85,21 @@ static void setup_thread (cthread_t *t, const char *name) {
* the user of this library. It's very safe to store
* the condition variable in ldata.
*/
- sleep_cond = condition_alloc ();
- condition_init (sleep_cond);
- cthread_set_ldata (t, sleep_cond);
+
+ private_data = (struct _ddekit_private_data *)
+ ddekit_simple_malloc (sizeof (*private_data));
+
+ private_data->sleep_cond = condition_alloc ();
+ condition_init (private_data->sleep_cond);
+
+ private_data->list = {&private_data->list, &private_data->list};
+ private_data->thread = t;
+
+ err = _create_wakeupmsg (private_data);
+ // TODO I need to change this.
+ assert_perror (err);
+
+ cthread_set_ldata (t, private_data);
}
ddekit_thread_t *ddekit_thread_setup_myself(const char *name) {
@@ -46,11 +111,11 @@ ddekit_thread_t *ddekit_thread_setup_myself(const char *name) {
ddekit_thread_t *ddekit_thread_create(void (*fun)(void *), void *arg, const char *name) {
ddekit_thread_t *td;
- condition_t sleep_cond;
- // TODO not very sure whether I should let the thread suspend
+ // TODO I should let the thread suspend
// before initialization is completed.
td = (ddekit_thread_t *) cthread_fork (fun, arg);
+ cthread_detach (&td->thread);
setup_thread (&td->thread, name);
return td;
}
@@ -60,8 +125,6 @@ ddekit_thread_t *ddekit_thread_myself(void) {
}
void ddekit_thread_set_data(ddekit_thread_t *thread, void *data) {
- // TODO not very sure whether I should call cthread_set_ldata
- // or cthread_set_data.
cthread_set_data ((cthread_t) thread, data);
}
@@ -112,37 +175,33 @@ void ddekit_thread_nsleep(unsigned long nsecs) {
}
void ddekit_thread_sleep(ddekit_lock_t *lock) {
- ddekit_thread_t *td;
- condition_t sleep_cond;
-
- td = ddekit_thread_myself();
- sleep_cond = ddekit_thread_get_data (td);
+ struct _ddekit_private_data *data = cthread_ldata (cthread_self ());
// TODO condition_wait cannot guarantee that the thread is
// woke up by another thread, maybe by signals.
// Does it matter here?
- condition_wait (sleep_cond, lock);
+ condition_wait (data->sleep_cond, lock);
}
-void ddekit_thread_wakeup(ddekit_thread_t *td) {
- condition_t sleep_cond;
+void dekit_thread_wakeup(ddekit_thread_t *td) {
+ struct _ddekit_private_data *data = cthread_ldata (cthread_self ());
- sleep_cond = ddekit_thread_get_data (td);
- condition_signal (sleep_cond);
+ condition_signal (data->sleep_cond);
}
void ddekit_thread_exit() {
const char *name;
- condition_t sleep_cond;
+ struct _ddekit_private_data *data;
cthread_t t = cthread_self ();
// TODO I hope I don't need a lock to protect ldata and name.
/* I have to free the sleep condition variable
* before the thread exits. */
- sleep_cond = cthread_ldata (t);
+ data = cthread_ldata (t);
cthread_set_ldata (t, NULL);
- condition_free (sleep_cond);
+ condition_free (data->sleep_cond);
+ ddekit_simple_free (data);
name = cthread_name (t);
cthread_set_name (t, NULL);
@@ -169,3 +228,198 @@ void ddekit_init_threads() {
// TODO maybe the name has already been set.
cthread_set_name (cthread_self (), "main");
}
+
+/* Block THREAD. */
+static error_t _timedblock (struct _ddekit_private_data *data,
+ const struct timespec *abstime)
+{
+ error_t err;
+ mach_msg_header_t msg;
+ mach_msg_timeout_t timeout;
+ struct timeval now;
+
+ /* We have an absolute time and now we have to convert it to a
+ relative time. Arg. */
+
+ err = gettimeofday(&now, NULL);
+ assert (! err);
+
+ if (now.tv_sec > abstime->tv_sec
+ || (now.tv_sec == abstime->tv_sec
+ && now.tv_usec > ((abstime->tv_nsec + 999) / 1000)))
+ return ETIMEDOUT;
+
+ timeout = (abstime->tv_sec - now.tv_sec) * 1000;
+
+ if (((abstime->tv_nsec + 999) / 1000) >= now.tv_usec)
+ timeout -= (((abstime->tv_nsec + 999) / 1000)
+ - now.tv_usec + 999) / 1000;
+ else
+ /* Need to do a carry. */
+ timeout -= 1000 + ((abstime->tv_nsec + 999999) / 1000000)
+ - (now.tv_usec + 999) / 1000;
+
+ err = mach_msg (&msg, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0,
+ sizeof msg, data->wakeupmsg.msgh_remote_port,
+ timeout, MACH_PORT_NULL);
+ if (err == EMACH_RCV_TIMED_OUT)
+ return ETIMEDOUT;
+
+ assert_perror (err);
+ return 0;
+}
+
+/* Block THREAD. */
+static void _block (struct _ddekit_private_data *data)
+{
+ mach_msg_header_t msg;
+ error_t err;
+
+ err = mach_msg (&msg, MACH_RCV_MSG, 0, sizeof msg,
+ data->wakeupmsg.msgh_remote_port,
+ MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
+ assert_perror (err);
+}
+
+static int _sem_timedwait_internal (sem_t *restrict sem,
+ const struct timespec *restrict timeout)
+{
+ struct ddekit_private_data *self_private_data;
+
+ spin_lock (&sem->lock);
+ if (sem->value > 0) {
+ /* Successful down. */
+ sem->value --;
+ spin_unlock (&sem->__lock);
+ return 0;
+ }
+
+ if (timeout && (timeout->tv_nsec < 0
+ || timeout->tv_nsec >= 1000000000)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ /* Add ourselves to the queue. */
+ self_private_data = cthread_ldata (cthread_self ());
+
+ add_entry_head (&sem->head, (struct list *) self_private_data);
+ spin_unlock (&sem->lock);
+
+ /* Block the thread. */
+ if (timeout) {
+ error_t err;
+
+ err = _timedblock (self_private_data, timeout);
+ if (err) {
+ /* We timed out. We may need to disconnect ourself from the
+ waiter queue.
+
+ FIXME: What do we do if we get a wakeup message before we
+ disconnect ourself? It may remain until the next time we
+ block. */
+ assert (err == ETIMEDOUT);
+
+ spin_lock (&sem->lock);
+ remove_entry ((struct list *) self_private_data);
+ spin_unlock (&sem->lock);
+
+ errno = err;
+ return -1;
+ }
+ }
+ else
+ _block (self_private_data);
+
+ return 0;
+}
+
+/* Wakeup THREAD. */
+static void _thread_wakeup (struct _ddekit_private_data *data)
+{
+ error_t err;
+
+ err = mach_msg (&data->wakeupmsg, MACH_SEND_MSG,
+ sizeof (data->wakeupmsg), 0, MACH_PORT_NULL,
+ MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
+ assert_perror (err);
+}
+
+ddekit_sem_t *ddekit_sem_init(int value) {
+ ddekit_sem_t *sem =
+ (ddekit_sem_t *) ddekit_simple_malloc (sizeof (*sem));
+
+ sem->lock = SPIN_LOCK_INITIALIZER;
+ sem->head = {&sem->head, &sem->head};
+ sem->value = value;
+ return sem;
+}
+
+void ddekit_sem_deinit(ddekit_sem_t *sem) {
+ if (!EMPTY_ENTRY (&sem->head)) {
+ error (0, EBUSY, "ddekit_sem_deinit");
+ }
+ else
+ ddekit_simple_free(sem);
+}
+
+void ddekit_sem_down(ddekit_sem_t *sem) {
+ _sem_timedwait_internal (sem, NULL);
+}
+
+/* returns 0 on success, != 0 when it would block */
+int ddekit_sem_down_try(ddekit_sem_t *sem) {
+ spin_lock (&sem->lock);
+ if (sem->value > 0) {
+ /* Successful down. */
+ sem->value --;
+ spin_unlock (&sem->lock);
+ return 0;
+ }
+ spin_unlock (&sem->lock);
+
+ return -1;
+}
+
+/* returns 0 on success, != 0 on timeout */
+int ddekit_sem_down_timed(ddekit_sem_t *sem, int timo) {
+ /* wait for up to timo milliseconds */
+ struct timespec timeout;
+
+ timeout.tv_sec = timo / 1000;
+ timeout.tv_nsec = (timo % 1000) * 1000 * 1000;
+ return __sem_timedwait_internal (sem, &timeout);
+}
+
+void ddekit_sem_up(ddekit_sem_t *sem) {
+ struct _ddekit_thread_data *wakeup;
+
+ spin_lock (&sem->lock);
+ if (sem->value > 0) {
+ /* Do a quick up. */
+ assert (EMPTY_LIST (&sem->head));
+ sem->value ++;
+ spin_unlock (&sem->lock);
+ return 0;
+ }
+
+ if (EMPTY_LIST (&sem->head)) {
+ /* No one waiting. */
+ sem->value = 1;
+ spin_unlock (&sem->lock);
+ return 0;
+ }
+
+ /* Wake someone up. */
+
+ /* First dequeue someone. */
+ wakeup = (struct _ddekit_private_data *) remove_entry_end (&sem->head);
+
+ /* Then drop the lock and transfer control. */
+ spin_unlock (&sem->lock);
+ if (wakeup)
+ _thread_wakeup (wakeup);
+
+ return 0;
+}
+