Skip to content

Conversation

@mxsm
Copy link
Owner

@mxsm mxsm commented Oct 26, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1078

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced a new transaction-producer example for RocketMQ transaction management.
    • Added TransactionMQProducer and TransactionMQProducerBuilder for enhanced transactional message production.
    • New method for decoding message IDs added, improving message handling capabilities.
    • Expanded EndTransactionRequestHeader for better transaction request handling.
  • Improvements

    • Mutable access to the body field in RemotingCommand allows for direct modifications.
    • Updated TransactionListener to accept optional arguments in transaction execution.
  • Bug Fixes

    • Corrected formatting in Cargo.toml for better readability.

These updates provide users with improved functionality and usability in managing transactions within the RocketMQ client.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Oct 26, 2024

Walkthrough

The pull request introduces several enhancements to the RocketMQ client, including the addition of a new transaction producer example and modifications to existing structures and methods to support transactional message handling. Key changes involve the implementation of a TransactionMQProducer, updates to the MQClientInstance, and the introduction of new request handling capabilities. The changes also include structural updates for better type safety and mutability, along with new utility functions for message ID decoding.

Changes

File Path Change Summary
rocketmq-client/Cargo.toml Added example transaction-producer and ensured newline at the end for ordermessage-consumer.
rocketmq-client/examples/transaction/transaction_producer.rs Implemented a transaction producer with message configuration and transaction listener.
rocketmq-client/src/factory/mq_client_instance.rs Updated MQClientInstance to use MQProducerInnerImpl and ArcRefCellWrapper<ClientConfig>.
rocketmq-client/src/hook/end_transaction_context.rs Expanded EndTransactionContext with new fields for transaction management.
rocketmq-client/src/hook/send_message_context.rs Changed producer field type to Option<ArcRefCellWrapper<DefaultMQProducerImpl>>.
rocketmq-client/src/implementation/client_remoting_processor.rs Added handling for new request codes and implemented check_transaction_state method.
rocketmq-client/src/implementation/mq_client_api_impl.rs Introduced end_transaction_oneway method for sending transaction end requests.
rocketmq-client/src/lib.rs Added #![recursion_limit = "256"] for macro recursion depth.
rocketmq-client/src/producer.rs Added new modules: transaction_mq_produce_builder and transaction_mq_producer.
rocketmq-client/src/producer/default_mq_producer.rs Updated send_message_in_transaction method signature and handling.
rocketmq-client/src/producer/local_transaction_state.rs Implemented Display trait for LocalTransactionState.
rocketmq-client/src/producer/mq_producer.rs Updated send_message_in_transaction method signature to accept a generic type parameter.
rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs Added transaction-related fields and methods for handling message transactions.
rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs Introduced MQProducerInnerImpl struct with updated method signatures.
rocketmq-client/src/producer/transaction_mq_produce_builder.rs Created builder for TransactionMQProducer with multiple configuration options.
rocketmq-client/src/producer/transaction_mq_producer.rs Implemented TransactionMQProducer struct with transactional message handling capabilities.
rocketmq-common/src/common/message/message_decoder.rs Added decode_message_id function for message ID decoding.
rocketmq-common/src/common/message/message_id.rs Made MessageId struct public and added cloning and comparison traits.
rocketmq-common/src/utils/util_all.rs Changed string_to_bytes function visibility to public.
rocketmq-remoting/src/protocol/header.rs Added new module for end_transaction_request_header.
rocketmq-remoting/src/protocol/header/end_transaction_request_header.rs Introduced EndTransactionRequestHeader struct with serialization and validation logic.
rocketmq-remoting/src/protocol/remoting_command.rs Added get_body_mut method for mutable access to body field.

Assessment against linked issues

Objective Addressed Explanation
Support sending transaction messages for client (#1078)

Possibly related PRs

Suggested labels

approved

Suggested reviewers

  • TeslaRustor
  • SpaceXCN

Poem

In the garden where messages bloom,
A transaction producer finds room.
With each message sent, a tale unfolds,
In RocketMQ's realm, where magic beholds.
So hop with joy, let the data flow,
For every transaction, we reap what we sow! 🐇✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@mxsm
Copy link
Owner Author

mxsm commented Oct 26, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 34

🧹 Outside diff range and nitpick comments (24)
rocketmq-common/src/common/message/message_id.rs (1)

20-24: Consider adding documentation for this public API.

Since this is now a public struct used for message identification, it would be helpful to add documentation explaining:

  • The purpose of MessageId
  • The significance of the address and offset fields
  • Any constraints or assumptions about the values

Example documentation:

 #[derive(Debug, Clone, PartialEq, Eq, Copy)]
+/// Uniquely identifies a message in RocketMQ using broker address and offset.
+///
+/// # Fields
+/// * `address` - The socket address of the broker storing the message
+/// * `offset` - The position of the message in the broker's storage
 pub struct MessageId {
     pub address: SocketAddr,
     pub offset: i64,
 }
rocketmq-client/src/hook/end_transaction_context.rs (1)

21-29: Add documentation for the EndTransactionContext struct.

The struct appears to be well-structured for transaction handling, but it lacks documentation explaining its purpose, usage, and the significance of each field. Consider adding rustdoc comments.

Apply this diff to add documentation:

+/// Represents the context for ending a transaction in RocketMQ.
+/// This struct contains all necessary information to complete a transaction operation.
 pub struct EndTransactionContext<'a> {
+    /// The producer group responsible for the transaction
     pub producer_group: String,
+    /// The address of the broker handling the transaction
     pub broker_addr: String,
+    /// Reference to the message involved in the transaction
     pub message: &'a Message,
+    /// Unique identifier for the message
     pub msg_id: String,
+    /// Unique identifier for the transaction
     pub transaction_id: String,
+    /// Current state of the local transaction
     pub transaction_state: LocalTransactionState,
+    /// Indicates if this context is created from a transaction check
     pub from_transaction_check: bool,
 }
rocketmq-client/src/producer.rs (1)

31-32: LGTM! Consider grouping transaction-related modules together.

The new module declarations align well with the PR's objective of supporting transaction messages and follow the existing naming conventions.

Consider grouping all transaction-related modules together for better code organization. You could move these declarations next to other transaction-related modules like transaction_listener and transaction_send_result.

pub mod send_callback;
pub mod send_result;
pub mod send_status;
pub mod transaction_listener;
+pub mod transaction_mq_produce_builder;
+pub mod transaction_mq_producer;
pub mod transaction_send_result;
-pub mod transaction_mq_produce_builder;
-pub mod transaction_mq_producer;
rocketmq-client/src/hook/send_message_context.rs (1)

41-43: Consider adding documentation for the producer field.

Adding documentation comments would help explain:

  • The purpose of using ArcRefCellWrapper
  • Thread safety guarantees
  • Usage guidelines for interior mutability
+    /// Producer instance wrapped in ArcRefCellWrapper for thread-safe mutable access.
+    /// The wrapper provides:
+    /// - Thread-safe sharing through Arc
+    /// - Interior mutability through RefCell
     pub producer: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>,
rocketmq-common/src/utils/util_all.rs (2)

194-194: Add documentation for the public function.

Since this function is now public, it should have documentation explaining its purpose, parameters, return value, and example usage.

Add documentation like this:

+/// Converts a hexadecimal string to a byte vector.
+///
+/// # Arguments
+///
+/// * `hex_string` - A string containing hexadecimal characters (0-9, A-F). Case-insensitive.
+///
+/// # Returns
+///
+/// * `Some(Vec<u8>)` - The converted bytes if the input is valid
+/// * `None` - If the input is empty or contains invalid characters
+///
+/// # Examples
+///
+/// ```
+/// let bytes = string_to_bytes("414243").unwrap();
+/// assert_eq!(bytes, vec![0x41, 0x42, 0x43]);
+/// ```
pub fn string_to_bytes(hex_string: impl Into<String>) -> Option<Vec<u8>> {

Function needs test coverage for hex string conversion functionality

The implementation looks correct, but there are no specific tests for the string_to_bytes function despite it being used in a critical message decoding path. The function is used in message_decoder.rs to decode message IDs, which makes it an important component that should be thoroughly tested.

Key findings:

  • Function is used in decode_message_id for parsing message identifiers
  • No dedicated test cases found for hex string conversion
  • No alternative hex conversion utilities exist in the codebase
🔗 Analysis chain

Line range hint 194-215: LGTM! The implementation is correct and well-tested.

The function is well-implemented with proper:

  • Input validation
  • Error handling using Option
  • Performance optimization with capacity pre-allocation
  • Safe operations throughout

Let's verify its usage in the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of string_to_bytes function
# Expected: Used in message_decoder.rs for decoding message IDs

# Search for direct usage of the function
rg "string_to_bytes\(" --type rust

# Search for potential test cases
rg "string_to_bytes" -A 5 --type rust -g "*test*.rs"

Length of output: 465


Script:

#!/bin/bash
# Let's verify the implementation and test coverage more thoroughly

# Check the test file in the same directory as util_all.rs
fd "util_all" --type f --extension rs | grep "test"

# Look for test modules within util_all.rs
rg "^mod test" -A 10 rocketmq-common/src/utils/util_all.rs

# Check message_decoder.rs usage context
rg -B 5 -A 5 "string_to_bytes" rocketmq-common/src/common/message/message_decoder.rs

# Check if there are any other hex string conversion utilities
rg "hex.*string|string.*hex" --type rust

Length of output: 1546

rocketmq-common/src/common/message/message_decoder.rs (2)

391-395: Fix indentation in store size calculation comments.

The comments are accurate but have inconsistent indentation compared to the surrounding code.

Apply this diff to fix the indentation:

-           + 4 // 2 MAGICCOD
-           + 4 // 3 BODYCRC
-           + 4 // 4 FLAG
-           + 4 + body_len // 4 BODY
-           + 2 + properties_length;
+        + 4 // 2 MAGICCOD
+        + 4 // 3 BODYCRC
+        + 4 // 4 FLAG
+        + 4 + body_len // 4 BODY
+        + 2 + properties_length;

582-588: Add more test cases for message ID decoding.

The current test only covers the IPv4 case. Consider adding:

  1. IPv6 message ID test case
  2. Invalid length test case
  3. Invalid format test case

Here's an example of additional test cases:

#[test]
fn decode_message_id_ipv6() {
    let msg_id = "ABCD1234ABCD1234ABCD1234ABCD12340007D8260BF075769D36C348";
    let message_id = decode_message_id(msg_id);
    assert_eq!(message_id.address, "[abcd:1234:abcd:1234:abcd:1234:abcd:1234]:55334".parse().unwrap());
    assert_eq!(message_id.offset, 860316681131967304);
}

#[test]
#[should_panic(expected = "Invalid message ID length")]
fn decode_message_id_invalid_length() {
    decode_message_id("invalid");
}
rocketmq-remoting/src/protocol/remoting_command.rs (1)

519-521: LGTM! Consider adding documentation.

The implementation of get_body_mut follows Rust idioms well and provides the necessary mutable access to support transaction message handling. Consider adding documentation to explain its purpose and usage in the transaction message context.

Add documentation like this:

+    /// Returns a mutable reference to the command body.
+    /// 
+    /// This method is particularly useful when modifying message bodies during transaction processing.
     pub fn get_body_mut(&mut self) -> Option<&mut Bytes> {
         self.body.as_mut()
     }
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)

1135-1150: Consider adding parameter validation.

The implementation looks good and follows the established patterns. However, consider adding validation for:

  • addr: Ensure it's not empty
  • timeout_millis: Ensure it's within reasonable bounds
 pub async fn end_transaction_oneway(
     &mut self,
     addr: &str,
     request_header: EndTransactionRequestHeader,
     remark: String,
     timeout_millis: u64,
 ) -> Result<()> {
+    if addr.is_empty() {
+        return Err(MQClientError::MQClientErr(-1, "addr cannot be empty".to_string()));
+    }
+    if timeout_millis == 0 || timeout_millis > 3600000 { // 1 hour max
+        return Err(MQClientError::MQClientErr(-1, "invalid timeout_millis".to_string()));
+    }
+
     let request =
         RemotingCommand::create_request_command(RequestCode::EndTransaction, request_header)
             .set_remark(Some(remark));
rocketmq-client/examples/transaction/transaction_producer.rs (4)

60-60: Handle the result of producer.shutdown().await

The shutdown method returns a Result. It's good practice to handle potential errors by using the ? operator to propagate them.

Apply this diff to handle possible errors during shutdown:

-producer.shutdown().await;
+producer.shutdown().await?;

33-33: Remove unused MESSAGE_COUNT constant

The MESSAGE_COUNT constant is defined but not used in the code. If it's unnecessary, consider removing it to keep the code clean.

Apply this diff to remove the unused constant:

-pub const MESSAGE_COUNT: usize = 1;

34-34: Replace placeholder producer group name with a meaningful identifier

The PRODUCER_GROUP constant is set to "please_rename_unique_group_name". For clarity and to avoid confusion, replace it with a meaningful producer group name.

Apply this diff to update the producer group name:

-pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
+pub const PRODUCER_GROUP: &str = "transaction_producer_group";

94-96: Define constants for transaction statuses

Using named constants instead of magic numbers improves code readability and maintainability. Define constants for the transaction status codes.

Apply this diff to define and use status constants:

+const STATUS_COMMIT: i32 = 1;
+const STATUS_ROLLBACK: i32 = 2;

 match status {
-    1 => LocalTransactionState::CommitMessage,
-    2 => LocalTransactionState::RollbackMessage,
+    STATUS_COMMIT => LocalTransactionState::CommitMessage,
+    STATUS_ROLLBACK => LocalTransactionState::RollbackMessage,
     _ => LocalTransactionState::Unknown,
 }
rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs (2)

74-83: Ensure consistent parameter naming in method signatures.

In the check_transaction_state method of MQProducerInnerImpl, the parameter is named addr, whereas in the trait it's broker_addr. This inconsistency can cause confusion.

Consider renaming addr to broker_addr for consistency:

pub fn check_transaction_state(
    &self,
-   addr: &str,
+   broker_addr: &str,
    msg: MessageExt,
    check_request_header: CheckTransactionStateRequestHeader,
) {
    if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
-       default_mqproducer_impl_inner.check_transaction_state(addr, msg, check_request_header);
+       default_mqproducer_impl_inner.check_transaction_state(broker_addr, msg, check_request_header);
    }
}

91-96: Simplify the is_unit_mode method with default value.

Since is_unit_mode returns false when default_mqproducer_impl_inner is None, consider simplifying the method.

pub fn is_unit_mode(&self) -> bool {
    if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
        return default_mqproducer_impl_inner.is_unit_mode();
    }
-   false
+   // Default to false if not set
    false
}

Ensure that returning false is the intended default behavior.

rocketmq-client/src/implementation/client_remoting_processor.rs (1)

261-264: Differentiate warning messages for better debugging

The warning messages at lines 261 and 264 are the same, which may make debugging difficult. Consider making them more specific.

You can update the warnings as follows:

                     } else {
-                        warn!("checkTransactionState, pick producer group failed");
+                        warn!("checkTransactionState: No producer found for group '{}'", group);
                     }
                 } else {
-                    warn!("checkTransactionState, pick producer group failed");
+                    warn!("checkTransactionState: Producer group not specified in message properties");
                 }
rocketmq-client/src/producer/transaction_mq_produce_builder.rs (2)

35-61: Improve readability by grouping related fields or adding documentation

The TransactionMQProducerBuilder struct contains numerous optional fields, which can make the code harder to read and maintain. Consider grouping related fields together or adding documentation comments to explain the purpose of each field group. This will enhance clarity for future maintainers.


248-335: Ensure required configurations are validated in the build method

The build method does not currently validate whether all necessary configurations have been provided. To prevent runtime errors, consider adding validation logic to check for the presence of essential fields and provide default values or error messages when they are missing.

rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (3)

Line range hint 99-111: Review use of Arc and ArcRefCellWrapper for shared state

The struct DefaultMQProducerImpl uses Arc and ArcRefCellWrapper for shared fields. Ensure that this usage properly handles concurrency and avoids potential deadlocks or mutable borrow issues.

Consider whether using Arc<Mutex<T>> or other concurrency primitives would be more appropriate for the shared mutable state.


1847-1849: Clear delay properties only if necessary

The code clears the PROPERTY_DELAY_TIME_LEVEL if it's not zero. Ensure that this is the intended behavior and that it doesn't affect other message properties unexpectedly.

Double-check that clearing this property is required and doesn't have side effects on message delivery timing.


1939-1945: Clarify logic for commit_or_rollback in request header

The mapping of local_transaction_state to commit_or_rollback flags in EndTransactionRequestHeader might benefit from comments or refactoring for clarity.

Consider adding comments or refactoring to make the mapping between transaction states and sys flags clearer.

rocketmq-remoting/src/protocol/header/end_transaction_request_header.rs (2)

27-40: Ensure field ordering aligns with serialization expectations

The #[serde(flatten)] attribute is applied to rpc_request_header after other fields. While this works, it's generally recommended to place flattened fields before other fields to avoid potential conflicts or unexpected behavior during serialization.

Consider rearranging the struct fields:

 pub struct EndTransactionRequestHeader {
+    #[serde(flatten)]
+    pub rpc_request_header: RpcRequestHeader,
     pub topic: String,
     pub producer_group: String,
     tran_state_table_offset: u64,
     commit_log_offset: u64,
     commit_or_rollback: i32,
     from_transaction_check: bool,
     msg_id: String,
     transaction_id: Option<String>,
-    #[serde(flatten)]
-    pub rpc_request_header: RpcRequestHeader,
 }

17-26: Organize imports alphabetically and by crate

For better readability and maintenance, consider organizing imports first by external crates, then by internal crates, and sorting them alphabetically.

Reorder the imports:

 use anyhow::anyhow;
 use anyhow::Error;
 use serde::Deserialize;
 use serde::Serialize;
+use crate::protocol::command_custom_header::CommandCustomHeader;
+use crate::protocol::command_custom_header::FromMap;
+use crate::rpc::rpc_request_header::RpcRequestHeader;
+use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 18afaac and ae75277.

📒 Files selected for processing (22)
  • rocketmq-client/Cargo.toml (1 hunks)
  • rocketmq-client/examples/transaction/transaction_producer.rs (1 hunks)
  • rocketmq-client/src/factory/mq_client_instance.rs (5 hunks)
  • rocketmq-client/src/hook/end_transaction_context.rs (1 hunks)
  • rocketmq-client/src/hook/send_message_context.rs (2 hunks)
  • rocketmq-client/src/implementation/client_remoting_processor.rs (4 hunks)
  • rocketmq-client/src/implementation/mq_client_api_impl.rs (2 hunks)
  • rocketmq-client/src/lib.rs (1 hunks)
  • rocketmq-client/src/producer.rs (1 hunks)
  • rocketmq-client/src/producer/default_mq_producer.rs (2 hunks)
  • rocketmq-client/src/producer/local_transaction_state.rs (2 hunks)
  • rocketmq-client/src/producer/mq_producer.rs (1 hunks)
  • rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (16 hunks)
  • rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs (2 hunks)
  • rocketmq-client/src/producer/transaction_mq_produce_builder.rs (1 hunks)
  • rocketmq-client/src/producer/transaction_mq_producer.rs (1 hunks)
  • rocketmq-common/src/common/message/message_decoder.rs (5 hunks)
  • rocketmq-common/src/common/message/message_id.rs (1 hunks)
  • rocketmq-common/src/utils/util_all.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/end_transaction_request_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/remoting_command.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • rocketmq-client/src/lib.rs
🔇 Additional comments (23)
rocketmq-common/src/common/message/message_id.rs (1)

20-21: LGTM! Appropriate trait implementations for message identification.

The added traits (Clone, PartialEq, Eq, Copy) and public visibility are well-suited for message identification in a distributed system. The implementation is safe as both fields (SocketAddr and i64) implement these traits.

rocketmq-client/src/hook/end_transaction_context.rs (1)

21-29: Verify transaction state handling across the codebase.

The introduction of transaction state handling might require consistent implementation across other components.

Let's verify the transaction state handling implementation:

✅ Verification successful

Based on the shell script results, I can now generate the final response:

Transaction state handling is consistently implemented across the codebase

The transaction state implementation shows a well-structured and consistent pattern:

  • LocalTransactionState enum is properly defined with serialization/deserialization
  • Transaction state is correctly used in producer implementations for commit/rollback handling
  • End transaction hooks provide proper extension points
  • Example code demonstrates correct usage patterns

The only minor concern is the unimplemented end_transaction method in EndTransactionTraceHookImpl, but this appears to be a planned extension point rather than a critical issue.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for consistent transaction state handling across the codebase

# Check for LocalTransactionState usage
echo "Checking LocalTransactionState usage patterns:"
rg -A 5 "LocalTransactionState"

# Check for transaction-related implementations
echo "Checking transaction-related implementations:"
ast-grep --pattern 'impl$$$Transaction$$$'

# Check for other transaction context usage
echo "Checking EndTransactionContext usage:"
rg -A 5 "EndTransactionContext"

Length of output: 19939

rocketmq-client/src/hook/send_message_context.rs (1)

42-42: LGTM! Thread-safe producer field type.

The change to Option<ArcRefCellWrapper<DefaultMQProducerImpl>> is a good improvement as it provides:

  • Thread-safe sharing through Arc
  • Interior mutability through RefCell
  • Consistent with the codebase's concurrency patterns
rocketmq-client/Cargo.toml (2)

84-85: LGTM! Improved formatting with newline.

The added newline improves readability and follows Rust's formatting conventions.


86-88: Verify the transaction example directory structure.

The new transaction producer example follows the established pattern of organizing examples by feature. Let's verify the directory structure exists.

✅ Verification successful

Directory structure verified and follows the established pattern

The transaction example follows the established pattern in the codebase:

  • Located at the correct path: rocketmq-client/examples/transaction/transaction_producer.rs
  • Follows the same directory structure as other examples like:
    • rocketmq-client/examples/batch/
    • rocketmq-client/examples/broadcast/
    • rocketmq-client/examples/ordermessage/
    • rocketmq-client/examples/quickstart/

The example's location and structure are consistent with the project's conventions.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the transaction example directory structure and file

# Check if the example directory and file exist
fd --type d "transaction$" "examples/"
fd --type f "transaction_producer.rs$" "examples/"

# Verify the example follows the same structure as other examples
fd --type f "_producer.rs$" "examples/" --exec echo "Found producer example: {}"

Length of output: 461


Script:

#!/bin/bash
# Let's first check if the examples directory exists and its location
fd -t d "examples"

# Then check for any transaction-related files in the repository
fd -t f "transaction.*\.rs$"

# List all Rust files to understand the example structure
fd -t f "\.rs$"

Length of output: 31904

rocketmq-remoting/src/protocol/header.rs (1)

23-23: LGTM! Verify the new module implementation.

The new module declaration is correctly placed and follows the established naming pattern.

Let's verify that the module file exists and contains the expected transaction-related header implementation:

✅ Verification successful

The new module is properly implemented and integrated

The verification confirms:

  • The module file exists at the correct location
  • The EndTransactionRequestHeader struct is properly implemented with all necessary fields and methods
  • The module is correctly integrated and used in the producer implementation for transaction message handling
  • The implementation follows the codebase's patterns by implementing required traits (CommandCustomHeader, FromMap)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the existence and implementation of the end_transaction_request_header module

# Test 1: Check if the module file exists
fd "end_transaction_request_header.rs" --type f

# Test 2: Verify the implementation includes transaction-related structures
rg -A 5 "EndTransactionRequestHeader" 

Length of output: 10953

rocketmq-client/src/producer/local_transaction_state.rs (3)

18-18: LGTM!

The import of Display trait is correctly added.


Line range hint 24-30: LGTM!

The enum definition follows Rust best practices with appropriate derive macros and a sensible default variant.


81-89: ⚠️ Potential issue

Consider standardizing the "UNKNOWN" spelling across the codebase.

The current implementation uses "UNKNOW" in the Display implementation, while the Deserialize implementation accepts both "UNKNOW" and "UNKNOWN". This inconsistency could lead to confusion:

  1. "UNKNOW" appears to be a typo of "UNKNOWN"
  2. The deserialize implementation suggests "UNKNOWN" is also valid
  3. The comment "java is UNKNOW" indicates this might be for compatibility

Consider either:

  1. Standardizing on "UNKNOWN" throughout the codebase:
-            LocalTransactionState::Unknown => write!(f, "UNKNOW"),
+            LocalTransactionState::Unknown => write!(f, "UNKNOWN"),
  1. Or documenting the reason for maintaining "UNKNOW" if it's required for Java compatibility.

Let's check if "UNKNOW" is used consistently across the codebase:

✅ Verification successful

Let me gather more context about the Java compatibility aspect since there's a comment suggesting this might be intentional.


Based on the gathered evidence, I can now provide the final response:

Document the Java compatibility requirement for "UNKNOW" spelling

The "UNKNOW" spelling appears to be intentionally maintained for Java compatibility, as indicated by the comment "//java is UNKNOW" in the enum definition. The current implementation correctly handles both spellings in deserialization while maintaining the Java-compatible "UNKNOW" in serialization.

Recommended actions:

  • Add a more detailed documentation comment explaining the Java compatibility requirement:
/// Represents the state of a local transaction.
/// Note: The `Unknown` variant serializes to "UNKNOW" (without 'N') 
/// to maintain compatibility with the Java implementation of RocketMQ.
pub enum LocalTransactionState {
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for both spellings to understand the convention
echo "Checking for 'UNKNOW' usage:"
rg "UNKNOW" --type rust

echo -e "\nChecking for 'UNKNOWN' usage:"
rg "UNKNOWN" --type rust

Length of output: 2943


Script:

#!/bin/bash
# Check for any Java-related compatibility code or comments
rg -B 2 -A 2 "java" --type rust

# Also check the specific enum implementation more thoroughly
ast-grep --pattern 'enum LocalTransactionState {
  $$$
}'

Length of output: 3174

rocketmq-common/src/common/message/message_decoder.rs (1)

18-18: LGTM! Import changes are well-organized.

The new imports are properly organized and necessary for the message ID decoding functionality.

Also applies to: 34-34, 39-39

rocketmq-client/src/producer/mq_producer.rs (1)

431-437: LGTM! The signature changes improve flexibility and safety.

The updated method signature introduces several improvements:

  • Generic type parameter allows passing any thread-safe type as transaction argument
  • Taking ownership of the message is more idiomatic in Rust
  • Mutability requirement ensures thread-safety

Let's verify that all implementations have been updated to match the new signature:

✅ Verification successful

All implementations correctly match the new signature

The search results show that all implementations of send_message_in_transaction have been properly updated to match the new signature:

  • transaction_mq_producer.rs: Implements with the new generic signature
  • default_mq_producer.rs: Implements with the new signature but explicitly unimplements the functionality
  • default_mq_producer_impl.rs: Internal implementation matches the new signature

The changes are consistent across the codebase and all implementations properly handle the breaking changes introduced in the trait.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for implementations of send_message_in_transaction
ast-grep --pattern 'impl $_ for $_ {
  $$$
  async fn send_message_in_transaction$_($_, $_, $_) {
    $$$
  }
  $$$
}'

Length of output: 120


Script:

#!/bin/bash
# Let's try a broader search for send_message_in_transaction implementations
rg "send_message_in_transaction" -A 5 -B 2

Length of output: 3988

rocketmq-client/src/producer/default_mq_producer.rs (2)

351-356: LGTM: Thread-safe producer implementation handling.

The implementation correctly manages thread-safe sharing of the producer implementation using ArcRefCellWrapper.


1005-1013: ⚠️ Potential issue

Implementation missing for transaction message support.

The PR aims to add transaction message support, but the send_message_in_transaction method is marked as unimplemented. This will cause runtime panics when called. Please implement the transaction message handling logic as per the PR objectives.

Let's verify if there are any transaction-related implementations in other files:

rocketmq-client/src/implementation/mq_client_api_impl.rs (1)

49-49: LGTM!

The import statement for EndTransactionRequestHeader is correctly placed and follows the existing import pattern.

rocketmq-client/src/factory/mq_client_instance.rs (3)

69-69: LGTM: Improved type safety and mutability handling.

The changes to field types enhance the implementation by:

  1. Using ArcRefCellWrapper for client_config enables thread-safe interior mutability
  2. Replacing boxed trait objects with concrete MQProducerInnerImpl improves type safety and performance

Also applies to: 76-76


Line range hint 329-338: LGTM: Thread-safe producer registration.

The implementation correctly handles producer registration with proper thread safety using RwLock and appropriate error handling.


1058-1061: LGTM: Clean producer selection implementation.

The new select_producer method provides thread-safe access to producers with proper ownership handling through cloning.

rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs (2)

67-72: ⚠️ Potential issue

Avoid using unreachable!() macro in production code.

In the get_check_listener method, using unreachable!() may lead to runtime panics if default_mqproducer_impl_inner is None. This could cause the application to crash unexpectedly.

Consider handling the None case gracefully or ensuring that default_mqproducer_impl_inner cannot be None:

pub fn get_check_listener(&self) -> Arc<Box<dyn TransactionListener>> {
    if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
        return default_mqproducer_impl_inner.get_check_listener();
    }
-   unreachable!("default_mqproducer_impl_inner is None")
+   panic!("default_mqproducer_impl_inner should not be None");
}

Alternatively, remove the Option wrapper as suggested earlier.

Likely invalid or redundant comment.


85-89: Ensure mutable access is properly handled.

The method update_topic_publish_info takes &mut self, but the borrowing of default_mqproducer_impl_inner should ensure mutable access correctly. Verify that there are no data races or borrowing issues.

Run the following script to check for any borrowing conflicts:

✅ Verification successful

Let me gather more information about the implementation and usage of TopicPublishInfo to ensure thread safety and proper mutation.


Let me analyze the mutable access pattern by checking the implementation of TopicPublishInfo and its usage.


Based on the code analysis, I can now provide the final response:

The mutable access is properly handled and safe.

The TopicPublishInfo struct is designed to be thread-safe through:

  1. #[derive(Clone)] implementation allowing safe copying
  2. Storage in an Arc<RwLock<HashMap>> in the producer implementation, providing synchronized access
  3. Proper mutable access through RwLock when updating via update_topic_publish_info

The implementation follows Rust's ownership and borrowing rules correctly, with synchronized access patterns preventing data races.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for mutable borrowing issues in update_topic_publish_info method.

# Expect: No error messages about borrowing issues.
rg 'update_topic_publish_info' -A 5

Length of output: 2754


Script:

#!/bin/bash
# Check TopicPublishInfo implementation and usage
ast-grep --pattern 'struct TopicPublishInfo {
  $$$
}'

# Check for any concurrent access patterns
rg "topic_publish_info" -B 2 -A 2

# Check for any Arc/Mutex usage with TopicPublishInfo
rg "Arc|Mutex.*TopicPublishInfo" -B 1 -A 1

Length of output: 383290


Script:

#!/bin/bash
# Check TopicPublishInfo implementation and usage
ast-grep --pattern 'struct TopicPublishInfo {
  $$$
}'

# Check for any concurrent access patterns in TopicPublishInfo
rg "TopicPublishInfo" -A 5 -B 2 rocketmq-client/src/producer/producer_impl/topic_publish_info.rs

Length of output: 696

rocketmq-client/src/implementation/client_remoting_processor.rs (1)

213-271: check_transaction_state method implementation looks correct

The check_transaction_state method is implemented properly, with appropriate logic for handling transaction state checks.

rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (3)

1861-1906: Handle possible None values for transaction IDs

When retrieving and setting the transaction ID from message properties, ensure that transaction_id is available to prevent potential issues.

Confirm that transaction_id is always present in message properties or handle the None case appropriately.


2078-2081: Handle potential None values when retrieving unique message ID

When retrieving unique_key from message properties, if it is None, you fall back to msg.msg_id.clone(). Ensure that msg.msg_id is always set or handle the case where both values could be None to prevent issues.

Add checks or defaults to handle cases where both unique_key and msg.msg_id might be missing.


1973-2002: 🛠️ Refactor suggestion

Ensure end_transaction_hook_list is properly synchronized

The end_transaction_hook_list may be accessed concurrently without synchronization. Consider wrapping it with a thread-safe structure like Arc<Mutex<...>> to prevent data races.

Check if synchronization is necessary and implement appropriate thread safety mechanisms.

Comment on lines +21 to +29
pub struct EndTransactionContext<'a> {
pub producer_group: String,
pub broker_addr: String,
pub message: &'a Message,
pub msg_id: String,
pub transaction_id: String,
pub transaction_state: LocalTransactionState,
pub from_transaction_check: bool,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider encapsulating fields and adding validation.

The struct currently exposes all fields as public, which might lead to invalid state mutations. Consider implementing proper encapsulation and validation.

Here's a suggested implementation with better encapsulation and validation:

/// Represents the context for ending a transaction in RocketMQ.
pub struct EndTransactionContext<'a> {
    producer_group: String,
    broker_addr: String,
    message: &'a Message,
    msg_id: String,
    transaction_id: String,
    transaction_state: LocalTransactionState,
    from_transaction_check: bool,
}

impl<'a> EndTransactionContext<'a> {
    /// Creates a new EndTransactionContext with validation
    pub fn new(
        producer_group: String,
        broker_addr: String,
        message: &'a Message,
        msg_id: String,
        transaction_id: String,
        transaction_state: LocalTransactionState,
        from_transaction_check: bool,
    ) -> Result<Self, &'static str> {
        // Validate inputs
        if producer_group.is_empty() {
            return Err("producer_group cannot be empty");
        }
        if broker_addr.is_empty() {
            return Err("broker_addr cannot be empty");
        }
        if msg_id.is_empty() {
            return Err("msg_id cannot be empty");
        }
        if transaction_id.is_empty() {
            return Err("transaction_id cannot be empty");
        }

        Ok(Self {
            producer_group,
            broker_addr,
            message,
            msg_id,
            transaction_id,
            transaction_state,
            from_transaction_check,
        })
    }

    // Getters
    pub fn producer_group(&self) -> &str {
        &self.producer_group
    }

    pub fn broker_addr(&self) -> &str {
        &self.broker_addr
    }

    pub fn message(&self) -> &Message {
        self.message
    }

    pub fn msg_id(&self) -> &str {
        &self.msg_id
    }

    pub fn transaction_id(&self) -> &str {
        &self.transaction_id
    }

    pub fn transaction_state(&self) -> LocalTransactionState {
        self.transaction_state
    }

    pub fn from_transaction_check(&self) -> bool {
        self.from_transaction_check
    }

    // Setter for mutable state only
    pub fn set_transaction_state(&mut self, state: LocalTransactionState) {
        self.transaction_state = state;
    }
}

Comment on lines +524 to +542
pub fn decode_message_id(msg_id: &str) -> MessageId {
let bytes = util_all::string_to_bytes(msg_id).unwrap();
let mut buffer = Bytes::from(bytes);
let len = if msg_id.len() == 32 {
let mut ip = [0u8; 4];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
} else {
let mut ip = [0u8; 16];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
};
MessageId {
address: len,
offset: buffer.get_i64(),
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add input validation and improve variable naming.

The function handles both IPv4 and IPv6 addresses well, but could be improved in two areas:

  1. There's no validation of the input string length before processing
  2. The variable name len is misleading as it stores a socket address

Apply these improvements:

 pub fn decode_message_id(msg_id: &str) -> MessageId {
+    // Message ID should be either 32 chars (IPv4) or 56 chars (IPv6)
+    if msg_id.len() != 32 && msg_id.len() != 56 {
+        panic!("Invalid message ID length: {}", msg_id.len());
+    }
     let bytes = util_all::string_to_bytes(msg_id).unwrap();
     let mut buffer = Bytes::from(bytes);
-    let len = if msg_id.len() == 32 {
+    let address = if msg_id.len() == 32 {
         let mut ip = [0u8; 4];
         buffer.copy_to_slice(&mut ip);
         let port = buffer.get_i32();
         SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
     } else {
         let mut ip = [0u8; 16];
         buffer.copy_to_slice(&mut ip);
         let port = buffer.get_i32();
         SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
     };
     MessageId {
-        address: len,
+        address,
         offset: buffer.get_i64(),
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn decode_message_id(msg_id: &str) -> MessageId {
let bytes = util_all::string_to_bytes(msg_id).unwrap();
let mut buffer = Bytes::from(bytes);
let len = if msg_id.len() == 32 {
let mut ip = [0u8; 4];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
} else {
let mut ip = [0u8; 16];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
};
MessageId {
address: len,
offset: buffer.get_i64(),
}
}
pub fn decode_message_id(msg_id: &str) -> MessageId {
// Message ID should be either 32 chars (IPv4) or 56 chars (IPv6)
if msg_id.len() != 32 && msg_id.len() != 56 {
panic!("Invalid message ID length: {}", msg_id.len());
}
let bytes = util_all::string_to_bytes(msg_id).unwrap();
let mut buffer = Bytes::from(bytes);
let address = if msg_id.len() == 32 {
let mut ip = [0u8; 4];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
} else {
let mut ip = [0u8; 16];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
};
MessageId {
address,
offset: buffer.get_i64(),
}
}

Comment on lines +1058 to +1061
pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
let producer_table = self.producer_table.read().await;
producer_table.get(group).cloned()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the gathered information, I can now provide the final response:

Transaction producer cleanup needs implementation

The verification reveals incomplete producer cleanup implementation:

  • unregister_producer is properly implemented to handle producer table cleanup
  • However, transaction producer's shutdown() is unimplemented (marked with unimplemented!())
  • Default producer's shutdown only handles accumulator and trace dispatcher cleanup
🔗 Analysis chain

Verify producer cleanup in transaction scenarios.

Since this is part of transaction message support, ensure that producers are properly cleaned up after transaction completion or failure.


Let me gather more information about the producer cleanup implementation and transaction handling.


Let me gather more specific information about producer cleanup in the unregister flow.


Let me check one final aspect about producer cleanup in the implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for producer cleanup in transaction handling
rg -A 5 "unregister_producer|producer_table.remove" 

Length of output: 1583


Script:

#!/bin/bash
# Search for producer cleanup in transaction context
rg -A 10 "transaction.*producer|producer.*transaction" 

# Search for producer table modifications
ast-grep --pattern 'self.producer_table.$_($$$)'

# Search for producer cleanup in client implementation
rg -A 10 "unregister_client.*producer.*table|producer.*table.*remove"

Length of output: 22008


Script:

#!/bin/bash
# Search for producer table cleanup in unregister_client implementation
ast-grep --pattern 'fn unregister_client($$$) {
  $$$
}'

# Search for producer cleanup in client shutdown
rg -A 10 "shutdown.*producer|producer.*shutdown"

Length of output: 3022


Script:

#!/bin/bash
# Search for producer cleanup in shutdown implementation
ast-grep --pattern 'async fn shutdown($$$) {
  $$$
}'

# Search for producer table cleanup
rg -A 5 "producer_table.*clear|producer_table.*remove"

Length of output: 3087

use std::sync::Arc;

use parking_lot::Mutex;
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Import TransactionMQProducer for transactional messaging

Since this example is intended to demonstrate transactional message sending, you should import TransactionMQProducer instead of DefaultMQProducer to enable transactional functionality.

Apply this diff to import TransactionMQProducer:

-use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
+use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer;

Comment on lines 47 to 50
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Use TransactionMQProducer instead of DefaultMQProducer for transactional messaging

To properly handle transactional messages, you should instantiate a TransactionMQProducer rather than a DefaultMQProducer. This ensures that the producer is configured correctly for transactional operations.

Apply this diff to use TransactionMQProducer and attach the transaction listener:

-let builder = DefaultMQProducer::builder();
+let builder = TransactionMQProducer::builder();

 let mut producer = builder
+    .transaction_listener(Arc::new(TransactionListenerImpl::default()))
     .producer_group(PRODUCER_GROUP.to_string())
     .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
     .build();

Committable suggestion was skipped due to low confidence.

Comment on lines +1908 to +1967
pub async fn end_transaction(
&mut self,
msg: &Message,
send_result: &SendResult,
local_transaction_state: LocalTransactionState,
) -> Result<()> {
let id = if let Some(ref offset_msg_id) = send_result.offset_msg_id {
MessageDecoder::decode_message_id(offset_msg_id)
} else {
MessageDecoder::decode_message_id(send_result.msg_id.as_ref().unwrap())
};
let transaction_id = send_result.transaction_id.clone();
let queue = self
.client_config
.queue_with_namespace(send_result.message_queue.clone().unwrap());
let dest_broker_name = self
.client_instance
.as_mut()
.unwrap()
.get_broker_name_from_message_queue(&queue)
.await;
let broker_addr = self
.client_instance
.as_mut()
.unwrap()
.find_broker_address_in_publish(dest_broker_name.as_str())
.await;
let request_header = EndTransactionRequestHeader {
topic: msg.get_topic().to_string(),
producer_group: self.producer_config.producer_group().to_string(),
tran_state_table_offset: send_result.queue_offset,
commit_log_offset: id.offset as u64,
commit_or_rollback: match local_transaction_state {
LocalTransactionState::CommitMessage => MessageSysFlag::TRANSACTION_COMMIT_TYPE,
LocalTransactionState::RollbackMessage => MessageSysFlag::TRANSACTION_ROLLBACK_TYPE,
LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE,
},
from_transaction_check: false,
msg_id: send_result.msg_id.clone().unwrap_or_default(),
transaction_id,
rpc_request_header: RpcRequestHeader {
broker_name: Some(dest_broker_name),
..Default::default()
},
};
self.client_instance
.as_mut()
.unwrap()
.mq_client_api_impl
.as_mut()
.unwrap()
.end_transaction_oneway(
broker_addr.as_ref().unwrap(),
request_header,
"".to_string(),
self.producer_config.send_msg_timeout() as u64,
)
.await;
Ok(())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Check result of end_transaction_oneway to handle potential errors

In the end_transaction method, the result of end_transaction_oneway is not checked, which may ignore any errors that occur during the transaction ending process.

Modify the code to handle errors:

- self.client_instance
-     .as_mut()
-     .unwrap()
-     .mq_client_api_impl
-     .as_mut()
-     .unwrap()
-     .end_transaction_oneway(
-         broker_addr.as_ref().unwrap(),
-         request_header,
-         "".to_string(),
-         self.producer_config.send_msg_timeout() as u64,
-     )
-     .await;
+ let result = self.client_instance
+     .as_mut()
+     .unwrap()
+     .mq_client_api_impl
+     .as_mut()
+     .unwrap()
+     .end_transaction_oneway(
+         broker_addr.as_ref().unwrap(),
+         request_header,
+         "".to_string(),
+         self.producer_config.send_msg_timeout() as u64,
+     )
+     .await;
+ if let Err(e) = result {
+     warn!("Failed to end transaction: {}", e);
+     return Err(e);
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn end_transaction(
&mut self,
msg: &Message,
send_result: &SendResult,
local_transaction_state: LocalTransactionState,
) -> Result<()> {
let id = if let Some(ref offset_msg_id) = send_result.offset_msg_id {
MessageDecoder::decode_message_id(offset_msg_id)
} else {
MessageDecoder::decode_message_id(send_result.msg_id.as_ref().unwrap())
};
let transaction_id = send_result.transaction_id.clone();
let queue = self
.client_config
.queue_with_namespace(send_result.message_queue.clone().unwrap());
let dest_broker_name = self
.client_instance
.as_mut()
.unwrap()
.get_broker_name_from_message_queue(&queue)
.await;
let broker_addr = self
.client_instance
.as_mut()
.unwrap()
.find_broker_address_in_publish(dest_broker_name.as_str())
.await;
let request_header = EndTransactionRequestHeader {
topic: msg.get_topic().to_string(),
producer_group: self.producer_config.producer_group().to_string(),
tran_state_table_offset: send_result.queue_offset,
commit_log_offset: id.offset as u64,
commit_or_rollback: match local_transaction_state {
LocalTransactionState::CommitMessage => MessageSysFlag::TRANSACTION_COMMIT_TYPE,
LocalTransactionState::RollbackMessage => MessageSysFlag::TRANSACTION_ROLLBACK_TYPE,
LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE,
},
from_transaction_check: false,
msg_id: send_result.msg_id.clone().unwrap_or_default(),
transaction_id,
rpc_request_header: RpcRequestHeader {
broker_name: Some(dest_broker_name),
..Default::default()
},
};
self.client_instance
.as_mut()
.unwrap()
.mq_client_api_impl
.as_mut()
.unwrap()
.end_transaction_oneway(
broker_addr.as_ref().unwrap(),
request_header,
"".to_string(),
self.producer_config.send_msg_timeout() as u64,
)
.await;
Ok(())
}
pub async fn end_transaction(
&mut self,
msg: &Message,
send_result: &SendResult,
local_transaction_state: LocalTransactionState,
) -> Result<()> {
let id = if let Some(ref offset_msg_id) = send_result.offset_msg_id {
MessageDecoder::decode_message_id(offset_msg_id)
} else {
MessageDecoder::decode_message_id(send_result.msg_id.as_ref().unwrap())
};
let transaction_id = send_result.transaction_id.clone();
let queue = self
.client_config
.queue_with_namespace(send_result.message_queue.clone().unwrap());
let dest_broker_name = self
.client_instance
.as_mut()
.unwrap()
.get_broker_name_from_message_queue(&queue)
.await;
let broker_addr = self
.client_instance
.as_mut()
.unwrap()
.find_broker_address_in_publish(dest_broker_name.as_str())
.await;
let request_header = EndTransactionRequestHeader {
topic: msg.get_topic().to_string(),
producer_group: self.producer_config.producer_group().to_string(),
tran_state_table_offset: send_result.queue_offset,
commit_log_offset: id.offset as u64,
commit_or_rollback: match local_transaction_state {
LocalTransactionState::CommitMessage => MessageSysFlag::TRANSACTION_COMMIT_TYPE,
LocalTransactionState::RollbackMessage => MessageSysFlag::TRANSACTION_ROLLBACK_TYPE,
LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE,
},
from_transaction_check: false,
msg_id: send_result.msg_id.clone().unwrap_or_default(),
transaction_id,
rpc_request_header: RpcRequestHeader {
broker_name: Some(dest_broker_name),
..Default::default()
},
};
let result = self.client_instance
.as_mut()
.unwrap()
.mq_client_api_impl
.as_mut()
.unwrap()
.end_transaction_oneway(
broker_addr.as_ref().unwrap(),
request_header,
"".to_string(),
self.producer_config.send_msg_timeout() as u64,
)
.await;
if let Err(e) = result {
warn!("Failed to end transaction: {}", e);
return Err(e);
}
Ok(())
}

Comment on lines +42 to +51
impl EndTransactionRequestHeader {
pub const TOPIC: &'static str = "topic";
pub const PRODUCER_GROUP: &'static str = "producerGroup";
pub const TRAN_STATE_TABLE_OFFSET: &'static str = "tranStateTableOffset";
pub const COMMIT_LOG_OFFSET: &'static str = "commitLogOffset";
pub const COMMIT_OR_ROLLBACK: &'static str = "commitOrRollback";
pub const FROM_TRANSACTION_CHECK: &'static str = "fromTransactionCheck";
pub const MSG_ID: &'static str = "msgId";
pub const TRANSACTION_ID: &'static str = "transactionId";
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using an enum for constant field names

The constant field names are defined as pub const strings. Using an enum with &'static str variants can improve code organization and prevent typos.

Example:

pub enum Field {
    Topic,
    ProducerGroup,
    TranStateTableOffset,
    CommitLogOffset,
    CommitOrRollback,
    FromTransactionCheck,
    MsgId,
    TransactionId,
}

impl Field {
    pub fn as_str(&self) -> &'static str {
        match self {
            Field::Topic => "topic",
            Field::ProducerGroup => "producerGroup",
            Field::TranStateTableOffset => "tranStateTableOffset",
            Field::CommitLogOffset => "commitLogOffset",
            Field::CommitOrRollback => "commitOrRollback",
            Field::FromTransactionCheck => "fromTransactionCheck",
            Field::MsgId => "msgId",
            Field::TransactionId => "transactionId",
        }
    }
}

Adjust the code to use Field::Topic.as_str() wherever needed.

Comment on lines +87 to +98
fn check_fields(&self) -> anyhow::Result<(), Error> {
if MessageSysFlag::TRANSACTION_NOT_TYPE == self.commit_or_rollback {
return Ok(());
}
if MessageSysFlag::TRANSACTION_COMMIT_TYPE == self.commit_or_rollback {
return Ok(());
}
if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == self.commit_or_rollback {
return Ok(());
}
Err(anyhow!("commitOrRollback field wrong"))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling in check_fields method

The check_fields method defaults to an error message "commitOrRollback field wrong" when commit_or_rollback does not match any of the expected types. To improve debugging and clarity, consider including the invalid value in the error message.

Apply this diff to enhance the error message:

 fn check_fields(&self) -> anyhow::Result<(), Error> {
     if MessageSysFlag::TRANSACTION_NOT_TYPE == self.commit_or_rollback {
         return Ok(());
     }
     if MessageSysFlag::TRANSACTION_COMMIT_TYPE == self.commit_or_rollback {
         return Ok(());
     }
     if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == self.commit_or_rollback {
         return Ok(());
     }
-    Err(anyhow!("commitOrRollback field wrong"))
+    Err(anyhow!("Invalid commitOrRollback value: {}", self.commit_or_rollback))
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn check_fields(&self) -> anyhow::Result<(), Error> {
if MessageSysFlag::TRANSACTION_NOT_TYPE == self.commit_or_rollback {
return Ok(());
}
if MessageSysFlag::TRANSACTION_COMMIT_TYPE == self.commit_or_rollback {
return Ok(());
}
if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == self.commit_or_rollback {
return Ok(());
}
Err(anyhow!("commitOrRollback field wrong"))
}
fn check_fields(&self) -> anyhow::Result<(), Error> {
if MessageSysFlag::TRANSACTION_NOT_TYPE == self.commit_or_rollback {
return Ok(());
}
if MessageSysFlag::TRANSACTION_COMMIT_TYPE == self.commit_or_rollback {
return Ok(());
}
if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == self.commit_or_rollback {
return Ok(());
}
Err(anyhow!("Invalid commitOrRollback value: {}", self.commit_or_rollback))
}

Comment on lines +54 to +85
fn to_map(&self) -> Option<std::collections::HashMap<String, String>> {
let mut map = std::collections::HashMap::new();
map.insert(Self::TOPIC.to_string(), self.topic.clone());
map.insert(
Self::PRODUCER_GROUP.to_string(),
self.producer_group.clone(),
);
map.insert(
Self::TRAN_STATE_TABLE_OFFSET.to_string(),
self.tran_state_table_offset.to_string(),
);
map.insert(
Self::COMMIT_LOG_OFFSET.to_string(),
self.commit_log_offset.to_string(),
);
map.insert(
Self::COMMIT_OR_ROLLBACK.to_string(),
self.commit_or_rollback.to_string(),
);
map.insert(
Self::FROM_TRANSACTION_CHECK.to_string(),
self.from_transaction_check.to_string(),
);
map.insert(Self::MSG_ID.to_string(), self.msg_id.clone());
if let Some(value) = self.transaction_id.as_ref() {
map.insert(Self::TRANSACTION_ID.to_string(), value.clone());
}
if let Some(value) = self.rpc_request_header.to_map() {
map.extend(value);
}
Some(map)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify to_map method using a serializer

Manually constructing the map in to_map is error-prone and verbose. Consider using serde_json::to_value followed by converting to a map, which leverages serialization attributes and ensures consistency.

Example:

 fn to_map(&self) -> Option<std::collections::HashMap<String, String>> {
-    let mut map = std::collections::HashMap::new();
-    map.insert(Self::TOPIC.to_string(), self.topic.clone());
-    // ... (other inserts)
-    if let Some(value) = self.rpc_request_header.to_map() {
-        map.extend(value);
-    }
-    Some(map)
+    let value = serde_json::to_value(self).ok()?;
+    let map = serde_json::from_value(value).ok()?;
+    Some(map)
 }

This approach reduces the potential for errors and keeps the method concise.

Committable suggestion was skipped due to low confidence.

Comment on lines +104 to +139
fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> {
Some(EndTransactionRequestHeader {
topic: map
.get(EndTransactionRequestHeader::TOPIC)
.cloned()
.unwrap_or_default(),
producer_group: map
.get(EndTransactionRequestHeader::PRODUCER_GROUP)
.cloned()
.unwrap_or_default(),
tran_state_table_offset: map
.get(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or_default(),
commit_log_offset: map
.get(EndTransactionRequestHeader::COMMIT_LOG_OFFSET)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or_default(),
commit_or_rollback: map
.get(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK)
.and_then(|s| s.parse::<i32>().ok())
.unwrap_or_default(),
from_transaction_check: map
.get(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK)
.and_then(|s| s.parse::<bool>().ok())
.unwrap_or_default(),
msg_id: map
.get(EndTransactionRequestHeader::MSG_ID)
.cloned()
.unwrap_or_default(),
transaction_id: map
.get(EndTransactionRequestHeader::TRANSACTION_ID)
.cloned(),
rpc_request_header: <RpcRequestHeader as FromMap>::from(map).unwrap_or_default(),
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid defaulting to zero on parse failure in from method

In the from method of the FromMap implementation, numeric fields default to zero when parsing fails. This might mask issues where invalid data is provided. It's safer to propagate the error or return None if parsing fails to prevent unintended behavior due to default values.

Apply this diff to handle parsing errors appropriately:

 fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> {
     Some(EndTransactionRequestHeader {
         topic: map
             .get(EndTransactionRequestHeader::TOPIC)
             .cloned()
             .unwrap_or_default(),
         producer_group: map
             .get(EndTransactionRequestHeader::PRODUCER_GROUP)
             .cloned()
             .unwrap_or_default(),
         tran_state_table_offset: map
             .get(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET)
-            .and_then(|s| s.parse::<u64>().ok())
-            .unwrap_or_default(),
+            .and_then(|s| s.parse::<u64>().ok())?,
         commit_log_offset: map
             .get(EndTransactionRequestHeader::COMMIT_LOG_OFFSET)
-            .and_then(|s| s.parse::<u64>().ok())
-            .unwrap_or_default(),
+            .and_then(|s| s.parse::<u64>().ok())?,
         commit_or_rollback: map
             .get(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK)
-            .and_then(|s| s.parse::<i32>().ok())
-            .unwrap_or_default(),
+            .and_then(|s| s.parse::<i32>().ok())?,
         from_transaction_check: map
             .get(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK)
-            .and_then(|s| s.parse::<bool>().ok())
-            .unwrap_or_default(),
+            .and_then(|s| s.parse::<bool>().ok())?,
         msg_id: map
             .get(EndTransactionRequestHeader::MSG_ID)
             .cloned()
             .unwrap_or_default(),
         transaction_id: map
             .get(EndTransactionRequestHeader::TRANSACTION_ID)
             .cloned(),
         rpc_request_header: <RpcRequestHeader as FromMap>::from(map).unwrap_or_default(),
     })
 }

This change will cause the method to return None if parsing fails, making it explicit when invalid data is encountered.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> {
Some(EndTransactionRequestHeader {
topic: map
.get(EndTransactionRequestHeader::TOPIC)
.cloned()
.unwrap_or_default(),
producer_group: map
.get(EndTransactionRequestHeader::PRODUCER_GROUP)
.cloned()
.unwrap_or_default(),
tran_state_table_offset: map
.get(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or_default(),
commit_log_offset: map
.get(EndTransactionRequestHeader::COMMIT_LOG_OFFSET)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or_default(),
commit_or_rollback: map
.get(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK)
.and_then(|s| s.parse::<i32>().ok())
.unwrap_or_default(),
from_transaction_check: map
.get(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK)
.and_then(|s| s.parse::<bool>().ok())
.unwrap_or_default(),
msg_id: map
.get(EndTransactionRequestHeader::MSG_ID)
.cloned()
.unwrap_or_default(),
transaction_id: map
.get(EndTransactionRequestHeader::TRANSACTION_ID)
.cloned(),
rpc_request_header: <RpcRequestHeader as FromMap>::from(map).unwrap_or_default(),
})
}
fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> {
Some(EndTransactionRequestHeader {
topic: map
.get(EndTransactionRequestHeader::TOPIC)
.cloned()
.unwrap_or_default(),
producer_group: map
.get(EndTransactionRequestHeader::PRODUCER_GROUP)
.cloned()
.unwrap_or_default(),
tran_state_table_offset: map
.get(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET)
.and_then(|s| s.parse::<u64>().ok())?,
commit_log_offset: map
.get(EndTransactionRequestHeader::COMMIT_LOG_OFFSET)
.and_then(|s| s.parse::<u64>().ok())?,
commit_or_rollback: map
.get(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK)
.and_then(|s| s.parse::<i32>().ok())?,
from_transaction_check: map
.get(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK)
.and_then(|s| s.parse::<bool>().ok())?,
msg_id: map
.get(EndTransactionRequestHeader::MSG_ID)
.cloned()
.unwrap_or_default(),
transaction_id: map
.get(EndTransactionRequestHeader::TRANSACTION_ID)
.cloned(),
rpc_request_header: <RpcRequestHeader as FromMap>::from(map).unwrap_or_default(),
})
}

@mxsm
Copy link
Owner Author

mxsm commented Oct 27, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 13

🧹 Outside diff range and nitpick comments (5)
rocketmq-client/src/producer/transaction_listener.rs (1)

28-28: LGTM: Improved parameter type for better thread safety and flexibility

The change to Option<&(dyn Any + Send + Sync)> is a good improvement because:

  1. Making the argument optional provides flexibility when no context is needed
  2. Adding Send + Sync bounds ensures thread safety in concurrent transaction processing

This change aligns well with Rust's concurrency model and makes the transaction processing system more robust. The Send + Sync bounds ensure that the transaction context can be safely shared across thread boundaries, which is crucial for a message queue system.

rocketmq-client/src/producer/transaction_send_result.rs (1)

33-41: Consider enhancing the display format for better readability

The implementation is correct and safe, but consider making the output more user-friendly:

  1. Hide the Option wrapper syntax
  2. Add more descriptive field labels
  3. Format null states more elegantly

Consider this alternative implementation for better readability:

 impl Display for TransactionSendResult {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "TransactionSendResult {{ local_transaction_state: {:?}, send_result: {:?} }}",
-            self.local_transaction_state, self.send_result
+            "TransactionSendResult [State: {}, Result: {}]",
+            self.local_transaction_state.as_ref().map_or("None", |s| &format!("{:?}", s)),
+            self.send_result.as_ref().map_or("None", |r| &format!("{}", r))
         )
     }
 }
rocketmq-common/src/common/message/message_client_id_setter.rs (1)

Line range hint 123-128: LGTM! Optimization improves performance.

The change optimizes the set_uniq_id method by moving the uniq_id creation inside the conditional block. This ensures that create_uniq_id() is only called when necessary, avoiding unnecessary computation when the property is already set. The implementation remains thread-safe and well-tested.

This optimization is particularly important in the context of transaction messages where high throughput is expected. The unique ID generation involves multiple operations (timestamp calculations, atomic counters), so avoiding unnecessary calls can significantly improve performance under load.

rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (2)

Line range hint 1742-1746: Handle potential timeout errors when sending the request

In the request method, after calculating the cost, the remaining timeout duration might become negative, leading to immediate timeout errors.

Add a check to ensure the remaining timeout is valid:

let cost = begin_timestamp.elapsed().as_millis() as u64;
+ if timeout <= cost {
+     return Err(RequestTimeoutError(
+         ClientErrorCode::REQUEST_TIMEOUT_EXCEPTION,
+         "Request timed out before sending the message.".to_string(),
+     ));
+ }
self.send_default_impl(
    &mut msg,
    CommunicationMode::Async,
    Some(Arc::new(send_callback)),
    timeout - cost,
)
.await?;

156-160: Consider using immutable references if mutation is not required

The methods send_with_timeout and send now take &mut self and msg: &mut T. If the methods do not modify self or msg, consider using immutable references to enhance code safety and clarity.

Also applies to: 169-172

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between ae75277 and 4f4fc7b.

📒 Files selected for processing (9)
  • rocketmq-client/examples/transaction/transaction_producer.rs (1 hunks)
  • rocketmq-client/src/producer/default_mq_producer.rs (6 hunks)
  • rocketmq-client/src/producer/mq_producer.rs (1 hunks)
  • rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (27 hunks)
  • rocketmq-client/src/producer/transaction_listener.rs (2 hunks)
  • rocketmq-client/src/producer/transaction_mq_produce_builder.rs (1 hunks)
  • rocketmq-client/src/producer/transaction_mq_producer.rs (1 hunks)
  • rocketmq-client/src/producer/transaction_send_result.rs (2 hunks)
  • rocketmq-common/src/common/message/message_client_id_setter.rs (1 hunks)
🔇 Additional comments (15)
rocketmq-client/src/producer/transaction_listener.rs (1)

17-18: LGTM: Clean import addition

The explicit import of std::any::Any improves code readability.

rocketmq-client/src/producer/transaction_send_result.rs (1)

17-17: LGTM: Import correctly added

The Display trait import is properly placed and necessary for the implementation.

rocketmq-client/examples/transaction/transaction_producer.rs (2)

23-23: ⚠️ Potential issue

Remove unused import DefaultMQProducer

The DefaultMQProducer import is not used in this file since we're using TransactionMQProducer for transaction message support.

-use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;

Likely invalid or redundant comment.


40-68: Verify transaction message implementation against RocketMQ specifications

Please ensure this implementation follows RocketMQ's transaction message specifications, particularly around:

  1. Transaction message lifecycle
  2. Proper handling of transaction states
  3. Timeout and recovery mechanisms
✅ Verification successful

Let me gather more specific information about the transaction implementation.


Based on the comprehensive code analysis, I can now generate the final response:

Transaction message implementation follows RocketMQ specifications correctly

The implementation correctly handles:

  • Transaction message lifecycle with proper state transitions (PREPARED, COMMIT, ROLLBACK)
  • Transaction states through LocalTransactionState enum and corresponding system flags
  • Message properties and system flags for transaction messages
  • Transaction listener interface with required methods for local transaction execution and checking
  • Transaction producer configuration with proper thread pool settings for transaction checks
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other transaction message implementations to compare
rg -l "send_message_in_transaction|TransactionListener" --type rust

# Look for transaction-related tests
rg -l "test.*transaction" --type rust

Length of output: 720


Script:

#!/bin/bash
# Check transaction listener implementation
cat rocketmq-client/src/producer/transaction_listener.rs

# Check transaction producer implementation
cat rocketmq-client/src/producer/transaction_mq_producer.rs

# Check if there are any transaction state enums or constants
rg -A 5 "enum.*Transaction|TRANSACTION" --type rust

Length of output: 45758

rocketmq-client/src/producer/mq_producer.rs (1)

431-437: LGTM! The new signature provides better flexibility and safety.

The changes to send_message_in_transaction improve the interface by:

  • Making it generic over argument type T with thread-safety bounds
  • Taking ownership of the message for potential modifications
  • Making the argument optional
  • Supporting mutable state changes during transaction processing

Let's verify the impact on existing implementations:

✅ Verification successful

The previous ast-grep pattern might have failed due to syntax issues. Let's try a simpler ripgrep approach to find implementations and their context.


The previous results show that DefaultMQProducer doesn't implement the method (it's unimplemented), but we need to check the transaction-specific producer and implementation files to understand the full impact of the signature change.


The signature change is correctly implemented across the codebase

The verification shows that:

  • The TransactionMQProducer has already implemented the new generic signature correctly
  • The implementation in default_mq_producer_impl.rs properly handles the boxed generic argument
  • DefaultMQProducer explicitly marks it as unimplemented as expected

The changes are consistent throughout the codebase and maintain proper type safety and flexibility.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing implementations of send_message_in_transaction
ast-grep --pattern 'impl $_ for $_ {
  $$$
  send_message_in_transaction($$$) {
    $$$
  }
  $$$
}'

Length of output: 102


Script:

#!/bin/bash
# Let's try a different approach to find implementations
# 1. First find all files that might contain implementations
rg -l "send_message_in_transaction" 

# 2. Then look for actual implementations with context
rg "impl.*send_message_in_transaction" -A 10

Length of output: 1309


Script:

#!/bin/bash
# Let's check the transaction producer implementation specifically
rg -B 5 -A 15 "send_message_in_transaction" rocketmq-client/src/producer/transaction_mq_producer.rs

# Also check the producer_impl for any existing implementations
rg -B 5 -A 15 "send_message_in_transaction" rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs

Length of output: 1936

rocketmq-client/src/producer/default_mq_producer.rs (4)

351-356: LGTM: Thread-safe producer implementation setup

The implementation correctly uses ArcRefCellWrapper for thread-safe reference counting and interior mutability.


Line range hint 506-518: LGTM: Comprehensive message sending implementation

The implementation properly handles both synchronous and asynchronous message sending scenarios with appropriate mutability.


704-704: LGTM: Proper timeout handling

The implementation correctly handles message timeout with appropriate mutability and error handling.


1017-1022: LGTM: Proper batch message handling

The implementation correctly handles batch message sending with appropriate mutability and error handling, both with and without timeout.

Also applies to: 1032-1037

rocketmq-client/src/producer/transaction_mq_produce_builder.rs (2)

94-97: LGTM: The client_config setter method is correctly implemented

The client_config method appropriately sets the client configuration and supports method chaining.


316-325: Avoid cloning configurations unnecessarily

In the build method, when initializing DefaultMQProducerImpl, configurations are cloned:

let producer_impl = DefaultMQProducerImpl::new(
    mq_producer.client_config().clone(),
    mq_producer.producer_config().clone(),
    mq_producer.rpc_hook().clone(),
);

Cloning can be expensive for large configurations. If possible, consider passing references instead of cloning to improve performance.

[performance]

Modify the DefaultMQProducerImpl::new method to accept references:

- pub fn new(client_config: ClientConfig, producer_config: ProducerConfig, rpc_hook: Option<Arc<Box<dyn RPCHook>>>) -> Self {
+ pub fn new(client_config: &ClientConfig, producer_config: &ProducerConfig, rpc_hook: Option<Arc<Box<dyn RPCHook>>>) -> Self {
    // Implementation...
}

Then, adjust the call in the build method:

let producer_impl = DefaultMQProducerImpl::new(
-   mq_producer.client_config().clone(),
-   mq_producer.producer_config().clone(),
+   mq_producer.client_config(),
+   mq_producer.producer_config(),
    mq_producer.rpc_hook().clone(),
);
rocketmq-client/src/producer/transaction_mq_producer.rs (1)

1-523: Code implementation aligns with transactional message support

The TransactionMQProducer struct and its methods are well-implemented, providing the necessary functionality for transactional message production. The code is well-organized, follows Rust best practices, and integrates seamlessly with the existing producer architecture.

rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (3)

156-160: Verify the necessity of changing method signatures to mutable references

The methods send_with_timeout and send now take &mut self and msg: &mut T instead of &self and msg: &T. This change increases the mutability requirements and may affect thread safety and code reusability. Ensure that the mutability is necessary and does not lead to unintended side effects.

Also applies to: 169-172


1921-1980: ⚠️ Potential issue

Check the result of end_transaction_oneway to handle potential errors

In the end_transaction method, the result of end_transaction_oneway is not checked, which may ignore any errors during the transaction ending process. This can lead to inconsistent transaction states.

Modify the code to handle errors:

self.client_instance
    .as_mut()
    .unwrap()
    .mq_client_api_impl
    .as_mut()
    .unwrap()
    .end_transaction_oneway(
        broker_addr.as_ref().unwrap(),
        request_header,
        "".to_string(),
        self.producer_config.send_msg_timeout() as u64,
    )
    .await;
+ if let Err(e) = result {
+     warn!("Failed to end transaction: {}", e);
+     return Err(e);
+ }

Likely invalid or redundant comment.


2202-2214: ⚠️ Potential issue

Ensure proper error handling when registering the producer

When registering the producer in the start_with_factory method, if register_ok is false, the service state is reset but the method proceeds without returning an error. This might lead to the producer running in an inconsistent state.

Apply this diff to return the error after resetting the service state:

if !register_ok {
    self.service_state = ServiceState::CreateJust;
-   // Missing error return
+   return Err(MQClientError::MQClientErr(
+       -1,
+       format!(
+           "The producer group[{}] has been created before, specify another name please. {}",
+           self.producer_config.producer_group(),
+           FAQUrl::suggest_todo(FAQUrl::GROUP_NAME_DUPLICATE_URL)
+       ),
+   ));
}

Likely invalid or redundant comment.

Comment on lines +48 to +53
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.topics(vec![TOPIC.to_string()])
.transaction_listener(TransactionListenerImpl::default())
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding retry configuration for better reliability

The producer configuration could benefit from retry settings to handle temporary network issues or broker unavailability.

 let mut producer = builder
     .producer_group(PRODUCER_GROUP.to_string())
     .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
     .topics(vec![TOPIC.to_string()])
     .transaction_listener(TransactionListenerImpl::default())
+    .retry_times(3)
+    .retry_interval_ms(1000)
     .build();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.topics(vec![TOPIC.to_string()])
.transaction_listener(TransactionListenerImpl::default())
.build();
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.topics(vec![TOPIC.to_string()])
.transaction_listener(TransactionListenerImpl::default())
.retry_times(3)
.retry_interval_ms(1000)
.build();

Comment on lines +90 to +96
let value = self
.transaction_index
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let status = value % 3;
let mut guard = self.local_trans.lock();
guard.insert(msg.get_transaction_id().to_string(), status);
LocalTransactionState::Unknown
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve transaction state management and logging

The current implementation has several areas for improvement:

  1. Transaction state determination is overly simplistic using modulo
  2. Missing logging for state changes
  3. No error handling for transaction ID
 let value = self
     .transaction_index
     .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
-let status = value % 3;
+// Implement proper business logic for transaction state determination
+let status = determine_transaction_status(msg, value);
 let mut guard = self.local_trans.lock();
-guard.insert(msg.get_transaction_id().to_string(), status);
+if let Some(tx_id) = msg.get_transaction_id() {
+    guard.insert(tx_id.to_string(), status);
+    log::info!("Transaction state set: ID={}, status={}", tx_id, status);
+} else {
+    log::error!("Missing transaction ID in message");
+}

Committable suggestion was skipped due to low confidence.

Comment on lines +57 to +63
for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
let send_result = producer
.send_message_in_transaction::<()>(message, None)
.await?;
println!("send result: {}", send_result);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling and logging for send results

The current implementation only prints send results to stdout. Consider using proper logging for better observability in production environments.

 for _ in 0..10 {
     let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
     let send_result = producer
         .send_message_in_transaction::<()>(message, None)
         .await?;
-    println!("send result: {}", send_result);
+    log::info!("Transaction message sent: {}", send_result);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
let send_result = producer
.send_message_in_transaction::<()>(message, None)
.await?;
println!("send result: {}", send_result);
}
for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
let send_result = producer
.send_message_in_transaction::<()>(message, None)
.await?;
log::info!("Transaction message sent: {}", send_result);
}

Comment on lines +99 to +107
fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
let mut guard = self.local_trans.lock();
let status = guard.get(msg.get_transaction_id()).unwrap_or(&-1);
match status {
1 => LocalTransactionState::CommitMessage,
2 => LocalTransactionState::RollbackMessage,
_ => LocalTransactionState::Unknown,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve transaction check robustness and logging

The current implementation could panic on missing transaction IDs and lacks proper logging.

 fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
     let mut guard = self.local_trans.lock();
-    let status = guard.get(msg.get_transaction_id()).unwrap_or(&-1);
-    match status {
-        1 => LocalTransactionState::CommitMessage,
-        2 => LocalTransactionState::RollbackMessage,
-        _ => LocalTransactionState::Unknown,
+    let tx_id = msg.get_transaction_id();
+    if let Some(tx_id) = tx_id {
+        let status = guard.get(tx_id).copied().unwrap_or(-1);
+        let state = match status {
+            1 => LocalTransactionState::CommitMessage,
+            2 => LocalTransactionState::RollbackMessage,
+            _ => LocalTransactionState::Unknown,
+        };
+        log::info!("Checked transaction state: ID={}, status={:?}", tx_id, state);
+        state
+    } else {
+        log::error!("Missing transaction ID in message during check");
+        LocalTransactionState::RollbackMessage
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
let mut guard = self.local_trans.lock();
let status = guard.get(msg.get_transaction_id()).unwrap_or(&-1);
match status {
1 => LocalTransactionState::CommitMessage,
2 => LocalTransactionState::RollbackMessage,
_ => LocalTransactionState::Unknown,
}
}
fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
let mut guard = self.local_trans.lock();
let tx_id = msg.get_transaction_id();
if let Some(tx_id) = tx_id {
let status = guard.get(tx_id).copied().unwrap_or(-1);
let state = match status {
1 => LocalTransactionState::CommitMessage,
2 => LocalTransactionState::RollbackMessage,
_ => LocalTransactionState::Unknown,
};
log::info!("Checked transaction state: ID={}, status={:?}", tx_id, state);
state
} else {
log::error!("Missing transaction ID in message during check");
LocalTransactionState::RollbackMessage
}
}

Comment on lines +1005 to +1013
async fn send_message_in_transaction<T>(
&mut self,
msg: Message,
arg: Option<T>,
) -> Result<TransactionSendResult>
where
T: std::any::Any + Sync + Send,
{
unimplemented!("DefaultMQProducer not support send_message_in_transaction")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implementation missing for transaction message support

The PR aims to add transaction message support, but the method is marked as unimplemented!(). This will cause runtime panics when called. Consider implementing the transaction message support as per the PR objectives.

Additionally, the error message could be more descriptive to help users understand why the operation isn't supported.

Apply this change to improve the error message:

-        unimplemented!("DefaultMQProducer not support send_message_in_transaction")
+        Err(MQClientErr(
+            -1,
+            "Transaction messages are not supported by DefaultMQProducer. Please use TransactionMQProducer instead.".to_string(),
+        ))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async fn send_message_in_transaction<T>(
&mut self,
msg: Message,
arg: Option<T>,
) -> Result<TransactionSendResult>
where
T: std::any::Any + Sync + Send,
{
unimplemented!("DefaultMQProducer not support send_message_in_transaction")
async fn send_message_in_transaction<T>(
&mut self,
msg: Message,
arg: Option<T>,
) -> Result<TransactionSendResult>
where
T: std::any::Any + Sync + Send,
{
Err(MQClientErr(
-1,
"Transaction messages are not supported by DefaultMQProducer. Please use TransactionMQProducer instead.".to_string(),
))

Comment on lines +94 to +334
pub fn client_config(mut self, client_config: ClientConfig) -> Self {
self.client_config = Some(client_config);
self
}

pub fn default_mqproducer_impl(
mut self,
default_mqproducer_impl: DefaultMQProducerImpl,
) -> Self {
self.default_mqproducer_impl = Some(default_mqproducer_impl);
self
}

pub fn retry_response_codes(mut self, retry_response_codes: HashSet<i32>) -> Self {
self.retry_response_codes = Some(retry_response_codes);
self
}

pub fn producer_group(mut self, producer_group: impl Into<String>) -> Self {
self.producer_group = Some(producer_group.into());
self
}

pub fn topics(mut self, topics: Vec<String>) -> Self {
self.topics = Some(topics);
self
}

pub fn name_server_addr(mut self, name_server_addr: String) -> Self {
if let Some(client_config) = self.client_config.as_mut() {
client_config.namesrv_addr = Some(name_server_addr);
client_config
.namespace_initialized
.store(false, std::sync::atomic::Ordering::Release);
}
self
}

pub fn create_topic_key(mut self, create_topic_key: String) -> Self {
self.create_topic_key = Some(create_topic_key);
self
}

pub fn default_topic_queue_nums(mut self, default_topic_queue_nums: u32) -> Self {
self.default_topic_queue_nums = Some(default_topic_queue_nums);
self
}

pub fn send_msg_timeout(mut self, send_msg_timeout: u32) -> Self {
self.send_msg_timeout = Some(send_msg_timeout);
self
}

pub fn compress_msg_body_over_howmuch(mut self, compress_msg_body_over_howmuch: u32) -> Self {
self.compress_msg_body_over_howmuch = Some(compress_msg_body_over_howmuch);
self
}

pub fn retry_times_when_send_failed(mut self, retry_times_when_send_failed: u32) -> Self {
self.retry_times_when_send_failed = Some(retry_times_when_send_failed);
self
}

pub fn retry_times_when_send_async_failed(
mut self,
retry_times_when_send_async_failed: u32,
) -> Self {
self.retry_times_when_send_async_failed = Some(retry_times_when_send_async_failed);
self
}

pub fn retry_another_broker_when_not_store_ok(
mut self,
retry_another_broker_when_not_store_ok: bool,
) -> Self {
self.retry_another_broker_when_not_store_ok = Some(retry_another_broker_when_not_store_ok);
self
}

pub fn max_message_size(mut self, max_message_size: u32) -> Self {
self.max_message_size = Some(max_message_size);
self
}

pub fn trace_dispatcher(
mut self,
trace_dispatcher: Arc<Box<dyn TraceDispatcher + Send + Sync>>,
) -> Self {
self.trace_dispatcher = Some(trace_dispatcher);
self
}

pub fn auto_batch(mut self, auto_batch: bool) -> Self {
self.auto_batch = Some(auto_batch);
self
}

pub fn produce_accumulator(mut self, produce_accumulator: ProduceAccumulator) -> Self {
self.produce_accumulator = Some(produce_accumulator);
self
}

pub fn enable_backpressure_for_async_mode(
mut self,
enable_backpressure_for_async_mode: bool,
) -> Self {
self.enable_backpressure_for_async_mode = Some(enable_backpressure_for_async_mode);
self
}

pub fn back_pressure_for_async_send_num(
mut self,
back_pressure_for_async_send_num: u32,
) -> Self {
self.back_pressure_for_async_send_num = Some(back_pressure_for_async_send_num);
self
}

pub fn back_pressure_for_async_send_size(
mut self,
back_pressure_for_async_send_size: u32,
) -> Self {
self.back_pressure_for_async_send_size = Some(back_pressure_for_async_send_size);
self
}

pub fn rpc_hook(mut self, rpc_hook: Box<dyn RPCHook>) -> Self {
self.rpc_hook = Some(Arc::new(rpc_hook));
self
}

pub fn compress_level(mut self, compress_level: i32) -> Self {
self.compress_level = Some(compress_level);
self
}

pub fn compress_type(mut self, compress_type: CompressionType) -> Self {
self.compress_type = Some(compress_type);
self
}

pub fn compressor(mut self, compressor: Arc<Box<dyn Compressor + Send + Sync>>) -> Self {
self.compressor = Some(compressor);
self
}

pub fn transaction_listener(mut self, transaction_listener: impl TransactionListener) -> Self {
self.transaction_listener = Some(Arc::new(Box::new(transaction_listener)));
self
}

pub fn build(self) -> TransactionMQProducer {
let mut mq_producer = DefaultMQProducer::default();
if let Some(client_config) = self.client_config {
mq_producer.set_client_config(client_config);
}

if let Some(retry_response_codes) = self.retry_response_codes {
mq_producer.set_retry_response_codes(retry_response_codes);
}
if let Some(producer_group) = self.producer_group {
mq_producer.set_producer_group(producer_group);
}
if let Some(topics) = self.topics {
mq_producer.set_topics(topics);
}
if let Some(create_topic_key) = self.create_topic_key {
mq_producer.set_create_topic_key(create_topic_key);
}
if let Some(default_topic_queue_nums) = self.default_topic_queue_nums {
mq_producer.set_default_topic_queue_nums(default_topic_queue_nums);
}
if let Some(send_msg_timeout) = self.send_msg_timeout {
mq_producer.set_send_msg_timeout(send_msg_timeout);
}
if let Some(compress_msg_body_over_howmuch) = self.compress_msg_body_over_howmuch {
mq_producer.set_compress_msg_body_over_howmuch(compress_msg_body_over_howmuch);
}
if let Some(retry_times_when_send_failed) = self.retry_times_when_send_failed {
mq_producer.set_retry_times_when_send_failed(retry_times_when_send_failed);
}
if let Some(retry_times_when_send_async_failed) = self.retry_times_when_send_async_failed {
mq_producer.set_retry_times_when_send_async_failed(retry_times_when_send_async_failed);
}
if let Some(retry_another_broker_when_not_store_ok) =
self.retry_another_broker_when_not_store_ok
{
mq_producer
.set_retry_another_broker_when_not_store_ok(retry_another_broker_when_not_store_ok);
}
if let Some(max_message_size) = self.max_message_size {
mq_producer.set_max_message_size(max_message_size);
}

mq_producer.set_trace_dispatcher(self.trace_dispatcher);
if let Some(auto_batch) = self.auto_batch {
mq_producer.set_auto_batch(auto_batch);
}
if let Some(produce_accumulator) = self.produce_accumulator {
mq_producer.set_produce_accumulator(Some(produce_accumulator));
}

if let Some(enable_backpressure_for_async_mode) = self.enable_backpressure_for_async_mode {
mq_producer.set_enable_backpressure_for_async_mode(enable_backpressure_for_async_mode);
}
if let Some(back_pressure_for_async_send_num) = self.back_pressure_for_async_send_num {
mq_producer.set_back_pressure_for_async_send_num(back_pressure_for_async_send_num);
}
if let Some(back_pressure_for_async_send_size) = self.back_pressure_for_async_send_size {
mq_producer.set_back_pressure_for_async_send_size(back_pressure_for_async_send_size);
}
mq_producer.set_rpc_hook(self.rpc_hook);
if let Some(compress_level) = self.compress_level {
mq_producer.set_compress_level(compress_level);
}
if let Some(compress_type) = self.compress_type {
mq_producer.set_compress_type(compress_type);
}
if let Some(compressor) = self.compressor {
mq_producer.set_compressor(Some(compressor));
}

if let Some(default_mqproducer_impl) = self.default_mqproducer_impl {
mq_producer.set_default_mqproducer_impl(default_mqproducer_impl);
} else {
let producer_impl = DefaultMQProducerImpl::new(
mq_producer.client_config().clone(),
mq_producer.producer_config().clone(),
mq_producer.rpc_hook().clone(),
);
mq_producer.set_default_mqproducer_impl(producer_impl);
}
let transaction_producer_config = TransactionProducerConfig {
transaction_listener: self.transaction_listener,
check_thread_pool_min_size: 0,
check_thread_pool_max_size: 0,
check_request_hold_max: 0,
check_runtime: self.check_runtime,
};
TransactionMQProducer::new(transaction_producer_config, mq_producer)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using macros or iterators to reduce repetitive code in setter methods

The builder currently contains numerous setter methods and repetitive if let statements in the build method for setting optional configurations. This can be streamlined to improve maintainability and reduce boilerplate code.

You can use macros to generate setter methods and to iterate over the optional fields in the build method. For example, define a macro for setter methods:

macro_rules! builder_setter {
    ($field:ident, $type:ty) => {
        pub fn $field(mut self, value: $type) -> Self {
            self.$field = Some(value);
            self
        }
    };
}

// Use the macro for your fields
impl TransactionMQProducerBuilder {
    builder_setter!(producer_group, String);
    builder_setter!(topics, Vec<String>);
    // Repeat for other fields...
}

In the build method, you can iterate over fields:

if let Some(value) = self.producer_group {
    mq_producer.set_producer_group(value);
}
// Instead of writing an `if let` for each field, consider using a loop or macro if possible

This refactoring can make the code cleaner and easier to manage, especially as the number of configurable fields grows.

Comment on lines +110 to +112
default_mqproducer_impl_inner: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>,
transaction_listener: Option<Arc<Box<dyn TransactionListener>>>,
check_runtime: Option<Arc<RocketMQRuntime>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Consider initializing new fields in the constructor for clarity and safety

The newly added fields default_mqproducer_impl_inner, transaction_listener, and check_runtime are initialized to None. To ensure these fields are properly set before use and to prevent potential None dereferences, consider initializing them with default values or adding checks where they are used.

Apply this diff to initialize the fields in the constructor:

pub struct DefaultMQProducerImpl {
    // ...
    default_mqproducer_impl_inner: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>,
    transaction_listener: Option<Arc<Box<dyn TransactionListener>>>,
    check_runtime: Option<Arc<RocketMQRuntime>>,
}

impl DefaultMQProducerImpl {
    pub fn new(
        client_config: ClientConfig,
        producer_config: ProducerConfig,
        rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
    ) -> Self {
        // ...
        DefaultMQProducerImpl {
            // ...
-           default_mqproducer_impl_inner: None,
-           transaction_listener: None,
-           check_runtime: None,
+           default_mqproducer_impl_inner: Some(ArcRefCellWrapper::new(self.clone())),
+           transaction_listener: Some(Arc::new(Box::new(DefaultTransactionListener::new()))),
+           check_runtime: Some(Arc::new(RocketMQRuntime::new_multi(
+               num_cpus::get(),
+               "transaction-check",
+           ))),
        }
    }
}

Also applies to: 149-151

impl ServiceDetector for DefaultServiceDetector {
fn detect(&self, endpoint: &str, timeout_millis: u64) -> bool {
todo!()
unimplemented!("detect")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement the detect method in DefaultServiceDetector

The detect method in DefaultServiceDetector is unimplemented, which means the functionality for service detection is incomplete.

Provide an implementation for the detect method:

impl ServiceDetector for DefaultServiceDetector {
    fn detect(&self, endpoint: &str, timeout_millis: u64) -> bool {
-       unimplemented!("detect")
+       // Implement actual detection logic here
+       // For example, perform a health check or ping to the endpoint
+       // Return true if the service is reachable, false otherwise
+       true // Placeholder implementation
    }
}

Committable suggestion was skipped due to low confidence.

Comment on lines +2078 to +2145
broker_addr: &str,
msg: MessageExt,
check_request_header: CheckTransactionStateRequestHeader,
) {
todo!()
let transaction_listener = self.transaction_listener.clone().unwrap();
let mut producer_impl_inner = self.default_mqproducer_impl_inner.clone().unwrap();
let broker_addr = broker_addr.to_string();
self.check_runtime
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
let mut unique_key =
msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if unique_key.is_none() {
unique_key = Some(msg.msg_id.clone());
}
let transaction_state = transaction_listener.check_local_transaction(&msg);
let request_header = EndTransactionRequestHeader {
topic: check_request_header.topic.clone().unwrap_or_default(),
producer_group: producer_impl_inner
.producer_config
.producer_group()
.to_string(),
tran_state_table_offset: check_request_header.commit_log_offset as u64,
commit_log_offset: check_request_header.commit_log_offset as u64,
commit_or_rollback: match transaction_state {
LocalTransactionState::CommitMessage => {
MessageSysFlag::TRANSACTION_COMMIT_TYPE
}
LocalTransactionState::RollbackMessage => {
MessageSysFlag::TRANSACTION_ROLLBACK_TYPE
}
LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE,
},
from_transaction_check: true,
msg_id: unique_key.clone().unwrap_or_default(),
transaction_id: check_request_header.transaction_id.clone(),
rpc_request_header: RpcRequestHeader {
broker_name: check_request_header
.rpc_request_header
.unwrap_or_default()
.broker_name,
..Default::default()
},
};
producer_impl_inner.do_execute_end_transaction_hook(
&msg.message,
unique_key.as_ref().unwrap(),
broker_addr.as_str(),
transaction_state,
true,
);
let _ = producer_impl_inner
.client_instance
.as_mut()
.unwrap()
.mq_client_api_impl
.as_mut()
.unwrap()
.end_transaction_oneway(
broker_addr.as_str(),
request_header,
"".to_string(),
3000,
)
.await;
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid potential panic by checking for None before unwrapping

In check_transaction_state, the code calls self.transaction_listener.clone().unwrap() without checking if transaction_listener is Some. If transaction_listener is None, this will cause a panic. Ensure that transaction_listener is initialized before use or add a check to handle the None case gracefully.

Apply this diff to add a check:

let transaction_listener = self.transaction_listener.clone().unwrap();
+ if transaction_listener.is_none() {
+     warn!("Transaction listener is not set.");
+     return;
+ }

Committable suggestion was skipped due to low confidence.

Comment on lines +1843 to +1908
pub async fn send_message_in_transaction(
&mut self,
mut msg: M,
selector: MessageQueueSelectorFn,
arg: T,
timeout: u64,
send_callback: Option<SendMessageCallback>,
) -> Result<SendResult>
where
M: MessageTrait + Clone + Send + Sync,
T: std::any::Any + Sync + Send,
{
let begin_start_time = Instant::now();
self.make_sure_state_ok()?;
mut msg: Message,
arg: Option<Box<dyn Any + Send + Sync>>,
) -> Result<TransactionSendResult> {
// ignore DelayTimeLevel parameter
if msg.get_delay_time_level() != 0 {
MessageAccessor::clear_property(&mut msg, MessageConst::PROPERTY_DELAY_TIME_LEVEL);
}
Validators::check_message(Some(&msg), self.producer_config.as_ref())?;
let topic_publish_info = self.try_to_find_topic_publish_info(msg.get_topic()).await;
if let Some(topic_publish_info) = topic_publish_info {
if topic_publish_info.ok() {
let message_queue_list = self
.client_instance
.as_mut()
.unwrap()
.mq_admin_impl
.parse_publish_message_queues(
&topic_publish_info.message_queue_list,
&mut self.client_config,
MessageAccessor::put_property(
&mut msg,
MessageConst::PROPERTY_TRANSACTION_PREPARED,
"true",
);
MessageAccessor::put_property(
&mut msg,
MessageConst::PROPERTY_PRODUCER_GROUP,
self.producer_config.producer_group(),
);
let result = self.send(&mut msg).await;
if let Err(e) = result {
return Err(MQClientErr(
-1,
format!("send message in transaction error, {}", e),
));
}
let send_result = result.unwrap().expect("send result is none");
let local_transaction_state = match send_result.send_status {
SendStatus::SendOk => {
if let Some(ref transaction_id) = send_result.transaction_id {
msg.put_user_property(
MessageConst::PROPERTY_TRANSACTION_ID,
transaction_id.as_str(),
);
let mut user_message = msg.clone();
let user_topic = NamespaceUtil::without_namespace_with_namespace(
user_message.get_topic(),
self.client_config
.get_namespace()
.unwrap_or("".to_string())
.as_str(),
);
user_message.set_topic(user_topic.as_str());
let message_queue = selector(&message_queue_list, &msg, &arg);
let cost_time = begin_start_time.elapsed().as_millis() as u64;
if timeout < cost_time {
return Err(MQClientError::RemotingTooMuchRequestException(
"sendSelectImpl call timeout".to_string(),
));
}
if message_queue.is_some() {
let result = self
.send_kernel_impl(
&mut msg,
message_queue.as_ref().unwrap(),
CommunicationMode::Sync,
send_callback,
None,
timeout - cost_time,
)
.await?;
return Ok(result.unwrap());
let transaction_id =
msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if let Some(transaction_id) = transaction_id {
msg.set_transaction_id(transaction_id.as_str());
}
return Err(MQClientError::MQClientException(
-1,
"select message queue return None.".to_string(),
));
self.transaction_listener
.as_ref()
.unwrap()
.execute_local_transaction(&msg, arg.as_deref())
}
SendStatus::FlushDiskTimeout
| SendStatus::FlushSlaveTimeout
| SendStatus::SlaveNotAvailable => LocalTransactionState::RollbackMessage,
};
if let Err(e) = self
.end_transaction(&msg, &send_result, local_transaction_state)
.await
{
warn!(
"local transaction execute {}, but end broker transaction failed,{}",
local_transaction_state,
e.to_string()
);
}
self.validate_name_server_setting()?;
Err(MQClientError::MQClientException(
-1,
"No route info for this topic, ".to_string(),
))
}*/
let transaction_send_result = TransactionSendResult {
local_transaction_state: Some(local_transaction_state),
send_result: Some(send_result),
};
Ok(transaction_send_result)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Verify the transaction listener is set before executing

In send_message_in_transaction, the method calls self.transaction_listener.as_ref().unwrap() without checking if transaction_listener is Some, which can cause a panic if it's None.

Add a check to ensure transaction_listener is initialized:

+ if self.transaction_listener.is_none() {
+     return Err(MQClientErr(
+         -1,
+         "Transaction listener is not set.".to_string(),
+     ));
+ }
let local_transaction_state = self
    .transaction_listener
    .as_ref()
    .unwrap()
    .execute_local_transaction(&msg, arg.as_deref());
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn send_message_in_transaction(
&mut self,
mut msg: M,
selector: MessageQueueSelectorFn,
arg: T,
timeout: u64,
send_callback: Option<SendMessageCallback>,
) -> Result<SendResult>
where
M: MessageTrait + Clone + Send + Sync,
T: std::any::Any + Sync + Send,
{
let begin_start_time = Instant::now();
self.make_sure_state_ok()?;
mut msg: Message,
arg: Option<Box<dyn Any + Send + Sync>>,
) -> Result<TransactionSendResult> {
// ignore DelayTimeLevel parameter
if msg.get_delay_time_level() != 0 {
MessageAccessor::clear_property(&mut msg, MessageConst::PROPERTY_DELAY_TIME_LEVEL);
}
Validators::check_message(Some(&msg), self.producer_config.as_ref())?;
let topic_publish_info = self.try_to_find_topic_publish_info(msg.get_topic()).await;
if let Some(topic_publish_info) = topic_publish_info {
if topic_publish_info.ok() {
let message_queue_list = self
.client_instance
.as_mut()
.unwrap()
.mq_admin_impl
.parse_publish_message_queues(
&topic_publish_info.message_queue_list,
&mut self.client_config,
MessageAccessor::put_property(
&mut msg,
MessageConst::PROPERTY_TRANSACTION_PREPARED,
"true",
);
MessageAccessor::put_property(
&mut msg,
MessageConst::PROPERTY_PRODUCER_GROUP,
self.producer_config.producer_group(),
);
let result = self.send(&mut msg).await;
if let Err(e) = result {
return Err(MQClientErr(
-1,
format!("send message in transaction error, {}", e),
));
}
let send_result = result.unwrap().expect("send result is none");
let local_transaction_state = match send_result.send_status {
SendStatus::SendOk => {
if let Some(ref transaction_id) = send_result.transaction_id {
msg.put_user_property(
MessageConst::PROPERTY_TRANSACTION_ID,
transaction_id.as_str(),
);
let mut user_message = msg.clone();
let user_topic = NamespaceUtil::without_namespace_with_namespace(
user_message.get_topic(),
self.client_config
.get_namespace()
.unwrap_or("".to_string())
.as_str(),
);
user_message.set_topic(user_topic.as_str());
let message_queue = selector(&message_queue_list, &msg, &arg);
let cost_time = begin_start_time.elapsed().as_millis() as u64;
if timeout < cost_time {
return Err(MQClientError::RemotingTooMuchRequestException(
"sendSelectImpl call timeout".to_string(),
));
}
if message_queue.is_some() {
let result = self
.send_kernel_impl(
&mut msg,
message_queue.as_ref().unwrap(),
CommunicationMode::Sync,
send_callback,
None,
timeout - cost_time,
)
.await?;
return Ok(result.unwrap());
let transaction_id =
msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if let Some(transaction_id) = transaction_id {
msg.set_transaction_id(transaction_id.as_str());
}
return Err(MQClientError::MQClientException(
-1,
"select message queue return None.".to_string(),
));
self.transaction_listener
.as_ref()
.unwrap()
.execute_local_transaction(&msg, arg.as_deref())
}
SendStatus::FlushDiskTimeout
| SendStatus::FlushSlaveTimeout
| SendStatus::SlaveNotAvailable => LocalTransactionState::RollbackMessage,
};
if let Err(e) = self
.end_transaction(&msg, &send_result, local_transaction_state)
.await
{
warn!(
"local transaction execute {}, but end broker transaction failed,{}",
local_transaction_state,
e.to_string()
);
}
self.validate_name_server_setting()?;
Err(MQClientError::MQClientException(
-1,
"No route info for this topic, ".to_string(),
))
}*/
let transaction_send_result = TransactionSendResult {
local_transaction_state: Some(local_transaction_state),
send_result: Some(send_result),
};
Ok(transaction_send_result)
}
pub async fn send_message_in_transaction(
&mut self,
mut msg: Message,
arg: Option<Box<dyn Any + Send + Sync>>,
) -> Result<TransactionSendResult> {
// ignore DelayTimeLevel parameter
if msg.get_delay_time_level() != 0 {
MessageAccessor::clear_property(&mut msg, MessageConst::PROPERTY_DELAY_TIME_LEVEL);
}
Validators::check_message(Some(&msg), self.producer_config.as_ref())?;
MessageAccessor::put_property(
&mut msg,
MessageConst::PROPERTY_TRANSACTION_PREPARED,
"true",
);
MessageAccessor::put_property(
&mut msg,
MessageConst::PROPERTY_PRODUCER_GROUP,
self.producer_config.producer_group(),
);
let result = self.send(&mut msg).await;
if let Err(e) = result {
return Err(MQClientErr(
-1,
format!("send message in transaction error, {}", e),
));
}
let send_result = result.unwrap().expect("send result is none");
let local_transaction_state = match send_result.send_status {
SendStatus::SendOk => {
if let Some(ref transaction_id) = send_result.transaction_id {
msg.put_user_property(
MessageConst::PROPERTY_TRANSACTION_ID,
transaction_id.as_str(),
);
}
let transaction_id =
msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if let Some(transaction_id) = transaction_id {
msg.set_transaction_id(transaction_id.as_str());
}
if self.transaction_listener.is_none() {
return Err(MQClientErr(
-1,
"Transaction listener is not set.".to_string(),
));
}
self.transaction_listener
.as_ref()
.unwrap()
.execute_local_transaction(&msg, arg.as_deref())
}
SendStatus::FlushDiskTimeout
| SendStatus::FlushSlaveTimeout
| SendStatus::SlaveNotAvailable => LocalTransactionState::RollbackMessage,
};
if let Err(e) = self
.end_transaction(&msg, &send_result, local_transaction_state)
.await
{
warn!(
"local transaction execute {}, but end broker transaction failed,{}",
local_transaction_state,
e.to_string()
);
}
let transaction_send_result = TransactionSendResult {
local_transaction_state: Some(local_transaction_state),
send_result: Some(send_result),
};
Ok(transaction_send_result)
}

⚠️ Potential issue

Handle errors more precisely in send_message_in_transaction

In the method send_message_in_transaction, when self.send(&mut msg).await returns an error, a generic error is created with a custom message. To provide more context and aid in debugging, consider propagating the original error instead of creating a new one.

Apply this diff to propagate the original error:

let result = self.send(&mut msg).await;
if let Err(e) = result {
-   return Err(MQClientErr(
-       -1,
-       format!("send message in transaction error, {}", e),
-   ));
+   return Err(e);
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn send_message_in_transaction(
&mut self,
mut msg: M,
selector: MessageQueueSelectorFn,
arg: T,
timeout: u64,
send_callback: Option<SendMessageCallback>,
) -> Result<SendResult>
where
M: MessageTrait + Clone + Send + Sync,
T: std::any::Any + Sync + Send,
{
let begin_start_time = Instant::now();
self.make_sure_state_ok()?;
mut msg: Message,
arg: Option<Box<dyn Any + Send + Sync>>,
) -> Result<TransactionSendResult> {
// ignore DelayTimeLevel parameter
if msg.get_delay_time_level() != 0 {
MessageAccessor::clear_property(&mut msg, MessageConst::PROPERTY_DELAY_TIME_LEVEL);
}
Validators::check_message(Some(&msg), self.producer_config.as_ref())?;
let topic_publish_info = self.try_to_find_topic_publish_info(msg.get_topic()).await;
if let Some(topic_publish_info) = topic_publish_info {
if topic_publish_info.ok() {
let message_queue_list = self
.client_instance
.as_mut()
.unwrap()
.mq_admin_impl
.parse_publish_message_queues(
&topic_publish_info.message_queue_list,
&mut self.client_config,
MessageAccessor::put_property(
&mut msg,
MessageConst::PROPERTY_TRANSACTION_PREPARED,
"true",
);
MessageAccessor::put_property(
&mut msg,
MessageConst::PROPERTY_PRODUCER_GROUP,
self.producer_config.producer_group(),
);
let result = self.send(&mut msg).await;
if let Err(e) = result {
return Err(MQClientErr(
-1,
format!("send message in transaction error, {}", e),
));
}
let send_result = result.unwrap().expect("send result is none");
let local_transaction_state = match send_result.send_status {
SendStatus::SendOk => {
if let Some(ref transaction_id) = send_result.transaction_id {
msg.put_user_property(
MessageConst::PROPERTY_TRANSACTION_ID,
transaction_id.as_str(),
);
let mut user_message = msg.clone();
let user_topic = NamespaceUtil::without_namespace_with_namespace(
user_message.get_topic(),
self.client_config
.get_namespace()
.unwrap_or("".to_string())
.as_str(),
);
user_message.set_topic(user_topic.as_str());
let message_queue = selector(&message_queue_list, &msg, &arg);
let cost_time = begin_start_time.elapsed().as_millis() as u64;
if timeout < cost_time {
return Err(MQClientError::RemotingTooMuchRequestException(
"sendSelectImpl call timeout".to_string(),
));
}
if message_queue.is_some() {
let result = self
.send_kernel_impl(
&mut msg,
message_queue.as_ref().unwrap(),
CommunicationMode::Sync,
send_callback,
None,
timeout - cost_time,
)
.await?;
return Ok(result.unwrap());
let transaction_id =
msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if let Some(transaction_id) = transaction_id {
msg.set_transaction_id(transaction_id.as_str());
}
return Err(MQClientError::MQClientException(
-1,
"select message queue return None.".to_string(),
));
self.transaction_listener
.as_ref()
.unwrap()
.execute_local_transaction(&msg, arg.as_deref())
}
SendStatus::FlushDiskTimeout
| SendStatus::FlushSlaveTimeout
| SendStatus::SlaveNotAvailable => LocalTransactionState::RollbackMessage,
};
if let Err(e) = self
.end_transaction(&msg, &send_result, local_transaction_state)
.await
{
warn!(
"local transaction execute {}, but end broker transaction failed,{}",
local_transaction_state,
e.to_string()
);
}
self.validate_name_server_setting()?;
Err(MQClientError::MQClientException(
-1,
"No route info for this topic, ".to_string(),
))
}*/
let transaction_send_result = TransactionSendResult {
local_transaction_state: Some(local_transaction_state),
send_result: Some(send_result),
};
Ok(transaction_send_result)
}
pub async fn send_message_in_transaction(
&mut self,
mut msg: Message,
arg: Option<Box<dyn Any + Send + Sync>>,
) -> Result<TransactionSendResult> {
// ignore DelayTimeLevel parameter
if msg.get_delay_time_level() != 0 {
MessageAccessor::clear_property(&mut msg, MessageConst::PROPERTY_DELAY_TIME_LEVEL);
}
Validators::check_message(Some(&msg), self.producer_config.as_ref())?;
MessageAccessor::put_property(
&mut msg,
MessageConst::PROPERTY_TRANSACTION_PREPARED,
"true",
);
MessageAccessor::put_property(
&mut msg,
MessageConst::PROPERTY_PRODUCER_GROUP,
self.producer_config.producer_group(),
);
let result = self.send(&mut msg).await;
if let Err(e) = result {
return Err(e);
}
let send_result = result.unwrap().expect("send result is none");
let local_transaction_state = match send_result.send_status {
SendStatus::SendOk => {
if let Some(ref transaction_id) = send_result.transaction_id {
msg.put_user_property(
MessageConst::PROPERTY_TRANSACTION_ID,
transaction_id.as_str(),
);
}
let transaction_id =
msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if let Some(transaction_id) = transaction_id {
msg.set_transaction_id(transaction_id.as_str());
}
self.transaction_listener
.as_ref()
.unwrap()
.execute_local_transaction(&msg, arg.as_deref())
}
SendStatus::FlushDiskTimeout
| SendStatus::FlushSlaveTimeout
| SendStatus::SlaveNotAvailable => LocalTransactionState::RollbackMessage,
};
if let Err(e) = self
.end_transaction(&msg, &send_result, local_transaction_state)
.await
{
warn!(
"local transaction execute {}, but end broker transaction failed,{}",
local_transaction_state,
e.to_string()
);
}
let transaction_send_result = TransactionSendResult {
local_transaction_state: Some(local_transaction_state),
send_result: Some(send_result),
};
Ok(transaction_send_result)
}

@codecov
Copy link

codecov bot commented Oct 27, 2024

Codecov Report

Attention: Patch coverage is 3.42342% with 536 lines in your changes missing coverage. Please review.

Project coverage is 19.69%. Comparing base (18afaac) to head (4f4fc7b).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...ent/src/producer/transaction_mq_produce_builder.rs 0.00% 210 Missing ⚠️
...producer/producer_impl/default_mq_producer_impl.rs 0.00% 93 Missing ⚠️
.../protocol/header/end_transaction_request_header.rs 0.00% 67 Missing ⚠️
...nt/src/implementation/client_remoting_processor.rs 0.00% 48 Missing ⚠️
...tmq-client/src/producer/transaction_mq_producer.rs 0.00% 43 Missing ⚠️
...nt/src/producer/producer_impl/mq_producer_inner.rs 0.00% 30 Missing ⚠️
...mq-client/src/implementation/mq_client_api_impl.rs 0.00% 14 Missing ⚠️
rocketmq-client/src/factory/mq_client_instance.rs 0.00% 6 Missing ⚠️
...ocketmq-client/src/producer/default_mq_producer.rs 0.00% 6 Missing ⚠️
...tmq-client/src/producer/local_transaction_state.rs 0.00% 6 Missing ⚠️
... and 6 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1088      +/-   ##
==========================================
- Coverage   19.88%   19.69%   -0.20%     
==========================================
  Files         427      431       +4     
  Lines       35722    36256     +534     
==========================================
+ Hits         7105     7141      +36     
- Misses      28617    29115     +498     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@SpaceXCN SpaceXCN added approved PR has approved and removed ready to review labels Oct 27, 2024
@mxsm mxsm merged commit fe0ccb3 into main Oct 27, 2024
15 of 17 checks passed
@mxsm mxsm deleted the feature-1078 branch October 27, 2024 14:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved PR has approved auto merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Support send transaction message for client

3 participants