Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/multipart-parser/src/lib/multipart-request.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { MultipartParserOptions, MultipartPart } from './multipart.ts'
import type { MultipartParserOptions, MultipartPart, MultipartPartType } from './multipart.ts'
import { MultipartParseError, parseMultipartStream } from './multipart.ts'

/**
Expand Down Expand Up @@ -31,10 +31,10 @@ export function isMultipartRequest(request: Request): boolean {
* @param options Optional parser options, such as `maxHeaderSize` and `maxFileSize`
* @return An async generator yielding `MultipartPart` objects
*/
export async function* parseMultipartRequest(
export async function* parseMultipartRequest<S extends boolean | undefined>(
request: Request,
options?: MultipartParserOptions,
): AsyncGenerator<MultipartPart, void, unknown> {
options?: MultipartParserOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
if (!isMultipartRequest(request)) {
throw new MultipartParseError('Request is not a multipart request')
}
Expand Down
20 changes: 10 additions & 10 deletions packages/multipart-parser/src/lib/multipart.node.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type * as http from 'node:http'
import { Readable } from 'node:stream'

import type { ParseMultipartOptions, MultipartParserOptions, MultipartPart } from './multipart.ts'
import type { ParseMultipartOptions, MultipartParserOptions, MultipartPart, MultipartPartType } from './multipart.ts'
import {
MultipartParseError,
parseMultipart as parseMultipartWeb,
Expand All @@ -19,10 +19,10 @@ import { getMultipartBoundary } from './multipart-request.ts'
* @param options Options for the parser
* @return A generator yielding `MultipartPart` objects
*/
export function* parseMultipart(
export async function* parseMultipart<S extends boolean | undefined>(
message: Buffer | Iterable<Buffer>,
options: ParseMultipartOptions,
): Generator<MultipartPart, void, unknown> {
options: ParseMultipartOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
yield* parseMultipartWeb(message as Uint8Array | Iterable<Uint8Array>, options)
}

Expand All @@ -36,10 +36,10 @@ export function* parseMultipart(
* @param options Options for the parser
* @return An async generator yielding `MultipartPart` objects
*/
export async function* parseMultipartStream(
export async function* parseMultipartStream<S extends boolean | undefined>(
stream: Readable,
options: ParseMultipartOptions,
): AsyncGenerator<MultipartPart, void, unknown> {
options: ParseMultipartOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
yield* parseMultipartStreamWeb(Readable.toWeb(stream) as ReadableStream, options)
}

Expand All @@ -61,10 +61,10 @@ export function isMultipartRequest(req: http.IncomingMessage): boolean {
* @param options Options for the parser
* @return An async generator yielding `MultipartPart` objects
*/
export async function* parseMultipartRequest(
export async function* parseMultipartRequest<S extends boolean | undefined>(
req: http.IncomingMessage,
options?: MultipartParserOptions,
): AsyncGenerator<MultipartPart, void, unknown> {
options?: MultipartParserOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
if (!isMultipartRequest(req)) {
throw new MultipartParseError('Request is not a multipart request')
}
Expand Down
182 changes: 138 additions & 44 deletions packages/multipart-parser/src/lib/multipart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ export class MaxFileSizeExceededError extends MultipartParseError {
}
}

export interface ParseMultipartOptions {
export type MultipartPartType<S extends boolean | undefined> = S extends true ? MultipartPart : MultipartContentPart

export interface ParseMultipartOptions<S extends boolean | undefined> {
/**
* The boundary string used to separate parts in the multipart message,
* e.g. the `boundary` parameter in the `Content-Type` header.
Expand All @@ -54,6 +56,48 @@ export interface ParseMultipartOptions {
* Default: 2 MiB
*/
maxFileSize?: number

/**
* If `true`, the parser will only store the size of each part's content,
* and will not store the actual content in memory. You must set `onEmitBytes`
* to get the content of each part as it is received.
*
* This is useful for handling large file uploads without consuming
* large amounts of memory.
*
* If this is set to `true`, the `content`, `bytes`, and `text` properties of each part
* will be undefined, and only the `size` property will be available.
*
* Default: `false`
*/
onlyStreamContents?: S

/**
* A callback that is called each time a new part is created. This can be used to
* perform any setup or initialization for the part before any data is received.
*
* The part will contain the full header information, but the content will be empty.
*
* The callback will be awaited so you can return a promise to create backpressure.
*
* @param part The multipart part that was just created
* @returns Promise to be awaited before continuing parsing
*/
onCreatePart?(part: MultipartPartType<S>): Promise<void> | void

/**
* A callback that is called each time a chunk of bytes is received and parsed.
* This can be used to process the data as it is received, to stream
* it to disk or to a cloud storage service.
*
* The callback will be awaited so you can return a promise to create backpressure.
*
* @param part The multipart part being emitted
* @param chunk The chunk of data being emitted
* @returns Promise to be awaited before continuing parsing
*/
onEmitBytes?(part: MultipartPartType<S>, chunk: Uint8Array): Promise<void> | void

}

/**
Expand All @@ -66,11 +110,11 @@ export interface ParseMultipartOptions {
* @param options Options for the parser
* @return A generator that yields `MultipartPart` objects
*/
export function* parseMultipart(
export async function* parseMultipart<S extends boolean | undefined>(
message: Uint8Array | Iterable<Uint8Array>,
options: ParseMultipartOptions,
): Generator<MultipartPart, void, unknown> {
let parser = new MultipartParser(options.boundary, {
options: ParseMultipartOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
let parser = new MultipartParser<S>(options.boundary, {
maxHeaderSize: options.maxHeaderSize,
maxFileSize: options.maxFileSize,
})
Expand Down Expand Up @@ -100,11 +144,11 @@ export function* parseMultipart(
* @param options Options for the parser
* @return An async generator that yields `MultipartPart` objects
*/
export async function* parseMultipartStream(
export async function* parseMultipartStream<S extends boolean | undefined>(
stream: ReadableStream<Uint8Array>,
options: ParseMultipartOptions,
): AsyncGenerator<MultipartPart, void, unknown> {
let parser = new MultipartParser(options.boundary, {
options: ParseMultipartOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
let parser = new MultipartParser<S>(options.boundary, {
maxHeaderSize: options.maxHeaderSize,
maxFileSize: options.maxFileSize,
})
Expand All @@ -120,7 +164,7 @@ export async function* parseMultipartStream(
parser.finish()
}

export type MultipartParserOptions = Omit<ParseMultipartOptions, 'boundary'>
export type MultipartParserOptions<S extends boolean | undefined> = Omit<ParseMultipartOptions<S>, 'boundary'>

const MultipartParserStateStart = 0
const MultipartParserStateAfterBoundary = 1
Expand All @@ -136,7 +180,7 @@ const oneMb = 1024 * oneKb
/**
* A streaming parser for `multipart/*` HTTP messages.
*/
export class MultipartParser {
export class MultipartParser<S extends boolean | undefined> {
readonly boundary: string
readonly maxHeaderSize: number
readonly maxFileSize: number
Expand All @@ -149,10 +193,14 @@ export class MultipartParser {

#state = MultipartParserStateStart
#buffer: Uint8Array | null = null
#currentPart: MultipartPart | null = null
#currentPart: MultipartContentPart | MultipartPart | null = null
#contentLength = 0

constructor(boundary: string, options?: MultipartParserOptions) {
#onlyStreamContents: boolean
#onCreatePart?: (part: MultipartPart) => Promise<void> | void
#onEmitBytes?: (part: MultipartPart, chunk: Uint8Array) => Promise<void> | void

constructor(boundary: string, options?: MultipartParserOptions<any>) {
this.boundary = boundary
this.maxHeaderSize = options?.maxHeaderSize ?? 8 * oneKb
this.maxFileSize = options?.maxFileSize ?? 2 * oneMb
Expand All @@ -162,6 +210,10 @@ export class MultipartParser {
this.#findBoundary = createSearch(`\r\n--${boundary}`)
this.#findPartialTailBoundary = createPartialTailSearch(`\r\n--${boundary}`)
this.#boundaryLength = 4 + boundary.length // length of '\r\n--' + boundary

this.#onCreatePart = options?.onCreatePart
this.#onEmitBytes = options?.onEmitBytes
this.#onlyStreamContents = options?.onlyStreamContents ?? false
}

/**
Expand All @@ -170,7 +222,8 @@ export class MultipartParser {
* @param chunk A chunk of data to write to the parser
* @return A generator yielding `MultipartPart` objects as they are parsed
*/
*write(chunk: Uint8Array): Generator<MultipartPart, void, unknown> {
write(chunk: Uint8Array): AsyncGenerator<MultipartPartType<S>, void, unknown>;
async *write(chunk: Uint8Array): AsyncGenerator<MultipartPart | MultipartContentPart, void, unknown> {
if (this.#state === MultipartParserStateDone) {
throw new MultipartParseError('Unexpected data after end of stream')
}
Expand Down Expand Up @@ -201,16 +254,16 @@ export class MultipartParser {
let partialTailIndex = this.#findPartialTailBoundary(chunk)

if (partialTailIndex === -1) {
this.#append(index === 0 ? chunk : chunk.subarray(index))
await this.#append(index === 0 ? chunk : chunk.subarray(index))
} else {
this.#append(chunk.subarray(index, partialTailIndex))
await this.#append(chunk.subarray(index, partialTailIndex))
this.#buffer = chunk.subarray(partialTailIndex)
}

break
}

this.#append(chunk.subarray(index, boundaryIndex))
await this.#append(chunk.subarray(index, boundaryIndex))

yield this.#currentPart!

Expand Down Expand Up @@ -256,13 +309,19 @@ export class MultipartParser {
throw new MaxHeaderSizeExceededError(this.maxHeaderSize)
}

this.#currentPart = new MultipartPart(chunk.subarray(index, headerEndIndex), [])
const header = chunk.subarray(index, headerEndIndex)
this.#currentPart = this.#onlyStreamContents
? new MultipartPart(header)
: new MultipartContentPart(header, [])

this.#contentLength = 0

index = headerEndIndex + 4 // Skip header + \r\n\r\n

this.#state = MultipartParserStateBody

await this.#onCreatePart?.(this.#currentPart)

continue
}

Expand All @@ -283,12 +342,19 @@ export class MultipartParser {
}
}

#append(chunk: Uint8Array): void {
async #append(chunk: Uint8Array): Promise<void> {
if (this.#contentLength + chunk.length > this.maxFileSize) {
throw new MaxFileSizeExceededError(this.maxFileSize)
}

this.#currentPart!.content.push(chunk)
if (this.#currentPart!.hasContents()) {
this.#currentPart!.content.push(chunk)
} else {
this.#currentPart!.size += chunk.length
}

await this.#onEmitBytes?.(this.#currentPart!, chunk)

this.#contentLength += chunk.length
}

Expand All @@ -313,40 +379,23 @@ const decoder = new TextDecoder('utf-8', { fatal: true })
* A part of a `multipart/*` HTTP message.
*/
export class MultipartPart {
/**
* The raw content of this part as an array of `Uint8Array` chunks.
*/
readonly content: Uint8Array[]

#header: Uint8Array
#headers?: Headers
#size: number = 0

constructor(header: Uint8Array, content: Uint8Array[]) {
constructor(header: Uint8Array) {
this.#header = header
this.content = content
}

/**
* The content of this part as an `ArrayBuffer`.
* The size of the content emitted so far in bytes.
*/
get arrayBuffer(): ArrayBuffer {
return this.bytes.buffer as ArrayBuffer
get size(): number {
return this.#size
}

/**
* The content of this part as a single `Uint8Array`. In `multipart/form-data` messages, this is useful
* for reading the value of files that were uploaded using `<input type="file">` fields.
*/
get bytes(): Uint8Array {
let buffer = new Uint8Array(this.size)

let offset = 0
for (let chunk of this.content) {
buffer.set(chunk, offset)
offset += chunk.length
}

return buffer
set size(value: number) {
this.#size = value
}

/**
Expand Down Expand Up @@ -395,6 +444,47 @@ export class MultipartPart {
return this.headers.contentDisposition.name
}

hasContents(): this is MultipartContentPart {
return false;
}

}

export class MultipartContentPart extends MultipartPart {

/**
* The raw content of this part as an array of `Uint8Array` chunks.
*/
readonly content: Uint8Array[]

constructor(header: Uint8Array, content: Uint8Array[]) {
super(header);
this.content = content
}

/**
* The content of this part as an `ArrayBuffer`.
*/
get arrayBuffer(): ArrayBuffer {
return this.bytes.buffer as ArrayBuffer
}

/**
* The content of this part as a single `Uint8Array`. In `multipart/form-data` messages, this is useful
* for reading the value of files that were uploaded using `<input type="file">` fields.
*/
get bytes(): Uint8Array {
let buffer = new Uint8Array(this.size)

let offset = 0
for (let chunk of this.content) {
buffer.set(chunk, offset)
offset += chunk.length
}

return buffer
}

/**
* The size of the content in bytes.
*/
Expand All @@ -417,4 +507,8 @@ export class MultipartPart {
get text(): string {
return decoder.decode(this.bytes)
}

hasContents(): this is MultipartContentPart {
return true;
}
}