Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions frontend/src/App.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ class App extends Component {
this.refreshData();
}

handleReturn = async (item) =>{
await fetch('/rent/return', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
catalog_id: item.id
})
});
this.refreshData();

}


refreshData = async () => {
const catalogPromise = fetch('/catalog')
.then(res => res.json())
Expand Down Expand Up @@ -132,12 +147,14 @@ class App extends Component {
cost={cost}
titles={rental.data}
loaded={rental.loaded}
onReturn={this.handleReturn}
/>
<TitleList
title="Store"
titles={catalog.data}
loaded={catalog.loaded}
onRent={this.handleRent}

/>
</div>
</Route>
Expand Down Expand Up @@ -173,7 +190,7 @@ const DevToast = () => {

class TitleList extends Component {
renderList() {
const { titles = [], loaded, onRent } = this.props;
const { titles = [], loaded, onRent, onReturn } = this.props;
const movies = titles.filter(item => !item?.rented);

if (loaded) {
Expand All @@ -193,6 +210,7 @@ class TitleList extends Component {
item={item}
backdrop={backDrop}
onRent={onRent}
onReturn={onReturn}
/>
);
});
Expand Down Expand Up @@ -220,7 +238,7 @@ class TitleList extends Component {
}
}

const Item = ({ item, onRent, backdrop }) => {
const Item = ({ item, onRent, onReturn, backdrop }) => {
return (
<div className="Item">
<div className="Item__container" style={{ backgroundImage: `url(./${backdrop})` }}>
Expand All @@ -242,6 +260,9 @@ const Item = ({ item, onRent, backdrop }) => {
<div className="Item__button Item__button--rented button">
Watch Now
</div>
<div className="Item__button button" onClick={() => onReturn(item)}>
Return
</div>
</>
}
</div>
Expand Down
36 changes: 34 additions & 2 deletions rent/src/main/java/com/okteto/rent/controller/RentController.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

@RestController
public class RentController {
private static final String KAFKA_TOPIC = "rentals";
private static final String KAFKA_TOPIC_RENTALS = "rentals";
private static final String KAFKA_TOPIC_RETURNS = "returns";

private final Logger logger = LoggerFactory.getLogger(RentController.class);

Expand All @@ -37,7 +38,7 @@ List<String> rent(@RequestBody Rent rentInput) {

logger.info("Rent [{},{}] received", catalogID, price);

kafkaTemplate.send(KAFKA_TOPIC, catalogID, price)
kafkaTemplate.send(KAFKA_TOPIC_RENTALS, catalogID, price)
.thenAccept(result -> logger.info("Message [{}] delivered with offset {}",
catalogID,
result.getRecordMetadata().offset()))
Expand All @@ -50,6 +51,24 @@ List<String> rent(@RequestBody Rent rentInput) {
return new LinkedList<>();
}

@PostMapping(path= "/rent/return", consumes = "application/json", produces = "application/json")
public Map<String, String> returnMovie(@RequestBody ReturnRequest returnRequest) {
String catalogID = returnRequest.getMovieID();

logger.info("Return [{}] received", catalogID);

kafkaTemplate.send(KAFKA_TOPIC_RETURNS, catalogID, catalogID)
.thenAccept(result -> logger.info("Return message [{}] delivered with offset {}",
catalogID,
result.getRecordMetadata().offset()))
.exceptionally(ex -> {
logger.warn("Unable to deliver return message [{}]. {}", catalogID, ex.getMessage());
return null;
});

return Collections.singletonMap("status", "return processed");
}

public static class Rent {
@JsonProperty("catalog_id")
private String movieID;
Expand All @@ -72,4 +91,17 @@ public String getPrice() {
return price;
}
}

public static class ReturnRequest {
@JsonProperty("catalog_id")
private String movieID;

public void setMovieID(String movieID) {
this.movieID = movieID;
}

public String getMovieID() {
return movieID;
}
}
}
22 changes: 18 additions & 4 deletions worker/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,42 @@ func main() {
master := kafka.GetMaster()
defer master.Close()

consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
// Consumer for "rentals" topic
consumerRentals, err := master.ConsumePartition("rentals", 0, sarama.OffsetOldest)
if err != nil {
log.Panic(err)
}

// Consumer for "returns" topic
consumerReturns, err := master.ConsumePartition("returns", 0, sarama.OffsetOldest)
if err != nil {
log.Panic(err)
}

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
doneCh := make(chan struct{})

go func() {
for {
select {
case err := <-consumer.Errors():
case err := <-consumerRentals.Errors():
fmt.Println(err)
case msg := <-consumer.Messages():
case msg := <-consumerRentals.Messages():
*messageCountStart++
fmt.Printf("Received message: movies %s price %s\n", string(msg.Key), string(msg.Value))
price, _ := strconv.ParseFloat(string(msg.Value), 64)
// price *= 0.5
insertDynStmt := `insert into "rentals"("id", "price") values($1, $2) on conflict(id) do update set price = $2`
if _, err := db.Exec(insertDynStmt, string(msg.Key), fmt.Sprintf("%f", price)); err != nil {
log.Panic(err)
}
case msg := <-consumerReturns.Messages():
catalogID := string(msg.Value)
fmt.Printf("Received return message: catalogID %s\n", catalogID)
deleteStmt := `DELETE FROM rentals WHERE id = $1`
if _, err := db.Exec(deleteStmt, catalogID); err != nil {
log.Panic(err)
}
case <-signals:
fmt.Println("Interrupt is detected")
doneCh <- struct{}{}
Expand Down