diff options
Diffstat (limited to 'libddekit/thread.c')
-rw-r--r-- | libddekit/thread.c | 434 |
1 files changed, 434 insertions, 0 deletions
diff --git a/libddekit/thread.c b/libddekit/thread.c new file mode 100644 index 00000000..8f7b3ed9 --- /dev/null +++ b/libddekit/thread.c @@ -0,0 +1,434 @@ +#include <stdio.h> +#include <string.h> +#include <pthread.h> +#include <time.h> +#include <error.h> +#include <mach.h> +#include <hurd.h> +#include <sys/time.h> +#include <assert.h> + +#include "ddekit/memory.h" +#include "ddekit/semaphore.h" +#include "ddekit/condvar.h" +#include "list.h" +#include "ddekit/thread.h" + +#define DDEKIT_THREAD_STACK_SIZE 0x2000 /* 8 KB */ + +//static struct ddekit_slab *ddekit_stack_slab = NULL; + +struct _ddekit_private_data { + struct list list; + ddekit_condvar_t *sleep_cond; + /* point to the thread who has the private data. */ + struct ddekit_thread *thread; + mach_msg_header_t wakeupmsg; +}; + +struct ddekit_thread { + pthread_t thread; + char *name; + struct _ddekit_private_data *private; + void *user; +}; + +struct ddekit_sem +{ + pthread_spinlock_t lock; + /* A list of thread waiting for the semaphore. */ + struct list head; + int value; +}; + +static __thread struct ddekit_thread *thread_self; + +static void _thread_cleanup () +{ + mach_port_destroy (mach_task_self (), + thread_self->private->wakeupmsg.msgh_remote_port); + ddekit_condvar_deinit (thread_self->private->sleep_cond); + ddekit_simple_free (thread_self->private); + ddekit_simple_free (thread_self->name); + ddekit_simple_free (thread_self); +} + +/* Prepare a wakeup message. */ +static error_t _create_wakeupmsg (struct _ddekit_private_data *data) +{ + kern_return_t err; + + /* Build wakeup message. */ + data->wakeupmsg.msgh_bits = MACH_MSGH_BITS (MACH_MSG_TYPE_COPY_SEND, 0); + data->wakeupmsg.msgh_size = 0; + + err = mach_port_allocate (mach_task_self (), MACH_PORT_RIGHT_RECEIVE, + &data->wakeupmsg.msgh_remote_port); + if (err) + return EAGAIN; + + data->wakeupmsg.msgh_local_port = MACH_PORT_NULL; + data->wakeupmsg.msgh_seqno = 0; + data->wakeupmsg.msgh_id = 0; + + err = mach_port_insert_right (mach_task_self (), + data->wakeupmsg.msgh_remote_port, + data->wakeupmsg.msgh_remote_port, + MACH_MSG_TYPE_MAKE_SEND); + if (err) { + mach_port_destroy (mach_task_self (), + data->wakeupmsg.msgh_remote_port); + return EAGAIN; + } + + return 0; +} + +static void setup_thread (struct ddekit_thread *t, const char *name) { + error_t err; + struct _ddekit_private_data *private_data; + + if (name) { + char *cpy = NULL; + + cpy = ddekit_simple_malloc (strlen (name) + 1); + if (cpy == NULL) + error (0, 0, "fail to allocate memory"); + else + strcpy (cpy, name); + + t->name = cpy; + } + + private_data = (struct _ddekit_private_data *) + ddekit_simple_malloc (sizeof (*private_data)); + + private_data->sleep_cond = ddekit_condvar_init (); + private_data->list.prev = &private_data->list; + private_data->list.next = &private_data->list; + private_data->thread = t; + + err = _create_wakeupmsg (private_data); + if (err) + error (1, err, "_create_wakeupmsg"); + + t->private = private_data; +} + +ddekit_thread_t *ddekit_thread_setup_myself(const char *name) { + ddekit_thread_t *td = (ddekit_thread_t *) malloc (sizeof (*td)); + setup_thread (td, name); + thread_self = td; + return td; +} + +typedef struct +{ + void (*fun)(void *); + void *arg; + struct ddekit_thread *td; + pthread_cond_t cond; + pthread_mutex_t lock; + int status; +} priv_arg_t; + +static void* _priv_fun (void *arg) +{ + priv_arg_t *priv_arg = arg; + thread_self = priv_arg->td; + /* We wait until the initialization of the thread is finished. */ + pthread_mutex_lock (&priv_arg->lock); + while (!priv_arg->status) + pthread_cond_wait (&priv_arg->cond, &priv_arg->lock); + pthread_mutex_unlock (&priv_arg->lock); + + priv_arg->fun(priv_arg->arg); + free (priv_arg->arg); + _thread_cleanup (); + return NULL; +} + +ddekit_thread_t *ddekit_thread_create(void (*fun)(void *), void *arg, const char *name) { + ddekit_thread_t *td = (ddekit_thread_t *) malloc (sizeof (*td)); + setup_thread (td, name); + + priv_arg_t *priv_arg = (priv_arg_t *) malloc (sizeof (*priv_arg)); + priv_arg->fun = fun; + priv_arg->arg = arg; + priv_arg->td = td; + pthread_cond_init (&priv_arg->cond, NULL); + pthread_mutex_init (&priv_arg->lock, NULL); + priv_arg->status = 0; + + pthread_create (&td->thread, NULL, _priv_fun, priv_arg); + pthread_detach (td->thread); + + /* Tell the new thread that initialization has been finished. */ + pthread_mutex_lock (&priv_arg->lock); + priv_arg->status = 1; + pthread_cond_signal (&priv_arg->cond); + pthread_mutex_unlock (&priv_arg->lock); + + return td; +} + +ddekit_thread_t *ddekit_thread_myself(void) { + return thread_self; +} + +void ddekit_thread_set_data(ddekit_thread_t *thread, void *data) { + thread->user = data; +} + +void ddekit_thread_set_my_data(void *data) { + ddekit_thread_set_data(ddekit_thread_myself(), data); +} + +void *ddekit_thread_get_data(ddekit_thread_t *thread) { + return thread->user; +} + +void *ddekit_thread_get_my_data() { + return ddekit_thread_get_data(ddekit_thread_myself()); +} + +void ddekit_thread_msleep(unsigned long msecs) { + int ret; + struct timespec rgt; + + rgt.tv_sec = (time_t) (msecs / 1000); + rgt.tv_nsec = (msecs % 1000) * 1000 * 1000; + ret = nanosleep (&rgt , NULL); + if (ret < 0) + error (0, errno, "nanosleep"); +} + +void ddekit_thread_usleep(unsigned long usecs) { + int ret; + struct timespec rgt; + + rgt.tv_sec = (time_t) (usecs / 1000 / 1000); + rgt.tv_nsec = (usecs % (1000 * 1000)) * 1000; + ret = nanosleep (&rgt , NULL); + if (ret < 0) + error (0, errno, "nanosleep"); +} + + +void ddekit_thread_nsleep(unsigned long nsecs) { + int ret; + struct timespec rgt; + + rgt.tv_sec = (time_t) (nsecs / 1000 / 1000 / 1000); + rgt.tv_nsec = nsecs % (1000 * 1000 * 1000); + ret = nanosleep (&rgt , NULL); + if (ret < 0) + error (0, errno, "nanosleep"); +} + +void ddekit_thread_sleep(ddekit_lock_t *lock) { + // TODO pthread_cond_wait cannot guarantee that the thread is + // woke up by another thread, maybe by signals. + // Does it matter here? + // If it does, use pthread_hurd_cond_wait_np. + ddekit_condvar_wait (thread_self->private->sleep_cond, lock); +} + +void ddekit_thread_wakeup(ddekit_thread_t *td) { + if (td->private == NULL) + return; + ddekit_condvar_signal (td->private->sleep_cond); +} + +void ddekit_thread_exit() { + _thread_cleanup (); + pthread_exit (NULL); +} + +const char *ddekit_thread_get_name(ddekit_thread_t *thread) { + return thread->name; +} + +void ddekit_thread_schedule(void) +{ + swtch_pri (0); +} + +void ddekit_yield(void) +{ + swtch_pri (0); +} + +void ddekit_init_threads() { + ddekit_thread_setup_myself ("main"); +} + +/********************************************************************** + * semaphore + **********************************************************************/ + +/* Block THREAD. */ +static error_t _timedblock (struct _ddekit_private_data *data, + const int timeout) +{ + error_t err; + mach_msg_header_t msg; + + assert (timeout > 0); + + err = mach_msg (&msg, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0, + sizeof msg, data->wakeupmsg.msgh_remote_port, + timeout, MACH_PORT_NULL); + if (err == EMACH_RCV_TIMED_OUT) + return ETIMEDOUT; + + assert_perror (err); + return 0; +} + +/* Block THREAD. */ +static void _block (struct _ddekit_private_data *data) +{ + mach_msg_header_t msg; + error_t err; + + err = mach_msg (&msg, MACH_RCV_MSG, 0, sizeof msg, + data->wakeupmsg.msgh_remote_port, + MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL); + assert_perror (err); +} + +static int _sem_timedwait_internal (ddekit_sem_t *restrict sem, + const int timeout) +{ + pthread_spin_lock (&sem->lock); + if (sem->value > 0) { + /* Successful down. */ + sem->value --; + pthread_spin_unlock (&sem->lock); + return 0; + } + + if (timeout < 0) { + pthread_spin_unlock (&sem->lock); + errno = EINVAL; + return -1; + } + + /* Add ourselves to the queue. */ + add_entry_head (&sem->head, &thread_self->private->list); + pthread_spin_unlock (&sem->lock); + + /* Block the thread. */ + if (timeout) { + error_t err; + + err = _timedblock (thread_self->private, timeout); + if (err) { + /* We timed out. We may need to disconnect ourself from the + waiter queue. + + FIXME: What do we do if we get a wakeup message before we + disconnect ourself? It may remain until the next time we + block. */ + assert (err == ETIMEDOUT); + + pthread_spin_lock (&sem->lock); + remove_entry (&thread_self->private->list); + pthread_spin_unlock (&sem->lock); + + errno = err; + return -1; + } + } + else + _block (thread_self->private); + + return 0; +} + +/* Wakeup THREAD. */ +static void _thread_wakeup (struct _ddekit_private_data *data) +{ + error_t err; + + err = mach_msg (&data->wakeupmsg, MACH_SEND_MSG, + sizeof (data->wakeupmsg), 0, MACH_PORT_NULL, + MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL); + assert_perror (err); +} + +ddekit_sem_t *ddekit_sem_init(int value) { + ddekit_sem_t *sem = + (ddekit_sem_t *) ddekit_simple_malloc (sizeof (*sem)); + + sem->lock = PTHREAD_SPINLOCK_INITIALIZER; + sem->head.prev = &sem->head; + sem->head.next = &sem->head; + sem->value = value; + return sem; +} + +void ddekit_sem_deinit(ddekit_sem_t *sem) { + if (!EMPTY_LIST (&sem->head)) { + error (0, EBUSY, "ddekit_sem_deinit"); + } + else + ddekit_simple_free(sem); +} + +void ddekit_sem_down(ddekit_sem_t *sem) { + _sem_timedwait_internal (sem, 0); +} + +/* returns 0 on success, != 0 when it would block */ +int ddekit_sem_down_try(ddekit_sem_t *sem) { + pthread_spin_lock (&sem->lock); + if (sem->value > 0) { + /* Successful down. */ + sem->value --; + pthread_spin_unlock (&sem->lock); + return 0; + } + pthread_spin_unlock (&sem->lock); + + return -1; +} + +/* returns 0 on success, != 0 on timeout */ +int ddekit_sem_down_timed(ddekit_sem_t *sem, int timo) { + /* wait for up to timo milliseconds */ + return _sem_timedwait_internal (sem, timo); +} + +void ddekit_sem_up(ddekit_sem_t *sem) { + struct _ddekit_private_data *wakeup; + + pthread_spin_lock (&sem->lock); + if (sem->value > 0) { + /* Do a quick up. */ + assert (EMPTY_LIST (&sem->head)); + sem->value ++; + pthread_spin_unlock (&sem->lock); + return; + } + + if (EMPTY_LIST (&sem->head)) { + /* No one waiting. */ + sem->value = 1; + pthread_spin_unlock (&sem->lock); + return; + } + + /* Wake someone up. */ + + /* First dequeue someone. */ + wakeup = LIST_ENTRY (remove_entry_end (&sem->head), + struct _ddekit_private_data, list); + + /* Then drop the lock and transfer control. */ + pthread_spin_unlock (&sem->lock); + if (wakeup) + _thread_wakeup (wakeup); +} + |