Skip to content

Commit ac4c7ef

Browse files
committed
Maintain state in event loop
1 parent 7226a9b commit ac4c7ef

File tree

4 files changed

+201
-92
lines changed

4 files changed

+201
-92
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/EventLoop.java

Lines changed: 55 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,39 @@
2020
import com.rabbitmq.client.amqp.AmqpException;
2121
import java.time.Duration;
2222
import java.util.concurrent.*;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324
import java.util.concurrent.atomic.AtomicReference;
25+
import java.util.function.Consumer;
26+
import java.util.function.Supplier;
2427
import org.slf4j.Logger;
2528
import org.slf4j.LoggerFactory;
2629

27-
final class EventLoop implements AutoCloseable {
30+
final class EventLoop<S> implements AutoCloseable {
2831

2932
private static final Duration TIMEOUT = Duration.ofSeconds(60);
3033
private static final Logger LOGGER = LoggerFactory.getLogger(EventLoop.class);
3134

35+
private final AtomicBoolean closed = new AtomicBoolean(false);
3236
private final String label;
3337
private final Future<?> loop;
3438
private final AtomicReference<Thread> loopThread = new AtomicReference<>();
35-
private final BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(100);
39+
private final AtomicReference<S> stateReference = new AtomicReference<>();
40+
private final BlockingQueue<Consumer<S>> taskQueue = new ArrayBlockingQueue<>(100);
3641

37-
EventLoop(String label, ExecutorService executorService) {
42+
EventLoop(Supplier<S> stateSupplier, String label, ExecutorService executorService) {
3843
this.label = label;
3944
CountDownLatch loopThreadSetLatch = new CountDownLatch(1);
40-
4145
this.loop =
4246
executorService.submit(
4347
() -> {
48+
S state = stateSupplier.get();
4449
loopThread.set(Thread.currentThread());
50+
stateReference.set(state);
4551
loopThreadSetLatch.countDown();
4652
while (!Thread.currentThread().isInterrupted()) {
4753
try {
48-
Runnable task = this.taskQueue.take();
49-
task.run();
54+
Consumer<S> task = this.taskQueue.take();
55+
task.accept(state);
5056
} catch (InterruptedException e) {
5157
return;
5258
} catch (Exception e) {
@@ -64,43 +70,56 @@ final class EventLoop implements AutoCloseable {
6470
}
6571
}
6672

67-
void submit(Runnable task) {
68-
if (Thread.currentThread().equals(this.loopThread.get())) {
69-
task.run();
73+
void submit(Consumer<S> task) {
74+
if (this.closed.get()) {
75+
throw new IllegalStateException("Event loop is closed");
7076
} else {
71-
CountDownLatch latch = new CountDownLatch(1);
72-
try {
73-
boolean added =
74-
this.taskQueue.offer(
75-
() -> {
76-
try {
77-
task.run();
78-
} catch (Exception e) {
79-
LOGGER.info("Error during {} task", this.label, e);
80-
} finally {
81-
latch.countDown();
82-
}
83-
},
84-
TIMEOUT.toMillis(),
85-
TimeUnit.MILLISECONDS);
86-
if (!added) {
87-
throw new AmqpException("Enqueueing of %s task timed out", this.label);
77+
if (Thread.currentThread().equals(this.loopThread.get())) {
78+
task.accept(this.stateReference.get());
79+
} else {
80+
CountDownLatch latch = new CountDownLatch(1);
81+
try {
82+
boolean added =
83+
this.taskQueue.offer(
84+
state -> {
85+
try {
86+
task.accept(state);
87+
} catch (Exception e) {
88+
LOGGER.info("Error during {} task", this.label, e);
89+
} finally {
90+
latch.countDown();
91+
}
92+
},
93+
TIMEOUT.toMillis(),
94+
TimeUnit.MILLISECONDS);
95+
if (!added) {
96+
throw new AmqpException("Enqueueing of %s task timed out", this.label);
97+
}
98+
} catch (InterruptedException e) {
99+
Thread.currentThread().interrupt();
100+
throw new AmqpException(this.label + " task enqueueing has been interrupted", e);
101+
}
102+
try {
103+
boolean completed = latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
104+
if (!completed) {
105+
LOGGER.warn("Event loop task did not complete in {} second(s)", TIMEOUT.toSeconds());
106+
}
107+
} catch (InterruptedException e) {
108+
Thread.currentThread().interrupt();
109+
throw new AmqpException(this.label + " Topology task processing has been interrupted", e);
88110
}
89-
} catch (InterruptedException e) {
90-
Thread.currentThread().interrupt();
91-
throw new AmqpException(this.label + " task enqueueing has been interrupted", e);
92-
}
93-
try {
94-
latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
95-
} catch (InterruptedException e) {
96-
Thread.currentThread().interrupt();
97-
throw new AmqpException(this.label + " Topology task processing has been interrupted", e);
98111
}
99112
}
100113
}
101114

115+
S state() {
116+
return this.stateReference.get();
117+
}
118+
102119
@Override
103120
public void close() {
104-
this.loop.cancel(true);
121+
if (this.closed.compareAndSet(false, true)) {
122+
this.loop.cancel(true);
123+
}
105124
}
106125
}

src/main/java/com/rabbitmq/client/amqp/impl/RecordingTopologyListener.java

Lines changed: 67 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -21,80 +21,86 @@
2121
import java.util.concurrent.*;
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323
import java.util.concurrent.atomic.AtomicReference;
24+
import java.util.function.Consumer;
2425

2526
final class RecordingTopologyListener implements TopologyListener, AutoCloseable {
2627

27-
private final EventLoop eventLoop;
28-
private final Map<String, ExchangeSpec> exchanges = new LinkedHashMap<>();
29-
private final Map<String, QueueSpec> queues = new LinkedHashMap<>();
30-
private final Set<BindingSpec> bindings = new LinkedHashSet<>();
31-
private final Map<Long, ConsumerSpec> consumers = new LinkedHashMap<>();
28+
private final EventLoop<State> eventLoop;
29+
3230
private final AtomicBoolean closed = new AtomicBoolean(false);
3331

3432
RecordingTopologyListener(ExecutorService executorService) {
35-
this.eventLoop = new EventLoop("topology", executorService);
33+
this.eventLoop = new EventLoop<>(State::new, "topology", executorService);
34+
}
35+
36+
private static class State {
37+
38+
private final Map<String, ExchangeSpec> exchanges = new LinkedHashMap<>();
39+
private final Map<String, QueueSpec> queues = new LinkedHashMap<>();
40+
private final Set<BindingSpec> bindings = new LinkedHashSet<>();
41+
private final Map<Long, ConsumerSpec> consumers = new LinkedHashMap<>();
3642
}
3743

3844
@Override
3945
public void exchangeDeclared(AmqpExchangeSpecification specification) {
40-
this.submit(() -> this.exchanges.put(specification.name(), new ExchangeSpec(specification)));
46+
this.submit(s -> s.exchanges.put(specification.name(), new ExchangeSpec(specification)));
4147
}
4248

4349
@Override
4450
public void exchangeDeleted(String name) {
4551
this.submit(
46-
() -> {
47-
this.exchanges.remove(name);
48-
Set<BindingSpec> deletedBindings = this.deleteBindings(name, true);
49-
this.deleteAutoDeleteExchanges(deletedBindings);
52+
s -> {
53+
s.exchanges.remove(name);
54+
Set<BindingSpec> deletedBindings = this.deleteBindings(s, name, true);
55+
this.deleteAutoDeleteExchanges(s, deletedBindings);
5056
});
5157
}
5258

5359
@Override
5460
public void queueDeclared(AmqpQueueSpecification specification) {
55-
this.submit(() -> this.queues.put(specification.name(), new QueueSpec(specification)));
61+
this.submit(s -> s.queues.put(specification.name(), new QueueSpec(specification)));
5662
}
5763

5864
@Override
5965
public void queueDeleted(String name) {
6066
this.submit(
61-
() -> {
62-
this.queues.remove(name);
63-
Set<BindingSpec> deletedBindings = this.deleteBindings(name, false);
64-
this.deleteAutoDeleteExchanges(deletedBindings);
67+
s -> {
68+
s.queues.remove(name);
69+
Set<BindingSpec> deletedBindings = this.deleteBindings(s, name, false);
70+
this.deleteAutoDeleteExchanges(s, deletedBindings);
6571
});
6672
}
6773

6874
@Override
6975
public void bindingDeclared(AmqpBindingManagement.AmqpBindingSpecification specification) {
70-
this.submit(() -> this.bindings.add(new BindingSpec(specification.state())));
76+
this.submit(s -> s.bindings.add(new BindingSpec(specification.state())));
7177
}
7278

7379
@Override
7480
public void bindingDeleted(AmqpBindingManagement.AmqpUnbindSpecification specification) {
7581
this.submit(
76-
() -> {
82+
s -> {
7783
BindingSpec spec = new BindingSpec(specification.state());
78-
this.bindings.remove(spec);
79-
this.deleteAutoDeleteExchanges(Collections.singleton(spec));
84+
s.bindings.remove(spec);
85+
this.deleteAutoDeleteExchanges(s, Collections.singleton(spec));
8086
});
8187
}
8288

8389
@Override
8490
public void consumerCreated(long id, String queue) {
85-
this.submit(() -> this.consumers.put(id, new ConsumerSpec(id, queue)));
91+
this.submit(s -> s.consumers.put(id, new ConsumerSpec(id, queue)));
8692
}
8793

8894
@Override
8995
public void consumerDeleted(long id, String queue) {
9096
this.submit(
91-
() -> {
92-
this.consumers.remove(id);
97+
s -> {
98+
s.consumers.remove(id);
9399
// if there's no consumer anymore on the queue, delete it if it's auto-delete
94100
boolean atLeastOneConsumerOnQueue =
95-
this.consumers.values().stream().anyMatch(spec -> spec.queue.equals(queue));
101+
s.consumers.values().stream().anyMatch(spec -> spec.queue.equals(queue));
96102
if (!atLeastOneConsumerOnQueue) {
97-
QueueSpec queueSpec = this.queues.get(queue);
103+
QueueSpec queueSpec = s.queues.get(queue);
98104
if (queueSpec != null && queueSpec.autoDelete) {
99105
this.queueDeleted(queue);
100106
}
@@ -109,16 +115,16 @@ public void close() {
109115
}
110116
}
111117

112-
private void submit(Runnable task) {
118+
private void submit(Consumer<State> task) {
113119
if (!this.closed.get()) {
114120
this.eventLoop.submit(task);
115121
}
116122
}
117123

118-
private Set<BindingSpec> deleteBindings(String name, boolean exchange) {
124+
private Set<BindingSpec> deleteBindings(State s, String name, boolean exchange) {
119125
Set<BindingSpec> deletedBindings = new LinkedHashSet<>();
120126
// delete bindings that depend on this exchange or queue
121-
Iterator<BindingSpec> iterator = this.bindings.iterator();
127+
Iterator<BindingSpec> iterator = s.bindings.iterator();
122128
while (iterator.hasNext()) {
123129
BindingSpec spec = iterator.next();
124130
if (spec.isInvolved(name, exchange)) {
@@ -129,57 +135,37 @@ private Set<BindingSpec> deleteBindings(String name, boolean exchange) {
129135
return deletedBindings;
130136
}
131137

132-
private void deleteAutoDeleteExchanges(Set<BindingSpec> deletedBindings) {
138+
private void deleteAutoDeleteExchanges(State s, Set<BindingSpec> deletedBindings) {
133139
// delete auto-delete exchanges which are no longer sources in any bindings
134140
for (BindingSpec binding : deletedBindings) {
135141
String source = binding.source;
136142
boolean exchangeStillSource;
137-
exchangeStillSource = this.bindings.stream().anyMatch(b -> b.source.equals(source));
143+
exchangeStillSource = s.bindings.stream().anyMatch(b -> b.source.equals(source));
138144

139145
if (!exchangeStillSource) {
140-
ExchangeSpec exchange = this.exchanges.get(source);
146+
ExchangeSpec exchange = s.exchanges.get(source);
141147
if (exchange != null && exchange.autoDelete) {
142148
exchangeDeleted(exchange.name);
143149
}
144150
}
145151
}
146152
}
147153

148-
Map<String, ExchangeSpec> exchanges() {
149-
return new LinkedHashMap<>(this.exchanges);
150-
}
151-
152-
Map<String, QueueSpec> queues() {
153-
return new LinkedHashMap<>(this.queues);
154-
}
155-
156154
void accept(Visitor visitor) {
157155
AtomicReference<List<ExchangeSpec>> exchangeCopy = new AtomicReference<>();
158156
AtomicReference<List<QueueSpec>> queueCopy = new AtomicReference<>();
159157
AtomicReference<Set<BindingSpec>> bindingCopy = new AtomicReference<>();
160158
submit(
161-
() -> {
162-
exchangeCopy.set(new ArrayList<>(this.exchanges.values()));
163-
queueCopy.set(new ArrayList<>(this.queues.values()));
164-
bindingCopy.set(new LinkedHashSet<>(this.bindings));
159+
s -> {
160+
exchangeCopy.set(new ArrayList<>(s.exchanges.values()));
161+
queueCopy.set(new ArrayList<>(s.queues.values()));
162+
bindingCopy.set(new LinkedHashSet<>(s.bindings));
165163
});
166164
visitor.visitExchanges(exchangeCopy.get());
167165
visitor.visitQueues(queueCopy.get());
168166
visitor.visitBindings(bindingCopy.get());
169167
}
170168

171-
int bindingCount() {
172-
return this.bindings.size();
173-
}
174-
175-
int exchangeCount() {
176-
return this.exchanges.size();
177-
}
178-
179-
int queueCount() {
180-
return this.queues.size();
181-
}
182-
183169
static class ExchangeSpec {
184170

185171
private final String name;
@@ -340,4 +326,30 @@ interface Visitor {
340326

341327
void visitBindings(Collection<BindingSpec> bindings);
342328
}
329+
330+
// for test assertions
331+
332+
private State state() {
333+
return this.eventLoop.state();
334+
}
335+
336+
Map<String, ExchangeSpec> exchanges() {
337+
return new LinkedHashMap<>(state().exchanges);
338+
}
339+
340+
Map<String, QueueSpec> queues() {
341+
return new LinkedHashMap<>(state().queues);
342+
}
343+
344+
int bindingCount() {
345+
return state().bindings.size();
346+
}
347+
348+
int exchangeCount() {
349+
return state().exchanges.size();
350+
}
351+
352+
int queueCount() {
353+
return state().queues.size();
354+
}
343355
}

0 commit comments

Comments
 (0)