From 74ed057a5fd8266d1d838f1327d0574ce613ebcd Mon Sep 17 00:00:00 2001 From: Bill Currie Date: Fri, 20 Mar 2020 08:48:40 +0900 Subject: [PATCH] [qwaq] Begin move over to threading Of course, it currently locks up, but there aren't any other threads as of yet so the queues starve. --- ruamoko/qwaq/qwaq-curses.c | 198 +++++++++++++++++++++++++++---------- 1 file changed, 145 insertions(+), 53 deletions(-) diff --git a/ruamoko/qwaq/qwaq-curses.c b/ruamoko/qwaq/qwaq-curses.c index 3837d2f8c..8f793bdac 100644 --- a/ruamoko/qwaq/qwaq-curses.c +++ b/ruamoko/qwaq/qwaq-curses.c @@ -31,8 +31,12 @@ # include "config.h" #endif +#include + #include +#include #include +#include #include #include #include @@ -85,7 +89,7 @@ typedef enum qwaq_commands_e { qwaq_cmd_mvwblit_line, } qwaq_commands; -const char *qwaq_command_names[]= { +static const char *qwaq_command_names[]= { "newwin", "delwin", "getwrect", @@ -123,15 +127,24 @@ typedef struct panel_s { int window_id; } panel_t; +typedef struct cond_s { + pthread_cond_t cond; + pthread_mutex_t mut; +} cond_t; + typedef struct qwaq_resources_s { progs_t *pr; int initialized; window_t stdscr; PR_RESMAP (window_t) window_map; PR_RESMAP (panel_t) panel_map; + cond_t event_cond; RING_BUFFER (qwaq_event_t, QUEUE_SIZE) event_queue; + cond_t command_cond; RING_BUFFER (int, COMMAND_QUEUE_SIZE) command_queue; + cond_t results_cond; RING_BUFFER (int, COMMAND_QUEUE_SIZE) results; + cond_t string_id_cond; RING_BUFFER (int, STRING_ID_QUEUE_SIZE) string_ids; dstring_t strings[STRING_ID_QUEUE_SIZE - 1]; } qwaq_resources_t; @@ -222,34 +235,33 @@ get_panel (qwaq_resources_t *res, const char *name, int handle) return panel; } -//XXX goes away with threads -static void process_commands (qwaq_resources_t *); -static void process_input (qwaq_resources_t *); static int acquire_string (qwaq_resources_t *res) { int string_id = -1; - // XXX add locking and loop for available - if (!RB_DATA_AVAILABLE (res->string_ids)) { - process_commands(res); + pthread_mutex_lock (&res->string_id_cond.mut); + while (!RB_DATA_AVAILABLE (res->string_ids)) { + pthread_cond_wait (&res->string_id_cond.cond, + &res->string_id_cond.mut); } - if (RB_DATA_AVAILABLE (res->string_ids)) { - RB_READ_DATA (res->string_ids, &string_id, 1); - } - // XXX unlock and end of loop + RB_READ_DATA (res->string_ids, &string_id, 1); + pthread_cond_broadcast (&res->string_id_cond.cond); + pthread_mutex_unlock (&res->string_id_cond.mut); return string_id; } static void release_string (qwaq_resources_t *res, int string_id) { - // locking shouldn't be necessary as there should be only one writer - // but if it becomes such that there is more than one writer, locks as per - // acquire - if (RB_SPACE_AVAILABLE (res->string_ids)) { - RB_WRITE_DATA (res->string_ids, &string_id, 1); + pthread_mutex_lock (&res->string_id_cond.mut); + while (RB_SPACE_AVAILABLE (res->string_ids)) { + pthread_cond_wait (&res->string_id_cond.cond, + &res->string_id_cond.mut); } + RB_WRITE_DATA (res->string_ids, &string_id, 1); + pthread_cond_broadcast (&res->string_id_cond.cond); + pthread_mutex_unlock (&res->string_id_cond.mut); } static void @@ -257,35 +269,41 @@ qwaq_submit_command (qwaq_resources_t *res, const int *cmd) { unsigned len = cmd[1]; - if (RB_SPACE_AVAILABLE (res->command_queue) >= len) { - RB_WRITE_DATA (res->command_queue, cmd, len); - } else { - PR_RunError (res->pr, "command buffer full"); + pthread_mutex_lock (&res->command_cond.mut); + while (RB_SPACE_AVAILABLE (res->command_queue) < len) { + pthread_cond_wait (&res->command_cond.cond, + &res->command_cond.mut); } + RB_WRITE_DATA (res->command_queue, cmd, len); + pthread_cond_broadcast (&res->command_cond.cond); + pthread_mutex_unlock (&res->command_cond.mut); } static void qwaq_submit_result (qwaq_resources_t *res, const int *result, unsigned len) { - // loop - if (RB_SPACE_AVAILABLE (res->results) >= len) { - RB_WRITE_DATA (res->results, result, len); - } else { - PR_RunError (res->pr, "result buffer full"); + pthread_mutex_lock (&res->results_cond.mut); + while (RB_SPACE_AVAILABLE (res->results) < len) { + pthread_cond_wait (&res->results_cond.cond, + &res->results_cond.mut); } + RB_WRITE_DATA (res->results, result, len); + pthread_cond_broadcast (&res->results_cond.cond); + pthread_mutex_unlock (&res->results_cond.mut); } static void qwaq_wait_result (qwaq_resources_t *res, int *result, int cmd, unsigned len) { - // XXX should just wait on the mutex - process_commands (res); - process_input (res); - // locking and loop until id is correct - if (RB_DATA_AVAILABLE (res->results) >= len - && RB_PEEK_DATA (res->results, 0) == cmd) { - RB_READ_DATA (res->results, result, len); + pthread_mutex_lock (&res->results_cond.mut); + while (RB_DATA_AVAILABLE (res->results) < len + && RB_PEEK_DATA (res->results, 0) != cmd) { + pthread_cond_wait (&res->results_cond.cond, + &res->results_cond.mut); } + RB_READ_DATA (res->results, result, len); + pthread_cond_broadcast (&res->results_cond.cond); + pthread_mutex_unlock (&res->results_cond.mut); } static void @@ -580,17 +598,53 @@ cmd_mvwblit_line (qwaq_resources_t *res) release_string (res, chs_id); } +static void +dump_command (qwaq_resources_t *res, int len) +{ + if (0) { + qwaq_commands cmd = RB_PEEK_DATA (res->command_queue, 0); + Sys_Printf ("%s[%d]", qwaq_command_names[cmd], len); + for (int i = 2; i < len; i++) { + Sys_Printf (" %d", RB_PEEK_DATA (res->command_queue, i)); + } + Sys_Printf ("\n"); + } +} + +static void +init_timeout (struct timespec *timeout, long time) +{ + #define SEC 1000000000 + struct timeval now; + gettimeofday(&now, 0); + timeout->tv_sec = now.tv_sec; + timeout->tv_nsec = now.tv_usec * 1000 + time; + if (timeout->tv_nsec > SEC) { + timeout->tv_sec += timeout->tv_nsec / SEC; + timeout->tv_nsec %= SEC; + } +} + static void process_commands (qwaq_resources_t *res) { - while (RB_DATA_AVAILABLE (res->command_queue) >= 2) { + struct timespec timeout; + int avail; + int len; + int ret = 0; + + pthread_mutex_lock (&res->command_cond.mut); + init_timeout (&timeout, 20 * 1000000); + while (RB_DATA_AVAILABLE (res->command_queue) < 2 && ret != ETIMEDOUT) { + ret = pthread_cond_timedwait (&res->command_cond.cond, + &res->command_cond.mut, &timeout); + } + pthread_mutex_unlock (&res->command_cond.mut); + + while ((avail = RB_DATA_AVAILABLE (res->command_queue)) >= 2 + && avail >= (len = RB_PEEK_DATA (res->command_queue, 1))) { + dump_command (res, len); qwaq_commands cmd = RB_PEEK_DATA (res->command_queue, 0); - //int len = RB_PEEK_DATA (res->command_queue, 1); - //Sys_Printf ("%s[%d]", qwaq_command_names[cmd], len); - //for (int i = 2; i < len; i++) { - // Sys_Printf (" %d", RB_PEEK_DATA (res->command_queue, i)); - //} - //Sys_Printf ("\n"); switch (cmd) { case qwaq_cmd_newwin: cmd_newwin (res); @@ -671,39 +725,66 @@ process_commands (qwaq_resources_t *res) cmd_mvwblit_line (res); break; } - RB_DROP_DATA (res->command_queue, RB_PEEK_DATA (res->command_queue, 1)); + pthread_mutex_lock (&res->command_cond.mut); + RB_DROP_DATA (res->command_queue, len); + pthread_cond_broadcast (&res->command_cond.cond); + pthread_mutex_unlock (&res->command_cond.mut); } } static void add_event (qwaq_resources_t *res, qwaq_event_t *event) { - // lock - // { + struct timespec timeout; + int merged = 0; + int ret = 0; + // merge motion events + pthread_mutex_lock (&res->event_cond.mut); unsigned last = RB_DATA_AVAILABLE (res->event_queue); if (event->what == qe_mousemove && last > 1 && RB_PEEK_DATA(res->event_queue, last - 1).what == qe_mousemove) { RB_POKE_DATA(res->event_queue, last - 1, *event); - return; // unlock + merged = 1; + pthread_cond_broadcast (&res->event_cond.cond); } - // } - if (RB_SPACE_AVAILABLE (res->event_queue) >= 1) { - RB_WRITE_DATA (res->event_queue, event, 1); + pthread_mutex_unlock (&res->event_cond.mut); + if (merged) { + return; } + + pthread_mutex_lock (&res->event_cond.mut); + init_timeout (&timeout, 5000 * 1000000L); + while (RB_SPACE_AVAILABLE (res->event_queue) < 1 && ret != ETIMEDOUT) { + ret = pthread_cond_timedwait (&res->event_cond.cond, + &res->event_cond.mut, &timeout); + } + RB_WRITE_DATA (res->event_queue, event, 1); + pthread_cond_broadcast (&res->event_cond.cond); } static int get_event (qwaq_resources_t *res, qwaq_event_t *event) { - if (RB_DATA_AVAILABLE (res->event_queue) >= 1) { - if (event) { - RB_READ_DATA (res->event_queue, event, 1); - } - return 1; + struct timespec timeout; + int ret = 0; + + pthread_mutex_lock (&res->event_cond.mut); + init_timeout (&timeout, 20 * 1000000); + while (RB_DATA_AVAILABLE (res->event_queue) < 1 && ret != ETIMEDOUT) { + ret = pthread_cond_timedwait (&res->event_cond.cond, + &res->event_cond.mut, &timeout); } - event->what = qe_none; - return 0; + if (event) { + if (ret != ETIMEDOUT) { + RB_READ_DATA (res->event_queue, event, 1); + } else { + event->what = qe_none; + } + } + pthread_cond_broadcast (&res->event_cond.cond); + pthread_mutex_lock (&res->event_cond.mut); + return ret != ETIMEDOUT; } #define M_MOVE REPORT_MOUSE_POSITION @@ -1793,6 +1874,13 @@ qwaq_print (const char *fmt, va_list args) fflush (logfile); } +static void +qwaq_init_cond (cond_t *cond) +{ + pthread_cond_init (&cond->cond, 0); + pthread_mutex_init (&cond->mut, 0); +} + void BI_Init (progs_t *pr) { @@ -1802,6 +1890,10 @@ BI_Init (progs_t *pr) RB_WRITE_DATA (res->string_ids, &i, 1); res->strings[i].mem = &dstring_default_mem; } + qwaq_init_cond (&res->event_cond); + qwaq_init_cond (&res->command_cond); + qwaq_init_cond (&res->results_cond); + qwaq_init_cond (&res->string_id_cond); PR_Resources_Register (pr, "qwaq", res, bi_qwaq_clear); PR_RegisterBuiltins (pr, builtins);