Skip to content

Commit a4d844d

Browse files
authored
Support of @ExecuteOn(Scheduler) (#865)
1 parent 07c9b05 commit a4d844d

File tree

10 files changed

+450
-49
lines changed

10 files changed

+450
-49
lines changed

services-api/src/main/java/io/scalecube/services/Reflect.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static io.scalecube.services.CommunicationMode.REQUEST_RESPONSE;
66
import static io.scalecube.services.CommunicationMode.REQUEST_STREAM;
77

8+
import io.scalecube.services.annotations.ExecuteOn;
89
import io.scalecube.services.annotations.RequestType;
910
import io.scalecube.services.annotations.ResponseType;
1011
import io.scalecube.services.annotations.Service;
@@ -28,6 +29,8 @@
2829
import org.reactivestreams.Publisher;
2930
import reactor.core.publisher.Flux;
3031
import reactor.core.publisher.Mono;
32+
import reactor.core.scheduler.Scheduler;
33+
import reactor.core.scheduler.Schedulers;
3134

3235
public class Reflect {
3336

@@ -172,7 +175,8 @@ public static Map<Method, MethodInfo> methodsInfo(Class<?> serviceInterface) {
172175
method.getParameterCount(),
173176
requestType(method),
174177
isRequestTypeServiceMessage(method),
175-
isSecured(method)))));
178+
isSecured(method),
179+
null))));
176180
}
177181

178182
/**
@@ -379,4 +383,54 @@ public static boolean isSecured(Method method) {
379383
return method.isAnnotationPresent(Secured.class)
380384
|| method.getDeclaringClass().isAnnotationPresent(Secured.class);
381385
}
386+
387+
public static Scheduler executeOnScheduler(Method method, Map<String, Scheduler> schedulers) {
388+
final Class<?> declaringClass = method.getDeclaringClass();
389+
390+
if (method.isAnnotationPresent(ExecuteOn.class)) {
391+
final var executeOn = method.getAnnotation(ExecuteOn.class);
392+
final var name = executeOn.value();
393+
final var scheduler = schedulers.get(name);
394+
if (scheduler == null) {
395+
throw new IllegalArgumentException(
396+
"Wrong @ExecuteOn definition on "
397+
+ declaringClass.getName()
398+
+ "."
399+
+ method.getName()
400+
+ ": scheduler (name="
401+
+ name
402+
+ ") cannot be found");
403+
}
404+
return scheduler;
405+
}
406+
407+
// If @ExecuteOn annotation is not present on service method, then find it on service class
408+
409+
ExecuteOn executeOn = null;
410+
for (var clazz = declaringClass; clazz != null; clazz = clazz.getSuperclass()) {
411+
executeOn = clazz.getAnnotation(ExecuteOn.class);
412+
if (executeOn != null) {
413+
break;
414+
}
415+
}
416+
417+
if (executeOn == null) {
418+
return Schedulers.immediate();
419+
}
420+
421+
final var name = executeOn.value();
422+
final var scheduler = schedulers.get(name);
423+
if (scheduler == null) {
424+
throw new IllegalArgumentException(
425+
"Wrong @ExecuteOn definition on "
426+
+ declaringClass.getName()
427+
+ "."
428+
+ method.getName()
429+
+ ": scheduler (name="
430+
+ name
431+
+ ") cannot be found");
432+
}
433+
434+
return scheduler;
435+
}
382436
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.scalecube.services.annotations;
2+
3+
import static java.lang.annotation.ElementType.METHOD;
4+
import static java.lang.annotation.ElementType.TYPE;
5+
import static java.lang.annotation.RetentionPolicy.RUNTIME;
6+
7+
import java.lang.annotation.Documented;
8+
import java.lang.annotation.Retention;
9+
import java.lang.annotation.Target;
10+
11+
/**
12+
* This annotation is used to mark that particular service method or all service methods will be
13+
* executed in the specified scheduler.
14+
*/
15+
@Documented
16+
@Target({METHOD, TYPE})
17+
@Retention(RUNTIME)
18+
public @interface ExecuteOn {
19+
20+
/**
21+
* Returns scheduler name.
22+
*
23+
* @return scheduler name
24+
*/
25+
String value();
26+
}

services-api/src/main/java/io/scalecube/services/annotations/Service.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import java.lang.annotation.RetentionPolicy;
66
import java.lang.annotation.Target;
77

8-
/** Indicates that an annotated class is an Service Fabric service object. */
8+
/** Indicates that annotated class is a ScaleCube service object. */
99
@Retention(RetentionPolicy.RUNTIME)
1010
@Target(ElementType.TYPE)
1111
public @interface Service {

services-api/src/main/java/io/scalecube/services/annotations/ServiceMethod.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import java.lang.annotation.Target;
77

88
/**
9-
* Indicates that an annotated method is a service method available via Service Fabric framework.
9+
* Indicates that annotated method is a ScaleCube service method.
1010
*/
1111
@Retention(RetentionPolicy.RUNTIME)
1212
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})

services-api/src/main/java/io/scalecube/services/methods/MethodInfo.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.scalecube.services.api.Qualifier;
55
import java.lang.reflect.Type;
66
import java.util.StringJoiner;
7+
import reactor.core.scheduler.Scheduler;
78

89
public final class MethodInfo {
910

@@ -17,6 +18,7 @@ public final class MethodInfo {
1718
private final Class<?> requestType;
1819
private final boolean isRequestTypeServiceMessage;
1920
private final boolean isSecured;
21+
private final Scheduler scheduler;
2022

2123
/**
2224
* Create a new service info.
@@ -30,6 +32,7 @@ public final class MethodInfo {
3032
* @param requestType the type of the request
3133
* @param isRequestTypeServiceMessage is request service message
3234
* @param isSecured is method protected by authentication
35+
* @param scheduler scheduler
3336
*/
3437
public MethodInfo(
3538
String serviceName,
@@ -40,7 +43,8 @@ public MethodInfo(
4043
int parameterCount,
4144
Class<?> requestType,
4245
boolean isRequestTypeServiceMessage,
43-
boolean isSecured) {
46+
boolean isSecured,
47+
Scheduler scheduler) {
4448
this.parameterizedReturnType = parameterizedReturnType;
4549
this.isReturnTypeServiceMessage = isReturnTypeServiceMessage;
4650
this.communicationMode = communicationMode;
@@ -51,6 +55,7 @@ public MethodInfo(
5155
this.requestType = requestType;
5256
this.isRequestTypeServiceMessage = isRequestTypeServiceMessage;
5357
this.isSecured = isSecured;
58+
this.scheduler = scheduler;
5459
}
5560

5661
public String serviceName() {
@@ -101,19 +106,24 @@ public boolean isSecured() {
101106
return isSecured;
102107
}
103108

109+
public Scheduler scheduler() {
110+
return scheduler;
111+
}
112+
104113
@Override
105114
public String toString() {
106115
return new StringJoiner(", ", MethodInfo.class.getSimpleName() + "[", "]")
107-
.add("serviceName=" + serviceName)
108-
.add("methodName=" + methodName)
109-
.add("qualifier=" + qualifier)
116+
.add("serviceName='" + serviceName + "'")
117+
.add("methodName='" + methodName + "'")
118+
.add("qualifier='" + qualifier + "'")
110119
.add("parameterizedReturnType=" + parameterizedReturnType)
111120
.add("isReturnTypeServiceMessage=" + isReturnTypeServiceMessage)
112121
.add("communicationMode=" + communicationMode)
113122
.add("parameterCount=" + parameterCount)
114123
.add("requestType=" + requestType)
115124
.add("isRequestTypeServiceMessage=" + isRequestTypeServiceMessage)
116125
.add("isSecured=" + isSecured)
126+
.add("scheduler=" + scheduler)
117127
.toString();
118128
}
119129
}

services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public Mono<ServiceMessage> invokeOne(ServiceMessage message) {
7070
.flatMap(authData -> deferWithContextOne(message, authData))
7171
.map(response -> toResponse(response, message.qualifier(), message.dataFormat()))
7272
.onErrorResume(
73-
throwable -> Mono.just(errorMapper.toMessage(message.qualifier(), throwable)));
73+
throwable -> Mono.just(errorMapper.toMessage(message.qualifier(), throwable)))
74+
.subscribeOn(methodInfo.scheduler());
7475
}
7576

7677
/**
@@ -84,7 +85,8 @@ public Flux<ServiceMessage> invokeMany(ServiceMessage message) {
8485
.flatMapMany(authData -> deferWithContextMany(message, authData))
8586
.map(response -> toResponse(response, message.qualifier(), message.dataFormat()))
8687
.onErrorResume(
87-
throwable -> Flux.just(errorMapper.toMessage(message.qualifier(), throwable)));
88+
throwable -> Flux.just(errorMapper.toMessage(message.qualifier(), throwable)))
89+
.subscribeOn(methodInfo.scheduler());
8890
}
8991

9092
/**
@@ -104,7 +106,8 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
104106
toResponse(response, first.get().qualifier(), first.get().dataFormat()))
105107
.onErrorResume(
106108
throwable ->
107-
Flux.just(errorMapper.toMessage(first.get().qualifier(), throwable))));
109+
Flux.just(errorMapper.toMessage(first.get().qualifier(), throwable)))
110+
.subscribeOn(methodInfo.scheduler()));
108111
}
109112

110113
private Mono<?> deferWithContextOne(ServiceMessage message, Object authData) {

services-api/src/test/java/io/scalecube/services/methods/ServiceMethodInvokerTest.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.mockito.Mockito;
1919
import reactor.core.publisher.Flux;
2020
import reactor.core.publisher.Mono;
21+
import reactor.core.scheduler.Schedulers;
2122
import reactor.test.StepVerifier;
2223

2324
class ServiceMethodInvokerTest {
@@ -62,7 +63,8 @@ void testInvokeOneWhenReturnNull() throws Exception {
6263
method.getParameterCount(),
6364
Void.TYPE,
6465
IS_REQUEST_TYPE_SERVICE_MESSAGE,
65-
AUTH);
66+
AUTH,
67+
Schedulers.immediate());
6668

6769
serviceMethodInvoker =
6870
new ServiceMethodInvoker(
@@ -100,7 +102,8 @@ void testInvokeManyWhenReturnNull() throws Exception {
100102
method.getParameterCount(),
101103
Void.TYPE,
102104
IS_REQUEST_TYPE_SERVICE_MESSAGE,
103-
AUTH);
105+
AUTH,
106+
Schedulers.immediate());
104107

105108
serviceMethodInvoker =
106109
new ServiceMethodInvoker(
@@ -138,7 +141,8 @@ void testInvokeBidirectionalWhenReturnNull() throws Exception {
138141
method.getParameterCount(),
139142
Void.TYPE,
140143
IS_REQUEST_TYPE_SERVICE_MESSAGE,
141-
AUTH);
144+
AUTH,
145+
Schedulers.immediate());
142146

143147
serviceMethodInvoker =
144148
new ServiceMethodInvoker(
@@ -177,7 +181,8 @@ void testInvokeOneWhenThrowException() throws Exception {
177181
method.getParameterCount(),
178182
Void.TYPE,
179183
IS_REQUEST_TYPE_SERVICE_MESSAGE,
180-
AUTH);
184+
AUTH,
185+
Schedulers.immediate());
181186

182187
serviceMethodInvoker =
183188
new ServiceMethodInvoker(
@@ -219,7 +224,8 @@ void testInvokeManyWhenThrowException() throws Exception {
219224
method.getParameterCount(),
220225
Void.TYPE,
221226
IS_REQUEST_TYPE_SERVICE_MESSAGE,
222-
AUTH);
227+
AUTH,
228+
Schedulers.immediate());
223229

224230
serviceMethodInvoker =
225231
new ServiceMethodInvoker(
@@ -260,7 +266,8 @@ void testInvokeBidirectionalWhenThrowException() throws Exception {
260266
method.getParameterCount(),
261267
Void.TYPE,
262268
IS_REQUEST_TYPE_SERVICE_MESSAGE,
263-
AUTH);
269+
AUTH,
270+
Schedulers.immediate());
264271

265272
serviceMethodInvoker =
266273
new ServiceMethodInvoker(
@@ -305,7 +312,8 @@ void testAuthMethodWhenNoContextAndNoAuthenticator() throws Exception {
305312
method.getParameterCount(),
306313
Void.TYPE,
307314
IS_REQUEST_TYPE_SERVICE_MESSAGE,
308-
AUTH);
315+
AUTH,
316+
Schedulers.immediate());
309317

310318
serviceMethodInvoker =
311319
new ServiceMethodInvoker(
@@ -347,7 +355,8 @@ void testAuthMethodWhenThereIsContextAndNoAuthenticator() throws Exception {
347355
method.getParameterCount(),
348356
Void.TYPE,
349357
IS_REQUEST_TYPE_SERVICE_MESSAGE,
350-
AUTH);
358+
AUTH,
359+
Schedulers.immediate());
351360

352361
serviceMethodInvoker =
353362
new ServiceMethodInvoker(
@@ -387,7 +396,8 @@ void testAuthMethodWhenNoContextButThereIsAuthenticator() throws Exception {
387396
method.getParameterCount(),
388397
Void.TYPE,
389398
IS_REQUEST_TYPE_SERVICE_MESSAGE,
390-
AUTH);
399+
AUTH,
400+
Schedulers.immediate());
391401

392402
//noinspection unchecked,rawtypes
393403
Authenticator<Map> mockedAuthenticator = Mockito.mock(Authenticator.class);

0 commit comments

Comments
 (0)