From e571e75abd87588f8ed8c48c42d296870140f487 Mon Sep 17 00:00:00 2001 From: Victor Date: Fri, 3 Oct 2025 11:37:28 +0100 Subject: [PATCH 1/7] Fix race conditions in FTP close --- .../java/ch/cyberduck/core/ftp/FTPClient.java | 3 +- .../java/ch/cyberduck/core/ftp/FTPSocket.java | 293 ++++++++++++++++++ 2 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java diff --git a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPClient.java b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPClient.java index 2fcd95d53dc..558aa88d303 100644 --- a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPClient.java +++ b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPClient.java @@ -84,7 +84,8 @@ protected Socket _openDataConnection_(final String command, final String arg) th if(null == socket) { throw new FTPException(this.getReplyCode(), this.getReplyString()); } - return socket; + // Wrap socket to ensure proper TCP shutdown sequence + return new FTPSocket(socket); } @Override diff --git a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java new file mode 100644 index 00000000000..d93e13c27d1 --- /dev/null +++ b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java @@ -0,0 +1,293 @@ +package ch.cyberduck.core.ftp; + +/* + * Copyright (c) 2002-2025 David Kocher. All rights reserved. + * http://cyberduck.ch/ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * Bug fixes, suggestions and comments should be sent to feedback@cyberduck.ch + */ + +import ch.cyberduck.core.ConnectionTimeout; +import ch.cyberduck.core.ConnectionTimeoutFactory; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.FilterInputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SocketChannel; + +/** + * Socket wrapper that enforces proper TCP shutdown sequence to prevent + * race conditions due to premature socket closure during FTP data connections. + *

+ * This fixes issues where: + * - macOS: Intermittent "426 Connection closed; transfer aborted" errors due to socket closure before server ACKs + * - Windows: Intermittent transfer hanging due to socket closure before client sends FIN with last data packet + *

+ * The proper TCP shutdown sequence is: + * 1. Call shutdownOutput() to send FIN while keeping socket input open for ACKs + * 2. Drain input to wait for ACKs until we receive server FIN + * 3. Close the socket to release resources + */ +public class FTPSocket extends Socket { + private static final Logger log = LogManager.getLogger(FTPSocket.class); + + private final Socket delegate; + + private final ConnectionTimeout connectionTimeoutPreferences = ConnectionTimeoutFactory.get(); + + public FTPSocket(final Socket delegate) { + this.delegate = delegate; + } + + @Override + public void close() throws IOException { + try { + if(delegate.isClosed() || delegate.isOutputShutdown() || !delegate.isConnected()) { + log.debug("Socket already closed {}", delegate); + } + else { + // Shutdown output to send FIN, but keep socket open to receive ACKs + log.debug("Shutting down output for socket {}", delegate); + delegate.shutdownOutput(); + + // Wait for server FIN by draining any remaining data + // This ensures all our data packets are ACKed before closing + try { + // Avoid hanging in case the server malfunctions + delegate.setSoTimeout(connectionTimeoutPreferences.getTimeout() * 1000); + InputStream in = delegate.getInputStream(); + byte[] buffer = new byte[8192]; + int bytesRead; + // Read until EOF (server closes its side) or timeout + while((bytesRead = in.read(buffer)) != -1) { + log.debug("Drained {} bytes from socket after shutdown", bytesRead); + } + log.debug("Received EOF from server, all data acknowledged"); + } + catch(java.net.SocketTimeoutException e) { + // Timeout is acceptable - server may not close its side + log.debug("Timeout waiting for server EOF, proceeding with close"); + } + catch(IOException e) { + // Other errors during drain are acceptable + log.debug("Error draining socket: {}", e.getMessage()); + } + } + } + catch(IOException e) { + log.warn("Failed to shutdown output for socket {}: {}", delegate, e.getMessage()); + } + finally { + log.debug("Closing socket {}", delegate); + delegate.close(); + } + } + + @Override + public void connect(SocketAddress endpoint) throws IOException { + delegate.connect(endpoint); + } + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException { + delegate.connect(endpoint, timeout); + } + + @Override + public void bind(SocketAddress bindpoint) throws IOException { + delegate.bind(bindpoint); + } + + @Override + public SocketAddress getRemoteSocketAddress() { + return delegate.getRemoteSocketAddress(); + } + + @Override + public SocketAddress getLocalSocketAddress() { + return delegate.getLocalSocketAddress(); + } + + @Override + public SocketChannel getChannel() { + return delegate.getChannel(); + } + + @Override + public InputStream getInputStream() throws IOException { + return new FilterInputStream(delegate.getInputStream()) { + @Override + public void close() throws IOException { + FTPSocket.this.close(); // Call wrapper's close, not delegate's + } + }; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return new FilterOutputStream(delegate.getOutputStream()) { + @Override + public void close() throws IOException { + FTPSocket.this.close(); // Call wrapper's close, not delegate's + } + }; + } + + @Override + public void setTcpNoDelay(boolean on) throws SocketException { + delegate.setTcpNoDelay(on); + } + + @Override + public boolean getTcpNoDelay() throws SocketException { + return delegate.getTcpNoDelay(); + } + + @Override + public void setSoLinger(boolean on, int linger) throws SocketException { + delegate.setSoLinger(on, linger); + } + + @Override + public int getSoLinger() throws SocketException { + return delegate.getSoLinger(); + } + + @Override + public void sendUrgentData(int data) throws IOException { + delegate.sendUrgentData(data); + } + + @Override + public void setOOBInline(boolean on) throws SocketException { + delegate.setOOBInline(on); + } + + @Override + public boolean getOOBInline() throws SocketException { + return delegate.getOOBInline(); + } + + @Override + public void setSoTimeout(int timeout) throws SocketException { + delegate.setSoTimeout(timeout); + } + + @Override + public int getSoTimeout() throws SocketException { + return delegate.getSoTimeout(); + } + + @Override + public void setSendBufferSize(int size) throws SocketException { + delegate.setSendBufferSize(size); + } + + @Override + public int getSendBufferSize() throws SocketException { + return delegate.getSendBufferSize(); + } + + @Override + public void setReceiveBufferSize(int size) throws SocketException { + delegate.setReceiveBufferSize(size); + } + + @Override + public int getReceiveBufferSize() throws SocketException { + return delegate.getReceiveBufferSize(); + } + + @Override + public void setKeepAlive(boolean on) throws SocketException { + delegate.setKeepAlive(on); + } + + @Override + public boolean getKeepAlive() throws SocketException { + return delegate.getKeepAlive(); + } + + @Override + public void setTrafficClass(int tc) throws SocketException { + delegate.setTrafficClass(tc); + } + + @Override + public int getTrafficClass() throws SocketException { + return delegate.getTrafficClass(); + } + + @Override + public void setReuseAddress(boolean on) throws SocketException { + delegate.setReuseAddress(on); + } + + @Override + public boolean getReuseAddress() throws SocketException { + return delegate.getReuseAddress(); + } + + @Override + public void shutdownInput() throws IOException { + delegate.shutdownInput(); + } + + @Override + public void shutdownOutput() throws IOException { + delegate.shutdownOutput(); + } + + @Override + public boolean isConnected() { + return delegate.isConnected(); + } + + @Override + public boolean isBound() { + return delegate.isBound(); + } + + @Override + public boolean isClosed() { + return delegate.isClosed(); + } + + @Override + public boolean isInputShutdown() { + return delegate.isInputShutdown(); + } + + @Override + public boolean isOutputShutdown() { + return delegate.isOutputShutdown(); + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + delegate.setPerformancePreferences(connectionTime, latency, bandwidth); + } + + @Override + public String toString() { + return "FTPSocket{" + delegate + "}"; + } +} From e84e5ee10f888ad9c0d12f0e8e100f8562ae6cb5 Mon Sep 17 00:00:00 2001 From: Victor Date: Fri, 3 Oct 2025 19:30:15 +0100 Subject: [PATCH 2/7] Improvements in FTPSocket --- .../java/ch/cyberduck/core/ftp/FTPSocket.java | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java index d93e13c27d1..fb5a813b89e 100644 --- a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java +++ b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java @@ -32,6 +32,7 @@ import java.net.SocketAddress; import java.net.SocketException; import java.nio.channels.SocketChannel; +import java.util.concurrent.CompletableFuture; /** * Socket wrapper that enforces proper TCP shutdown sequence to prevent @@ -59,9 +60,16 @@ public FTPSocket(final Socket delegate) { @Override public void close() throws IOException { + if(delegate.isClosed()) { + log.debug("Socket already closed {}", delegate); + return; + } try { - if(delegate.isClosed() || delegate.isOutputShutdown() || !delegate.isConnected()) { - log.debug("Socket already closed {}", delegate); + if(delegate.isOutputShutdown()) { + log.debug("Socket output already closed {}", delegate); + } + else if(!delegate.isConnected()) { + log.debug("Socket is already disconnected {}", delegate); } else { // Shutdown output to send FIN, but keep socket open to receive ACKs @@ -70,34 +78,30 @@ public void close() throws IOException { // Wait for server FIN by draining any remaining data // This ensures all our data packets are ACKed before closing - try { - // Avoid hanging in case the server malfunctions - delegate.setSoTimeout(connectionTimeoutPreferences.getTimeout() * 1000); - InputStream in = delegate.getInputStream(); - byte[] buffer = new byte[8192]; - int bytesRead; - // Read until EOF (server closes its side) or timeout - while((bytesRead = in.read(buffer)) != -1) { - log.debug("Drained {} bytes from socket after shutdown", bytesRead); - } - log.debug("Received EOF from server, all data acknowledged"); - } - catch(java.net.SocketTimeoutException e) { - // Timeout is acceptable - server may not close its side - log.debug("Timeout waiting for server EOF, proceeding with close"); - } - catch(IOException e) { - // Other errors during drain are acceptable - log.debug("Error draining socket: {}", e.getMessage()); + log.debug("Draining input for socket {}", delegate); + InputStream in = delegate.getInputStream(); + byte[] buffer = new byte[8192]; + int bytesRead; + // Read until EOF (server closes its side) or timeout + while((bytesRead = in.read(buffer)) != -1) { + log.debug("Drained {} bytes from socket {}", bytesRead, delegate); } } } catch(IOException e) { - log.warn("Failed to shutdown output for socket {}: {}", delegate, e.getMessage()); + log.error("Failed to shutdown output for socket {}: {}", delegate, e.getMessage()); } finally { log.debug("Closing socket {}", delegate); - delegate.close(); + // Work around macOS bug where Java NIO's SocketDispatcher.close0() has a 1,000ms delay + CompletableFuture.runAsync(() -> { + try { + delegate.close(); + } + catch(IOException e) { + log.error("Error closing socket {}: {}", delegate, e.getMessage()); + } + }); } } From c79862d262b8b9a849167314d50b1b8683bf2640 Mon Sep 17 00:00:00 2001 From: Victor Date: Fri, 3 Oct 2025 20:53:42 +0100 Subject: [PATCH 3/7] Fix constant flushing in FTPSocket --- .../java/ch/cyberduck/core/ftp/FTPSocket.java | 63 ++++++++++++------- 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java index fb5a813b89e..e32c685cedc 100644 --- a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java +++ b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java @@ -17,14 +17,13 @@ * Bug fixes, suggestions and comments should be sent to feedback@cyberduck.ch */ -import ch.cyberduck.core.ConnectionTimeout; -import ch.cyberduck.core.ConnectionTimeoutFactory; +import ch.cyberduck.core.preferences.PreferencesFactory; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.FilterInputStream; -import java.io.FilterOutputStream; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -51,15 +50,15 @@ public class FTPSocket extends Socket { private static final Logger log = LogManager.getLogger(FTPSocket.class); private final Socket delegate; - - private final ConnectionTimeout connectionTimeoutPreferences = ConnectionTimeoutFactory.get(); + private InputStream inputStreamWrapper; + private OutputStream outputStreamWrapper; public FTPSocket(final Socket delegate) { this.delegate = delegate; } @Override - public void close() throws IOException { + public synchronized void close() throws IOException { if(delegate.isClosed()) { log.debug("Socket already closed {}", delegate); return; @@ -93,7 +92,7 @@ else if(!delegate.isConnected()) { } finally { log.debug("Closing socket {}", delegate); - // Work around macOS bug where Java NIO's SocketDispatcher.close0() has a 1,000ms delay + // Work around macOS quirk where Java NIO's SocketDispatcher.close0() has a 1,000ms delay CompletableFuture.runAsync(() -> { try { delegate.close(); @@ -136,23 +135,43 @@ public SocketChannel getChannel() { } @Override - public InputStream getInputStream() throws IOException { - return new FilterInputStream(delegate.getInputStream()) { - @Override - public void close() throws IOException { - FTPSocket.this.close(); // Call wrapper's close, not delegate's - } - }; + public synchronized OutputStream getOutputStream() throws IOException { + if(outputStreamWrapper == null) { + outputStreamWrapper = new BufferedOutputStream(delegate.getOutputStream(), + PreferencesFactory.get().getInteger("connection.buffer")) { + @Override + public void close() throws IOException { + try { + // We can't close this stream as it would propagate to delegate.close() + // Therefore, we must flush before we manually close the delegate + super.flush(); + } + catch(IOException e) { + log.error("Error flushing output stream for socket {}: {}", delegate, e.getMessage()); + } + finally { + // Stream close will call Socket.close(), so override it with ours + FTPSocket.this.close(); + } + } + }; + } + return outputStreamWrapper; } @Override - public OutputStream getOutputStream() throws IOException { - return new FilterOutputStream(delegate.getOutputStream()) { - @Override - public void close() throws IOException { - FTPSocket.this.close(); // Call wrapper's close, not delegate's - } - }; + public synchronized InputStream getInputStream() throws IOException { + if(inputStreamWrapper == null) { + inputStreamWrapper = new BufferedInputStream(delegate.getInputStream(), + PreferencesFactory.get().getInteger("connection.buffer")) { + @Override + public void close() throws IOException { + // Stream close will call Socket.close(), so override it with ours + FTPSocket.this.close(); + } + }; + } + return inputStreamWrapper; } @Override From ad035ed11771c541f40477bf37c6ee4ae8c4eada Mon Sep 17 00:00:00 2001 From: Victor Date: Sun, 5 Oct 2025 18:00:58 +0100 Subject: [PATCH 4/7] Replace buffered with proxy streams in FTPSocket --- .../main/java/ch/cyberduck/core/ftp/FTPSocket.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java index e32c685cedc..b3eaccf0827 100644 --- a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java +++ b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java @@ -17,13 +17,11 @@ * Bug fixes, suggestions and comments should be sent to feedback@cyberduck.ch */ -import ch.cyberduck.core.preferences.PreferencesFactory; - +import org.apache.commons.io.input.ProxyInputStream; +import org.apache.commons.io.output.ProxyOutputStream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -137,8 +135,7 @@ public SocketChannel getChannel() { @Override public synchronized OutputStream getOutputStream() throws IOException { if(outputStreamWrapper == null) { - outputStreamWrapper = new BufferedOutputStream(delegate.getOutputStream(), - PreferencesFactory.get().getInteger("connection.buffer")) { + outputStreamWrapper = new ProxyOutputStream(delegate.getOutputStream()) { @Override public void close() throws IOException { try { @@ -162,8 +159,7 @@ public void close() throws IOException { @Override public synchronized InputStream getInputStream() throws IOException { if(inputStreamWrapper == null) { - inputStreamWrapper = new BufferedInputStream(delegate.getInputStream(), - PreferencesFactory.get().getInteger("connection.buffer")) { + inputStreamWrapper = new ProxyInputStream(delegate.getInputStream()) { @Override public void close() throws IOException { // Stream close will call Socket.close(), so override it with ours From b227e01cb17e99b71ab3800718961fd4158f68d1 Mon Sep 17 00:00:00 2001 From: Victor Date: Sun, 5 Oct 2025 18:05:09 +0100 Subject: [PATCH 5/7] Surface critical errors in FTPSocket --- .../main/java/ch/cyberduck/core/ftp/FTPSocket.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java index b3eaccf0827..79923c5cf1d 100644 --- a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java +++ b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java @@ -85,9 +85,6 @@ else if(!delegate.isConnected()) { } } } - catch(IOException e) { - log.error("Failed to shutdown output for socket {}: {}", delegate, e.getMessage()); - } finally { log.debug("Closing socket {}", delegate); // Work around macOS quirk where Java NIO's SocketDispatcher.close0() has a 1,000ms delay @@ -138,16 +135,12 @@ public synchronized OutputStream getOutputStream() throws IOException { outputStreamWrapper = new ProxyOutputStream(delegate.getOutputStream()) { @Override public void close() throws IOException { + // We can't call super.close() as it would call delegate.close() + // Therefore, we flush here and close the underlying stream ourselves try { - // We can't close this stream as it would propagate to delegate.close() - // Therefore, we must flush before we manually close the delegate super.flush(); } - catch(IOException e) { - log.error("Error flushing output stream for socket {}: {}", delegate, e.getMessage()); - } finally { - // Stream close will call Socket.close(), so override it with ours FTPSocket.this.close(); } } @@ -162,7 +155,7 @@ public synchronized InputStream getInputStream() throws IOException { inputStreamWrapper = new ProxyInputStream(delegate.getInputStream()) { @Override public void close() throws IOException { - // Stream close will call Socket.close(), so override it with ours + // super.close() will call delegate.close(), so override it with ours instead FTPSocket.this.close(); } }; From 352e11c34e01f5347d1e8d81bae0de03d99ed626 Mon Sep 17 00:00:00 2001 From: Victor Date: Sun, 5 Oct 2025 21:13:06 +0100 Subject: [PATCH 6/7] Fix read amplification in FTPSocket --- .../java/ch/cyberduck/core/ftp/FTPSocket.java | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java index 79923c5cf1d..290179cd39b 100644 --- a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java +++ b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java @@ -30,6 +30,7 @@ import java.net.SocketException; import java.nio.channels.SocketChannel; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; /** * Socket wrapper that enforces proper TCP shutdown sequence to prevent @@ -48,6 +49,8 @@ public class FTPSocket extends Socket { private static final Logger log = LogManager.getLogger(FTPSocket.class); private final Socket delegate; + private final AtomicInteger outputBytesCount = new AtomicInteger(0); + private InputStream inputStreamWrapper; private OutputStream outputStreamWrapper; @@ -62,26 +65,28 @@ public synchronized void close() throws IOException { return; } try { - if(delegate.isOutputShutdown()) { - log.debug("Socket output already closed {}", delegate); - } - else if(!delegate.isConnected()) { - log.debug("Socket is already disconnected {}", delegate); - } - else { - // Shutdown output to send FIN, but keep socket open to receive ACKs - log.debug("Shutting down output for socket {}", delegate); - delegate.shutdownOutput(); - - // Wait for server FIN by draining any remaining data - // This ensures all our data packets are ACKed before closing - log.debug("Draining input for socket {}", delegate); - InputStream in = delegate.getInputStream(); - byte[] buffer = new byte[8192]; - int bytesRead; - // Read until EOF (server closes its side) or timeout - while((bytesRead = in.read(buffer)) != -1) { - log.debug("Drained {} bytes from socket {}", bytesRead, delegate); + // Only do full TCP shutdown if we have output bytes, otherwise close socket directly + if(outputBytesCount.get() > 0) { + if(delegate.isOutputShutdown()) { + log.debug("Socket output already closed {}", delegate); + } + else if(!delegate.isConnected()) { + log.debug("Socket is already disconnected {}", delegate); + } + else { + // Shutdown output to send FIN, but keep socket open to receive ACKs + log.debug("Shutting down output for socket {}", delegate); + delegate.shutdownOutput(); + + log.debug("Waiting for input to close for socket {}", delegate); + int bytesRead = 0; + // Read until EOF (server FIN) or timeout + while(delegate.getInputStream().read() != -1) { + bytesRead++; + } + if(bytesRead > 0) { + log.debug("Drained {} bytes from socket {}", bytesRead, delegate); + } } } } @@ -144,6 +149,11 @@ public void close() throws IOException { FTPSocket.this.close(); } } + + @Override + protected void afterWrite(final int n) { + outputBytesCount.addAndGet(n); + } }; } return outputStreamWrapper; From 47b7033fc4fde660213d1d635f7e87590903fd3a Mon Sep 17 00:00:00 2001 From: Victor Date: Thu, 9 Oct 2025 14:19:50 +0200 Subject: [PATCH 7/7] Refactor FTPSocket to address comments --- .../java/ch/cyberduck/core/ftp/FTPSocket.java | 232 ++---------------- 1 file changed, 26 insertions(+), 206 deletions(-) diff --git a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java index 290179cd39b..c2d44fde776 100644 --- a/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java +++ b/ftp/src/main/java/ch/cyberduck/core/ftp/FTPSocket.java @@ -17,8 +17,10 @@ * Bug fixes, suggestions and comments should be sent to feedback@cyberduck.ch */ +import com.amazonaws.internal.DelegateSocket; + import org.apache.commons.io.input.ProxyInputStream; -import org.apache.commons.io.output.ProxyOutputStream; +import org.apache.commons.io.output.CountingOutputStream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -26,11 +28,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; -import java.net.SocketAddress; -import java.net.SocketException; -import java.nio.channels.SocketChannel; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; /** * Socket wrapper that enforces proper TCP shutdown sequence to prevent @@ -45,102 +43,69 @@ * 2. Drain input to wait for ACKs until we receive server FIN * 3. Close the socket to release resources */ -public class FTPSocket extends Socket { +public class FTPSocket extends DelegateSocket { private static final Logger log = LogManager.getLogger(FTPSocket.class); - private final Socket delegate; - private final AtomicInteger outputBytesCount = new AtomicInteger(0); - private InputStream inputStreamWrapper; - private OutputStream outputStreamWrapper; + private CountingOutputStream outputStreamWrapper; - public FTPSocket(final Socket delegate) { - this.delegate = delegate; + public FTPSocket(final Socket sock) { + super(sock); } @Override public synchronized void close() throws IOException { - if(delegate.isClosed()) { - log.debug("Socket already closed {}", delegate); + if(sock.isClosed()) { + log.debug("Socket already closed {}", sock); return; } try { // Only do full TCP shutdown if we have output bytes, otherwise close socket directly - if(outputBytesCount.get() > 0) { - if(delegate.isOutputShutdown()) { - log.debug("Socket output already closed {}", delegate); + if(outputStreamWrapper != null && outputStreamWrapper.getByteCount() > 0) { + if(sock.isOutputShutdown()) { + log.debug("Socket output already closed {}", sock); } - else if(!delegate.isConnected()) { - log.debug("Socket is already disconnected {}", delegate); + else if(!sock.isConnected()) { + log.debug("Socket is already disconnected {}", sock); } else { // Shutdown output to send FIN, but keep socket open to receive ACKs - log.debug("Shutting down output for socket {}", delegate); - delegate.shutdownOutput(); + log.debug("Shutting down output for socket {}", sock); + sock.shutdownOutput(); - log.debug("Waiting for input to close for socket {}", delegate); + log.debug("Waiting for input to close for socket {}", sock); int bytesRead = 0; // Read until EOF (server FIN) or timeout - while(delegate.getInputStream().read() != -1) { + while(sock.getInputStream().read() != -1) { bytesRead++; } if(bytesRead > 0) { - log.debug("Drained {} bytes from socket {}", bytesRead, delegate); + log.warn("Drained {} bytes from socket {}", bytesRead, sock); } } } } finally { - log.debug("Closing socket {}", delegate); + log.debug("Closing socket {}", sock); // Work around macOS quirk where Java NIO's SocketDispatcher.close0() has a 1,000ms delay CompletableFuture.runAsync(() -> { try { - delegate.close(); + sock.close(); } catch(IOException e) { - log.error("Error closing socket {}: {}", delegate, e.getMessage()); + log.error("Error closing socket {}: {}", sock, e.getMessage()); } }); } } - @Override - public void connect(SocketAddress endpoint) throws IOException { - delegate.connect(endpoint); - } - - @Override - public void connect(SocketAddress endpoint, int timeout) throws IOException { - delegate.connect(endpoint, timeout); - } - - @Override - public void bind(SocketAddress bindpoint) throws IOException { - delegate.bind(bindpoint); - } - - @Override - public SocketAddress getRemoteSocketAddress() { - return delegate.getRemoteSocketAddress(); - } - - @Override - public SocketAddress getLocalSocketAddress() { - return delegate.getLocalSocketAddress(); - } - - @Override - public SocketChannel getChannel() { - return delegate.getChannel(); - } - @Override public synchronized OutputStream getOutputStream() throws IOException { if(outputStreamWrapper == null) { - outputStreamWrapper = new ProxyOutputStream(delegate.getOutputStream()) { + outputStreamWrapper = new CountingOutputStream(sock.getOutputStream()) { @Override public void close() throws IOException { - // We can't call super.close() as it would call delegate.close() + // We can't call super.close() as it would call sock.close() // Therefore, we flush here and close the underlying stream ourselves try { super.flush(); @@ -149,11 +114,6 @@ public void close() throws IOException { FTPSocket.this.close(); } } - - @Override - protected void afterWrite(final int n) { - outputBytesCount.addAndGet(n); - } }; } return outputStreamWrapper; @@ -162,154 +122,14 @@ protected void afterWrite(final int n) { @Override public synchronized InputStream getInputStream() throws IOException { if(inputStreamWrapper == null) { - inputStreamWrapper = new ProxyInputStream(delegate.getInputStream()) { + inputStreamWrapper = new ProxyInputStream(sock.getInputStream()) { @Override public void close() throws IOException { - // super.close() will call delegate.close(), so override it with ours instead + // super.close() will call sock.close(), so override it with ours instead FTPSocket.this.close(); } }; } return inputStreamWrapper; } - - @Override - public void setTcpNoDelay(boolean on) throws SocketException { - delegate.setTcpNoDelay(on); - } - - @Override - public boolean getTcpNoDelay() throws SocketException { - return delegate.getTcpNoDelay(); - } - - @Override - public void setSoLinger(boolean on, int linger) throws SocketException { - delegate.setSoLinger(on, linger); - } - - @Override - public int getSoLinger() throws SocketException { - return delegate.getSoLinger(); - } - - @Override - public void sendUrgentData(int data) throws IOException { - delegate.sendUrgentData(data); - } - - @Override - public void setOOBInline(boolean on) throws SocketException { - delegate.setOOBInline(on); - } - - @Override - public boolean getOOBInline() throws SocketException { - return delegate.getOOBInline(); - } - - @Override - public void setSoTimeout(int timeout) throws SocketException { - delegate.setSoTimeout(timeout); - } - - @Override - public int getSoTimeout() throws SocketException { - return delegate.getSoTimeout(); - } - - @Override - public void setSendBufferSize(int size) throws SocketException { - delegate.setSendBufferSize(size); - } - - @Override - public int getSendBufferSize() throws SocketException { - return delegate.getSendBufferSize(); - } - - @Override - public void setReceiveBufferSize(int size) throws SocketException { - delegate.setReceiveBufferSize(size); - } - - @Override - public int getReceiveBufferSize() throws SocketException { - return delegate.getReceiveBufferSize(); - } - - @Override - public void setKeepAlive(boolean on) throws SocketException { - delegate.setKeepAlive(on); - } - - @Override - public boolean getKeepAlive() throws SocketException { - return delegate.getKeepAlive(); - } - - @Override - public void setTrafficClass(int tc) throws SocketException { - delegate.setTrafficClass(tc); - } - - @Override - public int getTrafficClass() throws SocketException { - return delegate.getTrafficClass(); - } - - @Override - public void setReuseAddress(boolean on) throws SocketException { - delegate.setReuseAddress(on); - } - - @Override - public boolean getReuseAddress() throws SocketException { - return delegate.getReuseAddress(); - } - - @Override - public void shutdownInput() throws IOException { - delegate.shutdownInput(); - } - - @Override - public void shutdownOutput() throws IOException { - delegate.shutdownOutput(); - } - - @Override - public boolean isConnected() { - return delegate.isConnected(); - } - - @Override - public boolean isBound() { - return delegate.isBound(); - } - - @Override - public boolean isClosed() { - return delegate.isClosed(); - } - - @Override - public boolean isInputShutdown() { - return delegate.isInputShutdown(); - } - - @Override - public boolean isOutputShutdown() { - return delegate.isOutputShutdown(); - } - - @Override - public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { - delegate.setPerformancePreferences(connectionTime, latency, bandwidth); - } - - @Override - public String toString() { - return "FTPSocket{" + delegate + "}"; - } }