Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion rocketmq-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,8 @@ path = "examples/ordermessage/ordermessage_producer.rs"

[[example]]
name = "ordermessage-consumer"
path = "examples/ordermessage/ordermessage_consumer.rs"
path = "examples/ordermessage/ordermessage_consumer.rs"

[[example]]
name = "transaction-producer"
path = "examples/transaction/transaction_producer.rs"
99 changes: 99 additions & 0 deletions rocketmq-client/examples/transaction/transaction_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::any::Any;
use std::collections::HashMap;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;

use parking_lot::Mutex;
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Import TransactionMQProducer for transactional messaging

Since this example is intended to demonstrate transactional message sending, you should import TransactionMQProducer instead of DefaultMQProducer to enable transactional functionality.

Apply this diff to import TransactionMQProducer:

-use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
+use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer;

use rocketmq_client::producer::local_transaction_state::LocalTransactionState;
use rocketmq_client::producer::mq_producer::MQProducer;
use rocketmq_client::producer::transaction_listener::TransactionListener;
use rocketmq_client::Result;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_rust::rocketmq;

pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";

#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
rocketmq_common::log::init_logger();

// create a producer builder with default configuration
let builder = DefaultMQProducer::builder();

let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Use TransactionMQProducer instead of DefaultMQProducer for transactional messaging

To properly handle transactional messages, you should instantiate a TransactionMQProducer rather than a DefaultMQProducer. This ensures that the producer is configured correctly for transactional operations.

Apply this diff to use TransactionMQProducer and attach the transaction listener:

-let builder = DefaultMQProducer::builder();
+let builder = TransactionMQProducer::builder();

 let mut producer = builder
+    .transaction_listener(Arc::new(TransactionListenerImpl::default()))
     .producer_group(PRODUCER_GROUP.to_string())
     .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
     .build();

Committable suggestion was skipped due to low confidence.


producer.start().await?;

for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());

let send_result = producer.send_with_timeout(message, 2000).await?;
println!("send result: {}", send_result);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Use send_message_in_transaction method for transactional message sending

To send messages as part of a transaction, you should use the send_message_in_transaction method provided by TransactionMQProducer. This ensures that the messages are properly managed within the transaction context.

Apply this diff to send transactional messages:

 for _ in 0..10 {
     let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());

-    let send_result = producer.send_with_timeout(message, 2000).await?;
+    let send_result = producer.send_message_in_transaction(message, None).await?;
     println!("send result: {:?}", send_result);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
let send_result = producer.send_with_timeout(message, 2000).await?;
println!("send result: {}", send_result);
}
for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
let send_result = producer.send_message_in_transaction(message, None).await?;
println!("send result: {:?}", send_result);
}

producer.shutdown().await;

Ok(())
}

struct TransactionListenerImpl {
local_trans: Arc<Mutex<HashMap<String, i32>>>,
transaction_index: AtomicI32,
}

impl Default for TransactionListenerImpl {
fn default() -> Self {
Self {
local_trans: Arc::new(Default::default()),
transaction_index: Default::default(),
}
}
}

impl TransactionListener for TransactionListenerImpl {
fn execute_local_transaction(&self, msg: &Message, arg: &dyn Any) -> LocalTransactionState {
let value = self
.transaction_index
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let status = value % 3;
let mut guard = self.local_trans.lock();
guard.insert(msg.get_transaction_id().to_string(), status);
LocalTransactionState::Unknown
Comment on lines +94 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider logging transaction status for better observability

Adding logging statements within execute_local_transaction can help in tracing and debugging transaction states during execution.

Apply this diff to add logging:

 let mut guard = self.local_trans.lock();
 guard.insert(msg.get_transaction_id().to_string(), status);
+log::info!("Executed local transaction: ID = {}, Status = {}", msg.get_transaction_id(), status);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut guard = self.local_trans.lock();
guard.insert(msg.get_transaction_id().to_string(), status);
LocalTransactionState::Unknown
let mut guard = self.local_trans.lock();
guard.insert(msg.get_transaction_id().to_string(), status);
log::info!("Executed local transaction: ID = {}, Status = {}", msg.get_transaction_id(), status);
LocalTransactionState::Unknown

Comment on lines +90 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve transaction state management and logging

The current implementation has several areas for improvement:

  1. Transaction state determination is overly simplistic using modulo
  2. Missing logging for state changes
  3. No error handling for transaction ID
 let value = self
     .transaction_index
     .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
-let status = value % 3;
+// Implement proper business logic for transaction state determination
+let status = determine_transaction_status(msg, value);
 let mut guard = self.local_trans.lock();
-guard.insert(msg.get_transaction_id().to_string(), status);
+if let Some(tx_id) = msg.get_transaction_id() {
+    guard.insert(tx_id.to_string(), status);
+    log::info!("Transaction state set: ID={}, status={}", tx_id, status);
+} else {
+    log::error!("Missing transaction ID in message");
+}

Committable suggestion was skipped due to low confidence.

}

fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
let mut guard = self.local_trans.lock();
let status = guard.get(msg.transaction_id()).unwrap_or(&-1);
match status {
1 => LocalTransactionState::CommitMessage,
2 => LocalTransactionState::RollbackMessage,
_ => LocalTransactionState::Unknown,
}
}
}
17 changes: 11 additions & 6 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,21 @@ use crate::implementation::mq_admin_impl::MQAdminImpl;
use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
use crate::producer::default_mq_producer::DefaultMQProducer;
use crate::producer::default_mq_producer::ProducerConfig;
use crate::producer::producer_impl::mq_producer_inner::MQProducerInner;
use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl;
use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
use crate::Result;

const LOCK_TIMEOUT_MILLIS: u64 = 3000;

pub struct MQClientInstance {
pub(crate) client_config: Arc<ClientConfig>,
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
pub(crate) client_id: String,
boot_timestamp: u64,
/**
* The container of the producer in the current client. The key is the name of
* producerGroup.
*/
producer_table: Arc<RwLock<HashMap<String, Box<dyn MQProducerInner>>>>,
producer_table: Arc<RwLock<HashMap<String, MQProducerInnerImpl>>>,
/**
* The container of the consumer in the current client. The key is the name of
* consumer_group.
Expand Down Expand Up @@ -177,7 +177,7 @@ impl MQClientInstance {
) -> ArcRefCellWrapper<MQClientInstance> {
let broker_addr_table = Arc::new(Default::default());
let mut instance = ArcRefCellWrapper::new(MQClientInstance {
client_config: Arc::new(client_config.clone()),
client_config: ArcRefCellWrapper::new(client_config.clone()),
client_id,
boot_timestamp: get_current_millis(),
producer_table: Arc::new(RwLock::new(HashMap::new())),
Expand Down Expand Up @@ -326,7 +326,7 @@ impl MQClientInstance {

pub async fn shutdown(&mut self) {}

pub async fn register_producer(&mut self, group: &str, producer: impl MQProducerInner) -> bool {
pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool {
if group.is_empty() {
return false;
}
Expand All @@ -335,7 +335,7 @@ impl MQClientInstance {
warn!("the producer group[{}] exist already.", group);
return false;
}
producer_table.insert(group.to_string(), Box::new(producer));
producer_table.insert(group.to_string(), producer);
true
}

Expand Down Expand Up @@ -1055,6 +1055,11 @@ impl MQClientInstance {
consumer_table.get(group).cloned()
}

pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
let producer_table = self.producer_table.read().await;
producer_table.get(group).cloned()
}
Comment on lines +1058 to +1061
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the gathered information, I can now provide the final response:

Transaction producer cleanup needs implementation

The verification reveals incomplete producer cleanup implementation:

  • unregister_producer is properly implemented to handle producer table cleanup
  • However, transaction producer's shutdown() is unimplemented (marked with unimplemented!())
  • Default producer's shutdown only handles accumulator and trace dispatcher cleanup
🔗 Analysis chain

Verify producer cleanup in transaction scenarios.

Since this is part of transaction message support, ensure that producers are properly cleaned up after transaction completion or failure.


Let me gather more information about the producer cleanup implementation and transaction handling.


Let me gather more specific information about producer cleanup in the unregister flow.


Let me check one final aspect about producer cleanup in the implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for producer cleanup in transaction handling
rg -A 5 "unregister_producer|producer_table.remove" 

Length of output: 1583


Script:

#!/bin/bash
# Search for producer cleanup in transaction context
rg -A 10 "transaction.*producer|producer.*transaction" 

# Search for producer table modifications
ast-grep --pattern 'self.producer_table.$_($$$)'

# Search for producer cleanup in client implementation
rg -A 10 "unregister_client.*producer.*table|producer.*table.*remove"

Length of output: 22008


Script:

#!/bin/bash
# Search for producer table cleanup in unregister_client implementation
ast-grep --pattern 'fn unregister_client($$$) {
  $$$
}'

# Search for producer cleanup in client shutdown
rg -A 10 "shutdown.*producer|producer.*shutdown"

Length of output: 3022


Script:

#!/bin/bash
# Search for producer cleanup in shutdown implementation
ast-grep --pattern 'async fn shutdown($$$) {
  $$$
}'

# Search for producer table cleanup
rg -A 5 "producer_table.*clear|producer_table.*remove"

Length of output: 3087


pub async fn unregister_consumer(&mut self, group: impl Into<String>) {
self.unregister_client(None, Some(group.into())).await;
}
Expand Down
14 changes: 13 additions & 1 deletion rocketmq-client/src/hook/end_transaction_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub struct EndTransactionContext {}
use rocketmq_common::common::message::message_single::Message;

use crate::producer::local_transaction_state::LocalTransactionState;

pub struct EndTransactionContext<'a> {
pub producer_group: String,
pub broker_addr: String,
pub message: &'a Message,
pub msg_id: String,
pub transaction_id: String,
pub transaction_state: LocalTransactionState,
pub from_transaction_check: bool,
}
Comment on lines +21 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider encapsulating fields and adding validation.

The struct currently exposes all fields as public, which might lead to invalid state mutations. Consider implementing proper encapsulation and validation.

Here's a suggested implementation with better encapsulation and validation:

/// Represents the context for ending a transaction in RocketMQ.
pub struct EndTransactionContext<'a> {
    producer_group: String,
    broker_addr: String,
    message: &'a Message,
    msg_id: String,
    transaction_id: String,
    transaction_state: LocalTransactionState,
    from_transaction_check: bool,
}

impl<'a> EndTransactionContext<'a> {
    /// Creates a new EndTransactionContext with validation
    pub fn new(
        producer_group: String,
        broker_addr: String,
        message: &'a Message,
        msg_id: String,
        transaction_id: String,
        transaction_state: LocalTransactionState,
        from_transaction_check: bool,
    ) -> Result<Self, &'static str> {
        // Validate inputs
        if producer_group.is_empty() {
            return Err("producer_group cannot be empty");
        }
        if broker_addr.is_empty() {
            return Err("broker_addr cannot be empty");
        }
        if msg_id.is_empty() {
            return Err("msg_id cannot be empty");
        }
        if transaction_id.is_empty() {
            return Err("transaction_id cannot be empty");
        }

        Ok(Self {
            producer_group,
            broker_addr,
            message,
            msg_id,
            transaction_id,
            transaction_state,
            from_transaction_check,
        })
    }

    // Getters
    pub fn producer_group(&self) -> &str {
        &self.producer_group
    }

    pub fn broker_addr(&self) -> &str {
        &self.broker_addr
    }

    pub fn message(&self) -> &Message {
        self.message
    }

    pub fn msg_id(&self) -> &str {
        &self.msg_id
    }

    pub fn transaction_id(&self) -> &str {
        &self.transaction_id
    }

    pub fn transaction_state(&self) -> LocalTransactionState {
        self.transaction_state
    }

    pub fn from_transaction_check(&self) -> bool {
        self.from_transaction_check
    }

    // Setter for mutable state only
    pub fn set_transaction_state(&mut self, state: LocalTransactionState) {
        self.transaction_state = state;
    }
}

3 changes: 2 additions & 1 deletion rocketmq-client/src/hook/send_message_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;
use rocketmq_common::common::message::message_enum::MessageType;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::ArcRefCellWrapper;

use crate::implementation::communication_mode::CommunicationMode;
use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl;
Expand All @@ -38,7 +39,7 @@ pub struct SendMessageContext<'a> {
pub exception: Option<Arc<Box<dyn Error + Send + Sync>>>,
pub mq_trace_context: Option<Arc<Box<dyn std::any::Any + Send + Sync>>>,
pub props: HashMap<String, String>,
pub producer: Option<DefaultMQProducerImpl>,
pub producer: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>,
pub msg_type: Option<MessageType>,
pub namespace: Option<String>,
}
79 changes: 79 additions & 0 deletions rocketmq-client/src/implementation/client_remoting_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use bytes::Bytes;
use rocketmq_common::common::compression::compressor_factory::CompressorFactory;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
use rocketmq_common::MessageAccessor::MessageAccessor;
use rocketmq_common::MessageDecoder;
Expand All @@ -28,8 +29,10 @@ use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader;
use rocketmq_remoting::protocol::header::notify_consumer_ids_changed_request_header::NotifyConsumerIdsChangedRequestHeader;
use rocketmq_remoting::protocol::header::reply_message_request_header::ReplyMessageRequestHeader;
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_remoting::runtime::processor::RequestProcessor;
Expand Down Expand Up @@ -62,10 +65,26 @@ impl RequestProcessor for ClientRemotingProcessor {
let request_code = RequestCode::from(request.code());
info!("process_request: {:?}", request_code);
match request_code {
RequestCode::CheckTransactionState => {
self.check_transaction_state(channel, ctx, request).await
}
RequestCode::ResetConsumerClientOffset => {
unimplemented!("ResetConsumerClientOffset")
}
RequestCode::GetConsumerStatusFromClient => {
unimplemented!("GetConsumerStatusFromClient")
}
RequestCode::GetConsumerRunningInfo => {
unimplemented!("GetConsumerRunningInfo")
}
RequestCode::ConsumeMessageDirectly => {
unimplemented!("ConsumeMessageDirectly")
}
RequestCode::PushReplyMessageToClient => self.receive_reply_message(ctx, request).await,
RequestCode::NotifyConsumerIdsChanged => {
self.notify_consumer_ids_changed(channel, ctx, request)
}

Comment on lines +68 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement handling for unimplemented request codes to ensure complete functionality

The methods for ResetConsumerClientOffset, GetConsumerStatusFromClient, GetConsumerRunningInfo, and ConsumeMessageDirectly are currently unimplemented. This may lead to incomplete functionality or unexpected behavior when these requests are received. Consider implementing these methods or providing appropriate response handling.

Would you like assistance in implementing these methods or providing default responses?

_ => {
info!("Unknown request code: {:?}", request_code);
Ok(None)
Expand Down Expand Up @@ -190,4 +209,64 @@ impl ClientRemotingProcessor {
}
Ok(None)
}

async fn check_transaction_state(
&mut self,
channel: Channel,
ctx: ConnectionHandlerContext,
mut request: RemotingCommand,
) -> Result<Option<RemotingCommand>> {
let request_header = request
.decode_command_custom_header::<CheckTransactionStateRequestHeader>()
.unwrap();
Comment on lines +219 to +221
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle errors when decoding the request header to prevent panics

Using .unwrap() can cause a panic if decoding fails. Consider handling the potential error to make the code more robust.

Here's how you can modify the code to handle decoding errors:

             let request_header = request
-                .decode_command_custom_header::<CheckTransactionStateRequestHeader>()
-                .unwrap();
+                .decode_command_custom_header::<CheckTransactionStateRequestHeader>();
+            let request_header = match request_header {
+                Ok(header) => header,
+                Err(e) => {
+                    warn!("Failed to decode request header: {:?}", e);
+                    return Ok(Some(
+                        RemotingCommand::create_response_command()
+                            .set_code(ResponseCode::SystemError)
+                            .set_remark(Some("Failed to decode request header".to_string())),
+                    ));
+                }
+            };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let request_header = request
.decode_command_custom_header::<CheckTransactionStateRequestHeader>()
.unwrap();
let request_header = request
.decode_command_custom_header::<CheckTransactionStateRequestHeader>();
let request_header = match request_header {
Ok(header) => header,
Err(e) => {
warn!("Failed to decode request header: {:?}", e);
return Ok(Some(
RemotingCommand::create_response_command()
.set_code(ResponseCode::SystemError)
.set_remark(Some("Failed to decode request header".to_string())),
));
}
};

let message_ext = MessageDecoder::decode(
request.get_body_mut().unwrap(),
true,
true,
false,
false,
false,
);
Comment on lines +223 to +229
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid unwrapping on get_body_mut() to prevent potential panics

Unwrapping request.get_body_mut() can lead to a panic if the body is None. It's safer to handle the Option and provide error handling.

Consider the following change:

-            let message_ext = MessageDecoder::decode(
-                request.get_body_mut().unwrap(),
-                true,
-                true,
-                false,
-                false,
-                false,
-            );
+            let message_ext = if let Some(body) = request.get_body_mut() {
+                MessageDecoder::decode(
+                    body,
+                    true,
+                    true,
+                    false,
+                    false,
+                    false,
+                )
+            } else {
+                warn!("Request body is missing");
+                return Ok(Some(
+                    RemotingCommand::create_response_command()
+                        .set_code(ResponseCode::SystemError)
+                        .set_remark(Some("Request body is missing".to_string())),
+                ));
+            };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
request.get_body_mut().unwrap(),
true,
true,
false,
false,
false,
);
let message_ext = if let Some(body) = request.get_body_mut() {
MessageDecoder::decode(
body,
true,
true,
false,
false,
false,
)
} else {
warn!("Request body is missing");
return Ok(Some(
RemotingCommand::create_response_command()
.set_code(ResponseCode::SystemError)
.set_remark(Some("Request body is missing".to_string())),
));
};

if let Some(mut message_ext) = message_ext {
if let Some(mut client_instance) = self.client_instance.upgrade() {
if let Some(ref namespace) = client_instance.client_config.get_namespace() {
let topic = NamespaceUtil::without_namespace_with_namespace(
message_ext.get_topic(),
client_instance
.client_config
.get_namespace()
.unwrap_or_default()
.as_str(),
);
message_ext.set_topic(topic.as_str());
}
let transaction_id =
message_ext.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if let Some(transaction_id) = transaction_id {
if !transaction_id.is_empty() {
message_ext.set_transaction_id(transaction_id.as_str());
}
}
let group = message_ext.get_property(MessageConst::PROPERTY_PRODUCER_GROUP);
if let Some(group) = group {
let producer = client_instance.select_producer(&group).await;
if let Some(producer) = producer {
let addr = channel.remote_address().to_string();
producer.check_transaction_state(
addr.as_str(),
message_ext,
request_header,
);
} else {
warn!("checkTransactionState, pick producer group failed");
}
} else {
warn!("checkTransactionState, pick producer group failed");
}
}
} else {
warn!("checkTransactionState, decode message failed");
};
Ok(None)
}
}
18 changes: 18 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockB
use rocketmq_remoting::protocol::body::unlock_batch_request_body::UnlockBatchRequestBody;
use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader;
use rocketmq_remoting::protocol::header::consumer_send_msg_back_request_header::ConsumerSendMsgBackRequestHeader;
use rocketmq_remoting::protocol::header::end_transaction_request_header::EndTransactionRequestHeader;
use rocketmq_remoting::protocol::header::get_consumer_listby_group_request_header::GetConsumerListByGroupRequestHeader;
use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader;
use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader;
Expand Down Expand Up @@ -1130,4 +1131,21 @@ impl MQClientAPIImpl {
))
}
}

pub async fn end_transaction_oneway(
&mut self,
addr: &str,
request_header: EndTransactionRequestHeader,
remark: String,
timeout_millis: u64,
) -> Result<()> {
let request =
RemotingCommand::create_request_command(RequestCode::EndTransaction, request_header)
.set_remark(Some(remark));

self.remoting_client
.invoke_oneway(addr.to_string(), request, timeout_millis)
.await;
Ok(())
}
}
2 changes: 2 additions & 0 deletions rocketmq-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
#![allow(dead_code)]
#![allow(unused_variables)]
#![recursion_limit = "256"]

extern crate core;

use crate::error::MQClientError;
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-client/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ pub mod send_callback;
pub mod send_result;
pub mod send_status;
pub mod transaction_listener;
pub mod transaction_mq_produce_builder;
pub mod transaction_mq_producer;
pub mod transaction_send_result;
Loading
Loading