diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index fc425923..9c063254 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -1848,6 +1848,11 @@ impl BrokerRuntimeInner { &self.replicas_manager } + #[inline] + pub fn replicas_manager_mut(&mut self) -> &mut Option { + &mut self.replicas_manager + } + #[inline] pub fn store_host(&self) -> SocketAddr { self.store_host diff --git a/rocketmq-broker/src/controller/replicas_manager.rs b/rocketmq-broker/src/controller/replicas_manager.rs index 4605ddc3..b538e9bd 100644 --- a/rocketmq-broker/src/controller/replicas_manager.rs +++ b/rocketmq-broker/src/controller/replicas_manager.rs @@ -1,4 +1,3 @@ -use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntry; /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -15,7 +14,13 @@ use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntry; * See the License for the specific language governing permissions and * limitations under the License. */ + +use std::collections::HashSet; + +use cheetah_string::CheetahString; +use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntry; use tracing::warn; + #[derive(Default)] pub struct ReplicasManager {} @@ -31,4 +36,15 @@ impl ReplicasManager { pub fn get_epoch_entries(&self) -> Vec { unimplemented!("") } + + pub async fn change_broker_role( + &mut self, + _new_master_broker_id: Option, + _new_master_address: Option, + _new_master_epoch: Option, + _sync_state_set_epoch: Option, + _sync_state_set: Option<&HashSet>, + ) -> rocketmq_error::RocketMQResult<()> { + Ok(()) + } } diff --git a/rocketmq-broker/src/processor/admin_broker_processor.rs b/rocketmq-broker/src/processor/admin_broker_processor.rs index 4309f2bb..bcb4b749 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor.rs @@ -30,6 +30,7 @@ use crate::processor::admin_broker_processor::batch_mq_handler::BatchMqHandler; use crate::processor::admin_broker_processor::broker_config_request_handler::BrokerConfigRequestHandler; use crate::processor::admin_broker_processor::broker_epoch_cache_handler::BrokerEpochCacheHandler; use crate::processor::admin_broker_processor::consumer_request_handler::ConsumerRequestHandler; +use crate::processor::admin_broker_processor::notify_broker_role_change_handler::NotifyBrokerRoleChangeHandler; use crate::processor::admin_broker_processor::notify_min_broker_id_handler::NotifyMinBrokerChangeIdHandler; use crate::processor::admin_broker_processor::offset_request_handler::OffsetRequestHandler; use crate::processor::admin_broker_processor::reset_master_flusg_offset_handler::ResetMasterFlushOffsetHandler; @@ -41,6 +42,7 @@ mod batch_mq_handler; mod broker_config_request_handler; mod broker_epoch_cache_handler; mod consumer_request_handler; +mod notify_broker_role_change_handler; mod notify_min_broker_id_handler; mod offset_request_handler; mod reset_master_flusg_offset_handler; @@ -62,6 +64,7 @@ pub struct AdminBrokerProcessor { update_broker_ha_handler: UpdateBrokerHaHandler, reset_master_flusg_offset_handler: ResetMasterFlushOffsetHandler, broker_epoch_cache_handler: BrokerEpochCacheHandler, + notify_broker_role_change_handler: NotifyBrokerRoleChangeHandler, } impl RequestProcessor for AdminBrokerProcessor @@ -102,6 +105,9 @@ impl AdminBrokerProcessor { let broker_epoch_cache_handler = BrokerEpochCacheHandler::new(broker_runtime_inner.clone()); + let notify_broker_role_change_handler = + NotifyBrokerRoleChangeHandler::new(broker_runtime_inner.clone()); + AdminBrokerProcessor { topic_request_handler, broker_config_request_handler, @@ -114,6 +120,7 @@ impl AdminBrokerProcessor { update_broker_ha_handler, reset_master_flusg_offset_handler, broker_epoch_cache_handler, + notify_broker_role_change_handler, } } } @@ -260,6 +267,11 @@ impl AdminBrokerProcessor { .get_broker_epoch_cache(channel, ctx, request_code, request) .await } + RequestCode::NotifyBrokerRoleChanged => { + self.notify_broker_role_change_handler + .notify_broker_role_changed(channel, ctx, request_code, request) + .await + } _ => Some(get_unknown_cmd_response(request_code)), } } diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs new file mode 100644 index 00000000..06605c50 --- /dev/null +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use rocketmq_remoting::code::request_code::RequestCode; +use rocketmq_remoting::code::response_code::ResponseCode; +use rocketmq_remoting::net::channel::Channel; +use rocketmq_remoting::protocol::body::sync_state_set_body::SyncStateSet; +use rocketmq_remoting::protocol::header::notify_broker_role_change_request_header::NotifyBrokerRoleChangedRequestHeader; +use rocketmq_remoting::protocol::remoting_command::RemotingCommand; +use rocketmq_remoting::protocol::RemotingDeserializable; +use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; +use rocketmq_rust::ArcMut; +use rocketmq_store::base::message_store::MessageStore; +use tracing::info; + +use crate::broker_runtime::BrokerRuntimeInner; + +#[derive(Clone)] +pub struct NotifyBrokerRoleChangeHandler { + broker_runtime_inner: ArcMut>, +} + +impl NotifyBrokerRoleChangeHandler { + pub fn new(broker_runtime_inner: ArcMut>) -> Self { + Self { + broker_runtime_inner, + } + } + + pub async fn notify_broker_role_changed( + &mut self, + _channel: Channel, + _ctx: ConnectionHandlerContext, + _request_code: RequestCode, + request: &mut RemotingCommand, + ) -> Option { + let request_header = + request.decode_command_custom_header::(); + + let sync_state_set_info = + SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default(); + + let response = RemotingCommand::create_response_command(); + + info!( + "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", + request_header.as_ref().expect("null") + ); + + if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() { + if let Ok(request_header) = request_header { + match replicas_mangesr + .change_broker_role( + request_header.master_broker_id, + request_header.master_address, + request_header.master_epoch, + request_header.sync_state_set_epoch, + sync_state_set_info.get_sync_state_set(), + ) + .await + { + Ok(_) => {} + Err(e) => { + panic!("Failed to call method change_broker_role: {}", e); + } + } + } + } + + Some(response.set_code(ResponseCode::Success)) + } +} diff --git a/rocketmq-remoting/src/protocol/body.rs b/rocketmq-remoting/src/protocol/body.rs index f94ab2f9..7e9a5107 100644 --- a/rocketmq-remoting/src/protocol/body.rs +++ b/rocketmq-remoting/src/protocol/body.rs @@ -56,6 +56,7 @@ pub mod request; pub mod response; pub mod set_message_request_mode_request_body; pub mod subscription_group_wrapper; +pub mod sync_state_set_body; pub mod timer_metrics_serialize_wrapper; pub mod topic; pub mod topic_info_wrapper; diff --git a/rocketmq-remoting/src/protocol/body/sync_state_set_body.rs b/rocketmq-remoting/src/protocol/body/sync_state_set_body.rs new file mode 100644 index 00000000..6dd21f78 --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/sync_state_set_body.rs @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashSet; + +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct SyncStateSet { + sync_state_set: Option>, + sync_state_set_epoch: i32, +} + +impl SyncStateSet { + pub fn get_sync_state_set(&self) -> Option<&HashSet> { + self.sync_state_set.as_ref() + } + + pub fn get_sync_state_set_epoch(&self) -> i32 { + self.sync_state_set_epoch + } +} diff --git a/rocketmq-remoting/src/protocol/header.rs b/rocketmq-remoting/src/protocol/header.rs index cbd9b42b..82a6c481 100644 --- a/rocketmq-remoting/src/protocol/header.rs +++ b/rocketmq-remoting/src/protocol/header.rs @@ -52,6 +52,7 @@ pub mod message_operation_header; pub mod namesrv; pub mod notification_request_header; pub mod notification_response_header; +pub mod notify_broker_role_change_request_header; pub mod notify_consumer_ids_changed_request_header; pub mod pop_message_request_header; pub mod pop_message_response_header; diff --git a/rocketmq-remoting/src/protocol/header/notify_broker_role_change_request_header.rs b/rocketmq-remoting/src/protocol/header/notify_broker_role_change_request_header.rs new file mode 100644 index 00000000..3adaf04f --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/notify_broker_role_change_request_header.rs @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::fmt::Display; + +use cheetah_string::CheetahString; +use rocketmq_macros::RequestHeaderCodec; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Serialize, Deserialize, Debug, RequestHeaderCodec)] +#[serde(rename_all = "camelCase")] +pub struct NotifyBrokerRoleChangedRequestHeader { + #[required] + pub master_address: Option, + + #[required] + pub master_epoch: Option, + + #[required] + pub sync_state_set_epoch: Option, + + #[required] + pub master_broker_id: Option, +} + +impl Display for NotifyBrokerRoleChangedRequestHeader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "(master_address={:?}, master_epoch={:?}, sync_state_set_epoch={:?}, \ + master_broker_id={:?})", + self.master_address, + self.master_epoch, + self.sync_state_set_epoch, + self.master_broker_id + ) + } +}