Skip to content

Commit 5497115

Browse files
authored
add a button to return movies (#126)
* add a return movie functionality
1 parent 7d4a5a8 commit 5497115

File tree

3 files changed

+75
-8
lines changed

3 files changed

+75
-8
lines changed

frontend/src/App.jsx

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,21 @@ class App extends Component {
5858
this.refreshData();
5959
}
6060

61+
handleReturn = async (item) =>{
62+
await fetch('/rent/return', {
63+
method: 'POST',
64+
headers: {
65+
'Content-Type': 'application/json'
66+
},
67+
body: JSON.stringify({
68+
catalog_id: item.id
69+
})
70+
});
71+
this.refreshData();
72+
73+
}
74+
75+
6176
refreshData = async () => {
6277
const catalogPromise = fetch('/catalog')
6378
.then(res => res.json())
@@ -132,12 +147,14 @@ class App extends Component {
132147
cost={cost}
133148
titles={rental.data}
134149
loaded={rental.loaded}
150+
onReturn={this.handleReturn}
135151
/>
136152
<TitleList
137153
title="Store"
138154
titles={catalog.data}
139155
loaded={catalog.loaded}
140156
onRent={this.handleRent}
157+
141158
/>
142159
</div>
143160
</Route>
@@ -173,7 +190,7 @@ const DevToast = () => {
173190

174191
class TitleList extends Component {
175192
renderList() {
176-
const { titles = [], loaded, onRent } = this.props;
193+
const { titles = [], loaded, onRent, onReturn } = this.props;
177194
const movies = titles.filter(item => !item?.rented);
178195

179196
if (loaded) {
@@ -193,6 +210,7 @@ class TitleList extends Component {
193210
item={item}
194211
backdrop={backDrop}
195212
onRent={onRent}
213+
onReturn={onReturn}
196214
/>
197215
);
198216
});
@@ -220,7 +238,7 @@ class TitleList extends Component {
220238
}
221239
}
222240

223-
const Item = ({ item, onRent, backdrop }) => {
241+
const Item = ({ item, onRent, onReturn, backdrop }) => {
224242
return (
225243
<div className="Item">
226244
<div className="Item__container" style={{ backgroundImage: `url(./${backdrop})` }}>
@@ -242,6 +260,9 @@ const Item = ({ item, onRent, backdrop }) => {
242260
<div className="Item__button Item__button--rented button">
243261
Watch Now
244262
</div>
263+
<div className="Item__button button" onClick={() => onReturn(item)}>
264+
Return
265+
</div>
245266
</>
246267
}
247268
</div>

rent/src/main/java/com/okteto/rent/controller/RentController.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
@RestController
2020
public class RentController {
21-
private static final String KAFKA_TOPIC = "rentals";
21+
private static final String KAFKA_TOPIC_RENTALS = "rentals";
22+
private static final String KAFKA_TOPIC_RETURNS = "returns";
2223

2324
private final Logger logger = LoggerFactory.getLogger(RentController.class);
2425

@@ -37,7 +38,7 @@ List<String> rent(@RequestBody Rent rentInput) {
3738

3839
logger.info("Rent [{},{}] received", catalogID, price);
3940

40-
kafkaTemplate.send(KAFKA_TOPIC, catalogID, price)
41+
kafkaTemplate.send(KAFKA_TOPIC_RENTALS, catalogID, price)
4142
.thenAccept(result -> logger.info("Message [{}] delivered with offset {}",
4243
catalogID,
4344
result.getRecordMetadata().offset()))
@@ -50,6 +51,24 @@ List<String> rent(@RequestBody Rent rentInput) {
5051
return new LinkedList<>();
5152
}
5253

54+
@PostMapping(path= "/rent/return", consumes = "application/json", produces = "application/json")
55+
public Map<String, String> returnMovie(@RequestBody ReturnRequest returnRequest) {
56+
String catalogID = returnRequest.getMovieID();
57+
58+
logger.info("Return [{}] received", catalogID);
59+
60+
kafkaTemplate.send(KAFKA_TOPIC_RETURNS, catalogID, catalogID)
61+
.thenAccept(result -> logger.info("Return message [{}] delivered with offset {}",
62+
catalogID,
63+
result.getRecordMetadata().offset()))
64+
.exceptionally(ex -> {
65+
logger.warn("Unable to deliver return message [{}]. {}", catalogID, ex.getMessage());
66+
return null;
67+
});
68+
69+
return Collections.singletonMap("status", "return processed");
70+
}
71+
5372
public static class Rent {
5473
@JsonProperty("catalog_id")
5574
private String movieID;
@@ -72,4 +91,17 @@ public String getPrice() {
7291
return price;
7392
}
7493
}
94+
95+
public static class ReturnRequest {
96+
@JsonProperty("catalog_id")
97+
private String movieID;
98+
99+
public void setMovieID(String movieID) {
100+
this.movieID = movieID;
101+
}
102+
103+
public String getMovieID() {
104+
return movieID;
105+
}
106+
}
75107
}

worker/cmd/worker/main.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,42 @@ func main() {
4141
master := kafka.GetMaster()
4242
defer master.Close()
4343

44-
consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
44+
// Consumer for "rentals" topic
45+
consumerRentals, err := master.ConsumePartition("rentals", 0, sarama.OffsetOldest)
46+
if err != nil {
47+
log.Panic(err)
48+
}
49+
50+
// Consumer for "returns" topic
51+
consumerReturns, err := master.ConsumePartition("returns", 0, sarama.OffsetOldest)
4552
if err != nil {
4653
log.Panic(err)
4754
}
4855

4956
signals := make(chan os.Signal, 1)
5057
signal.Notify(signals, os.Interrupt)
5158
doneCh := make(chan struct{})
59+
5260
go func() {
5361
for {
5462
select {
55-
case err := <-consumer.Errors():
63+
case err := <-consumerRentals.Errors():
5664
fmt.Println(err)
57-
case msg := <-consumer.Messages():
65+
case msg := <-consumerRentals.Messages():
5866
*messageCountStart++
5967
fmt.Printf("Received message: movies %s price %s\n", string(msg.Key), string(msg.Value))
6068
price, _ := strconv.ParseFloat(string(msg.Value), 64)
61-
// price *= 0.5
6269
insertDynStmt := `insert into "rentals"("id", "price") values($1, $2) on conflict(id) do update set price = $2`
6370
if _, err := db.Exec(insertDynStmt, string(msg.Key), fmt.Sprintf("%f", price)); err != nil {
6471
log.Panic(err)
6572
}
73+
case msg := <-consumerReturns.Messages():
74+
catalogID := string(msg.Value)
75+
fmt.Printf("Received return message: catalogID %s\n", catalogID)
76+
deleteStmt := `DELETE FROM rentals WHERE id = $1`
77+
if _, err := db.Exec(deleteStmt, catalogID); err != nil {
78+
log.Panic(err)
79+
}
6680
case <-signals:
6781
fmt.Println("Interrupt is detected")
6882
doneCh <- struct{}{}

0 commit comments

Comments
 (0)