Skip to content

Commit 11da28f

Browse files
committed
update commit configuration on workers
1 parent 2baf591 commit 11da28f

File tree

2 files changed

+5
-2
lines changed

2 files changed

+5
-2
lines changed

worker/cmd/worker/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ func main() {
4242
defer master.Close()
4343

4444
// Consumer for "rentals" topic
45-
consumerRentals, err := master.ConsumePartition("rentals", 0, sarama.OffsetOldest)
45+
consumerRentals, err := master.ConsumePartition("rentals", 0, sarama.OffsetNewest)
4646
if err != nil {
4747
log.Panic(err)
4848
}
4949

5050
// Consumer for "returns" topic
51-
consumerReturns, err := master.ConsumePartition("returns", 0, sarama.OffsetOldest)
51+
consumerReturns, err := master.ConsumePartition("returns", 0, sarama.OffsetNewest)
5252
if err != nil {
5353
log.Panic(err)
5454
}

worker/pkg/kafka/kafka.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kafka
22

33
import (
44
"fmt"
5+
"time"
56

67
kingpin "gopkg.in/alecthomas/kingpin.v2"
78

@@ -16,6 +17,8 @@ func GetMaster() sarama.Consumer {
1617
kingpin.Parse()
1718
config := sarama.NewConfig()
1819
config.Consumer.Return.Errors = true
20+
config.Consumer.Offsets.AutoCommit.Enable = true
21+
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
1922
brokers := *brokerList
2023
fmt.Println("Waiting for kafka...")
2124
for {

0 commit comments

Comments
 (0)