Skip to content

8329829: HttpClient: Add a BodyPublishers.ofFileChannel method #26155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
42 changes: 39 additions & 3 deletions src/java.net.http/share/classes/java/net/http/HttpRequest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand All @@ -26,13 +26,13 @@
package java.net.http;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Iterator;
Expand Down Expand Up @@ -720,6 +720,42 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException {
return RequestPublishers.FilePublisher.create(path);
}

/**
* {@return a request body publisher whose body is the {@code length}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think some of the text in this javadoc would need changes/clarifications. I haven't added any review comments for it yet and will come to it later once we have settled on the rest of the review.

* content bytes read from the provided file {@code channel} starting
* from the specified {@code offset}}
* <p>
* The {@linkplain FileChannel file channel} will be read using
* {@link FileChannel#read(ByteBuffer, long) FileChannel.read(ByteBuffer buffer, long position)},
* which does not modify the channel's position. Thus, the same file
* channel may be shared between several publishers passed to
* concurrent requests.
* <p>
* The file channel will not be closed upon completion. The caller is
* expected to manage the life cycle of the channel, and close it
* appropriately when not needed anymore.
*
* @param channel a file channel
* @param offset the offset of the first byte
* @param length the number of bytes to read from the file channel
*
* @throws IndexOutOfBoundsException if the specified byte range is
* found to be {@linkplain Objects#checkFromIndexSize(long, long, long)
* out of bounds} compared with the size of the file referred by the
* channel
*
* @throws IOException if the size of the file referred by the provided
* channel cannot be read while verifying the specified byte range
*
* @throws NullPointerException if {@code channel} is null
*
* @since 26
*/
public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) throws IOException {
Objects.requireNonNull(channel, "channel");
return new RequestPublishers.FileChannelPublisher(channel, offset, length);
}

/**
* A request body publisher that takes data from an {@code Iterable}
* of byte arrays. An {@link Iterable} is provided which supplies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.lang.reflect.UndeclaredThrowableException;
import java.net.http.HttpRequest.BodyPublisher;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
Expand Down Expand Up @@ -418,6 +419,81 @@ public long contentLength() {
}
}

public static final class FileChannelPublisher implements BodyPublisher {

private final FileChannel channel;

private final long position;

private final long limit;

public FileChannelPublisher(FileChannel channel, long offset, long length) throws IOException {
this.channel = Objects.requireNonNull(channel, "channel");
long fileSize = channel.size();
Objects.checkFromIndexSize(offset, length, fileSize);
this.position = offset;
this.limit = offset + length;
}

@Override
public long contentLength() {
return limit - position;
}

@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
Iterable<ByteBuffer> iterable = () -> new FileChannelIterator(channel, position, limit);
new PullPublisher<>(iterable).subscribe(subscriber);
}

}

private static final class FileChannelIterator implements Iterator<ByteBuffer> {

private final FileChannel channel;

private final long limit;

private long position;

private boolean terminated;

private FileChannelIterator(FileChannel channel, long position, long limit) {
this.channel = channel;
this.position = position;
this.limit = limit;
}

@Override
public boolean hasNext() {
return position < limit && !terminated;
}

@Override
public ByteBuffer next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
long remaining = limit - position;
ByteBuffer buffer = Utils.getBufferWithAtMost(remaining);
try {
int readLength = channel.read(buffer, position);
// Short-circuit if `read()` has failed, e.g., due to file content being changed in the meantime
if (readLength < 0) {
// Throw to signal that the request needs to be cancelled
throw new IOException("Unexpected EOF (position=%s)".formatted(position));
} else {
position += readLength;
}
} catch (IOException ioe) {
terminated = true;
throw new UncheckedIOException(ioe);
}
return buffer.flip();
}

}

public static final class PublisherAdapter implements BodyPublisher {

private final Publisher<? extends ByteBuffer> publisher;
Expand All @@ -430,12 +506,12 @@ public PublisherAdapter(Publisher<? extends ByteBuffer> publisher,
}

@Override
public final long contentLength() {
public long contentLength() {
return contentLength;
}

@Override
public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
publisher.subscribe(subscriber);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,32 @@ public static String describeOps(int interestOps) {
public static IllegalArgumentException newIAE(String message, Object... args) {
return new IllegalArgumentException(format(message, args));
}

/**
* {@return a new {@link ByteBuffer} instance of {@link #BUFSIZE} capacity}
*/
public static ByteBuffer getBuffer() {
return ByteBuffer.allocate(BUFSIZE);
}

/**
* {@return a new {@link ByteBuffer} instance whose capacity is set to the
* smaller of the specified {@code maxCapacity} and the default
* ({@value BUFSIZE})}
*
* @param maxCapacity a buffer capacity, in bytes
* @throws IllegalArgumentException if {@code maxCapacity < 0}
*/
public static ByteBuffer getBufferWithAtMost(long maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalArgumentException(
// Match the message produced by `ByteBuffer::createCapacityException`
"capacity < 0: (%s < 0)".formatted(maxCapacity));
}
int effectiveCapacity = (int) Math.min(maxCapacity, BUFSIZE);
return ByteBuffer.allocate(effectiveCapacity);
}

public static Throwable getCompletionCause(Throwable x) {
Throwable cause = x;
while ((cause instanceof CompletionException)
Expand Down
Loading