summaryrefslogtreecommitdiff
path: root/libddekit/thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'libddekit/thread.c')
-rw-r--r--libddekit/thread.c434
1 files changed, 434 insertions, 0 deletions
diff --git a/libddekit/thread.c b/libddekit/thread.c
new file mode 100644
index 00000000..8f7b3ed9
--- /dev/null
+++ b/libddekit/thread.c
@@ -0,0 +1,434 @@
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <time.h>
+#include <error.h>
+#include <mach.h>
+#include <hurd.h>
+#include <sys/time.h>
+#include <assert.h>
+
+#include "ddekit/memory.h"
+#include "ddekit/semaphore.h"
+#include "ddekit/condvar.h"
+#include "list.h"
+#include "ddekit/thread.h"
+
+#define DDEKIT_THREAD_STACK_SIZE 0x2000 /* 8 KB */
+
+//static struct ddekit_slab *ddekit_stack_slab = NULL;
+
+struct _ddekit_private_data {
+ struct list list;
+ ddekit_condvar_t *sleep_cond;
+ /* point to the thread who has the private data. */
+ struct ddekit_thread *thread;
+ mach_msg_header_t wakeupmsg;
+};
+
+struct ddekit_thread {
+ pthread_t thread;
+ char *name;
+ struct _ddekit_private_data *private;
+ void *user;
+};
+
+struct ddekit_sem
+{
+ pthread_spinlock_t lock;
+ /* A list of thread waiting for the semaphore. */
+ struct list head;
+ int value;
+};
+
+static __thread struct ddekit_thread *thread_self;
+
+static void _thread_cleanup ()
+{
+ mach_port_destroy (mach_task_self (),
+ thread_self->private->wakeupmsg.msgh_remote_port);
+ ddekit_condvar_deinit (thread_self->private->sleep_cond);
+ ddekit_simple_free (thread_self->private);
+ ddekit_simple_free (thread_self->name);
+ ddekit_simple_free (thread_self);
+}
+
+/* Prepare a wakeup message. */
+static error_t _create_wakeupmsg (struct _ddekit_private_data *data)
+{
+ kern_return_t err;
+
+ /* Build wakeup message. */
+ data->wakeupmsg.msgh_bits = MACH_MSGH_BITS (MACH_MSG_TYPE_COPY_SEND, 0);
+ data->wakeupmsg.msgh_size = 0;
+
+ err = mach_port_allocate (mach_task_self (), MACH_PORT_RIGHT_RECEIVE,
+ &data->wakeupmsg.msgh_remote_port);
+ if (err)
+ return EAGAIN;
+
+ data->wakeupmsg.msgh_local_port = MACH_PORT_NULL;
+ data->wakeupmsg.msgh_seqno = 0;
+ data->wakeupmsg.msgh_id = 0;
+
+ err = mach_port_insert_right (mach_task_self (),
+ data->wakeupmsg.msgh_remote_port,
+ data->wakeupmsg.msgh_remote_port,
+ MACH_MSG_TYPE_MAKE_SEND);
+ if (err) {
+ mach_port_destroy (mach_task_self (),
+ data->wakeupmsg.msgh_remote_port);
+ return EAGAIN;
+ }
+
+ return 0;
+}
+
+static void setup_thread (struct ddekit_thread *t, const char *name) {
+ error_t err;
+ struct _ddekit_private_data *private_data;
+
+ if (name) {
+ char *cpy = NULL;
+
+ cpy = ddekit_simple_malloc (strlen (name) + 1);
+ if (cpy == NULL)
+ error (0, 0, "fail to allocate memory");
+ else
+ strcpy (cpy, name);
+
+ t->name = cpy;
+ }
+
+ private_data = (struct _ddekit_private_data *)
+ ddekit_simple_malloc (sizeof (*private_data));
+
+ private_data->sleep_cond = ddekit_condvar_init ();
+ private_data->list.prev = &private_data->list;
+ private_data->list.next = &private_data->list;
+ private_data->thread = t;
+
+ err = _create_wakeupmsg (private_data);
+ if (err)
+ error (1, err, "_create_wakeupmsg");
+
+ t->private = private_data;
+}
+
+ddekit_thread_t *ddekit_thread_setup_myself(const char *name) {
+ ddekit_thread_t *td = (ddekit_thread_t *) malloc (sizeof (*td));
+ setup_thread (td, name);
+ thread_self = td;
+ return td;
+}
+
+typedef struct
+{
+ void (*fun)(void *);
+ void *arg;
+ struct ddekit_thread *td;
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
+ int status;
+} priv_arg_t;
+
+static void* _priv_fun (void *arg)
+{
+ priv_arg_t *priv_arg = arg;
+ thread_self = priv_arg->td;
+ /* We wait until the initialization of the thread is finished. */
+ pthread_mutex_lock (&priv_arg->lock);
+ while (!priv_arg->status)
+ pthread_cond_wait (&priv_arg->cond, &priv_arg->lock);
+ pthread_mutex_unlock (&priv_arg->lock);
+
+ priv_arg->fun(priv_arg->arg);
+ free (priv_arg->arg);
+ _thread_cleanup ();
+ return NULL;
+}
+
+ddekit_thread_t *ddekit_thread_create(void (*fun)(void *), void *arg, const char *name) {
+ ddekit_thread_t *td = (ddekit_thread_t *) malloc (sizeof (*td));
+ setup_thread (td, name);
+
+ priv_arg_t *priv_arg = (priv_arg_t *) malloc (sizeof (*priv_arg));
+ priv_arg->fun = fun;
+ priv_arg->arg = arg;
+ priv_arg->td = td;
+ pthread_cond_init (&priv_arg->cond, NULL);
+ pthread_mutex_init (&priv_arg->lock, NULL);
+ priv_arg->status = 0;
+
+ pthread_create (&td->thread, NULL, _priv_fun, priv_arg);
+ pthread_detach (td->thread);
+
+ /* Tell the new thread that initialization has been finished. */
+ pthread_mutex_lock (&priv_arg->lock);
+ priv_arg->status = 1;
+ pthread_cond_signal (&priv_arg->cond);
+ pthread_mutex_unlock (&priv_arg->lock);
+
+ return td;
+}
+
+ddekit_thread_t *ddekit_thread_myself(void) {
+ return thread_self;
+}
+
+void ddekit_thread_set_data(ddekit_thread_t *thread, void *data) {
+ thread->user = data;
+}
+
+void ddekit_thread_set_my_data(void *data) {
+ ddekit_thread_set_data(ddekit_thread_myself(), data);
+}
+
+void *ddekit_thread_get_data(ddekit_thread_t *thread) {
+ return thread->user;
+}
+
+void *ddekit_thread_get_my_data() {
+ return ddekit_thread_get_data(ddekit_thread_myself());
+}
+
+void ddekit_thread_msleep(unsigned long msecs) {
+ int ret;
+ struct timespec rgt;
+
+ rgt.tv_sec = (time_t) (msecs / 1000);
+ rgt.tv_nsec = (msecs % 1000) * 1000 * 1000;
+ ret = nanosleep (&rgt , NULL);
+ if (ret < 0)
+ error (0, errno, "nanosleep");
+}
+
+void ddekit_thread_usleep(unsigned long usecs) {
+ int ret;
+ struct timespec rgt;
+
+ rgt.tv_sec = (time_t) (usecs / 1000 / 1000);
+ rgt.tv_nsec = (usecs % (1000 * 1000)) * 1000;
+ ret = nanosleep (&rgt , NULL);
+ if (ret < 0)
+ error (0, errno, "nanosleep");
+}
+
+
+void ddekit_thread_nsleep(unsigned long nsecs) {
+ int ret;
+ struct timespec rgt;
+
+ rgt.tv_sec = (time_t) (nsecs / 1000 / 1000 / 1000);
+ rgt.tv_nsec = nsecs % (1000 * 1000 * 1000);
+ ret = nanosleep (&rgt , NULL);
+ if (ret < 0)
+ error (0, errno, "nanosleep");
+}
+
+void ddekit_thread_sleep(ddekit_lock_t *lock) {
+ // TODO pthread_cond_wait cannot guarantee that the thread is
+ // woke up by another thread, maybe by signals.
+ // Does it matter here?
+ // If it does, use pthread_hurd_cond_wait_np.
+ ddekit_condvar_wait (thread_self->private->sleep_cond, lock);
+}
+
+void ddekit_thread_wakeup(ddekit_thread_t *td) {
+ if (td->private == NULL)
+ return;
+ ddekit_condvar_signal (td->private->sleep_cond);
+}
+
+void ddekit_thread_exit() {
+ _thread_cleanup ();
+ pthread_exit (NULL);
+}
+
+const char *ddekit_thread_get_name(ddekit_thread_t *thread) {
+ return thread->name;
+}
+
+void ddekit_thread_schedule(void)
+{
+ swtch_pri (0);
+}
+
+void ddekit_yield(void)
+{
+ swtch_pri (0);
+}
+
+void ddekit_init_threads() {
+ ddekit_thread_setup_myself ("main");
+}
+
+/**********************************************************************
+ * semaphore
+ **********************************************************************/
+
+/* Block THREAD. */
+static error_t _timedblock (struct _ddekit_private_data *data,
+ const int timeout)
+{
+ error_t err;
+ mach_msg_header_t msg;
+
+ assert (timeout > 0);
+
+ err = mach_msg (&msg, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0,
+ sizeof msg, data->wakeupmsg.msgh_remote_port,
+ timeout, MACH_PORT_NULL);
+ if (err == EMACH_RCV_TIMED_OUT)
+ return ETIMEDOUT;
+
+ assert_perror (err);
+ return 0;
+}
+
+/* Block THREAD. */
+static void _block (struct _ddekit_private_data *data)
+{
+ mach_msg_header_t msg;
+ error_t err;
+
+ err = mach_msg (&msg, MACH_RCV_MSG, 0, sizeof msg,
+ data->wakeupmsg.msgh_remote_port,
+ MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
+ assert_perror (err);
+}
+
+static int _sem_timedwait_internal (ddekit_sem_t *restrict sem,
+ const int timeout)
+{
+ pthread_spin_lock (&sem->lock);
+ if (sem->value > 0) {
+ /* Successful down. */
+ sem->value --;
+ pthread_spin_unlock (&sem->lock);
+ return 0;
+ }
+
+ if (timeout < 0) {
+ pthread_spin_unlock (&sem->lock);
+ errno = EINVAL;
+ return -1;
+ }
+
+ /* Add ourselves to the queue. */
+ add_entry_head (&sem->head, &thread_self->private->list);
+ pthread_spin_unlock (&sem->lock);
+
+ /* Block the thread. */
+ if (timeout) {
+ error_t err;
+
+ err = _timedblock (thread_self->private, timeout);
+ if (err) {
+ /* We timed out. We may need to disconnect ourself from the
+ waiter queue.
+
+ FIXME: What do we do if we get a wakeup message before we
+ disconnect ourself? It may remain until the next time we
+ block. */
+ assert (err == ETIMEDOUT);
+
+ pthread_spin_lock (&sem->lock);
+ remove_entry (&thread_self->private->list);
+ pthread_spin_unlock (&sem->lock);
+
+ errno = err;
+ return -1;
+ }
+ }
+ else
+ _block (thread_self->private);
+
+ return 0;
+}
+
+/* Wakeup THREAD. */
+static void _thread_wakeup (struct _ddekit_private_data *data)
+{
+ error_t err;
+
+ err = mach_msg (&data->wakeupmsg, MACH_SEND_MSG,
+ sizeof (data->wakeupmsg), 0, MACH_PORT_NULL,
+ MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
+ assert_perror (err);
+}
+
+ddekit_sem_t *ddekit_sem_init(int value) {
+ ddekit_sem_t *sem =
+ (ddekit_sem_t *) ddekit_simple_malloc (sizeof (*sem));
+
+ sem->lock = PTHREAD_SPINLOCK_INITIALIZER;
+ sem->head.prev = &sem->head;
+ sem->head.next = &sem->head;
+ sem->value = value;
+ return sem;
+}
+
+void ddekit_sem_deinit(ddekit_sem_t *sem) {
+ if (!EMPTY_LIST (&sem->head)) {
+ error (0, EBUSY, "ddekit_sem_deinit");
+ }
+ else
+ ddekit_simple_free(sem);
+}
+
+void ddekit_sem_down(ddekit_sem_t *sem) {
+ _sem_timedwait_internal (sem, 0);
+}
+
+/* returns 0 on success, != 0 when it would block */
+int ddekit_sem_down_try(ddekit_sem_t *sem) {
+ pthread_spin_lock (&sem->lock);
+ if (sem->value > 0) {
+ /* Successful down. */
+ sem->value --;
+ pthread_spin_unlock (&sem->lock);
+ return 0;
+ }
+ pthread_spin_unlock (&sem->lock);
+
+ return -1;
+}
+
+/* returns 0 on success, != 0 on timeout */
+int ddekit_sem_down_timed(ddekit_sem_t *sem, int timo) {
+ /* wait for up to timo milliseconds */
+ return _sem_timedwait_internal (sem, timo);
+}
+
+void ddekit_sem_up(ddekit_sem_t *sem) {
+ struct _ddekit_private_data *wakeup;
+
+ pthread_spin_lock (&sem->lock);
+ if (sem->value > 0) {
+ /* Do a quick up. */
+ assert (EMPTY_LIST (&sem->head));
+ sem->value ++;
+ pthread_spin_unlock (&sem->lock);
+ return;
+ }
+
+ if (EMPTY_LIST (&sem->head)) {
+ /* No one waiting. */
+ sem->value = 1;
+ pthread_spin_unlock (&sem->lock);
+ return;
+ }
+
+ /* Wake someone up. */
+
+ /* First dequeue someone. */
+ wakeup = LIST_ENTRY (remove_entry_end (&sem->head),
+ struct _ddekit_private_data, list);
+
+ /* Then drop the lock and transfer control. */
+ pthread_spin_unlock (&sem->lock);
+ if (wakeup)
+ _thread_wakeup (wakeup);
+}
+