Skip to content

Commit e57a0b0

Browse files
committed
Fix affinity mechanism
1 parent d1b93c8 commit e57a0b0

File tree

1 file changed

+27
-25
lines changed

1 file changed

+27
-25
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -243,22 +243,33 @@ private static NativeConnectionWrapper enforceAffinity(
243243
if (affinity == null) {
244244
return connectionFactory.apply(null);
245245
}
246-
NativeConnectionWrapper connectionWrapper = null;
247-
String queue = affinity.queue();
248-
Management.QueueInfo info = affinityCache.queueInfo(queue);
249-
if (info == null) {
250-
connectionWrapper = connectionFactory.apply(null);
251-
management.init();
252-
info = management.queueInfo(affinity.queue());
253-
}
254-
affinityCache.nodenameToAddress(connectionWrapper.nodename, connectionWrapper.address);
255246
try {
256247
NativeConnectionWrapper pickedConnection = null;
257248
int attemptCount = 0;
258249
boolean queueInfoRefreshed = false;
250+
List<String> nodesWithAffinity = null;
251+
Management.QueueInfo info = affinityCache.queueInfo(affinity.queue());
259252
while (pickedConnection == null) {
260253
attemptCount++;
261-
List<String> nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info);
254+
NativeConnectionWrapper connectionWrapper = null;
255+
if (info == null) {
256+
connectionWrapper = connectionFactory.apply(null);
257+
management.init();
258+
info = management.queueInfo(affinity.queue());
259+
queueInfoRefreshed = true;
260+
}
261+
if (nodesWithAffinity == null) {
262+
nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info);
263+
}
264+
if (connectionWrapper == null) {
265+
List<Address> addressHints =
266+
nodesWithAffinity.stream()
267+
.map(affinityCache::nodenameToAddress)
268+
.filter(Objects::nonNull)
269+
.collect(Collectors.toList());
270+
connectionWrapper = connectionFactory.apply(addressHints);
271+
affinityCache.nodenameToAddress(connectionWrapper.nodename, connectionWrapper.address);
272+
}
262273
LOGGER.debug("Currently connected to node {}", connectionWrapper.nodename);
263274
if (nodesWithAffinity.contains(connectionWrapper.nodename)) {
264275
LOGGER.debug("Affinity {} found with node {}", affinity, connectionWrapper.nodename);
@@ -271,29 +282,20 @@ private static NativeConnectionWrapper enforceAffinity(
271282
pickedConnection = connectionWrapper;
272283
} else {
273284
LOGGER.debug("Affinity {} not found with node {}", affinity, connectionWrapper.nodename);
274-
connectionWrapper.connection.close();
275-
management.releaseResources();
276-
List<Address> addressHints =
277-
nodesWithAffinity.stream()
278-
.map(affinityCache::nodenameToAddress)
279-
.filter(Objects::nonNull)
280-
.collect(Collectors.toList());
281-
connectionWrapper = connectionFactory.apply(addressHints);
282-
affinityCache.nodenameToAddress(connectionWrapper.nodename, connectionWrapper.address);
283285
if (!queueInfoRefreshed) {
284286
management.init();
285287
info = management.queueInfo(affinity.queue());
286288
affinityCache.queueInfo(info);
289+
queueInfoRefreshed = true;
287290
}
291+
connectionWrapper.connection.close();
292+
management.releaseResources();
288293
}
289294
}
290295
return pickedConnection;
291-
} catch (Exception e) {
292-
LOGGER.warn(
293-
"Cannot enforce affinity because of error when looking up queue '{}': {}",
294-
affinity.queue(),
295-
e.getMessage());
296-
return connectionWrapper;
296+
} catch (RuntimeException e) {
297+
LOGGER.warn("Cannot enforce affinity {} of error when looking up queue", affinity, e);
298+
throw e;
297299
}
298300
}
299301

0 commit comments

Comments
 (0)