Skip to content

Commit ff06833

Browse files
committed
add return endpoint
1 parent 7d4a5a8 commit ff06833

File tree

3 files changed

+81
-6
lines changed

3 files changed

+81
-6
lines changed

frontend/src/App.jsx

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,9 @@ const Item = ({ item, onRent, backdrop }) => {
242242
<div className="Item__button Item__button--rented button">
243243
Watch Now
244244
</div>
245+
<div className="Item__button button" onClick={() => handleReturn(item)}>
246+
Return
247+
</div>
245248
</>
246249
}
247250
</div>
@@ -250,6 +253,32 @@ const Item = ({ item, onRent, backdrop }) => {
250253
);
251254
}
252255

256+
function handleReturn(item) {
257+
fetch('/rent/return', {
258+
method: 'POST',
259+
headers: {
260+
'Content-Type': 'application/json'
261+
},
262+
body: JSON.stringify({
263+
catalog_id: item.id
264+
})
265+
})
266+
.then(response => {
267+
if (!response.ok) {
268+
throw new Error('Network response was not ok');
269+
}
270+
return response.json();
271+
})
272+
.then(data => {
273+
console.log(`Returned movie with ID: ${item.id}`);
274+
// Refresh data to update the UI
275+
this.refreshData();
276+
})
277+
.catch(error => {
278+
console.error('There was a problem with the return request:', error);
279+
});
280+
}
281+
253282
const CartIcon = () => {
254283
return (
255284
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 21 22" width="16">

rent/src/main/java/com/okteto/rent/controller/RentController.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
@RestController
2020
public class RentController {
21-
private static final String KAFKA_TOPIC = "rentals";
21+
private static final String KAFKA_TOPIC_RENTALS = "rentals";
22+
private static final String KAFKA_TOPIC_RETURNS = "returns";
2223

2324
private final Logger logger = LoggerFactory.getLogger(RentController.class);
2425

@@ -37,7 +38,7 @@ List<String> rent(@RequestBody Rent rentInput) {
3738

3839
logger.info("Rent [{},{}] received", catalogID, price);
3940

40-
kafkaTemplate.send(KAFKA_TOPIC, catalogID, price)
41+
kafkaTemplate.send(KAFKA_TOPIC_RENTALS, catalogID, price)
4142
.thenAccept(result -> logger.info("Message [{}] delivered with offset {}",
4243
catalogID,
4344
result.getRecordMetadata().offset()))
@@ -50,6 +51,24 @@ List<String> rent(@RequestBody Rent rentInput) {
5051
return new LinkedList<>();
5152
}
5253

54+
@PostMapping(path= "/rent/return", consumes = "application/json", produces = "application/json")
55+
public Map<String, String> returnMovie(@RequestBody ReturnRequest returnRequest) {
56+
String catalogID = returnRequest.getMovieID();
57+
58+
logger.info("Return [{}] received", catalogID);
59+
60+
kafkaTemplate.send(KAFKA_TOPIC_RETURNS, catalogID, catalogID)
61+
.thenAccept(result -> logger.info("Return message [{}] delivered with offset {}",
62+
catalogID,
63+
result.getRecordMetadata().offset()))
64+
.exceptionally(ex -> {
65+
logger.warn("Unable to deliver return message [{}]. {}", catalogID, ex.getMessage());
66+
return null;
67+
});
68+
69+
return Collections.singletonMap("status", "return processed");
70+
}
71+
5372
public static class Rent {
5473
@JsonProperty("catalog_id")
5574
private String movieID;
@@ -72,4 +91,17 @@ public String getPrice() {
7291
return price;
7392
}
7493
}
94+
95+
public static class ReturnRequest {
96+
@JsonProperty("catalog_id")
97+
private String movieID;
98+
99+
public void setMovieID(String movieID) {
100+
this.movieID = movieID;
101+
}
102+
103+
public String getMovieID() {
104+
return movieID;
105+
}
106+
}
75107
}

worker/cmd/worker/main.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,42 @@ func main() {
4141
master := kafka.GetMaster()
4242
defer master.Close()
4343

44-
consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
44+
// Consumer for "rentals" topic
45+
consumerRentals, err := master.ConsumePartition("rentals", 0, sarama.OffsetOldest)
46+
if err != nil {
47+
log.Panic(err)
48+
}
49+
50+
// Consumer for "returns" topic
51+
consumerReturns, err := master.ConsumePartition("returns", 0, sarama.OffsetOldest)
4552
if err != nil {
4653
log.Panic(err)
4754
}
4855

4956
signals := make(chan os.Signal, 1)
5057
signal.Notify(signals, os.Interrupt)
5158
doneCh := make(chan struct{})
59+
5260
go func() {
5361
for {
5462
select {
55-
case err := <-consumer.Errors():
63+
case err := <-consumerRentals.Errors():
5664
fmt.Println(err)
57-
case msg := <-consumer.Messages():
65+
case msg := <-consumerRentals.Messages():
5866
*messageCountStart++
5967
fmt.Printf("Received message: movies %s price %s\n", string(msg.Key), string(msg.Value))
6068
price, _ := strconv.ParseFloat(string(msg.Value), 64)
61-
// price *= 0.5
6269
insertDynStmt := `insert into "rentals"("id", "price") values($1, $2) on conflict(id) do update set price = $2`
6370
if _, err := db.Exec(insertDynStmt, string(msg.Key), fmt.Sprintf("%f", price)); err != nil {
6471
log.Panic(err)
6572
}
73+
case msg := <-consumerReturns.Messages():
74+
catalogID := string(msg.Value)
75+
fmt.Printf("Received return message: catalogID %s\n", catalogID)
76+
deleteStmt := `DELETE FROM rentals WHERE id = $1`
77+
if _, err := db.Exec(deleteStmt, catalogID); err != nil {
78+
log.Panic(err)
79+
}
6680
case <-signals:
6781
fmt.Println("Interrupt is detected")
6882
doneCh <- struct{}{}

0 commit comments

Comments
 (0)