6
6
import org .springframework .beans .factory .annotation .Autowired ;
7
7
import org .springframework .kafka .core .KafkaTemplate ;
8
8
import org .springframework .kafka .support .SendResult ;
9
- import org .springframework .util .concurrent .ListenableFuture ;
10
- import org .springframework .util .concurrent .ListenableFutureCallback ;
11
9
import org .springframework .web .bind .annotation .PostMapping ;
12
10
import org .springframework .web .bind .annotation .GetMapping ;
13
11
import org .springframework .web .bind .annotation .RequestBody ;
14
12
import org .springframework .web .bind .annotation .RestController ;
15
13
16
- import javax .servlet .http .HttpServletResponse ;
17
14
import java .util .LinkedList ;
18
15
import java .util .List ;
19
16
import java .util .Map ;
@@ -34,29 +31,21 @@ Map<String, String> healthz() {
34
31
}
35
32
36
33
@ PostMapping (path = "/rent" , consumes = "application/json" , produces = "application/json" )
37
- List <String > rent (@ RequestBody Rent rentInput , HttpServletResponse response ) {
34
+ List <String > rent (@ RequestBody Rent rentInput ) {
38
35
String catalogID = rentInput .getMovieID ();
39
36
String price = rentInput .getPrice ();
40
37
41
38
logger .info ("Rent [{},{}] received" , catalogID , price );
42
39
43
- ListenableFuture <SendResult <String , String >> future = kafkaTemplate .send (KAFKA_TOPIC , catalogID , price );
44
-
45
- future .addCallback (new ListenableFutureCallback <SendResult <String , String >>() {
46
- @ Override
47
- public void onSuccess (SendResult <String , String > result ) {
48
- logger .info ("Message [{}] delivered with offset {}" ,
49
- catalogID ,
50
- result .getRecordMetadata ().offset ());
51
- }
52
-
53
- @ Override
54
- public void onFailure (Throwable ex ) {
55
- logger .warn ("Unable to deliver message [{}]. {}" ,
40
+ kafkaTemplate .send (KAFKA_TOPIC , catalogID , price )
41
+ .thenAccept (result -> logger .info ("Message [{}] delivered with offset {}" ,
56
42
catalogID ,
57
- ex .getMessage ());
58
- }
43
+ result .getRecordMetadata ().offset ()))
44
+ .exceptionally (ex -> {
45
+ logger .warn ("Unable to deliver message [{}]. {}" , catalogID , ex .getMessage ());
46
+ return null ;
59
47
});
48
+
60
49
61
50
return new LinkedList <>();
62
51
}
0 commit comments