/*
 * Copyright (c) 2018-2024, Andreas Kling
 * Copyright (c) 2022, the SerenityOS developers.
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */

// NOTE(review): In this copy of the file every angle-bracket span appears to
// be missing: the include directives below have no targets, the template
// declarations have no parameter lists, and the template-ids have no arguments
// (e.g. "ErrorOr post_message", "Queue&", "forward(args)"). The code tokens
// are left untouched here; restore them from the canonical file before
// building. Comments below describe only what is still visible.

#pragma once

#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace IPC {

// Abstract base for an IPC connection. It is a Core::EventReceiver that owns
// a Transport, refers to a local IPC::Stub, and keeps buffers for messages,
// raw bytes and fds that have been received but not yet processed.
// Most member functions are only declared here; their behaviour lives in the
// out-of-line definitions.
class ConnectionBase : public Core::EventReceiver {
    C_OBJECT_ABSTRACT(ConnectionBase);

public:
    virtual ~ConnectionBase() override;

    // The result must not be ignored ([[nodiscard]]).
    [[nodiscard]] bool is_open() const;

    // Public overload taking a Message; a second, protected overload takes a
    // MessageBuffer.
    ErrorOr post_message(Message const&);

    void shutdown();

    // Overridable hook; the default implementation does nothing.
    virtual void die() { }

    // Mutable access to the owned transport.
    Transport& transport() { return m_transport; }

protected:
    // Only constructible by subclasses. Connection passes
    // LocalEndpoint::static_magic() as local_endpoint_magic.
    explicit ConnectionBase(IPC::Stub&, Transport, u32 local_endpoint_magic);

    // Overridable hooks with empty default bodies.
    // NOTE(review): presumably driven by m_responsiveness_timer - confirm in
    // the implementation file.
    virtual void may_have_become_unresponsive() { }
    virtual void did_become_responsive() { }

    virtual void shutdown_with_error(Error const&);

    // Pure virtual: subclasses turn raw bytes (plus the fd queue) into a
    // message object, or return null when the bytes cannot be decoded
    // (see Connection::try_parse_message).
    virtual OwnPtr try_parse_message(ReadonlyBytes, Queue&) = 0;

    // Backend for Connection::wait_for_specific_endpoint_message(); a null
    // result is treated by that caller as "no message".
    OwnPtr wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id);
    void wait_for_transport_to_become_readable();
    ErrorOr> read_as_much_as_possible_from_transport_without_blocking();
    ErrorOr drain_messages_from_peer();
    void try_parse_messages(Vector const& bytes, size_t& index);

    // Protected overload operating on an already-built MessageBuffer.
    ErrorOr post_message(MessageBuffer);
    void handle_messages();

    // Held by reference: the stub must outlive this connection.
    IPC::Stub& m_local_stub;

    Transport m_transport;

    RefPtr m_responsiveness_timer;

    // Received-but-unprocessed state.
    Vector> m_unprocessed_messages;
    Queue m_unprocessed_fds;
    ByteBuffer m_unprocessed_bytes;

    u32 m_local_endpoint_magic { 0 };

    // Ref-counted bundle of a message list, a mutex, a condition variable
    // bound to that mutex, and a "running" flag that starts out true.
    // NOTE(review): presumably shared between the connection and
    // m_send_thread - confirm in the implementation file.
    struct SendQueue : public AtomicRefCounted {
        AK::SinglyLinkedList messages;
        Threading::Mutex mutex;
        Threading::ConditionVariable condition { mutex };
        bool running { true };
    };

    RefPtr m_send_thread;
    RefPtr m_send_queue;
};

// Typed connection layered on ConnectionBase. The bodies below refer to the
// names LocalEndpoint and PeerEndpoint; their declaring template parameter
// list is not visible in this copy.
template
class Connection : public ConnectionBase {
public:
    // Forwards the stub and the (moved) transport to the base, together with
    // the local endpoint's magic number.
    Connection(IPC::Stub& local_stub, Transport transport)
        : ConnectionBase(local_stub, move(transport), LocalEndpoint::static_magic())
    {
    }

    // Thin public wrapper around wait_for_specific_endpoint_message().
    template
    OwnPtr wait_for_specific_message()
    {
        return wait_for_specific_endpoint_message();
    }

    // Posts a RequestType built from args, then waits for a message and
    // returns it as a non-null pointer. The post is wrapped in MUST() and the
    // response in VERIFY(), so this path has no error return; use
    // send_sync_but_allow_failure() when failure must be tolerated.
    template
    NonnullOwnPtr send_sync(Args&&... 
args)
    {
        MUST(post_message(RequestType(forward(args)...)));
        auto response = wait_for_specific_endpoint_message();
        VERIFY(response);
        return response.release_nonnull();
    }

    // Same request/wait sequence as send_sync(), but returns nullptr if
    // posting fails and passes through whatever the wait returns (which may
    // be null) instead of asserting.
    template
    OwnPtr send_sync_but_allow_failure(Args&&... args)
    {
        if (post_message(RequestType(forward(args)...)).is_error())
            return nullptr;
        return wait_for_specific_endpoint_message();
    }

protected:
    // Asks the base class for a message matching Endpoint::static_magic() and
    // MessageType::static_message_id(). A non-null result is handed back via
    // release_nonnull(); otherwise an empty pointer is returned.
    template
    OwnPtr wait_for_specific_endpoint_message()
    {
        if (auto message = wait_for_specific_endpoint_message_impl(Endpoint::static_magic(), MessageType::static_message_id()))
            return message.template release_nonnull();
        return {};
    }

    // Tries to decode the bytes as a LocalEndpoint message first and as a
    // PeerEndpoint message second; returns nullptr when both decoders report
    // an error.
    virtual OwnPtr try_parse_message(ReadonlyBytes bytes, Queue& fds) override
    {
        auto local_message = LocalEndpoint::decode_message(bytes, fds);
        if (!local_message.is_error())
            return local_message.release_value();

        auto peer_message = PeerEndpoint::decode_message(bytes, fds);
        if (!peer_message.is_error())
            return peer_message.release_value();

        return nullptr;
    }
};

}