serenity/AK/AsyncStream.h

228 lines
8.3 KiB
C
Raw Normal View History

2024-02-20 00:19:30 -05:00
/*
* Copyright (c) 2024, Dan Klishch <danilklishch@gmail.com>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#pragma once
#include <AK/Badge.h>
#include <AK/ByteBuffer.h>
#include <AK/Coroutine.h>
#include <AK/Error.h>
#include <AK/ScopeGuard.h>
namespace AK {
// AsyncResource represents a generic resource (e. g. POSIX file descriptor, AsyncStream, HTTP
// response body) with a failible and/or asynchronous destructor. Refer to AsynchronousDesign.md
// documentation page for a description tailored for users of the asynchronous resources.
//
// In order to correctly implement methods of AsyncResource, you first have to define (not
// necessarily in code) two abstract operations: Close and Reset. They should have the following
// semantics:
//
// * Close AO:
// 1. Assert that nobody is awaiting on a resource.
// 2. Ensure that further attempts to wait on a resource will assert.
// 3. Shutdown (possibly asynchronously) the associated low-level resource. Shutdown must ensure
// that if the state of a resource is clean, it will remain so indefinitely. The "clean"
// state is resource-specific--for example, streams might define it as "no outstanding writes
// and no unread data".
// 4. Check if the state of the resource is clean. If it is not, call Reset AO and return an
// error (preferably, EBUSY).
// 5. Free (possibly asynchronously) the associated low-level resource.
// 6. Return success.
//
// * Reset AO:
// 1. Schedule returning an error (preferably, ECANCELED) from the current resource awaiters.
// 2. Ensure that further attempts to wait on a resource will assert.
// 3. Free synchronously the associated low-level resource. Preferably, this should be done in a
// way that cleanly indicates an error for the event producer.
// 4. Return synchronously.
class AsyncResource {
AK_MAKE_NONCOPYABLE(AsyncResource);
AK_MAKE_NONMOVABLE(AsyncResource);
public:
AsyncResource() = default;
// Destructor of an AsyncResource must perform the following steps when called:
// 1. Assert that nobody is awaiting on the resource.
// 2. If resource is open, perform Reset AO.
virtual ~AsyncResource() = default;
// reset() must perform the following steps when called:
// 1. Assert that the resource is open.
// 2. Perform Reset AO.
virtual void reset() = 0;
// close() must perform the following steps when called:
// 1. Assert that the object is fully constructed. For example, a socket might assert that it is
// connected.
// 2. Assert that the resource is open.
// 3. Perform Close AO, await and return its result.
virtual Coroutine<ErrorOr<void>> close() = 0;
// Resource is said to be in an error state if either Reset AO was invoked or if an operation on
// a resource has failed and an implementation deemed the error unrecoverable. If a resource is
// being transitioned to an error state because of an internal error, Reset AO (or its
// equivalent) must be executed by an implementation. Resource is said to be open if it is not
// in a error state and Close AO has never been called on it.
virtual bool is_open() const = 0;
};
// AsyncInputStream is a base class for all asynchronous input streams. Refer to
// AsynchronousDesign.md documentation page for a description tailored for users of the streams.
//
// In order to implement a brand new AsyncInputStream, you generally have to define a destructor and
// overload six virtual functions: 3 from AsyncResource and 3 from AsyncInputStream. When
// implementing the AsyncResource interface, please note that AsyncInputStream is considered clean
// if there's no data left to be read.
class AsyncInputStream : public virtual AsyncResource {
public:
struct PeekOrEofResult {
ReadonlyBytes data;
bool is_eof;
};
AsyncInputStream() = default;
ReadonlyBytes buffered_data() const
{
VERIFY(is_open());
return buffered_data_unchecked({});
}
Coroutine<ErrorOr<PeekOrEofResult>> peek_or_eof()
{
VERIFY(is_open());
if (!m_is_reading_peek) {
m_is_reading_peek = true;
auto data = buffered_data_unchecked({});
if (!data.is_empty())
co_return PeekOrEofResult { data, false };
}
bool is_not_eof = CO_TRY(co_await enqueue_some({}));
co_return PeekOrEofResult { buffered_data_unchecked({}), !is_not_eof };
}
Coroutine<ErrorOr<ReadonlyBytes>> peek()
{
auto [data, is_eof] = CO_TRY(co_await peek_or_eof());
if (is_eof) {
reset();
co_return Error::from_errno(EIO);
}
co_return data;
}
Coroutine<ErrorOr<ReadonlyBytes>> read(size_t bytes)
{
m_is_reading_peek = false;
if (bytes) {
auto buffer = buffered_data();
while (buffer.size() < bytes) {
if (!CO_TRY(co_await enqueue_some({}))) {
reset();
co_return Error::from_errno(EIO);
}
buffer = buffered_data_unchecked({});
}
dequeue({}, bytes);
co_return buffer.slice(0, bytes);
} else {
co_return Bytes {};
}
}
template<typename T>
Coroutine<ErrorOr<T>> read_object()
{
auto bytes = CO_TRY(co_await read(sizeof(T)));
union {
T object;
char representation[sizeof(T)];
} reinterpreter = {};
memcpy(&reinterpreter, bytes.data(), sizeof(T));
co_return reinterpreter.object;
}
// If EOF has not been reached, `enqueue_some` should read at least one byte from the underlying
// stream to the internal buffer and return true. Otherwise, it must not change the buffer and
// return false. If read fails and, consequently, `enqueue_some` returns Error, it must
// perform Reset AO (or an equivalent of it). Therefore, all reading errors are considered fatal
// for AsyncInputStream. Additionally, implementation must assert if `enqueue_some` is called
// concurrently. This is the only method that can be interrupted by `reset`.
virtual Coroutine<ErrorOr<bool>> enqueue_some(Badge<AsyncInputStream>) = 0;
// `buffered_data_unchecked` should just return a view of the buffer. It must not invalidate
// previously returned views of the buffer.
virtual ReadonlyBytes buffered_data_unchecked(Badge<AsyncInputStream>) const = 0;
// `dequeue` should remove `bytes` bytes from the buffer. It is guaranteed that this amount of
// bytes will be present in the buffer at the point of the call. `dequeue` must not invalidate
// previously returned views of the buffer. There are some restrictions on `bytes` parameter
// originating from the length condition (see documentation), so if you just use
// AsyncStreamBuffer as the stream buffer, `dequeue` and `enqueue_some` will have amortized
// O(stream_length) complexity.
virtual void dequeue(Badge<AsyncInputStream>, size_t bytes) = 0;
protected:
static Badge<AsyncInputStream> badge() { return {}; }
bool m_is_reading_peek { false };
};
class AsyncOutputStream : public virtual AsyncResource {
public:
AsyncOutputStream() = default;
virtual Coroutine<ErrorOr<size_t>> write_some(ReadonlyBytes buffer) = 0;
virtual Coroutine<ErrorOr<void>> write(ReadonlySpan<ReadonlyBytes> buffers)
{
for (auto buffer : buffers) {
while (!buffer.is_empty()) {
auto nwritten = CO_TRY(co_await write_some(buffer));
buffer = buffer.slice(nwritten);
}
}
co_return {};
}
};
class AsyncStream
: public AsyncInputStream
, public AsyncOutputStream {
public:
AsyncStream() = default;
};
template<typename T>
class StreamWrapper : public virtual AsyncResource {
public:
StreamWrapper(NonnullOwnPtr<T>&& stream)
: m_stream(move(stream))
{
}
void reset() override { return m_stream->reset(); }
Coroutine<ErrorOr<void>> close() override { return m_stream->close(); }
bool is_open() const override { return m_stream->is_open(); }
protected:
NonnullOwnPtr<T> m_stream;
};
}
#ifdef USING_AK_GLOBALLY
using AK::AsyncInputStream;
using AK::AsyncOutputStream;
using AK::AsyncResource;
using AK::AsyncStream;
using AK::StreamWrapper;
#endif