Skip to content

Commit 835a63f

Browse files
committed
Add unit test for affinity
1 parent e57a0b0 commit 835a63f

File tree

2 files changed

+198
-3
lines changed

2 files changed

+198
-3
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,12 @@ private NativeConnectionWrapper connect(
235235
}
236236
}
237237

238-
private static NativeConnectionWrapper enforceAffinity(
238+
static NativeConnectionWrapper enforceAffinity(
239239
Function<List<Address>, NativeConnectionWrapper> connectionFactory,
240240
AmqpManagement management,
241241
ConnectionUtils.ConnectionAffinity affinity,
242242
ConnectionUtils.AffinityCache affinityCache) {
243+
// TODO add retry for sensitive operations in affinity mechanism
243244
if (affinity == null) {
244245
return connectionFactory.apply(null);
245246
}
@@ -712,18 +713,26 @@ public String toString() {
712713
return this.environment.toString() + "-" + this.id;
713714
}
714715

715-
private static class NativeConnectionWrapper {
716+
static class NativeConnectionWrapper {
716717

717718
private final org.apache.qpid.protonj2.client.Connection connection;
718719
private final String nodename;
719720
private final Address address;
720721

721-
private NativeConnectionWrapper(
722+
NativeConnectionWrapper(
722723
org.apache.qpid.protonj2.client.Connection connection, String nodename, Address address) {
723724
this.connection = connection;
724725
this.nodename = nodename;
725726
this.address = address;
726727
}
728+
729+
String nodename() {
730+
return this.nodename;
731+
}
732+
733+
public Address address() {
734+
return this.address;
735+
}
727736
}
728737

729738
@Override
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
// info@rabbitmq.com.
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import static com.rabbitmq.client.amqp.impl.AmqpConnection.enforceAffinity;
21+
import static org.assertj.core.api.Assertions.fail;
22+
import static org.mockito.ArgumentMatchers.anyList;
23+
import static org.mockito.Mockito.*;
24+
25+
import com.rabbitmq.client.amqp.Address;
26+
import com.rabbitmq.client.amqp.Management;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.function.Function;
31+
import org.apache.qpid.protonj2.client.impl.ClientConnection;
32+
import org.assertj.core.api.AbstractObjectAssert;
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.mockito.Mock;
37+
import org.mockito.MockitoAnnotations;
38+
39+
public class AmqpConnectionAffinityTest {
40+
41+
private static final String LEADER_NODENAME = "l";
42+
private static final Address LEADER_ADDRESS = new Address(LEADER_NODENAME, 5672);
43+
private static final String FOLLOWER1_NODENAME = "f1";
44+
private static final Address FOLLOWER1_ADDRESS = new Address(FOLLOWER1_NODENAME, 5672);
45+
private static final String FOLLOWER2_NODENAME = "f2";
46+
private static final Address FOLLOWER2_ADDRESS = new Address(FOLLOWER2_NODENAME, 5672);
47+
private static final String Q = "my-queue";
48+
49+
AutoCloseable mocks;
50+
51+
@Mock AmqpManagement management;
52+
53+
@Mock Function<List<Address>, AmqpConnection.NativeConnectionWrapper> cf;
54+
55+
@Mock ClientConnection nativeConnection;
56+
57+
ConnectionUtils.AffinityCache cache;
58+
59+
@BeforeEach
60+
void init() {
61+
mocks = MockitoAnnotations.openMocks(this);
62+
cache = new ConnectionUtils.AffinityCache();
63+
}
64+
65+
@AfterEach
66+
void tearDown() throws Exception {
67+
mocks.close();
68+
}
69+
70+
@Test
71+
void noInfoLookupIfAlreadyInCache() {
72+
cache.queueInfo(info());
73+
when(cf.apply(anyList())).thenReturn(leaderConnection());
74+
AmqpConnection.NativeConnectionWrapper w = enforceAffinity(cf, management, affinity(), cache);
75+
assertThat(w).isLeader();
76+
verifyNoInteractions(management);
77+
verify(cf, times(1)).apply(anyList());
78+
}
79+
80+
AmqpConnection.NativeConnectionWrapper leaderConnection() {
81+
return new AmqpConnection.NativeConnectionWrapper(
82+
this.nativeConnection, LEADER_NODENAME, LEADER_ADDRESS);
83+
}
84+
85+
static ConnectionUtils.ConnectionAffinity affinity() {
86+
return new ConnectionUtils.ConnectionAffinity(Q, null);
87+
}
88+
89+
static Management.QueueInfo info() {
90+
return info(LEADER_NODENAME, FOLLOWER1_NODENAME, FOLLOWER2_NODENAME);
91+
}
92+
93+
static Management.QueueInfo info(String leader, String... followers) {
94+
Management.QueueType type = Management.QueueType.QUORUM;
95+
List<String> replicas = new ArrayList<>();
96+
replicas.add(leader);
97+
if (followers != null && followers.length > 0) {
98+
replicas.addAll(List.of(followers));
99+
}
100+
return new Management.QueueInfo() {
101+
102+
@Override
103+
public String name() {
104+
return Q;
105+
}
106+
107+
@Override
108+
public String leader() {
109+
return leader;
110+
}
111+
112+
@Override
113+
public List<String> replicas() {
114+
return replicas;
115+
}
116+
117+
@Override
118+
public Management.QueueType type() {
119+
return type;
120+
}
121+
122+
@Override
123+
public boolean durable() {
124+
return false;
125+
}
126+
127+
@Override
128+
public boolean autoDelete() {
129+
return false;
130+
}
131+
132+
@Override
133+
public boolean exclusive() {
134+
return false;
135+
}
136+
137+
@Override
138+
public Map<String, Object> arguments() {
139+
return Map.of();
140+
}
141+
142+
@Override
143+
public long messageCount() {
144+
return 0;
145+
}
146+
147+
@Override
148+
public int consumerCount() {
149+
return 0;
150+
}
151+
};
152+
}
153+
154+
static NativeConnectionWrapperAssert assertThat(AmqpConnection.NativeConnectionWrapper wrapper) {
155+
return new NativeConnectionWrapperAssert(wrapper);
156+
}
157+
158+
static class NativeConnectionWrapperAssert
159+
extends AbstractObjectAssert<
160+
NativeConnectionWrapperAssert, AmqpConnection.NativeConnectionWrapper> {
161+
162+
private NativeConnectionWrapperAssert(AmqpConnection.NativeConnectionWrapper wrapper) {
163+
super(wrapper, NativeConnectionWrapperAssert.class);
164+
}
165+
166+
NativeConnectionWrapperAssert hasNodename(String nodename) {
167+
isNotNull();
168+
if (!actual.nodename().equals(nodename)) {
169+
fail("Nodename should be '%s' but is '%s'", nodename, actual.nodename());
170+
}
171+
return this;
172+
}
173+
174+
NativeConnectionWrapperAssert hasAddress(Address address) {
175+
isNotNull();
176+
if (!actual.address().equals(address)) {
177+
fail("Address should be '%s' but is '%s'", address, actual.address());
178+
}
179+
return this;
180+
}
181+
182+
NativeConnectionWrapperAssert isLeader() {
183+
return this.hasNodename(LEADER_NODENAME).hasAddress(LEADER_ADDRESS);
184+
}
185+
}
186+
}

0 commit comments

Comments
 (0)