-
Notifications
You must be signed in to change notification settings - Fork 175
[ISSUE #1078]🚀Support send transaction message for client🍻 #1088
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
9fb7611
1d41d4b
0c3dea4
8e8deed
4df11a6
ae75277
a23dd4c
83e1f06
4f4fc7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,99 @@ | ||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||||||||||||||||||||||||||
| * contributor license agreements. See the NOTICE file distributed with | ||||||||||||||||||||||||||
| * this work for additional information regarding copyright ownership. | ||||||||||||||||||||||||||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||||||||||||||||||||||
| * (the "License"); you may not use this file except in compliance with | ||||||||||||||||||||||||||
| * the License. You may obtain a copy of the License at | ||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||
| use std::any::Any; | ||||||||||||||||||||||||||
| use std::collections::HashMap; | ||||||||||||||||||||||||||
| use std::sync::atomic::AtomicI32; | ||||||||||||||||||||||||||
| use std::sync::Arc; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| use parking_lot::Mutex; | ||||||||||||||||||||||||||
| use rocketmq_client::producer::default_mq_producer::DefaultMQProducer; | ||||||||||||||||||||||||||
| use rocketmq_client::producer::local_transaction_state::LocalTransactionState; | ||||||||||||||||||||||||||
| use rocketmq_client::producer::mq_producer::MQProducer; | ||||||||||||||||||||||||||
| use rocketmq_client::producer::transaction_listener::TransactionListener; | ||||||||||||||||||||||||||
| use rocketmq_client::Result; | ||||||||||||||||||||||||||
| use rocketmq_common::common::message::message_ext::MessageExt; | ||||||||||||||||||||||||||
| use rocketmq_common::common::message::message_single::Message; | ||||||||||||||||||||||||||
| use rocketmq_common::common::message::MessageTrait; | ||||||||||||||||||||||||||
| use rocketmq_rust::rocketmq; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| pub const MESSAGE_COUNT: usize = 1; | ||||||||||||||||||||||||||
| pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name"; | ||||||||||||||||||||||||||
| pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876"; | ||||||||||||||||||||||||||
| pub const TOPIC: &str = "TopicTest"; | ||||||||||||||||||||||||||
| pub const TAG: &str = "TagA"; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| #[rocketmq::main] | ||||||||||||||||||||||||||
| pub async fn main() -> Result<()> { | ||||||||||||||||||||||||||
| //init logger | ||||||||||||||||||||||||||
| rocketmq_common::log::init_logger(); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // create a producer builder with default configuration | ||||||||||||||||||||||||||
| let builder = DefaultMQProducer::builder(); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| let mut producer = builder | ||||||||||||||||||||||||||
| .producer_group(PRODUCER_GROUP.to_string()) | ||||||||||||||||||||||||||
| .name_server_addr(DEFAULT_NAMESRVADDR.to_string()) | ||||||||||||||||||||||||||
| .build(); | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use To properly handle transactional messages, you should instantiate a Apply this diff to use -let builder = DefaultMQProducer::builder();
+let builder = TransactionMQProducer::builder();
let mut producer = builder
+ .transaction_listener(Arc::new(TransactionListenerImpl::default()))
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| producer.start().await?; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| for _ in 0..10 { | ||||||||||||||||||||||||||
| let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes()); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| let send_result = producer.send_with_timeout(message, 2000).await?; | ||||||||||||||||||||||||||
| println!("send result: {}", send_result); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use To send messages as part of a transaction, you should use the Apply this diff to send transactional messages: for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
- let send_result = producer.send_with_timeout(message, 2000).await?;
+ let send_result = producer.send_message_in_transaction(message, None).await?;
println!("send result: {:?}", send_result);
}📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||
| producer.shutdown().await; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| struct TransactionListenerImpl { | ||||||||||||||||||||||||||
| local_trans: Arc<Mutex<HashMap<String, i32>>>, | ||||||||||||||||||||||||||
| transaction_index: AtomicI32, | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| impl Default for TransactionListenerImpl { | ||||||||||||||||||||||||||
| fn default() -> Self { | ||||||||||||||||||||||||||
| Self { | ||||||||||||||||||||||||||
| local_trans: Arc::new(Default::default()), | ||||||||||||||||||||||||||
| transaction_index: Default::default(), | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| impl TransactionListener for TransactionListenerImpl { | ||||||||||||||||||||||||||
| fn execute_local_transaction(&self, msg: &Message, arg: &dyn Any) -> LocalTransactionState { | ||||||||||||||||||||||||||
| let value = self | ||||||||||||||||||||||||||
| .transaction_index | ||||||||||||||||||||||||||
| .fetch_add(1, std::sync::atomic::Ordering::AcqRel); | ||||||||||||||||||||||||||
| let status = value % 3; | ||||||||||||||||||||||||||
| let mut guard = self.local_trans.lock(); | ||||||||||||||||||||||||||
| guard.insert(msg.get_transaction_id().to_string(), status); | ||||||||||||||||||||||||||
| LocalTransactionState::Unknown | ||||||||||||||||||||||||||
|
Comment on lines
+94
to
+96
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider logging transaction status for better observability Adding logging statements within Apply this diff to add logging: let mut guard = self.local_trans.lock();
guard.insert(msg.get_transaction_id().to_string(), status);
+log::info!("Executed local transaction: ID = {}, Status = {}", msg.get_transaction_id(), status);📝 Committable suggestion
Suggested change
Comment on lines
+90
to
+96
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improve transaction state management and logging The current implementation has several areas for improvement:
let value = self
.transaction_index
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
-let status = value % 3;
+// Implement proper business logic for transaction state determination
+let status = determine_transaction_status(msg, value);
let mut guard = self.local_trans.lock();
-guard.insert(msg.get_transaction_id().to_string(), status);
+if let Some(tx_id) = msg.get_transaction_id() {
+ guard.insert(tx_id.to_string(), status);
+ log::info!("Transaction state set: ID={}, status={}", tx_id, status);
+} else {
+ log::error!("Missing transaction ID in message");
+}
|
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState { | ||||||||||||||||||||||||||
| let mut guard = self.local_trans.lock(); | ||||||||||||||||||||||||||
| let status = guard.get(msg.transaction_id()).unwrap_or(&-1); | ||||||||||||||||||||||||||
mxsm marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||||||||
| match status { | ||||||||||||||||||||||||||
| 1 => LocalTransactionState::CommitMessage, | ||||||||||||||||||||||||||
| 2 => LocalTransactionState::RollbackMessage, | ||||||||||||||||||||||||||
| _ => LocalTransactionState::Unknown, | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,21 +59,21 @@ use crate::implementation::mq_admin_impl::MQAdminImpl; | |
| use crate::implementation::mq_client_api_impl::MQClientAPIImpl; | ||
| use crate::producer::default_mq_producer::DefaultMQProducer; | ||
| use crate::producer::default_mq_producer::ProducerConfig; | ||
| use crate::producer::producer_impl::mq_producer_inner::MQProducerInner; | ||
| use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl; | ||
| use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo; | ||
| use crate::Result; | ||
|
|
||
| const LOCK_TIMEOUT_MILLIS: u64 = 3000; | ||
|
|
||
| pub struct MQClientInstance { | ||
| pub(crate) client_config: Arc<ClientConfig>, | ||
| pub(crate) client_config: ArcRefCellWrapper<ClientConfig>, | ||
| pub(crate) client_id: String, | ||
| boot_timestamp: u64, | ||
| /** | ||
| * The container of the producer in the current client. The key is the name of | ||
| * producerGroup. | ||
| */ | ||
| producer_table: Arc<RwLock<HashMap<String, Box<dyn MQProducerInner>>>>, | ||
| producer_table: Arc<RwLock<HashMap<String, MQProducerInnerImpl>>>, | ||
| /** | ||
| * The container of the consumer in the current client. The key is the name of | ||
| * consumer_group. | ||
|
|
@@ -177,7 +177,7 @@ impl MQClientInstance { | |
| ) -> ArcRefCellWrapper<MQClientInstance> { | ||
| let broker_addr_table = Arc::new(Default::default()); | ||
| let mut instance = ArcRefCellWrapper::new(MQClientInstance { | ||
| client_config: Arc::new(client_config.clone()), | ||
| client_config: ArcRefCellWrapper::new(client_config.clone()), | ||
| client_id, | ||
| boot_timestamp: get_current_millis(), | ||
| producer_table: Arc::new(RwLock::new(HashMap::new())), | ||
|
|
@@ -326,7 +326,7 @@ impl MQClientInstance { | |
|
|
||
| pub async fn shutdown(&mut self) {} | ||
|
|
||
| pub async fn register_producer(&mut self, group: &str, producer: impl MQProducerInner) -> bool { | ||
| pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool { | ||
| if group.is_empty() { | ||
| return false; | ||
| } | ||
|
|
@@ -335,7 +335,7 @@ impl MQClientInstance { | |
| warn!("the producer group[{}] exist already.", group); | ||
| return false; | ||
| } | ||
| producer_table.insert(group.to_string(), Box::new(producer)); | ||
| producer_table.insert(group.to_string(), producer); | ||
| true | ||
| } | ||
|
|
||
|
|
@@ -1055,6 +1055,11 @@ impl MQClientInstance { | |
| consumer_table.get(group).cloned() | ||
| } | ||
|
|
||
| pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> { | ||
| let producer_table = self.producer_table.read().await; | ||
| producer_table.get(group).cloned() | ||
| } | ||
|
Comment on lines
+1058
to
+1061
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Based on the gathered information, I can now provide the final response: Transaction producer cleanup needs implementation The verification reveals incomplete producer cleanup implementation:
🔗 Analysis chainVerify producer cleanup in transaction scenarios. Since this is part of transaction message support, ensure that producers are properly cleaned up after transaction completion or failure. Let me gather more information about the producer cleanup implementation and transaction handling. Let me gather more specific information about producer cleanup in the unregister flow. Let me check one final aspect about producer cleanup in the implementation. 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Search for producer cleanup in transaction handling
rg -A 5 "unregister_producer|producer_table.remove"
Length of output: 1583 Script: #!/bin/bash
# Search for producer cleanup in transaction context
rg -A 10 "transaction.*producer|producer.*transaction"
# Search for producer table modifications
ast-grep --pattern 'self.producer_table.$_($$$)'
# Search for producer cleanup in client implementation
rg -A 10 "unregister_client.*producer.*table|producer.*table.*remove"
Length of output: 22008 Script: #!/bin/bash
# Search for producer table cleanup in unregister_client implementation
ast-grep --pattern 'fn unregister_client($$$) {
$$$
}'
# Search for producer cleanup in client shutdown
rg -A 10 "shutdown.*producer|producer.*shutdown"
Length of output: 3022 Script: #!/bin/bash
# Search for producer cleanup in shutdown implementation
ast-grep --pattern 'async fn shutdown($$$) {
$$$
}'
# Search for producer table cleanup
rg -A 5 "producer_table.*clear|producer_table.*remove"
Length of output: 3087 |
||
|
|
||
| pub async fn unregister_consumer(&mut self, group: impl Into<String>) { | ||
| self.unregister_client(None, Some(group.into())).await; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,4 +14,16 @@ | |
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| pub struct EndTransactionContext {} | ||
| use rocketmq_common::common::message::message_single::Message; | ||
|
|
||
| use crate::producer::local_transaction_state::LocalTransactionState; | ||
|
|
||
| pub struct EndTransactionContext<'a> { | ||
| pub producer_group: String, | ||
| pub broker_addr: String, | ||
| pub message: &'a Message, | ||
| pub msg_id: String, | ||
| pub transaction_id: String, | ||
| pub transaction_state: LocalTransactionState, | ||
| pub from_transaction_check: bool, | ||
| } | ||
|
Comment on lines
+21
to
+29
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider encapsulating fields and adding validation. The struct currently exposes all fields as public, which might lead to invalid state mutations. Consider implementing proper encapsulation and validation. Here's a suggested implementation with better encapsulation and validation: /// Represents the context for ending a transaction in RocketMQ.
pub struct EndTransactionContext<'a> {
producer_group: String,
broker_addr: String,
message: &'a Message,
msg_id: String,
transaction_id: String,
transaction_state: LocalTransactionState,
from_transaction_check: bool,
}
impl<'a> EndTransactionContext<'a> {
/// Creates a new EndTransactionContext with validation
pub fn new(
producer_group: String,
broker_addr: String,
message: &'a Message,
msg_id: String,
transaction_id: String,
transaction_state: LocalTransactionState,
from_transaction_check: bool,
) -> Result<Self, &'static str> {
// Validate inputs
if producer_group.is_empty() {
return Err("producer_group cannot be empty");
}
if broker_addr.is_empty() {
return Err("broker_addr cannot be empty");
}
if msg_id.is_empty() {
return Err("msg_id cannot be empty");
}
if transaction_id.is_empty() {
return Err("transaction_id cannot be empty");
}
Ok(Self {
producer_group,
broker_addr,
message,
msg_id,
transaction_id,
transaction_state,
from_transaction_check,
})
}
// Getters
pub fn producer_group(&self) -> &str {
&self.producer_group
}
pub fn broker_addr(&self) -> &str {
&self.broker_addr
}
pub fn message(&self) -> &Message {
self.message
}
pub fn msg_id(&self) -> &str {
&self.msg_id
}
pub fn transaction_id(&self) -> &str {
&self.transaction_id
}
pub fn transaction_state(&self) -> LocalTransactionState {
self.transaction_state
}
pub fn from_transaction_check(&self) -> bool {
self.from_transaction_check
}
// Setter for mutable state only
pub fn set_transaction_state(&mut self, state: LocalTransactionState) {
self.transaction_state = state;
}
} |
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -20,6 +20,7 @@ use bytes::Bytes; | |||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_common::common::compression::compressor_factory::CompressorFactory; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_common::common::message::message_ext::MessageExt; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_common::common::message::MessageConst; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_common::common::message::MessageTrait; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_common::MessageAccessor::MessageAccessor; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_common::MessageDecoder; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -28,8 +29,10 @@ use rocketmq_common::WeakCellWrapper; | |||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_remoting::code::request_code::RequestCode; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_remoting::code::response_code::ResponseCode; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_remoting::net::channel::Channel; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_remoting::protocol::header::notify_consumer_ids_changed_request_header::NotifyConsumerIdsChangedRequestHeader; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_remoting::protocol::header::reply_message_request_header::ReplyMessageRequestHeader; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_remoting::protocol::namespace_util::NamespaceUtil; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_remoting::protocol::remoting_command::RemotingCommand; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| use rocketmq_remoting::runtime::processor::RequestProcessor; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -62,10 +65,26 @@ impl RequestProcessor for ClientRemotingProcessor { | |||||||||||||||||||||||||||||||||||||||||||||||||
| let request_code = RequestCode::from(request.code()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| info!("process_request: {:?}", request_code); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| match request_code { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| RequestCode::CheckTransactionState => { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self.check_transaction_state(channel, ctx, request).await | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| RequestCode::ResetConsumerClientOffset => { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| unimplemented!("ResetConsumerClientOffset") | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| RequestCode::GetConsumerStatusFromClient => { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| unimplemented!("GetConsumerStatusFromClient") | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| RequestCode::GetConsumerRunningInfo => { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| unimplemented!("GetConsumerRunningInfo") | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| RequestCode::ConsumeMessageDirectly => { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| unimplemented!("ConsumeMessageDirectly") | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| RequestCode::PushReplyMessageToClient => self.receive_reply_message(ctx, request).await, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| RequestCode::NotifyConsumerIdsChanged => { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self.notify_consumer_ids_changed(channel, ctx, request) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+68
to
+87
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implement handling for unimplemented request codes to ensure complete functionality The methods for Would you like assistance in implementing these methods or providing default responses? |
||||||||||||||||||||||||||||||||||||||||||||||||||
| _ => { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| info!("Unknown request code: {:?}", request_code); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Ok(None) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -190,4 +209,64 @@ impl ClientRemotingProcessor { | |||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Ok(None) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| async fn check_transaction_state( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| &mut self, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| channel: Channel, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ctx: ConnectionHandlerContext, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| mut request: RemotingCommand, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> Result<Option<RemotingCommand>> { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| let request_header = request | ||||||||||||||||||||||||||||||||||||||||||||||||||
| .decode_command_custom_header::<CheckTransactionStateRequestHeader>() | ||||||||||||||||||||||||||||||||||||||||||||||||||
| .unwrap(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+219
to
+221
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle errors when decoding the request header to prevent panics Using Here's how you can modify the code to handle decoding errors: let request_header = request
- .decode_command_custom_header::<CheckTransactionStateRequestHeader>()
- .unwrap();
+ .decode_command_custom_header::<CheckTransactionStateRequestHeader>();
+ let request_header = match request_header {
+ Ok(header) => header,
+ Err(e) => {
+ warn!("Failed to decode request header: {:?}", e);
+ return Ok(Some(
+ RemotingCommand::create_response_command()
+ .set_code(ResponseCode::SystemError)
+ .set_remark(Some("Failed to decode request header".to_string())),
+ ));
+ }
+ };📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| let message_ext = MessageDecoder::decode( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| request.get_body_mut().unwrap(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| true, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| true, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| false, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| false, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| false, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+223
to
+229
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid unwrapping on Unwrapping Consider the following change: - let message_ext = MessageDecoder::decode(
- request.get_body_mut().unwrap(),
- true,
- true,
- false,
- false,
- false,
- );
+ let message_ext = if let Some(body) = request.get_body_mut() {
+ MessageDecoder::decode(
+ body,
+ true,
+ true,
+ false,
+ false,
+ false,
+ )
+ } else {
+ warn!("Request body is missing");
+ return Ok(Some(
+ RemotingCommand::create_response_command()
+ .set_code(ResponseCode::SystemError)
+ .set_remark(Some("Request body is missing".to_string())),
+ ));
+ };📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| if let Some(mut message_ext) = message_ext { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if let Some(mut client_instance) = self.client_instance.upgrade() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if let Some(ref namespace) = client_instance.client_config.get_namespace() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| let topic = NamespaceUtil::without_namespace_with_namespace( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| message_ext.get_topic(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| client_instance | ||||||||||||||||||||||||||||||||||||||||||||||||||
| .client_config | ||||||||||||||||||||||||||||||||||||||||||||||||||
| .get_namespace() | ||||||||||||||||||||||||||||||||||||||||||||||||||
| .unwrap_or_default() | ||||||||||||||||||||||||||||||||||||||||||||||||||
| .as_str(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| message_ext.set_topic(topic.as_str()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| let transaction_id = | ||||||||||||||||||||||||||||||||||||||||||||||||||
| message_ext.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if let Some(transaction_id) = transaction_id { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if !transaction_id.is_empty() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| message_ext.set_transaction_id(transaction_id.as_str()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| let group = message_ext.get_property(MessageConst::PROPERTY_PRODUCER_GROUP); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if let Some(group) = group { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| let producer = client_instance.select_producer(&group).await; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if let Some(producer) = producer { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| let addr = channel.remote_address().to_string(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| producer.check_transaction_state( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| addr.as_str(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| message_ext, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| request_header, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| warn!("checkTransactionState, pick producer group failed"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| warn!("checkTransactionState, pick producer group failed"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| warn!("checkTransactionState, decode message failed"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Ok(None) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import
TransactionMQProducerfor transactional messagingSince this example is intended to demonstrate transactional message sending, you should import
TransactionMQProducerinstead ofDefaultMQProducerto enable transactional functionality.Apply this diff to import
TransactionMQProducer:📝 Committable suggestion