Skip to content

Commit ac1be2f

Browse files
committed
Add AlarmManager unit tests
1 parent 77222d6 commit ac1be2f

1 file changed

Lines changed: 350 additions & 0 deletions

File tree

Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.dromara.dynamictp.test.core.notify.manager;
19+
20+
import org.dromara.dynamictp.common.em.NotifyItemEnum;
21+
import org.dromara.dynamictp.common.entity.NotifyItem;
22+
import org.dromara.dynamictp.common.pattern.filter.Invoker;
23+
import org.dromara.dynamictp.common.pattern.filter.InvokerChain;
24+
import org.dromara.dynamictp.core.notifier.context.BaseNotifyCtx;
25+
import org.dromara.dynamictp.core.notifier.manager.AlarmManager;
26+
import org.dromara.dynamictp.core.support.ExecutorWrapper;
27+
import org.dromara.dynamictp.core.support.adapter.ExecutorAdapter;
28+
import org.junit.jupiter.api.AfterEach;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
import org.junit.jupiter.api.parallel.Execution;
32+
import org.junit.jupiter.api.parallel.ExecutionMode;
33+
import org.junit.jupiter.api.extension.ExtendWith;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.test.context.ContextConfiguration;
37+
import org.springframework.test.context.junit.jupiter.SpringExtension;
38+
39+
import java.lang.reflect.Field;
40+
import java.util.Collections;
41+
import java.util.concurrent.Executor;
42+
import java.util.concurrent.LinkedBlockingQueue;
43+
import java.util.concurrent.TimeUnit;
44+
import java.util.concurrent.atomic.AtomicInteger;
45+
import java.util.concurrent.atomic.AtomicReference;
46+
47+
import static org.junit.jupiter.api.Assertions.assertEquals;
48+
import static org.junit.jupiter.api.Assertions.assertNull;
49+
import static org.junit.jupiter.api.Assertions.assertSame;
50+
import static org.junit.jupiter.api.Assertions.assertTrue;
51+
52+
/**
53+
* AlarmManager test
54+
*
55+
* @author yanhom
56+
* @since 1.2.2
57+
*/
58+
@ExtendWith(SpringExtension.class)
59+
@ContextConfiguration(classes = AlarmManagerTest.ContextConfig.class)
60+
@Execution(ExecutionMode.SAME_THREAD)
61+
class AlarmManagerTest {
62+
63+
private final AtomicReference<BaseNotifyCtx> alarmContext = new AtomicReference<>();
64+
65+
private final AtomicInteger alarmCount = new AtomicInteger();
66+
67+
private InvokerChain<BaseNotifyCtx> alarmInvokerChain;
68+
69+
private Invoker<BaseNotifyCtx> originalHead;
70+
71+
private Field headField;
72+
73+
@BeforeEach
74+
void setUp() throws Exception {
75+
alarmInvokerChain = getAlarmInvokerChain();
76+
headField = InvokerChain.class.getDeclaredField("head");
77+
headField.setAccessible(true);
78+
originalHead = (Invoker<BaseNotifyCtx>) headField.get(alarmInvokerChain);
79+
Invoker<BaseNotifyCtx> testInvoker = context -> {
80+
alarmContext.set(context);
81+
alarmCount.incrementAndGet();
82+
};
83+
headField.set(alarmInvokerChain, testInvoker);
84+
}
85+
86+
@AfterEach
87+
void tearDown() throws Exception {
88+
headField.set(alarmInvokerChain, originalHead);
89+
}
90+
91+
@Test
92+
void testDoCheckAndTryAlarmTriggersWhenCapacityReachesThreshold() {
93+
NotifyItem notifyItem = notifyItem(NotifyItemEnum.CAPACITY, 80);
94+
ExecutorWrapper wrapper = executorWrapper("capacity-reached", notifyItem, mockExecutor(1, 1, 0, 8, 10));
95+
96+
AlarmManager.doCheckAndTryAlarm(wrapper, NotifyItemEnum.CAPACITY);
97+
98+
assertEquals(1, alarmCount.get());
99+
assertSame(notifyItem, alarmContext.get().getNotifyItem());
100+
assertEquals(NotifyItemEnum.CAPACITY, alarmContext.get().getNotifyItemEnum());
101+
assertEquals("capacity-reached", alarmContext.get().getExecutorWrapper().getThreadPoolName());
102+
}
103+
104+
@Test
105+
void testDoCheckAndTryAlarmSkipsWhenCapacityBelowThreshold() {
106+
NotifyItem notifyItem = notifyItem(NotifyItemEnum.CAPACITY, 90);
107+
ExecutorWrapper wrapper = executorWrapper("capacity-below", notifyItem, mockExecutor(1, 1, 0, 8, 10));
108+
109+
AlarmManager.doCheckAndTryAlarm(wrapper, NotifyItemEnum.CAPACITY);
110+
111+
assertNoAlarm();
112+
}
113+
114+
@Test
115+
void testDoCheckAndTryAlarmSkipsWhenQueueIsEmpty() {
116+
NotifyItem notifyItem = notifyItem(NotifyItemEnum.CAPACITY, 1);
117+
ExecutorWrapper wrapper = executorWrapper("capacity-empty", notifyItem, mockExecutor(1, 1, 0, 0, 10));
118+
119+
AlarmManager.doCheckAndTryAlarm(wrapper, NotifyItemEnum.CAPACITY);
120+
121+
assertNoAlarm();
122+
}
123+
124+
@Test
125+
void testDoCheckAndTryAlarmTriggersWhenLivenessReachesThreshold() {
126+
NotifyItem notifyItem = notifyItem(NotifyItemEnum.LIVENESS, 70);
127+
ExecutorWrapper wrapper = executorWrapper("liveness-reached", notifyItem, mockExecutor(1, 10, 7, 0, 10));
128+
129+
AlarmManager.doCheckAndTryAlarm(wrapper, NotifyItemEnum.LIVENESS);
130+
131+
assertEquals(1, alarmCount.get());
132+
assertSame(notifyItem, alarmContext.get().getNotifyItem());
133+
assertEquals(NotifyItemEnum.LIVENESS, alarmContext.get().getNotifyItemEnum());
134+
assertEquals(7, alarmContext.get().getExecutorWrapper().getExecutor().getActiveCount());
135+
}
136+
137+
@Test
138+
void testDoCheckAndTryAlarmSkipsWhenLivenessBelowThreshold() {
139+
NotifyItem notifyItem = notifyItem(NotifyItemEnum.LIVENESS, 80);
140+
ExecutorWrapper wrapper = executorWrapper("liveness-below", notifyItem, mockExecutor(1, 10, 7, 0, 10));
141+
142+
AlarmManager.doCheckAndTryAlarm(wrapper, NotifyItemEnum.LIVENESS);
143+
144+
assertNoAlarm();
145+
}
146+
147+
@Test
148+
void testDoCheckAndTryAlarmTriggersDirectAlarmTypesWithoutThresholdCheck() {
149+
NotifyItem notifyItem = notifyItem(NotifyItemEnum.REJECT, 100);
150+
ExecutorWrapper wrapper = executorWrapper("reject-direct", notifyItem, mockExecutor(1, 10, 0, 0, 10));
151+
152+
AlarmManager.doCheckAndTryAlarm(wrapper, NotifyItemEnum.REJECT);
153+
154+
assertEquals(1, alarmCount.get());
155+
assertSame(notifyItem, alarmContext.get().getNotifyItem());
156+
assertEquals(NotifyItemEnum.REJECT, alarmContext.get().getNotifyItemEnum());
157+
}
158+
159+
@Test
160+
void testDoCheckAndTryAlarmSkipsUnsupportedScheduleType() {
161+
NotifyItem notifyItem = notifyItem(NotifyItemEnum.CHANGE, 1);
162+
ExecutorWrapper wrapper = executorWrapper("change-unsupported", notifyItem, mockExecutor(1, 10, 10, 10, 10));
163+
164+
AlarmManager.doCheckAndTryAlarm(wrapper, NotifyItemEnum.CHANGE);
165+
166+
assertNoAlarm();
167+
}
168+
169+
@Test
170+
void testDoTryAlarmTriggersWhenNotifyItemExists() {
171+
NotifyItem notifyItem = notifyItem(NotifyItemEnum.QUEUE_TIMEOUT, 100);
172+
ExecutorWrapper wrapper = executorWrapper("queue-timeout-direct", notifyItem, mockExecutor(1, 10, 0, 0, 10));
173+
174+
AlarmManager.doTryAlarm(wrapper, NotifyItemEnum.QUEUE_TIMEOUT);
175+
176+
assertEquals(1, alarmCount.get());
177+
assertSame(notifyItem, alarmContext.get().getNotifyItem());
178+
assertEquals(NotifyItemEnum.QUEUE_TIMEOUT, alarmContext.get().getNotifyItemEnum());
179+
}
180+
181+
@Test
182+
void testDoTryAlarmSkipsWhenNotifyItemMissing() {
183+
NotifyItem notifyItem = notifyItem(NotifyItemEnum.RUN_TIMEOUT, 100);
184+
ExecutorWrapper wrapper = executorWrapper("missing-notify-item", notifyItem, mockExecutor(1, 10, 0, 0, 10));
185+
186+
AlarmManager.doTryAlarm(wrapper, NotifyItemEnum.REJECT);
187+
188+
assertNoAlarm();
189+
}
190+
191+
private void assertNoAlarm() {
192+
assertEquals(0, alarmCount.get());
193+
assertNull(alarmContext.get());
194+
}
195+
196+
private NotifyItem notifyItem(NotifyItemEnum notifyItemEnum, int threshold) {
197+
NotifyItem notifyItem = new NotifyItem();
198+
notifyItem.setType(notifyItemEnum.getValue());
199+
notifyItem.setThreshold(threshold);
200+
notifyItem.setPlatformIds(Collections.singletonList("platform"));
201+
return notifyItem;
202+
}
203+
204+
private ExecutorWrapper executorWrapper(String poolName, NotifyItem notifyItem, ExecutorAdapter<?> executor) {
205+
ExecutorWrapper wrapper = new ExecutorWrapper(poolName, executor);
206+
wrapper.setNotifyItems(Collections.singletonList(notifyItem));
207+
wrapper.setNotifyEnabled(true);
208+
return wrapper;
209+
}
210+
211+
private ExecutorAdapter<?> mockExecutor(int corePoolSize,
212+
int maximumPoolSize,
213+
int activeCount,
214+
int queueSize,
215+
int queueCapacity) {
216+
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
217+
for (int i = 0; i < queueSize; i++) {
218+
assertTrue(queue.offer(() -> {
219+
// mock queued task
220+
}));
221+
}
222+
return new TestExecutorAdapter(corePoolSize, maximumPoolSize, activeCount, queue);
223+
}
224+
225+
private InvokerChain<BaseNotifyCtx> getAlarmInvokerChain() throws Exception {
226+
Field field = AlarmManager.class.getDeclaredField("ALARM_INVOKER_CHAIN");
227+
field.setAccessible(true);
228+
return (InvokerChain<BaseNotifyCtx>) field.get(null);
229+
}
230+
231+
@Configuration
232+
static class ContextConfig {
233+
234+
@Bean
235+
org.dromara.dynamictp.spring.holder.SpringContextHolder springContextHolder() {
236+
return new org.dromara.dynamictp.spring.holder.SpringContextHolder();
237+
}
238+
}
239+
240+
private static class TestExecutorAdapter implements ExecutorAdapter<Executor> {
241+
242+
private final int corePoolSize;
243+
244+
private final int maximumPoolSize;
245+
246+
private final int activeCount;
247+
248+
private final LinkedBlockingQueue<Runnable> queue;
249+
250+
TestExecutorAdapter(int corePoolSize,
251+
int maximumPoolSize,
252+
int activeCount,
253+
LinkedBlockingQueue<Runnable> queue) {
254+
this.corePoolSize = corePoolSize;
255+
this.maximumPoolSize = maximumPoolSize;
256+
this.activeCount = activeCount;
257+
this.queue = queue;
258+
}
259+
260+
@Override
261+
public Executor getOriginal() {
262+
return Runnable::run;
263+
}
264+
265+
@Override
266+
public int getCorePoolSize() {
267+
return corePoolSize;
268+
}
269+
270+
@Override
271+
public void setCorePoolSize(int corePoolSize) {
272+
throw new UnsupportedOperationException();
273+
}
274+
275+
@Override
276+
public int getMaximumPoolSize() {
277+
return maximumPoolSize;
278+
}
279+
280+
@Override
281+
public void setMaximumPoolSize(int maximumPoolSize) {
282+
throw new UnsupportedOperationException();
283+
}
284+
285+
@Override
286+
public int getPoolSize() {
287+
return activeCount;
288+
}
289+
290+
@Override
291+
public int getActiveCount() {
292+
return activeCount;
293+
}
294+
295+
@Override
296+
public int getLargestPoolSize() {
297+
return maximumPoolSize;
298+
}
299+
300+
@Override
301+
public long getTaskCount() {
302+
return activeCount + queue.size();
303+
}
304+
305+
@Override
306+
public long getCompletedTaskCount() {
307+
return 0L;
308+
}
309+
310+
@Override
311+
public LinkedBlockingQueue<Runnable> getQueue() {
312+
return queue;
313+
}
314+
315+
@Override
316+
public String getQueueType() {
317+
return queue.getClass().getSimpleName();
318+
}
319+
320+
@Override
321+
public int getQueueSize() {
322+
return queue.size();
323+
}
324+
325+
@Override
326+
public int getQueueRemainingCapacity() {
327+
return queue.remainingCapacity();
328+
}
329+
330+
@Override
331+
public int getQueueCapacity() {
332+
return queue.size() + queue.remainingCapacity();
333+
}
334+
335+
@Override
336+
public String getRejectHandlerType() {
337+
return "AbortPolicy";
338+
}
339+
340+
@Override
341+
public boolean allowsCoreThreadTimeOut() {
342+
return false;
343+
}
344+
345+
@Override
346+
public long getKeepAliveTime(TimeUnit unit) {
347+
return unit.convert(60, TimeUnit.SECONDS);
348+
}
349+
}
350+
}

0 commit comments

Comments
 (0)