Skip to content

Commit e4bc478

Browse files
committed
Deal with non-existing queue in affinity mechanism
1 parent 29ff443 commit e4bc478

File tree

9 files changed

+133
-24
lines changed

9 files changed

+133
-24
lines changed

src/main/java/com/rabbitmq/client/amqp/AmqpException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ public static class AmqpEntityDoesNotExistException extends AmqpException {
5454
public AmqpEntityDoesNotExistException(String message, Throwable cause) {
5555
super(message, cause);
5656
}
57+
58+
public AmqpEntityDoesNotExistException(String message) {
59+
super(message);
60+
}
5761
}
5862

5963
public static class AmqpResourceInvalidStateException extends AmqpException {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class AmqpManagement implements Management {
5858
private static final int CODE_200 = 200;
5959
private static final int CODE_201 = 201;
6060
private static final int CODE_204 = 204;
61+
private static final int CODE_404 = 404;
6162
private static final int CODE_409 = 409;
6263

6364
private final AmqpConnection connection;
@@ -384,12 +385,16 @@ private static void checkResponse(
384385
}
385386
int responseCode = request.mapResponse().code();
386387
if (IntStream.of(expectedResponseCodes).noneMatch(c -> c == responseCode)) {
387-
throw new AmqpException(
388-
"Unexpected response code: %d instead of %s",
389-
responseCode,
390-
IntStream.of(expectedResponseCodes)
391-
.mapToObj(String::valueOf)
392-
.collect(Collectors.joining(", ")));
388+
if (responseCode == CODE_404) {
389+
throw new AmqpException.AmqpEntityDoesNotExistException("Entity does not exist");
390+
} else {
391+
throw new AmqpException(
392+
"Unexpected response code: %d instead of %s",
393+
responseCode,
394+
IntStream.of(expectedResponseCodes)
395+
.mapToObj(String::valueOf)
396+
.collect(Collectors.joining(", ")));
397+
}
393398
}
394399
}
395400

src/main/java/com/rabbitmq/client/amqp/impl/Clock.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
// info@rabbitmq.com.
1818
package com.rabbitmq.client.amqp.impl;
1919

20-
public class Clock {
20+
final class Clock {
2121

2222
private volatile long time;
2323

src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import com.rabbitmq.client.amqp.Address;
21+
import com.rabbitmq.client.amqp.AmqpException;
2122
import com.rabbitmq.client.amqp.ConnectionSettings;
2223
import com.rabbitmq.client.amqp.Management;
2324
import java.util.Collections;
@@ -43,6 +44,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
4344
AffinityCache affinityCache) {
4445
// TODO add retry for sensitive operations in affinity mechanism
4546
if (affinity == null) {
47+
// no affinity asked, we create a connection and return it
4648
return connectionFactory.apply(null);
4749
}
4850
try {
@@ -59,6 +61,10 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
5961
info = lookUpQueueInfo(management, affinity, affinityCache);
6062
queueInfoRefreshed = true;
6163
}
64+
if (info == null) {
65+
// likely we could not look up the queue info, because e.g. the queue does not exist
66+
return connectionWrapper;
67+
}
6268
LOGGER.debug(
6369
"Looking affinity with queue '{}' (type = {}, leader = {}, replicas = {})",
6470
info.name(),
@@ -81,24 +87,28 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
8187
affinityCache.nodenameToAddress(connectionWrapper.nodename(), connectionWrapper.address());
8288
if (nodesWithAffinity.contains(connectionWrapper.nodename())) {
8389
if (!queueInfoRefreshed) {
84-
info = lookUpQueueInfo(management, affinity, affinityCache);
8590
LOGGER.debug(
8691
"Found affinity, but refreshing queue information to check affinity is still valid.");
87-
nodesWithAffinity = findAffinity(affinity, info);
88-
queueInfoRefreshed = true;
89-
if (nodesWithAffinity.contains(connectionWrapper.nodename())) {
92+
info = lookUpQueueInfo(management, affinity, affinityCache);
93+
if (info == null) {
94+
LOGGER.debug("Could not look up info for queue '{}'", affinity.queue());
9095
pickedConnection = connectionWrapper;
9196
} else {
92-
LOGGER.debug("Affinity no longer valid, retrying.");
93-
management.releaseResources();
94-
connectionWrapper.connection().close();
97+
nodesWithAffinity = findAffinity(affinity, info);
98+
queueInfoRefreshed = true;
99+
if (nodesWithAffinity.contains(connectionWrapper.nodename())) {
100+
pickedConnection = connectionWrapper;
101+
} else {
102+
LOGGER.debug("Affinity no longer valid, retrying.");
103+
management.releaseResources();
104+
connectionWrapper.connection().close();
105+
}
95106
}
96107
} else {
97108
pickedConnection = connectionWrapper;
98109
}
99110
if (pickedConnection != null) {
100-
LOGGER.debug(
101-
"Affinity found with node {}, returning connection", pickedConnection.nodename());
111+
LOGGER.debug("Returning connection to node {}", pickedConnection.nodename());
102112
}
103113
} else if (attemptCount == 5) {
104114
LOGGER.debug(
@@ -111,8 +121,10 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
111121
"Affinity {} not found with node {}.", affinity, connectionWrapper.nodename());
112122
if (!queueInfoRefreshed) {
113123
info = lookUpQueueInfo(management, affinity, affinityCache);
114-
nodesWithAffinity = findAffinity(affinity, info);
115-
queueInfoRefreshed = true;
124+
if (info != null) {
125+
nodesWithAffinity = findAffinity(affinity, info);
126+
queueInfoRefreshed = true;
127+
}
116128
}
117129
management.releaseResources();
118130
connectionWrapper.connection().close();
@@ -127,12 +139,20 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
127139

128140
private static Management.QueueInfo lookUpQueueInfo(
129141
AmqpManagement management, ConnectionAffinity affinity, AffinityCache cache) {
142+
Management.QueueInfo info = null;
130143
management.init();
131-
Management.QueueInfo info = management.queueInfo(affinity.queue());
132-
cache.queueInfo(info);
144+
try {
145+
info = management.queueInfo(affinity.queue());
146+
cache.queueInfo(info);
147+
} catch (AmqpException.AmqpEntityDoesNotExistException e) {
148+
LOGGER.debug("Queue '{}' does not exist.", affinity.queue());
149+
cache.clearQueueInfoEntry(affinity.queue());
150+
// we just return null, caller will have to return the last connection
151+
}
133152
return info;
134153
}
135154

155+
// TODO clean affinity cache (LRU or size-based)
136156
static class AffinityCache {
137157

138158
private final ConcurrentMap<String, Management.QueueInfo> queueInfoCache =
@@ -149,6 +169,10 @@ Management.QueueInfo queueInfo(String queue) {
149169
return this.queueInfoCache.get(queue);
150170
}
151171

172+
void clearQueueInfoEntry(String queue) {
173+
this.queueInfoCache.remove(queue);
174+
}
175+
152176
AffinityCache nodenameToAddress(String nodename, Address address) {
153177
if (nodename != null && !nodename.isBlank()) {
154178
this.nodenameToAddressMapping.put(nodename, address);

src/test/java/com/rabbitmq/client/amqp/impl/AffinityTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.rabbitmq.client.amqp.*;
2323
import java.util.function.Consumer;
2424
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
2526
import org.junit.jupiter.api.TestInfo;
2627
import org.junit.jupiter.api.extension.ExtendWith;
2728
import org.junit.jupiter.params.ParameterizedTest;
@@ -63,6 +64,15 @@ void sameConnectionShouldBeReturnedIfSameAffinityAndReuseActivated(
6364
}
6465
}
6566

67+
@Test
68+
void connectionShouldSucceedEvenIfAffinityQueueDoesNotExist() {
69+
try (Connection c =
70+
environment.connectionBuilder().affinity().queue("does not exist").connection().build()) {
71+
// testing the connection
72+
c.publisherBuilder().exchange("amq.fanout").build();
73+
}
74+
}
75+
6676
AmqpConnection connection(Consumer<ConnectionBuilder> operation) {
6777
ConnectionBuilder builder = environment.connectionBuilder();
6878
operation.accept(builder);

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.mockito.Mockito.*;
2626

2727
import com.rabbitmq.client.amqp.Address;
28+
import com.rabbitmq.client.amqp.AmqpException;
2829
import com.rabbitmq.client.amqp.ConnectionSettings;
2930
import com.rabbitmq.client.amqp.Management;
3031
import java.util.ArrayList;
@@ -37,6 +38,7 @@
3738
import org.junit.jupiter.api.AfterEach;
3839
import org.junit.jupiter.api.BeforeEach;
3940
import org.junit.jupiter.api.Test;
41+
import org.mockito.AdditionalMatchers;
4042
import org.mockito.Mock;
4143
import org.mockito.MockitoAnnotations;
4244

@@ -76,6 +78,48 @@ void tearDown() throws Exception {
7678
mocks.close();
7779
}
7880

81+
@Test
82+
void shouldReturnConnectionIfInfoIsNull() {
83+
when(management.queueInfo(Q)).thenThrow(new AmqpException.AmqpEntityDoesNotExistException(""));
84+
when(cf.apply(null)).thenReturn(follower1Connection());
85+
AmqpConnection.NativeConnectionWrapper w = enforceAffinity(cf, management, affinity(), cache);
86+
assertThat(w).hasNodename(FOLLOWER1_NODENAME);
87+
verify(management, times(1)).queueInfo(Q);
88+
verify(cf, times(1)).apply(null);
89+
verifyNoMoreInteractions(cf);
90+
verify(nativeConnection, never()).close();
91+
assertThat(cache).doesNotContainInfoFor(Q);
92+
}
93+
94+
@Test
95+
void infoInCache_ShouldLookUpInfo_ShouldReturnConnectionIfInfoIsNull_cacheEntryShouldBeCleared() {
96+
cache.queueInfo(info());
97+
when(management.queueInfo(Q)).thenThrow(new AmqpException.AmqpEntityDoesNotExistException(""));
98+
when(cf.apply(anyList())).thenReturn(leaderConnection());
99+
AmqpConnection.NativeConnectionWrapper w = enforceAffinity(cf, management, affinity(), cache);
100+
assertThat(w).isLeader();
101+
verify(management, times(1)).queueInfo(Q);
102+
verify(cf, times(1)).apply(anyList());
103+
verify(nativeConnection, never()).close();
104+
assertThat(cache).doesNotContainInfoFor(Q);
105+
}
106+
107+
@Test
108+
void
109+
infoInCache_ShouldRetry_ShouldLookUpInfo_ShouldReturnConnectionIfInfoIsNull_cacheEntryShouldBeCleared() {
110+
cache.queueInfo(info());
111+
when(management.queueInfo(Q)).thenThrow(new AmqpException.AmqpEntityDoesNotExistException(""));
112+
when(cf.apply(anyListOrNull()))
113+
.thenReturn(follower1Connection())
114+
.thenReturn(follower2Connection());
115+
AmqpConnection.NativeConnectionWrapper w = enforceAffinity(cf, management, affinity(), cache);
116+
assertThat(w).hasNodename(FOLLOWER2_NODENAME);
117+
verify(management, times(2)).queueInfo(Q);
118+
verify(cf, times(2)).apply(anyListOrNull());
119+
verify(nativeConnection, times(1)).close();
120+
assertThat(cache).doesNotContainInfoFor(Q);
121+
}
122+
79123
@Test
80124
void infoInCache_ShouldLookUpInfoAndCheckIt_ShouldUseConnectionIfMatch() {
81125
cache.queueInfo(info());
@@ -205,6 +249,10 @@ static Management.QueueInfo info(Management.QueueType type, String leader, Strin
205249
return new TestQueueInfo(Q, type, leader, replicas);
206250
}
207251

252+
static <T> List<T> anyListOrNull() {
253+
return AdditionalMatchers.or(anyList(), isNull());
254+
}
255+
208256
static NativeConnectionWrapperAssert assertThat(AmqpConnection.NativeConnectionWrapper wrapper) {
209257
return new NativeConnectionWrapperAssert(wrapper);
210258
}
@@ -262,6 +310,17 @@ AffinityCacheAssert contains(Management.QueueInfo info) {
262310
return this;
263311
}
264312

313+
AffinityCacheAssert doesNotContainInfoFor(String queue) {
314+
Assert.notNull(queue, "Queue argument cannot be null");
315+
isNotNull();
316+
317+
Management.QueueInfo queueInfo = actual.queueInfo(queue);
318+
if (queueInfo != null) {
319+
fail("There should be no info entry for queue '%s', but found '%s'", queue, queueInfo);
320+
}
321+
return this;
322+
}
323+
265324
AffinityCacheAssert hasMapping(String nodename, Address address) {
266325
if (nodename == null || address == null) {
267326
throw new IllegalArgumentException("Expected nodename/address mapping cannot be null");

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ void publisherShouldThrowWhenExchangeDoesNotExist() {
290290
}
291291

292292
@Test
293-
void publisherSendingShouldThrowWhenExchangeHasBeenDeleted() throws Exception {
293+
void publisherSendingShouldThrowWhenExchangeHasBeenDeleted() {
294294
connection.management().exchange(name).type(FANOUT).declare();
295295
Sync closedSync = sync();
296296
AtomicReference<Throwable> closedException = new AtomicReference<>();

src/test/java/com/rabbitmq/client/amqp/impl/ManagementTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2122

23+
import com.rabbitmq.client.amqp.AmqpException;
2224
import com.rabbitmq.client.amqp.Management;
2325
import java.util.concurrent.atomic.AtomicInteger;
2426
import java.util.function.Supplier;
@@ -62,4 +64,10 @@ void queueDeclareWithClientNamedQueueShouldBeRetriedIfNameAlreadyExists() {
6264
assertThat(queueInfo.name()).isNotEqualTo(q);
6365
assertThat(nameSupplierCallCount).hasValue(1 + 2);
6466
}
67+
68+
@Test
69+
void queueInfoShouldThrowDoesNotExistExceptionWhenQueueDoesNotExist() {
70+
assertThatThrownBy(() -> connection.management().queueInfo("do not exists"))
71+
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class);
72+
}
6573
}

src/test/java/com/rabbitmq/client/amqp/impl/TopologyRecoveryTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -441,14 +441,13 @@ void deletedQueueIsNotRecovered() {
441441
closeConnectionAndWaitForRecovery();
442442
assertThat(connectionAttemptCount).hasValue(2);
443443
assertThatThrownBy(() -> connection.management().queueInfo(q))
444-
.isInstanceOf(AmqpException.class)
445-
.hasMessageContaining("404");
444+
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class);
446445
}
447446
}
448447

449448
@ParameterizedTest
450449
@ValueSource(booleans = {true, false})
451-
void closedConsumerIsNotRecovered(boolean isolateResources) throws Exception {
450+
void closedConsumerIsNotRecovered(boolean isolateResources) {
452451
String q = queue();
453452
Connection connection = connection(isolateResources);
454453
assertThat(connectionAttemptCount).hasValue(1);

0 commit comments

Comments
 (0)