Skip to content

Commit 5a5ad01

Browse files
committed
pool: emit firefly onStart marker from RemoteHttpDataTransferProtocol
Move the firefly flow-start marker emission from AbstractMoverProtocolTransferService.MoverTask into RemoteHttpDataTransferProtocol, where the actual HTTP connection's local socket address (correct IP and port) is available. Previously, the start marker was emitted in MoverTask.run() before the HTTP connection was established, using NetworkUtils.getLocalAddress() to derive the local endpoint. This produced the wrong port (0) and could select the wrong interface on multi-homed hosts. Now, RemoteHttpTransferService passes the TransferLifeCycle to RemoteHttpDataTransferProtocol at construction time and sets the Subject via the overridden createMover(). The protocol calls onStart() in doGet() and sendFile() immediately after capturing the local endpoint from HttpInetConnection, which provides the real bound address and port. Signed-off-by: Shawn McKee <smckee@umich.edu>
1 parent 5fe6ec9 commit 5a5ad01

4 files changed

Lines changed: 56 additions & 3 deletions

File tree

dcache-docs-pr

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit 306c412a0c424d6cf05c02fb4858aced27ddef3d

modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) {
7373
_transferLifeCycle = transferLifeCycle;
7474
}
7575

76+
protected TransferLifeCycle getTransferLifeCycle() {
77+
return _transferLifeCycle;
78+
}
79+
7680
@Override
7781
public Mover<?> createMover(ReplicaDescriptor handle, PoolIoFileMessage message,
7882
CellPath pathToDoor)

modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.common.base.Splitter;
2121
import diskCacheV111.util.CacheException;
22+
import diskCacheV111.vehicles.PoolIoFileMessage;
2223
import diskCacheV111.vehicles.ProtocolInfo;
2324
import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo;
2425
import diskCacheV111.vehicles.RemoteHttpsDataTransferProtocolInfo;
@@ -58,8 +59,11 @@
5859
import org.apache.http.impl.client.HttpClients;
5960
import org.apache.http.protocol.HttpContext;
6061
import org.apache.http.protocol.HttpRequestExecutor;
62+
import org.dcache.pool.movers.Mover;
6163
import org.dcache.pool.movers.MoverProtocol;
64+
import org.dcache.pool.movers.MoverProtocolMover;
6265
import org.dcache.pool.movers.RemoteHttpDataTransferProtocol;
66+
import org.dcache.pool.repository.ReplicaDescriptor;
6367
import org.dcache.security.trust.AggregateX509TrustManager;
6468
import org.dcache.util.Version;
6569
import org.slf4j.Logger;
@@ -69,6 +73,8 @@
6973
import static com.google.common.base.Preconditions.checkArgument;
7074
import static dmg.util.Exceptions.meaningfulMessage;
7175

76+
import dmg.cells.nucleus.CellPath;
77+
7278
public class RemoteHttpTransferService extends SecureRemoteTransferService {
7379

7480
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteHttpTransferService.class);
@@ -128,6 +134,15 @@ public HttpUriRequest getRedirect(final HttpRequest request,
128134
private X509TrustManager trustManager;
129135
private CloseableHttpClient sharedClient;
130136

137+
@Override
138+
public Mover<?> createMover(ReplicaDescriptor handle, PoolIoFileMessage message,
139+
CellPath pathToDoor) throws CacheException {
140+
Mover<?> mover = super.createMover(handle, message, pathToDoor);
141+
MoverProtocolMover mpm = (MoverProtocolMover) mover;
142+
((RemoteHttpDataTransferProtocol) mpm.getMover()).setSubject(message.getSubject());
143+
return mover;
144+
}
145+
131146
@Override
132147
protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception {
133148
if (!(info instanceof RemoteHttpDataTransferProtocolInfo)) {
@@ -145,7 +160,7 @@ protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception
145160
SSLContext context = buildSSLContext(credential.getKeyManager());
146161
CloseableHttpClient client = createClient(context);
147162

148-
return new RemoteHttpDataTransferProtocol(client) {
163+
return new RemoteHttpDataTransferProtocol(client, getTransferLifeCycle()) {
149164
@Override
150165
protected void afterTransfer() {
151166
super.afterTransfer();
@@ -159,7 +174,7 @@ protected void afterTransfer() {
159174
}
160175
}
161176

162-
return new RemoteHttpDataTransferProtocol(sharedClient);
177+
return new RemoteHttpDataTransferProtocol(sharedClient, getTransferLifeCycle());
163178
}
164179

165180
@PostConstruct

modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import diskCacheV111.util.CacheException;
1818
import diskCacheV111.util.ThirdPartyTransferFailedCacheException;
19+
import diskCacheV111.vehicles.IpProtocolInfo;
1920
import diskCacheV111.vehicles.ProtocolInfo;
2021
import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo;
2122
import java.io.IOException;
@@ -39,6 +40,7 @@
3940
import java.util.function.Consumer;
4041
import java.util.stream.Collectors;
4142
import javax.annotation.concurrent.GuardedBy;
43+
import javax.security.auth.Subject;
4244
import org.apache.http.Header;
4345
import org.apache.http.HttpEntity;
4446
import org.apache.http.HttpInetConnection;
@@ -213,9 +215,18 @@ private enum HeaderFlags {
213215
private Long _expectedTransferSize;
214216

215217
private InetSocketAddress _localEndpoint;
218+
private final TransferLifeCycle _transferLifeCycle;
219+
private Subject _subject;
220+
private boolean _startMarkerSent;
216221

217-
public RemoteHttpDataTransferProtocol(CloseableHttpClient client) {
222+
public RemoteHttpDataTransferProtocol(CloseableHttpClient client,
223+
TransferLifeCycle transferLifeCycle) {
218224
_client = requireNonNull(client);
225+
_transferLifeCycle = requireNonNull(transferLifeCycle);
226+
}
227+
228+
public void setSubject(Subject subject) {
229+
_subject = subject;
219230
}
220231

221232
private static void checkThat(boolean isOk, String message) throws CacheException {
@@ -540,6 +551,7 @@ private CloseableHttpResponse doGet(final RemoteHttpDataTransferProtocolInfo inf
540551
CloseableHttpResponse response = _client.execute(get, context);
541552

542553
_localEndpoint = localAddress().orElse(null);
554+
startFlowMarker();
543555

544556
boolean isSuccessful = false;
545557
try {
@@ -605,6 +617,7 @@ private void sendFile(RemoteHttpDataTransferProtocolInfo info)
605617

606618
try (CloseableHttpResponse response = _client.execute(put, context)) {
607619
_localEndpoint = localAddress().orElse(null);
620+
startFlowMarker();
608621
StatusLine status = response.getStatusLine();
609622
switch (status.getStatusCode()) {
610623
case 200: /* OK (not actually a valid response from PUT) */
@@ -981,6 +994,26 @@ public Long getBytesExpected() {
981994
return _expectedTransferSize;
982995
}
983996

997+
private void startFlowMarker() {
998+
if (_startMarkerSent || _localEndpoint == null || _subject == null) {
999+
return;
1000+
}
1001+
1002+
ProtocolInfo protocolInfo = _channel.getProtocolInfo();
1003+
if (!(protocolInfo instanceof IpProtocolInfo ipInfo)) {
1004+
return;
1005+
}
1006+
1007+
InetSocketAddress remoteEndpoint = ipInfo.getSocketAddress();
1008+
if (remoteEndpoint == null) {
1009+
return;
1010+
}
1011+
1012+
_transferLifeCycle.onStart(remoteEndpoint, _localEndpoint,
1013+
protocolInfo, _subject);
1014+
_startMarkerSent = true;
1015+
}
1016+
9841017
@Override
9851018
public Optional<InetSocketAddress> getLocalEndpoint() {
9861019
return Optional.ofNullable(_localEndpoint);

0 commit comments

Comments
 (0)