serenity/Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp
kleines Filmröllchen 6b13436ef6 LibCore: Introduce SharedSingleProducerCircularQueue
This new class with an admittedly long OOP-y name provides a circular
queue in shared memory. The queue is a lock-free synchronous queue
implemented with atomics, and its implementation is significantly
simplified by only accounting for one producer (and multiple consumers).
It is intended to be used as a producer-consumer communication
data structure across processes. The original motivation behind this
class is efficient short-period transfer of audio data in userspace.

This class includes formal proofs of several correctness properties of
the main queue operations `enqueue` and `dequeue`. These proofs are not
100% complete in their existing form as the invariants they depend on
are "handwaved". This seems fine to me right now, as any proof is better
than no proof :^). Anyways, the proofs should build confidence that the
implemented algorithms, which are only roughly based on existing work,
operate correctly in even the worst-case concurrency scenarios.
2022-04-21 13:55:00 +02:00

203 lines
6.8 KiB
C++

/*
* Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#include "sched.h"
#include <LibCore/SharedCircularQueue.h>
#include <LibTest/TestCase.h>
#include <LibThreading/Thread.h>
// Queue type exercised by every test in this file: a shared circular queue of int.
using TestQueue = Core::SharedSingleProducerCircularQueue<int>;
// Type the consumer loops store dequeue outcomes in: either the dequeued int or a queue status code.
using QueueError = ErrorOr<int, TestQueue::QueueStatus>;
// Builds the thread body used by the multi-consumer tests; the definition is at the end of this file.
Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t test_count);
// These first two cases don't multithread at all.
// Single-threaded: enqueue size() - 1 values, each of which must be accepted,
// then verify that one further enqueue is rejected with QueueStatus::Full.
TEST_CASE(simple_enqueue)
{
    auto queue = MUST(TestQueue::try_create());

    size_t i = 0;
    while (i < queue.size() - 1) {
        EXPECT(!queue.try_enqueue((int)i).is_error());
        ++i;
    }

    // The next enqueue is the one that has to be refused.
    auto result = queue.try_enqueue(0);
    EXPECT(result.is_error());
    EXPECT_EQ(result.release_error(), TestQueue::QueueStatus::Full);
}
// Single-threaded: enqueue the values 0..test_count-1 and verify that they are
// dequeued again in the same order.
TEST_CASE(simple_dequeue)
{
    auto queue = MUST(TestQueue::try_create());
    auto const test_count = 10;

    // Check the setup as well: if an enqueue were rejected here, the dequeue
    // expectations below would fail without pointing at the real cause.
    for (int i = 0; i < test_count; ++i)
        EXPECT(!queue.try_enqueue(i).is_error());

    for (int i = 0; i < test_count; ++i) {
        auto const element = queue.try_dequeue();
        EXPECT(!element.is_error());
        // Only look at the value when there is one; the failure has already been recorded above.
        if (!element.is_error())
            EXPECT_EQ(element.value(), i);
    }
}
// There is one parallel consumer, but nobody is producing at the same time.
// The queue is filled up front; a second thread then drains it through its own
// copy of the queue handle and checks the order of the values.
TEST_CASE(simple_multithread)
{
    auto queue = MUST(TestQueue::try_create());
    auto const test_count = 10;

    // Verify the pre-fill. The consumer below retries for as long as the queue
    // reports Empty, so a silently rejected enqueue would otherwise show up as
    // a stuck test with no failure message.
    for (int i = 0; i < test_count; ++i)
        EXPECT(!queue.try_enqueue(i).is_error());

    auto second_thread = Threading::Thread::construct([&queue]() {
        auto copied_queue = queue;
        for (int i = 0; i < test_count; ++i) {
            QueueError result = TestQueue::QueueStatus::Invalid;
            // Retry while the queue reports Empty; any other error ends the loop and is reported below.
            do {
                result = copied_queue.try_dequeue();
                if (!result.is_error())
                    EXPECT_EQ(result.value(), i);
            } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);

            if (result.is_error())
                FAIL("Unexpected error while dequeueing.");
        }
        return 0;
    });

    second_thread->start();
    (void)second_thread->join();

    // Everything that was enqueued must have been consumed.
    EXPECT_EQ(queue.weak_used(), (size_t)0);
}
// One consumer thread drains the queue while this thread fills it concurrently.
TEST_CASE(producer_consumer_multithread)
{
    auto queue = MUST(TestQueue::try_create());
    // Push several queue-lengths' worth of values so the producer can actually run into a full queue.
    auto const test_count = queue.size() * 4;
    Atomic<bool> other_thread_running { false };

    auto second_thread = Threading::Thread::construct([&queue, &other_thread_running]() {
        auto copied_queue = queue;
        other_thread_running.store(true);

        for (size_t i = 0; i < test_count; ++i) {
            // Keep polling until a value arrives; only Empty is a reason to try again.
            for (;;) {
                QueueError result = copied_queue.try_dequeue();
                if (!result.is_error()) {
                    EXPECT_EQ(result.value(), (int)i);
                    break;
                }
                if (result.error() == TestQueue::QueueStatus::Empty)
                    continue;
                FAIL("Unexpected error while dequeueing.");
                break;
            }
        }
        return 0;
    });
    second_thread->start();

    // Do not start producing before the consumer has signalled that it is running.
    while (!other_thread_running.load()) {
    }

    for (size_t i = 0; i < test_count; ++i) {
        // Keep offering the value until it is accepted; only Full is a reason to try again.
        for (;;) {
            ErrorOr<void, TestQueue::QueueStatus> result = queue.try_enqueue((int)i);
            if (!result.is_error())
                break;
            if (result.error() == TestQueue::QueueStatus::Full)
                continue;
            FAIL("Unexpected error while enqueueing.");
            break;
        }
    }

    (void)second_thread->join();
    EXPECT_EQ(queue.weak_used(), (size_t)0);
}
// There are multiple parallel consumers, but nobody is producing at the same time.
// The queue is filled up front, then four consumer threads drain it together.
TEST_CASE(multi_consumer)
{
    auto queue = MUST(TestQueue::try_create());
    // This needs to be divisible by 4: each of the four consumers built by
    // dequeuer() takes exactly test_count / 4 elements.
    size_t const test_count = queue.size() - 4;
    Atomic<size_t> dequeue_count = 0;

    auto threads = {
        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
    };

    // Verify the pre-fill. The consumers retry for as long as the queue reports
    // Empty, so a silently rejected enqueue would otherwise leave them waiting
    // for an element that never arrives, with no failure message.
    for (size_t i = 0; i < test_count; ++i)
        EXPECT(!queue.try_enqueue((int)i).is_error());

    for (auto thread : threads)
        thread->start();
    for (auto thread : threads)
        (void)thread->join();

    // The queue must be drained and every element must have been counted exactly once.
    EXPECT_EQ(queue.weak_used(), (size_t)0);
    EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
}
// Four consumer threads drain the queue while this thread produces concurrently.
TEST_CASE(single_producer_multi_consumer)
{
    auto queue = MUST(TestQueue::try_create());
    // A larger element count gives possible race conditions more opportunity to show up.
    size_t const test_count = queue.size() * 8;
    Atomic<size_t> dequeue_count = 0;

    auto threads = {
        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
        Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
    };
    for (auto thread : threads)
        thread->start();

    for (size_t i = 0; i < test_count; ++i) {
        // Keep offering the value until it is accepted; only Full is a reason to try again.
        for (;;) {
            ErrorOr<void, TestQueue::QueueStatus> result = queue.try_enqueue((int)i);

            // Once something has been put in, hold off until at least one consumer has dequeued.
            while (dequeue_count.load() == 0) {
            }
            // Let the other threads make progress.
            sched_yield();

            if (!result.is_error())
                break;
            if (result.error() == TestQueue::QueueStatus::Full)
                continue;
            FAIL("Unexpected error while enqueueing.");
            break;
        }
    }

    for (auto thread : threads)
        (void)thread->join();

    EXPECT_EQ(queue.weak_used(), (size_t)0);
    EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
}
// Creates the body of one consumer thread for the multi-consumer tests.
// The returned callable copies the queue handle, dequeues test_count / 4
// elements from it, and bumps dequeue_count once per element it obtained.
// queue and dequeue_count are captured by reference and must outlive the thread.
Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t const test_count)
{
    return [&queue, &dequeue_count, test_count]() {
        auto copied_queue = queue;
        // This consumer's share of the work; the tests run four consumers.
        auto const elements_to_take = test_count / 4;

        for (size_t taken = 0; taken < elements_to_take; ++taken) {
            // Keep polling until a value arrives; only Empty is a reason to try again.
            for (;;) {
                QueueError result = copied_queue.try_dequeue();
                if (!result.is_error())
                    dequeue_count.fetch_add(1);
                // Let the other threads make progress.
                sched_yield();

                if (!result.is_error())
                    break;
                if (result.error() == TestQueue::QueueStatus::Empty)
                    continue;
                FAIL("Unexpected error while dequeueing.");
                break;
            }
        }
        return (intptr_t)0;
    };
}