12
12
import org .springframework .util .Assert ;
13
13
import reactor .core .publisher .Flux ;
14
14
import reactor .core .publisher .Mono ;
15
+ import reactor .core .scheduler .Schedulers ;
15
16
import run .ikaros .api .core .subject .Subject ;
16
17
import run .ikaros .api .core .subject .SubjectSynchronizer ;
17
18
import run .ikaros .api .exception .NoAvailableSubjectPlatformSynchronizerException ;
@@ -67,7 +68,8 @@ private Mono<Subject> syncBySubjectSynchronizer(@Nullable Long subjectId,
67
68
"No found available subject platform synchronizer for platform-id: "
68
69
+ platform .name () + "-" + platformId )))
69
70
.map (subjectSynchronizes -> subjectSynchronizes .get (0 ))
70
- .flatMap (subjectSynchronizer -> Mono .justOrEmpty (subjectSynchronizer .pull (platformId )))
71
+ .flatMap (subjectSynchronizer -> Mono .fromCallable (() ->
72
+ subjectSynchronizer .pull (platformId )))
71
73
.onErrorResume (Exception .class , e -> {
72
74
String msg =
73
75
"Operate not available, platform api domain can not access "
@@ -85,7 +87,8 @@ private Mono<Subject> syncBySubjectSynchronizer(@Nullable Long subjectId,
85
87
.then (Mono .defer (() -> subjectService .findById (subjectId ))))
86
88
.doOnSuccess (subject -> applicationContext .publishEvent (
87
89
new SubjectCoverImageDownloadAndUpdateEvent (this ,
88
- subject .getId (), subject .getCover ())));
90
+ subject .getId (), subject .getCover ())))
91
+ .subscribeOn (Schedulers .boundedElastic ());
89
92
}
90
93
91
94
0 commit comments