From 4fc4980d176f8ef3dd567749d9dcdb44c7ed3c18 Mon Sep 17 00:00:00 2001 From: Arlen Beiler <439872+Arlen22@users.noreply.github.com> Date: Tue, 30 Sep 2025 21:40:20 +0000 Subject: [PATCH 1/4] update multi-part to support streaming --- .../src/lib/multipart.node.ts | 4 +- .../multipart-parser/src/lib/multipart.ts | 104 ++++++++++++------ 2 files changed, 70 insertions(+), 38 deletions(-) diff --git a/packages/multipart-parser/src/lib/multipart.node.ts b/packages/multipart-parser/src/lib/multipart.node.ts index a90b6c65396..0dba349322c 100644 --- a/packages/multipart-parser/src/lib/multipart.node.ts +++ b/packages/multipart-parser/src/lib/multipart.node.ts @@ -19,10 +19,10 @@ import { getMultipartBoundary } from './multipart-request.ts' * @param options Options for the parser * @return A generator yielding `MultipartPart` objects */ -export function* parseMultipart( +export async function* parseMultipart( message: Buffer | Iterable, options: ParseMultipartOptions, -): Generator { +): AsyncGenerator { yield* parseMultipartWeb(message as Uint8Array | Iterable, options) } diff --git a/packages/multipart-parser/src/lib/multipart.ts b/packages/multipart-parser/src/lib/multipart.ts index ec3b4b5ec5a..95a2dbe5c6c 100644 --- a/packages/multipart-parser/src/lib/multipart.ts +++ b/packages/multipart-parser/src/lib/multipart.ts @@ -54,6 +54,9 @@ export interface ParseMultipartOptions { * Default: 2 MiB */ maxFileSize?: number + + useContentPart?: boolean + onCreatePart?(part: MultipartPart): Promise | void } /** @@ -66,10 +69,10 @@ export interface ParseMultipartOptions { * @param options Options for the parser * @return A generator that yields `MultipartPart` objects */ -export function* parseMultipart( +export async function* parseMultipart( message: Uint8Array | Iterable, options: ParseMultipartOptions, -): Generator { +): AsyncGenerator { let parser = new MultipartParser(options.boundary, { maxHeaderSize: options.maxHeaderSize, maxFileSize: options.maxFileSize, @@ -152,6 +155,9 @@ export class MultipartParser { #currentPart: MultipartPart | null = null #contentLength = 0 + #useContentPart: MultipartParserOptions['useContentPart'] + #onCreatePart: MultipartParserOptions['onCreatePart'] + constructor(boundary: string, options?: MultipartParserOptions) { this.boundary = boundary this.maxHeaderSize = options?.maxHeaderSize ?? 8 * oneKb @@ -162,6 +168,9 @@ export class MultipartParser { this.#findBoundary = createSearch(`\r\n--${boundary}`) this.#findPartialTailBoundary = createPartialTailSearch(`\r\n--${boundary}`) this.#boundaryLength = 4 + boundary.length // length of '\r\n--' + boundary + + this.#onCreatePart = options?.onCreatePart + this.#useContentPart = options?.useContentPart ?? true } /** @@ -170,7 +179,7 @@ export class MultipartParser { * @param chunk A chunk of data to write to the parser * @return A generator yielding `MultipartPart` objects as they are parsed */ - *write(chunk: Uint8Array): Generator { + async *write(chunk: Uint8Array): AsyncGenerator { if (this.#state === MultipartParserStateDone) { throw new MultipartParseError('Unexpected data after end of stream') } @@ -201,16 +210,16 @@ export class MultipartParser { let partialTailIndex = this.#findPartialTailBoundary(chunk) if (partialTailIndex === -1) { - this.#append(index === 0 ? chunk : chunk.subarray(index)) + await this.#append(index === 0 ? chunk : chunk.subarray(index)) } else { - this.#append(chunk.subarray(index, partialTailIndex)) + await this.#append(chunk.subarray(index, partialTailIndex)) this.#buffer = chunk.subarray(partialTailIndex) } break } - this.#append(chunk.subarray(index, boundaryIndex)) + await this.#append(chunk.subarray(index, boundaryIndex)) yield this.#currentPart! @@ -256,13 +265,19 @@ export class MultipartParser { throw new MaxHeaderSizeExceededError(this.maxHeaderSize) } - this.#currentPart = new MultipartPart(chunk.subarray(index, headerEndIndex), []) + const header = chunk.subarray(index, headerEndIndex); + this.#currentPart = this.#useContentPart + ? new MultipartContentPart(header, []) + : new MultipartPart(header) + this.#contentLength = 0 index = headerEndIndex + 4 // Skip header + \r\n\r\n this.#state = MultipartParserStateBody + await this.#onCreatePart?.(this.#currentPart) + continue } @@ -283,12 +298,12 @@ export class MultipartParser { } } - #append(chunk: Uint8Array): void { + async #append(chunk: Uint8Array): Promise { if (this.#contentLength + chunk.length > this.maxFileSize) { throw new MaxFileSizeExceededError(this.maxFileSize) } - this.#currentPart!.content.push(chunk) + await this.#currentPart!.append(chunk) this.#contentLength += chunk.length } @@ -313,40 +328,16 @@ const decoder = new TextDecoder('utf-8', { fatal: true }) * A part of a `multipart/*` HTTP message. */ export class MultipartPart { - /** - * The raw content of this part as an array of `Uint8Array` chunks. - */ - readonly content: Uint8Array[] #header: Uint8Array #headers?: Headers - constructor(header: Uint8Array, content: Uint8Array[]) { + constructor(header: Uint8Array) { this.#header = header - this.content = content } - /** - * The content of this part as an `ArrayBuffer`. - */ - get arrayBuffer(): ArrayBuffer { - return this.bytes.buffer as ArrayBuffer - } - - /** - * The content of this part as a single `Uint8Array`. In `multipart/form-data` messages, this is useful - * for reading the value of files that were uploaded using `` fields. - */ - get bytes(): Uint8Array { - let buffer = new Uint8Array(this.size) - - let offset = 0 - for (let chunk of this.content) { - buffer.set(chunk, offset) - offset += chunk.length - } - - return buffer + async append(chunk: Uint8Array) { + throw new Error("Not implemented. Please assign or override this method."); } /** @@ -395,6 +386,47 @@ export class MultipartPart { return this.headers.contentDisposition.name } +} + +export class MultipartContentPart extends MultipartPart { + + /** + * The raw content of this part as an array of `Uint8Array` chunks. + */ + readonly content: Uint8Array[] + + async append(chunk: Uint8Array): Promise { + this.content.push(chunk) + } + + constructor(header: Uint8Array, content: Uint8Array[]) { + super(header); + this.content = content + } + + /** + * The content of this part as an `ArrayBuffer`. + */ + get arrayBuffer(): ArrayBuffer { + return this.bytes.buffer as ArrayBuffer + } + + /** + * The content of this part as a single `Uint8Array`. In `multipart/form-data` messages, this is useful + * for reading the value of files that were uploaded using `` fields. + */ + get bytes(): Uint8Array { + let buffer = new Uint8Array(this.size) + + let offset = 0 + for (let chunk of this.content) { + buffer.set(chunk, offset) + offset += chunk.length + } + + return buffer + } + /** * The size of the content in bytes. */ From 382a25747535341f1b3e9a61925da5ac5caf7e5c Mon Sep 17 00:00:00 2001 From: Arlen Beiler <439872+Arlen22@users.noreply.github.com> Date: Mon, 6 Oct 2025 14:51:48 +0000 Subject: [PATCH 2/4] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/multipart-parser/src/lib/multipart.ts | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/packages/multipart-parser/src/lib/multipart.ts b/packages/multipart-parser/src/lib/multipart.ts index 95a2dbe5c6c..d16f960641b 100644 --- a/packages/multipart-parser/src/lib/multipart.ts +++ b/packages/multipart-parser/src/lib/multipart.ts @@ -55,7 +55,21 @@ export interface ParseMultipartOptions { */ maxFileSize?: number + /** + * If this is true, or not defined, use MultipartContentPart class, which includes a contents array and getters referencing it, and stores the entire file in the contents array in memory. + * + * If this is false, use the MultipartPart class, which only has header related fields. The append method must be overriden in the onCreatePart callback to receive each chunk and process it as desired. + * + */ useContentPart?: boolean + /** + * A callback called for each multipart part created. This is called immediately after the header is parsed, and before any body chunks are processed, including the partial chunk after the header. + * + * If you want to immediately write chunks to the file system, set useContentPart to false, and then set the part.append method of each part this callback is called with. part.append will be called with each chunk, including partial chunks, after the returned promise resolves, and before the iterator yields the completed chunk. + * + * This callback and part.append are both awaited. + * + */ onCreatePart?(part: MultipartPart): Promise | void } @@ -265,7 +279,7 @@ export class MultipartParser { throw new MaxHeaderSizeExceededError(this.maxHeaderSize) } - const header = chunk.subarray(index, headerEndIndex); + const header = chunk.subarray(index, headerEndIndex) this.#currentPart = this.#useContentPart ? new MultipartContentPart(header, []) : new MultipartPart(header) @@ -336,7 +350,7 @@ export class MultipartPart { this.#header = header } - async append(chunk: Uint8Array) { + async append(chunk: Uint8Array): Promise { throw new Error("Not implemented. Please assign or override this method."); } From c4fb6b39e61c631119c63c078f4d2abc0b373051 Mon Sep 17 00:00:00 2001 From: Arlen Beiler <439872+Arlen22@users.noreply.github.com> Date: Mon, 6 Oct 2025 16:07:32 +0000 Subject: [PATCH 3/4] use multiple callbacks instead of setting instance method --- .../src/lib/multipart-request.ts | 8 +- .../src/lib/multipart.node.ts | 20 +-- .../multipart-parser/src/lib/multipart.ts | 118 ++++++++++++------ 3 files changed, 97 insertions(+), 49 deletions(-) diff --git a/packages/multipart-parser/src/lib/multipart-request.ts b/packages/multipart-parser/src/lib/multipart-request.ts index d69ec38c754..7ba8bea4472 100644 --- a/packages/multipart-parser/src/lib/multipart-request.ts +++ b/packages/multipart-parser/src/lib/multipart-request.ts @@ -1,4 +1,4 @@ -import type { MultipartParserOptions, MultipartPart } from './multipart.ts' +import type { MultipartParserOptions, MultipartPart, MultipartPartType } from './multipart.ts' import { MultipartParseError, parseMultipartStream } from './multipart.ts' /** @@ -31,10 +31,10 @@ export function isMultipartRequest(request: Request): boolean { * @param options Optional parser options, such as `maxHeaderSize` and `maxFileSize` * @return An async generator yielding `MultipartPart` objects */ -export async function* parseMultipartRequest( +export async function* parseMultipartRequest( request: Request, - options?: MultipartParserOptions, -): AsyncGenerator { + options?: MultipartParserOptions, +): AsyncGenerator, void, unknown> { if (!isMultipartRequest(request)) { throw new MultipartParseError('Request is not a multipart request') } diff --git a/packages/multipart-parser/src/lib/multipart.node.ts b/packages/multipart-parser/src/lib/multipart.node.ts index 0dba349322c..a62ebd28ac2 100644 --- a/packages/multipart-parser/src/lib/multipart.node.ts +++ b/packages/multipart-parser/src/lib/multipart.node.ts @@ -1,7 +1,7 @@ import type * as http from 'node:http' import { Readable } from 'node:stream' -import type { ParseMultipartOptions, MultipartParserOptions, MultipartPart } from './multipart.ts' +import type { ParseMultipartOptions, MultipartParserOptions, MultipartPart, MultipartPartType } from './multipart.ts' import { MultipartParseError, parseMultipart as parseMultipartWeb, @@ -19,10 +19,10 @@ import { getMultipartBoundary } from './multipart-request.ts' * @param options Options for the parser * @return A generator yielding `MultipartPart` objects */ -export async function* parseMultipart( +export async function* parseMultipart( message: Buffer | Iterable, - options: ParseMultipartOptions, -): AsyncGenerator { + options: ParseMultipartOptions, +): AsyncGenerator, void, unknown> { yield* parseMultipartWeb(message as Uint8Array | Iterable, options) } @@ -36,10 +36,10 @@ export async function* parseMultipart( * @param options Options for the parser * @return An async generator yielding `MultipartPart` objects */ -export async function* parseMultipartStream( +export async function* parseMultipartStream( stream: Readable, - options: ParseMultipartOptions, -): AsyncGenerator { + options: ParseMultipartOptions, +): AsyncGenerator, void, unknown> { yield* parseMultipartStreamWeb(Readable.toWeb(stream) as ReadableStream, options) } @@ -61,10 +61,10 @@ export function isMultipartRequest(req: http.IncomingMessage): boolean { * @param options Options for the parser * @return An async generator yielding `MultipartPart` objects */ -export async function* parseMultipartRequest( +export async function* parseMultipartRequest( req: http.IncomingMessage, - options?: MultipartParserOptions, -): AsyncGenerator { + options?: MultipartParserOptions, +): AsyncGenerator, void, unknown> { if (!isMultipartRequest(req)) { throw new MultipartParseError('Request is not a multipart request') } diff --git a/packages/multipart-parser/src/lib/multipart.ts b/packages/multipart-parser/src/lib/multipart.ts index d16f960641b..a07302b68d7 100644 --- a/packages/multipart-parser/src/lib/multipart.ts +++ b/packages/multipart-parser/src/lib/multipart.ts @@ -34,7 +34,9 @@ export class MaxFileSizeExceededError extends MultipartParseError { } } -export interface ParseMultipartOptions { +export type MultipartPartType = S extends true ? MultipartPart : MultipartContentPart + +export interface ParseMultipartOptions { /** * The boundary string used to separate parts in the multipart message, * e.g. the `boundary` parameter in the `Content-Type` header. @@ -56,21 +58,46 @@ export interface ParseMultipartOptions { maxFileSize?: number /** - * If this is true, or not defined, use MultipartContentPart class, which includes a contents array and getters referencing it, and stores the entire file in the contents array in memory. + * If `true`, the parser will only store the size of each part's content, + * and will not store the actual content in memory. You must set `onEmitBytes` + * to get the content of each part as it is received. + * + * This is useful for handling large file uploads without consuming + * large amounts of memory. * - * If this is false, use the MultipartPart class, which only has header related fields. The append method must be overriden in the onCreatePart callback to receive each chunk and process it as desired. + * If this is set to `true`, the `content`, `bytes`, and `text` properties of each part + * will be undefined, and only the `size` property will be available. * + * Default: `false` */ - useContentPart?: boolean + onlyStreamContents?: S + /** - * A callback called for each multipart part created. This is called immediately after the header is parsed, and before any body chunks are processed, including the partial chunk after the header. + * A callback that is called each time a new part is created. This can be used to + * perform any setup or initialization for the part before any data is received. * - * If you want to immediately write chunks to the file system, set useContentPart to false, and then set the part.append method of each part this callback is called with. part.append will be called with each chunk, including partial chunks, after the returned promise resolves, and before the iterator yields the completed chunk. - * - * This callback and part.append are both awaited. + * The part will contain the full header information, but the content will be empty. + * + * The callback will be awaited so you can return a promise to create backpressure. + * + * @param part The multipart part that was just created + * @returns Promise to be awaited before continuing parsing + */ + onCreatePart?(part: MultipartPartType): Promise | void + + /** + * A callback that is called each time a chunk of bytes is received and parsed. + * This can be used to process the data as it is received, to stream + * it to disk or to a cloud storage service. * + * The callback will be awaited so you can return a promise to create backpressure. + * + * @param part The multipart part being emitted + * @param chunk The chunk of data being emitted + * @returns Promise to be awaited before continuing parsing */ - onCreatePart?(part: MultipartPart): Promise | void + onEmitBytes?(part: MultipartPartType, chunk: Uint8Array): Promise | void + } /** @@ -83,11 +110,11 @@ export interface ParseMultipartOptions { * @param options Options for the parser * @return A generator that yields `MultipartPart` objects */ -export async function* parseMultipart( +export async function* parseMultipart( message: Uint8Array | Iterable, - options: ParseMultipartOptions, -): AsyncGenerator { - let parser = new MultipartParser(options.boundary, { + options: ParseMultipartOptions, +): AsyncGenerator, void, unknown> { + let parser = new MultipartParser(options.boundary, { maxHeaderSize: options.maxHeaderSize, maxFileSize: options.maxFileSize, }) @@ -117,11 +144,11 @@ export async function* parseMultipart( * @param options Options for the parser * @return An async generator that yields `MultipartPart` objects */ -export async function* parseMultipartStream( +export async function* parseMultipartStream( stream: ReadableStream, - options: ParseMultipartOptions, -): AsyncGenerator { - let parser = new MultipartParser(options.boundary, { + options: ParseMultipartOptions, +): AsyncGenerator, void, unknown> { + let parser = new MultipartParser(options.boundary, { maxHeaderSize: options.maxHeaderSize, maxFileSize: options.maxFileSize, }) @@ -137,7 +164,7 @@ export async function* parseMultipartStream( parser.finish() } -export type MultipartParserOptions = Omit +export type MultipartParserOptions = Omit, 'boundary'> const MultipartParserStateStart = 0 const MultipartParserStateAfterBoundary = 1 @@ -153,7 +180,7 @@ const oneMb = 1024 * oneKb /** * A streaming parser for `multipart/*` HTTP messages. */ -export class MultipartParser { +export class MultipartParser { readonly boundary: string readonly maxHeaderSize: number readonly maxFileSize: number @@ -166,13 +193,14 @@ export class MultipartParser { #state = MultipartParserStateStart #buffer: Uint8Array | null = null - #currentPart: MultipartPart | null = null + #currentPart: MultipartContentPart | MultipartPart | null = null #contentLength = 0 - #useContentPart: MultipartParserOptions['useContentPart'] - #onCreatePart: MultipartParserOptions['onCreatePart'] + #onlyStreamContents: boolean + #onCreatePart?: (part: MultipartPart) => Promise | void + #onEmitBytes?: (part: MultipartPart, chunk: Uint8Array) => Promise | void - constructor(boundary: string, options?: MultipartParserOptions) { + constructor(boundary: string, options?: MultipartParserOptions) { this.boundary = boundary this.maxHeaderSize = options?.maxHeaderSize ?? 8 * oneKb this.maxFileSize = options?.maxFileSize ?? 2 * oneMb @@ -184,7 +212,8 @@ export class MultipartParser { this.#boundaryLength = 4 + boundary.length // length of '\r\n--' + boundary this.#onCreatePart = options?.onCreatePart - this.#useContentPart = options?.useContentPart ?? true + this.#onEmitBytes = options?.onEmitBytes + this.#onlyStreamContents = options?.onlyStreamContents ?? false } /** @@ -193,7 +222,8 @@ export class MultipartParser { * @param chunk A chunk of data to write to the parser * @return A generator yielding `MultipartPart` objects as they are parsed */ - async *write(chunk: Uint8Array): AsyncGenerator { + write(chunk: Uint8Array): AsyncGenerator, void, unknown>; + async *write(chunk: Uint8Array): AsyncGenerator { if (this.#state === MultipartParserStateDone) { throw new MultipartParseError('Unexpected data after end of stream') } @@ -280,9 +310,9 @@ export class MultipartParser { } const header = chunk.subarray(index, headerEndIndex) - this.#currentPart = this.#useContentPart - ? new MultipartContentPart(header, []) - : new MultipartPart(header) + this.#currentPart = this.#onlyStreamContents + ? new MultipartPart(header) + : new MultipartContentPart(header, []) this.#contentLength = 0 @@ -317,7 +347,14 @@ export class MultipartParser { throw new MaxFileSizeExceededError(this.maxFileSize) } - await this.#currentPart!.append(chunk) + if (this.#currentPart!.hasContents()) { + this.#currentPart!.content.push(chunk) + } else { + this.#currentPart!.size += chunk.length + } + + await this.#onEmitBytes?.(this.#currentPart!, chunk) + this.#contentLength += chunk.length } @@ -345,13 +382,20 @@ export class MultipartPart { #header: Uint8Array #headers?: Headers + #size: number = 0 constructor(header: Uint8Array) { this.#header = header } - async append(chunk: Uint8Array): Promise { - throw new Error("Not implemented. Please assign or override this method."); + /** + * The size of the content emitted so far in bytes. + */ + get size(): number { + return this.#size + } + set size(value: number) { + this.#size = value } /** @@ -400,6 +444,10 @@ export class MultipartPart { return this.headers.contentDisposition.name } + hasContents(): this is MultipartContentPart { + return false; + } + } export class MultipartContentPart extends MultipartPart { @@ -409,10 +457,6 @@ export class MultipartContentPart extends MultipartPart { */ readonly content: Uint8Array[] - async append(chunk: Uint8Array): Promise { - this.content.push(chunk) - } - constructor(header: Uint8Array, content: Uint8Array[]) { super(header); this.content = content @@ -463,4 +507,8 @@ export class MultipartContentPart extends MultipartPart { get text(): string { return decoder.decode(this.bytes) } + + hasContents(): this is MultipartContentPart { + return true; + } } From 5f409a7365f0da88fdbc66f7e2b860614b09fd7c Mon Sep 17 00:00:00 2001 From: Arlen Beiler <439872+Arlen22@users.noreply.github.com> Date: Mon, 6 Oct 2025 17:22:51 +0000 Subject: [PATCH 4/4] semicolon fixes --- packages/multipart-parser/src/lib/multipart.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/multipart-parser/src/lib/multipart.ts b/packages/multipart-parser/src/lib/multipart.ts index a07302b68d7..65906b0d6a0 100644 --- a/packages/multipart-parser/src/lib/multipart.ts +++ b/packages/multipart-parser/src/lib/multipart.ts @@ -445,7 +445,7 @@ export class MultipartPart { } hasContents(): this is MultipartContentPart { - return false; + return false } } @@ -509,6 +509,6 @@ export class MultipartContentPart extends MultipartPart { } hasContents(): this is MultipartContentPart { - return true; + return true } }