From 2a47a61bc340185fde4873eac45ae258df707543 Mon Sep 17 00:00:00 2001 From: Bill Currie Date: Wed, 8 Jun 2022 01:57:06 +0900 Subject: [PATCH] [util] Add atomic ring buffer and automated tests Just head and tail are atomic, but it seems to work nicely (at least on intel). I actually had more trouble with gcc (due to accidentally testing lock-free with the wrong ring buffer... oops, but yup, gcc will happily optimize your loop to spin really really fast). Also served as a nice test for C11 threading. --- include/QF/ringbuffer.h | 34 +++- libs/util/test/Makemodule.am | 6 + libs/util/test/test-ringbuffer.c | 313 +++++++++++++++++++++++++++++++ 3 files changed, 349 insertions(+), 4 deletions(-) create mode 100644 libs/util/test/test-ringbuffer.c diff --git a/include/QF/ringbuffer.h b/include/QF/ringbuffer.h index 60eb5d440..d6ffa4a9b 100644 --- a/include/QF/ringbuffer.h +++ b/include/QF/ringbuffer.h @@ -37,6 +37,8 @@ * \param size The number of objects in the ring buffer. Note that the * actual capacity of the buffer is `size - 1` due to the * way ring buffers work. + * + * \note On its own, this is not thread-safe. Suitable locking is required. */ #define RING_BUFFER(type, size) \ struct { \ @@ -45,6 +47,26 @@ unsigned tail; \ } +/** Type declaration for a type-safe ring buffer with atomic head and tail. + * + * \param type The type of data element stored in the ring buffer. + * \param size The number of objects in the ring buffer. Note that the + * actual capacity of the buffer is `size - 1` due to the + * way ring buffers work. + * + * \note This is suitable only for single-reader/single-writer unless + * additional locking is provided for multi-user side. This means `head` must + * have an associated lock for multiple writers, and `tail` must have an + * associated lock for multiple readers. For multi-reader + multi-writer, it + * may be better to use RING_BUFFER with suitable locking. + */ +#define RING_BUFFER_ATOMIC(type, size) \ + struct { \ + type buffer[size]; \ + _Atomic unsigned head; \ + _Atomic unsigned tail; \ + } + #define RB_buffer_size(ring_buffer) \ ({ __auto_type rb_s = (ring_buffer); \ sizeof (rb_s->buffer) / sizeof (rb_s->buffer[0]); \ @@ -95,7 +117,7 @@ const typeof (rb->buffer[0]) *d = (data); \ unsigned c = (count); \ unsigned h = rb->head; \ - rb->head = (h + c) % RB_buffer_size (rb); \ + unsigned new_head = (h + c) % RB_buffer_size (rb); \ if (c > RB_buffer_size (rb) - h) { \ memcpy (rb->buffer + h, d, \ (RB_buffer_size (rb) - h) * sizeof (rb->buffer[0])); \ @@ -104,6 +126,7 @@ h = 0; \ } \ memcpy (rb->buffer + h, d, c * sizeof (rb->buffer[0])); \ + rb->head = new_head; \ }) /** Acquire \a count objects from the buffer, wrapping if necessary. @@ -122,8 +145,10 @@ ({ __auto_type rb = &(ring_buffer); \ unsigned c = (count); \ unsigned h = rb->head; \ - rb->head = (h + c) % RB_buffer_size (rb); \ - &rb->buffer[h]; \ + unsigned new_head = (h + c) % RB_buffer_size (rb); \ + __auto_type head_addr = &rb->buffer[h]; \ + rb->head = new_head; \ + head_addr; \ }) /** Read \a count objects from the buffer to \a data, wrapping if necessary. @@ -145,6 +170,7 @@ unsigned c = (count); \ unsigned oc = c; \ unsigned t = rb->tail; \ + unsigned new_tail = (t + oc) % RB_buffer_size (rb); \ if (c > RB_buffer_size (rb) - t) { \ memcpy (d, rb->buffer + t, \ (RB_buffer_size (rb) - t) * sizeof (rb->buffer[0])); \ @@ -153,7 +179,7 @@ t = 0; \ } \ memcpy (d, rb->buffer + t, c * sizeof (rb->buffer[0])); \ - rb->tail = (t + oc) % RB_buffer_size (rb); \ + rb->tail = new_tail; \ }) /** Discard \a count objects from the ring buffer. diff --git a/libs/util/test/Makemodule.am b/libs/util/test/Makemodule.am index 1272b7242..64cc8b9e4 100644 --- a/libs/util/test/Makemodule.am +++ b/libs/util/test/Makemodule.am @@ -18,6 +18,7 @@ libs_util_tests = \ libs/util/test/test-pqueue \ libs/util/test/test-qfs \ libs/util/test/test-quat \ + libs/util/test/test-ringbuffer \ libs/util/test/test-seb \ libs/util/test/test-sebvf \ libs/util/test/test-seg \ @@ -106,6 +107,11 @@ libs_util_test_test_quat_SOURCES=libs/util/test/test-quat.c libs_util_test_test_quat_LDADD=libs/util/libQFutil.la libs_util_test_test_quat_DEPENDENCIES=libs/util/libQFutil.la +libs_util_test_test_ringbuffer_SOURCES=libs/util/test/test-ringbuffer.c +libs_util_test_test_ringbuffer_LDADD=libs/util/libQFutil.la +libs_util_test_test_ringbuffer_LDFLAGS=-pthread +libs_util_test_test_ringbuffer_DEPENDENCIES=libs/util/libQFutil.la + libs_util_test_test_seb_SOURCES=libs/util/test/test-seb.c libs_util_test_test_seb_LDADD=libs/util/libQFutil.la libs_util_test_test_seb_DEPENDENCIES=libs/util/libQFutil.la diff --git a/libs/util/test/test-ringbuffer.c b/libs/util/test/test-ringbuffer.c new file mode 100644 index 000000000..ece04c992 --- /dev/null +++ b/libs/util/test/test-ringbuffer.c @@ -0,0 +1,313 @@ +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include +#include +#include +#include + +#include "QF/dstring.h" +#include "QF/mathlib.h" +#include "QF/ringbuffer.h" +#include "QF/sys.h" + +typedef int (*test_func) (int a, int b); + +typedef RING_BUFFER (int, 8) ringbuffer_t; +typedef RING_BUFFER (char, 8) ringbuffer_char_t; +typedef RING_BUFFER_ATOMIC (char, 8) ringbuffer_atomic_t; + +static ringbuffer_t int_ringbuffer = { .buffer = { [0 ... 7] = 0xdeadbeef } }; +static ringbuffer_char_t ringbuffer_char; +static ringbuffer_atomic_t ringbuffer_atomic; +mtx_t rb_mtx; +cnd_t rb_write_cnd; +cnd_t rb_read_cnd; + +ringbuffer_atomic_t int_ringbuffer_atomic; + +static int +get_size (int a, int b) +{ + return RB_buffer_size (&int_ringbuffer); +} + +static int +space_available (int a, int b) +{ + return RB_SPACE_AVAILABLE (int_ringbuffer); +} + +static int +data_available (int a, int b) +{ + return RB_DATA_AVAILABLE (int_ringbuffer); +} + +static int +write_data (int value, int b) +{ + return RB_WRITE_DATA (int_ringbuffer, &value, 1); +} + +static int +acquire (int count, int b) +{ + return *RB_ACQUIRE (int_ringbuffer, count); +} + +static int +read_data (int a, int b) +{ + int ret; + RB_READ_DATA (int_ringbuffer, &ret, 1); + return ret; +} + +static int +release (int count, int b) +{ + return RB_RELEASE (int_ringbuffer, count); +} + +static int +peek_data (int offset, int b) +{ + return *RB_PEEK_DATA (int_ringbuffer, offset); +} + +static int +poke_data (int offset, int value) +{ + return RB_POKE_DATA (int_ringbuffer, offset, value); +} + +static int +init_threading (int a, int b) +{ + if (mtx_init (&rb_mtx, mtx_plain) != thrd_success) { + return -1; + } + if (cnd_init (&rb_write_cnd) != thrd_success) { + return -1; + } + if (cnd_init (&rb_read_cnd) != thrd_success) { + return -1; + } + return 0; +} + +static const char *text = "Sometimes, it is necessary to write long gibberish " +"in order to test things. Hopefully, this may be an exception, though it's " +"not exactly the most useful or intersting text around."; + +static dstring_t *out_text; + +static int +compare_strings (int a, int b) +{ + int ret = strcmp (out_text->str, text); + dstring_delete (out_text); + out_text = 0; + return ret; +} + +static int +locked_reader (void *data) +{ + if (!out_text) { + out_text = dstring_new (); + } + dstring_clear (out_text); + + do { + mtx_lock (&rb_mtx); + while (RB_DATA_AVAILABLE (ringbuffer_char) < 1) { + cnd_wait (&rb_read_cnd, &rb_mtx); + } + unsigned len = RB_DATA_AVAILABLE (ringbuffer_char); + RB_READ_DATA (ringbuffer_char, dstring_reserve (out_text, len), len); + cnd_signal (&rb_write_cnd); + mtx_unlock (&rb_mtx); + } while (out_text->str[out_text->size - 1]); + + return 0; +} + +static int +locked_writer (void *data) +{ + size_t data_len = strlen (text) + 1; // send nul terminator too + const char *str = text; + + while (data_len > 0) { + mtx_lock (&rb_mtx); + while (RB_SPACE_AVAILABLE (ringbuffer_char) < 1) { + cnd_wait (&rb_write_cnd, &rb_mtx); + } + unsigned len = RB_SPACE_AVAILABLE (ringbuffer_char); + len = min (len, data_len); + RB_WRITE_DATA (ringbuffer_char, str, len); + str += len; + data_len -= len; + cnd_signal (&rb_read_cnd); + mtx_unlock (&rb_mtx); + } + + return 0; +} + +static int +run_locked (int a, int b) +{ + thrd_t writer; + thrd_t reader; + + if (thrd_create (&writer, locked_writer, 0) != thrd_success) { + return -1; + } + if (thrd_create (&reader, locked_reader, 0) != thrd_success) { + return -1; + } + + int res; + if (thrd_join (writer, &res) != thrd_success) { + return -1; + } + if (thrd_join (reader, &res) != thrd_success) { + return -1; + } + return 0; +} + +static int +free_reader (void *data) +{ + if (!out_text) { + out_text = dstring_new (); + } + dstring_clear (out_text); + + do { + while (RB_DATA_AVAILABLE (ringbuffer_atomic) < 1) { + } + unsigned len = RB_DATA_AVAILABLE (ringbuffer_atomic); + RB_READ_DATA (ringbuffer_atomic, dstring_reserve (out_text, len), len); + } while (out_text->str[out_text->size - 1]); + + return 0; +} + +static int +free_writer (void *data) +{ + size_t data_len = strlen (text) + 1; // send nul terminator too + const char *str = text; + + while (data_len > 0) { + while (RB_SPACE_AVAILABLE (ringbuffer_atomic) < 1) { + } + unsigned len = RB_SPACE_AVAILABLE (ringbuffer_atomic); + len = min (len, data_len); + RB_WRITE_DATA (ringbuffer_atomic, str, len); + str += len; + data_len -= len; + } + + return 0; +} + +static int +run_free (int a, int b) +{ + thrd_t writer; + thrd_t reader; + + if (thrd_create (&writer, free_writer, 0) != thrd_success) { + return -1; + } + if (thrd_create (&reader, free_reader, 0) != thrd_success) { + return -1; + } + + int res; + if (thrd_join (writer, &res) != thrd_success) { + return -1; + } + if (thrd_join (reader, &res) != thrd_success) { + return -1; + } + return 0; +} + +#define _ 0 // unused parameter + +struct { + test_func test; + int param1, param2; + int test_expect; +} tests[] = { + { get_size, _, _, 8 }, + { space_available, _, _, 7 }, + { data_available, _, _, 0 }, + { write_data, 0x600dc0de, _, 1 }, + { space_available, _, _, 6 }, + { data_available, _, _, 1 }, + { acquire, 0, _, 0xdeadbeef }, // iffy test: 0 count + { space_available, _, _, 6 }, + { data_available, _, _, 1 }, + { read_data, 1, _, 0x600dc0de }, + { space_available, _, _, 7 }, + { data_available, _, _, 0 }, + { release, 0, _, 1 }, + { space_available, _, _, 7 }, + { data_available, _, _, 0 }, + { poke_data, 0, 1, 1 }, + { poke_data, 1, 2, 2 }, + { poke_data, 2, 3, 3 }, + { poke_data, 3, 4, 4 }, + { poke_data, 4, 5, 5 }, + { poke_data, 5, 6, 6 }, + { acquire, 6, _, 1 }, + { space_available, _, _, 1 }, + { data_available, _, _, 6 }, + { write_data, 7, _, 0 }, // head wrapped to 0 + { space_available, _, _, 0 }, + { data_available, _, _, 7 }, + { peek_data, 0, _, 1 }, + { peek_data, 1, _, 2 }, + { peek_data, 2, _, 3 }, + { peek_data, 3, _, 4 }, + { peek_data, 4, _, 5 }, + { peek_data, 5, _, 6 }, + { peek_data, 6, _, 7 }, + { space_available, _, _, 0 }, + { data_available, _, _, 7 }, + { init_threading, _, _, 0 }, + { run_locked, _, _, 0 }, + { compare_strings, _, _, 0 }, + { run_free, _, _, 0 }, + { compare_strings, _, _, 0 }, +}; +#define num_tests (sizeof (tests) / sizeof (tests[0])) +int test_start_line = __LINE__ - num_tests - 2; + +int +main (int argc, const char **argv) +{ + int res = 0; + + for (size_t i = 0; i < num_tests; i++) { + int test_res; + test_res = tests[i].test (tests[i].param1, tests[i].param2); + if (test_res != tests[i].test_expect) { + res |= 1; + printf ("test %zd (line %zd) failed\n", i, i + test_start_line); + printf ("expect: %d\n", tests[i].test_expect); + printf ("got : %d\n", test_res); + } + } + + return res; +}