From bdeb999b8bde6098a5c3e653e3bc248e61208af7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Tue, 17 Jun 2025 11:08:43 +0200
Subject: [PATCH 01/14] Add `HttpRequest.BodyPublishers::ofFileChannel` along
with tests
---
.../classes/java/net/http/HttpRequest.java | 33 +-
.../internal/net/http/RequestPublishers.java | 91 ++-
.../jdk/internal/net/http/common/Utils.java | 22 +
.../httpclient/FileChannelPublisherTest.java | 693 ++++++++++++++++++
4 files changed, 834 insertions(+), 5 deletions(-)
create mode 100644 test/jdk/java/net/httpclient/FileChannelPublisherTest.java
diff --git a/src/java.net.http/share/classes/java/net/http/HttpRequest.java b/src/java.net.http/share/classes/java/net/http/HttpRequest.java
index 7ba6ed25b41e3..fd0af07697b15 100644
--- a/src/java.net.http/share/classes/java/net/http/HttpRequest.java
+++ b/src/java.net.http/share/classes/java/net/http/HttpRequest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -29,10 +29,9 @@
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Iterator;
@@ -720,6 +719,34 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException {
return RequestPublishers.FilePublisher.create(path);
}
+ /**
+ * {@return a request body publisher whose body is the {@code length}
+ * content bytes read from the provided file {@code channel} starting
+ * from the specified {@code offset}}
+ *
+ * The {@linkplain FileChannel file channel} will be read using
+ * {@link FileChannel#read(ByteBuffer, long) FileChannel.read(ByteBuffer buffer, long position)},
+ * which does not modify the channel's position. Thus, the same file
+ * channel may be shared between several publishers passed to
+ * concurrent requests.
+ *
+ * The file channel will not be closed upon completion. The caller is
+ * expected to manage the life cycle of the channel, and close it
+ * appropriately when not needed anymore.
+ *
+ * @param channel a file channel
+ * @param offset the offset of the first byte
+ * @param length the number of bytes to use
+ *
+ * @throws IndexOutOfBoundsException if the specified byte range is
+ * found to be out of bounds compared with the size of the file
+ * referred by the channel
+ */
+ public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) {
+ Objects.requireNonNull(channel, "channel");
+ return new RequestPublishers.FileChannelPublisher(channel, offset, length);
+ }
+
/**
* A request body publisher that takes data from an {@code Iterable}
* of byte arrays. An {@link Iterable} is provided which supplies
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
index dd5443c503567..0572826eecfa0 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
@@ -32,6 +32,7 @@
import java.lang.reflect.UndeclaredThrowableException;
import java.net.http.HttpRequest.BodyPublisher;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
@@ -418,6 +419,92 @@ public long contentLength() {
}
}
+ public static final class FileChannelPublisher implements BodyPublisher {
+
+ private final FileChannel channel;
+
+ private final long position;
+
+ private final long limit;
+
+ public FileChannelPublisher(FileChannel channel, long offset, long length) {
+ this.channel = Objects.requireNonNull(channel, "channel");
+ long fileSize = fileSize(channel);
+ Objects.checkFromIndexSize(offset, length, fileSize);
+ this.position = offset;
+ this.limit = offset + length;
+ }
+
+ private static long fileSize(FileChannel channel) {
+ try {
+ return channel.size();
+ } catch (IOException ioe) {
+ throw new UncheckedIOException(ioe);
+ }
+ }
+
+ @Override
+ public long contentLength() {
+ return limit - position;
+ }
+
+ @Override
+ public void subscribe(Flow.Subscriber super ByteBuffer> subscriber) {
+ Iterable iterable = () -> new FileChannelIterator(channel, position, limit);
+ new PullPublisher<>(iterable).subscribe(subscriber);
+ }
+
+ }
+
+ private static final class FileChannelIterator implements Iterator {
+
+ private final FileChannel channel;
+
+ private final long limit;
+
+ private long position;
+
+ private boolean terminated;
+
+ private FileChannelIterator(FileChannel channel, long position, long limit) {
+ this.channel = channel;
+ this.position = position;
+ this.limit = limit;
+ }
+
+ @Override
+ public synchronized boolean hasNext() {
+ return position < limit && !terminated;
+ }
+
+ @Override
+ public synchronized ByteBuffer next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ long remaining = limit - position;
+ ByteBuffer buffer = Utils.BUFSIZE > remaining
+ ? Utils.getBufferWithAtMost((int) remaining)
+ : Utils.getBuffer();
+ try {
+ int readLength = channel.read(buffer, position);
+ // Short-circuit if `read()` has failed, e.g., due to file content being changed in the meantime
+ if (readLength < 0) {
+ // We *must* throw to signal that the request needs to be cancelled.
+ // Otherwise, the server will continue waiting data.
+ throw new IOException("Unexpected EOF (position=%s)".formatted(position));
+ } else {
+ position += readLength;
+ }
+ } catch (IOException ioe) {
+ terminated = true;
+ throw new UncheckedIOException(ioe);
+ }
+ return buffer.flip();
+ }
+
+ }
+
public static final class PublisherAdapter implements BodyPublisher {
private final Publisher extends ByteBuffer> publisher;
@@ -430,12 +517,12 @@ public PublisherAdapter(Publisher extends ByteBuffer> publisher,
}
@Override
- public final long contentLength() {
+ public long contentLength() {
return contentLength;
}
@Override
- public final void subscribe(Flow.Subscriber super ByteBuffer> subscriber) {
+ public void subscribe(Flow.Subscriber super ByteBuffer> subscriber) {
publisher.subscribe(subscriber);
}
}
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
index 8aefa0ee5baf4..2785ee849a381 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
@@ -367,10 +367,32 @@ public static String describeOps(int interestOps) {
public static IllegalArgumentException newIAE(String message, Object... args) {
return new IllegalArgumentException(format(message, args));
}
+
+ /**
+ * {@return a new {@link ByteBuffer} instance of configured capacity for the HTTP Client}
+ */
public static ByteBuffer getBuffer() {
return ByteBuffer.allocate(BUFSIZE);
}
+ /**
+ * {@return a new {@link ByteBuffer} instance whose capacity is set to the
+ * smaller of the specified {@code maxCapacity} and the default
+ * ({@value BUFSIZE})}
+ *
+ * @param maxCapacity a buffer capacity, in bytes
+ * @throws IllegalArgumentException if {@code capacity < 0}
+ */
+ public static ByteBuffer getBufferWithAtMost(int maxCapacity) {
+ if (maxCapacity < 0) {
+ throw new IllegalArgumentException(
+ // Match the message produced by `ByteBuffer::createCapacityException`
+ "capacity < 0: (%s < 0)".formatted(maxCapacity));
+ }
+ int effectiveCapacity = Math.clamp(maxCapacity, 0, BUFSIZE);
+ return ByteBuffer.allocate(effectiveCapacity);
+ }
+
public static Throwable getCompletionCause(Throwable x) {
Throwable cause = x;
while ((cause instanceof CompletionException)
diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
new file mode 100644
index 0000000000000..7350e66b52cb4
--- /dev/null
+++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
@@ -0,0 +1,693 @@
+/*
+ * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @summary Verifies `HttpRequest.BodyPublishers::ofFileChannel`
+ * @library /test/lib
+ * /test/jdk/java/net/httpclient/lib
+ * @build jdk.httpclient.test.lib.common.HttpServerAdapters
+ * jdk.test.lib.net.SimpleSSLContext
+ * @run junit FileChannelPublisherTest
+ */
+
+import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestHandler;
+import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer;
+import jdk.internal.net.http.common.Logger;
+import jdk.internal.net.http.common.Utils;
+import jdk.test.lib.net.SimpleSSLContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.CleanupMode;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpClient.Version;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import static java.net.http.HttpClient.Builder.NO_PROXY;
+import static java.net.http.HttpRequest.BodyPublishers.ofFileChannel;
+import static java.net.http.HttpResponse.BodyHandlers.discarding;
+import static java.net.http.HttpResponse.BodyHandlers.ofInputStream;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class FileChannelPublisherTest {
+
+ private static final String CLASS_NAME = FileChannelPublisherTest.class.getSimpleName();
+
+ private static final Logger LOGGER = Utils.getDebugLogger(CLASS_NAME::toString, Utils.DEBUG);
+
+ private static final int DEFAULT_BUFFER_SIZE = Utils.getBuffer().capacity();
+
+ private static final SSLContext SSL_CONTEXT = createSslContext();
+
+ private static final HttpClient CLIENT = HttpClient.newBuilder().sslContext(SSL_CONTEXT).proxy(NO_PROXY).build();
+
+ private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
+
+ private static final ServerRequestPair HTTP1 = ServerRequestPair.of(Version.HTTP_1_1, false);
+
+ private static final ServerRequestPair HTTPS1 = ServerRequestPair.of(Version.HTTP_1_1, true);
+
+ private static final ServerRequestPair HTTP2 = ServerRequestPair.of(Version.HTTP_2, false);
+
+ private static final ServerRequestPair HTTPS2 = ServerRequestPair.of(Version.HTTP_2, true);
+
+ private static SSLContext createSslContext() {
+ try {
+ return new SimpleSSLContext().get();
+ } catch (IOException exception) {
+ throw new UncheckedIOException(exception);
+ }
+ }
+
+ private record ServerRequestPair(
+ String serverName,
+ HttpTestServer server,
+ BlockingQueue serverReadRequestBodyBytes,
+ HttpRequest.Builder requestBuilder,
+ boolean secure) {
+
+ private static CountDownLatch SERVER_REQUEST_RECEIVED_SIGNAL = null;
+
+ private static CountDownLatch SERVER_READ_PERMISSION = null;
+
+ private static ServerRequestPair of(Version version, boolean secure) {
+
+ // Create the server
+ SSLContext sslContext = secure ? SSL_CONTEXT : null;
+ HttpTestServer server = createServer(version, sslContext);
+ String serverName = secure ? version.toString().replaceFirst("_", "S_") : version.toString();
+
+ // Add the handler
+ String handlerPath = "/%s/".formatted(CLASS_NAME);
+ BlockingQueue serverReadRequestBodyBytes =
+ addRequestBodyConsumingServerHandler(serverName, server, handlerPath);
+
+ // Create the request builder
+ String requestUriScheme = secure ? "https" : "http";
+ // `x` suffix in the URI is not a typo, but ensures that *only* the parent handler path is matched
+ URI requestUri = URI.create("%s://%s%sx".formatted(requestUriScheme, server.serverAuthority(), handlerPath));
+ HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(requestUri).version(version);
+
+ // Create the pair
+ ServerRequestPair pair = new ServerRequestPair(serverName, server, serverReadRequestBodyBytes, requestBuilder, secure);
+ pair.server.start();
+ LOGGER.log("Server[%s] is started at `%s`", pair, server.serverAuthority());
+
+ return pair;
+
+ }
+
+ private static HttpTestServer createServer(Version version, SSLContext sslContext) {
+ try {
+ // The default HTTP/1.1 test server processes requests sequentially.
+ // This causes a deadlock for concurrent tests such as `testSlicedUpload()`.
+ // Hence, explicitly providing a multithreaded executor for HTTP/1.1.
+ ExecutorService executor = Version.HTTP_1_1.equals(version) ? EXECUTOR : null;
+ return HttpTestServer.create(version, sslContext, executor);
+ } catch (IOException ioe) {
+ throw new UncheckedIOException(ioe);
+ }
+ }
+
+ private static BlockingQueue addRequestBodyConsumingServerHandler(
+ String serverName, HttpTestServer server, String handlerPath) {
+ BlockingQueue readRequestBodyBytes = new LinkedBlockingQueue<>();
+ HttpTestHandler handler = exchange -> {
+ // `HttpTestExchange::toString` changes on failure, pin it
+ String exchangeName = exchange.toString();
+ try (exchange) {
+
+ // Discard `HEAD` requests used for initial connection admission
+ if ("HEAD".equals(exchange.getRequestMethod())) {
+ exchange.sendResponseHeaders(200, -1L);
+ return;
+ }
+
+ signalServerRequestReceived(serverName, exchangeName);
+ awaitServerReadPermission(serverName, exchangeName);
+
+ LOGGER.log("Server[%s] is reading the request body (exchange=%s)", serverName, exchangeName);
+ byte[] requestBodyBytes = exchange.getRequestBody().readAllBytes();
+ LOGGER.log("Server[%s] has read %s bytes (exchange=%s)", serverName, requestBodyBytes.length, exchangeName);
+ readRequestBodyBytes.add(requestBodyBytes);
+
+ LOGGER.log("Server[%s] is writing the response (exchange=%s)", serverName, exchangeName);
+ exchange.sendResponseHeaders(200, requestBodyBytes.length);
+ exchange.getResponseBody().write(requestBodyBytes);
+
+ } catch (Exception exception) {
+ LOGGER.log(
+ "Server[%s] failed to process the request (exchange=%s)".formatted(serverName, exception),
+ exception);
+ readRequestBodyBytes.add(new byte[0]);
+ } finally {
+ LOGGER.log("Server[%s] completed processing the request (exchange=%s)", serverName, exchangeName);
+ }
+ };
+ server.addHandler(handler, handlerPath);
+ return readRequestBodyBytes;
+ }
+
+ private static void signalServerRequestReceived(String serverName, String exchangeName) {
+ if (SERVER_REQUEST_RECEIVED_SIGNAL != null) {
+ LOGGER.log("Server[%s] is signaling that the request is received (exchange=%s)", serverName, exchangeName);
+ SERVER_REQUEST_RECEIVED_SIGNAL.countDown();
+ }
+ }
+
+ private static void awaitServerReadPermission(String serverName, String exchangeName) {
+ if (SERVER_READ_PERMISSION != null) {
+ LOGGER.log("Server[%s] is waiting for the read permission (exchange=%s)", serverName, exchangeName);
+ try {
+ SERVER_READ_PERMISSION.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt(); // Restore the `interrupted` flag
+ throw new RuntimeException(ie);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return serverName;
+ }
+
+ }
+
+ @AfterAll
+ static void shutDown() {
+ LOGGER.log("Closing the client");
+ CLIENT.close();
+ LOGGER.log("Closing servers");
+ closeServers();
+ LOGGER.log("Closing the executor");
+ EXECUTOR.shutdownNow();
+ }
+
+ private static void closeServers() {
+ Exception[] exceptionRef = {null};
+ Stream
+ .of(HTTP1, HTTPS1, HTTP2, HTTPS2)
+ .map(pair -> (Runnable) pair.server::stop)
+ .forEach(terminator -> {
+ try {
+ terminator.run();
+ } catch (Exception exception) {
+ if (exceptionRef[0] == null) {
+ exceptionRef[0] = exception;
+ } else {
+ exceptionRef[0].addSuppressed(exception);
+ }
+ }
+ });
+ if (exceptionRef[0] != null) {
+ throw new RuntimeException("failed closing one or more server resources", exceptionRef[0]);
+ }
+ }
+
+ /**
+ * Resets {@link ServerRequestPair#serverReadRequestBodyBytes()} to avoid leftover state from a test leaking to the next.
+ */
+ @BeforeEach
+ void resetServerHandlerResults() {
+ Stream
+ .of(HTTP1, HTTPS1, HTTP2, HTTPS2)
+ .forEach(pair -> pair.serverReadRequestBodyBytes.clear());
+ }
+
+ static ServerRequestPair[] serverRequestPairs() {
+ return new ServerRequestPair[]{
+ HTTP1,
+ HTTPS1,
+ HTTP2,
+ HTTPS2
+ };
+ }
+
+ @Test
+ void testNullFileChannel() {
+ assertThrows(NullPointerException.class, () -> ofFileChannel(null, 0, 1));
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "6,-1,1", // offset < 0
+ "6,7,1", // offset > fileSize
+ "6,0,-1", // length < 0
+ "6,0,7", // length > fileSize
+ "6,2,5" // (offset + length) > fileSize
+ })
+ void testIllegalOffset(
+ int fileLength,
+ int fileChannelOffset,
+ int fileChannelLength,
+ @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception {
+ withFileChannel(tempDir.resolve("data.txt"), fileLength, (_, fileChannel) ->
+ assertThrows(
+ IndexOutOfBoundsException.class,
+ () -> ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength)));
+ }
+
+ @ParameterizedTest
+ @MethodSource("serverRequestPairs")
+ void testContentLessThanBufferSize(
+ ServerRequestPair pair,
+ @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception {
+
+ int fileLength = 6;
+ assertTrue(fileLength < DEFAULT_BUFFER_SIZE);
+
+ testSuccessfulContentDelivery(
+ "Complete content",
+ pair, tempDir, fileLength, 0, fileLength);
+
+ {
+ int fileChannelOffset = 1;
+ int fileChannelLength = fileLength - 1;
+ String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength);
+ assertEquals(
+ fileLength - fileChannelOffset, fileChannelLength,
+ "must be until EOF " + debuggingContext);
+ testSuccessfulContentDelivery(
+ "Partial content until the EOF " + debuggingContext,
+ pair, tempDir, fileLength, fileChannelOffset, fileChannelLength);
+ }
+
+ {
+ int fileChannelOffset = 1;
+ int fileChannelLength = fileLength - 2;
+ String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength);
+ assertTrue(
+ fileLength - fileChannelOffset > fileChannelLength,
+ "must end before EOF " + debuggingContext);
+ testSuccessfulContentDelivery(
+ "Partial content *before* the EOF " + debuggingContext,
+ pair, tempDir, fileLength, fileChannelOffset, fileChannelLength);
+ }
+
+ }
+
+ @ParameterizedTest
+ @MethodSource("serverRequestPairs")
+ void testContentMoreThanBufferSize(
+ ServerRequestPair pair,
+ @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception {
+
+ int fileLength = 1 + 3 * DEFAULT_BUFFER_SIZE;
+
+ testSuccessfulContentDelivery(
+ "Complete content",
+ pair, tempDir, fileLength, 0, fileLength);
+
+ {
+ int fileChannelOffset = 1;
+ int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE;
+ String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength);
+ assertEquals(
+ fileLength - fileChannelOffset, fileChannelLength,
+ "must be until EOF " + debuggingContext);
+ testSuccessfulContentDelivery(
+ "Partial content until the EOF. Occupies exactly 3 buffers. " + debuggingContext,
+ pair, tempDir, fileLength, fileChannelOffset, fileChannelLength);
+ }
+
+ {
+ int fileChannelOffset = 2;
+ int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE - 1;
+ String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength);
+ assertEquals(
+ fileLength - fileChannelOffset, fileChannelLength,
+ "must be until EOF " + debuggingContext);
+ testSuccessfulContentDelivery(
+ "Partial content until the EOF. Occupies 3 buffers, the last is custom sized. " + debuggingContext,
+ pair, tempDir, fileLength, fileChannelOffset, fileChannelLength);
+ }
+
+ {
+ int fileChannelOffset = 2;
+ int fileChannelLength = 2 * DEFAULT_BUFFER_SIZE;
+ String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength);
+ assertTrue(
+ fileLength - fileChannelOffset > fileChannelLength,
+ "must end before EOF " + debuggingContext);
+ testSuccessfulContentDelivery(
+ "Partial content *before* the EOF. Occupies exactly 2 buffers. " + debuggingContext,
+ pair, tempDir, fileLength, fileChannelOffset, fileChannelLength);
+ }
+
+ {
+ int fileChannelOffset = 2;
+ int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE - 2;
+ String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength);
+ assertTrue(
+ fileLength - fileChannelOffset > fileChannelLength,
+ "must end before EOF " + debuggingContext);
+ testSuccessfulContentDelivery(
+ "Partial content *before* the EOF. Occupies 3 buffers, the last is custom sized. "+ debuggingContext,
+ pair, tempDir, fileLength, fileChannelOffset, fileChannelLength);
+ }
+
+ }
+
+ private static String debuggingContext(int fileLength, int fileChannelOffset, int fileChannelLength) {
+ Map context = new LinkedHashMap<>(); // Using `LHM` to preserve the insertion order
+ context.put("DEFAULT_BUFFER_SIZE", DEFAULT_BUFFER_SIZE);
+ context.put("fileLength", fileLength);
+ context.put("fileChannelOffset", fileChannelOffset);
+ context.put("fileChannelLength", fileChannelLength);
+ boolean customSizedBuffer = fileChannelLength % DEFAULT_BUFFER_SIZE == 0;
+ context.put("customSizedBuffer", customSizedBuffer);
+ return context.toString();
+ }
+
+ private void testSuccessfulContentDelivery(
+ String caseDescription,
+ ServerRequestPair pair,
+ Path tempDir,
+ int fileLength,
+ int fileChannelOffset,
+ int fileChannelLength) throws Exception {
+
+ // Case names come handy even when no debug logging is enabled.
+ // Hence, intentionally avoiding `Logger`.
+ System.err.printf("Case: %s%n", caseDescription);
+
+ // Create the file to upload
+ String fileName = "data-%d-%d-%d.txt".formatted(fileLength, fileChannelOffset, fileChannelLength);
+ Path filePath = tempDir.resolve(fileName);
+ withFileChannel(filePath, fileLength, (fileBytes, fileChannel) -> {
+
+ // Upload the file
+ HttpRequest request = pair
+ .requestBuilder
+ .POST(ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength))
+ .build();
+ CLIENT.send(request, discarding());
+
+ // Verify the received request body
+ byte[] expectedRequestBodyBytes = new byte[fileChannelLength];
+ System.arraycopy(fileBytes, fileChannelOffset, expectedRequestBodyBytes, 0, fileChannelLength);
+ byte[] actualRequestBodyBytes = pair.serverReadRequestBodyBytes.take();
+ assertArrayEquals(expectedRequestBodyBytes, actualRequestBodyBytes);
+
+ });
+
+ }
+
+ /**
+ * Big enough file length to observe the effects of publisher state corruption while uploading.
+ *
+ * Certain tests follow below steps:
+ *
+ *
+ * - Issue the request
+ * - Wait for the server's signal that the request (not the body!) is received
+ * - Corrupt the publisher's state; modify the file, close the file channel, etc.
+ * - Signal the server to proceed with reading
+ *
+ *
+ * With small files, even before we permit the server to read (step 4), file gets already uploaded.
+ * This voids the effect of state corruption (step 3).
+ * To circumvent this, use this big enough file size.
+ *
+ *
+ * @see #testChannelCloseDuringPublisherRead(ServerRequestPair, Path)
+ * @see #testFileModificationDuringPublisherRead(ServerRequestPair, Path)
+ */
+ private static final int BIG_FILE_LENGTH = 8 * 1024 * 1024; // 8 MiB
+
+ @ParameterizedTest
+ @MethodSource("serverRequestPairs")
+ void testChannelCloseDuringPublisherRead(
+ ServerRequestPair pair,
+ @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir)
+ throws Exception {
+ establishInitialConnection(pair);
+ ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1);
+ ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1);
+ try {
+
+ int fileLength = BIG_FILE_LENGTH;
+ AtomicReference>> responseFutureRef = new AtomicReference<>();
+ withFileChannel(tempDir.resolve("data.txt"), fileLength, ((_, fileChannel) -> {
+
+ // Issue the request
+ LOGGER.log("Issuing the request");
+ HttpRequest request = pair
+ .requestBuilder
+ .POST(ofFileChannel(fileChannel, 0, fileLength))
+ .build();
+ responseFutureRef.set(CLIENT.sendAsync(request, discarding()));
+
+ // Wait for server to receive the request
+ LOGGER.log("Waiting for the request to be received");
+ ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL.await();
+
+ }));
+
+ LOGGER.log("File channel is closed");
+
+ // Let the server proceed
+ LOGGER.log("Permitting the server to proceed");
+ ServerRequestPair.SERVER_READ_PERMISSION.countDown();
+
+ // Verifying the client failure
+ LOGGER.log("Verifying the client failure");
+ Exception requestFailure = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get());
+ assertInstanceOf(UncheckedIOException.class, requestFailure.getCause());
+ assertInstanceOf(ClosedChannelException.class, requestFailure.getCause().getCause());
+
+ verifyServerIncompleteRead(pair, fileLength);
+
+ } finally {
+ ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = null;
+ ServerRequestPair.SERVER_READ_PERMISSION = null;
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("serverRequestPairs")
+ // On Windows, modification while reading is not possible.
+ // Recall the infamous `The process cannot access the file because it is being used by another process`.
+ @DisabledOnOs(OS.WINDOWS)
+ void testFileModificationDuringPublisherRead(
+ ServerRequestPair pair,
+ @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir)
+ throws Exception {
+ establishInitialConnection(pair);
+ ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1);
+ ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1);
+ try {
+
+ int fileLength = BIG_FILE_LENGTH;
+ Path filePath = tempDir.resolve("data.txt");
+ withFileChannel(filePath, fileLength, ((_, fileChannel) -> {
+
+ // Issue the request
+ LOGGER.log("Issuing the request");
+ HttpRequest request = pair
+ .requestBuilder
+ .POST(ofFileChannel(fileChannel, 0, fileLength))
+ .build();
+ CompletableFuture> responseFuture = CLIENT.sendAsync(request, discarding());
+
+ // Wait for server to receive the request
+ LOGGER.log("Waiting for the request to be received");
+ ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL.await();
+
+ // Modify the file
+ LOGGER.log("Modifying the file");
+ Files.write(filePath, generateFileBytes(1));
+
+ // Let the server proceed
+ LOGGER.log("Permitting the server to proceed");
+ ServerRequestPair.SERVER_READ_PERMISSION.countDown();
+
+ // Verifying the client failure
+ LOGGER.log("Verifying the client failure");
+ Exception requestFailure = assertThrows(ExecutionException.class, responseFuture::get);
+ String requestFailureMessage = requestFailure.getMessage();
+ assertTrue(
+ requestFailureMessage.contains("Unexpected EOF"),
+ "unexpected message: " + requestFailureMessage);
+
+ verifyServerIncompleteRead(pair, fileLength);
+
+ }));
+
+ } finally {
+ ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = null;
+ ServerRequestPair.SERVER_READ_PERMISSION = null;
+ }
+ }
+
+ private static void verifyServerIncompleteRead(ServerRequestPair pair, int fileLength) throws InterruptedException {
+ LOGGER.log("Verifying the server's incomplete read");
+ byte[] readRequestBodyBytes = pair.serverReadRequestBodyBytes.take();
+ assertTrue(
+ readRequestBodyBytes.length < fileLength,
+ "was expecting `readRequestBodyBytes < fileLength` (%s < %s)".formatted(
+ readRequestBodyBytes.length, fileLength));
+ }
+
+ @ParameterizedTest
+ @MethodSource("serverRequestPairs")
+ void testSlicedUpload(
+ ServerRequestPair pair,
+ @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir)
+ throws Exception {
+
+ // Populate the file
+ int sliceCount = 4;
+ int sliceLength = 14_281; // Intentionally using a prime number to increase the chances of hitting corner cases
+ int fileLength = sliceCount * sliceLength;
+ byte[] fileBytes = generateFileBytes(fileLength);
+ Path filePath = tempDir.resolve("data.txt");
+ Files.write(filePath, fileBytes, StandardOpenOption.CREATE);
+
+ List responseBodyStreams = new ArrayList<>(sliceCount);
+ try (FileChannel fileChannel = FileChannel.open(filePath)) {
+
+ // Upload the complete file in mutually exclusive slices
+ List>> responseFutures = new ArrayList<>(sliceCount);
+ for (int sliceIndex = 0; sliceIndex < sliceCount; sliceIndex++) {
+ LOGGER.log("Issuing request %d/%d", (sliceIndex + 1), sliceCount);
+ HttpRequest request = pair
+ .requestBuilder
+ .POST(ofFileChannel(fileChannel, sliceIndex * sliceLength, sliceLength))
+ .build();
+ responseFutures.add(CLIENT.sendAsync(
+ request,
+ // Intentionally using an `InputStream` response
+ // handler to defer consuming the response body
+ // until after the file channel is closed:
+ ofInputStream()));
+ }
+
+ // Collect response body `InputStream`s from all requests
+ for (int sliceIndex = 0; sliceIndex < sliceCount; sliceIndex++) {
+ LOGGER.log("Collecting response body `InputStream` for request %d/%d", (sliceIndex + 1), sliceCount);
+ HttpResponse response = responseFutures.get(sliceIndex).get();
+ assertEquals(200, response.statusCode());
+ responseBodyStreams.add(response.body());
+ }
+
+ }
+
+ LOGGER.log("File channel is closed");
+
+ // Verify response bodies
+ for (int sliceIndex = 0; sliceIndex < sliceCount; sliceIndex++) {
+ LOGGER.log("Consuming response body %d/%d", (sliceIndex + 1), sliceCount);
+ byte[] expectedResponseBodyBytes = new byte[sliceLength];
+ System.arraycopy(fileBytes, sliceIndex * sliceLength, expectedResponseBodyBytes, 0, sliceLength);
+ try (InputStream responseBodyStream = responseBodyStreams.get(sliceIndex)) {
+ byte[] responseBodyBytes = responseBodyStream.readAllBytes();
+ assertArrayEquals(expectedResponseBodyBytes, responseBodyBytes);
+ }
+ }
+
+ }
+
+ /**
+ * Performs the initial {@code HEAD} request to the specified server. This
+ * effectively admits a connection to the client's pool, where all protocol
+ * upgrades, handshakes, etc. are already performed.
+ *
+ * HTTP/2 test server consumes the complete request payload in the very
+ * first upgrade frame. That is, if a client sends 100 MiB of data, all
+ * of it will be consumed first before the configured handler is
+ * invoked. Though certain tests expect the data to be consumed
+ * piecemeal. To accommodate this, we ensure client has an upgraded
+ * connection in the pool.
+ *
+ */
+ private static void establishInitialConnection(ServerRequestPair pair) {
+ LOGGER.log("Server[%s] is getting queried for the initial connection pool admission", pair);
+ try {
+ CLIENT.send(pair.requestBuilder.HEAD().build(), discarding());
+ } catch (Exception exception) {
+ throw new RuntimeException(exception);
+ }
+ }
+
+ private static void withFileChannel(Path filePath, int fileLength, FileChannelConsumer fileChannelConsumer) throws Exception {
+ byte[] fileBytes = generateFileBytes(fileLength);
+ Files.write(filePath, fileBytes, StandardOpenOption.CREATE);
+ try (FileChannel fileChannel = FileChannel.open(filePath)) {
+ fileChannelConsumer.consume(fileBytes, fileChannel);
+ }
+ }
+
+ @FunctionalInterface
+ private interface FileChannelConsumer {
+
+ void consume(byte[] fileBytes, FileChannel fileChannel) throws Exception;
+
+ }
+
+ private static byte[] generateFileBytes(int length) {
+ byte[] bytes = new byte[length];
+ for (int i = 0; i < length; i++) {
+ bytes[i] = (byte) i;
+ }
+ return bytes;
+ }
+
+}
From 0eda56f18a251d58c30d3fa940998d177d256d9c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Mon, 7 Jul 2025 11:27:07 +0200
Subject: [PATCH 02/14] Implement review feedback
---
.../share/classes/java/net/http/HttpRequest.java | 2 +-
.../classes/jdk/internal/net/http/RequestPublishers.java | 4 +---
.../share/classes/jdk/internal/net/http/common/Utils.java | 4 ++--
3 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/src/java.net.http/share/classes/java/net/http/HttpRequest.java b/src/java.net.http/share/classes/java/net/http/HttpRequest.java
index fd0af07697b15..f6a93ff26a87c 100644
--- a/src/java.net.http/share/classes/java/net/http/HttpRequest.java
+++ b/src/java.net.http/share/classes/java/net/http/HttpRequest.java
@@ -736,7 +736,7 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException {
*
* @param channel a file channel
* @param offset the offset of the first byte
- * @param length the number of bytes to use
+ * @param length the number of bytes to read from the file channel
*
* @throws IndexOutOfBoundsException if the specified byte range is
* found to be out of bounds compared with the size of the file
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
index 0572826eecfa0..e90acdc609df1 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
@@ -483,9 +483,7 @@ public synchronized ByteBuffer next() {
throw new NoSuchElementException();
}
long remaining = limit - position;
- ByteBuffer buffer = Utils.BUFSIZE > remaining
- ? Utils.getBufferWithAtMost((int) remaining)
- : Utils.getBuffer();
+ ByteBuffer buffer = Utils.getBufferWithAtMost(remaining);
try {
int readLength = channel.read(buffer, position);
// Short-circuit if `read()` has failed, e.g., due to file content being changed in the meantime
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
index 2785ee849a381..ea4e429f40209 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
@@ -383,13 +383,13 @@ public static ByteBuffer getBuffer() {
* @param maxCapacity a buffer capacity, in bytes
* @throws IllegalArgumentException if {@code capacity < 0}
*/
- public static ByteBuffer getBufferWithAtMost(int maxCapacity) {
+ public static ByteBuffer getBufferWithAtMost(long maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalArgumentException(
// Match the message produced by `ByteBuffer::createCapacityException`
"capacity < 0: (%s < 0)".formatted(maxCapacity));
}
- int effectiveCapacity = Math.clamp(maxCapacity, 0, BUFSIZE);
+ int effectiveCapacity = (int) Math.min(maxCapacity, BUFSIZE);
return ByteBuffer.allocate(effectiveCapacity);
}
From 3af61c5f015db200c12cc6b22e4c6a615f8e94a6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Mon, 7 Jul 2025 12:39:24 +0200
Subject: [PATCH 03/14] Improve docs on `IndexOutOfBoundsException` thrown
Co-authored-by: Daniel Fuchs <67001856+dfuch@users.noreply.github.com>
---
.../share/classes/java/net/http/HttpRequest.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/java.net.http/share/classes/java/net/http/HttpRequest.java b/src/java.net.http/share/classes/java/net/http/HttpRequest.java
index f6a93ff26a87c..61366b4a695d6 100644
--- a/src/java.net.http/share/classes/java/net/http/HttpRequest.java
+++ b/src/java.net.http/share/classes/java/net/http/HttpRequest.java
@@ -739,8 +739,8 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException {
* @param length the number of bytes to read from the file channel
*
* @throws IndexOutOfBoundsException if the specified byte range is
- * found to be out of bounds compared with the size of the file
- * referred by the channel
+ * found to be {@linkplain Objects.checkFromIndexSize(long, long, long) out of bounds}
+ * compared with the size of the file referred by the channel
*/
public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) {
Objects.requireNonNull(channel, "channel");
From 4a9e303bfcc03ca387c9fdab23e6e6449ba698cb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Fri, 18 Jul 2025 11:14:08 +0200
Subject: [PATCH 04/14] Add `@since 26` to `ofFileChannel`
---
src/java.net.http/share/classes/java/net/http/HttpRequest.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/src/java.net.http/share/classes/java/net/http/HttpRequest.java b/src/java.net.http/share/classes/java/net/http/HttpRequest.java
index 61366b4a695d6..0c3b0899f5974 100644
--- a/src/java.net.http/share/classes/java/net/http/HttpRequest.java
+++ b/src/java.net.http/share/classes/java/net/http/HttpRequest.java
@@ -741,6 +741,7 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException {
* @throws IndexOutOfBoundsException if the specified byte range is
* found to be {@linkplain Objects.checkFromIndexSize(long, long, long) out of bounds}
* compared with the size of the file referred by the channel
+ * @since 26
*/
public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) {
Objects.requireNonNull(channel, "channel");
From 8f66a32ec8b1b224bbc91827c17d9b0341bdf047 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Fri, 18 Jul 2025 11:17:21 +0200
Subject: [PATCH 05/14] Improve exception handling and documentation for
`ofFileChannel`
---
.../share/classes/java/net/http/HttpRequest.java | 14 +++++++++++---
.../jdk/internal/net/http/RequestPublishers.java | 12 ++----------
2 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/src/java.net.http/share/classes/java/net/http/HttpRequest.java b/src/java.net.http/share/classes/java/net/http/HttpRequest.java
index 0c3b0899f5974..3e2a7006acae4 100644
--- a/src/java.net.http/share/classes/java/net/http/HttpRequest.java
+++ b/src/java.net.http/share/classes/java/net/http/HttpRequest.java
@@ -26,6 +26,7 @@
package java.net.http;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -739,11 +740,18 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException {
* @param length the number of bytes to read from the file channel
*
* @throws IndexOutOfBoundsException if the specified byte range is
- * found to be {@linkplain Objects.checkFromIndexSize(long, long, long) out of bounds}
- * compared with the size of the file referred by the channel
+ * found to be {@linkplain Objects#checkFromIndexSize(long, long, long)
+ * out of bounds} compared with the size of the file referred by the
+ * channel
+ *
+ * @throws IOException if the size of the file referred by the provided
+ * channel cannot be read while verifying the specified byte range
+ *
+ * @throws NullPointerException if {@code channel} is null
+ *
* @since 26
*/
- public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) {
+ public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) throws IOException {
Objects.requireNonNull(channel, "channel");
return new RequestPublishers.FileChannelPublisher(channel, offset, length);
}
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
index e90acdc609df1..b9a25f60de97f 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
@@ -427,22 +427,14 @@ public static final class FileChannelPublisher implements BodyPublisher {
private final long limit;
- public FileChannelPublisher(FileChannel channel, long offset, long length) {
+ public FileChannelPublisher(FileChannel channel, long offset, long length) throws IOException {
this.channel = Objects.requireNonNull(channel, "channel");
- long fileSize = fileSize(channel);
+ long fileSize = channel.size();
Objects.checkFromIndexSize(offset, length, fileSize);
this.position = offset;
this.limit = offset + length;
}
- private static long fileSize(FileChannel channel) {
- try {
- return channel.size();
- } catch (IOException ioe) {
- throw new UncheckedIOException(ioe);
- }
- }
-
@Override
public long contentLength() {
return limit - position;
From b114fe9598e9e5dc38159339b30fb01ff514712c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Fri, 18 Jul 2025 11:19:34 +0200
Subject: [PATCH 06/14] Remove synchronization for `FileChannelIterator`
---
.../classes/jdk/internal/net/http/RequestPublishers.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
index b9a25f60de97f..45a675b33b389 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
@@ -465,12 +465,12 @@ private FileChannelIterator(FileChannel channel, long position, long limit) {
}
@Override
- public synchronized boolean hasNext() {
+ public boolean hasNext() {
return position < limit && !terminated;
}
@Override
- public synchronized ByteBuffer next() {
+ public ByteBuffer next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
From 183da9d0235beba6e14c2aaaca5eb00afc55dea7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Fri, 18 Jul 2025 11:30:10 +0200
Subject: [PATCH 07/14] Improve wording for signaling request cancellation
---
.../share/classes/jdk/internal/net/http/RequestPublishers.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
index 45a675b33b389..88cabe154193a 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
@@ -480,8 +480,7 @@ public ByteBuffer next() {
int readLength = channel.read(buffer, position);
// Short-circuit if `read()` has failed, e.g., due to file content being changed in the meantime
if (readLength < 0) {
- // We *must* throw to signal that the request needs to be cancelled.
- // Otherwise, the server will continue waiting data.
+ // Throw to signal that the request needs to be cancelled
throw new IOException("Unexpected EOF (position=%s)".formatted(position));
} else {
position += readLength;
From eacd5da23bb762033985f40c45d708c6224dc6ce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Fri, 18 Jul 2025 11:38:18 +0200
Subject: [PATCH 08/14] Link to `BUFSIZE` in `Utils::getBuffer` Javadoc
---
.../share/classes/jdk/internal/net/http/common/Utils.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
index ea4e429f40209..96e49e95fa9f1 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
@@ -369,7 +369,7 @@ public static IllegalArgumentException newIAE(String message, Object... args) {
}
/**
- * {@return a new {@link ByteBuffer} instance of configured capacity for the HTTP Client}
+ * {@return a new {@link ByteBuffer} instance of {@link #BUFSIZE} capacity}
*/
public static ByteBuffer getBuffer() {
return ByteBuffer.allocate(BUFSIZE);
From da69840639ec9fda2a78855cd1c2193a3640896f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Fri, 18 Jul 2025 11:39:11 +0200
Subject: [PATCH 09/14] Fix typo in `Utils::getBufferWithAtMost` Javadoc
---
.../share/classes/jdk/internal/net/http/common/Utils.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
index 96e49e95fa9f1..e5e66b9af38bb 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
@@ -381,7 +381,7 @@ public static ByteBuffer getBuffer() {
* ({@value BUFSIZE})}
*
* @param maxCapacity a buffer capacity, in bytes
- * @throws IllegalArgumentException if {@code capacity < 0}
+ * @throws IllegalArgumentException if {@code maxCapacity < 0}
*/
public static ByteBuffer getBufferWithAtMost(long maxCapacity) {
if (maxCapacity < 0) {
From 99503c0bac77a22437b6f793832eb20f4752232a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Wed, 30 Jul 2025 10:59:01 +0200
Subject: [PATCH 10/14] Apply review feedback for `FileChannelPublisherTest`
---
.../httpclient/FileChannelPublisherTest.java | 99 +++++++++++--------
1 file changed, 58 insertions(+), 41 deletions(-)
diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
index 7350e66b52cb4..358bc260ec7f6 100644
--- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
+++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
@@ -39,9 +39,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledOnOs;
-import org.junit.jupiter.api.condition.OS;
-import org.junit.jupiter.api.io.CleanupMode;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
@@ -55,6 +52,7 @@
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
+import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
@@ -76,7 +74,6 @@
import java.util.stream.Stream;
import static java.net.http.HttpClient.Builder.NO_PROXY;
-import static java.net.http.HttpRequest.BodyPublishers.ofFileChannel;
import static java.net.http.HttpResponse.BodyHandlers.discarding;
import static java.net.http.HttpResponse.BodyHandlers.ofInputStream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -191,7 +188,7 @@ private static BlockingQueue addRequestBodyConsumingServerHandler(
exchange.sendResponseHeaders(200, requestBodyBytes.length);
exchange.getResponseBody().write(requestBodyBytes);
- } catch (Exception exception) {
+ } catch (Throwable exception) {
LOGGER.log(
"Server[%s] failed to process the request (exchange=%s)".formatted(serverName, exception),
exception);
@@ -282,7 +279,7 @@ static ServerRequestPair[] serverRequestPairs() {
@Test
void testNullFileChannel() {
- assertThrows(NullPointerException.class, () -> ofFileChannel(null, 0, 1));
+ assertThrows(NullPointerException.class, () -> BodyPublishers.ofFileChannel(null, 0, 1));
}
@ParameterizedTest
@@ -293,30 +290,38 @@ void testNullFileChannel() {
"6,0,7", // length > fileSize
"6,2,5" // (offset + length) > fileSize
})
- void testIllegalOffset(
+ void testIllegalOffsetOrLength(
int fileLength,
int fileChannelOffset,
int fileChannelLength,
- @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception {
+ @TempDir Path tempDir) throws Exception {
withFileChannel(tempDir.resolve("data.txt"), fileLength, (_, fileChannel) ->
assertThrows(
IndexOutOfBoundsException.class,
- () -> ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength)));
+ () -> BodyPublishers.ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength)));
}
+ /**
+ * Stresses corner cases in {@linkplain
+ * BodyPublishers#ofFileChannel(FileChannel, long, long) the file channel
+ * publisher}, which uses a {@linkplain #DEFAULT_BUFFER_SIZE fixed size}
+ * buffer to read files, by providing sub-ranges and files that are
+ * smaller than the buffer size.
+ */
@ParameterizedTest
@MethodSource("serverRequestPairs")
- void testContentLessThanBufferSize(
- ServerRequestPair pair,
- @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception {
+ void testContentLessThanBufferSize(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
+ // Use a file of length smaller than the default buffer size
int fileLength = 6;
assertTrue(fileLength < DEFAULT_BUFFER_SIZE);
+ // Publish the `[0, fileLength)` sub-range
testSuccessfulContentDelivery(
"Complete content",
pair, tempDir, fileLength, 0, fileLength);
+ // Publish the `[1, fileLength)` sub-range to stress the inclusion of EOF
{
int fileChannelOffset = 1;
int fileChannelLength = fileLength - 1;
@@ -329,6 +334,7 @@ void testContentLessThanBufferSize(
pair, tempDir, fileLength, fileChannelOffset, fileChannelLength);
}
+ // Publish the `[1, fileLength - 1)` sub-range to stress the exclusion of EOF
{
int fileChannelOffset = 1;
int fileChannelLength = fileLength - 2;
@@ -343,18 +349,30 @@ void testContentLessThanBufferSize(
}
+ /**
+ * Stresses corner cases in {@linkplain
+ * BodyPublishers#ofFileChannel(FileChannel, long, long) the file channel
+ * publisher}, which uses a {@linkplain #DEFAULT_BUFFER_SIZE fixed size}
+ * buffer to read files, by providing sub-ranges and files that are
+ * bigger than the buffer size.
+ */
@ParameterizedTest
@MethodSource("serverRequestPairs")
- void testContentMoreThanBufferSize(
- ServerRequestPair pair,
- @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception {
+ void testContentMoreThanBufferSize(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
+ // Use a file of length that is
+ // 1. greater than the default buffer size
+ // 2. *not* a multitude of the buffer size
int fileLength = 1 + 3 * DEFAULT_BUFFER_SIZE;
+ // Publish the `[0, fileLength)` sub-range
testSuccessfulContentDelivery(
"Complete content",
pair, tempDir, fileLength, 0, fileLength);
+ // Publish the `[1, fileLength)` sub-range such that
+ // - EOF is included
+ // - the total length is a multitude of the buffer size
{
int fileChannelOffset = 1;
int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE;
@@ -367,6 +385,9 @@ void testContentMoreThanBufferSize(
pair, tempDir, fileLength, fileChannelOffset, fileChannelLength);
}
+ // Publish the `[1, fileLength)` sub-range such that
+ // - EOF is included
+ // - the total length is *not* a multitude of the buffer size
{
int fileChannelOffset = 2;
int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE - 1;
@@ -379,6 +400,9 @@ void testContentMoreThanBufferSize(
pair, tempDir, fileLength, fileChannelOffset, fileChannelLength);
}
+ // Publish the `[1, fileLength)` sub-range such that
+ // - EOF is *not* included
+ // - the total length is a multitude of the buffer size
{
int fileChannelOffset = 2;
int fileChannelLength = 2 * DEFAULT_BUFFER_SIZE;
@@ -391,6 +415,9 @@ void testContentMoreThanBufferSize(
pair, tempDir, fileLength, fileChannelOffset, fileChannelLength);
}
+ // Publish the `[1, fileLength)` sub-range such that
+ // - EOF is *not* included
+ // - the total length is *not* a multitude of the buffer size
{
int fileChannelOffset = 2;
int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE - 2;
@@ -436,7 +463,7 @@ private void testSuccessfulContentDelivery(
// Upload the file
HttpRequest request = pair
.requestBuilder
- .POST(ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength))
+ .POST(BodyPublishers.ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength))
.build();
CLIENT.send(request, discarding());
@@ -474,10 +501,7 @@ private void testSuccessfulContentDelivery(
@ParameterizedTest
@MethodSource("serverRequestPairs")
- void testChannelCloseDuringPublisherRead(
- ServerRequestPair pair,
- @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir)
- throws Exception {
+ void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
establishInitialConnection(pair);
ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1);
ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1);
@@ -491,7 +515,7 @@ void testChannelCloseDuringPublisherRead(
LOGGER.log("Issuing the request");
HttpRequest request = pair
.requestBuilder
- .POST(ofFileChannel(fileChannel, 0, fileLength))
+ .POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength))
.build();
responseFutureRef.set(CLIENT.sendAsync(request, discarding()));
@@ -509,9 +533,9 @@ void testChannelCloseDuringPublisherRead(
// Verifying the client failure
LOGGER.log("Verifying the client failure");
- Exception requestFailure = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get());
- assertInstanceOf(UncheckedIOException.class, requestFailure.getCause());
- assertInstanceOf(ClosedChannelException.class, requestFailure.getCause().getCause());
+ Exception requestFailure0 = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get());
+ Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
+ assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause());
verifyServerIncompleteRead(pair, fileLength);
@@ -523,13 +547,7 @@ void testChannelCloseDuringPublisherRead(
@ParameterizedTest
@MethodSource("serverRequestPairs")
- // On Windows, modification while reading is not possible.
- // Recall the infamous `The process cannot access the file because it is being used by another process`.
- @DisabledOnOs(OS.WINDOWS)
- void testFileModificationDuringPublisherRead(
- ServerRequestPair pair,
- @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir)
- throws Exception {
+ void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
establishInitialConnection(pair);
ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1);
ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1);
@@ -543,7 +561,7 @@ void testFileModificationDuringPublisherRead(
LOGGER.log("Issuing the request");
HttpRequest request = pair
.requestBuilder
- .POST(ofFileChannel(fileChannel, 0, fileLength))
+ .POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength))
.build();
CompletableFuture> responseFuture = CLIENT.sendAsync(request, discarding());
@@ -561,11 +579,13 @@ void testFileModificationDuringPublisherRead(
// Verifying the client failure
LOGGER.log("Verifying the client failure");
- Exception requestFailure = assertThrows(ExecutionException.class, responseFuture::get);
- String requestFailureMessage = requestFailure.getMessage();
+ Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
+ Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
+ Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause());
+ String requestFailure2Message = requestFailure2.getMessage();
assertTrue(
- requestFailureMessage.contains("Unexpected EOF"),
- "unexpected message: " + requestFailureMessage);
+ requestFailure2Message.contains("Unexpected EOF"),
+ "unexpected message: " + requestFailure2Message);
verifyServerIncompleteRead(pair, fileLength);
@@ -588,10 +608,7 @@ private static void verifyServerIncompleteRead(ServerRequestPair pair, int fileL
@ParameterizedTest
@MethodSource("serverRequestPairs")
- void testSlicedUpload(
- ServerRequestPair pair,
- @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir)
- throws Exception {
+ void testSlicedUpload(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
// Populate the file
int sliceCount = 4;
@@ -610,7 +627,7 @@ void testSlicedUpload(
LOGGER.log("Issuing request %d/%d", (sliceIndex + 1), sliceCount);
HttpRequest request = pair
.requestBuilder
- .POST(ofFileChannel(fileChannel, sliceIndex * sliceLength, sliceLength))
+ .POST(BodyPublishers.ofFileChannel(fileChannel, sliceIndex * sliceLength, sliceLength))
.build();
responseFutures.add(CLIENT.sendAsync(
request,
From 8699b83bbe34b87bbdc96ffaa44bd0b48f0ef7fa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Thu, 31 Jul 2025 08:00:03 +0200
Subject: [PATCH 11/14] Replace usage of `CompletableFuture` with `Future`
---
.../jdk/java/net/httpclient/FileChannelPublisherTest.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
index 358bc260ec7f6..e6a59e588fe59 100644
--- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
+++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
@@ -64,11 +64,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
@@ -508,7 +508,7 @@ void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path t
try {
int fileLength = BIG_FILE_LENGTH;
- AtomicReference>> responseFutureRef = new AtomicReference<>();
+ AtomicReference>> responseFutureRef = new AtomicReference<>();
withFileChannel(tempDir.resolve("data.txt"), fileLength, ((_, fileChannel) -> {
// Issue the request
@@ -563,7 +563,7 @@ void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Pa
.requestBuilder
.POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength))
.build();
- CompletableFuture> responseFuture = CLIENT.sendAsync(request, discarding());
+ Future> responseFuture = CLIENT.sendAsync(request, discarding());
// Wait for server to receive the request
LOGGER.log("Waiting for the request to be received");
@@ -622,7 +622,7 @@ void testSlicedUpload(ServerRequestPair pair, @TempDir Path tempDir) throws Exce
try (FileChannel fileChannel = FileChannel.open(filePath)) {
// Upload the complete file in mutually exclusive slices
- List>> responseFutures = new ArrayList<>(sliceCount);
+ List>> responseFutures = new ArrayList<>(sliceCount);
for (int sliceIndex = 0; sliceIndex < sliceCount; sliceIndex++) {
LOGGER.log("Issuing request %d/%d", (sliceIndex + 1), sliceCount);
HttpRequest request = pair
From faa5d24d831ddf21fcc0ecd6572c7ad3bd234929 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Tue, 5 Aug 2025 12:30:16 +0200
Subject: [PATCH 12/14] Verify exceptions using both `send()` and `sendAsync()`
---
.../httpclient/FileChannelPublisherTest.java | 112 +++++++++++++++---
1 file changed, 96 insertions(+), 16 deletions(-)
diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
index e6a59e588fe59..c92ceb39f56fe 100644
--- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
+++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
@@ -71,6 +71,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.stream.Stream;
import static java.net.http.HttpClient.Builder.NO_PROXY;
@@ -494,14 +495,41 @@ private void testSuccessfulContentDelivery(
* To circumvent this, use this big enough file size.
*
*
- * @see #testChannelCloseDuringPublisherRead(ServerRequestPair, Path)
- * @see #testFileModificationDuringPublisherRead(ServerRequestPair, Path)
+ * @see #testChannelCloseDuringPublisherReadAsync(ServerRequestPair, Path)
+ * @see #testChannelCloseDuringPublisherReadSync(ServerRequestPair, Path)
+ * @see #testFileModificationDuringPublisherReadAsync(ServerRequestPair, Path)
+ * @see #testFileModificationDuringPublisherReadSync(ServerRequestPair, Path)
*/
private static final int BIG_FILE_LENGTH = 8 * 1024 * 1024; // 8 MiB
@ParameterizedTest
@MethodSource("serverRequestPairs")
- void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
+ void testChannelCloseDuringPublisherReadAsync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
+ testChannelCloseDuringPublisherRead(pair, tempDir, Requestor.ASYNC, responseFuture -> {
+ Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
+ Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
+ assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause());
+ });
+ }
+
+ @ParameterizedTest
+ @MethodSource("serverRequestPairs")
+ void testChannelCloseDuringPublisherReadSync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
+ testChannelCloseDuringPublisherRead(pair, tempDir, Requestor.SYNC, responseFuture -> {
+ Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
+ Exception requestFailure1 = assertInstanceOf(RuntimeException.class, requestFailure0.getCause());
+ Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause());
+ Exception requestFailure3 = assertInstanceOf(UncheckedIOException.class, requestFailure2.getCause());
+ assertInstanceOf(ClosedChannelException.class, requestFailure3.getCause());
+ });
+ }
+
+ private static void testChannelCloseDuringPublisherRead(
+ ServerRequestPair pair,
+ Path tempDir,
+ // Receiving an explicit requestor to cover exceptions thrown by both `send()` and `sendAsync()`
+ Requestor requestor,
+ Consumer>> responseVerifier) throws Exception {
establishInitialConnection(pair);
ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1);
ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1);
@@ -517,7 +545,7 @@ void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path t
.requestBuilder
.POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength))
.build();
- responseFutureRef.set(CLIENT.sendAsync(request, discarding()));
+ responseFutureRef.set(requestor.request(request, discarding()));
// Wait for server to receive the request
LOGGER.log("Waiting for the request to be received");
@@ -533,9 +561,7 @@ void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path t
// Verifying the client failure
LOGGER.log("Verifying the client failure");
- Exception requestFailure0 = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get());
- Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
- assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause());
+ responseVerifier.accept(responseFutureRef.get());
verifyServerIncompleteRead(pair, fileLength);
@@ -547,7 +573,40 @@ void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path t
@ParameterizedTest
@MethodSource("serverRequestPairs")
- void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
+ void testFileModificationDuringPublisherReadAsync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
+ testFileModificationDuringPublisherRead(pair, tempDir, Requestor.ASYNC, responseFuture -> {
+ Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
+ Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
+ Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause());
+ String requestFailure2Message = requestFailure2.getMessage();
+ assertTrue(
+ requestFailure2Message.contains("Unexpected EOF"),
+ "unexpected message: " + requestFailure2Message);
+ });
+ }
+
+ @ParameterizedTest
+ @MethodSource("serverRequestPairs")
+ void testFileModificationDuringPublisherReadSync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
+ testFileModificationDuringPublisherRead(pair, tempDir, Requestor.SYNC, responseFuture -> {
+ Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
+ Exception requestFailure1 = assertInstanceOf(RuntimeException.class, requestFailure0.getCause());
+ Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause());
+ Exception requestFailure3 = assertInstanceOf(UncheckedIOException.class, requestFailure2.getCause());
+ Exception requestFailure4 = assertInstanceOf(IOException.class, requestFailure3.getCause());
+ String requestFailure4Message = requestFailure4.getMessage();
+ assertTrue(
+ requestFailure4Message.contains("Unexpected EOF"),
+ "unexpected message: " + requestFailure4Message);
+ });
+ }
+
+ private static void testFileModificationDuringPublisherRead(
+ ServerRequestPair pair,
+ Path tempDir,
+ // Receiving an explicit requestor to cover exceptions thrown by both `send()` and `sendAsync()`
+ Requestor requestor,
+ Consumer>> responseVerifier) throws Exception {
establishInitialConnection(pair);
ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1);
ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1);
@@ -563,7 +622,7 @@ void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Pa
.requestBuilder
.POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength))
.build();
- Future> responseFuture = CLIENT.sendAsync(request, discarding());
+ Future> responseFuture = requestor.request(request, discarding());
// Wait for server to receive the request
LOGGER.log("Waiting for the request to be received");
@@ -579,13 +638,7 @@ void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Pa
// Verifying the client failure
LOGGER.log("Verifying the client failure");
- Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
- Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
- Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause());
- String requestFailure2Message = requestFailure2.getMessage();
- assertTrue(
- requestFailure2Message.contains("Unexpected EOF"),
- "unexpected message: " + requestFailure2Message);
+ responseVerifier.accept(responseFuture);
verifyServerIncompleteRead(pair, fileLength);
@@ -707,4 +760,31 @@ private static byte[] generateFileBytes(int length) {
return bytes;
}
+ @FunctionalInterface
+ private interface Requestor {
+
+ Requestor ASYNC = CLIENT::sendAsync;
+
+ Requestor SYNC = new Requestor() {
+ @Override
+ public Future> request(
+ HttpRequest request,
+ HttpResponse.BodyHandler responseHandler) {
+ return EXECUTOR.submit(() -> {
+ try {
+ return CLIENT.send(request, responseHandler);
+ } catch (Throwable t) {
+ if (t instanceof InterruptedException) {
+ Thread.currentThread().interrupt(); // Restore the interrupt
+ }
+ throw new RuntimeException(t);
+ }
+ });
+ }
+ };
+
+ Future> request(HttpRequest request, HttpResponse.BodyHandler responseHandler);
+
+ }
+
}
From c8a0257a35475383ca4e7d26e13931ec47d201c0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Tue, 5 Aug 2025 13:19:42 +0200
Subject: [PATCH 13/14] Revert the last commit: faa5d24d831
The `send()`-vs-`sendAsync()` discrepancy will be addressed separately.
---
.../httpclient/FileChannelPublisherTest.java | 112 +++---------------
1 file changed, 16 insertions(+), 96 deletions(-)
diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
index c92ceb39f56fe..e6a59e588fe59 100644
--- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
+++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
@@ -71,7 +71,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
import java.util.stream.Stream;
import static java.net.http.HttpClient.Builder.NO_PROXY;
@@ -495,41 +494,14 @@ private void testSuccessfulContentDelivery(
* To circumvent this, use this big enough file size.
*
*
- * @see #testChannelCloseDuringPublisherReadAsync(ServerRequestPair, Path)
- * @see #testChannelCloseDuringPublisherReadSync(ServerRequestPair, Path)
- * @see #testFileModificationDuringPublisherReadAsync(ServerRequestPair, Path)
- * @see #testFileModificationDuringPublisherReadSync(ServerRequestPair, Path)
+ * @see #testChannelCloseDuringPublisherRead(ServerRequestPair, Path)
+ * @see #testFileModificationDuringPublisherRead(ServerRequestPair, Path)
*/
private static final int BIG_FILE_LENGTH = 8 * 1024 * 1024; // 8 MiB
@ParameterizedTest
@MethodSource("serverRequestPairs")
- void testChannelCloseDuringPublisherReadAsync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
- testChannelCloseDuringPublisherRead(pair, tempDir, Requestor.ASYNC, responseFuture -> {
- Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
- Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
- assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause());
- });
- }
-
- @ParameterizedTest
- @MethodSource("serverRequestPairs")
- void testChannelCloseDuringPublisherReadSync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
- testChannelCloseDuringPublisherRead(pair, tempDir, Requestor.SYNC, responseFuture -> {
- Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
- Exception requestFailure1 = assertInstanceOf(RuntimeException.class, requestFailure0.getCause());
- Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause());
- Exception requestFailure3 = assertInstanceOf(UncheckedIOException.class, requestFailure2.getCause());
- assertInstanceOf(ClosedChannelException.class, requestFailure3.getCause());
- });
- }
-
- private static void testChannelCloseDuringPublisherRead(
- ServerRequestPair pair,
- Path tempDir,
- // Receiving an explicit requestor to cover exceptions thrown by both `send()` and `sendAsync()`
- Requestor requestor,
- Consumer>> responseVerifier) throws Exception {
+ void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
establishInitialConnection(pair);
ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1);
ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1);
@@ -545,7 +517,7 @@ private static void testChannelCloseDuringPublisherRead(
.requestBuilder
.POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength))
.build();
- responseFutureRef.set(requestor.request(request, discarding()));
+ responseFutureRef.set(CLIENT.sendAsync(request, discarding()));
// Wait for server to receive the request
LOGGER.log("Waiting for the request to be received");
@@ -561,7 +533,9 @@ private static void testChannelCloseDuringPublisherRead(
// Verifying the client failure
LOGGER.log("Verifying the client failure");
- responseVerifier.accept(responseFutureRef.get());
+ Exception requestFailure0 = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get());
+ Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
+ assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause());
verifyServerIncompleteRead(pair, fileLength);
@@ -573,40 +547,7 @@ private static void testChannelCloseDuringPublisherRead(
@ParameterizedTest
@MethodSource("serverRequestPairs")
- void testFileModificationDuringPublisherReadAsync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
- testFileModificationDuringPublisherRead(pair, tempDir, Requestor.ASYNC, responseFuture -> {
- Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
- Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
- Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause());
- String requestFailure2Message = requestFailure2.getMessage();
- assertTrue(
- requestFailure2Message.contains("Unexpected EOF"),
- "unexpected message: " + requestFailure2Message);
- });
- }
-
- @ParameterizedTest
- @MethodSource("serverRequestPairs")
- void testFileModificationDuringPublisherReadSync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
- testFileModificationDuringPublisherRead(pair, tempDir, Requestor.SYNC, responseFuture -> {
- Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
- Exception requestFailure1 = assertInstanceOf(RuntimeException.class, requestFailure0.getCause());
- Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause());
- Exception requestFailure3 = assertInstanceOf(UncheckedIOException.class, requestFailure2.getCause());
- Exception requestFailure4 = assertInstanceOf(IOException.class, requestFailure3.getCause());
- String requestFailure4Message = requestFailure4.getMessage();
- assertTrue(
- requestFailure4Message.contains("Unexpected EOF"),
- "unexpected message: " + requestFailure4Message);
- });
- }
-
- private static void testFileModificationDuringPublisherRead(
- ServerRequestPair pair,
- Path tempDir,
- // Receiving an explicit requestor to cover exceptions thrown by both `send()` and `sendAsync()`
- Requestor requestor,
- Consumer>> responseVerifier) throws Exception {
+ void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception {
establishInitialConnection(pair);
ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1);
ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1);
@@ -622,7 +563,7 @@ private static void testFileModificationDuringPublisherRead(
.requestBuilder
.POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength))
.build();
- Future> responseFuture = requestor.request(request, discarding());
+ Future> responseFuture = CLIENT.sendAsync(request, discarding());
// Wait for server to receive the request
LOGGER.log("Waiting for the request to be received");
@@ -638,7 +579,13 @@ private static void testFileModificationDuringPublisherRead(
// Verifying the client failure
LOGGER.log("Verifying the client failure");
- responseVerifier.accept(responseFuture);
+ Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
+ Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
+ Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause());
+ String requestFailure2Message = requestFailure2.getMessage();
+ assertTrue(
+ requestFailure2Message.contains("Unexpected EOF"),
+ "unexpected message: " + requestFailure2Message);
verifyServerIncompleteRead(pair, fileLength);
@@ -760,31 +707,4 @@ private static byte[] generateFileBytes(int length) {
return bytes;
}
- @FunctionalInterface
- private interface Requestor {
-
- Requestor ASYNC = CLIENT::sendAsync;
-
- Requestor SYNC = new Requestor() {
- @Override
- public Future> request(
- HttpRequest request,
- HttpResponse.BodyHandler responseHandler) {
- return EXECUTOR.submit(() -> {
- try {
- return CLIENT.send(request, responseHandler);
- } catch (Throwable t) {
- if (t instanceof InterruptedException) {
- Thread.currentThread().interrupt(); // Restore the interrupt
- }
- throw new RuntimeException(t);
- }
- });
- }
- };
-
- Future> request(HttpRequest request, HttpResponse.BodyHandler responseHandler);
-
- }
-
}
From 91d3422ce3a07165b30c27a73ed9525eee7f88ba Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?=
Date: Mon, 11 Aug 2025 09:33:16 +0200
Subject: [PATCH 14/14] Improve style for declaring multiple consecutive fields
of the same type
---
.../net/httpclient/FileChannelPublisherTest.java | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
index e6a59e588fe59..5b064efa07810 100644
--- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
+++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java
@@ -96,13 +96,11 @@ class FileChannelPublisherTest {
private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
- private static final ServerRequestPair HTTP1 = ServerRequestPair.of(Version.HTTP_1_1, false);
-
- private static final ServerRequestPair HTTPS1 = ServerRequestPair.of(Version.HTTP_1_1, true);
-
- private static final ServerRequestPair HTTP2 = ServerRequestPair.of(Version.HTTP_2, false);
-
- private static final ServerRequestPair HTTPS2 = ServerRequestPair.of(Version.HTTP_2, true);
+ private static final ServerRequestPair
+ HTTP1 = ServerRequestPair.of(Version.HTTP_1_1, false),
+ HTTPS1 = ServerRequestPair.of(Version.HTTP_1_1, true),
+ HTTP2 = ServerRequestPair.of(Version.HTTP_2, false),
+ HTTPS2 = ServerRequestPair.of(Version.HTTP_2, true);
private static SSLContext createSslContext() {
try {