mirror of
https://github.com/SerenityOS/serenity.git
synced 2025-01-24 02:12:09 -05:00
LibWeb: Implement Web::Streams::readable_stream_enqueue AO
This AO will be used in the Web::FileAPI::Blob::get_stream() implementation to enqueue all data in the blob to the stream. There are still plenty of cases to handle, but this appears to be enough for the basic case of reading all chunks from a readable stream until it is done.
This commit is contained in:
parent
b7b5b5763e
commit
94883866f5
2 changed files with 267 additions and 0 deletions
|
@ -6,6 +6,7 @@
|
|||
* SPDX-License-Identifier: BSD-2-Clause
|
||||
*/
|
||||
|
||||
#include <LibJS/Runtime/ArrayBuffer.h>
|
||||
#include <LibJS/Runtime/PromiseCapability.h>
|
||||
#include <LibJS/Runtime/PromiseConstructor.h>
|
||||
#include <LibWeb/Bindings/ExceptionOrUtils.h>
|
||||
|
@ -1261,6 +1262,262 @@ WebIDL::ExceptionOr<void> set_up_readable_byte_stream_controller(ReadableStream&
|
|||
return {};
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#readablestream-enqueue
|
||||
WebIDL::ExceptionOr<void> readable_stream_enqueue(ReadableStreamController& controller, JS::Value chunk)
|
||||
{
|
||||
// 1. If stream.[[controller]] implements ReadableStreamDefaultController,
|
||||
if (controller.has<JS::NonnullGCPtr<ReadableStreamDefaultController>>()) {
|
||||
// 1. Perform ! ReadableStreamDefaultControllerEnqueue(stream.[[controller]], chunk).
|
||||
return readable_stream_default_controller_enqueue(controller.get<JS::NonnullGCPtr<ReadableStreamDefaultController>>(), chunk);
|
||||
}
|
||||
// 2. Otherwise,
|
||||
else {
|
||||
// 1. Assert: stream.[[controller]] implements ReadableByteStreamController.
|
||||
VERIFY(controller.has<JS::NonnullGCPtr<ReadableByteStreamController>>());
|
||||
auto readable_byte_controller = controller.get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
|
||||
// FIXME: 2. Assert: chunk is an ArrayBufferView.
|
||||
|
||||
// 3. Let byobView be the current BYOB request view for stream.
|
||||
auto byob_view = readable_byte_controller->byob_request();
|
||||
|
||||
// 4. If byobView is non-null, and chunk.[[ViewedArrayBuffer]] is byobView.[[ViewedArrayBuffer]], then:
|
||||
if (byob_view) {
|
||||
// FIXME: 1. Assert: chunk.[[ByteOffset]] is byobView.[[ByteOffset]].
|
||||
// FIXME: 2. Assert: chunk.[[ByteLength]] ≤ byobView.[[ByteLength]].
|
||||
// FIXME: 3. Perform ? ReadableByteStreamControllerRespond(stream.[[controller]], chunk.[[ByteLength]]).
|
||||
TODO();
|
||||
}
|
||||
|
||||
// 5. Otherwise, perform ? ReadableByteStreamControllerEnqueue(stream.[[controller]], chunk).
|
||||
return readable_byte_stream_controller_enqueue(readable_byte_controller, chunk);
|
||||
}
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue
|
||||
WebIDL::ExceptionOr<void> readable_byte_stream_controller_enqueue(ReadableByteStreamController& controller, JS::Value chunk)
|
||||
{
|
||||
auto& vm = controller.vm();
|
||||
auto& realm = controller.realm();
|
||||
|
||||
// 1. Let stream be controller.[[stream]].
|
||||
auto stream = controller.stream();
|
||||
|
||||
// 2. If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
|
||||
if (controller.close_requested() || stream->state() != ReadableStream ::State::Readable)
|
||||
return {};
|
||||
|
||||
// 3. Let buffer be chunk.[[ViewedArrayBuffer]].
|
||||
auto* typed_array = TRY(JS::typed_array_from(vm, chunk));
|
||||
auto* buffer = typed_array->viewed_array_buffer();
|
||||
|
||||
// 4. Let byteOffset be chunk.[[ByteOffset]].
|
||||
auto byte_offset = typed_array->byte_offset();
|
||||
|
||||
// 5. Let byteLength be chunk.[[ByteLength]].
|
||||
auto byte_length = typed_array->byte_length();
|
||||
|
||||
// 6. If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
|
||||
if (buffer->is_detached()) {
|
||||
auto error = MUST_OR_THROW_OOM(JS::TypeError::create(realm, "Buffer is detached"sv));
|
||||
return JS::throw_completion(error);
|
||||
}
|
||||
|
||||
// 7. Let transferredBuffer be ? TransferArrayBuffer(buffer).
|
||||
auto transferred_buffer = TRY(transfer_array_buffer(realm, *buffer));
|
||||
|
||||
// 8. If controller.[[pendingPullIntos]] is not empty,
|
||||
if (!controller.pending_pull_intos().is_empty()) {
|
||||
// 1. Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
|
||||
auto& first_pending_pull_into = controller.pending_pull_intos().first();
|
||||
|
||||
// 2. If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
|
||||
if (first_pending_pull_into.buffer->is_detached()) {
|
||||
auto error = MUST_OR_THROW_OOM(JS::TypeError::create(realm, "Buffer is detached"sv));
|
||||
return JS::throw_completion(error);
|
||||
}
|
||||
|
||||
// 3. Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
|
||||
readable_byte_stream_controller_invalidate_byob_request(controller);
|
||||
|
||||
// 4. Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
|
||||
first_pending_pull_into.buffer = TRY(transfer_array_buffer(realm, first_pending_pull_into.buffer));
|
||||
|
||||
// 5. If firstPendingPullInto’s reader type is "none", perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, firstPendingPullInto).
|
||||
if (first_pending_pull_into.reader_type == ReaderType::None)
|
||||
TRY(readable_byte_stream_controller_enqueue_detached_pull_into_queue(controller, first_pending_pull_into));
|
||||
}
|
||||
|
||||
// 9. If ! ReadableStreamHasDefaultReader(stream) is true,
|
||||
if (readable_stream_has_default_reader(*stream)) {
|
||||
// 1. Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
|
||||
TRY(readable_byte_stream_controller_process_read_requests_using_queue(controller));
|
||||
|
||||
// 2. If ! ReadableStreamGetNumReadRequests(stream) is 0,
|
||||
if (readable_stream_get_num_read_requests(*stream) == 0) {
|
||||
// 1. Assert: controller.[[pendingPullIntos]] is empty.
|
||||
VERIFY(controller.pending_pull_intos().is_empty());
|
||||
|
||||
// 2. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
|
||||
readable_byte_stream_controller_enqueue_chunk_to_queue(controller, transferred_buffer, byte_offset, byte_length);
|
||||
}
|
||||
// 3. Otherwise.
|
||||
else {
|
||||
// 1. Assert: controller.[[queue]] is empty.
|
||||
VERIFY(controller.queue().is_empty());
|
||||
|
||||
// 2. If controller.[[pendingPullIntos]] is not empty,
|
||||
if (!controller.pending_pull_intos().is_empty()) {
|
||||
// 1. Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
|
||||
VERIFY(controller.pending_pull_intos().first().reader_type == ReaderType::Default);
|
||||
|
||||
// 2. Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
|
||||
readable_byte_stream_controller_shift_pending_pull_into(controller);
|
||||
}
|
||||
|
||||
// 3. Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
|
||||
auto transferred_view = MUST_OR_THROW_OOM(JS::construct(vm, *realm.intrinsics().uint8_array_constructor(), transferred_buffer, JS::Value(byte_offset), JS::Value(byte_length)));
|
||||
|
||||
// 4. Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
|
||||
readable_stream_fulfill_read_request(*stream, transferred_view, false);
|
||||
}
|
||||
}
|
||||
// 10. Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
|
||||
else if (readable_stream_has_byob_reader(*stream)) {
|
||||
// FIXME: 1. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
|
||||
// FIXME: 2. Perform ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
|
||||
TODO();
|
||||
}
|
||||
// 11. Otherwise,
|
||||
else {
|
||||
// 1. Assert: ! IsReadableStreamLocked(stream) is false.
|
||||
VERIFY(!is_readable_stream_locked(*stream));
|
||||
|
||||
// 2. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
|
||||
readable_byte_stream_controller_enqueue_chunk_to_queue(controller, transferred_buffer, byte_offset, byte_length);
|
||||
}
|
||||
|
||||
// 12. Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
|
||||
TRY(readable_byte_stream_controller_call_pull_if_needed(controller));
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#transfer-array-buffer
|
||||
WebIDL::ExceptionOr<JS::NonnullGCPtr<JS::ArrayBuffer>> transfer_array_buffer(JS::Realm& realm, JS::ArrayBuffer& buffer)
|
||||
{
|
||||
auto& vm = realm.vm();
|
||||
|
||||
// 1. Assert: ! IsDetachedBuffer(O) is false.
|
||||
VERIFY(!buffer.is_detached());
|
||||
|
||||
// 2. Let arrayBufferData be O.[[ArrayBufferData]].
|
||||
// 3. Let arrayBufferByteLength be O.[[ArrayBufferByteLength]].
|
||||
auto array_buffer = buffer.buffer();
|
||||
|
||||
// 4. Perform ? DetachArrayBuffer(O).
|
||||
TRY(JS::detach_array_buffer(vm, buffer));
|
||||
|
||||
// 5. Return a new ArrayBuffer object, created in the current Realm, whose [[ArrayBufferData]] internal slot value is arrayBufferData and whose [[ArrayBufferByteLength]] internal slot value is arrayBufferByteLength.
|
||||
return JS::ArrayBuffer::create(realm, array_buffer);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue
|
||||
WebIDL::ExceptionOr<void> readable_byte_stream_controller_enqueue_detached_pull_into_queue(ReadableByteStreamController& controller, PullIntoDescriptor& pull_into_descriptor)
|
||||
{
|
||||
// 1. Assert: pullIntoDescriptor’s reader type is "none".
|
||||
VERIFY(pull_into_descriptor.reader_type == ReaderType::None);
|
||||
|
||||
// 2. If pullIntoDescriptor’s bytes filled > 0, perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
|
||||
if (pull_into_descriptor.bytes_filled > 0)
|
||||
TRY(readable_byte_stream_controller_enqueue_cloned_chunk_to_queue(controller, pull_into_descriptor.buffer, pull_into_descriptor.byte_offset, pull_into_descriptor.bytes_filled));
|
||||
|
||||
// 3. Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
|
||||
readable_byte_stream_controller_shift_pending_pull_into(controller);
|
||||
return {};
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue
|
||||
WebIDL::ExceptionOr<void> readable_byte_stream_controller_process_read_requests_using_queue(ReadableByteStreamController& controller)
|
||||
{
|
||||
// 1. Let reader be controller.[[stream]].[[reader]].
|
||||
auto reader = controller.stream()->reader();
|
||||
|
||||
// 2. Assert: reader implements ReadableStreamDefaultReader.
|
||||
VERIFY(reader->has<JS::NonnullGCPtr<ReadableStreamDefaultReader>>());
|
||||
|
||||
// 3. While reader.[[readRequests]] is not empty,
|
||||
auto readable_stream_default_reader = reader->get<JS::NonnullGCPtr<ReadableStreamDefaultReader>>();
|
||||
while (!readable_stream_default_reader->read_requests().is_empty()) {
|
||||
// 1. If controller.[[queueTotalSize]] is 0, return.
|
||||
if (controller.queue_total_size() == 0.0)
|
||||
return {};
|
||||
|
||||
// 2. Let readRequest be reader.[[readRequests]][0].
|
||||
// 3. Remove readRequest from reader.[[readRequests]].
|
||||
auto read_request = readable_stream_default_reader->read_requests().take_first();
|
||||
|
||||
// 4. Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest).
|
||||
TRY(readable_byte_stream_controller_fill_read_request_from_queue(controller, read_request));
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue
|
||||
void readable_byte_stream_controller_enqueue_chunk_to_queue(ReadableByteStreamController& controller, JS::NonnullGCPtr<JS::ArrayBuffer> buffer, u32 byte_offset, u32 byte_length)
|
||||
{
|
||||
// 1. Append a new readable byte stream queue entry with buffer buffer, byte offset byteOffset, and byte length byteLength to controller.[[queue]].
|
||||
controller.queue().append(ReadableByteStreamQueueEntry {
|
||||
.buffer = buffer,
|
||||
.byte_offset = byte_offset,
|
||||
.byte_length = byte_length,
|
||||
});
|
||||
|
||||
// 2. Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
|
||||
controller.set_queue_total_size(controller.queue_total_size() + byte_length);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue
|
||||
WebIDL::ExceptionOr<void> readable_byte_stream_controller_enqueue_cloned_chunk_to_queue(ReadableByteStreamController& controller, JS::ArrayBuffer& buffer, u64 byte_offset, u64 byte_length)
|
||||
{
|
||||
auto& vm = controller.vm();
|
||||
|
||||
// 1. Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
|
||||
auto clone_result = JS::clone_array_buffer(vm, buffer, byte_offset, byte_length);
|
||||
|
||||
// 2. If cloneResult is an abrupt completion,
|
||||
if (clone_result.is_throw_completion()) {
|
||||
auto throw_completion = Bindings::throw_dom_exception_if_needed(vm, [&] { return clone_result; }).throw_completion();
|
||||
|
||||
// 1. Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
|
||||
readable_byte_stream_controller_error(controller, throw_completion.value().value());
|
||||
|
||||
// 2. Return cloneResult.
|
||||
// Note: We need to return the throw_completion object here, as enqueue needs to throw the same object that the controller is errored with
|
||||
return throw_completion;
|
||||
}
|
||||
|
||||
// 3. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, cloneResult.[[Value]], 0, byteLength).
|
||||
readable_byte_stream_controller_enqueue_chunk_to_queue(controller, *clone_result.release_value(), 0, byte_length);
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into
|
||||
PullIntoDescriptor readable_byte_stream_controller_shift_pending_pull_into(ReadableByteStreamController& controller)
|
||||
{
|
||||
// 1. Assert: controller.[[byobRequest]] is null.
|
||||
VERIFY(!controller.byob_request());
|
||||
|
||||
// 2. Let descriptor be controller.[[pendingPullIntos]][0].
|
||||
// 3. Remove descriptor from controller.[[pendingPullIntos]].
|
||||
auto descriptor = controller.pending_pull_intos().take_first();
|
||||
|
||||
// 4. Return descriptor.
|
||||
return descriptor;
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support
|
||||
WebIDL::ExceptionOr<void> set_up_readable_stream_controller_with_byte_reading_support(ReadableStream& stream, Optional<PullAlgorithm>&& pull_algorithm, Optional<CancelAlgorithm>&& cancel_algorithm, double high_water_mark)
|
||||
{
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
/*
|
||||
* Copyright (c) 2022, Linus Groh <linusg@serenityos.org>
|
||||
* Copyright (c) 2023, Matthew Olsson <mattco@serenityos.org>
|
||||
* Copyright (c) 2023, Shannon Booth <shannon.ml.booth@gmail.com>
|
||||
*
|
||||
* SPDX-License-Identifier: BSD-2-Clause
|
||||
*/
|
||||
|
@ -59,6 +60,15 @@ WebIDL::ExceptionOr<void> set_up_readable_stream_default_controller_from_underly
|
|||
WebIDL::ExceptionOr<void> set_up_readable_stream_controller_with_byte_reading_support(ReadableStream&, Optional<PullAlgorithm>&& = {}, Optional<CancelAlgorithm>&& = {}, double high_water_mark = 0);
|
||||
WebIDL::ExceptionOr<void> set_up_readable_byte_stream_controller(ReadableStream&, ReadableByteStreamController&, StartAlgorithm&&, PullAlgorithm&&, CancelAlgorithm&&, double high_water_mark, JS::Value auto_allocate_chunk_size);
|
||||
|
||||
WebIDL::ExceptionOr<void> readable_stream_enqueue(ReadableStreamController& controller, JS::Value chunk);
|
||||
WebIDL::ExceptionOr<void> readable_byte_stream_controller_enqueue(ReadableByteStreamController& controller, JS::Value chunk);
|
||||
WebIDL::ExceptionOr<JS::NonnullGCPtr<JS::ArrayBuffer>> transfer_array_buffer(JS::Realm& realm, JS::ArrayBuffer& buffer);
|
||||
WebIDL::ExceptionOr<void> readable_byte_stream_controller_enqueue_detached_pull_into_queue(ReadableByteStreamController& controller, PullIntoDescriptor& pull_into_descriptor);
|
||||
WebIDL::ExceptionOr<void> readable_byte_stream_controller_process_read_requests_using_queue(ReadableByteStreamController& controller);
|
||||
void readable_byte_stream_controller_enqueue_chunk_to_queue(ReadableByteStreamController& controller, JS::NonnullGCPtr<JS::ArrayBuffer> buffer, u32 byte_offset, u32 byte_length);
|
||||
WebIDL::ExceptionOr<void> readable_byte_stream_controller_enqueue_cloned_chunk_to_queue(ReadableByteStreamController& controller, JS::ArrayBuffer& buffer, u64 byte_offset, u64 byte_length);
|
||||
PullIntoDescriptor readable_byte_stream_controller_shift_pending_pull_into(ReadableByteStreamController& controller);
|
||||
|
||||
WebIDL::ExceptionOr<void> readable_byte_stream_controller_call_pull_if_needed(ReadableByteStreamController&);
|
||||
void readable_byte_stream_controller_clear_algorithms(ReadableByteStreamController&);
|
||||
void readable_byte_stream_controller_clear_pending_pull_intos(ReadableByteStreamController&);
|
||||
|
|
Loading…
Add table
Reference in a new issue