[qwaq] Begin move over to threading

Of course, it currently locks up, but there aren't any other threads as
of yet so the queues starve.
This commit is contained in:
Bill Currie 2020-03-20 08:48:40 +09:00
parent d2f03e8a64
commit 74ed057a5f

View file

@ -31,8 +31,12 @@
# include "config.h" # include "config.h"
#endif #endif
#include <sys/time.h>
#include <curses.h> #include <curses.h>
#include <errno.h>
#include <panel.h> #include <panel.h>
#include <pthread.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
@ -85,7 +89,7 @@ typedef enum qwaq_commands_e {
qwaq_cmd_mvwblit_line, qwaq_cmd_mvwblit_line,
} qwaq_commands; } qwaq_commands;
const char *qwaq_command_names[]= { static const char *qwaq_command_names[]= {
"newwin", "newwin",
"delwin", "delwin",
"getwrect", "getwrect",
@ -123,15 +127,24 @@ typedef struct panel_s {
int window_id; int window_id;
} panel_t; } panel_t;
typedef struct cond_s {
pthread_cond_t cond;
pthread_mutex_t mut;
} cond_t;
typedef struct qwaq_resources_s { typedef struct qwaq_resources_s {
progs_t *pr; progs_t *pr;
int initialized; int initialized;
window_t stdscr; window_t stdscr;
PR_RESMAP (window_t) window_map; PR_RESMAP (window_t) window_map;
PR_RESMAP (panel_t) panel_map; PR_RESMAP (panel_t) panel_map;
cond_t event_cond;
RING_BUFFER (qwaq_event_t, QUEUE_SIZE) event_queue; RING_BUFFER (qwaq_event_t, QUEUE_SIZE) event_queue;
cond_t command_cond;
RING_BUFFER (int, COMMAND_QUEUE_SIZE) command_queue; RING_BUFFER (int, COMMAND_QUEUE_SIZE) command_queue;
cond_t results_cond;
RING_BUFFER (int, COMMAND_QUEUE_SIZE) results; RING_BUFFER (int, COMMAND_QUEUE_SIZE) results;
cond_t string_id_cond;
RING_BUFFER (int, STRING_ID_QUEUE_SIZE) string_ids; RING_BUFFER (int, STRING_ID_QUEUE_SIZE) string_ids;
dstring_t strings[STRING_ID_QUEUE_SIZE - 1]; dstring_t strings[STRING_ID_QUEUE_SIZE - 1];
} qwaq_resources_t; } qwaq_resources_t;
@ -222,34 +235,33 @@ get_panel (qwaq_resources_t *res, const char *name, int handle)
return panel; return panel;
} }
//XXX goes away with threads
static void process_commands (qwaq_resources_t *);
static void process_input (qwaq_resources_t *);
static int static int
acquire_string (qwaq_resources_t *res) acquire_string (qwaq_resources_t *res)
{ {
int string_id = -1; int string_id = -1;
// XXX add locking and loop for available pthread_mutex_lock (&res->string_id_cond.mut);
if (!RB_DATA_AVAILABLE (res->string_ids)) { while (!RB_DATA_AVAILABLE (res->string_ids)) {
process_commands(res); pthread_cond_wait (&res->string_id_cond.cond,
&res->string_id_cond.mut);
} }
if (RB_DATA_AVAILABLE (res->string_ids)) { RB_READ_DATA (res->string_ids, &string_id, 1);
RB_READ_DATA (res->string_ids, &string_id, 1); pthread_cond_broadcast (&res->string_id_cond.cond);
} pthread_mutex_unlock (&res->string_id_cond.mut);
// XXX unlock and end of loop
return string_id; return string_id;
} }
static void static void
release_string (qwaq_resources_t *res, int string_id) release_string (qwaq_resources_t *res, int string_id)
{ {
// locking shouldn't be necessary as there should be only one writer pthread_mutex_lock (&res->string_id_cond.mut);
// but if it becomes such that there is more than one writer, locks as per while (RB_SPACE_AVAILABLE (res->string_ids)) {
// acquire pthread_cond_wait (&res->string_id_cond.cond,
if (RB_SPACE_AVAILABLE (res->string_ids)) { &res->string_id_cond.mut);
RB_WRITE_DATA (res->string_ids, &string_id, 1);
} }
RB_WRITE_DATA (res->string_ids, &string_id, 1);
pthread_cond_broadcast (&res->string_id_cond.cond);
pthread_mutex_unlock (&res->string_id_cond.mut);
} }
static void static void
@ -257,35 +269,41 @@ qwaq_submit_command (qwaq_resources_t *res, const int *cmd)
{ {
unsigned len = cmd[1]; unsigned len = cmd[1];
if (RB_SPACE_AVAILABLE (res->command_queue) >= len) { pthread_mutex_lock (&res->command_cond.mut);
RB_WRITE_DATA (res->command_queue, cmd, len); while (RB_SPACE_AVAILABLE (res->command_queue) < len) {
} else { pthread_cond_wait (&res->command_cond.cond,
PR_RunError (res->pr, "command buffer full"); &res->command_cond.mut);
} }
RB_WRITE_DATA (res->command_queue, cmd, len);
pthread_cond_broadcast (&res->command_cond.cond);
pthread_mutex_unlock (&res->command_cond.mut);
} }
static void static void
qwaq_submit_result (qwaq_resources_t *res, const int *result, unsigned len) qwaq_submit_result (qwaq_resources_t *res, const int *result, unsigned len)
{ {
// loop pthread_mutex_lock (&res->results_cond.mut);
if (RB_SPACE_AVAILABLE (res->results) >= len) { while (RB_SPACE_AVAILABLE (res->results) < len) {
RB_WRITE_DATA (res->results, result, len); pthread_cond_wait (&res->results_cond.cond,
} else { &res->results_cond.mut);
PR_RunError (res->pr, "result buffer full");
} }
RB_WRITE_DATA (res->results, result, len);
pthread_cond_broadcast (&res->results_cond.cond);
pthread_mutex_unlock (&res->results_cond.mut);
} }
static void static void
qwaq_wait_result (qwaq_resources_t *res, int *result, int cmd, unsigned len) qwaq_wait_result (qwaq_resources_t *res, int *result, int cmd, unsigned len)
{ {
// XXX should just wait on the mutex pthread_mutex_lock (&res->results_cond.mut);
process_commands (res); while (RB_DATA_AVAILABLE (res->results) < len
process_input (res); && RB_PEEK_DATA (res->results, 0) != cmd) {
// locking and loop until id is correct pthread_cond_wait (&res->results_cond.cond,
if (RB_DATA_AVAILABLE (res->results) >= len &res->results_cond.mut);
&& RB_PEEK_DATA (res->results, 0) == cmd) {
RB_READ_DATA (res->results, result, len);
} }
RB_READ_DATA (res->results, result, len);
pthread_cond_broadcast (&res->results_cond.cond);
pthread_mutex_unlock (&res->results_cond.mut);
} }
static void static void
@ -580,17 +598,53 @@ cmd_mvwblit_line (qwaq_resources_t *res)
release_string (res, chs_id); release_string (res, chs_id);
} }
static void
dump_command (qwaq_resources_t *res, int len)
{
if (0) {
qwaq_commands cmd = RB_PEEK_DATA (res->command_queue, 0);
Sys_Printf ("%s[%d]", qwaq_command_names[cmd], len);
for (int i = 2; i < len; i++) {
Sys_Printf (" %d", RB_PEEK_DATA (res->command_queue, i));
}
Sys_Printf ("\n");
}
}
static void
init_timeout (struct timespec *timeout, long time)
{
#define SEC 1000000000
struct timeval now;
gettimeofday(&now, 0);
timeout->tv_sec = now.tv_sec;
timeout->tv_nsec = now.tv_usec * 1000 + time;
if (timeout->tv_nsec > SEC) {
timeout->tv_sec += timeout->tv_nsec / SEC;
timeout->tv_nsec %= SEC;
}
}
static void static void
process_commands (qwaq_resources_t *res) process_commands (qwaq_resources_t *res)
{ {
while (RB_DATA_AVAILABLE (res->command_queue) >= 2) { struct timespec timeout;
int avail;
int len;
int ret = 0;
pthread_mutex_lock (&res->command_cond.mut);
init_timeout (&timeout, 20 * 1000000);
while (RB_DATA_AVAILABLE (res->command_queue) < 2 && ret != ETIMEDOUT) {
ret = pthread_cond_timedwait (&res->command_cond.cond,
&res->command_cond.mut, &timeout);
}
pthread_mutex_unlock (&res->command_cond.mut);
while ((avail = RB_DATA_AVAILABLE (res->command_queue)) >= 2
&& avail >= (len = RB_PEEK_DATA (res->command_queue, 1))) {
dump_command (res, len);
qwaq_commands cmd = RB_PEEK_DATA (res->command_queue, 0); qwaq_commands cmd = RB_PEEK_DATA (res->command_queue, 0);
//int len = RB_PEEK_DATA (res->command_queue, 1);
//Sys_Printf ("%s[%d]", qwaq_command_names[cmd], len);
//for (int i = 2; i < len; i++) {
// Sys_Printf (" %d", RB_PEEK_DATA (res->command_queue, i));
//}
//Sys_Printf ("\n");
switch (cmd) { switch (cmd) {
case qwaq_cmd_newwin: case qwaq_cmd_newwin:
cmd_newwin (res); cmd_newwin (res);
@ -671,39 +725,66 @@ process_commands (qwaq_resources_t *res)
cmd_mvwblit_line (res); cmd_mvwblit_line (res);
break; break;
} }
RB_DROP_DATA (res->command_queue, RB_PEEK_DATA (res->command_queue, 1)); pthread_mutex_lock (&res->command_cond.mut);
RB_DROP_DATA (res->command_queue, len);
pthread_cond_broadcast (&res->command_cond.cond);
pthread_mutex_unlock (&res->command_cond.mut);
} }
} }
static void static void
add_event (qwaq_resources_t *res, qwaq_event_t *event) add_event (qwaq_resources_t *res, qwaq_event_t *event)
{ {
// lock struct timespec timeout;
// { int merged = 0;
int ret = 0;
// merge motion events // merge motion events
pthread_mutex_lock (&res->event_cond.mut);
unsigned last = RB_DATA_AVAILABLE (res->event_queue); unsigned last = RB_DATA_AVAILABLE (res->event_queue);
if (event->what == qe_mousemove && last > 1 if (event->what == qe_mousemove && last > 1
&& RB_PEEK_DATA(res->event_queue, last - 1).what == qe_mousemove) { && RB_PEEK_DATA(res->event_queue, last - 1).what == qe_mousemove) {
RB_POKE_DATA(res->event_queue, last - 1, *event); RB_POKE_DATA(res->event_queue, last - 1, *event);
return; // unlock merged = 1;
pthread_cond_broadcast (&res->event_cond.cond);
} }
// } pthread_mutex_unlock (&res->event_cond.mut);
if (RB_SPACE_AVAILABLE (res->event_queue) >= 1) { if (merged) {
RB_WRITE_DATA (res->event_queue, event, 1); return;
} }
pthread_mutex_lock (&res->event_cond.mut);
init_timeout (&timeout, 5000 * 1000000L);
while (RB_SPACE_AVAILABLE (res->event_queue) < 1 && ret != ETIMEDOUT) {
ret = pthread_cond_timedwait (&res->event_cond.cond,
&res->event_cond.mut, &timeout);
}
RB_WRITE_DATA (res->event_queue, event, 1);
pthread_cond_broadcast (&res->event_cond.cond);
} }
static int static int
get_event (qwaq_resources_t *res, qwaq_event_t *event) get_event (qwaq_resources_t *res, qwaq_event_t *event)
{ {
if (RB_DATA_AVAILABLE (res->event_queue) >= 1) { struct timespec timeout;
if (event) { int ret = 0;
RB_READ_DATA (res->event_queue, event, 1);
} pthread_mutex_lock (&res->event_cond.mut);
return 1; init_timeout (&timeout, 20 * 1000000);
while (RB_DATA_AVAILABLE (res->event_queue) < 1 && ret != ETIMEDOUT) {
ret = pthread_cond_timedwait (&res->event_cond.cond,
&res->event_cond.mut, &timeout);
} }
event->what = qe_none; if (event) {
return 0; if (ret != ETIMEDOUT) {
RB_READ_DATA (res->event_queue, event, 1);
} else {
event->what = qe_none;
}
}
pthread_cond_broadcast (&res->event_cond.cond);
pthread_mutex_lock (&res->event_cond.mut);
return ret != ETIMEDOUT;
} }
#define M_MOVE REPORT_MOUSE_POSITION #define M_MOVE REPORT_MOUSE_POSITION
@ -1793,6 +1874,13 @@ qwaq_print (const char *fmt, va_list args)
fflush (logfile); fflush (logfile);
} }
static void
qwaq_init_cond (cond_t *cond)
{
pthread_cond_init (&cond->cond, 0);
pthread_mutex_init (&cond->mut, 0);
}
void void
BI_Init (progs_t *pr) BI_Init (progs_t *pr)
{ {
@ -1802,6 +1890,10 @@ BI_Init (progs_t *pr)
RB_WRITE_DATA (res->string_ids, &i, 1); RB_WRITE_DATA (res->string_ids, &i, 1);
res->strings[i].mem = &dstring_default_mem; res->strings[i].mem = &dstring_default_mem;
} }
qwaq_init_cond (&res->event_cond);
qwaq_init_cond (&res->command_cond);
qwaq_init_cond (&res->results_cond);
qwaq_init_cond (&res->string_id_cond);
PR_Resources_Register (pr, "qwaq", res, bi_qwaq_clear); PR_Resources_Register (pr, "qwaq", res, bi_qwaq_clear);
PR_RegisterBuiltins (pr, builtins); PR_RegisterBuiltins (pr, builtins);