Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion rocketmq-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,8 @@ path = "examples/ordermessage/ordermessage_producer.rs"

[[example]]
name = "ordermessage-consumer"
path = "examples/ordermessage/ordermessage_consumer.rs"
path = "examples/ordermessage/ordermessage_consumer.rs"

[[example]]
name = "transaction-producer"
path = "examples/transaction/transaction_producer.rs"
108 changes: 108 additions & 0 deletions rocketmq-client/examples/transaction/transaction_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::any::Any;
use std::collections::HashMap;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;

use parking_lot::Mutex;
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Import TransactionMQProducer for transactional messaging

Since this example is intended to demonstrate transactional message sending, you should import TransactionMQProducer instead of DefaultMQProducer to enable transactional functionality.

Apply this diff to import TransactionMQProducer:

-use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
+use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer;

use rocketmq_client::producer::local_transaction_state::LocalTransactionState;
use rocketmq_client::producer::mq_producer::MQProducer;
use rocketmq_client::producer::transaction_listener::TransactionListener;
use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer;
use rocketmq_client::Result;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_rust::rocketmq;

pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";

#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
rocketmq_common::log::init_logger();

// create a producer builder with default configuration
let builder = TransactionMQProducer::builder();

let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.topics(vec![TOPIC.to_string()])
.transaction_listener(TransactionListenerImpl::default())
.build();
Comment on lines +48 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding retry configuration for better reliability

The producer configuration could benefit from retry settings to handle temporary network issues or broker unavailability.

 let mut producer = builder
     .producer_group(PRODUCER_GROUP.to_string())
     .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
     .topics(vec![TOPIC.to_string()])
     .transaction_listener(TransactionListenerImpl::default())
+    .retry_times(3)
+    .retry_interval_ms(1000)
     .build();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.topics(vec![TOPIC.to_string()])
.transaction_listener(TransactionListenerImpl::default())
.build();
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.topics(vec![TOPIC.to_string()])
.transaction_listener(TransactionListenerImpl::default())
.retry_times(3)
.retry_interval_ms(1000)
.build();


producer.start().await?;

for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
let send_result = producer
.send_message_in_transaction::<()>(message, None)
.await?;
println!("send result: {}", send_result);
}
Comment on lines +57 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling and logging for send results

The current implementation only prints send results to stdout. Consider using proper logging for better observability in production environments.

 for _ in 0..10 {
     let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
     let send_result = producer
         .send_message_in_transaction::<()>(message, None)
         .await?;
-    println!("send result: {}", send_result);
+    log::info!("Transaction message sent: {}", send_result);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
let send_result = producer
.send_message_in_transaction::<()>(message, None)
.await?;
println!("send result: {}", send_result);
}
for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
let send_result = producer
.send_message_in_transaction::<()>(message, None)
.await?;
log::info!("Transaction message sent: {}", send_result);
}

let _ = tokio::signal::ctrl_c().await;
producer.shutdown().await;

Ok(())
}

struct TransactionListenerImpl {
local_trans: Arc<Mutex<HashMap<String, i32>>>,
transaction_index: AtomicI32,
}

impl Default for TransactionListenerImpl {
fn default() -> Self {
Self {
local_trans: Arc::new(Default::default()),
transaction_index: Default::default(),
}
}
}

impl TransactionListener for TransactionListenerImpl {
fn execute_local_transaction(
&self,
msg: &Message,
arg: Option<&(dyn Any + Send + Sync)>,
) -> LocalTransactionState {
let value = self
.transaction_index
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let status = value % 3;
let mut guard = self.local_trans.lock();
guard.insert(msg.get_transaction_id().to_string(), status);
LocalTransactionState::Unknown
Comment on lines +94 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider logging transaction status for better observability

Adding logging statements within execute_local_transaction can help in tracing and debugging transaction states during execution.

Apply this diff to add logging:

 let mut guard = self.local_trans.lock();
 guard.insert(msg.get_transaction_id().to_string(), status);
+log::info!("Executed local transaction: ID = {}, Status = {}", msg.get_transaction_id(), status);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut guard = self.local_trans.lock();
guard.insert(msg.get_transaction_id().to_string(), status);
LocalTransactionState::Unknown
let mut guard = self.local_trans.lock();
guard.insert(msg.get_transaction_id().to_string(), status);
log::info!("Executed local transaction: ID = {}, Status = {}", msg.get_transaction_id(), status);
LocalTransactionState::Unknown

Comment on lines +90 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve transaction state management and logging

The current implementation has several areas for improvement:

  1. Transaction state determination is overly simplistic using modulo
  2. Missing logging for state changes
  3. No error handling for transaction ID
 let value = self
     .transaction_index
     .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
-let status = value % 3;
+// Implement proper business logic for transaction state determination
+let status = determine_transaction_status(msg, value);
 let mut guard = self.local_trans.lock();
-guard.insert(msg.get_transaction_id().to_string(), status);
+if let Some(tx_id) = msg.get_transaction_id() {
+    guard.insert(tx_id.to_string(), status);
+    log::info!("Transaction state set: ID={}, status={}", tx_id, status);
+} else {
+    log::error!("Missing transaction ID in message");
+}

Committable suggestion was skipped due to low confidence.

}

fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
let mut guard = self.local_trans.lock();
let status = guard.get(msg.get_transaction_id()).unwrap_or(&-1);
match status {
1 => LocalTransactionState::CommitMessage,
2 => LocalTransactionState::RollbackMessage,
_ => LocalTransactionState::Unknown,
}
}
Comment on lines +99 to +107
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve transaction check robustness and logging

The current implementation could panic on missing transaction IDs and lacks proper logging.

 fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
     let mut guard = self.local_trans.lock();
-    let status = guard.get(msg.get_transaction_id()).unwrap_or(&-1);
-    match status {
-        1 => LocalTransactionState::CommitMessage,
-        2 => LocalTransactionState::RollbackMessage,
-        _ => LocalTransactionState::Unknown,
+    let tx_id = msg.get_transaction_id();
+    if let Some(tx_id) = tx_id {
+        let status = guard.get(tx_id).copied().unwrap_or(-1);
+        let state = match status {
+            1 => LocalTransactionState::CommitMessage,
+            2 => LocalTransactionState::RollbackMessage,
+            _ => LocalTransactionState::Unknown,
+        };
+        log::info!("Checked transaction state: ID={}, status={:?}", tx_id, state);
+        state
+    } else {
+        log::error!("Missing transaction ID in message during check");
+        LocalTransactionState::RollbackMessage
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
let mut guard = self.local_trans.lock();
let status = guard.get(msg.get_transaction_id()).unwrap_or(&-1);
match status {
1 => LocalTransactionState::CommitMessage,
2 => LocalTransactionState::RollbackMessage,
_ => LocalTransactionState::Unknown,
}
}
fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
let mut guard = self.local_trans.lock();
let tx_id = msg.get_transaction_id();
if let Some(tx_id) = tx_id {
let status = guard.get(tx_id).copied().unwrap_or(-1);
let state = match status {
1 => LocalTransactionState::CommitMessage,
2 => LocalTransactionState::RollbackMessage,
_ => LocalTransactionState::Unknown,
};
log::info!("Checked transaction state: ID={}, status={:?}", tx_id, state);
state
} else {
log::error!("Missing transaction ID in message during check");
LocalTransactionState::RollbackMessage
}
}

}
17 changes: 11 additions & 6 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,21 @@
use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
use crate::producer::default_mq_producer::DefaultMQProducer;
use crate::producer::default_mq_producer::ProducerConfig;
use crate::producer::producer_impl::mq_producer_inner::MQProducerInner;
use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl;
use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
use crate::Result;

const LOCK_TIMEOUT_MILLIS: u64 = 3000;

pub struct MQClientInstance {
pub(crate) client_config: Arc<ClientConfig>,
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
pub(crate) client_id: String,
boot_timestamp: u64,
/**
* The container of the producer in the current client. The key is the name of
* producerGroup.
*/
producer_table: Arc<RwLock<HashMap<String, Box<dyn MQProducerInner>>>>,
producer_table: Arc<RwLock<HashMap<String, MQProducerInnerImpl>>>,
/**
* The container of the consumer in the current client. The key is the name of
* consumer_group.
Expand Down Expand Up @@ -177,7 +177,7 @@
) -> ArcRefCellWrapper<MQClientInstance> {
let broker_addr_table = Arc::new(Default::default());
let mut instance = ArcRefCellWrapper::new(MQClientInstance {
client_config: Arc::new(client_config.clone()),
client_config: ArcRefCellWrapper::new(client_config.clone()),

Check warning on line 180 in rocketmq-client/src/factory/mq_client_instance.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/factory/mq_client_instance.rs#L180

Added line #L180 was not covered by tests
client_id,
boot_timestamp: get_current_millis(),
producer_table: Arc::new(RwLock::new(HashMap::new())),
Expand Down Expand Up @@ -326,7 +326,7 @@

pub async fn shutdown(&mut self) {}

pub async fn register_producer(&mut self, group: &str, producer: impl MQProducerInner) -> bool {
pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool {

Check warning on line 329 in rocketmq-client/src/factory/mq_client_instance.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/factory/mq_client_instance.rs#L329

Added line #L329 was not covered by tests
if group.is_empty() {
return false;
}
Expand All @@ -335,7 +335,7 @@
warn!("the producer group[{}] exist already.", group);
return false;
}
producer_table.insert(group.to_string(), Box::new(producer));
producer_table.insert(group.to_string(), producer);
true
}

Expand Down Expand Up @@ -1055,6 +1055,11 @@
consumer_table.get(group).cloned()
}

pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
let producer_table = self.producer_table.read().await;
producer_table.get(group).cloned()
}

Check warning on line 1061 in rocketmq-client/src/factory/mq_client_instance.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/factory/mq_client_instance.rs#L1058-L1061

Added lines #L1058 - L1061 were not covered by tests
Comment on lines +1058 to +1061
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the gathered information, I can now provide the final response:

Transaction producer cleanup needs implementation

The verification reveals incomplete producer cleanup implementation:

  • unregister_producer is properly implemented to handle producer table cleanup
  • However, transaction producer's shutdown() is unimplemented (marked with unimplemented!())
  • Default producer's shutdown only handles accumulator and trace dispatcher cleanup
🔗 Analysis chain

Verify producer cleanup in transaction scenarios.

Since this is part of transaction message support, ensure that producers are properly cleaned up after transaction completion or failure.


Let me gather more information about the producer cleanup implementation and transaction handling.


Let me gather more specific information about producer cleanup in the unregister flow.


Let me check one final aspect about producer cleanup in the implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for producer cleanup in transaction handling
rg -A 5 "unregister_producer|producer_table.remove" 

Length of output: 1583


Script:

#!/bin/bash
# Search for producer cleanup in transaction context
rg -A 10 "transaction.*producer|producer.*transaction" 

# Search for producer table modifications
ast-grep --pattern 'self.producer_table.$_($$$)'

# Search for producer cleanup in client implementation
rg -A 10 "unregister_client.*producer.*table|producer.*table.*remove"

Length of output: 22008


Script:

#!/bin/bash
# Search for producer table cleanup in unregister_client implementation
ast-grep --pattern 'fn unregister_client($$$) {
  $$$
}'

# Search for producer cleanup in client shutdown
rg -A 10 "shutdown.*producer|producer.*shutdown"

Length of output: 3022


Script:

#!/bin/bash
# Search for producer cleanup in shutdown implementation
ast-grep --pattern 'async fn shutdown($$$) {
  $$$
}'

# Search for producer table cleanup
rg -A 5 "producer_table.*clear|producer_table.*remove"

Length of output: 3087


pub async fn unregister_consumer(&mut self, group: impl Into<String>) {
self.unregister_client(None, Some(group.into())).await;
}
Expand Down
14 changes: 13 additions & 1 deletion rocketmq-client/src/hook/end_transaction_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub struct EndTransactionContext {}
use rocketmq_common::common::message::message_single::Message;

use crate::producer::local_transaction_state::LocalTransactionState;

pub struct EndTransactionContext<'a> {
pub producer_group: String,
pub broker_addr: String,
pub message: &'a Message,
pub msg_id: String,
pub transaction_id: String,
pub transaction_state: LocalTransactionState,
pub from_transaction_check: bool,
}
Comment on lines +21 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider encapsulating fields and adding validation.

The struct currently exposes all fields as public, which might lead to invalid state mutations. Consider implementing proper encapsulation and validation.

Here's a suggested implementation with better encapsulation and validation:

/// Represents the context for ending a transaction in RocketMQ.
pub struct EndTransactionContext<'a> {
    producer_group: String,
    broker_addr: String,
    message: &'a Message,
    msg_id: String,
    transaction_id: String,
    transaction_state: LocalTransactionState,
    from_transaction_check: bool,
}

impl<'a> EndTransactionContext<'a> {
    /// Creates a new EndTransactionContext with validation
    pub fn new(
        producer_group: String,
        broker_addr: String,
        message: &'a Message,
        msg_id: String,
        transaction_id: String,
        transaction_state: LocalTransactionState,
        from_transaction_check: bool,
    ) -> Result<Self, &'static str> {
        // Validate inputs
        if producer_group.is_empty() {
            return Err("producer_group cannot be empty");
        }
        if broker_addr.is_empty() {
            return Err("broker_addr cannot be empty");
        }
        if msg_id.is_empty() {
            return Err("msg_id cannot be empty");
        }
        if transaction_id.is_empty() {
            return Err("transaction_id cannot be empty");
        }

        Ok(Self {
            producer_group,
            broker_addr,
            message,
            msg_id,
            transaction_id,
            transaction_state,
            from_transaction_check,
        })
    }

    // Getters
    pub fn producer_group(&self) -> &str {
        &self.producer_group
    }

    pub fn broker_addr(&self) -> &str {
        &self.broker_addr
    }

    pub fn message(&self) -> &Message {
        self.message
    }

    pub fn msg_id(&self) -> &str {
        &self.msg_id
    }

    pub fn transaction_id(&self) -> &str {
        &self.transaction_id
    }

    pub fn transaction_state(&self) -> LocalTransactionState {
        self.transaction_state
    }

    pub fn from_transaction_check(&self) -> bool {
        self.from_transaction_check
    }

    // Setter for mutable state only
    pub fn set_transaction_state(&mut self, state: LocalTransactionState) {
        self.transaction_state = state;
    }
}

3 changes: 2 additions & 1 deletion rocketmq-client/src/hook/send_message_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use rocketmq_common::common::message::message_enum::MessageType;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::ArcRefCellWrapper;

use crate::implementation::communication_mode::CommunicationMode;
use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl;
Expand All @@ -38,7 +39,7 @@
pub exception: Option<Arc<Box<dyn Error + Send + Sync>>>,
pub mq_trace_context: Option<Arc<Box<dyn std::any::Any + Send + Sync>>>,
pub props: HashMap<String, String>,
pub producer: Option<DefaultMQProducerImpl>,
pub producer: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>,

Check warning on line 42 in rocketmq-client/src/hook/send_message_context.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/hook/send_message_context.rs#L42

Added line #L42 was not covered by tests
pub msg_type: Option<MessageType>,
pub namespace: Option<String>,
}
79 changes: 79 additions & 0 deletions rocketmq-client/src/implementation/client_remoting_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use rocketmq_common::common::compression::compressor_factory::CompressorFactory;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
use rocketmq_common::MessageAccessor::MessageAccessor;
use rocketmq_common::MessageDecoder;
Expand All @@ -28,8 +29,10 @@
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader;
use rocketmq_remoting::protocol::header::notify_consumer_ids_changed_request_header::NotifyConsumerIdsChangedRequestHeader;
use rocketmq_remoting::protocol::header::reply_message_request_header::ReplyMessageRequestHeader;
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_remoting::runtime::processor::RequestProcessor;
Expand Down Expand Up @@ -62,10 +65,26 @@
let request_code = RequestCode::from(request.code());
info!("process_request: {:?}", request_code);
match request_code {
RequestCode::CheckTransactionState => {
self.check_transaction_state(channel, ctx, request).await

Check warning on line 69 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L69

Added line #L69 was not covered by tests
}
RequestCode::ResetConsumerClientOffset => {
unimplemented!("ResetConsumerClientOffset")

Check warning on line 72 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L72

Added line #L72 was not covered by tests
}
RequestCode::GetConsumerStatusFromClient => {
unimplemented!("GetConsumerStatusFromClient")

Check warning on line 75 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L75

Added line #L75 was not covered by tests
}
RequestCode::GetConsumerRunningInfo => {
unimplemented!("GetConsumerRunningInfo")

Check warning on line 78 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L78

Added line #L78 was not covered by tests
}
RequestCode::ConsumeMessageDirectly => {
unimplemented!("ConsumeMessageDirectly")

Check warning on line 81 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L81

Added line #L81 was not covered by tests
}
RequestCode::PushReplyMessageToClient => self.receive_reply_message(ctx, request).await,
RequestCode::NotifyConsumerIdsChanged => {
self.notify_consumer_ids_changed(channel, ctx, request)
}

Comment on lines +68 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement handling for unimplemented request codes to ensure complete functionality

The methods for ResetConsumerClientOffset, GetConsumerStatusFromClient, GetConsumerRunningInfo, and ConsumeMessageDirectly are currently unimplemented. This may lead to incomplete functionality or unexpected behavior when these requests are received. Consider implementing these methods or providing appropriate response handling.

Would you like assistance in implementing these methods or providing default responses?

_ => {
info!("Unknown request code: {:?}", request_code);
Ok(None)
Expand Down Expand Up @@ -190,4 +209,64 @@
}
Ok(None)
}

async fn check_transaction_state(
&mut self,
channel: Channel,
ctx: ConnectionHandlerContext,
mut request: RemotingCommand,
) -> Result<Option<RemotingCommand>> {
let request_header = request

Check warning on line 219 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L213-L219

Added lines #L213 - L219 were not covered by tests
.decode_command_custom_header::<CheckTransactionStateRequestHeader>()
.unwrap();
Comment on lines +219 to +221
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle errors when decoding the request header to prevent panics

Using .unwrap() can cause a panic if decoding fails. Consider handling the potential error to make the code more robust.

Here's how you can modify the code to handle decoding errors:

             let request_header = request
-                .decode_command_custom_header::<CheckTransactionStateRequestHeader>()
-                .unwrap();
+                .decode_command_custom_header::<CheckTransactionStateRequestHeader>();
+            let request_header = match request_header {
+                Ok(header) => header,
+                Err(e) => {
+                    warn!("Failed to decode request header: {:?}", e);
+                    return Ok(Some(
+                        RemotingCommand::create_response_command()
+                            .set_code(ResponseCode::SystemError)
+                            .set_remark(Some("Failed to decode request header".to_string())),
+                    ));
+                }
+            };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let request_header = request
.decode_command_custom_header::<CheckTransactionStateRequestHeader>()
.unwrap();
let request_header = request
.decode_command_custom_header::<CheckTransactionStateRequestHeader>();
let request_header = match request_header {
Ok(header) => header,
Err(e) => {
warn!("Failed to decode request header: {:?}", e);
return Ok(Some(
RemotingCommand::create_response_command()
.set_code(ResponseCode::SystemError)
.set_remark(Some("Failed to decode request header".to_string())),
));
}
};

let message_ext = MessageDecoder::decode(
request.get_body_mut().unwrap(),

Check warning on line 223 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L221-L223

Added lines #L221 - L223 were not covered by tests
true,
true,
false,
false,
false,
);
Comment on lines +223 to +229
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid unwrapping on get_body_mut() to prevent potential panics

Unwrapping request.get_body_mut() can lead to a panic if the body is None. It's safer to handle the Option and provide error handling.

Consider the following change:

-            let message_ext = MessageDecoder::decode(
-                request.get_body_mut().unwrap(),
-                true,
-                true,
-                false,
-                false,
-                false,
-            );
+            let message_ext = if let Some(body) = request.get_body_mut() {
+                MessageDecoder::decode(
+                    body,
+                    true,
+                    true,
+                    false,
+                    false,
+                    false,
+                )
+            } else {
+                warn!("Request body is missing");
+                return Ok(Some(
+                    RemotingCommand::create_response_command()
+                        .set_code(ResponseCode::SystemError)
+                        .set_remark(Some("Request body is missing".to_string())),
+                ));
+            };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
request.get_body_mut().unwrap(),
true,
true,
false,
false,
false,
);
let message_ext = if let Some(body) = request.get_body_mut() {
MessageDecoder::decode(
body,
true,
true,
false,
false,
false,
)
} else {
warn!("Request body is missing");
return Ok(Some(
RemotingCommand::create_response_command()
.set_code(ResponseCode::SystemError)
.set_remark(Some("Request body is missing".to_string())),
));
};

if let Some(mut message_ext) = message_ext {
if let Some(mut client_instance) = self.client_instance.upgrade() {
if let Some(ref namespace) = client_instance.client_config.get_namespace() {
let topic = NamespaceUtil::without_namespace_with_namespace(
message_ext.get_topic(),
client_instance

Check warning on line 235 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L229-L235

Added lines #L229 - L235 were not covered by tests
.client_config
.get_namespace()
.unwrap_or_default()
.as_str(),
);
message_ext.set_topic(topic.as_str());
}

Check warning on line 242 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L240-L242

Added lines #L240 - L242 were not covered by tests
let transaction_id =
message_ext.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if let Some(transaction_id) = transaction_id {
if !transaction_id.is_empty() {
message_ext.set_transaction_id(transaction_id.as_str());

Check warning on line 247 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L244-L247

Added lines #L244 - L247 were not covered by tests
}
}
let group = message_ext.get_property(MessageConst::PROPERTY_PRODUCER_GROUP);
if let Some(group) = group {
let producer = client_instance.select_producer(&group).await;
if let Some(producer) = producer {
let addr = channel.remote_address().to_string();
producer.check_transaction_state(
addr.as_str(),
message_ext,
request_header,

Check warning on line 258 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L249-L258

Added lines #L249 - L258 were not covered by tests
);
} else {
warn!("checkTransactionState, pick producer group failed");

Check warning on line 261 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L260-L261

Added lines #L260 - L261 were not covered by tests
}
} else {
warn!("checkTransactionState, pick producer group failed");

Check warning on line 264 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L263-L264

Added lines #L263 - L264 were not covered by tests
}
}
} else {
warn!("checkTransactionState, decode message failed");

Check warning on line 268 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L266-L268

Added lines #L266 - L268 were not covered by tests
};
Ok(None)
}

Check warning on line 271 in rocketmq-client/src/implementation/client_remoting_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/client_remoting_processor.rs#L270-L271

Added lines #L270 - L271 were not covered by tests
}
18 changes: 18 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
use rocketmq_remoting::protocol::body::unlock_batch_request_body::UnlockBatchRequestBody;
use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader;
use rocketmq_remoting::protocol::header::consumer_send_msg_back_request_header::ConsumerSendMsgBackRequestHeader;
use rocketmq_remoting::protocol::header::end_transaction_request_header::EndTransactionRequestHeader;
use rocketmq_remoting::protocol::header::get_consumer_listby_group_request_header::GetConsumerListByGroupRequestHeader;
use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader;
use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader;
Expand Down Expand Up @@ -1130,4 +1131,21 @@
))
}
}

pub async fn end_transaction_oneway(
&mut self,
addr: &str,
request_header: EndTransactionRequestHeader,
remark: String,
timeout_millis: u64,
) -> Result<()> {

Check warning on line 1141 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1135-L1141

Added lines #L1135 - L1141 were not covered by tests
let request =
RemotingCommand::create_request_command(RequestCode::EndTransaction, request_header)
.set_remark(Some(remark));

Check warning on line 1144 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1143-L1144

Added lines #L1143 - L1144 were not covered by tests

self.remoting_client
.invoke_oneway(addr.to_string(), request, timeout_millis)
.await;
Ok(())
}

Check warning on line 1150 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1146-L1150

Added lines #L1146 - L1150 were not covered by tests
}
2 changes: 2 additions & 0 deletions rocketmq-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
#![allow(dead_code)]
#![allow(unused_variables)]
#![recursion_limit = "256"]

extern crate core;

use crate::error::MQClientError;
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-client/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ pub mod send_callback;
pub mod send_result;
pub mod send_status;
pub mod transaction_listener;
pub mod transaction_mq_produce_builder;
pub mod transaction_mq_producer;
pub mod transaction_send_result;
Loading
Loading