Skip to content

Commit 7415b82

Browse files
committed
异步任务扩展
1 parent deb27c1 commit 7415b82

File tree

8 files changed

+79
-66
lines changed

8 files changed

+79
-66
lines changed

spring-boot-data-aggregator-autoconfigure/src/main/java/io/github/lvyahui8/spring/autoconfigure/BeanAggregateAutoConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public DataBeanAggregateQueryService dataBeanAggregateQueryService (
136136
service.setRuntimeSettings(runtimeSettings);
137137
service.setExecutorService(aggregateExecutorService());
138138
service.setInterceptorChain(aggregateQueryInterceptorChain());
139-
service.setTaskClazz(properties.getAsyncTaskClass());
139+
service.setTaskWrapperClazz(properties.getTaskWrapperClass());
140140
service.setApplicationContext(applicationContext);
141141
return service;
142142
}
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.github.lvyahui8.spring.autoconfigure;
22

3-
import io.github.lvyahui8.spring.aggregate.service.AbstractAsyncQueryTask;
4-
import io.github.lvyahui8.spring.aggregate.service.AsyncQueryTaskAdapter;
3+
import io.github.lvyahui8.spring.aggregate.service.AsyncQueryTaskWrapper;
4+
import io.github.lvyahui8.spring.aggregate.service.AsyncQueryTaskWrapperAdapter;
55
import lombok.Data;
66
import org.springframework.boot.context.properties.ConfigurationProperties;
77

@@ -15,33 +15,33 @@ public class BeanAggregateProperties {
1515
/**
1616
* Packages that need to scan for aggregated annotations
1717
*/
18-
private String[] basePackages;
18+
private String[] basePackages;
1919
/**
2020
* Thread name prefix for asynchronous threads
2121
*/
22-
private String threadPrefix = "aggregateTask-";
22+
private String threadPrefix = "aggregateTask-";
2323
/**
2424
* Thread size of the asynchronous thread pool
2525
*/
2626
private int threadNumber = Runtime.getRuntime().availableProcessors() * 3;
2727
/**
2828
* The size of the queue that holds the task to be executed
2929
*/
30-
private int queueSize = 1000;
30+
private int queueSize = 1000;
3131
/**
3232
* Set a default timeout for the method of providing data
3333
*/
34-
private Long defaultTimeout = 3000L;
34+
private Long defaultTimeout = 3000L;
3535
/**
3636
* Allow output log
3737
*/
38-
private Boolean enableLogging = true;
38+
private Boolean enableLogging = true;
3939
/**
4040
* Ignore exception thrown by asynchronous execution, method returns null value
4141
*/
42-
private boolean ignoreException = false;
42+
private boolean ignoreException = false;
4343
/**
4444
* Async task implement
4545
*/
46-
private Class<? extends AbstractAsyncQueryTask> asyncTaskClass = AsyncQueryTaskAdapter.class;
46+
private Class<? extends AsyncQueryTaskWrapper> taskWrapperClass = AsyncQueryTaskWrapperAdapter.class;
4747
}

spring-boot-data-aggregator-core/src/main/java/io/github/lvyahui8/spring/aggregate/service/AbstractAsyncQueryTask.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

spring-boot-data-aggregator-core/src/main/java/io/github/lvyahui8/spring/aggregate/service/AsyncQueryTask.java

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,35 @@
44

55
/**
66
* @author lvyahui (lvyahui8@gmail.com,lvyahui8@126.com)
7-
* @since 2019/12/24 23:07
7+
* @since 2019/12/25 22:40
88
*/
9-
public interface AsyncQueryTask<T> extends Callable<T> {
10-
/**
11-
* 任务提交之前执行. 此方法在提交任务的那个线程中执行
12-
*/
13-
void beforeSubmit();
9+
public abstract class AsyncQueryTask<T> implements Callable<T> {
10+
Thread taskFromThread;
11+
AsyncQueryTaskWrapper asyncQueryTaskWrapper;
1412

15-
/**
16-
* 任务开始执行前执行. 此方法在异步线程中执行
17-
* @param taskFrom 提交任务的那个线程
18-
*/
19-
void beforeExecute(Thread taskFrom);
13+
public AsyncQueryTask(Thread taskFromThread, AsyncQueryTaskWrapper asyncQueryTaskWrapper) {
14+
this.taskFromThread = taskFromThread;
15+
this.asyncQueryTaskWrapper = asyncQueryTaskWrapper;
16+
}
17+
18+
@Override
19+
public T call() throws Exception {
20+
try {
21+
if(asyncQueryTaskWrapper != null) {
22+
asyncQueryTaskWrapper.beforeExecute(taskFromThread);
23+
}
24+
return execute();
25+
} finally {
26+
if (asyncQueryTaskWrapper != null) {
27+
asyncQueryTaskWrapper.afterExecute(taskFromThread);
28+
}
29+
}
30+
}
2031

2132
/**
22-
* 任务执行结束后执行. 此方法在异步线程中执行
23-
* 注意, 不管用户的方法抛出何种异常, 此方法都会执行.
24-
* @param taskFrom 提交任务的那个线程
33+
*
34+
* @return
35+
* @throws Exception
2536
*/
26-
void afterExecute(Thread taskFrom);
37+
public abstract T execute() throws Exception;
2738
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.github.lvyahui8.spring.aggregate.service;
2+
3+
/**
4+
* @author lvyahui (lvyahui8@gmail.com,lvyahui8@126.com)
5+
* @since 2019/12/24 23:07
6+
*/
7+
public interface AsyncQueryTaskWrapper {
8+
/**
9+
* 任务提交之前执行. 此方法在提交任务的那个线程中执行
10+
*/
11+
void beforeSubmit();
12+
13+
/**
14+
* 任务开始执行前执行. 此方法在异步线程中执行
15+
* @param taskFrom 提交任务的那个线程
16+
*/
17+
void beforeExecute(Thread taskFrom);
18+
19+
/**
20+
* 任务执行结束后执行. 此方法在异步线程中执行
21+
* 注意, 不管用户的方法抛出何种异常, 此方法都会执行.
22+
* @param taskFrom 提交任务的那个线程
23+
*/
24+
void afterExecute(Thread taskFrom);
25+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
* @author lvyahui (lvyahui8@gmail.com,lvyahui8@126.com)
55
* @since 2019/12/25 22:57
66
*/
7-
public class AsyncQueryTaskAdapter<T> extends AbstractAsyncQueryTask<T> {
7+
public class AsyncQueryTaskWrapperAdapter implements AsyncQueryTaskWrapper{
88

99
@Override
1010
public void beforeSubmit() {

spring-boot-data-aggregator-core/src/main/java/io/github/lvyahui8/spring/aggregate/service/impl/DataBeanAggregateQueryServiceImpl.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
import io.github.lvyahui8.spring.aggregate.interceptor.AggregateQueryInterceptorChain;
77
import io.github.lvyahui8.spring.aggregate.model.*;
88
import io.github.lvyahui8.spring.aggregate.repository.DataProviderRepository;
9-
import io.github.lvyahui8.spring.aggregate.service.AbstractAsyncQueryTask;
9+
import io.github.lvyahui8.spring.aggregate.service.AsyncQueryTask;
10+
import io.github.lvyahui8.spring.aggregate.service.AsyncQueryTaskWrapper;
1011
import io.github.lvyahui8.spring.aggregate.service.DataBeanAggregateQueryService;
1112
import lombok.Setter;
1213
import lombok.extern.slf4j.Slf4j;
@@ -42,7 +43,7 @@ public class DataBeanAggregateQueryServiceImpl implements DataBeanAggregateQuery
4243
private AggregateQueryInterceptorChain interceptorChain;
4344

4445
@Setter
45-
private Class<? extends AbstractAsyncQueryTask> taskClazz ;
46+
private Class<? extends AsyncQueryTaskWrapper> taskWrapperClazz;
4647

4748
private AggregationContext initQueryContext(DataProvideDefinition rootProvider, Map<InvokeSignature,Object> queryCache) {
4849
AggregationContext aggregationContext = new AggregationContext();
@@ -143,22 +144,24 @@ private Map<String, Object> getDependObjectMap(Map<String, Object> invokeParams,
143144
Map<String,DataConsumeDefinition> consumeDefinitionMap = new HashMap<>(consumeDefinitions.size());
144145
for (DataConsumeDefinition depend : consumeDefinitions) {
145146
consumeDefinitionMap.put(depend.getId(),depend);
146-
AbstractAsyncQueryTask queryTask = null;
147+
AsyncQueryTaskWrapper taskWrapper = null;
147148
try {
148-
queryTask = taskClazz.newInstance();
149+
taskWrapper = taskWrapperClazz.newInstance();
149150
} catch (InstantiationException e) {
150-
throw new RuntimeException("task instance create failed.",e);
151+
throw new RuntimeException("task wrapper instance create failed.",e);
151152
}
152-
queryTask.setCallable(() -> {
153-
try {
154-
Object o = innerGet(repository.get(depend.getId()),invokeParams, depend.getClazz(),context,depend);
155-
return depend.getClazz().cast(o);
156-
} finally {
157-
stopDownLatch.countDown();
153+
taskWrapper.beforeSubmit();
154+
Future<?> future = executorService.submit(new AsyncQueryTask<Object>(Thread.currentThread(),taskWrapper) {
155+
@Override
156+
public Object execute() throws Exception {
157+
try {
158+
Object o = innerGet(repository.get(depend.getId()),invokeParams, depend.getClazz(),context,depend);
159+
return depend.getClazz().cast(o);
160+
} finally {
161+
stopDownLatch.countDown();
162+
}
158163
}
159164
});
160-
queryTask.setTaskFromThread(Thread.currentThread());
161-
Future<?> future = executorService.submit(queryTask);
162165
futureMap.put(depend.getId() + "_" + depend.getOriginalParameterName(),future);
163166
}
164167
stopDownLatch.await(timeout, TimeUnit.MILLISECONDS);
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
spring.main.banner-mode=off
22
io.github.lvyahui8.spring.base-packages=io.github.lvyahui8.spring.example
33
io.github.lvyahui8.spring.ignore-exception=false
4-
io.github.lvyahui8.spring.async-task-class=io.github.lvyahui8.spring.aggregate.service.AsyncQueryTaskAdapter
4+
io.github.lvyahui8.spring.task-wrapper-class=io.github.lvyahui8.spring.aggregate.service.AsyncQueryTaskWrapperAdapter
55
example.logging=true

0 commit comments

Comments
 (0)