Implementing specifications using WHATWG Streams API
The Streams API is a modern way to produce and consume data progressively and asynchronously. Multiple specifications are starting to use it, including Fetch, File Stream, and WebTransport. This document briefly explains how to implement such specifications in Gecko.
Calling functions on stream objects
You can mostly follow the steps in a given spec as-is, because the Gecko implementation is deliberately written so that each piece of spec prose maps 1:1 to a function call. Let’s say the spec says:
Enqueue view into stream.
The prose can be written in C++ as:
stream->EnqueueNative(cx, view, rv);
Note that the function name ends with Native to disambiguate it from the Web IDL enqueue method. See the list below for the complete mapping between spec terms and functions.
Creating a stream
A stream can generally be created by calling CreateNative(). You may need to call something else if the spec:
- Wants a byte stream and uses the term “Set up with byte reading support”. In that case you need to call the ByteNative variant.
- Defines a new interface that inherits the base stream interfaces. In this case you need to define a subclass and call SetUpNative() inside its init method. To make the cycle collection happy, you need to pass HoldDropJSObjectsCaller::Explicit to the superclass constructor and call mozilla::HoldJSObjects(this) in the constructor and mozilla::DropJSObjects(this) in the destructor.
Both the CreateNative() and SetUpNative() functions require an argument that implements the custom algorithms for callbacks, whose corresponding spec phrases could be:
1. Let readable be a new ReadableStream.
2. Let pullAlgorithm be the following steps:
   1. (…)
3. Set up stream with pullAlgorithm set to pullAlgorithm.
This can roughly translate to the following C++:
class MySourceAlgorithms : public UnderlyingSourceAlgorithmsWrapper {
 public:
  already_AddRefed<Promise> PullCallbackImpl(
      JSContext* aCx, ReadableStreamController& aController,
      ErrorResult& aRv) override;
};
already_AddRefed<ReadableStream> CreateMyReadableStream(
    JSContext* aCx, nsIGlobalObject* aGlobal, ErrorResult& aRv) {
  // Step 2: Let pullAlgorithm be the following steps:
  auto algorithms = MakeRefPtr<MySourceAlgorithms>();
  // Step 1: Let readable be a new ReadableStream.
  // Step 3: Set up stream with pullAlgorithm set to pullAlgorithm.
  RefPtr<ReadableStream> readable = ReadableStream::CreateNative(
      aCx,
      aGlobal,
      *algorithms,
      /* aHighWaterMark */ Nothing(),
      /* aSizeAlgorithm */ nullptr,
      aRv
  );
  // Hand the new stream (or nullptr on failure) back to the caller.
  return readable.forget();
}
Note that the new ReadableStream() and “Set up” steps are done together inside CreateNative() for convenience. For subclasses this needs to be split again:
class MyReadableStream : public ReadableStream {
 public:
  explicit MyReadableStream(nsIGlobalObject* aGlobal)
      : ReadableStream(aGlobal,
                       ReadableStream::HoldDropJSObjectsCaller::Explicit) {
    mozilla::HoldJSObjects(this);
  }
  ~MyReadableStream() {
    mozilla::DropJSObjects(this);
  }
  // The JSContext is taken as a parameter because the set up call needs it.
  void Init(JSContext* aCx, ErrorResult& aRv) {
    // Step 2: Let pullAlgorithm be the following steps:
    auto algorithms = MakeRefPtr<MySourceAlgorithms>();
    // Step 3: Set up stream with pullAlgorithm set to pullAlgorithm.
    //
    // NOTE:
    // For now there's no SetUpNative but only SetUpByteNative.
    // File a bug on DOM: Streams if you need to create a subclass
    // for non-byte ReadableStream.
    SetUpNative(aCx, *algorithms, Nothing(), nullptr, aRv);
  }
};
After creating the stream with the algorithms, the rough flow will look like this:
sequenceDiagram
  JavaScript->>ReadableStream: await reader.read()
  ReadableStream->>UnderlyingSourceAlgorithmsWrapper: PullCallback()
  UnderlyingSourceAlgorithmsWrapper->>(Data source): (implementation detail)
  NOTE left of (Data source): (Can be file IO, network IO, etc.)
  (Data source)->>UnderlyingSourceAlgorithmsWrapper: (notifies back)
  UnderlyingSourceAlgorithmsWrapper->>ReadableStream: EnqueueNative()
  ReadableStream->>JavaScript: Resolves reader.read()
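To make the flow above easier to follow, here is a small standalone C++ model of it. This is a toy sketch and not Gecko code: ToyReadable, Read(), and Enqueue() are hypothetical names that only stand in for ReadableStream, reader.read(), and EnqueueNative(), and the model ignores promises, errors, and backpressure.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical toy stand-in for a readable stream (not a Gecko class).
class ToyReadable {
 public:
  using Chunk = std::string;
  using ReadCallback = std::function<void(const Chunk&)>;
  using PullCallback = std::function<void(ToyReadable&)>;

  explicit ToyReadable(PullCallback aPull) : mPull(std::move(aPull)) {}

  // Models `await reader.read()`: serve a queued chunk if there is one,
  // otherwise park the read request and ask the source for more data.
  void Read(ReadCallback aOnChunk) {
    if (!mQueue.empty()) {
      Chunk chunk = std::move(mQueue.front());
      mQueue.pop_front();
      aOnChunk(chunk);
      return;
    }
    mPendingReads.push_back(std::move(aOnChunk));
    mPull(*this);
  }

  // Models EnqueueNative(): hand the chunk to the oldest waiting read,
  // or keep it in the queue when nobody is waiting.
  void Enqueue(Chunk aChunk) {
    if (!mPendingReads.empty()) {
      ReadCallback onChunk = std::move(mPendingReads.front());
      mPendingReads.pop_front();
      onChunk(aChunk);
      return;
    }
    mQueue.push_back(std::move(aChunk));
  }

 private:
  PullCallback mPull;
  std::deque<Chunk> mQueue;
  std::deque<ReadCallback> mPendingReads;
};
```

In this model the pull callback would typically start some I/O and return, and the data source later calls Enqueue() when it "notifies back", which is the point where the pending read completes.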
Implementing the callbacks
As the flow shows, the real work is done inside the algorithms, in this case PullCallbackImpl(). Let’s say the spec says:
1. Let pullAlgorithm be the following steps:
   1. Enqueue a JavaScript string value “Hello Fox!”.
This can translate to the following C++:
class MySourceAlgorithms : public UnderlyingSourceAlgorithmsWrapper {
 public:
  // Step 1: Let `pullAlgorithm` be the following steps:
  already_AddRefed<Promise> PullCallbackImpl(
      JSContext* aCx, ReadableStreamController& aController,
      ErrorResult& aRv) override {
    // NOTE:
    // Please don't use aController directly, as it's more for JavaScript.
    // The *Native() functions are safer with additional assertions and more
    // automatic state management.
    // Please file a bug if there's no *Native() function that fulfills your need.
    // In the future this function should receive a ReadableStream instead.
    // Also note that you'll need to touch JS APIs frequently as the functions
    // often expect JS::Value.
    RefPtr<ReadableStream> stream = aController.Stream();
    // Step 1.1: Enqueue a JavaScript string value "Hello Fox!".
    JS::Rooted<JSString*> hello(aCx, JS_NewStringCopyZ(aCx, "Hello Fox!"));
    if (!hello) {
      // String creation failed; report the pending JS exception.
      aRv.NoteJSContextException(aCx);
      return nullptr;
    }
    // Root the value so that it can be passed as a handle.
    JS::Rooted<JS::Value> chunk(aCx, JS::StringValue(hello));
    stream->EnqueueNative(aCx, chunk, aRv);
    // Return a promise if the task is asynchronous, or nullptr if not.
    return nullptr;
  }
};
Note that PullCallbackImpl() returns a promise. The function will not be called again until the promise resolves. With repeated read requests, the call sequence would roughly look like the following:
1. The first await read() from JS
2. The first PullCallbackImpl() call, which returns a Promise
3. The second await read() from JS
4. (Time flies)
5. The promise resolves
6. The second PullCallbackImpl() call
The same applies to write and transform callbacks in WritableStream and TransformStream, except they use UnderlyingSinkAlgorithmsWrapper and TransformerAlgorithmsWrapper respectively.
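The "not called again until the promise resolves" rule can be illustrated with a standalone C++ toy model. This is a sketch under stated assumptions and not Gecko code: ToyStream, RequestRead(), and ResolvePendingPull() are hypothetical names, and the two flags loosely mirror the "pulling" and "pull again" bookkeeping that the Streams specification keeps on the controller.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical toy model of pull serialization (not a Gecko class).
class ToyStream {
 public:
  // Returns true when the pull is asynchronous, i.e. it "returned a promise"
  // that is still pending, and false when it finished synchronously.
  using PullAlgorithm = std::function<bool()>;

  explicit ToyStream(PullAlgorithm aPull) : mPull(std::move(aPull)) {}

  // Models `await read()` arriving from JavaScript.
  void RequestRead() {
    if (mPullPending) {
      // A pull is still in flight: remember the request instead of pulling.
      mPullAgain = true;
      return;
    }
    CallPull();
  }

  // Models the promise returned by the previous pull resolving.
  void ResolvePendingPull() {
    mPullPending = false;
    if (mPullAgain) {
      mPullAgain = false;
      CallPull();
    }
  }

  int PullCount() const { return mPullCount; }

 private:
  void CallPull() {
    ++mPullCount;
    mPullPending = mPull();
  }

  PullAlgorithm mPull;
  bool mPullPending = false;
  bool mPullAgain = false;
  int mPullCount = 0;
};
```

With a pull algorithm that always reports a pending promise, two RequestRead() calls in a row trigger only one pull, and the second pull happens when ResolvePendingPull() is called, matching the six steps listed above.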
Exposing existing XPCOM streams as WHATWG Streams
You may simply want to expose an existing XPCOM stream to JavaScript without any more customization. Fortunately there are some helper functions for this. You can use:
- InputToReadableStreamAlgorithms to send data from nsIAsyncInputStream to ReadableStream
- WritableStreamToOutputAlgorithms to receive data from WritableStream to nsIAsyncOutputStream
The usage would look like the following:
// For nsIAsyncInputStream:
already_AddRefed<ReadableStream> ConvertInputStreamToReadableStream(
    JSContext* aCx, nsIGlobalObject* aGlobal, nsIAsyncInputStream* aInput,
    ErrorResult& aRv) {
  auto algorithms =
      MakeRefPtr<InputToReadableStreamAlgorithms>(aGlobal, aInput);
  return ReadableStream::CreateNative(aCx, aGlobal, *algorithms,
                                      Nothing(), nullptr, aRv);
}
// For nsIAsyncOutputStream:
already_AddRefed<WritableStream> ConvertOutputStreamToWritableStream(
    JSContext* aCx, nsIGlobalObject* aGlobal, nsIAsyncOutputStream* aOutput,
    ErrorResult& aRv) {
  auto algorithms =
      MakeRefPtr<WritableStreamToOutputAlgorithms>(aGlobal, aOutput);
  return WritableStream::CreateNative(aCx, aGlobal, *algorithms,
                                      Nothing(), nullptr, aRv);
}
Mapping spec terms to functions
- ReadableStream:
  - Set up with byte reading support:
    - CreateByteNative(): You can call this when the spec uses the term with new ReadableStream.
    - SetUpByteNative(): You need to use this instead when the spec uses the term with a subclass of ReadableStream. Call this inside the constructor of the subclass.
  - Read a chunk on reader: ReadChunk() on ReadableStreamDefaultReader
- WritableStream:
  - Set up:
    - CreateNative(): You can call this when the spec uses the term with new WritableStream.
    - SetUpNative(): You need to use this instead when the spec uses the term with a subclass of WritableStream. Call this inside the constructor of the subclass.
- TransformStream: For now this just uses the functions in TransformStreamDefaultController, which will be provided as an argument of transform or flush algorithms.
The mapping is implemented only on demand and does not cover every function in the spec. Please file a bug in the DOM: Streams component in Bugzilla if you need something that is missing here.