diff --git a/AK/AsyncStreamTransform.h b/AK/AsyncStreamTransform.h new file mode 100644 index 00000000000..008a9878d15 --- /dev/null +++ b/AK/AsyncStreamTransform.h @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2024, Dan Klishch + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +#include +#include +#include +#include + +namespace AK { + +template +class AsyncStreamTransform : public AsyncInputStream { +public: + AsyncStreamTransform(MaybeOwned&& stream, AK::Generator>&& generator) + : m_stream(move(stream)) + , m_generator(move(generator)) + { + } + + ~AsyncStreamTransform() + { + // 1. Assert that nobody is awaiting on the resource. + VERIFY(!m_generator_has_awaiters); + + // 2. If resource is open, perform Reset AO. + if (is_open()) + reset(); + } + + void reset() override + { + VERIFY(is_open()); + m_stream->reset(); + if (!m_generator_has_awaiters) + m_generator.destroy(); + m_is_open = false; + } + + Coroutine> close() override + { + VERIFY(is_open()); + TemporaryChange await_guard(m_generator_has_awaiters, true); + + if (!m_generator.is_done()) { + Variant> chunk_or_eof = co_await m_generator.next(); + if (chunk_or_eof.has()) { + reset(); + co_return Error::from_errno(EBUSY); + } else { + m_is_open = false; + auto& error_or_eof = chunk_or_eof.get>(); + if (error_or_eof.is_error()) { + co_return error_or_eof.release_error(); + } else { + if (m_stream.is_owned()) + CO_TRY(co_await m_stream->close()); + co_return {}; + } + } + } else { + m_is_open = false; + if (m_stream.is_owned()) + CO_TRY(co_await m_stream->close()); + co_return {}; + } + } + + bool is_open() const override + { + return m_is_open; + } + + Coroutine> enqueue_some(Badge) override + { + VERIFY(is_open()); + TemporaryChange await_guard(m_generator_has_awaiters, true); + + if (m_generator.is_done()) + co_return false; + + Variant> chunk_or_eof = co_await m_generator.next(); + if (chunk_or_eof.has()) { + co_return true; + } else { + auto& error_or_eof = chunk_or_eof.get>(); + if (error_or_eof.is_error()) { + m_is_open = false; + co_return error_or_eof.release_error(); + } else { + co_return false; + } + } + } + +protected: + using Generator = AK::Generator>; + + MaybeOwned m_stream; + +private: + Generator m_generator; + bool m_is_open { true }; + bool m_generator_has_awaiters { false }; +}; + +} + +#ifdef USING_AK_GLOBALLY +using AK::AsyncStreamTransform; +#endif