diff options
author | Zheng Da <zhengda1936@gmail.com> | 2009-11-21 12:52:14 +0100 |
---|---|---|
committer | Zheng Da <zhengda1936@gmail.com> | 2009-11-21 12:52:14 +0100 |
commit | 1467ade7d8fd55511e6f94abd41078eab90b57b0 (patch) | |
tree | d90abf6b03c809e6dc22c699c625870cd73710fc /libddekit/thread.c | |
parent | 7c5d267799af896d4bc9e7a03ece141a6764c12c (diff) |
Implement the semaphore (in thread.c).
Diffstat (limited to 'libddekit/thread.c')
-rw-r--r-- | libddekit/thread.c | 294 |
1 files changed, 274 insertions, 20 deletions
diff --git a/libddekit/thread.c b/libddekit/thread.c index ddd5c7ef..a095db2f 100644 --- a/libddekit/thread.c +++ b/libddekit/thread.c @@ -4,17 +4,70 @@ #include <time.h> #include <error.h> +#include "ddekit/semaphore.h" +#include "list.h" #include "ddekit/thread.h" #define DDEKIT_THREAD_STACK_SIZE 0x2000 /* 8 KB */ static struct ddekit_slab *ddekit_stack_slab = NULL; +struct _ddekit_private_data { + struct list list; + condition_t sleep_cond; + /* point to the thread who has the private data. */ + struct ddekit_thread *thread; + mach_msg_header_t wakeupmsg; + +} + struct ddekit_thread { struct cthread thread; }; +struct ddekit_sem +{ + spin_lock_t lock; + /* A list of thread waiting for the semaphore. */ + struct list head; + int value; +}; + +/* Prepare a wakeup message. */ +static error_t _create_wakeupmsg (struct _ddekit_private_data *data) +{ + kern_return_t err; + + /* Build wakeup message. */ + data->wakeupmsg.msgh_bits = MACH_MSGH_BITS (MACH_MSG_TYPE_COPY_SEND, 0); + data->wakeupmsg.msgh_size = 0; + + err = mach_port_allocate (mach_task_self (), MACH_PORT_RIGHT_RECEIVE, + &data->wakeupmsg.msgh_remote_port); + if (err) + return EAGAIN; + + data->wakeupmsg.msgh_local_port = MACH_PORT_NULL; + data->wakeupmsg.msgh_seqno = 0; + data->wakeupmsg.msgh_id = 0; + + err = mach_port_insert_right (mach_task_self (), + data->wakeupmsg.msgh_remote_port, + data->wakeupmsg.msgh_remote_port, + MACH_MSG_TYPE_MAKE_SEND); + if (err) { + mach_port_destroy (mach_task_self (), + data->wakeupmsg.msgh_remote_port); + return EAGAIN; + } + + return 0; +} + static void setup_thread (cthread_t *t, const char *name) { + error_t err; + struct _ddekit_private_data *private_data; + if (name) { const char *cpy = NULL; @@ -32,9 +85,21 @@ static void setup_thread (cthread_t *t, const char *name) { * the user of this library. It's very safe to store * the condition variable in ldata. */ - sleep_cond = condition_alloc (); - condition_init (sleep_cond); - cthread_set_ldata (t, sleep_cond); + + private_data = (struct _ddekit_private_data *) + ddekit_simple_malloc (sizeof (*private_data)); + + private_data->sleep_cond = condition_alloc (); + condition_init (private_data->sleep_cond); + + private_data->list = {&private_data->list, &private_data->list}; + private_data->thread = t; + + err = _create_wakeupmsg (private_data); + // TODO I need to change this. + assert_perror (err); + + cthread_set_ldata (t, private_data); } ddekit_thread_t *ddekit_thread_setup_myself(const char *name) { @@ -46,11 +111,11 @@ ddekit_thread_t *ddekit_thread_setup_myself(const char *name) { ddekit_thread_t *ddekit_thread_create(void (*fun)(void *), void *arg, const char *name) { ddekit_thread_t *td; - condition_t sleep_cond; - // TODO not very sure whether I should let the thread suspend + // TODO I should let the thread suspend // before initialization is completed. td = (ddekit_thread_t *) cthread_fork (fun, arg); + cthread_detach (&td->thread); setup_thread (&td->thread, name); return td; } @@ -60,8 +125,6 @@ ddekit_thread_t *ddekit_thread_myself(void) { } void ddekit_thread_set_data(ddekit_thread_t *thread, void *data) { - // TODO not very sure whether I should call cthread_set_ldata - // or cthread_set_data. cthread_set_data ((cthread_t) thread, data); } @@ -112,37 +175,33 @@ void ddekit_thread_nsleep(unsigned long nsecs) { } void ddekit_thread_sleep(ddekit_lock_t *lock) { - ddekit_thread_t *td; - condition_t sleep_cond; - - td = ddekit_thread_myself(); - sleep_cond = ddekit_thread_get_data (td); + struct _ddekit_private_data *data = cthread_ldata (cthread_self ()); // TODO condition_wait cannot guarantee that the thread is // woke up by another thread, maybe by signals. // Does it matter here? - condition_wait (sleep_cond, lock); + condition_wait (data->sleep_cond, lock); } -void ddekit_thread_wakeup(ddekit_thread_t *td) { - condition_t sleep_cond; +void dekit_thread_wakeup(ddekit_thread_t *td) { + struct _ddekit_private_data *data = cthread_ldata (cthread_self ()); - sleep_cond = ddekit_thread_get_data (td); - condition_signal (sleep_cond); + condition_signal (data->sleep_cond); } void ddekit_thread_exit() { const char *name; - condition_t sleep_cond; + struct _ddekit_private_data *data; cthread_t t = cthread_self (); // TODO I hope I don't need a lock to protect ldata and name. /* I have to free the sleep condition variable * before the thread exits. */ - sleep_cond = cthread_ldata (t); + data = cthread_ldata (t); cthread_set_ldata (t, NULL); - condition_free (sleep_cond); + condition_free (data->sleep_cond); + ddekit_simple_free (data); name = cthread_name (t); cthread_set_name (t, NULL); @@ -169,3 +228,198 @@ void ddekit_init_threads() { // TODO maybe the name has already been set. cthread_set_name (cthread_self (), "main"); } + +/* Block THREAD. */ +static error_t _timedblock (struct _ddekit_private_data *data, + const struct timespec *abstime) +{ + error_t err; + mach_msg_header_t msg; + mach_msg_timeout_t timeout; + struct timeval now; + + /* We have an absolute time and now we have to convert it to a + relative time. Arg. */ + + err = gettimeofday(&now, NULL); + assert (! err); + + if (now.tv_sec > abstime->tv_sec + || (now.tv_sec == abstime->tv_sec + && now.tv_usec > ((abstime->tv_nsec + 999) / 1000))) + return ETIMEDOUT; + + timeout = (abstime->tv_sec - now.tv_sec) * 1000; + + if (((abstime->tv_nsec + 999) / 1000) >= now.tv_usec) + timeout -= (((abstime->tv_nsec + 999) / 1000) + - now.tv_usec + 999) / 1000; + else + /* Need to do a carry. */ + timeout -= 1000 + ((abstime->tv_nsec + 999999) / 1000000) + - (now.tv_usec + 999) / 1000; + + err = mach_msg (&msg, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0, + sizeof msg, data->wakeupmsg.msgh_remote_port, + timeout, MACH_PORT_NULL); + if (err == EMACH_RCV_TIMED_OUT) + return ETIMEDOUT; + + assert_perror (err); + return 0; +} + +/* Block THREAD. */ +static void _block (struct _ddekit_private_data *data) +{ + mach_msg_header_t msg; + error_t err; + + err = mach_msg (&msg, MACH_RCV_MSG, 0, sizeof msg, + data->wakeupmsg.msgh_remote_port, + MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL); + assert_perror (err); +} + +static int _sem_timedwait_internal (sem_t *restrict sem, + const struct timespec *restrict timeout) +{ + struct ddekit_private_data *self_private_data; + + spin_lock (&sem->lock); + if (sem->value > 0) { + /* Successful down. */ + sem->value --; + spin_unlock (&sem->__lock); + return 0; + } + + if (timeout && (timeout->tv_nsec < 0 + || timeout->tv_nsec >= 1000000000)) { + errno = EINVAL; + return -1; + } + + /* Add ourselves to the queue. */ + self_private_data = cthread_ldata (cthread_self ()); + + add_entry_head (&sem->head, (struct list *) self_private_data); + spin_unlock (&sem->lock); + + /* Block the thread. */ + if (timeout) { + error_t err; + + err = _timedblock (self_private_data, timeout); + if (err) { + /* We timed out. We may need to disconnect ourself from the + waiter queue. + + FIXME: What do we do if we get a wakeup message before we + disconnect ourself? It may remain until the next time we + block. */ + assert (err == ETIMEDOUT); + + spin_lock (&sem->lock); + remove_entry ((struct list *) self_private_data); + spin_unlock (&sem->lock); + + errno = err; + return -1; + } + } + else + _block (self_private_data); + + return 0; +} + +/* Wakeup THREAD. */ +static void _thread_wakeup (struct _ddekit_private_data *data) +{ + error_t err; + + err = mach_msg (&data->wakeupmsg, MACH_SEND_MSG, + sizeof (data->wakeupmsg), 0, MACH_PORT_NULL, + MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL); + assert_perror (err); +} + +ddekit_sem_t *ddekit_sem_init(int value) { + ddekit_sem_t *sem = + (ddekit_sem_t *) ddekit_simple_malloc (sizeof (*sem)); + + sem->lock = SPIN_LOCK_INITIALIZER; + sem->head = {&sem->head, &sem->head}; + sem->value = value; + return sem; +} + +void ddekit_sem_deinit(ddekit_sem_t *sem) { + if (!EMPTY_ENTRY (&sem->head)) { + error (0, EBUSY, "ddekit_sem_deinit"); + } + else + ddekit_simple_free(sem); +} + +void ddekit_sem_down(ddekit_sem_t *sem) { + _sem_timedwait_internal (sem, NULL); +} + +/* returns 0 on success, != 0 when it would block */ +int ddekit_sem_down_try(ddekit_sem_t *sem) { + spin_lock (&sem->lock); + if (sem->value > 0) { + /* Successful down. */ + sem->value --; + spin_unlock (&sem->lock); + return 0; + } + spin_unlock (&sem->lock); + + return -1; +} + +/* returns 0 on success, != 0 on timeout */ +int ddekit_sem_down_timed(ddekit_sem_t *sem, int timo) { + /* wait for up to timo milliseconds */ + struct timespec timeout; + + timeout.tv_sec = timo / 1000; + timeout.tv_nsec = (timo % 1000) * 1000 * 1000; + return __sem_timedwait_internal (sem, &timeout); +} + +void ddekit_sem_up(ddekit_sem_t *sem) { + struct _ddekit_thread_data *wakeup; + + spin_lock (&sem->lock); + if (sem->value > 0) { + /* Do a quick up. */ + assert (EMPTY_LIST (&sem->head)); + sem->value ++; + spin_unlock (&sem->lock); + return 0; + } + + if (EMPTY_LIST (&sem->head)) { + /* No one waiting. */ + sem->value = 1; + spin_unlock (&sem->lock); + return 0; + } + + /* Wake someone up. */ + + /* First dequeue someone. */ + wakeup = (struct _ddekit_private_data *) remove_entry_end (&sem->head); + + /* Then drop the lock and transfer control. */ + spin_unlock (&sem->lock); + if (wakeup) + _thread_wakeup (wakeup); + + return 0; +} + |