From bdeb999b8bde6098a5c3e653e3bc248e61208af7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Tue, 17 Jun 2025 11:08:43 +0200 Subject: [PATCH 01/14] Add `HttpRequest.BodyPublishers::ofFileChannel` along with tests --- .../classes/java/net/http/HttpRequest.java | 33 +- .../internal/net/http/RequestPublishers.java | 91 ++- .../jdk/internal/net/http/common/Utils.java | 22 + .../httpclient/FileChannelPublisherTest.java | 693 ++++++++++++++++++ 4 files changed, 834 insertions(+), 5 deletions(-) create mode 100644 test/jdk/java/net/httpclient/FileChannelPublisherTest.java diff --git a/src/java.net.http/share/classes/java/net/http/HttpRequest.java b/src/java.net.http/share/classes/java/net/http/HttpRequest.java index 7ba6ed25b41e3..fd0af07697b15 100644 --- a/src/java.net.http/share/classes/java/net/http/HttpRequest.java +++ b/src/java.net.http/share/classes/java/net/http/HttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -29,10 +29,9 @@ import java.io.InputStream; import java.net.URI; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.OpenOption; import java.nio.file.Path; import java.time.Duration; import java.util.Iterator; @@ -720,6 +719,34 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException { return RequestPublishers.FilePublisher.create(path); } + /** + * {@return a request body publisher whose body is the {@code length} + * content bytes read from the provided file {@code channel} starting + * from the specified {@code offset}} + *

+ * The {@linkplain FileChannel file channel} will be read using + * {@link FileChannel#read(ByteBuffer, long) FileChannel.read(ByteBuffer buffer, long position)}, + * which does not modify the channel's position. Thus, the same file + * channel may be shared between several publishers passed to + * concurrent requests. + *

+ * The file channel will not be closed upon completion. The caller is + * expected to manage the life cycle of the channel, and close it + * appropriately when not needed anymore. + * + * @param channel a file channel + * @param offset the offset of the first byte + * @param length the number of bytes to use + * + * @throws IndexOutOfBoundsException if the specified byte range is + * found to be out of bounds compared with the size of the file + * referred by the channel + */ + public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) { + Objects.requireNonNull(channel, "channel"); + return new RequestPublishers.FileChannelPublisher(channel, offset, length); + } + /** * A request body publisher that takes data from an {@code Iterable} * of byte arrays. An {@link Iterable} is provided which supplies diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java index dd5443c503567..0572826eecfa0 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java @@ -32,6 +32,7 @@ import java.lang.reflect.UndeclaredThrowableException; import java.net.http.HttpRequest.BodyPublisher; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.NoSuchFileException; @@ -418,6 +419,92 @@ public long contentLength() { } } + public static final class FileChannelPublisher implements BodyPublisher { + + private final FileChannel channel; + + private final long position; + + private final long limit; + + public FileChannelPublisher(FileChannel channel, long offset, long length) { + this.channel = Objects.requireNonNull(channel, "channel"); + long fileSize = fileSize(channel); + Objects.checkFromIndexSize(offset, length, fileSize); + this.position = offset; + this.limit = offset + length; + } + + private static long fileSize(FileChannel channel) { + try { + return channel.size(); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } + + @Override + public long contentLength() { + return limit - position; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + Iterable iterable = () -> new FileChannelIterator(channel, position, limit); + new PullPublisher<>(iterable).subscribe(subscriber); + } + + } + + private static final class FileChannelIterator implements Iterator { + + private final FileChannel channel; + + private final long limit; + + private long position; + + private boolean terminated; + + private FileChannelIterator(FileChannel channel, long position, long limit) { + this.channel = channel; + this.position = position; + this.limit = limit; + } + + @Override + public synchronized boolean hasNext() { + return position < limit && !terminated; + } + + @Override + public synchronized ByteBuffer next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + long remaining = limit - position; + ByteBuffer buffer = Utils.BUFSIZE > remaining + ? Utils.getBufferWithAtMost((int) remaining) + : Utils.getBuffer(); + try { + int readLength = channel.read(buffer, position); + // Short-circuit if `read()` has failed, e.g., due to file content being changed in the meantime + if (readLength < 0) { + // We *must* throw to signal that the request needs to be cancelled. + // Otherwise, the server will continue waiting data. + throw new IOException("Unexpected EOF (position=%s)".formatted(position)); + } else { + position += readLength; + } + } catch (IOException ioe) { + terminated = true; + throw new UncheckedIOException(ioe); + } + return buffer.flip(); + } + + } + public static final class PublisherAdapter implements BodyPublisher { private final Publisher publisher; @@ -430,12 +517,12 @@ public PublisherAdapter(Publisher publisher, } @Override - public final long contentLength() { + public long contentLength() { return contentLength; } @Override - public final void subscribe(Flow.Subscriber subscriber) { + public void subscribe(Flow.Subscriber subscriber) { publisher.subscribe(subscriber); } } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java index 8aefa0ee5baf4..2785ee849a381 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java @@ -367,10 +367,32 @@ public static String describeOps(int interestOps) { public static IllegalArgumentException newIAE(String message, Object... args) { return new IllegalArgumentException(format(message, args)); } + + /** + * {@return a new {@link ByteBuffer} instance of configured capacity for the HTTP Client} + */ public static ByteBuffer getBuffer() { return ByteBuffer.allocate(BUFSIZE); } + /** + * {@return a new {@link ByteBuffer} instance whose capacity is set to the + * smaller of the specified {@code maxCapacity} and the default + * ({@value BUFSIZE})} + * + * @param maxCapacity a buffer capacity, in bytes + * @throws IllegalArgumentException if {@code capacity < 0} + */ + public static ByteBuffer getBufferWithAtMost(int maxCapacity) { + if (maxCapacity < 0) { + throw new IllegalArgumentException( + // Match the message produced by `ByteBuffer::createCapacityException` + "capacity < 0: (%s < 0)".formatted(maxCapacity)); + } + int effectiveCapacity = Math.clamp(maxCapacity, 0, BUFSIZE); + return ByteBuffer.allocate(effectiveCapacity); + } + public static Throwable getCompletionCause(Throwable x) { Throwable cause = x; while ((cause instanceof CompletionException) diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java new file mode 100644 index 0000000000000..7350e66b52cb4 --- /dev/null +++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java @@ -0,0 +1,693 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @summary Verifies `HttpRequest.BodyPublishers::ofFileChannel` + * @library /test/lib + * /test/jdk/java/net/httpclient/lib + * @build jdk.httpclient.test.lib.common.HttpServerAdapters + * jdk.test.lib.net.SimpleSSLContext + * @run junit FileChannelPublisherTest + */ + +import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestHandler; +import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer; +import jdk.internal.net.http.common.Logger; +import jdk.internal.net.http.common.Utils; +import jdk.test.lib.net.SimpleSSLContext; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.junit.jupiter.api.io.CleanupMode; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpClient.Version; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +import static java.net.http.HttpClient.Builder.NO_PROXY; +import static java.net.http.HttpRequest.BodyPublishers.ofFileChannel; +import static java.net.http.HttpResponse.BodyHandlers.discarding; +import static java.net.http.HttpResponse.BodyHandlers.ofInputStream; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class FileChannelPublisherTest { + + private static final String CLASS_NAME = FileChannelPublisherTest.class.getSimpleName(); + + private static final Logger LOGGER = Utils.getDebugLogger(CLASS_NAME::toString, Utils.DEBUG); + + private static final int DEFAULT_BUFFER_SIZE = Utils.getBuffer().capacity(); + + private static final SSLContext SSL_CONTEXT = createSslContext(); + + private static final HttpClient CLIENT = HttpClient.newBuilder().sslContext(SSL_CONTEXT).proxy(NO_PROXY).build(); + + private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(); + + private static final ServerRequestPair HTTP1 = ServerRequestPair.of(Version.HTTP_1_1, false); + + private static final ServerRequestPair HTTPS1 = ServerRequestPair.of(Version.HTTP_1_1, true); + + private static final ServerRequestPair HTTP2 = ServerRequestPair.of(Version.HTTP_2, false); + + private static final ServerRequestPair HTTPS2 = ServerRequestPair.of(Version.HTTP_2, true); + + private static SSLContext createSslContext() { + try { + return new SimpleSSLContext().get(); + } catch (IOException exception) { + throw new UncheckedIOException(exception); + } + } + + private record ServerRequestPair( + String serverName, + HttpTestServer server, + BlockingQueue serverReadRequestBodyBytes, + HttpRequest.Builder requestBuilder, + boolean secure) { + + private static CountDownLatch SERVER_REQUEST_RECEIVED_SIGNAL = null; + + private static CountDownLatch SERVER_READ_PERMISSION = null; + + private static ServerRequestPair of(Version version, boolean secure) { + + // Create the server + SSLContext sslContext = secure ? SSL_CONTEXT : null; + HttpTestServer server = createServer(version, sslContext); + String serverName = secure ? version.toString().replaceFirst("_", "S_") : version.toString(); + + // Add the handler + String handlerPath = "/%s/".formatted(CLASS_NAME); + BlockingQueue serverReadRequestBodyBytes = + addRequestBodyConsumingServerHandler(serverName, server, handlerPath); + + // Create the request builder + String requestUriScheme = secure ? "https" : "http"; + // `x` suffix in the URI is not a typo, but ensures that *only* the parent handler path is matched + URI requestUri = URI.create("%s://%s%sx".formatted(requestUriScheme, server.serverAuthority(), handlerPath)); + HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(requestUri).version(version); + + // Create the pair + ServerRequestPair pair = new ServerRequestPair(serverName, server, serverReadRequestBodyBytes, requestBuilder, secure); + pair.server.start(); + LOGGER.log("Server[%s] is started at `%s`", pair, server.serverAuthority()); + + return pair; + + } + + private static HttpTestServer createServer(Version version, SSLContext sslContext) { + try { + // The default HTTP/1.1 test server processes requests sequentially. + // This causes a deadlock for concurrent tests such as `testSlicedUpload()`. + // Hence, explicitly providing a multithreaded executor for HTTP/1.1. + ExecutorService executor = Version.HTTP_1_1.equals(version) ? EXECUTOR : null; + return HttpTestServer.create(version, sslContext, executor); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } + + private static BlockingQueue addRequestBodyConsumingServerHandler( + String serverName, HttpTestServer server, String handlerPath) { + BlockingQueue readRequestBodyBytes = new LinkedBlockingQueue<>(); + HttpTestHandler handler = exchange -> { + // `HttpTestExchange::toString` changes on failure, pin it + String exchangeName = exchange.toString(); + try (exchange) { + + // Discard `HEAD` requests used for initial connection admission + if ("HEAD".equals(exchange.getRequestMethod())) { + exchange.sendResponseHeaders(200, -1L); + return; + } + + signalServerRequestReceived(serverName, exchangeName); + awaitServerReadPermission(serverName, exchangeName); + + LOGGER.log("Server[%s] is reading the request body (exchange=%s)", serverName, exchangeName); + byte[] requestBodyBytes = exchange.getRequestBody().readAllBytes(); + LOGGER.log("Server[%s] has read %s bytes (exchange=%s)", serverName, requestBodyBytes.length, exchangeName); + readRequestBodyBytes.add(requestBodyBytes); + + LOGGER.log("Server[%s] is writing the response (exchange=%s)", serverName, exchangeName); + exchange.sendResponseHeaders(200, requestBodyBytes.length); + exchange.getResponseBody().write(requestBodyBytes); + + } catch (Exception exception) { + LOGGER.log( + "Server[%s] failed to process the request (exchange=%s)".formatted(serverName, exception), + exception); + readRequestBodyBytes.add(new byte[0]); + } finally { + LOGGER.log("Server[%s] completed processing the request (exchange=%s)", serverName, exchangeName); + } + }; + server.addHandler(handler, handlerPath); + return readRequestBodyBytes; + } + + private static void signalServerRequestReceived(String serverName, String exchangeName) { + if (SERVER_REQUEST_RECEIVED_SIGNAL != null) { + LOGGER.log("Server[%s] is signaling that the request is received (exchange=%s)", serverName, exchangeName); + SERVER_REQUEST_RECEIVED_SIGNAL.countDown(); + } + } + + private static void awaitServerReadPermission(String serverName, String exchangeName) { + if (SERVER_READ_PERMISSION != null) { + LOGGER.log("Server[%s] is waiting for the read permission (exchange=%s)", serverName, exchangeName); + try { + SERVER_READ_PERMISSION.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // Restore the `interrupted` flag + throw new RuntimeException(ie); + } + } + } + + @Override + public String toString() { + return serverName; + } + + } + + @AfterAll + static void shutDown() { + LOGGER.log("Closing the client"); + CLIENT.close(); + LOGGER.log("Closing servers"); + closeServers(); + LOGGER.log("Closing the executor"); + EXECUTOR.shutdownNow(); + } + + private static void closeServers() { + Exception[] exceptionRef = {null}; + Stream + .of(HTTP1, HTTPS1, HTTP2, HTTPS2) + .map(pair -> (Runnable) pair.server::stop) + .forEach(terminator -> { + try { + terminator.run(); + } catch (Exception exception) { + if (exceptionRef[0] == null) { + exceptionRef[0] = exception; + } else { + exceptionRef[0].addSuppressed(exception); + } + } + }); + if (exceptionRef[0] != null) { + throw new RuntimeException("failed closing one or more server resources", exceptionRef[0]); + } + } + + /** + * Resets {@link ServerRequestPair#serverReadRequestBodyBytes()} to avoid leftover state from a test leaking to the next. + */ + @BeforeEach + void resetServerHandlerResults() { + Stream + .of(HTTP1, HTTPS1, HTTP2, HTTPS2) + .forEach(pair -> pair.serverReadRequestBodyBytes.clear()); + } + + static ServerRequestPair[] serverRequestPairs() { + return new ServerRequestPair[]{ + HTTP1, + HTTPS1, + HTTP2, + HTTPS2 + }; + } + + @Test + void testNullFileChannel() { + assertThrows(NullPointerException.class, () -> ofFileChannel(null, 0, 1)); + } + + @ParameterizedTest + @CsvSource({ + "6,-1,1", // offset < 0 + "6,7,1", // offset > fileSize + "6,0,-1", // length < 0 + "6,0,7", // length > fileSize + "6,2,5" // (offset + length) > fileSize + }) + void testIllegalOffset( + int fileLength, + int fileChannelOffset, + int fileChannelLength, + @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception { + withFileChannel(tempDir.resolve("data.txt"), fileLength, (_, fileChannel) -> + assertThrows( + IndexOutOfBoundsException.class, + () -> ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength))); + } + + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testContentLessThanBufferSize( + ServerRequestPair pair, + @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception { + + int fileLength = 6; + assertTrue(fileLength < DEFAULT_BUFFER_SIZE); + + testSuccessfulContentDelivery( + "Complete content", + pair, tempDir, fileLength, 0, fileLength); + + { + int fileChannelOffset = 1; + int fileChannelLength = fileLength - 1; + String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength); + assertEquals( + fileLength - fileChannelOffset, fileChannelLength, + "must be until EOF " + debuggingContext); + testSuccessfulContentDelivery( + "Partial content until the EOF " + debuggingContext, + pair, tempDir, fileLength, fileChannelOffset, fileChannelLength); + } + + { + int fileChannelOffset = 1; + int fileChannelLength = fileLength - 2; + String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength); + assertTrue( + fileLength - fileChannelOffset > fileChannelLength, + "must end before EOF " + debuggingContext); + testSuccessfulContentDelivery( + "Partial content *before* the EOF " + debuggingContext, + pair, tempDir, fileLength, fileChannelOffset, fileChannelLength); + } + + } + + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testContentMoreThanBufferSize( + ServerRequestPair pair, + @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception { + + int fileLength = 1 + 3 * DEFAULT_BUFFER_SIZE; + + testSuccessfulContentDelivery( + "Complete content", + pair, tempDir, fileLength, 0, fileLength); + + { + int fileChannelOffset = 1; + int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE; + String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength); + assertEquals( + fileLength - fileChannelOffset, fileChannelLength, + "must be until EOF " + debuggingContext); + testSuccessfulContentDelivery( + "Partial content until the EOF. Occupies exactly 3 buffers. " + debuggingContext, + pair, tempDir, fileLength, fileChannelOffset, fileChannelLength); + } + + { + int fileChannelOffset = 2; + int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE - 1; + String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength); + assertEquals( + fileLength - fileChannelOffset, fileChannelLength, + "must be until EOF " + debuggingContext); + testSuccessfulContentDelivery( + "Partial content until the EOF. Occupies 3 buffers, the last is custom sized. " + debuggingContext, + pair, tempDir, fileLength, fileChannelOffset, fileChannelLength); + } + + { + int fileChannelOffset = 2; + int fileChannelLength = 2 * DEFAULT_BUFFER_SIZE; + String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength); + assertTrue( + fileLength - fileChannelOffset > fileChannelLength, + "must end before EOF " + debuggingContext); + testSuccessfulContentDelivery( + "Partial content *before* the EOF. Occupies exactly 2 buffers. " + debuggingContext, + pair, tempDir, fileLength, fileChannelOffset, fileChannelLength); + } + + { + int fileChannelOffset = 2; + int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE - 2; + String debuggingContext = debuggingContext(fileLength, fileChannelOffset, fileChannelLength); + assertTrue( + fileLength - fileChannelOffset > fileChannelLength, + "must end before EOF " + debuggingContext); + testSuccessfulContentDelivery( + "Partial content *before* the EOF. Occupies 3 buffers, the last is custom sized. "+ debuggingContext, + pair, tempDir, fileLength, fileChannelOffset, fileChannelLength); + } + + } + + private static String debuggingContext(int fileLength, int fileChannelOffset, int fileChannelLength) { + Map context = new LinkedHashMap<>(); // Using `LHM` to preserve the insertion order + context.put("DEFAULT_BUFFER_SIZE", DEFAULT_BUFFER_SIZE); + context.put("fileLength", fileLength); + context.put("fileChannelOffset", fileChannelOffset); + context.put("fileChannelLength", fileChannelLength); + boolean customSizedBuffer = fileChannelLength % DEFAULT_BUFFER_SIZE == 0; + context.put("customSizedBuffer", customSizedBuffer); + return context.toString(); + } + + private void testSuccessfulContentDelivery( + String caseDescription, + ServerRequestPair pair, + Path tempDir, + int fileLength, + int fileChannelOffset, + int fileChannelLength) throws Exception { + + // Case names come handy even when no debug logging is enabled. + // Hence, intentionally avoiding `Logger`. + System.err.printf("Case: %s%n", caseDescription); + + // Create the file to upload + String fileName = "data-%d-%d-%d.txt".formatted(fileLength, fileChannelOffset, fileChannelLength); + Path filePath = tempDir.resolve(fileName); + withFileChannel(filePath, fileLength, (fileBytes, fileChannel) -> { + + // Upload the file + HttpRequest request = pair + .requestBuilder + .POST(ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength)) + .build(); + CLIENT.send(request, discarding()); + + // Verify the received request body + byte[] expectedRequestBodyBytes = new byte[fileChannelLength]; + System.arraycopy(fileBytes, fileChannelOffset, expectedRequestBodyBytes, 0, fileChannelLength); + byte[] actualRequestBodyBytes = pair.serverReadRequestBodyBytes.take(); + assertArrayEquals(expectedRequestBodyBytes, actualRequestBodyBytes); + + }); + + } + + /** + * Big enough file length to observe the effects of publisher state corruption while uploading. + *

+ * Certain tests follow below steps: + *

+ *
    + *
  1. Issue the request
  2. + *
  3. Wait for the server's signal that the request (not the body!) is received
  4. + *
  5. Corrupt the publisher's state; modify the file, close the file channel, etc.
  6. + *
  7. Signal the server to proceed with reading
  8. + *
+ *

+ * With small files, even before we permit the server to read (step 4), file gets already uploaded. + * This voids the effect of state corruption (step 3). + * To circumvent this, use this big enough file size. + *

+ * + * @see #testChannelCloseDuringPublisherRead(ServerRequestPair, Path) + * @see #testFileModificationDuringPublisherRead(ServerRequestPair, Path) + */ + private static final int BIG_FILE_LENGTH = 8 * 1024 * 1024; // 8 MiB + + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testChannelCloseDuringPublisherRead( + ServerRequestPair pair, + @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) + throws Exception { + establishInitialConnection(pair); + ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1); + ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1); + try { + + int fileLength = BIG_FILE_LENGTH; + AtomicReference>> responseFutureRef = new AtomicReference<>(); + withFileChannel(tempDir.resolve("data.txt"), fileLength, ((_, fileChannel) -> { + + // Issue the request + LOGGER.log("Issuing the request"); + HttpRequest request = pair + .requestBuilder + .POST(ofFileChannel(fileChannel, 0, fileLength)) + .build(); + responseFutureRef.set(CLIENT.sendAsync(request, discarding())); + + // Wait for server to receive the request + LOGGER.log("Waiting for the request to be received"); + ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL.await(); + + })); + + LOGGER.log("File channel is closed"); + + // Let the server proceed + LOGGER.log("Permitting the server to proceed"); + ServerRequestPair.SERVER_READ_PERMISSION.countDown(); + + // Verifying the client failure + LOGGER.log("Verifying the client failure"); + Exception requestFailure = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get()); + assertInstanceOf(UncheckedIOException.class, requestFailure.getCause()); + assertInstanceOf(ClosedChannelException.class, requestFailure.getCause().getCause()); + + verifyServerIncompleteRead(pair, fileLength); + + } finally { + ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = null; + ServerRequestPair.SERVER_READ_PERMISSION = null; + } + } + + @ParameterizedTest + @MethodSource("serverRequestPairs") + // On Windows, modification while reading is not possible. + // Recall the infamous `The process cannot access the file because it is being used by another process`. + @DisabledOnOs(OS.WINDOWS) + void testFileModificationDuringPublisherRead( + ServerRequestPair pair, + @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) + throws Exception { + establishInitialConnection(pair); + ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1); + ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1); + try { + + int fileLength = BIG_FILE_LENGTH; + Path filePath = tempDir.resolve("data.txt"); + withFileChannel(filePath, fileLength, ((_, fileChannel) -> { + + // Issue the request + LOGGER.log("Issuing the request"); + HttpRequest request = pair + .requestBuilder + .POST(ofFileChannel(fileChannel, 0, fileLength)) + .build(); + CompletableFuture> responseFuture = CLIENT.sendAsync(request, discarding()); + + // Wait for server to receive the request + LOGGER.log("Waiting for the request to be received"); + ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL.await(); + + // Modify the file + LOGGER.log("Modifying the file"); + Files.write(filePath, generateFileBytes(1)); + + // Let the server proceed + LOGGER.log("Permitting the server to proceed"); + ServerRequestPair.SERVER_READ_PERMISSION.countDown(); + + // Verifying the client failure + LOGGER.log("Verifying the client failure"); + Exception requestFailure = assertThrows(ExecutionException.class, responseFuture::get); + String requestFailureMessage = requestFailure.getMessage(); + assertTrue( + requestFailureMessage.contains("Unexpected EOF"), + "unexpected message: " + requestFailureMessage); + + verifyServerIncompleteRead(pair, fileLength); + + })); + + } finally { + ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = null; + ServerRequestPair.SERVER_READ_PERMISSION = null; + } + } + + private static void verifyServerIncompleteRead(ServerRequestPair pair, int fileLength) throws InterruptedException { + LOGGER.log("Verifying the server's incomplete read"); + byte[] readRequestBodyBytes = pair.serverReadRequestBodyBytes.take(); + assertTrue( + readRequestBodyBytes.length < fileLength, + "was expecting `readRequestBodyBytes < fileLength` (%s < %s)".formatted( + readRequestBodyBytes.length, fileLength)); + } + + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testSlicedUpload( + ServerRequestPair pair, + @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) + throws Exception { + + // Populate the file + int sliceCount = 4; + int sliceLength = 14_281; // Intentionally using a prime number to increase the chances of hitting corner cases + int fileLength = sliceCount * sliceLength; + byte[] fileBytes = generateFileBytes(fileLength); + Path filePath = tempDir.resolve("data.txt"); + Files.write(filePath, fileBytes, StandardOpenOption.CREATE); + + List responseBodyStreams = new ArrayList<>(sliceCount); + try (FileChannel fileChannel = FileChannel.open(filePath)) { + + // Upload the complete file in mutually exclusive slices + List>> responseFutures = new ArrayList<>(sliceCount); + for (int sliceIndex = 0; sliceIndex < sliceCount; sliceIndex++) { + LOGGER.log("Issuing request %d/%d", (sliceIndex + 1), sliceCount); + HttpRequest request = pair + .requestBuilder + .POST(ofFileChannel(fileChannel, sliceIndex * sliceLength, sliceLength)) + .build(); + responseFutures.add(CLIENT.sendAsync( + request, + // Intentionally using an `InputStream` response + // handler to defer consuming the response body + // until after the file channel is closed: + ofInputStream())); + } + + // Collect response body `InputStream`s from all requests + for (int sliceIndex = 0; sliceIndex < sliceCount; sliceIndex++) { + LOGGER.log("Collecting response body `InputStream` for request %d/%d", (sliceIndex + 1), sliceCount); + HttpResponse response = responseFutures.get(sliceIndex).get(); + assertEquals(200, response.statusCode()); + responseBodyStreams.add(response.body()); + } + + } + + LOGGER.log("File channel is closed"); + + // Verify response bodies + for (int sliceIndex = 0; sliceIndex < sliceCount; sliceIndex++) { + LOGGER.log("Consuming response body %d/%d", (sliceIndex + 1), sliceCount); + byte[] expectedResponseBodyBytes = new byte[sliceLength]; + System.arraycopy(fileBytes, sliceIndex * sliceLength, expectedResponseBodyBytes, 0, sliceLength); + try (InputStream responseBodyStream = responseBodyStreams.get(sliceIndex)) { + byte[] responseBodyBytes = responseBodyStream.readAllBytes(); + assertArrayEquals(expectedResponseBodyBytes, responseBodyBytes); + } + } + + } + + /** + * Performs the initial {@code HEAD} request to the specified server. This + * effectively admits a connection to the client's pool, where all protocol + * upgrades, handshakes, etc. are already performed. + *

+ * HTTP/2 test server consumes the complete request payload in the very + * first upgrade frame. That is, if a client sends 100 MiB of data, all + * of it will be consumed first before the configured handler is + * invoked. Though certain tests expect the data to be consumed + * piecemeal. To accommodate this, we ensure client has an upgraded + * connection in the pool. + *

+ */ + private static void establishInitialConnection(ServerRequestPair pair) { + LOGGER.log("Server[%s] is getting queried for the initial connection pool admission", pair); + try { + CLIENT.send(pair.requestBuilder.HEAD().build(), discarding()); + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } + + private static void withFileChannel(Path filePath, int fileLength, FileChannelConsumer fileChannelConsumer) throws Exception { + byte[] fileBytes = generateFileBytes(fileLength); + Files.write(filePath, fileBytes, StandardOpenOption.CREATE); + try (FileChannel fileChannel = FileChannel.open(filePath)) { + fileChannelConsumer.consume(fileBytes, fileChannel); + } + } + + @FunctionalInterface + private interface FileChannelConsumer { + + void consume(byte[] fileBytes, FileChannel fileChannel) throws Exception; + + } + + private static byte[] generateFileBytes(int length) { + byte[] bytes = new byte[length]; + for (int i = 0; i < length; i++) { + bytes[i] = (byte) i; + } + return bytes; + } + +} From 0eda56f18a251d58c30d3fa940998d177d256d9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Mon, 7 Jul 2025 11:27:07 +0200 Subject: [PATCH 02/14] Implement review feedback --- .../share/classes/java/net/http/HttpRequest.java | 2 +- .../classes/jdk/internal/net/http/RequestPublishers.java | 4 +--- .../share/classes/jdk/internal/net/http/common/Utils.java | 4 ++-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/java.net.http/share/classes/java/net/http/HttpRequest.java b/src/java.net.http/share/classes/java/net/http/HttpRequest.java index fd0af07697b15..f6a93ff26a87c 100644 --- a/src/java.net.http/share/classes/java/net/http/HttpRequest.java +++ b/src/java.net.http/share/classes/java/net/http/HttpRequest.java @@ -736,7 +736,7 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException { * * @param channel a file channel * @param offset the offset of the first byte - * @param length the number of bytes to use + * @param length the number of bytes to read from the file channel * * @throws IndexOutOfBoundsException if the specified byte range is * found to be out of bounds compared with the size of the file diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java index 0572826eecfa0..e90acdc609df1 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java @@ -483,9 +483,7 @@ public synchronized ByteBuffer next() { throw new NoSuchElementException(); } long remaining = limit - position; - ByteBuffer buffer = Utils.BUFSIZE > remaining - ? Utils.getBufferWithAtMost((int) remaining) - : Utils.getBuffer(); + ByteBuffer buffer = Utils.getBufferWithAtMost(remaining); try { int readLength = channel.read(buffer, position); // Short-circuit if `read()` has failed, e.g., due to file content being changed in the meantime diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java index 2785ee849a381..ea4e429f40209 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java @@ -383,13 +383,13 @@ public static ByteBuffer getBuffer() { * @param maxCapacity a buffer capacity, in bytes * @throws IllegalArgumentException if {@code capacity < 0} */ - public static ByteBuffer getBufferWithAtMost(int maxCapacity) { + public static ByteBuffer getBufferWithAtMost(long maxCapacity) { if (maxCapacity < 0) { throw new IllegalArgumentException( // Match the message produced by `ByteBuffer::createCapacityException` "capacity < 0: (%s < 0)".formatted(maxCapacity)); } - int effectiveCapacity = Math.clamp(maxCapacity, 0, BUFSIZE); + int effectiveCapacity = (int) Math.min(maxCapacity, BUFSIZE); return ByteBuffer.allocate(effectiveCapacity); } From 3af61c5f015db200c12cc6b22e4c6a615f8e94a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Mon, 7 Jul 2025 12:39:24 +0200 Subject: [PATCH 03/14] Improve docs on `IndexOutOfBoundsException` thrown Co-authored-by: Daniel Fuchs <67001856+dfuch@users.noreply.github.com> --- .../share/classes/java/net/http/HttpRequest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/java.net.http/share/classes/java/net/http/HttpRequest.java b/src/java.net.http/share/classes/java/net/http/HttpRequest.java index f6a93ff26a87c..61366b4a695d6 100644 --- a/src/java.net.http/share/classes/java/net/http/HttpRequest.java +++ b/src/java.net.http/share/classes/java/net/http/HttpRequest.java @@ -739,8 +739,8 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException { * @param length the number of bytes to read from the file channel * * @throws IndexOutOfBoundsException if the specified byte range is - * found to be out of bounds compared with the size of the file - * referred by the channel + * found to be {@linkplain Objects.checkFromIndexSize(long, long, long) out of bounds} + * compared with the size of the file referred by the channel */ public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) { Objects.requireNonNull(channel, "channel"); From 4a9e303bfcc03ca387c9fdab23e6e6449ba698cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Fri, 18 Jul 2025 11:14:08 +0200 Subject: [PATCH 04/14] Add `@since 26` to `ofFileChannel` --- src/java.net.http/share/classes/java/net/http/HttpRequest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/java.net.http/share/classes/java/net/http/HttpRequest.java b/src/java.net.http/share/classes/java/net/http/HttpRequest.java index 61366b4a695d6..0c3b0899f5974 100644 --- a/src/java.net.http/share/classes/java/net/http/HttpRequest.java +++ b/src/java.net.http/share/classes/java/net/http/HttpRequest.java @@ -741,6 +741,7 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException { * @throws IndexOutOfBoundsException if the specified byte range is * found to be {@linkplain Objects.checkFromIndexSize(long, long, long) out of bounds} * compared with the size of the file referred by the channel + * @since 26 */ public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) { Objects.requireNonNull(channel, "channel"); From 8f66a32ec8b1b224bbc91827c17d9b0341bdf047 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Fri, 18 Jul 2025 11:17:21 +0200 Subject: [PATCH 05/14] Improve exception handling and documentation for `ofFileChannel` --- .../share/classes/java/net/http/HttpRequest.java | 14 +++++++++++--- .../jdk/internal/net/http/RequestPublishers.java | 12 ++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/java.net.http/share/classes/java/net/http/HttpRequest.java b/src/java.net.http/share/classes/java/net/http/HttpRequest.java index 0c3b0899f5974..3e2a7006acae4 100644 --- a/src/java.net.http/share/classes/java/net/http/HttpRequest.java +++ b/src/java.net.http/share/classes/java/net/http/HttpRequest.java @@ -26,6 +26,7 @@ package java.net.http; import java.io.FileNotFoundException; +import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.ByteBuffer; @@ -739,11 +740,18 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException { * @param length the number of bytes to read from the file channel * * @throws IndexOutOfBoundsException if the specified byte range is - * found to be {@linkplain Objects.checkFromIndexSize(long, long, long) out of bounds} - * compared with the size of the file referred by the channel + * found to be {@linkplain Objects#checkFromIndexSize(long, long, long) + * out of bounds} compared with the size of the file referred by the + * channel + * + * @throws IOException if the size of the file referred by the provided + * channel cannot be read while verifying the specified byte range + * + * @throws NullPointerException if {@code channel} is null + * * @since 26 */ - public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) { + public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) throws IOException { Objects.requireNonNull(channel, "channel"); return new RequestPublishers.FileChannelPublisher(channel, offset, length); } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java index e90acdc609df1..b9a25f60de97f 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java @@ -427,22 +427,14 @@ public static final class FileChannelPublisher implements BodyPublisher { private final long limit; - public FileChannelPublisher(FileChannel channel, long offset, long length) { + public FileChannelPublisher(FileChannel channel, long offset, long length) throws IOException { this.channel = Objects.requireNonNull(channel, "channel"); - long fileSize = fileSize(channel); + long fileSize = channel.size(); Objects.checkFromIndexSize(offset, length, fileSize); this.position = offset; this.limit = offset + length; } - private static long fileSize(FileChannel channel) { - try { - return channel.size(); - } catch (IOException ioe) { - throw new UncheckedIOException(ioe); - } - } - @Override public long contentLength() { return limit - position; From b114fe9598e9e5dc38159339b30fb01ff514712c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Fri, 18 Jul 2025 11:19:34 +0200 Subject: [PATCH 06/14] Remove synchronization for `FileChannelIterator` --- .../classes/jdk/internal/net/http/RequestPublishers.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java index b9a25f60de97f..45a675b33b389 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java @@ -465,12 +465,12 @@ private FileChannelIterator(FileChannel channel, long position, long limit) { } @Override - public synchronized boolean hasNext() { + public boolean hasNext() { return position < limit && !terminated; } @Override - public synchronized ByteBuffer next() { + public ByteBuffer next() { if (!hasNext()) { throw new NoSuchElementException(); } From 183da9d0235beba6e14c2aaaca5eb00afc55dea7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Fri, 18 Jul 2025 11:30:10 +0200 Subject: [PATCH 07/14] Improve wording for signaling request cancellation --- .../share/classes/jdk/internal/net/http/RequestPublishers.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java index 45a675b33b389..88cabe154193a 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java @@ -480,8 +480,7 @@ public ByteBuffer next() { int readLength = channel.read(buffer, position); // Short-circuit if `read()` has failed, e.g., due to file content being changed in the meantime if (readLength < 0) { - // We *must* throw to signal that the request needs to be cancelled. - // Otherwise, the server will continue waiting data. + // Throw to signal that the request needs to be cancelled throw new IOException("Unexpected EOF (position=%s)".formatted(position)); } else { position += readLength; From eacd5da23bb762033985f40c45d708c6224dc6ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Fri, 18 Jul 2025 11:38:18 +0200 Subject: [PATCH 08/14] Link to `BUFSIZE` in `Utils::getBuffer` Javadoc --- .../share/classes/jdk/internal/net/http/common/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java index ea4e429f40209..96e49e95fa9f1 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java @@ -369,7 +369,7 @@ public static IllegalArgumentException newIAE(String message, Object... args) { } /** - * {@return a new {@link ByteBuffer} instance of configured capacity for the HTTP Client} + * {@return a new {@link ByteBuffer} instance of {@link #BUFSIZE} capacity} */ public static ByteBuffer getBuffer() { return ByteBuffer.allocate(BUFSIZE); From da69840639ec9fda2a78855cd1c2193a3640896f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Fri, 18 Jul 2025 11:39:11 +0200 Subject: [PATCH 09/14] Fix typo in `Utils::getBufferWithAtMost` Javadoc --- .../share/classes/jdk/internal/net/http/common/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java index 96e49e95fa9f1..e5e66b9af38bb 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java @@ -381,7 +381,7 @@ public static ByteBuffer getBuffer() { * ({@value BUFSIZE})} * * @param maxCapacity a buffer capacity, in bytes - * @throws IllegalArgumentException if {@code capacity < 0} + * @throws IllegalArgumentException if {@code maxCapacity < 0} */ public static ByteBuffer getBufferWithAtMost(long maxCapacity) { if (maxCapacity < 0) { From 99503c0bac77a22437b6f793832eb20f4752232a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Wed, 30 Jul 2025 10:59:01 +0200 Subject: [PATCH 10/14] Apply review feedback for `FileChannelPublisherTest` --- .../httpclient/FileChannelPublisherTest.java | 99 +++++++++++-------- 1 file changed, 58 insertions(+), 41 deletions(-) diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java index 7350e66b52cb4..358bc260ec7f6 100644 --- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java +++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java @@ -39,9 +39,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledOnOs; -import org.junit.jupiter.api.condition.OS; -import org.junit.jupiter.api.io.CleanupMode; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -55,6 +52,7 @@ import java.net.http.HttpClient; import java.net.http.HttpClient.Version; import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; import java.net.http.HttpResponse; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; @@ -76,7 +74,6 @@ import java.util.stream.Stream; import static java.net.http.HttpClient.Builder.NO_PROXY; -import static java.net.http.HttpRequest.BodyPublishers.ofFileChannel; import static java.net.http.HttpResponse.BodyHandlers.discarding; import static java.net.http.HttpResponse.BodyHandlers.ofInputStream; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -191,7 +188,7 @@ private static BlockingQueue addRequestBodyConsumingServerHandler( exchange.sendResponseHeaders(200, requestBodyBytes.length); exchange.getResponseBody().write(requestBodyBytes); - } catch (Exception exception) { + } catch (Throwable exception) { LOGGER.log( "Server[%s] failed to process the request (exchange=%s)".formatted(serverName, exception), exception); @@ -282,7 +279,7 @@ static ServerRequestPair[] serverRequestPairs() { @Test void testNullFileChannel() { - assertThrows(NullPointerException.class, () -> ofFileChannel(null, 0, 1)); + assertThrows(NullPointerException.class, () -> BodyPublishers.ofFileChannel(null, 0, 1)); } @ParameterizedTest @@ -293,30 +290,38 @@ void testNullFileChannel() { "6,0,7", // length > fileSize "6,2,5" // (offset + length) > fileSize }) - void testIllegalOffset( + void testIllegalOffsetOrLength( int fileLength, int fileChannelOffset, int fileChannelLength, - @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception { + @TempDir Path tempDir) throws Exception { withFileChannel(tempDir.resolve("data.txt"), fileLength, (_, fileChannel) -> assertThrows( IndexOutOfBoundsException.class, - () -> ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength))); + () -> BodyPublishers.ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength))); } + /** + * Stresses corner cases in {@linkplain + * BodyPublishers#ofFileChannel(FileChannel, long, long) the file channel + * publisher}, which uses a {@linkplain #DEFAULT_BUFFER_SIZE fixed size} + * buffer to read files, by providing sub-ranges and files that are + * smaller than the buffer size. + */ @ParameterizedTest @MethodSource("serverRequestPairs") - void testContentLessThanBufferSize( - ServerRequestPair pair, - @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception { + void testContentLessThanBufferSize(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { + // Use a file of length smaller than the default buffer size int fileLength = 6; assertTrue(fileLength < DEFAULT_BUFFER_SIZE); + // Publish the `[0, fileLength)` sub-range testSuccessfulContentDelivery( "Complete content", pair, tempDir, fileLength, 0, fileLength); + // Publish the `[1, fileLength)` sub-range to stress the inclusion of EOF { int fileChannelOffset = 1; int fileChannelLength = fileLength - 1; @@ -329,6 +334,7 @@ void testContentLessThanBufferSize( pair, tempDir, fileLength, fileChannelOffset, fileChannelLength); } + // Publish the `[1, fileLength - 1)` sub-range to stress the exclusion of EOF { int fileChannelOffset = 1; int fileChannelLength = fileLength - 2; @@ -343,18 +349,30 @@ void testContentLessThanBufferSize( } + /** + * Stresses corner cases in {@linkplain + * BodyPublishers#ofFileChannel(FileChannel, long, long) the file channel + * publisher}, which uses a {@linkplain #DEFAULT_BUFFER_SIZE fixed size} + * buffer to read files, by providing sub-ranges and files that are + * bigger than the buffer size. + */ @ParameterizedTest @MethodSource("serverRequestPairs") - void testContentMoreThanBufferSize( - ServerRequestPair pair, - @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) throws Exception { + void testContentMoreThanBufferSize(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { + // Use a file of length that is + // 1. greater than the default buffer size + // 2. *not* a multitude of the buffer size int fileLength = 1 + 3 * DEFAULT_BUFFER_SIZE; + // Publish the `[0, fileLength)` sub-range testSuccessfulContentDelivery( "Complete content", pair, tempDir, fileLength, 0, fileLength); + // Publish the `[1, fileLength)` sub-range such that + // - EOF is included + // - the total length is a multitude of the buffer size { int fileChannelOffset = 1; int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE; @@ -367,6 +385,9 @@ void testContentMoreThanBufferSize( pair, tempDir, fileLength, fileChannelOffset, fileChannelLength); } + // Publish the `[1, fileLength)` sub-range such that + // - EOF is included + // - the total length is *not* a multitude of the buffer size { int fileChannelOffset = 2; int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE - 1; @@ -379,6 +400,9 @@ void testContentMoreThanBufferSize( pair, tempDir, fileLength, fileChannelOffset, fileChannelLength); } + // Publish the `[1, fileLength)` sub-range such that + // - EOF is *not* included + // - the total length is a multitude of the buffer size { int fileChannelOffset = 2; int fileChannelLength = 2 * DEFAULT_BUFFER_SIZE; @@ -391,6 +415,9 @@ void testContentMoreThanBufferSize( pair, tempDir, fileLength, fileChannelOffset, fileChannelLength); } + // Publish the `[1, fileLength)` sub-range such that + // - EOF is *not* included + // - the total length is *not* a multitude of the buffer size { int fileChannelOffset = 2; int fileChannelLength = 3 * DEFAULT_BUFFER_SIZE - 2; @@ -436,7 +463,7 @@ private void testSuccessfulContentDelivery( // Upload the file HttpRequest request = pair .requestBuilder - .POST(ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength)) + .POST(BodyPublishers.ofFileChannel(fileChannel, fileChannelOffset, fileChannelLength)) .build(); CLIENT.send(request, discarding()); @@ -474,10 +501,7 @@ private void testSuccessfulContentDelivery( @ParameterizedTest @MethodSource("serverRequestPairs") - void testChannelCloseDuringPublisherRead( - ServerRequestPair pair, - @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) - throws Exception { + void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { establishInitialConnection(pair); ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1); ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1); @@ -491,7 +515,7 @@ void testChannelCloseDuringPublisherRead( LOGGER.log("Issuing the request"); HttpRequest request = pair .requestBuilder - .POST(ofFileChannel(fileChannel, 0, fileLength)) + .POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength)) .build(); responseFutureRef.set(CLIENT.sendAsync(request, discarding())); @@ -509,9 +533,9 @@ void testChannelCloseDuringPublisherRead( // Verifying the client failure LOGGER.log("Verifying the client failure"); - Exception requestFailure = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get()); - assertInstanceOf(UncheckedIOException.class, requestFailure.getCause()); - assertInstanceOf(ClosedChannelException.class, requestFailure.getCause().getCause()); + Exception requestFailure0 = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get()); + Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); + assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause()); verifyServerIncompleteRead(pair, fileLength); @@ -523,13 +547,7 @@ void testChannelCloseDuringPublisherRead( @ParameterizedTest @MethodSource("serverRequestPairs") - // On Windows, modification while reading is not possible. - // Recall the infamous `The process cannot access the file because it is being used by another process`. - @DisabledOnOs(OS.WINDOWS) - void testFileModificationDuringPublisherRead( - ServerRequestPair pair, - @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) - throws Exception { + void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { establishInitialConnection(pair); ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1); ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1); @@ -543,7 +561,7 @@ void testFileModificationDuringPublisherRead( LOGGER.log("Issuing the request"); HttpRequest request = pair .requestBuilder - .POST(ofFileChannel(fileChannel, 0, fileLength)) + .POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength)) .build(); CompletableFuture> responseFuture = CLIENT.sendAsync(request, discarding()); @@ -561,11 +579,13 @@ void testFileModificationDuringPublisherRead( // Verifying the client failure LOGGER.log("Verifying the client failure"); - Exception requestFailure = assertThrows(ExecutionException.class, responseFuture::get); - String requestFailureMessage = requestFailure.getMessage(); + Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); + Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); + Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause()); + String requestFailure2Message = requestFailure2.getMessage(); assertTrue( - requestFailureMessage.contains("Unexpected EOF"), - "unexpected message: " + requestFailureMessage); + requestFailure2Message.contains("Unexpected EOF"), + "unexpected message: " + requestFailure2Message); verifyServerIncompleteRead(pair, fileLength); @@ -588,10 +608,7 @@ private static void verifyServerIncompleteRead(ServerRequestPair pair, int fileL @ParameterizedTest @MethodSource("serverRequestPairs") - void testSlicedUpload( - ServerRequestPair pair, - @TempDir(cleanup = CleanupMode.ON_SUCCESS) Path tempDir) - throws Exception { + void testSlicedUpload(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { // Populate the file int sliceCount = 4; @@ -610,7 +627,7 @@ void testSlicedUpload( LOGGER.log("Issuing request %d/%d", (sliceIndex + 1), sliceCount); HttpRequest request = pair .requestBuilder - .POST(ofFileChannel(fileChannel, sliceIndex * sliceLength, sliceLength)) + .POST(BodyPublishers.ofFileChannel(fileChannel, sliceIndex * sliceLength, sliceLength)) .build(); responseFutures.add(CLIENT.sendAsync( request, From 8699b83bbe34b87bbdc96ffaa44bd0b48f0ef7fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Thu, 31 Jul 2025 08:00:03 +0200 Subject: [PATCH 11/14] Replace usage of `CompletableFuture` with `Future` --- .../jdk/java/net/httpclient/FileChannelPublisherTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java index 358bc260ec7f6..e6a59e588fe59 100644 --- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java +++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java @@ -64,11 +64,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; @@ -508,7 +508,7 @@ void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path t try { int fileLength = BIG_FILE_LENGTH; - AtomicReference>> responseFutureRef = new AtomicReference<>(); + AtomicReference>> responseFutureRef = new AtomicReference<>(); withFileChannel(tempDir.resolve("data.txt"), fileLength, ((_, fileChannel) -> { // Issue the request @@ -563,7 +563,7 @@ void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Pa .requestBuilder .POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength)) .build(); - CompletableFuture> responseFuture = CLIENT.sendAsync(request, discarding()); + Future> responseFuture = CLIENT.sendAsync(request, discarding()); // Wait for server to receive the request LOGGER.log("Waiting for the request to be received"); @@ -622,7 +622,7 @@ void testSlicedUpload(ServerRequestPair pair, @TempDir Path tempDir) throws Exce try (FileChannel fileChannel = FileChannel.open(filePath)) { // Upload the complete file in mutually exclusive slices - List>> responseFutures = new ArrayList<>(sliceCount); + List>> responseFutures = new ArrayList<>(sliceCount); for (int sliceIndex = 0; sliceIndex < sliceCount; sliceIndex++) { LOGGER.log("Issuing request %d/%d", (sliceIndex + 1), sliceCount); HttpRequest request = pair From faa5d24d831ddf21fcc0ecd6572c7ad3bd234929 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Tue, 5 Aug 2025 12:30:16 +0200 Subject: [PATCH 12/14] Verify exceptions using both `send()` and `sendAsync()` --- .../httpclient/FileChannelPublisherTest.java | 112 +++++++++++++++--- 1 file changed, 96 insertions(+), 16 deletions(-) diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java index e6a59e588fe59..c92ceb39f56fe 100644 --- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java +++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java @@ -71,6 +71,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Stream; import static java.net.http.HttpClient.Builder.NO_PROXY; @@ -494,14 +495,41 @@ private void testSuccessfulContentDelivery( * To circumvent this, use this big enough file size. *

* - * @see #testChannelCloseDuringPublisherRead(ServerRequestPair, Path) - * @see #testFileModificationDuringPublisherRead(ServerRequestPair, Path) + * @see #testChannelCloseDuringPublisherReadAsync(ServerRequestPair, Path) + * @see #testChannelCloseDuringPublisherReadSync(ServerRequestPair, Path) + * @see #testFileModificationDuringPublisherReadAsync(ServerRequestPair, Path) + * @see #testFileModificationDuringPublisherReadSync(ServerRequestPair, Path) */ private static final int BIG_FILE_LENGTH = 8 * 1024 * 1024; // 8 MiB @ParameterizedTest @MethodSource("serverRequestPairs") - void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { + void testChannelCloseDuringPublisherReadAsync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { + testChannelCloseDuringPublisherRead(pair, tempDir, Requestor.ASYNC, responseFuture -> { + Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); + Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); + assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause()); + }); + } + + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testChannelCloseDuringPublisherReadSync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { + testChannelCloseDuringPublisherRead(pair, tempDir, Requestor.SYNC, responseFuture -> { + Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); + Exception requestFailure1 = assertInstanceOf(RuntimeException.class, requestFailure0.getCause()); + Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause()); + Exception requestFailure3 = assertInstanceOf(UncheckedIOException.class, requestFailure2.getCause()); + assertInstanceOf(ClosedChannelException.class, requestFailure3.getCause()); + }); + } + + private static void testChannelCloseDuringPublisherRead( + ServerRequestPair pair, + Path tempDir, + // Receiving an explicit requestor to cover exceptions thrown by both `send()` and `sendAsync()` + Requestor requestor, + Consumer>> responseVerifier) throws Exception { establishInitialConnection(pair); ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1); ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1); @@ -517,7 +545,7 @@ void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path t .requestBuilder .POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength)) .build(); - responseFutureRef.set(CLIENT.sendAsync(request, discarding())); + responseFutureRef.set(requestor.request(request, discarding())); // Wait for server to receive the request LOGGER.log("Waiting for the request to be received"); @@ -533,9 +561,7 @@ void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path t // Verifying the client failure LOGGER.log("Verifying the client failure"); - Exception requestFailure0 = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get()); - Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); - assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause()); + responseVerifier.accept(responseFutureRef.get()); verifyServerIncompleteRead(pair, fileLength); @@ -547,7 +573,40 @@ void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path t @ParameterizedTest @MethodSource("serverRequestPairs") - void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { + void testFileModificationDuringPublisherReadAsync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { + testFileModificationDuringPublisherRead(pair, tempDir, Requestor.ASYNC, responseFuture -> { + Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); + Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); + Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause()); + String requestFailure2Message = requestFailure2.getMessage(); + assertTrue( + requestFailure2Message.contains("Unexpected EOF"), + "unexpected message: " + requestFailure2Message); + }); + } + + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testFileModificationDuringPublisherReadSync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { + testFileModificationDuringPublisherRead(pair, tempDir, Requestor.SYNC, responseFuture -> { + Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); + Exception requestFailure1 = assertInstanceOf(RuntimeException.class, requestFailure0.getCause()); + Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause()); + Exception requestFailure3 = assertInstanceOf(UncheckedIOException.class, requestFailure2.getCause()); + Exception requestFailure4 = assertInstanceOf(IOException.class, requestFailure3.getCause()); + String requestFailure4Message = requestFailure4.getMessage(); + assertTrue( + requestFailure4Message.contains("Unexpected EOF"), + "unexpected message: " + requestFailure4Message); + }); + } + + private static void testFileModificationDuringPublisherRead( + ServerRequestPair pair, + Path tempDir, + // Receiving an explicit requestor to cover exceptions thrown by both `send()` and `sendAsync()` + Requestor requestor, + Consumer>> responseVerifier) throws Exception { establishInitialConnection(pair); ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1); ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1); @@ -563,7 +622,7 @@ void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Pa .requestBuilder .POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength)) .build(); - Future> responseFuture = CLIENT.sendAsync(request, discarding()); + Future> responseFuture = requestor.request(request, discarding()); // Wait for server to receive the request LOGGER.log("Waiting for the request to be received"); @@ -579,13 +638,7 @@ void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Pa // Verifying the client failure LOGGER.log("Verifying the client failure"); - Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); - Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); - Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause()); - String requestFailure2Message = requestFailure2.getMessage(); - assertTrue( - requestFailure2Message.contains("Unexpected EOF"), - "unexpected message: " + requestFailure2Message); + responseVerifier.accept(responseFuture); verifyServerIncompleteRead(pair, fileLength); @@ -707,4 +760,31 @@ private static byte[] generateFileBytes(int length) { return bytes; } + @FunctionalInterface + private interface Requestor { + + Requestor ASYNC = CLIENT::sendAsync; + + Requestor SYNC = new Requestor() { + @Override + public Future> request( + HttpRequest request, + HttpResponse.BodyHandler responseHandler) { + return EXECUTOR.submit(() -> { + try { + return CLIENT.send(request, responseHandler); + } catch (Throwable t) { + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); // Restore the interrupt + } + throw new RuntimeException(t); + } + }); + } + }; + + Future> request(HttpRequest request, HttpResponse.BodyHandler responseHandler); + + } + } From c8a0257a35475383ca4e7d26e13931ec47d201c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Tue, 5 Aug 2025 13:19:42 +0200 Subject: [PATCH 13/14] Revert the last commit: faa5d24d831 The `send()`-vs-`sendAsync()` discrepancy will be addressed separately. --- .../httpclient/FileChannelPublisherTest.java | 112 +++--------------- 1 file changed, 16 insertions(+), 96 deletions(-) diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java index c92ceb39f56fe..e6a59e588fe59 100644 --- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java +++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java @@ -71,7 +71,6 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.stream.Stream; import static java.net.http.HttpClient.Builder.NO_PROXY; @@ -495,41 +494,14 @@ private void testSuccessfulContentDelivery( * To circumvent this, use this big enough file size. *

* - * @see #testChannelCloseDuringPublisherReadAsync(ServerRequestPair, Path) - * @see #testChannelCloseDuringPublisherReadSync(ServerRequestPair, Path) - * @see #testFileModificationDuringPublisherReadAsync(ServerRequestPair, Path) - * @see #testFileModificationDuringPublisherReadSync(ServerRequestPair, Path) + * @see #testChannelCloseDuringPublisherRead(ServerRequestPair, Path) + * @see #testFileModificationDuringPublisherRead(ServerRequestPair, Path) */ private static final int BIG_FILE_LENGTH = 8 * 1024 * 1024; // 8 MiB @ParameterizedTest @MethodSource("serverRequestPairs") - void testChannelCloseDuringPublisherReadAsync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { - testChannelCloseDuringPublisherRead(pair, tempDir, Requestor.ASYNC, responseFuture -> { - Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); - Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); - assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause()); - }); - } - - @ParameterizedTest - @MethodSource("serverRequestPairs") - void testChannelCloseDuringPublisherReadSync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { - testChannelCloseDuringPublisherRead(pair, tempDir, Requestor.SYNC, responseFuture -> { - Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); - Exception requestFailure1 = assertInstanceOf(RuntimeException.class, requestFailure0.getCause()); - Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause()); - Exception requestFailure3 = assertInstanceOf(UncheckedIOException.class, requestFailure2.getCause()); - assertInstanceOf(ClosedChannelException.class, requestFailure3.getCause()); - }); - } - - private static void testChannelCloseDuringPublisherRead( - ServerRequestPair pair, - Path tempDir, - // Receiving an explicit requestor to cover exceptions thrown by both `send()` and `sendAsync()` - Requestor requestor, - Consumer>> responseVerifier) throws Exception { + void testChannelCloseDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { establishInitialConnection(pair); ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1); ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1); @@ -545,7 +517,7 @@ private static void testChannelCloseDuringPublisherRead( .requestBuilder .POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength)) .build(); - responseFutureRef.set(requestor.request(request, discarding())); + responseFutureRef.set(CLIENT.sendAsync(request, discarding())); // Wait for server to receive the request LOGGER.log("Waiting for the request to be received"); @@ -561,7 +533,9 @@ private static void testChannelCloseDuringPublisherRead( // Verifying the client failure LOGGER.log("Verifying the client failure"); - responseVerifier.accept(responseFutureRef.get()); + Exception requestFailure0 = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get()); + Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); + assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause()); verifyServerIncompleteRead(pair, fileLength); @@ -573,40 +547,7 @@ private static void testChannelCloseDuringPublisherRead( @ParameterizedTest @MethodSource("serverRequestPairs") - void testFileModificationDuringPublisherReadAsync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { - testFileModificationDuringPublisherRead(pair, tempDir, Requestor.ASYNC, responseFuture -> { - Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); - Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); - Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause()); - String requestFailure2Message = requestFailure2.getMessage(); - assertTrue( - requestFailure2Message.contains("Unexpected EOF"), - "unexpected message: " + requestFailure2Message); - }); - } - - @ParameterizedTest - @MethodSource("serverRequestPairs") - void testFileModificationDuringPublisherReadSync(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { - testFileModificationDuringPublisherRead(pair, tempDir, Requestor.SYNC, responseFuture -> { - Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); - Exception requestFailure1 = assertInstanceOf(RuntimeException.class, requestFailure0.getCause()); - Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause()); - Exception requestFailure3 = assertInstanceOf(UncheckedIOException.class, requestFailure2.getCause()); - Exception requestFailure4 = assertInstanceOf(IOException.class, requestFailure3.getCause()); - String requestFailure4Message = requestFailure4.getMessage(); - assertTrue( - requestFailure4Message.contains("Unexpected EOF"), - "unexpected message: " + requestFailure4Message); - }); - } - - private static void testFileModificationDuringPublisherRead( - ServerRequestPair pair, - Path tempDir, - // Receiving an explicit requestor to cover exceptions thrown by both `send()` and `sendAsync()` - Requestor requestor, - Consumer>> responseVerifier) throws Exception { + void testFileModificationDuringPublisherRead(ServerRequestPair pair, @TempDir Path tempDir) throws Exception { establishInitialConnection(pair); ServerRequestPair.SERVER_REQUEST_RECEIVED_SIGNAL = new CountDownLatch(1); ServerRequestPair.SERVER_READ_PERMISSION = new CountDownLatch(1); @@ -622,7 +563,7 @@ private static void testFileModificationDuringPublisherRead( .requestBuilder .POST(BodyPublishers.ofFileChannel(fileChannel, 0, fileLength)) .build(); - Future> responseFuture = requestor.request(request, discarding()); + Future> responseFuture = CLIENT.sendAsync(request, discarding()); // Wait for server to receive the request LOGGER.log("Waiting for the request to be received"); @@ -638,7 +579,13 @@ private static void testFileModificationDuringPublisherRead( // Verifying the client failure LOGGER.log("Verifying the client failure"); - responseVerifier.accept(responseFuture); + Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); + Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); + Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause()); + String requestFailure2Message = requestFailure2.getMessage(); + assertTrue( + requestFailure2Message.contains("Unexpected EOF"), + "unexpected message: " + requestFailure2Message); verifyServerIncompleteRead(pair, fileLength); @@ -760,31 +707,4 @@ private static byte[] generateFileBytes(int length) { return bytes; } - @FunctionalInterface - private interface Requestor { - - Requestor ASYNC = CLIENT::sendAsync; - - Requestor SYNC = new Requestor() { - @Override - public Future> request( - HttpRequest request, - HttpResponse.BodyHandler responseHandler) { - return EXECUTOR.submit(() -> { - try { - return CLIENT.send(request, responseHandler); - } catch (Throwable t) { - if (t instanceof InterruptedException) { - Thread.currentThread().interrupt(); // Restore the interrupt - } - throw new RuntimeException(t); - } - }); - } - }; - - Future> request(HttpRequest request, HttpResponse.BodyHandler responseHandler); - - } - } From 91d3422ce3a07165b30c27a73ed9525eee7f88ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20Yaz=C4=B1c=C4=B1?= Date: Mon, 11 Aug 2025 09:33:16 +0200 Subject: [PATCH 14/14] Improve style for declaring multiple consecutive fields of the same type --- .../net/httpclient/FileChannelPublisherTest.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java index e6a59e588fe59..5b064efa07810 100644 --- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java +++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java @@ -96,13 +96,11 @@ class FileChannelPublisherTest { private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(); - private static final ServerRequestPair HTTP1 = ServerRequestPair.of(Version.HTTP_1_1, false); - - private static final ServerRequestPair HTTPS1 = ServerRequestPair.of(Version.HTTP_1_1, true); - - private static final ServerRequestPair HTTP2 = ServerRequestPair.of(Version.HTTP_2, false); - - private static final ServerRequestPair HTTPS2 = ServerRequestPair.of(Version.HTTP_2, true); + private static final ServerRequestPair + HTTP1 = ServerRequestPair.of(Version.HTTP_1_1, false), + HTTPS1 = ServerRequestPair.of(Version.HTTP_1_1, true), + HTTP2 = ServerRequestPair.of(Version.HTTP_2, false), + HTTPS2 = ServerRequestPair.of(Version.HTTP_2, true); private static SSLContext createSslContext() { try {