Skip to content

Commit ead727b

Browse files
committed
调整kafka默认配置参数, 去除new client
1 parent f143226 commit ead727b

File tree

2 files changed

+7
-19
lines changed

2 files changed

+7
-19
lines changed

bus/broker/kafka/publisher.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,27 +56,20 @@ func (b *Publisher) Connect() error {
5656
b.mux.Lock()
5757
defer b.mux.Unlock()
5858

59-
b.l.Debugf("try connect: %v ...", b.conf.Hosts)
60-
61-
client, err := sarama.NewClient(b.conf.Hosts, b.kc)
62-
if err != nil {
63-
b.l.Errorf("new kafka client error, %s", err)
64-
return err
65-
}
66-
b.l.Debugf("connect %v success", b.conf.Hosts)
67-
6859
// try to connect
69-
producer, err := sarama.NewAsyncProducerFromClient(client)
60+
b.l.Debugf("try connect: %v ...", b.conf.Hosts)
61+
producer, err := sarama.NewAsyncProducer(b.conf.Hosts, b.kc)
7062
if err != nil {
7163
b.l.Errorf("new kafka producer fails with: %+v", err)
7264
return err
7365
}
7466

7567
b.producer = producer
7668
b.pubChan = producer.Input()
69+
b.l.Debugf("connect %v success", b.conf.Hosts)
70+
7771
go b.watchSuccess(producer.Successes())
7872
go b.watchFailed(producer.Errors())
79-
8073
return nil
8174
}
8275

bus/broker/kafka/subscriber.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,15 @@ func (s *Subscriber) Connect() error {
5454
s.mux.Lock()
5555
defer s.mux.Unlock()
5656

57+
// try to connect
5758
s.l.Debugf("try connect: %v ...", s.conf.Hosts)
58-
client, err := sarama.NewClient(s.conf.Hosts, s.kc)
59-
if err != nil {
60-
s.l.Errorf("new kafka client error, %s", err)
61-
return err
62-
}
63-
s.l.Debugf("connect %v success", s.conf.Hosts)
64-
65-
consumer, err := sarama.NewConsumerGroupFromClient(s.conf.GroupID, client)
59+
consumer, err := sarama.NewConsumerGroup(s.conf.Hosts, s.conf.GroupID, s.kc)
6660
if err != nil {
6761
s.l.Errorf("Kafka consummer connect fails with: %+v", err)
6862
return err
6963
}
7064
s.comsummer = consumer
65+
s.l.Debugf("connect %v success", s.conf.Hosts)
7166
return nil
7267
}
7368

0 commit comments

Comments
 (0)