Kernel: Introduce ThreadBlocker as a way to make unblocking neater :)

And port all the descriptor-based blocks over to it as a proof of concept.
This commit is contained in:
Robin Burchell 2019-07-18 16:22:26 +02:00 committed by Andreas Kling
parent a27c9e3e01
commit 0c8813e6d9
6 changed files with 154 additions and 67 deletions

View file

@ -212,7 +212,7 @@ ssize_t IPv4Socket::recvfrom(FileDescription& description, void* buffer, size_t
}
load_receive_deadline();
current->block(Thread::BlockedReceive, description);
current->block(*new Thread::ThreadBlockerReceive(description));
LOCKER(lock());
if (!m_can_read) {

View file

@ -3,6 +3,7 @@
#include <Kernel/Net/Routing.h>
#include <Kernel/Net/TCP.h>
#include <Kernel/Net/TCPSocket.h>
#include <Kernel/FileSystem/FileDescription.h>
#include <Kernel/Process.h>
Lockable<HashMap<u16, TCPSocket*>>& TCPSocket::sockets_by_port()
@ -161,7 +162,7 @@ KResult TCPSocket::protocol_connect(FileDescription& description, ShouldBlock sh
m_state = State::Connecting;
if (should_block == ShouldBlock::Yes) {
current->block(Thread::BlockedConnect, description);
current->block(*new Thread::ThreadBlockerConnect(description));
ASSERT(is_connected());
return KSuccess;
}

View file

@ -900,7 +900,7 @@ ssize_t Process::do_write(FileDescription& description, const u8* data, int data
#ifdef IO_DEBUG
dbgprintf("block write on %d\n", fd);
#endif
current->block(Thread::State::BlockedWrite, description);
current->block(*new Thread::ThreadBlockerWrite(description));
}
ssize_t rc = description.write(data + nwritten, data_size - nwritten);
#ifdef IO_DEBUG
@ -962,7 +962,7 @@ ssize_t Process::sys$read(int fd, u8* buffer, ssize_t size)
return -EBADF;
if (description->is_blocking()) {
if (!description->can_read()) {
current->block(Thread::State::BlockedRead, *description);
current->block(*new Thread::ThreadBlockerRead(*description));
if (current->m_was_interrupted_while_blocked)
return -EINTR;
}
@ -2122,7 +2122,7 @@ int Process::sys$accept(int accepting_socket_fd, sockaddr* address, socklen_t* a
auto& socket = *accepting_socket_description->socket();
if (!socket.can_accept()) {
if (accepting_socket_description->is_blocking()) {
current->block(Thread::State::BlockedAccept, *accepting_socket_description);
current->block(*new Thread::ThreadBlockerAccept(*accepting_socket_description));
if (current->m_was_interrupted_while_blocked)
return -EINTR;
} else {

View file

@ -51,6 +51,88 @@ void Scheduler::beep()
s_beep_timeout = g_uptime + 100;
}
Thread::ThreadBlockerFileDescription::ThreadBlockerFileDescription(const RefPtr<FileDescription>& description)
: m_blocked_description(description)
{}
RefPtr<FileDescription> Thread::ThreadBlockerFileDescription::blocked_description() const
{
return m_blocked_description;
}
Thread::ThreadBlockerAccept::ThreadBlockerAccept(const RefPtr<FileDescription>& description)
: ThreadBlockerFileDescription(description)
{
}
bool Thread::ThreadBlockerAccept::should_unblock(time_t, long)
{
auto& description = *blocked_description();
auto& socket = *description.socket();
return socket.can_accept();
}
Thread::ThreadBlockerReceive::ThreadBlockerReceive(const RefPtr<FileDescription>& description)
: ThreadBlockerFileDescription(description)
{
}
bool Thread::ThreadBlockerReceive::should_unblock(time_t now_sec, long now_usec)
{
auto& description = *blocked_description();
auto& socket = *description.socket();
// FIXME: Block until the amount of data wanted is available.
bool timed_out = now_sec > socket.receive_deadline().tv_sec || (now_sec == socket.receive_deadline().tv_sec && now_usec >= socket.receive_deadline().tv_usec);
if (timed_out || description.can_read())
return true;
return false;
}
Thread::ThreadBlockerConnect::ThreadBlockerConnect(const RefPtr<FileDescription>& description)
: ThreadBlockerFileDescription(description)
{
}
bool Thread::ThreadBlockerConnect::should_unblock(time_t, long)
{
auto& description = *blocked_description();
auto& socket = *description.socket();
return socket.is_connected();
}
Thread::ThreadBlockerWrite::ThreadBlockerWrite(const RefPtr<FileDescription>& description)
: ThreadBlockerFileDescription(description)
{
}
bool Thread::ThreadBlockerWrite::should_unblock(time_t, long)
{
return blocked_description()->can_write();
}
Thread::ThreadBlockerRead::ThreadBlockerRead(const RefPtr<FileDescription>& description)
: ThreadBlockerFileDescription(description)
{
}
bool Thread::ThreadBlockerRead::should_unblock(time_t, long)
{
// FIXME: Block until the amount of data wanted is available.
return blocked_description()->can_read();
}
Thread::ThreadBlockerCondition::ThreadBlockerCondition(Function<bool()> &condition)
: m_block_until_condition(move(condition))
{
ASSERT(m_block_until_condition);
}
bool Thread::ThreadBlockerCondition::should_unblock(time_t, long)
{
return m_block_until_condition();
}
// Called by the scheduler on threads that are blocked for some reason.
// Make a decision as to whether to unblock them or not.
void Thread::consider_unblock(time_t now_sec, long now_usec)
@ -93,41 +175,6 @@ void Thread::consider_unblock(time_t now_sec, long now_usec)
return IterationDecision::Break;
});
return;
case Thread::BlockedRead:
ASSERT(m_blocked_description);
// FIXME: Block until the amount of data wanted is available.
if (m_blocked_description->can_read())
unblock();
return;
case Thread::BlockedWrite:
ASSERT(m_blocked_description != -1);
if (m_blocked_description->can_write())
unblock();
return;
case Thread::BlockedConnect: {
auto& description = *m_blocked_description;
auto& socket = *description.socket();
if (socket.is_connected())
unblock();
return;
}
case Thread::BlockedReceive: {
auto& description = *m_blocked_description;
auto& socket = *description.socket();
// FIXME: Block until the amount of data wanted is available.
bool timed_out = now_sec > socket.receive_deadline().tv_sec || (now_sec == socket.receive_deadline().tv_sec && now_usec >= socket.receive_deadline().tv_usec);
if (timed_out || description.can_read())
unblock();
return;
}
case Thread::BlockedAccept: {
auto& description = *m_blocked_description;
auto& socket = *description.socket();
if (socket.can_accept())
unblock();
return;
}
case Thread::BlockedSelect:
if (m_select_has_timeout) {
if (now_sec > m_select_timeout.tv_sec || (now_sec == m_select_timeout.tv_sec && now_usec >= m_select_timeout.tv_usec)) {
@ -149,9 +196,10 @@ void Thread::consider_unblock(time_t now_sec, long now_usec)
}
return;
case Thread::BlockedCondition:
if (m_block_until_condition()) {
m_block_until_condition = nullptr;
ASSERT(m_blocker);
if (m_blocker->should_unblock(now_sec, now_usec)) {
unblock();
m_blocker = nullptr;
}
return;
case Thread::Skip1SchedulerPass:

View file

@ -99,7 +99,7 @@ Thread::~Thread()
void Thread::unblock()
{
m_blocked_description = nullptr;
m_blocker = nullptr;
if (current == this) {
set_state(Thread::Running);
return;
@ -110,7 +110,7 @@ void Thread::unblock()
void Thread::block_until(Function<bool()>&& condition)
{
m_block_until_condition = move(condition);
m_blocker = make<ThreadBlockerCondition>(condition);
block(Thread::BlockedCondition);
Scheduler::yield();
}
@ -129,10 +129,10 @@ void Thread::block(Thread::State new_state)
process().big_lock().lock();
}
void Thread::block(Thread::State new_state, FileDescription& description)
void Thread::block(ThreadBlocker& blocker)
{
m_blocked_description = &description;
block(new_state);
m_blocker = &blocker;
block(Thread::BlockedCondition);
}
void Thread::sleep(u32 ticks)
@ -165,22 +165,12 @@ const char* to_string(Thread::State state)
return "Sleep";
case Thread::BlockedWait:
return "Wait";
case Thread::BlockedRead:
return "Read";
case Thread::BlockedWrite:
return "Write";
case Thread::BlockedSignal:
return "Signal";
case Thread::BlockedSelect:
return "Select";
case Thread::BlockedLurking:
return "Lurking";
case Thread::BlockedConnect:
return "Connect";
case Thread::BlockedReceive:
return "Receive";
case Thread::BlockedAccept:
return "Accepting";
case Thread::BlockedCondition:
return "Condition";
case Thread::__Begin_Blocked_States__:
@ -197,7 +187,7 @@ void Thread::finalize()
dbgprintf("Finalizing Thread %u in %s(%u)\n", tid(), m_process.name().characters(), pid());
set_state(Thread::State::Dead);
m_blocked_description = nullptr;
m_blocker = nullptr;
if (this == &m_process.main_thread())
m_process.finalize();
@ -558,7 +548,7 @@ KResult Thread::wait_for_connect(FileDescription& description)
auto& socket = *description.socket();
if (socket.is_connected())
return KSuccess;
block(Thread::State::BlockedConnect, description);
block(*new Thread::ThreadBlockerConnect(description));
Scheduler::yield();
if (!socket.is_connected())
return KResult(-ECONNREFUSED);

View file

@ -67,17 +67,66 @@ public:
BlockedLurking,
BlockedSleep,
BlockedWait,
BlockedRead,
BlockedWrite,
BlockedSignal,
BlockedSelect,
BlockedConnect,
BlockedReceive,
BlockedAccept,
BlockedCondition,
__End_Blocked_States__
};
class ThreadBlocker {
public:
virtual ~ThreadBlocker() {}
virtual bool should_unblock(time_t now_s, long us) = 0;
};
class ThreadBlockerFileDescription : public ThreadBlocker {
public:
ThreadBlockerFileDescription(const RefPtr<FileDescription>& description);
RefPtr<FileDescription> blocked_description() const;
private:
RefPtr<FileDescription> m_blocked_description;
};
class ThreadBlockerAccept : public ThreadBlockerFileDescription {
public:
ThreadBlockerAccept(const RefPtr<FileDescription>& description);
virtual bool should_unblock(time_t, long) override;
};
class ThreadBlockerReceive : public ThreadBlockerFileDescription {
public:
ThreadBlockerReceive(const RefPtr<FileDescription>& description);
virtual bool should_unblock(time_t, long) override;
};
class ThreadBlockerConnect : public ThreadBlockerFileDescription {
public:
ThreadBlockerConnect(const RefPtr<FileDescription>& description);
virtual bool should_unblock(time_t, long) override;
};
class ThreadBlockerWrite : public ThreadBlockerFileDescription {
public:
ThreadBlockerWrite(const RefPtr<FileDescription>& description);
virtual bool should_unblock(time_t, long) override;
};
class ThreadBlockerRead : public ThreadBlockerFileDescription {
public:
ThreadBlockerRead(const RefPtr<FileDescription>& description);
virtual bool should_unblock(time_t, long) override;
};
class ThreadBlockerCondition : public ThreadBlocker {
public:
ThreadBlockerCondition(Function<bool()> &condition);
virtual bool should_unblock(time_t, long) override;
private:
Function<bool()> m_block_until_condition;
};
void did_schedule() { ++m_times_scheduled; }
u32 times_scheduled() const { return m_times_scheduled; }
@ -99,7 +148,7 @@ public:
void sleep(u32 ticks);
void block(Thread::State);
void block(Thread::State, FileDescription&);
void block(ThreadBlocker& blocker);
void unblock();
void set_wakeup_time(u64 t) { m_wakeup_time = t; }
@ -186,11 +235,10 @@ private:
RefPtr<Region> m_kernel_stack_for_signal_handler_region;
pid_t m_waitee_pid { -1 };
int m_wait_options { 0 };
RefPtr<FileDescription> m_blocked_description;
timeval m_select_timeout;
SignalActionData m_signal_action_data[32];
Region* m_signal_stack_user_region { nullptr };
Function<bool()> m_block_until_condition;
OwnPtr<ThreadBlocker> m_blocker;
Vector<int> m_select_read_fds;
Vector<int> m_select_write_fds;
Vector<int> m_select_exceptional_fds;