diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index 69f0e3e9..946c6318 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -58,6 +58,21 @@ class App extends Component { this.refreshData(); } + handleReturn = async (item) =>{ + await fetch('/rent/return', { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ + catalog_id: item.id + }) + }); + this.refreshData(); + + } + + refreshData = async () => { const catalogPromise = fetch('/catalog') .then(res => res.json()) @@ -132,12 +147,14 @@ class App extends Component { cost={cost} titles={rental.data} loaded={rental.loaded} + onReturn={this.handleReturn} /> @@ -173,7 +190,7 @@ const DevToast = () => { class TitleList extends Component { renderList() { - const { titles = [], loaded, onRent } = this.props; + const { titles = [], loaded, onRent, onReturn } = this.props; const movies = titles.filter(item => !item?.rented); if (loaded) { @@ -193,6 +210,7 @@ class TitleList extends Component { item={item} backdrop={backDrop} onRent={onRent} + onReturn={onReturn} /> ); }); @@ -220,7 +238,7 @@ class TitleList extends Component { } } -const Item = ({ item, onRent, backdrop }) => { +const Item = ({ item, onRent, onReturn, backdrop }) => { return (
@@ -242,6 +260,9 @@ const Item = ({ item, onRent, backdrop }) => {
Watch Now
+
onReturn(item)}> + Return +
}
diff --git a/rent/src/main/java/com/okteto/rent/controller/RentController.java b/rent/src/main/java/com/okteto/rent/controller/RentController.java index fcbe8a9e..5f99f419 100644 --- a/rent/src/main/java/com/okteto/rent/controller/RentController.java +++ b/rent/src/main/java/com/okteto/rent/controller/RentController.java @@ -18,7 +18,8 @@ @RestController public class RentController { - private static final String KAFKA_TOPIC = "rentals"; + private static final String KAFKA_TOPIC_RENTALS = "rentals"; + private static final String KAFKA_TOPIC_RETURNS = "returns"; private final Logger logger = LoggerFactory.getLogger(RentController.class); @@ -37,7 +38,7 @@ List rent(@RequestBody Rent rentInput) { logger.info("Rent [{},{}] received", catalogID, price); - kafkaTemplate.send(KAFKA_TOPIC, catalogID, price) + kafkaTemplate.send(KAFKA_TOPIC_RENTALS, catalogID, price) .thenAccept(result -> logger.info("Message [{}] delivered with offset {}", catalogID, result.getRecordMetadata().offset())) @@ -50,6 +51,24 @@ List rent(@RequestBody Rent rentInput) { return new LinkedList<>(); } + @PostMapping(path= "/rent/return", consumes = "application/json", produces = "application/json") + public Map returnMovie(@RequestBody ReturnRequest returnRequest) { + String catalogID = returnRequest.getMovieID(); + + logger.info("Return [{}] received", catalogID); + + kafkaTemplate.send(KAFKA_TOPIC_RETURNS, catalogID, catalogID) + .thenAccept(result -> logger.info("Return message [{}] delivered with offset {}", + catalogID, + result.getRecordMetadata().offset())) + .exceptionally(ex -> { + logger.warn("Unable to deliver return message [{}]. {}", catalogID, ex.getMessage()); + return null; + }); + + return Collections.singletonMap("status", "return processed"); + } + public static class Rent { @JsonProperty("catalog_id") private String movieID; @@ -72,4 +91,17 @@ public String getPrice() { return price; } } + + public static class ReturnRequest { + @JsonProperty("catalog_id") + private String movieID; + + public void setMovieID(String movieID) { + this.movieID = movieID; + } + + public String getMovieID() { + return movieID; + } + } } diff --git a/worker/cmd/worker/main.go b/worker/cmd/worker/main.go index 22de285c..7ca1cc67 100644 --- a/worker/cmd/worker/main.go +++ b/worker/cmd/worker/main.go @@ -41,7 +41,14 @@ func main() { master := kafka.GetMaster() defer master.Close() - consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest) + // Consumer for "rentals" topic + consumerRentals, err := master.ConsumePartition("rentals", 0, sarama.OffsetOldest) + if err != nil { + log.Panic(err) + } + + // Consumer for "returns" topic + consumerReturns, err := master.ConsumePartition("returns", 0, sarama.OffsetOldest) if err != nil { log.Panic(err) } @@ -49,20 +56,27 @@ func main() { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) doneCh := make(chan struct{}) + go func() { for { select { - case err := <-consumer.Errors(): + case err := <-consumerRentals.Errors(): fmt.Println(err) - case msg := <-consumer.Messages(): + case msg := <-consumerRentals.Messages(): *messageCountStart++ fmt.Printf("Received message: movies %s price %s\n", string(msg.Key), string(msg.Value)) price, _ := strconv.ParseFloat(string(msg.Value), 64) - // price *= 0.5 insertDynStmt := `insert into "rentals"("id", "price") values($1, $2) on conflict(id) do update set price = $2` if _, err := db.Exec(insertDynStmt, string(msg.Key), fmt.Sprintf("%f", price)); err != nil { log.Panic(err) } + case msg := <-consumerReturns.Messages(): + catalogID := string(msg.Value) + fmt.Printf("Received return message: catalogID %s\n", catalogID) + deleteStmt := `DELETE FROM rentals WHERE id = $1` + if _, err := db.Exec(deleteStmt, catalogID); err != nil { + log.Panic(err) + } case <-signals: fmt.Println("Interrupt is detected") doneCh <- struct{}{}