Skip to content

Commit 200a6dd

Browse files
authored
[ISSUE #4150]Broker master node address is offline (#4148)
* Broker master node address is offline * Broker master node address is offline
1 parent c7e94a4 commit 200a6dd

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

rocketmq-broker/src/out_api/broker_outer_api.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,10 @@ impl BrokerOuterAPI {
818818
master_broker_addr.map_or("".to_string(), |s| s.to_string()),
819819
))
820820
}
821+
822+
pub fn close_channel(&self, addr_list: Vec<String>) {
823+
self.remoting_client.mut_from_ref().close_clients(addr_list);
824+
}
821825
}
822826

823827
fn process_pull_result(

rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::sync::Arc;
2020
use std::time::Duration;
2121

2222
use cheetah_string::CheetahString;
23+
use rocketmq_common::common::mix_all;
2324
use rocketmq_common::common::mix_all::MASTER_ID;
2425
use rocketmq_remoting::code::request_code::RequestCode;
2526
use rocketmq_remoting::code::response_code::ResponseCode;
@@ -178,8 +179,12 @@ impl<MS: MessageStore> NotifyMinBrokerChangeIdHandler<MS> {
178179
let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
179180

180181
if let Some(slave_synchronize) = broker_runtime_inner.slave_synchronize() {
181-
if let Some(_master_addr) = slave_synchronize.master_addr() {
182-
// Call the close client method
182+
if let Some(master_addr) = slave_synchronize.master_addr() {
183+
let vip_channel = mix_all::broker_vip_channel(true, master_addr);
184+
let addr_list = vec![master_addr.to_string(), vip_channel.to_string()];
185+
self.broker_runtime_inner
186+
.broker_outer_api()
187+
.close_channel(addr_list);
183188
}
184189
}
185190

0 commit comments

Comments
 (0)