Skip to content

Commit 2ed1d35

Browse files
committed
Add support for a new ReadableStream "owning" type.
1 parent 2942e89 commit 2ed1d35

13 files changed

+132
-55
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,6 @@ jobs:
1919
submodules: true
2020
- uses: actions/setup-node@v1
2121
with:
22-
node-version: 14
22+
node-version: 19
2323
- run: npm install
2424
- run: npm test

index.bs

Lines changed: 71 additions & 30 deletions
Large diffs are not rendered by default.

reference-implementation/lib/ReadableStream-impl.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ exports.implementation = class ReadableStreamImpl {
2929
this, underlyingSource, underlyingSourceDict, highWaterMark
3030
);
3131
} else {
32-
assert(!('type' in underlyingSourceDict));
32+
assert(!('type' in underlyingSourceDict) || underlyingSourceDict.type === 'owning');
3333
const sizeAlgorithm = ExtractSizeAlgorithm(strategy);
3434
const highWaterMark = ExtractHighWaterMark(strategy, 1);
3535
aos.SetUpReadableStreamDefaultControllerFromUnderlyingSource(

reference-implementation/lib/ReadableStreamDefaultController-impl.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ exports.implementation = class ReadableStreamDefaultControllerImpl {
1717
aos.ReadableStreamDefaultControllerClose(this);
1818
}
1919

20-
enqueue(chunk) {
20+
enqueue(chunk, options) {
21+
const transferList = options ? options.transfer : undefined;
2122
if (aos.ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
2223
throw new TypeError('The stream is not in a state that permits enqueue');
2324
}
2425

25-
return aos.ReadableStreamDefaultControllerEnqueue(this, chunk);
26+
return aos.ReadableStreamDefaultControllerEnqueue(this, chunk, transferList);
2627
}
2728

2829
error(e) {
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
dictionary StructuredSerializeOptions {
2+
sequence<object> transfer = [];
3+
};
4+
15
[Exposed=(Window,Worker,Worklet)]
26
interface ReadableStreamDefaultController {
37
readonly attribute unrestricted double? desiredSize;
48

59
undefined close();
6-
undefined enqueue(optional any chunk);
10+
undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { });
711
undefined error(optional any e);
812
};

reference-implementation/lib/UnderlyingSource.webidl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ callback UnderlyingSourceStartCallback = any (ReadableStreamController controlle
1212
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
1313
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);
1414

15-
enum ReadableStreamType { "bytes" };
15+
enum ReadableStreamType { "bytes", "owning" };

reference-implementation/lib/abstract-ops/miscellaneous.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,7 @@ exports.CloneAsUint8Array = O => {
2020
const buffer = O.buffer.slice(O.byteOffset, O.byteOffset + O.byteLength);
2121
return new Uint8Array(buffer);
2222
};
23+
24+
exports.StructuredTransferOrClone = (value, transferList) => {
25+
return globalThis.structuredClone(value, { transfer: transferList });
26+
};

reference-implementation/lib/abstract-ops/queue-with-sizes.js

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict';
22
const assert = require('assert');
3-
const { IsNonNegativeNumber } = require('./miscellaneous.js');
3+
const { IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');
44

55
exports.DequeueValue = container => {
66
assert('_queue' in container && '_queueTotalSize' in container);
@@ -15,7 +15,7 @@ exports.DequeueValue = container => {
1515
return pair.value;
1616
};
1717

18-
exports.EnqueueValueWithSize = (container, value, size) => {
18+
exports.EnqueueValueWithSize = (container, value, size, transferList) => {
1919
assert('_queue' in container && '_queueTotalSize' in container);
2020

2121
if (!IsNonNegativeNumber(size)) {
@@ -24,7 +24,9 @@ exports.EnqueueValueWithSize = (container, value, size) => {
2424
if (size === Infinity) {
2525
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
2626
}
27-
27+
if (container._isOwning) {
28+
value = StructuredTransferOrClone(value, transferList);
29+
}
2830
container._queue.push({ value, size });
2931
container._queueTotalSize += size;
3032
};
@@ -40,6 +42,18 @@ exports.PeekQueueValue = container => {
4042
exports.ResetQueue = container => {
4143
assert('_queue' in container && '_queueTotalSize' in container);
4244

45+
if (container._isOwning) {
46+
while (container._queue.length > 0) {
47+
const value = exports.DequeueValue(container);
48+
if (typeof value.close === 'function') {
49+
try {
50+
value.close();
51+
} catch (closeException) {
52+
// Nothing to do.
53+
}
54+
}
55+
}
56+
}
4357
container._queue = [];
4458
container._queueTotalSize = 0;
4559
};

reference-implementation/lib/abstract-ops/readable-streams.js

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re
66
require('../helpers/webidl.js');
77
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
88
require('./ecmascript.js');
9-
const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js');
9+
const { CloneAsUint8Array, IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');
1010
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
1111
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
1212
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease,
@@ -89,7 +89,7 @@ function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, hi
8989

9090
const controller = ReadableStreamDefaultController.new(globalThis);
9191
SetUpReadableStreamDefaultController(
92-
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
92+
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, false
9393
);
9494

9595
return stream;
@@ -340,7 +340,7 @@ function ReadableStreamTee(stream, cloneForBranch2) {
340340
if (ReadableByteStreamController.isImpl(stream._controller)) {
341341
return ReadableByteStreamTee(stream);
342342
}
343-
return ReadableStreamDefaultTee(stream, cloneForBranch2);
343+
return ReadableStreamDefaultTee(stream, stream._controller._isOwning ? true : cloneForBranch2);
344344
}
345345

346346
function ReadableStreamDefaultTee(stream, cloneForBranch2) {
@@ -392,10 +392,10 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
392392
// }
393393

394394
if (canceled1 === false) {
395-
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1);
395+
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1, undefined);
396396
}
397397
if (canceled2 === false) {
398-
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2);
398+
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2, undefined);
399399
}
400400

401401
reading = false;
@@ -1074,14 +1074,22 @@ function ReadableStreamDefaultControllerClose(controller) {
10741074
}
10751075
}
10761076

1077-
function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
1077+
function ReadableStreamDefaultControllerEnqueue(controller, chunk, transferList) {
10781078
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === false) {
10791079
return;
10801080
}
10811081

10821082
const stream = controller._stream;
10831083

10841084
if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
1085+
if (controller._isOwning) {
1086+
try {
1087+
chunk = StructuredTransferOrClone(chunk, transferList);
1088+
} catch (chunkCloneError) {
1089+
ReadableStreamDefaultControllerError(controller, chunkCloneError);
1090+
throw chunkCloneError;
1091+
}
1092+
}
10851093
ReadableStreamFulfillReadRequest(stream, chunk, false);
10861094
} else {
10871095
let chunkSize;
@@ -1093,7 +1101,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
10931101
}
10941102

10951103
try {
1096-
EnqueueValueWithSize(controller, chunk, chunkSize);
1104+
EnqueueValueWithSize(controller, chunk, chunkSize, transferList);
10971105
} catch (enqueueE) {
10981106
ReadableStreamDefaultControllerError(controller, enqueueE);
10991107
throw enqueueE;
@@ -1148,7 +1156,7 @@ function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) {
11481156
}
11491157

11501158
function SetUpReadableStreamDefaultController(
1151-
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm) {
1159+
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, isOwning) {
11521160
assert(stream._controller === undefined);
11531161

11541162
controller._stream = stream;
@@ -1169,6 +1177,8 @@ function SetUpReadableStreamDefaultController(
11691177
controller._pullAlgorithm = pullAlgorithm;
11701178
controller._cancelAlgorithm = cancelAlgorithm;
11711179

1180+
controller._isOwning = isOwning;
1181+
11721182
stream._controller = controller;
11731183

11741184
const startResult = startAlgorithm();
@@ -1195,7 +1205,7 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
11951205
let startAlgorithm = () => undefined;
11961206
let pullAlgorithm = () => promiseResolvedWith(undefined);
11971207
let cancelAlgorithm = () => promiseResolvedWith(undefined);
1198-
1208+
const isOwning = underlyingSourceDict.type === 'owning';
11991209
if ('start' in underlyingSourceDict) {
12001210
startAlgorithm = () => underlyingSourceDict.start.call(underlyingSource, controller);
12011211
}
@@ -1207,8 +1217,8 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
12071217
}
12081218

12091219
SetUpReadableStreamDefaultController(
1210-
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
1211-
);
1220+
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm,
1221+
isOwning);
12121222
}
12131223

12141224
// Byte stream controllers

reference-implementation/lib/abstract-ops/transform-streams.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ function TransformStreamDefaultControllerEnqueue(controller, chunk) {
155155
// accept TransformStreamDefaultControllerEnqueue() calls.
156156

157157
try {
158-
ReadableStreamDefaultControllerEnqueue(readableController, chunk);
158+
ReadableStreamDefaultControllerEnqueue(readableController, chunk, undefined);
159159
} catch (e) {
160160
// This happens when readableStrategy.size() throws.
161161
TransformStreamErrorWritableAndUnblockWrite(stream, e);

0 commit comments

Comments
 (0)