diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx
index 69f0e3e9..946c6318 100644
--- a/frontend/src/App.jsx
+++ b/frontend/src/App.jsx
@@ -58,6 +58,21 @@ class App extends Component {
this.refreshData();
}
+ handleReturn = async (item) =>{
+ await fetch('/rent/return', {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json'
+ },
+ body: JSON.stringify({
+ catalog_id: item.id
+ })
+ });
+ this.refreshData();
+
+ }
+
+
refreshData = async () => {
const catalogPromise = fetch('/catalog')
.then(res => res.json())
@@ -132,12 +147,14 @@ class App extends Component {
cost={cost}
titles={rental.data}
loaded={rental.loaded}
+ onReturn={this.handleReturn}
/>
@@ -173,7 +190,7 @@ const DevToast = () => {
class TitleList extends Component {
renderList() {
- const { titles = [], loaded, onRent } = this.props;
+ const { titles = [], loaded, onRent, onReturn } = this.props;
const movies = titles.filter(item => !item?.rented);
if (loaded) {
@@ -193,6 +210,7 @@ class TitleList extends Component {
item={item}
backdrop={backDrop}
onRent={onRent}
+ onReturn={onReturn}
/>
);
});
@@ -220,7 +238,7 @@ class TitleList extends Component {
}
}
-const Item = ({ item, onRent, backdrop }) => {
+const Item = ({ item, onRent, onReturn, backdrop }) => {
return (
@@ -242,6 +260,9 @@ const Item = ({ item, onRent, backdrop }) => {
Watch Now
+
onReturn(item)}>
+ Return
+
>
}
diff --git a/rent/src/main/java/com/okteto/rent/controller/RentController.java b/rent/src/main/java/com/okteto/rent/controller/RentController.java
index fcbe8a9e..5f99f419 100644
--- a/rent/src/main/java/com/okteto/rent/controller/RentController.java
+++ b/rent/src/main/java/com/okteto/rent/controller/RentController.java
@@ -18,7 +18,8 @@
@RestController
public class RentController {
- private static final String KAFKA_TOPIC = "rentals";
+ private static final String KAFKA_TOPIC_RENTALS = "rentals";
+ private static final String KAFKA_TOPIC_RETURNS = "returns";
private final Logger logger = LoggerFactory.getLogger(RentController.class);
@@ -37,7 +38,7 @@ List
rent(@RequestBody Rent rentInput) {
logger.info("Rent [{},{}] received", catalogID, price);
- kafkaTemplate.send(KAFKA_TOPIC, catalogID, price)
+ kafkaTemplate.send(KAFKA_TOPIC_RENTALS, catalogID, price)
.thenAccept(result -> logger.info("Message [{}] delivered with offset {}",
catalogID,
result.getRecordMetadata().offset()))
@@ -50,6 +51,24 @@ List rent(@RequestBody Rent rentInput) {
return new LinkedList<>();
}
+ @PostMapping(path= "/rent/return", consumes = "application/json", produces = "application/json")
+ public Map returnMovie(@RequestBody ReturnRequest returnRequest) {
+ String catalogID = returnRequest.getMovieID();
+
+ logger.info("Return [{}] received", catalogID);
+
+ kafkaTemplate.send(KAFKA_TOPIC_RETURNS, catalogID, catalogID)
+ .thenAccept(result -> logger.info("Return message [{}] delivered with offset {}",
+ catalogID,
+ result.getRecordMetadata().offset()))
+ .exceptionally(ex -> {
+ logger.warn("Unable to deliver return message [{}]. {}", catalogID, ex.getMessage());
+ return null;
+ });
+
+ return Collections.singletonMap("status", "return processed");
+ }
+
public static class Rent {
@JsonProperty("catalog_id")
private String movieID;
@@ -72,4 +91,17 @@ public String getPrice() {
return price;
}
}
+
+ public static class ReturnRequest {
+ @JsonProperty("catalog_id")
+ private String movieID;
+
+ public void setMovieID(String movieID) {
+ this.movieID = movieID;
+ }
+
+ public String getMovieID() {
+ return movieID;
+ }
+ }
}
diff --git a/worker/cmd/worker/main.go b/worker/cmd/worker/main.go
index 22de285c..7ca1cc67 100644
--- a/worker/cmd/worker/main.go
+++ b/worker/cmd/worker/main.go
@@ -41,7 +41,14 @@ func main() {
master := kafka.GetMaster()
defer master.Close()
- consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
+ // Consumer for "rentals" topic
+ consumerRentals, err := master.ConsumePartition("rentals", 0, sarama.OffsetOldest)
+ if err != nil {
+ log.Panic(err)
+ }
+
+ // Consumer for "returns" topic
+ consumerReturns, err := master.ConsumePartition("returns", 0, sarama.OffsetOldest)
if err != nil {
log.Panic(err)
}
@@ -49,20 +56,27 @@ func main() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
doneCh := make(chan struct{})
+
go func() {
for {
select {
- case err := <-consumer.Errors():
+ case err := <-consumerRentals.Errors():
fmt.Println(err)
- case msg := <-consumer.Messages():
+ case msg := <-consumerRentals.Messages():
*messageCountStart++
fmt.Printf("Received message: movies %s price %s\n", string(msg.Key), string(msg.Value))
price, _ := strconv.ParseFloat(string(msg.Value), 64)
- // price *= 0.5
insertDynStmt := `insert into "rentals"("id", "price") values($1, $2) on conflict(id) do update set price = $2`
if _, err := db.Exec(insertDynStmt, string(msg.Key), fmt.Sprintf("%f", price)); err != nil {
log.Panic(err)
}
+ case msg := <-consumerReturns.Messages():
+ catalogID := string(msg.Value)
+ fmt.Printf("Received return message: catalogID %s\n", catalogID)
+ deleteStmt := `DELETE FROM rentals WHERE id = $1`
+ if _, err := db.Exec(deleteStmt, catalogID); err != nil {
+ log.Panic(err)
+ }
case <-signals:
fmt.Println("Interrupt is detected")
doneCh <- struct{}{}