-
Notifications
You must be signed in to change notification settings - Fork 175
[ISSUE #1078]🚀Support send transaction message for client🍻 #1088
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe pull request introduces several enhancements to the RocketMQ client, including the addition of a new transaction producer example and modifications to existing structures and methods to support transactional message handling. Key changes involve the implementation of a Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 34
🧹 Outside diff range and nitpick comments (24)
rocketmq-common/src/common/message/message_id.rs (1)
20-24: Consider adding documentation for this public API.Since this is now a public struct used for message identification, it would be helpful to add documentation explaining:
- The purpose of MessageId
- The significance of the address and offset fields
- Any constraints or assumptions about the values
Example documentation:
#[derive(Debug, Clone, PartialEq, Eq, Copy)] +/// Uniquely identifies a message in RocketMQ using broker address and offset. +/// +/// # Fields +/// * `address` - The socket address of the broker storing the message +/// * `offset` - The position of the message in the broker's storage pub struct MessageId { pub address: SocketAddr, pub offset: i64, }rocketmq-client/src/hook/end_transaction_context.rs (1)
21-29: Add documentation for the EndTransactionContext struct.The struct appears to be well-structured for transaction handling, but it lacks documentation explaining its purpose, usage, and the significance of each field. Consider adding rustdoc comments.
Apply this diff to add documentation:
+/// Represents the context for ending a transaction in RocketMQ. +/// This struct contains all necessary information to complete a transaction operation. pub struct EndTransactionContext<'a> { + /// The producer group responsible for the transaction pub producer_group: String, + /// The address of the broker handling the transaction pub broker_addr: String, + /// Reference to the message involved in the transaction pub message: &'a Message, + /// Unique identifier for the message pub msg_id: String, + /// Unique identifier for the transaction pub transaction_id: String, + /// Current state of the local transaction pub transaction_state: LocalTransactionState, + /// Indicates if this context is created from a transaction check pub from_transaction_check: bool, }rocketmq-client/src/producer.rs (1)
31-32: LGTM! Consider grouping transaction-related modules together.The new module declarations align well with the PR's objective of supporting transaction messages and follow the existing naming conventions.
Consider grouping all transaction-related modules together for better code organization. You could move these declarations next to other transaction-related modules like
transaction_listenerandtransaction_send_result.pub mod send_callback; pub mod send_result; pub mod send_status; pub mod transaction_listener; +pub mod transaction_mq_produce_builder; +pub mod transaction_mq_producer; pub mod transaction_send_result; -pub mod transaction_mq_produce_builder; -pub mod transaction_mq_producer;rocketmq-client/src/hook/send_message_context.rs (1)
41-43: Consider adding documentation for the producer field.Adding documentation comments would help explain:
- The purpose of using
ArcRefCellWrapper- Thread safety guarantees
- Usage guidelines for interior mutability
+ /// Producer instance wrapped in ArcRefCellWrapper for thread-safe mutable access. + /// The wrapper provides: + /// - Thread-safe sharing through Arc + /// - Interior mutability through RefCell pub producer: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>,rocketmq-common/src/utils/util_all.rs (2)
194-194: Add documentation for the public function.Since this function is now public, it should have documentation explaining its purpose, parameters, return value, and example usage.
Add documentation like this:
+/// Converts a hexadecimal string to a byte vector. +/// +/// # Arguments +/// +/// * `hex_string` - A string containing hexadecimal characters (0-9, A-F). Case-insensitive. +/// +/// # Returns +/// +/// * `Some(Vec<u8>)` - The converted bytes if the input is valid +/// * `None` - If the input is empty or contains invalid characters +/// +/// # Examples +/// +/// ``` +/// let bytes = string_to_bytes("414243").unwrap(); +/// assert_eq!(bytes, vec![0x41, 0x42, 0x43]); +/// ``` pub fn string_to_bytes(hex_string: impl Into<String>) -> Option<Vec<u8>> {
Function needs test coverage for hex string conversion functionality
The implementation looks correct, but there are no specific tests for the
string_to_bytesfunction despite it being used in a critical message decoding path. The function is used inmessage_decoder.rsto decode message IDs, which makes it an important component that should be thoroughly tested.Key findings:
- Function is used in
decode_message_idfor parsing message identifiers- No dedicated test cases found for hex string conversion
- No alternative hex conversion utilities exist in the codebase
🔗 Analysis chain
Line range hint
194-215: LGTM! The implementation is correct and well-tested.The function is well-implemented with proper:
- Input validation
- Error handling using Option
- Performance optimization with capacity pre-allocation
- Safe operations throughout
Let's verify its usage in the codebase:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of string_to_bytes function # Expected: Used in message_decoder.rs for decoding message IDs # Search for direct usage of the function rg "string_to_bytes\(" --type rust # Search for potential test cases rg "string_to_bytes" -A 5 --type rust -g "*test*.rs"Length of output: 465
Script:
#!/bin/bash # Let's verify the implementation and test coverage more thoroughly # Check the test file in the same directory as util_all.rs fd "util_all" --type f --extension rs | grep "test" # Look for test modules within util_all.rs rg "^mod test" -A 10 rocketmq-common/src/utils/util_all.rs # Check message_decoder.rs usage context rg -B 5 -A 5 "string_to_bytes" rocketmq-common/src/common/message/message_decoder.rs # Check if there are any other hex string conversion utilities rg "hex.*string|string.*hex" --type rustLength of output: 1546
rocketmq-common/src/common/message/message_decoder.rs (2)
391-395: Fix indentation in store size calculation comments.The comments are accurate but have inconsistent indentation compared to the surrounding code.
Apply this diff to fix the indentation:
- + 4 // 2 MAGICCOD - + 4 // 3 BODYCRC - + 4 // 4 FLAG - + 4 + body_len // 4 BODY - + 2 + properties_length; + + 4 // 2 MAGICCOD + + 4 // 3 BODYCRC + + 4 // 4 FLAG + + 4 + body_len // 4 BODY + + 2 + properties_length;
582-588: Add more test cases for message ID decoding.The current test only covers the IPv4 case. Consider adding:
- IPv6 message ID test case
- Invalid length test case
- Invalid format test case
Here's an example of additional test cases:
#[test] fn decode_message_id_ipv6() { let msg_id = "ABCD1234ABCD1234ABCD1234ABCD12340007D8260BF075769D36C348"; let message_id = decode_message_id(msg_id); assert_eq!(message_id.address, "[abcd:1234:abcd:1234:abcd:1234:abcd:1234]:55334".parse().unwrap()); assert_eq!(message_id.offset, 860316681131967304); } #[test] #[should_panic(expected = "Invalid message ID length")] fn decode_message_id_invalid_length() { decode_message_id("invalid"); }rocketmq-remoting/src/protocol/remoting_command.rs (1)
519-521: LGTM! Consider adding documentation.The implementation of
get_body_mutfollows Rust idioms well and provides the necessary mutable access to support transaction message handling. Consider adding documentation to explain its purpose and usage in the transaction message context.Add documentation like this:
+ /// Returns a mutable reference to the command body. + /// + /// This method is particularly useful when modifying message bodies during transaction processing. pub fn get_body_mut(&mut self) -> Option<&mut Bytes> { self.body.as_mut() }rocketmq-client/src/implementation/mq_client_api_impl.rs (1)
1135-1150: Consider adding parameter validation.The implementation looks good and follows the established patterns. However, consider adding validation for:
addr: Ensure it's not emptytimeout_millis: Ensure it's within reasonable boundspub async fn end_transaction_oneway( &mut self, addr: &str, request_header: EndTransactionRequestHeader, remark: String, timeout_millis: u64, ) -> Result<()> { + if addr.is_empty() { + return Err(MQClientError::MQClientErr(-1, "addr cannot be empty".to_string())); + } + if timeout_millis == 0 || timeout_millis > 3600000 { // 1 hour max + return Err(MQClientError::MQClientErr(-1, "invalid timeout_millis".to_string())); + } + let request = RemotingCommand::create_request_command(RequestCode::EndTransaction, request_header) .set_remark(Some(remark));rocketmq-client/examples/transaction/transaction_producer.rs (4)
60-60: Handle the result ofproducer.shutdown().awaitThe
shutdownmethod returns aResult. It's good practice to handle potential errors by using the?operator to propagate them.Apply this diff to handle possible errors during shutdown:
-producer.shutdown().await; +producer.shutdown().await?;
33-33: Remove unusedMESSAGE_COUNTconstantThe
MESSAGE_COUNTconstant is defined but not used in the code. If it's unnecessary, consider removing it to keep the code clean.Apply this diff to remove the unused constant:
-pub const MESSAGE_COUNT: usize = 1;
34-34: Replace placeholder producer group name with a meaningful identifierThe
PRODUCER_GROUPconstant is set to"please_rename_unique_group_name". For clarity and to avoid confusion, replace it with a meaningful producer group name.Apply this diff to update the producer group name:
-pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name"; +pub const PRODUCER_GROUP: &str = "transaction_producer_group";
94-96: Define constants for transaction statusesUsing named constants instead of magic numbers improves code readability and maintainability. Define constants for the transaction status codes.
Apply this diff to define and use status constants:
+const STATUS_COMMIT: i32 = 1; +const STATUS_ROLLBACK: i32 = 2; match status { - 1 => LocalTransactionState::CommitMessage, - 2 => LocalTransactionState::RollbackMessage, + STATUS_COMMIT => LocalTransactionState::CommitMessage, + STATUS_ROLLBACK => LocalTransactionState::RollbackMessage, _ => LocalTransactionState::Unknown, }rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs (2)
74-83: Ensure consistent parameter naming in method signatures.In the
check_transaction_statemethod ofMQProducerInnerImpl, the parameter is namedaddr, whereas in the trait it'sbroker_addr. This inconsistency can cause confusion.Consider renaming
addrtobroker_addrfor consistency:pub fn check_transaction_state( &self, - addr: &str, + broker_addr: &str, msg: MessageExt, check_request_header: CheckTransactionStateRequestHeader, ) { if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner { - default_mqproducer_impl_inner.check_transaction_state(addr, msg, check_request_header); + default_mqproducer_impl_inner.check_transaction_state(broker_addr, msg, check_request_header); } }
91-96: Simplify theis_unit_modemethod with default value.Since
is_unit_modereturnsfalsewhendefault_mqproducer_impl_innerisNone, consider simplifying the method.pub fn is_unit_mode(&self) -> bool { if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner { return default_mqproducer_impl_inner.is_unit_mode(); } - false + // Default to false if not set false }Ensure that returning
falseis the intended default behavior.rocketmq-client/src/implementation/client_remoting_processor.rs (1)
261-264: Differentiate warning messages for better debuggingThe warning messages at lines 261 and 264 are the same, which may make debugging difficult. Consider making them more specific.
You can update the warnings as follows:
} else { - warn!("checkTransactionState, pick producer group failed"); + warn!("checkTransactionState: No producer found for group '{}'", group); } } else { - warn!("checkTransactionState, pick producer group failed"); + warn!("checkTransactionState: Producer group not specified in message properties"); }rocketmq-client/src/producer/transaction_mq_produce_builder.rs (2)
35-61: Improve readability by grouping related fields or adding documentationThe
TransactionMQProducerBuilderstruct contains numerous optional fields, which can make the code harder to read and maintain. Consider grouping related fields together or adding documentation comments to explain the purpose of each field group. This will enhance clarity for future maintainers.
248-335: Ensure required configurations are validated in thebuildmethodThe
buildmethod does not currently validate whether all necessary configurations have been provided. To prevent runtime errors, consider adding validation logic to check for the presence of essential fields and provide default values or error messages when they are missing.rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (3)
Line range hint
99-111: Review use ofArcandArcRefCellWrapperfor shared stateThe struct
DefaultMQProducerImplusesArcandArcRefCellWrapperfor shared fields. Ensure that this usage properly handles concurrency and avoids potential deadlocks or mutable borrow issues.Consider whether using
Arc<Mutex<T>>or other concurrency primitives would be more appropriate for the shared mutable state.
1847-1849: Clear delay properties only if necessaryThe code clears the
PROPERTY_DELAY_TIME_LEVELif it's not zero. Ensure that this is the intended behavior and that it doesn't affect other message properties unexpectedly.Double-check that clearing this property is required and doesn't have side effects on message delivery timing.
1939-1945: Clarify logic forcommit_or_rollbackin request headerThe mapping of
local_transaction_statetocommit_or_rollbackflags inEndTransactionRequestHeadermight benefit from comments or refactoring for clarity.Consider adding comments or refactoring to make the mapping between transaction states and sys flags clearer.
rocketmq-remoting/src/protocol/header/end_transaction_request_header.rs (2)
27-40: Ensure field ordering aligns with serialization expectationsThe
#[serde(flatten)]attribute is applied torpc_request_headerafter other fields. While this works, it's generally recommended to place flattened fields before other fields to avoid potential conflicts or unexpected behavior during serialization.Consider rearranging the struct fields:
pub struct EndTransactionRequestHeader { + #[serde(flatten)] + pub rpc_request_header: RpcRequestHeader, pub topic: String, pub producer_group: String, tran_state_table_offset: u64, commit_log_offset: u64, commit_or_rollback: i32, from_transaction_check: bool, msg_id: String, transaction_id: Option<String>, - #[serde(flatten)] - pub rpc_request_header: RpcRequestHeader, }
17-26: Organize imports alphabetically and by crateFor better readability and maintenance, consider organizing imports first by external crates, then by internal crates, and sorting them alphabetically.
Reorder the imports:
use anyhow::anyhow; use anyhow::Error; use serde::Deserialize; use serde::Serialize; +use crate::protocol::command_custom_header::CommandCustomHeader; +use crate::protocol::command_custom_header::FromMap; +use crate::rpc::rpc_request_header::RpcRequestHeader; +use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (22)
- rocketmq-client/Cargo.toml (1 hunks)
- rocketmq-client/examples/transaction/transaction_producer.rs (1 hunks)
- rocketmq-client/src/factory/mq_client_instance.rs (5 hunks)
- rocketmq-client/src/hook/end_transaction_context.rs (1 hunks)
- rocketmq-client/src/hook/send_message_context.rs (2 hunks)
- rocketmq-client/src/implementation/client_remoting_processor.rs (4 hunks)
- rocketmq-client/src/implementation/mq_client_api_impl.rs (2 hunks)
- rocketmq-client/src/lib.rs (1 hunks)
- rocketmq-client/src/producer.rs (1 hunks)
- rocketmq-client/src/producer/default_mq_producer.rs (2 hunks)
- rocketmq-client/src/producer/local_transaction_state.rs (2 hunks)
- rocketmq-client/src/producer/mq_producer.rs (1 hunks)
- rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (16 hunks)
- rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs (2 hunks)
- rocketmq-client/src/producer/transaction_mq_produce_builder.rs (1 hunks)
- rocketmq-client/src/producer/transaction_mq_producer.rs (1 hunks)
- rocketmq-common/src/common/message/message_decoder.rs (5 hunks)
- rocketmq-common/src/common/message/message_id.rs (1 hunks)
- rocketmq-common/src/utils/util_all.rs (1 hunks)
- rocketmq-remoting/src/protocol/header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/end_transaction_request_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/remoting_command.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- rocketmq-client/src/lib.rs
🔇 Additional comments (23)
rocketmq-common/src/common/message/message_id.rs (1)
20-21: LGTM! Appropriate trait implementations for message identification.The added traits (Clone, PartialEq, Eq, Copy) and public visibility are well-suited for message identification in a distributed system. The implementation is safe as both fields (SocketAddr and i64) implement these traits.
rocketmq-client/src/hook/end_transaction_context.rs (1)
21-29: Verify transaction state handling across the codebase.The introduction of transaction state handling might require consistent implementation across other components.
Let's verify the transaction state handling implementation:
✅ Verification successful
Based on the shell script results, I can now generate the final response:
Transaction state handling is consistently implemented across the codebase
The transaction state implementation shows a well-structured and consistent pattern:
LocalTransactionStateenum is properly defined with serialization/deserialization- Transaction state is correctly used in producer implementations for commit/rollback handling
- End transaction hooks provide proper extension points
- Example code demonstrates correct usage patterns
The only minor concern is the unimplemented
end_transactionmethod inEndTransactionTraceHookImpl, but this appears to be a planned extension point rather than a critical issue.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for consistent transaction state handling across the codebase # Check for LocalTransactionState usage echo "Checking LocalTransactionState usage patterns:" rg -A 5 "LocalTransactionState" # Check for transaction-related implementations echo "Checking transaction-related implementations:" ast-grep --pattern 'impl$$$Transaction$$$' # Check for other transaction context usage echo "Checking EndTransactionContext usage:" rg -A 5 "EndTransactionContext"Length of output: 19939
rocketmq-client/src/hook/send_message_context.rs (1)
42-42: LGTM! Thread-safe producer field type.The change to
Option<ArcRefCellWrapper<DefaultMQProducerImpl>>is a good improvement as it provides:
- Thread-safe sharing through
Arc- Interior mutability through
RefCell- Consistent with the codebase's concurrency patterns
rocketmq-client/Cargo.toml (2)
84-85: LGTM! Improved formatting with newline.The added newline improves readability and follows Rust's formatting conventions.
86-88: Verify the transaction example directory structure.The new transaction producer example follows the established pattern of organizing examples by feature. Let's verify the directory structure exists.
✅ Verification successful
Directory structure verified and follows the established pattern
The transaction example follows the established pattern in the codebase:
- Located at the correct path:
rocketmq-client/examples/transaction/transaction_producer.rs- Follows the same directory structure as other examples like:
rocketmq-client/examples/batch/rocketmq-client/examples/broadcast/rocketmq-client/examples/ordermessage/rocketmq-client/examples/quickstart/The example's location and structure are consistent with the project's conventions.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the transaction example directory structure and file # Check if the example directory and file exist fd --type d "transaction$" "examples/" fd --type f "transaction_producer.rs$" "examples/" # Verify the example follows the same structure as other examples fd --type f "_producer.rs$" "examples/" --exec echo "Found producer example: {}"Length of output: 461
Script:
#!/bin/bash # Let's first check if the examples directory exists and its location fd -t d "examples" # Then check for any transaction-related files in the repository fd -t f "transaction.*\.rs$" # List all Rust files to understand the example structure fd -t f "\.rs$"Length of output: 31904
rocketmq-remoting/src/protocol/header.rs (1)
23-23: LGTM! Verify the new module implementation.The new module declaration is correctly placed and follows the established naming pattern.
Let's verify that the module file exists and contains the expected transaction-related header implementation:
✅ Verification successful
The new module is properly implemented and integrated
The verification confirms:
- The module file exists at the correct location
- The
EndTransactionRequestHeaderstruct is properly implemented with all necessary fields and methods- The module is correctly integrated and used in the producer implementation for transaction message handling
- The implementation follows the codebase's patterns by implementing required traits (
CommandCustomHeader,FromMap)🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the existence and implementation of the end_transaction_request_header module # Test 1: Check if the module file exists fd "end_transaction_request_header.rs" --type f # Test 2: Verify the implementation includes transaction-related structures rg -A 5 "EndTransactionRequestHeader"Length of output: 10953
rocketmq-client/src/producer/local_transaction_state.rs (3)
18-18: LGTM!The import of
Displaytrait is correctly added.
Line range hint
24-30: LGTM!The enum definition follows Rust best practices with appropriate derive macros and a sensible default variant.
81-89:⚠️ Potential issueConsider standardizing the "UNKNOWN" spelling across the codebase.
The current implementation uses "UNKNOW" in the Display implementation, while the Deserialize implementation accepts both "UNKNOW" and "UNKNOWN". This inconsistency could lead to confusion:
- "UNKNOW" appears to be a typo of "UNKNOWN"
- The deserialize implementation suggests "UNKNOWN" is also valid
- The comment "java is UNKNOW" indicates this might be for compatibility
Consider either:
- Standardizing on "UNKNOWN" throughout the codebase:
- LocalTransactionState::Unknown => write!(f, "UNKNOW"), + LocalTransactionState::Unknown => write!(f, "UNKNOWN"),
- Or documenting the reason for maintaining "UNKNOW" if it's required for Java compatibility.
Let's check if "UNKNOW" is used consistently across the codebase:
✅ Verification successful
Let me gather more context about the Java compatibility aspect since there's a comment suggesting this might be intentional.
Based on the gathered evidence, I can now provide the final response:
Document the Java compatibility requirement for "UNKNOW" spelling
The "UNKNOW" spelling appears to be intentionally maintained for Java compatibility, as indicated by the comment "//java is UNKNOW" in the enum definition. The current implementation correctly handles both spellings in deserialization while maintaining the Java-compatible "UNKNOW" in serialization.
Recommended actions:
- Add a more detailed documentation comment explaining the Java compatibility requirement:
/// Represents the state of a local transaction. /// Note: The `Unknown` variant serializes to "UNKNOW" (without 'N') /// to maintain compatibility with the Java implementation of RocketMQ. pub enum LocalTransactionState {🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for both spellings to understand the convention echo "Checking for 'UNKNOW' usage:" rg "UNKNOW" --type rust echo -e "\nChecking for 'UNKNOWN' usage:" rg "UNKNOWN" --type rustLength of output: 2943
Script:
#!/bin/bash # Check for any Java-related compatibility code or comments rg -B 2 -A 2 "java" --type rust # Also check the specific enum implementation more thoroughly ast-grep --pattern 'enum LocalTransactionState { $$$ }'Length of output: 3174
rocketmq-common/src/common/message/message_decoder.rs (1)
18-18: LGTM! Import changes are well-organized.The new imports are properly organized and necessary for the message ID decoding functionality.
Also applies to: 34-34, 39-39
rocketmq-client/src/producer/mq_producer.rs (1)
431-437: LGTM! The signature changes improve flexibility and safety.The updated method signature introduces several improvements:
- Generic type parameter allows passing any thread-safe type as transaction argument
- Taking ownership of the message is more idiomatic in Rust
- Mutability requirement ensures thread-safety
Let's verify that all implementations have been updated to match the new signature:
✅ Verification successful
All implementations correctly match the new signature
The search results show that all implementations of
send_message_in_transactionhave been properly updated to match the new signature:
transaction_mq_producer.rs: Implements with the new generic signaturedefault_mq_producer.rs: Implements with the new signature but explicitly unimplements the functionalitydefault_mq_producer_impl.rs: Internal implementation matches the new signatureThe changes are consistent across the codebase and all implementations properly handle the breaking changes introduced in the trait.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for implementations of send_message_in_transaction ast-grep --pattern 'impl $_ for $_ { $$$ async fn send_message_in_transaction$_($_, $_, $_) { $$$ } $$$ }'Length of output: 120
Script:
#!/bin/bash # Let's try a broader search for send_message_in_transaction implementations rg "send_message_in_transaction" -A 5 -B 2Length of output: 3988
rocketmq-client/src/producer/default_mq_producer.rs (2)
351-356: LGTM: Thread-safe producer implementation handling.The implementation correctly manages thread-safe sharing of the producer implementation using
ArcRefCellWrapper.
1005-1013:⚠️ Potential issueImplementation missing for transaction message support.
The PR aims to add transaction message support, but the
send_message_in_transactionmethod is marked as unimplemented. This will cause runtime panics when called. Please implement the transaction message handling logic as per the PR objectives.Let's verify if there are any transaction-related implementations in other files:
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)
49-49: LGTM!The import statement for
EndTransactionRequestHeaderis correctly placed and follows the existing import pattern.rocketmq-client/src/factory/mq_client_instance.rs (3)
69-69: LGTM: Improved type safety and mutability handling.The changes to field types enhance the implementation by:
- Using
ArcRefCellWrapperforclient_configenables thread-safe interior mutability- Replacing boxed trait objects with concrete
MQProducerInnerImplimproves type safety and performanceAlso applies to: 76-76
Line range hint
329-338: LGTM: Thread-safe producer registration.The implementation correctly handles producer registration with proper thread safety using RwLock and appropriate error handling.
1058-1061: LGTM: Clean producer selection implementation.The new
select_producermethod provides thread-safe access to producers with proper ownership handling through cloning.rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs (2)
67-72:⚠️ Potential issueAvoid using
unreachable!()macro in production code.In the
get_check_listenermethod, usingunreachable!()may lead to runtime panics ifdefault_mqproducer_impl_innerisNone. This could cause the application to crash unexpectedly.Consider handling the
Nonecase gracefully or ensuring thatdefault_mqproducer_impl_innercannot beNone:pub fn get_check_listener(&self) -> Arc<Box<dyn TransactionListener>> { if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner { return default_mqproducer_impl_inner.get_check_listener(); } - unreachable!("default_mqproducer_impl_inner is None") + panic!("default_mqproducer_impl_inner should not be None"); }Alternatively, remove the
Optionwrapper as suggested earlier.Likely invalid or redundant comment.
85-89: Ensure mutable access is properly handled.The method
update_topic_publish_infotakes&mut self, but the borrowing ofdefault_mqproducer_impl_innershould ensure mutable access correctly. Verify that there are no data races or borrowing issues.Run the following script to check for any borrowing conflicts:
✅ Verification successful
Let me gather more information about the implementation and usage of
TopicPublishInfoto ensure thread safety and proper mutation.
Let me analyze the mutable access pattern by checking the implementation of
TopicPublishInfoand its usage.
Based on the code analysis, I can now provide the final response:
The mutable access is properly handled and safe.
The
TopicPublishInfostruct is designed to be thread-safe through:
#[derive(Clone)]implementation allowing safe copying- Storage in an
Arc<RwLock<HashMap>>in the producer implementation, providing synchronized access- Proper mutable access through
RwLockwhen updating viaupdate_topic_publish_infoThe implementation follows Rust's ownership and borrowing rules correctly, with synchronized access patterns preventing data races.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for mutable borrowing issues in update_topic_publish_info method. # Expect: No error messages about borrowing issues. rg 'update_topic_publish_info' -A 5Length of output: 2754
Script:
#!/bin/bash # Check TopicPublishInfo implementation and usage ast-grep --pattern 'struct TopicPublishInfo { $$$ }' # Check for any concurrent access patterns rg "topic_publish_info" -B 2 -A 2 # Check for any Arc/Mutex usage with TopicPublishInfo rg "Arc|Mutex.*TopicPublishInfo" -B 1 -A 1Length of output: 383290
Script:
#!/bin/bash # Check TopicPublishInfo implementation and usage ast-grep --pattern 'struct TopicPublishInfo { $$$ }' # Check for any concurrent access patterns in TopicPublishInfo rg "TopicPublishInfo" -A 5 -B 2 rocketmq-client/src/producer/producer_impl/topic_publish_info.rsLength of output: 696
rocketmq-client/src/implementation/client_remoting_processor.rs (1)
213-271:check_transaction_statemethod implementation looks correctThe
check_transaction_statemethod is implemented properly, with appropriate logic for handling transaction state checks.rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (3)
1861-1906: Handle possibleNonevalues for transaction IDsWhen retrieving and setting the transaction ID from message properties, ensure that
transaction_idis available to prevent potential issues.Confirm that
transaction_idis always present in message properties or handle theNonecase appropriately.
2078-2081: Handle potentialNonevalues when retrieving unique message IDWhen retrieving
unique_keyfrom message properties, if it isNone, you fall back tomsg.msg_id.clone(). Ensure thatmsg.msg_idis always set or handle the case where both values could beNoneto prevent issues.Add checks or defaults to handle cases where both
unique_keyandmsg.msg_idmight be missing.
1973-2002: 🛠️ Refactor suggestionEnsure
end_transaction_hook_listis properly synchronizedThe
end_transaction_hook_listmay be accessed concurrently without synchronization. Consider wrapping it with a thread-safe structure likeArc<Mutex<...>>to prevent data races.Check if synchronization is necessary and implement appropriate thread safety mechanisms.
| pub struct EndTransactionContext<'a> { | ||
| pub producer_group: String, | ||
| pub broker_addr: String, | ||
| pub message: &'a Message, | ||
| pub msg_id: String, | ||
| pub transaction_id: String, | ||
| pub transaction_state: LocalTransactionState, | ||
| pub from_transaction_check: bool, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider encapsulating fields and adding validation.
The struct currently exposes all fields as public, which might lead to invalid state mutations. Consider implementing proper encapsulation and validation.
Here's a suggested implementation with better encapsulation and validation:
/// Represents the context for ending a transaction in RocketMQ.
pub struct EndTransactionContext<'a> {
producer_group: String,
broker_addr: String,
message: &'a Message,
msg_id: String,
transaction_id: String,
transaction_state: LocalTransactionState,
from_transaction_check: bool,
}
impl<'a> EndTransactionContext<'a> {
/// Creates a new EndTransactionContext with validation
pub fn new(
producer_group: String,
broker_addr: String,
message: &'a Message,
msg_id: String,
transaction_id: String,
transaction_state: LocalTransactionState,
from_transaction_check: bool,
) -> Result<Self, &'static str> {
// Validate inputs
if producer_group.is_empty() {
return Err("producer_group cannot be empty");
}
if broker_addr.is_empty() {
return Err("broker_addr cannot be empty");
}
if msg_id.is_empty() {
return Err("msg_id cannot be empty");
}
if transaction_id.is_empty() {
return Err("transaction_id cannot be empty");
}
Ok(Self {
producer_group,
broker_addr,
message,
msg_id,
transaction_id,
transaction_state,
from_transaction_check,
})
}
// Getters
pub fn producer_group(&self) -> &str {
&self.producer_group
}
pub fn broker_addr(&self) -> &str {
&self.broker_addr
}
pub fn message(&self) -> &Message {
self.message
}
pub fn msg_id(&self) -> &str {
&self.msg_id
}
pub fn transaction_id(&self) -> &str {
&self.transaction_id
}
pub fn transaction_state(&self) -> LocalTransactionState {
self.transaction_state
}
pub fn from_transaction_check(&self) -> bool {
self.from_transaction_check
}
// Setter for mutable state only
pub fn set_transaction_state(&mut self, state: LocalTransactionState) {
self.transaction_state = state;
}
}| pub fn decode_message_id(msg_id: &str) -> MessageId { | ||
| let bytes = util_all::string_to_bytes(msg_id).unwrap(); | ||
| let mut buffer = Bytes::from(bytes); | ||
| let len = if msg_id.len() == 32 { | ||
| let mut ip = [0u8; 4]; | ||
| buffer.copy_to_slice(&mut ip); | ||
| let port = buffer.get_i32(); | ||
| SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16) | ||
| } else { | ||
| let mut ip = [0u8; 16]; | ||
| buffer.copy_to_slice(&mut ip); | ||
| let port = buffer.get_i32(); | ||
| SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16) | ||
| }; | ||
| MessageId { | ||
| address: len, | ||
| offset: buffer.get_i64(), | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add input validation and improve variable naming.
The function handles both IPv4 and IPv6 addresses well, but could be improved in two areas:
- There's no validation of the input string length before processing
- The variable name
lenis misleading as it stores a socket address
Apply these improvements:
pub fn decode_message_id(msg_id: &str) -> MessageId {
+ // Message ID should be either 32 chars (IPv4) or 56 chars (IPv6)
+ if msg_id.len() != 32 && msg_id.len() != 56 {
+ panic!("Invalid message ID length: {}", msg_id.len());
+ }
let bytes = util_all::string_to_bytes(msg_id).unwrap();
let mut buffer = Bytes::from(bytes);
- let len = if msg_id.len() == 32 {
+ let address = if msg_id.len() == 32 {
let mut ip = [0u8; 4];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
} else {
let mut ip = [0u8; 16];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
};
MessageId {
- address: len,
+ address,
offset: buffer.get_i64(),
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub fn decode_message_id(msg_id: &str) -> MessageId { | |
| let bytes = util_all::string_to_bytes(msg_id).unwrap(); | |
| let mut buffer = Bytes::from(bytes); | |
| let len = if msg_id.len() == 32 { | |
| let mut ip = [0u8; 4]; | |
| buffer.copy_to_slice(&mut ip); | |
| let port = buffer.get_i32(); | |
| SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16) | |
| } else { | |
| let mut ip = [0u8; 16]; | |
| buffer.copy_to_slice(&mut ip); | |
| let port = buffer.get_i32(); | |
| SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16) | |
| }; | |
| MessageId { | |
| address: len, | |
| offset: buffer.get_i64(), | |
| } | |
| } | |
| pub fn decode_message_id(msg_id: &str) -> MessageId { | |
| // Message ID should be either 32 chars (IPv4) or 56 chars (IPv6) | |
| if msg_id.len() != 32 && msg_id.len() != 56 { | |
| panic!("Invalid message ID length: {}", msg_id.len()); | |
| } | |
| let bytes = util_all::string_to_bytes(msg_id).unwrap(); | |
| let mut buffer = Bytes::from(bytes); | |
| let address = if msg_id.len() == 32 { | |
| let mut ip = [0u8; 4]; | |
| buffer.copy_to_slice(&mut ip); | |
| let port = buffer.get_i32(); | |
| SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16) | |
| } else { | |
| let mut ip = [0u8; 16]; | |
| buffer.copy_to_slice(&mut ip); | |
| let port = buffer.get_i32(); | |
| SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16) | |
| }; | |
| MessageId { | |
| address, | |
| offset: buffer.get_i64(), | |
| } | |
| } |
| pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> { | ||
| let producer_table = self.producer_table.read().await; | ||
| producer_table.get(group).cloned() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Based on the gathered information, I can now provide the final response:
Transaction producer cleanup needs implementation
The verification reveals incomplete producer cleanup implementation:
unregister_produceris properly implemented to handle producer table cleanup- However, transaction producer's
shutdown()is unimplemented (marked withunimplemented!()) - Default producer's shutdown only handles accumulator and trace dispatcher cleanup
🔗 Analysis chain
Verify producer cleanup in transaction scenarios.
Since this is part of transaction message support, ensure that producers are properly cleaned up after transaction completion or failure.
Let me gather more information about the producer cleanup implementation and transaction handling.
Let me gather more specific information about producer cleanup in the unregister flow.
Let me check one final aspect about producer cleanup in the implementation.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for producer cleanup in transaction handling
rg -A 5 "unregister_producer|producer_table.remove"
Length of output: 1583
Script:
#!/bin/bash
# Search for producer cleanup in transaction context
rg -A 10 "transaction.*producer|producer.*transaction"
# Search for producer table modifications
ast-grep --pattern 'self.producer_table.$_($$$)'
# Search for producer cleanup in client implementation
rg -A 10 "unregister_client.*producer.*table|producer.*table.*remove"
Length of output: 22008
Script:
#!/bin/bash
# Search for producer table cleanup in unregister_client implementation
ast-grep --pattern 'fn unregister_client($$$) {
$$$
}'
# Search for producer cleanup in client shutdown
rg -A 10 "shutdown.*producer|producer.*shutdown"
Length of output: 3022
Script:
#!/bin/bash
# Search for producer cleanup in shutdown implementation
ast-grep --pattern 'async fn shutdown($$$) {
$$$
}'
# Search for producer table cleanup
rg -A 5 "producer_table.*clear|producer_table.*remove"
Length of output: 3087
| use std::sync::Arc; | ||
|
|
||
| use parking_lot::Mutex; | ||
| use rocketmq_client::producer::default_mq_producer::DefaultMQProducer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import TransactionMQProducer for transactional messaging
Since this example is intended to demonstrate transactional message sending, you should import TransactionMQProducer instead of DefaultMQProducer to enable transactional functionality.
Apply this diff to import TransactionMQProducer:
-use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
+use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| use rocketmq_client::producer::default_mq_producer::DefaultMQProducer; | |
| use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer; |
| let mut producer = builder | ||
| .producer_group(PRODUCER_GROUP.to_string()) | ||
| .name_server_addr(DEFAULT_NAMESRVADDR.to_string()) | ||
| .build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use TransactionMQProducer instead of DefaultMQProducer for transactional messaging
To properly handle transactional messages, you should instantiate a TransactionMQProducer rather than a DefaultMQProducer. This ensures that the producer is configured correctly for transactional operations.
Apply this diff to use TransactionMQProducer and attach the transaction listener:
-let builder = DefaultMQProducer::builder();
+let builder = TransactionMQProducer::builder();
let mut producer = builder
+ .transaction_listener(Arc::new(TransactionListenerImpl::default()))
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();Committable suggestion was skipped due to low confidence.
| pub async fn end_transaction( | ||
| &mut self, | ||
| msg: &Message, | ||
| send_result: &SendResult, | ||
| local_transaction_state: LocalTransactionState, | ||
| ) -> Result<()> { | ||
| let id = if let Some(ref offset_msg_id) = send_result.offset_msg_id { | ||
| MessageDecoder::decode_message_id(offset_msg_id) | ||
| } else { | ||
| MessageDecoder::decode_message_id(send_result.msg_id.as_ref().unwrap()) | ||
| }; | ||
| let transaction_id = send_result.transaction_id.clone(); | ||
| let queue = self | ||
| .client_config | ||
| .queue_with_namespace(send_result.message_queue.clone().unwrap()); | ||
| let dest_broker_name = self | ||
| .client_instance | ||
| .as_mut() | ||
| .unwrap() | ||
| .get_broker_name_from_message_queue(&queue) | ||
| .await; | ||
| let broker_addr = self | ||
| .client_instance | ||
| .as_mut() | ||
| .unwrap() | ||
| .find_broker_address_in_publish(dest_broker_name.as_str()) | ||
| .await; | ||
| let request_header = EndTransactionRequestHeader { | ||
| topic: msg.get_topic().to_string(), | ||
| producer_group: self.producer_config.producer_group().to_string(), | ||
| tran_state_table_offset: send_result.queue_offset, | ||
| commit_log_offset: id.offset as u64, | ||
| commit_or_rollback: match local_transaction_state { | ||
| LocalTransactionState::CommitMessage => MessageSysFlag::TRANSACTION_COMMIT_TYPE, | ||
| LocalTransactionState::RollbackMessage => MessageSysFlag::TRANSACTION_ROLLBACK_TYPE, | ||
| LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE, | ||
| }, | ||
| from_transaction_check: false, | ||
| msg_id: send_result.msg_id.clone().unwrap_or_default(), | ||
| transaction_id, | ||
| rpc_request_header: RpcRequestHeader { | ||
| broker_name: Some(dest_broker_name), | ||
| ..Default::default() | ||
| }, | ||
| }; | ||
| self.client_instance | ||
| .as_mut() | ||
| .unwrap() | ||
| .mq_client_api_impl | ||
| .as_mut() | ||
| .unwrap() | ||
| .end_transaction_oneway( | ||
| broker_addr.as_ref().unwrap(), | ||
| request_header, | ||
| "".to_string(), | ||
| self.producer_config.send_msg_timeout() as u64, | ||
| ) | ||
| .await; | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check result of end_transaction_oneway to handle potential errors
In the end_transaction method, the result of end_transaction_oneway is not checked, which may ignore any errors that occur during the transaction ending process.
Modify the code to handle errors:
- self.client_instance
- .as_mut()
- .unwrap()
- .mq_client_api_impl
- .as_mut()
- .unwrap()
- .end_transaction_oneway(
- broker_addr.as_ref().unwrap(),
- request_header,
- "".to_string(),
- self.producer_config.send_msg_timeout() as u64,
- )
- .await;
+ let result = self.client_instance
+ .as_mut()
+ .unwrap()
+ .mq_client_api_impl
+ .as_mut()
+ .unwrap()
+ .end_transaction_oneway(
+ broker_addr.as_ref().unwrap(),
+ request_header,
+ "".to_string(),
+ self.producer_config.send_msg_timeout() as u64,
+ )
+ .await;
+ if let Err(e) = result {
+ warn!("Failed to end transaction: {}", e);
+ return Err(e);
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub async fn end_transaction( | |
| &mut self, | |
| msg: &Message, | |
| send_result: &SendResult, | |
| local_transaction_state: LocalTransactionState, | |
| ) -> Result<()> { | |
| let id = if let Some(ref offset_msg_id) = send_result.offset_msg_id { | |
| MessageDecoder::decode_message_id(offset_msg_id) | |
| } else { | |
| MessageDecoder::decode_message_id(send_result.msg_id.as_ref().unwrap()) | |
| }; | |
| let transaction_id = send_result.transaction_id.clone(); | |
| let queue = self | |
| .client_config | |
| .queue_with_namespace(send_result.message_queue.clone().unwrap()); | |
| let dest_broker_name = self | |
| .client_instance | |
| .as_mut() | |
| .unwrap() | |
| .get_broker_name_from_message_queue(&queue) | |
| .await; | |
| let broker_addr = self | |
| .client_instance | |
| .as_mut() | |
| .unwrap() | |
| .find_broker_address_in_publish(dest_broker_name.as_str()) | |
| .await; | |
| let request_header = EndTransactionRequestHeader { | |
| topic: msg.get_topic().to_string(), | |
| producer_group: self.producer_config.producer_group().to_string(), | |
| tran_state_table_offset: send_result.queue_offset, | |
| commit_log_offset: id.offset as u64, | |
| commit_or_rollback: match local_transaction_state { | |
| LocalTransactionState::CommitMessage => MessageSysFlag::TRANSACTION_COMMIT_TYPE, | |
| LocalTransactionState::RollbackMessage => MessageSysFlag::TRANSACTION_ROLLBACK_TYPE, | |
| LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE, | |
| }, | |
| from_transaction_check: false, | |
| msg_id: send_result.msg_id.clone().unwrap_or_default(), | |
| transaction_id, | |
| rpc_request_header: RpcRequestHeader { | |
| broker_name: Some(dest_broker_name), | |
| ..Default::default() | |
| }, | |
| }; | |
| self.client_instance | |
| .as_mut() | |
| .unwrap() | |
| .mq_client_api_impl | |
| .as_mut() | |
| .unwrap() | |
| .end_transaction_oneway( | |
| broker_addr.as_ref().unwrap(), | |
| request_header, | |
| "".to_string(), | |
| self.producer_config.send_msg_timeout() as u64, | |
| ) | |
| .await; | |
| Ok(()) | |
| } | |
| pub async fn end_transaction( | |
| &mut self, | |
| msg: &Message, | |
| send_result: &SendResult, | |
| local_transaction_state: LocalTransactionState, | |
| ) -> Result<()> { | |
| let id = if let Some(ref offset_msg_id) = send_result.offset_msg_id { | |
| MessageDecoder::decode_message_id(offset_msg_id) | |
| } else { | |
| MessageDecoder::decode_message_id(send_result.msg_id.as_ref().unwrap()) | |
| }; | |
| let transaction_id = send_result.transaction_id.clone(); | |
| let queue = self | |
| .client_config | |
| .queue_with_namespace(send_result.message_queue.clone().unwrap()); | |
| let dest_broker_name = self | |
| .client_instance | |
| .as_mut() | |
| .unwrap() | |
| .get_broker_name_from_message_queue(&queue) | |
| .await; | |
| let broker_addr = self | |
| .client_instance | |
| .as_mut() | |
| .unwrap() | |
| .find_broker_address_in_publish(dest_broker_name.as_str()) | |
| .await; | |
| let request_header = EndTransactionRequestHeader { | |
| topic: msg.get_topic().to_string(), | |
| producer_group: self.producer_config.producer_group().to_string(), | |
| tran_state_table_offset: send_result.queue_offset, | |
| commit_log_offset: id.offset as u64, | |
| commit_or_rollback: match local_transaction_state { | |
| LocalTransactionState::CommitMessage => MessageSysFlag::TRANSACTION_COMMIT_TYPE, | |
| LocalTransactionState::RollbackMessage => MessageSysFlag::TRANSACTION_ROLLBACK_TYPE, | |
| LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE, | |
| }, | |
| from_transaction_check: false, | |
| msg_id: send_result.msg_id.clone().unwrap_or_default(), | |
| transaction_id, | |
| rpc_request_header: RpcRequestHeader { | |
| broker_name: Some(dest_broker_name), | |
| ..Default::default() | |
| }, | |
| }; | |
| let result = self.client_instance | |
| .as_mut() | |
| .unwrap() | |
| .mq_client_api_impl | |
| .as_mut() | |
| .unwrap() | |
| .end_transaction_oneway( | |
| broker_addr.as_ref().unwrap(), | |
| request_header, | |
| "".to_string(), | |
| self.producer_config.send_msg_timeout() as u64, | |
| ) | |
| .await; | |
| if let Err(e) = result { | |
| warn!("Failed to end transaction: {}", e); | |
| return Err(e); | |
| } | |
| Ok(()) | |
| } |
| impl EndTransactionRequestHeader { | ||
| pub const TOPIC: &'static str = "topic"; | ||
| pub const PRODUCER_GROUP: &'static str = "producerGroup"; | ||
| pub const TRAN_STATE_TABLE_OFFSET: &'static str = "tranStateTableOffset"; | ||
| pub const COMMIT_LOG_OFFSET: &'static str = "commitLogOffset"; | ||
| pub const COMMIT_OR_ROLLBACK: &'static str = "commitOrRollback"; | ||
| pub const FROM_TRANSACTION_CHECK: &'static str = "fromTransactionCheck"; | ||
| pub const MSG_ID: &'static str = "msgId"; | ||
| pub const TRANSACTION_ID: &'static str = "transactionId"; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider using an enum for constant field names
The constant field names are defined as pub const strings. Using an enum with &'static str variants can improve code organization and prevent typos.
Example:
pub enum Field {
Topic,
ProducerGroup,
TranStateTableOffset,
CommitLogOffset,
CommitOrRollback,
FromTransactionCheck,
MsgId,
TransactionId,
}
impl Field {
pub fn as_str(&self) -> &'static str {
match self {
Field::Topic => "topic",
Field::ProducerGroup => "producerGroup",
Field::TranStateTableOffset => "tranStateTableOffset",
Field::CommitLogOffset => "commitLogOffset",
Field::CommitOrRollback => "commitOrRollback",
Field::FromTransactionCheck => "fromTransactionCheck",
Field::MsgId => "msgId",
Field::TransactionId => "transactionId",
}
}
}Adjust the code to use Field::Topic.as_str() wherever needed.
| fn check_fields(&self) -> anyhow::Result<(), Error> { | ||
| if MessageSysFlag::TRANSACTION_NOT_TYPE == self.commit_or_rollback { | ||
| return Ok(()); | ||
| } | ||
| if MessageSysFlag::TRANSACTION_COMMIT_TYPE == self.commit_or_rollback { | ||
| return Ok(()); | ||
| } | ||
| if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == self.commit_or_rollback { | ||
| return Ok(()); | ||
| } | ||
| Err(anyhow!("commitOrRollback field wrong")) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance error handling in check_fields method
The check_fields method defaults to an error message "commitOrRollback field wrong" when commit_or_rollback does not match any of the expected types. To improve debugging and clarity, consider including the invalid value in the error message.
Apply this diff to enhance the error message:
fn check_fields(&self) -> anyhow::Result<(), Error> {
if MessageSysFlag::TRANSACTION_NOT_TYPE == self.commit_or_rollback {
return Ok(());
}
if MessageSysFlag::TRANSACTION_COMMIT_TYPE == self.commit_or_rollback {
return Ok(());
}
if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == self.commit_or_rollback {
return Ok(());
}
- Err(anyhow!("commitOrRollback field wrong"))
+ Err(anyhow!("Invalid commitOrRollback value: {}", self.commit_or_rollback))
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn check_fields(&self) -> anyhow::Result<(), Error> { | |
| if MessageSysFlag::TRANSACTION_NOT_TYPE == self.commit_or_rollback { | |
| return Ok(()); | |
| } | |
| if MessageSysFlag::TRANSACTION_COMMIT_TYPE == self.commit_or_rollback { | |
| return Ok(()); | |
| } | |
| if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == self.commit_or_rollback { | |
| return Ok(()); | |
| } | |
| Err(anyhow!("commitOrRollback field wrong")) | |
| } | |
| fn check_fields(&self) -> anyhow::Result<(), Error> { | |
| if MessageSysFlag::TRANSACTION_NOT_TYPE == self.commit_or_rollback { | |
| return Ok(()); | |
| } | |
| if MessageSysFlag::TRANSACTION_COMMIT_TYPE == self.commit_or_rollback { | |
| return Ok(()); | |
| } | |
| if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == self.commit_or_rollback { | |
| return Ok(()); | |
| } | |
| Err(anyhow!("Invalid commitOrRollback value: {}", self.commit_or_rollback)) | |
| } |
| fn to_map(&self) -> Option<std::collections::HashMap<String, String>> { | ||
| let mut map = std::collections::HashMap::new(); | ||
| map.insert(Self::TOPIC.to_string(), self.topic.clone()); | ||
| map.insert( | ||
| Self::PRODUCER_GROUP.to_string(), | ||
| self.producer_group.clone(), | ||
| ); | ||
| map.insert( | ||
| Self::TRAN_STATE_TABLE_OFFSET.to_string(), | ||
| self.tran_state_table_offset.to_string(), | ||
| ); | ||
| map.insert( | ||
| Self::COMMIT_LOG_OFFSET.to_string(), | ||
| self.commit_log_offset.to_string(), | ||
| ); | ||
| map.insert( | ||
| Self::COMMIT_OR_ROLLBACK.to_string(), | ||
| self.commit_or_rollback.to_string(), | ||
| ); | ||
| map.insert( | ||
| Self::FROM_TRANSACTION_CHECK.to_string(), | ||
| self.from_transaction_check.to_string(), | ||
| ); | ||
| map.insert(Self::MSG_ID.to_string(), self.msg_id.clone()); | ||
| if let Some(value) = self.transaction_id.as_ref() { | ||
| map.insert(Self::TRANSACTION_ID.to_string(), value.clone()); | ||
| } | ||
| if let Some(value) = self.rpc_request_header.to_map() { | ||
| map.extend(value); | ||
| } | ||
| Some(map) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Simplify to_map method using a serializer
Manually constructing the map in to_map is error-prone and verbose. Consider using serde_json::to_value followed by converting to a map, which leverages serialization attributes and ensures consistency.
Example:
fn to_map(&self) -> Option<std::collections::HashMap<String, String>> {
- let mut map = std::collections::HashMap::new();
- map.insert(Self::TOPIC.to_string(), self.topic.clone());
- // ... (other inserts)
- if let Some(value) = self.rpc_request_header.to_map() {
- map.extend(value);
- }
- Some(map)
+ let value = serde_json::to_value(self).ok()?;
+ let map = serde_json::from_value(value).ok()?;
+ Some(map)
}This approach reduces the potential for errors and keeps the method concise.
Committable suggestion was skipped due to low confidence.
| fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> { | ||
| Some(EndTransactionRequestHeader { | ||
| topic: map | ||
| .get(EndTransactionRequestHeader::TOPIC) | ||
| .cloned() | ||
| .unwrap_or_default(), | ||
| producer_group: map | ||
| .get(EndTransactionRequestHeader::PRODUCER_GROUP) | ||
| .cloned() | ||
| .unwrap_or_default(), | ||
| tran_state_table_offset: map | ||
| .get(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET) | ||
| .and_then(|s| s.parse::<u64>().ok()) | ||
| .unwrap_or_default(), | ||
| commit_log_offset: map | ||
| .get(EndTransactionRequestHeader::COMMIT_LOG_OFFSET) | ||
| .and_then(|s| s.parse::<u64>().ok()) | ||
| .unwrap_or_default(), | ||
| commit_or_rollback: map | ||
| .get(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK) | ||
| .and_then(|s| s.parse::<i32>().ok()) | ||
| .unwrap_or_default(), | ||
| from_transaction_check: map | ||
| .get(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK) | ||
| .and_then(|s| s.parse::<bool>().ok()) | ||
| .unwrap_or_default(), | ||
| msg_id: map | ||
| .get(EndTransactionRequestHeader::MSG_ID) | ||
| .cloned() | ||
| .unwrap_or_default(), | ||
| transaction_id: map | ||
| .get(EndTransactionRequestHeader::TRANSACTION_ID) | ||
| .cloned(), | ||
| rpc_request_header: <RpcRequestHeader as FromMap>::from(map).unwrap_or_default(), | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid defaulting to zero on parse failure in from method
In the from method of the FromMap implementation, numeric fields default to zero when parsing fails. This might mask issues where invalid data is provided. It's safer to propagate the error or return None if parsing fails to prevent unintended behavior due to default values.
Apply this diff to handle parsing errors appropriately:
fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> {
Some(EndTransactionRequestHeader {
topic: map
.get(EndTransactionRequestHeader::TOPIC)
.cloned()
.unwrap_or_default(),
producer_group: map
.get(EndTransactionRequestHeader::PRODUCER_GROUP)
.cloned()
.unwrap_or_default(),
tran_state_table_offset: map
.get(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET)
- .and_then(|s| s.parse::<u64>().ok())
- .unwrap_or_default(),
+ .and_then(|s| s.parse::<u64>().ok())?,
commit_log_offset: map
.get(EndTransactionRequestHeader::COMMIT_LOG_OFFSET)
- .and_then(|s| s.parse::<u64>().ok())
- .unwrap_or_default(),
+ .and_then(|s| s.parse::<u64>().ok())?,
commit_or_rollback: map
.get(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK)
- .and_then(|s| s.parse::<i32>().ok())
- .unwrap_or_default(),
+ .and_then(|s| s.parse::<i32>().ok())?,
from_transaction_check: map
.get(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK)
- .and_then(|s| s.parse::<bool>().ok())
- .unwrap_or_default(),
+ .and_then(|s| s.parse::<bool>().ok())?,
msg_id: map
.get(EndTransactionRequestHeader::MSG_ID)
.cloned()
.unwrap_or_default(),
transaction_id: map
.get(EndTransactionRequestHeader::TRANSACTION_ID)
.cloned(),
rpc_request_header: <RpcRequestHeader as FromMap>::from(map).unwrap_or_default(),
})
}This change will cause the method to return None if parsing fails, making it explicit when invalid data is encountered.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> { | |
| Some(EndTransactionRequestHeader { | |
| topic: map | |
| .get(EndTransactionRequestHeader::TOPIC) | |
| .cloned() | |
| .unwrap_or_default(), | |
| producer_group: map | |
| .get(EndTransactionRequestHeader::PRODUCER_GROUP) | |
| .cloned() | |
| .unwrap_or_default(), | |
| tran_state_table_offset: map | |
| .get(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET) | |
| .and_then(|s| s.parse::<u64>().ok()) | |
| .unwrap_or_default(), | |
| commit_log_offset: map | |
| .get(EndTransactionRequestHeader::COMMIT_LOG_OFFSET) | |
| .and_then(|s| s.parse::<u64>().ok()) | |
| .unwrap_or_default(), | |
| commit_or_rollback: map | |
| .get(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK) | |
| .and_then(|s| s.parse::<i32>().ok()) | |
| .unwrap_or_default(), | |
| from_transaction_check: map | |
| .get(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK) | |
| .and_then(|s| s.parse::<bool>().ok()) | |
| .unwrap_or_default(), | |
| msg_id: map | |
| .get(EndTransactionRequestHeader::MSG_ID) | |
| .cloned() | |
| .unwrap_or_default(), | |
| transaction_id: map | |
| .get(EndTransactionRequestHeader::TRANSACTION_ID) | |
| .cloned(), | |
| rpc_request_header: <RpcRequestHeader as FromMap>::from(map).unwrap_or_default(), | |
| }) | |
| } | |
| fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> { | |
| Some(EndTransactionRequestHeader { | |
| topic: map | |
| .get(EndTransactionRequestHeader::TOPIC) | |
| .cloned() | |
| .unwrap_or_default(), | |
| producer_group: map | |
| .get(EndTransactionRequestHeader::PRODUCER_GROUP) | |
| .cloned() | |
| .unwrap_or_default(), | |
| tran_state_table_offset: map | |
| .get(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET) | |
| .and_then(|s| s.parse::<u64>().ok())?, | |
| commit_log_offset: map | |
| .get(EndTransactionRequestHeader::COMMIT_LOG_OFFSET) | |
| .and_then(|s| s.parse::<u64>().ok())?, | |
| commit_or_rollback: map | |
| .get(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK) | |
| .and_then(|s| s.parse::<i32>().ok())?, | |
| from_transaction_check: map | |
| .get(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK) | |
| .and_then(|s| s.parse::<bool>().ok())?, | |
| msg_id: map | |
| .get(EndTransactionRequestHeader::MSG_ID) | |
| .cloned() | |
| .unwrap_or_default(), | |
| transaction_id: map | |
| .get(EndTransactionRequestHeader::TRANSACTION_ID) | |
| .cloned(), | |
| rpc_request_header: <RpcRequestHeader as FromMap>::from(map).unwrap_or_default(), | |
| }) | |
| } |
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 13
🧹 Outside diff range and nitpick comments (5)
rocketmq-client/src/producer/transaction_listener.rs (1)
28-28: LGTM: Improved parameter type for better thread safety and flexibilityThe change to
Option<&(dyn Any + Send + Sync)>is a good improvement because:
- Making the argument optional provides flexibility when no context is needed
- Adding
Send + Syncbounds ensures thread safety in concurrent transaction processingThis change aligns well with Rust's concurrency model and makes the transaction processing system more robust. The
Send + Syncbounds ensure that the transaction context can be safely shared across thread boundaries, which is crucial for a message queue system.rocketmq-client/src/producer/transaction_send_result.rs (1)
33-41: Consider enhancing the display format for better readabilityThe implementation is correct and safe, but consider making the output more user-friendly:
- Hide the Option wrapper syntax
- Add more descriptive field labels
- Format null states more elegantly
Consider this alternative implementation for better readability:
impl Display for TransactionSendResult { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "TransactionSendResult {{ local_transaction_state: {:?}, send_result: {:?} }}", - self.local_transaction_state, self.send_result + "TransactionSendResult [State: {}, Result: {}]", + self.local_transaction_state.as_ref().map_or("None", |s| &format!("{:?}", s)), + self.send_result.as_ref().map_or("None", |r| &format!("{}", r)) ) } }rocketmq-common/src/common/message/message_client_id_setter.rs (1)
Line range hint
123-128: LGTM! Optimization improves performance.The change optimizes the
set_uniq_idmethod by moving theuniq_idcreation inside the conditional block. This ensures thatcreate_uniq_id()is only called when necessary, avoiding unnecessary computation when the property is already set. The implementation remains thread-safe and well-tested.This optimization is particularly important in the context of transaction messages where high throughput is expected. The unique ID generation involves multiple operations (timestamp calculations, atomic counters), so avoiding unnecessary calls can significantly improve performance under load.
rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (2)
Line range hint
1742-1746: Handle potential timeout errors when sending the requestIn the
requestmethod, after calculating thecost, the remaining timeout duration might become negative, leading to immediate timeout errors.Add a check to ensure the remaining timeout is valid:
let cost = begin_timestamp.elapsed().as_millis() as u64; + if timeout <= cost { + return Err(RequestTimeoutError( + ClientErrorCode::REQUEST_TIMEOUT_EXCEPTION, + "Request timed out before sending the message.".to_string(), + )); + } self.send_default_impl( &mut msg, CommunicationMode::Async, Some(Arc::new(send_callback)), timeout - cost, ) .await?;
156-160: Consider using immutable references if mutation is not requiredThe methods
send_with_timeoutandsendnow take&mut selfandmsg: &mut T. If the methods do not modifyselformsg, consider using immutable references to enhance code safety and clarity.Also applies to: 169-172
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (9)
- rocketmq-client/examples/transaction/transaction_producer.rs (1 hunks)
- rocketmq-client/src/producer/default_mq_producer.rs (6 hunks)
- rocketmq-client/src/producer/mq_producer.rs (1 hunks)
- rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (27 hunks)
- rocketmq-client/src/producer/transaction_listener.rs (2 hunks)
- rocketmq-client/src/producer/transaction_mq_produce_builder.rs (1 hunks)
- rocketmq-client/src/producer/transaction_mq_producer.rs (1 hunks)
- rocketmq-client/src/producer/transaction_send_result.rs (2 hunks)
- rocketmq-common/src/common/message/message_client_id_setter.rs (1 hunks)
🔇 Additional comments (15)
rocketmq-client/src/producer/transaction_listener.rs (1)
17-18: LGTM: Clean import additionThe explicit import of
std::any::Anyimproves code readability.rocketmq-client/src/producer/transaction_send_result.rs (1)
17-17: LGTM: Import correctly addedThe Display trait import is properly placed and necessary for the implementation.
rocketmq-client/examples/transaction/transaction_producer.rs (2)
23-23:⚠️ Potential issueRemove unused import
DefaultMQProducerThe
DefaultMQProducerimport is not used in this file since we're usingTransactionMQProducerfor transaction message support.-use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;Likely invalid or redundant comment.
40-68: Verify transaction message implementation against RocketMQ specificationsPlease ensure this implementation follows RocketMQ's transaction message specifications, particularly around:
- Transaction message lifecycle
- Proper handling of transaction states
- Timeout and recovery mechanisms
✅ Verification successful
Let me gather more specific information about the transaction implementation.
Based on the comprehensive code analysis, I can now generate the final response:
Transaction message implementation follows RocketMQ specifications correctly
The implementation correctly handles:
- Transaction message lifecycle with proper state transitions (PREPARED, COMMIT, ROLLBACK)
- Transaction states through
LocalTransactionStateenum and corresponding system flags- Message properties and system flags for transaction messages
- Transaction listener interface with required methods for local transaction execution and checking
- Transaction producer configuration with proper thread pool settings for transaction checks
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for other transaction message implementations to compare rg -l "send_message_in_transaction|TransactionListener" --type rust # Look for transaction-related tests rg -l "test.*transaction" --type rustLength of output: 720
Script:
#!/bin/bash # Check transaction listener implementation cat rocketmq-client/src/producer/transaction_listener.rs # Check transaction producer implementation cat rocketmq-client/src/producer/transaction_mq_producer.rs # Check if there are any transaction state enums or constants rg -A 5 "enum.*Transaction|TRANSACTION" --type rustLength of output: 45758
rocketmq-client/src/producer/mq_producer.rs (1)
431-437: LGTM! The new signature provides better flexibility and safety.The changes to
send_message_in_transactionimprove the interface by:
- Making it generic over argument type
Twith thread-safety bounds- Taking ownership of the message for potential modifications
- Making the argument optional
- Supporting mutable state changes during transaction processing
Let's verify the impact on existing implementations:
✅ Verification successful
The previous ast-grep pattern might have failed due to syntax issues. Let's try a simpler ripgrep approach to find implementations and their context.
The previous results show that DefaultMQProducer doesn't implement the method (it's unimplemented), but we need to check the transaction-specific producer and implementation files to understand the full impact of the signature change.
The signature change is correctly implemented across the codebase
The verification shows that:
- The
TransactionMQProducerhas already implemented the new generic signature correctly- The implementation in
default_mq_producer_impl.rsproperly handles the boxed generic argumentDefaultMQProducerexplicitly marks it as unimplemented as expectedThe changes are consistent throughout the codebase and maintain proper type safety and flexibility.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for existing implementations of send_message_in_transaction ast-grep --pattern 'impl $_ for $_ { $$$ send_message_in_transaction($$$) { $$$ } $$$ }'Length of output: 102
Script:
#!/bin/bash # Let's try a different approach to find implementations # 1. First find all files that might contain implementations rg -l "send_message_in_transaction" # 2. Then look for actual implementations with context rg "impl.*send_message_in_transaction" -A 10Length of output: 1309
Script:
#!/bin/bash # Let's check the transaction producer implementation specifically rg -B 5 -A 15 "send_message_in_transaction" rocketmq-client/src/producer/transaction_mq_producer.rs # Also check the producer_impl for any existing implementations rg -B 5 -A 15 "send_message_in_transaction" rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rsLength of output: 1936
rocketmq-client/src/producer/default_mq_producer.rs (4)
351-356: LGTM: Thread-safe producer implementation setupThe implementation correctly uses
ArcRefCellWrapperfor thread-safe reference counting and interior mutability.
Line range hint
506-518: LGTM: Comprehensive message sending implementationThe implementation properly handles both synchronous and asynchronous message sending scenarios with appropriate mutability.
704-704: LGTM: Proper timeout handlingThe implementation correctly handles message timeout with appropriate mutability and error handling.
1017-1022: LGTM: Proper batch message handlingThe implementation correctly handles batch message sending with appropriate mutability and error handling, both with and without timeout.
Also applies to: 1032-1037
rocketmq-client/src/producer/transaction_mq_produce_builder.rs (2)
94-97: LGTM: Theclient_configsetter method is correctly implementedThe
client_configmethod appropriately sets the client configuration and supports method chaining.
316-325: Avoid cloning configurations unnecessarilyIn the
buildmethod, when initializingDefaultMQProducerImpl, configurations are cloned:let producer_impl = DefaultMQProducerImpl::new( mq_producer.client_config().clone(), mq_producer.producer_config().clone(), mq_producer.rpc_hook().clone(), );Cloning can be expensive for large configurations. If possible, consider passing references instead of cloning to improve performance.
[performance]
Modify the
DefaultMQProducerImpl::newmethod to accept references:- pub fn new(client_config: ClientConfig, producer_config: ProducerConfig, rpc_hook: Option<Arc<Box<dyn RPCHook>>>) -> Self { + pub fn new(client_config: &ClientConfig, producer_config: &ProducerConfig, rpc_hook: Option<Arc<Box<dyn RPCHook>>>) -> Self { // Implementation... }Then, adjust the call in the
buildmethod:let producer_impl = DefaultMQProducerImpl::new( - mq_producer.client_config().clone(), - mq_producer.producer_config().clone(), + mq_producer.client_config(), + mq_producer.producer_config(), mq_producer.rpc_hook().clone(), );rocketmq-client/src/producer/transaction_mq_producer.rs (1)
1-523: Code implementation aligns with transactional message supportThe
TransactionMQProducerstruct and its methods are well-implemented, providing the necessary functionality for transactional message production. The code is well-organized, follows Rust best practices, and integrates seamlessly with the existing producer architecture.rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (3)
156-160: Verify the necessity of changing method signatures to mutable referencesThe methods
send_with_timeoutandsendnow take&mut selfandmsg: &mut Tinstead of&selfandmsg: &T. This change increases the mutability requirements and may affect thread safety and code reusability. Ensure that the mutability is necessary and does not lead to unintended side effects.Also applies to: 169-172
1921-1980:⚠️ Potential issueCheck the result of
end_transaction_onewayto handle potential errorsIn the
end_transactionmethod, the result ofend_transaction_onewayis not checked, which may ignore any errors during the transaction ending process. This can lead to inconsistent transaction states.Modify the code to handle errors:
self.client_instance .as_mut() .unwrap() .mq_client_api_impl .as_mut() .unwrap() .end_transaction_oneway( broker_addr.as_ref().unwrap(), request_header, "".to_string(), self.producer_config.send_msg_timeout() as u64, ) .await; + if let Err(e) = result { + warn!("Failed to end transaction: {}", e); + return Err(e); + }Likely invalid or redundant comment.
2202-2214:⚠️ Potential issueEnsure proper error handling when registering the producer
When registering the producer in the
start_with_factorymethod, ifregister_okisfalse, the service state is reset but the method proceeds without returning an error. This might lead to the producer running in an inconsistent state.Apply this diff to return the error after resetting the service state:
if !register_ok { self.service_state = ServiceState::CreateJust; - // Missing error return + return Err(MQClientError::MQClientErr( + -1, + format!( + "The producer group[{}] has been created before, specify another name please. {}", + self.producer_config.producer_group(), + FAQUrl::suggest_todo(FAQUrl::GROUP_NAME_DUPLICATE_URL) + ), + )); }Likely invalid or redundant comment.
| let mut producer = builder | ||
| .producer_group(PRODUCER_GROUP.to_string()) | ||
| .name_server_addr(DEFAULT_NAMESRVADDR.to_string()) | ||
| .topics(vec![TOPIC.to_string()]) | ||
| .transaction_listener(TransactionListenerImpl::default()) | ||
| .build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider adding retry configuration for better reliability
The producer configuration could benefit from retry settings to handle temporary network issues or broker unavailability.
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.topics(vec![TOPIC.to_string()])
.transaction_listener(TransactionListenerImpl::default())
+ .retry_times(3)
+ .retry_interval_ms(1000)
.build();📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let mut producer = builder | |
| .producer_group(PRODUCER_GROUP.to_string()) | |
| .name_server_addr(DEFAULT_NAMESRVADDR.to_string()) | |
| .topics(vec![TOPIC.to_string()]) | |
| .transaction_listener(TransactionListenerImpl::default()) | |
| .build(); | |
| let mut producer = builder | |
| .producer_group(PRODUCER_GROUP.to_string()) | |
| .name_server_addr(DEFAULT_NAMESRVADDR.to_string()) | |
| .topics(vec![TOPIC.to_string()]) | |
| .transaction_listener(TransactionListenerImpl::default()) | |
| .retry_times(3) | |
| .retry_interval_ms(1000) | |
| .build(); |
| let value = self | ||
| .transaction_index | ||
| .fetch_add(1, std::sync::atomic::Ordering::AcqRel); | ||
| let status = value % 3; | ||
| let mut guard = self.local_trans.lock(); | ||
| guard.insert(msg.get_transaction_id().to_string(), status); | ||
| LocalTransactionState::Unknown |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve transaction state management and logging
The current implementation has several areas for improvement:
- Transaction state determination is overly simplistic using modulo
- Missing logging for state changes
- No error handling for transaction ID
let value = self
.transaction_index
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
-let status = value % 3;
+// Implement proper business logic for transaction state determination
+let status = determine_transaction_status(msg, value);
let mut guard = self.local_trans.lock();
-guard.insert(msg.get_transaction_id().to_string(), status);
+if let Some(tx_id) = msg.get_transaction_id() {
+ guard.insert(tx_id.to_string(), status);
+ log::info!("Transaction state set: ID={}, status={}", tx_id, status);
+} else {
+ log::error!("Missing transaction ID in message");
+}Committable suggestion was skipped due to low confidence.
| for _ in 0..10 { | ||
| let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes()); | ||
| let send_result = producer | ||
| .send_message_in_transaction::<()>(message, None) | ||
| .await?; | ||
| println!("send result: {}", send_result); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve error handling and logging for send results
The current implementation only prints send results to stdout. Consider using proper logging for better observability in production environments.
for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
let send_result = producer
.send_message_in_transaction::<()>(message, None)
.await?;
- println!("send result: {}", send_result);
+ log::info!("Transaction message sent: {}", send_result);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for _ in 0..10 { | |
| let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes()); | |
| let send_result = producer | |
| .send_message_in_transaction::<()>(message, None) | |
| .await?; | |
| println!("send result: {}", send_result); | |
| } | |
| for _ in 0..10 { | |
| let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes()); | |
| let send_result = producer | |
| .send_message_in_transaction::<()>(message, None) | |
| .await?; | |
| log::info!("Transaction message sent: {}", send_result); | |
| } |
| fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState { | ||
| let mut guard = self.local_trans.lock(); | ||
| let status = guard.get(msg.get_transaction_id()).unwrap_or(&-1); | ||
| match status { | ||
| 1 => LocalTransactionState::CommitMessage, | ||
| 2 => LocalTransactionState::RollbackMessage, | ||
| _ => LocalTransactionState::Unknown, | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve transaction check robustness and logging
The current implementation could panic on missing transaction IDs and lacks proper logging.
fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
let mut guard = self.local_trans.lock();
- let status = guard.get(msg.get_transaction_id()).unwrap_or(&-1);
- match status {
- 1 => LocalTransactionState::CommitMessage,
- 2 => LocalTransactionState::RollbackMessage,
- _ => LocalTransactionState::Unknown,
+ let tx_id = msg.get_transaction_id();
+ if let Some(tx_id) = tx_id {
+ let status = guard.get(tx_id).copied().unwrap_or(-1);
+ let state = match status {
+ 1 => LocalTransactionState::CommitMessage,
+ 2 => LocalTransactionState::RollbackMessage,
+ _ => LocalTransactionState::Unknown,
+ };
+ log::info!("Checked transaction state: ID={}, status={:?}", tx_id, state);
+ state
+ } else {
+ log::error!("Missing transaction ID in message during check");
+ LocalTransactionState::RollbackMessage
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState { | |
| let mut guard = self.local_trans.lock(); | |
| let status = guard.get(msg.get_transaction_id()).unwrap_or(&-1); | |
| match status { | |
| 1 => LocalTransactionState::CommitMessage, | |
| 2 => LocalTransactionState::RollbackMessage, | |
| _ => LocalTransactionState::Unknown, | |
| } | |
| } | |
| fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState { | |
| let mut guard = self.local_trans.lock(); | |
| let tx_id = msg.get_transaction_id(); | |
| if let Some(tx_id) = tx_id { | |
| let status = guard.get(tx_id).copied().unwrap_or(-1); | |
| let state = match status { | |
| 1 => LocalTransactionState::CommitMessage, | |
| 2 => LocalTransactionState::RollbackMessage, | |
| _ => LocalTransactionState::Unknown, | |
| }; | |
| log::info!("Checked transaction state: ID={}, status={:?}", tx_id, state); | |
| state | |
| } else { | |
| log::error!("Missing transaction ID in message during check"); | |
| LocalTransactionState::RollbackMessage | |
| } | |
| } |
| async fn send_message_in_transaction<T>( | ||
| &mut self, | ||
| msg: Message, | ||
| arg: Option<T>, | ||
| ) -> Result<TransactionSendResult> | ||
| where | ||
| T: std::any::Any + Sync + Send, | ||
| { | ||
| unimplemented!("DefaultMQProducer not support send_message_in_transaction") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implementation missing for transaction message support
The PR aims to add transaction message support, but the method is marked as unimplemented!(). This will cause runtime panics when called. Consider implementing the transaction message support as per the PR objectives.
Additionally, the error message could be more descriptive to help users understand why the operation isn't supported.
Apply this change to improve the error message:
- unimplemented!("DefaultMQProducer not support send_message_in_transaction")
+ Err(MQClientErr(
+ -1,
+ "Transaction messages are not supported by DefaultMQProducer. Please use TransactionMQProducer instead.".to_string(),
+ ))📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async fn send_message_in_transaction<T>( | |
| &mut self, | |
| msg: Message, | |
| arg: Option<T>, | |
| ) -> Result<TransactionSendResult> | |
| where | |
| T: std::any::Any + Sync + Send, | |
| { | |
| unimplemented!("DefaultMQProducer not support send_message_in_transaction") | |
| async fn send_message_in_transaction<T>( | |
| &mut self, | |
| msg: Message, | |
| arg: Option<T>, | |
| ) -> Result<TransactionSendResult> | |
| where | |
| T: std::any::Any + Sync + Send, | |
| { | |
| Err(MQClientErr( | |
| -1, | |
| "Transaction messages are not supported by DefaultMQProducer. Please use TransactionMQProducer instead.".to_string(), | |
| )) |
| pub fn client_config(mut self, client_config: ClientConfig) -> Self { | ||
| self.client_config = Some(client_config); | ||
| self | ||
| } | ||
|
|
||
| pub fn default_mqproducer_impl( | ||
| mut self, | ||
| default_mqproducer_impl: DefaultMQProducerImpl, | ||
| ) -> Self { | ||
| self.default_mqproducer_impl = Some(default_mqproducer_impl); | ||
| self | ||
| } | ||
|
|
||
| pub fn retry_response_codes(mut self, retry_response_codes: HashSet<i32>) -> Self { | ||
| self.retry_response_codes = Some(retry_response_codes); | ||
| self | ||
| } | ||
|
|
||
| pub fn producer_group(mut self, producer_group: impl Into<String>) -> Self { | ||
| self.producer_group = Some(producer_group.into()); | ||
| self | ||
| } | ||
|
|
||
| pub fn topics(mut self, topics: Vec<String>) -> Self { | ||
| self.topics = Some(topics); | ||
| self | ||
| } | ||
|
|
||
| pub fn name_server_addr(mut self, name_server_addr: String) -> Self { | ||
| if let Some(client_config) = self.client_config.as_mut() { | ||
| client_config.namesrv_addr = Some(name_server_addr); | ||
| client_config | ||
| .namespace_initialized | ||
| .store(false, std::sync::atomic::Ordering::Release); | ||
| } | ||
| self | ||
| } | ||
|
|
||
| pub fn create_topic_key(mut self, create_topic_key: String) -> Self { | ||
| self.create_topic_key = Some(create_topic_key); | ||
| self | ||
| } | ||
|
|
||
| pub fn default_topic_queue_nums(mut self, default_topic_queue_nums: u32) -> Self { | ||
| self.default_topic_queue_nums = Some(default_topic_queue_nums); | ||
| self | ||
| } | ||
|
|
||
| pub fn send_msg_timeout(mut self, send_msg_timeout: u32) -> Self { | ||
| self.send_msg_timeout = Some(send_msg_timeout); | ||
| self | ||
| } | ||
|
|
||
| pub fn compress_msg_body_over_howmuch(mut self, compress_msg_body_over_howmuch: u32) -> Self { | ||
| self.compress_msg_body_over_howmuch = Some(compress_msg_body_over_howmuch); | ||
| self | ||
| } | ||
|
|
||
| pub fn retry_times_when_send_failed(mut self, retry_times_when_send_failed: u32) -> Self { | ||
| self.retry_times_when_send_failed = Some(retry_times_when_send_failed); | ||
| self | ||
| } | ||
|
|
||
| pub fn retry_times_when_send_async_failed( | ||
| mut self, | ||
| retry_times_when_send_async_failed: u32, | ||
| ) -> Self { | ||
| self.retry_times_when_send_async_failed = Some(retry_times_when_send_async_failed); | ||
| self | ||
| } | ||
|
|
||
| pub fn retry_another_broker_when_not_store_ok( | ||
| mut self, | ||
| retry_another_broker_when_not_store_ok: bool, | ||
| ) -> Self { | ||
| self.retry_another_broker_when_not_store_ok = Some(retry_another_broker_when_not_store_ok); | ||
| self | ||
| } | ||
|
|
||
| pub fn max_message_size(mut self, max_message_size: u32) -> Self { | ||
| self.max_message_size = Some(max_message_size); | ||
| self | ||
| } | ||
|
|
||
| pub fn trace_dispatcher( | ||
| mut self, | ||
| trace_dispatcher: Arc<Box<dyn TraceDispatcher + Send + Sync>>, | ||
| ) -> Self { | ||
| self.trace_dispatcher = Some(trace_dispatcher); | ||
| self | ||
| } | ||
|
|
||
| pub fn auto_batch(mut self, auto_batch: bool) -> Self { | ||
| self.auto_batch = Some(auto_batch); | ||
| self | ||
| } | ||
|
|
||
| pub fn produce_accumulator(mut self, produce_accumulator: ProduceAccumulator) -> Self { | ||
| self.produce_accumulator = Some(produce_accumulator); | ||
| self | ||
| } | ||
|
|
||
| pub fn enable_backpressure_for_async_mode( | ||
| mut self, | ||
| enable_backpressure_for_async_mode: bool, | ||
| ) -> Self { | ||
| self.enable_backpressure_for_async_mode = Some(enable_backpressure_for_async_mode); | ||
| self | ||
| } | ||
|
|
||
| pub fn back_pressure_for_async_send_num( | ||
| mut self, | ||
| back_pressure_for_async_send_num: u32, | ||
| ) -> Self { | ||
| self.back_pressure_for_async_send_num = Some(back_pressure_for_async_send_num); | ||
| self | ||
| } | ||
|
|
||
| pub fn back_pressure_for_async_send_size( | ||
| mut self, | ||
| back_pressure_for_async_send_size: u32, | ||
| ) -> Self { | ||
| self.back_pressure_for_async_send_size = Some(back_pressure_for_async_send_size); | ||
| self | ||
| } | ||
|
|
||
| pub fn rpc_hook(mut self, rpc_hook: Box<dyn RPCHook>) -> Self { | ||
| self.rpc_hook = Some(Arc::new(rpc_hook)); | ||
| self | ||
| } | ||
|
|
||
| pub fn compress_level(mut self, compress_level: i32) -> Self { | ||
| self.compress_level = Some(compress_level); | ||
| self | ||
| } | ||
|
|
||
| pub fn compress_type(mut self, compress_type: CompressionType) -> Self { | ||
| self.compress_type = Some(compress_type); | ||
| self | ||
| } | ||
|
|
||
| pub fn compressor(mut self, compressor: Arc<Box<dyn Compressor + Send + Sync>>) -> Self { | ||
| self.compressor = Some(compressor); | ||
| self | ||
| } | ||
|
|
||
| pub fn transaction_listener(mut self, transaction_listener: impl TransactionListener) -> Self { | ||
| self.transaction_listener = Some(Arc::new(Box::new(transaction_listener))); | ||
| self | ||
| } | ||
|
|
||
| pub fn build(self) -> TransactionMQProducer { | ||
| let mut mq_producer = DefaultMQProducer::default(); | ||
| if let Some(client_config) = self.client_config { | ||
| mq_producer.set_client_config(client_config); | ||
| } | ||
|
|
||
| if let Some(retry_response_codes) = self.retry_response_codes { | ||
| mq_producer.set_retry_response_codes(retry_response_codes); | ||
| } | ||
| if let Some(producer_group) = self.producer_group { | ||
| mq_producer.set_producer_group(producer_group); | ||
| } | ||
| if let Some(topics) = self.topics { | ||
| mq_producer.set_topics(topics); | ||
| } | ||
| if let Some(create_topic_key) = self.create_topic_key { | ||
| mq_producer.set_create_topic_key(create_topic_key); | ||
| } | ||
| if let Some(default_topic_queue_nums) = self.default_topic_queue_nums { | ||
| mq_producer.set_default_topic_queue_nums(default_topic_queue_nums); | ||
| } | ||
| if let Some(send_msg_timeout) = self.send_msg_timeout { | ||
| mq_producer.set_send_msg_timeout(send_msg_timeout); | ||
| } | ||
| if let Some(compress_msg_body_over_howmuch) = self.compress_msg_body_over_howmuch { | ||
| mq_producer.set_compress_msg_body_over_howmuch(compress_msg_body_over_howmuch); | ||
| } | ||
| if let Some(retry_times_when_send_failed) = self.retry_times_when_send_failed { | ||
| mq_producer.set_retry_times_when_send_failed(retry_times_when_send_failed); | ||
| } | ||
| if let Some(retry_times_when_send_async_failed) = self.retry_times_when_send_async_failed { | ||
| mq_producer.set_retry_times_when_send_async_failed(retry_times_when_send_async_failed); | ||
| } | ||
| if let Some(retry_another_broker_when_not_store_ok) = | ||
| self.retry_another_broker_when_not_store_ok | ||
| { | ||
| mq_producer | ||
| .set_retry_another_broker_when_not_store_ok(retry_another_broker_when_not_store_ok); | ||
| } | ||
| if let Some(max_message_size) = self.max_message_size { | ||
| mq_producer.set_max_message_size(max_message_size); | ||
| } | ||
|
|
||
| mq_producer.set_trace_dispatcher(self.trace_dispatcher); | ||
| if let Some(auto_batch) = self.auto_batch { | ||
| mq_producer.set_auto_batch(auto_batch); | ||
| } | ||
| if let Some(produce_accumulator) = self.produce_accumulator { | ||
| mq_producer.set_produce_accumulator(Some(produce_accumulator)); | ||
| } | ||
|
|
||
| if let Some(enable_backpressure_for_async_mode) = self.enable_backpressure_for_async_mode { | ||
| mq_producer.set_enable_backpressure_for_async_mode(enable_backpressure_for_async_mode); | ||
| } | ||
| if let Some(back_pressure_for_async_send_num) = self.back_pressure_for_async_send_num { | ||
| mq_producer.set_back_pressure_for_async_send_num(back_pressure_for_async_send_num); | ||
| } | ||
| if let Some(back_pressure_for_async_send_size) = self.back_pressure_for_async_send_size { | ||
| mq_producer.set_back_pressure_for_async_send_size(back_pressure_for_async_send_size); | ||
| } | ||
| mq_producer.set_rpc_hook(self.rpc_hook); | ||
| if let Some(compress_level) = self.compress_level { | ||
| mq_producer.set_compress_level(compress_level); | ||
| } | ||
| if let Some(compress_type) = self.compress_type { | ||
| mq_producer.set_compress_type(compress_type); | ||
| } | ||
| if let Some(compressor) = self.compressor { | ||
| mq_producer.set_compressor(Some(compressor)); | ||
| } | ||
|
|
||
| if let Some(default_mqproducer_impl) = self.default_mqproducer_impl { | ||
| mq_producer.set_default_mqproducer_impl(default_mqproducer_impl); | ||
| } else { | ||
| let producer_impl = DefaultMQProducerImpl::new( | ||
| mq_producer.client_config().clone(), | ||
| mq_producer.producer_config().clone(), | ||
| mq_producer.rpc_hook().clone(), | ||
| ); | ||
| mq_producer.set_default_mqproducer_impl(producer_impl); | ||
| } | ||
| let transaction_producer_config = TransactionProducerConfig { | ||
| transaction_listener: self.transaction_listener, | ||
| check_thread_pool_min_size: 0, | ||
| check_thread_pool_max_size: 0, | ||
| check_request_hold_max: 0, | ||
| check_runtime: self.check_runtime, | ||
| }; | ||
| TransactionMQProducer::new(transaction_producer_config, mq_producer) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider using macros or iterators to reduce repetitive code in setter methods
The builder currently contains numerous setter methods and repetitive if let statements in the build method for setting optional configurations. This can be streamlined to improve maintainability and reduce boilerplate code.
You can use macros to generate setter methods and to iterate over the optional fields in the build method. For example, define a macro for setter methods:
macro_rules! builder_setter {
($field:ident, $type:ty) => {
pub fn $field(mut self, value: $type) -> Self {
self.$field = Some(value);
self
}
};
}
// Use the macro for your fields
impl TransactionMQProducerBuilder {
builder_setter!(producer_group, String);
builder_setter!(topics, Vec<String>);
// Repeat for other fields...
}In the build method, you can iterate over fields:
if let Some(value) = self.producer_group {
mq_producer.set_producer_group(value);
}
// Instead of writing an `if let` for each field, consider using a loop or macro if possibleThis refactoring can make the code cleaner and easier to manage, especially as the number of configurable fields grows.
| default_mqproducer_impl_inner: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>, | ||
| transaction_listener: Option<Arc<Box<dyn TransactionListener>>>, | ||
| check_runtime: Option<Arc<RocketMQRuntime>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider initializing new fields in the constructor for clarity and safety
The newly added fields default_mqproducer_impl_inner, transaction_listener, and check_runtime are initialized to None. To ensure these fields are properly set before use and to prevent potential None dereferences, consider initializing them with default values or adding checks where they are used.
Apply this diff to initialize the fields in the constructor:
pub struct DefaultMQProducerImpl {
// ...
default_mqproducer_impl_inner: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>,
transaction_listener: Option<Arc<Box<dyn TransactionListener>>>,
check_runtime: Option<Arc<RocketMQRuntime>>,
}
impl DefaultMQProducerImpl {
pub fn new(
client_config: ClientConfig,
producer_config: ProducerConfig,
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
) -> Self {
// ...
DefaultMQProducerImpl {
// ...
- default_mqproducer_impl_inner: None,
- transaction_listener: None,
- check_runtime: None,
+ default_mqproducer_impl_inner: Some(ArcRefCellWrapper::new(self.clone())),
+ transaction_listener: Some(Arc::new(Box::new(DefaultTransactionListener::new()))),
+ check_runtime: Some(Arc::new(RocketMQRuntime::new_multi(
+ num_cpus::get(),
+ "transaction-check",
+ ))),
}
}
}Also applies to: 149-151
| impl ServiceDetector for DefaultServiceDetector { | ||
| fn detect(&self, endpoint: &str, timeout_millis: u64) -> bool { | ||
| todo!() | ||
| unimplemented!("detect") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement the detect method in DefaultServiceDetector
The detect method in DefaultServiceDetector is unimplemented, which means the functionality for service detection is incomplete.
Provide an implementation for the detect method:
impl ServiceDetector for DefaultServiceDetector {
fn detect(&self, endpoint: &str, timeout_millis: u64) -> bool {
- unimplemented!("detect")
+ // Implement actual detection logic here
+ // For example, perform a health check or ping to the endpoint
+ // Return true if the service is reachable, false otherwise
+ true // Placeholder implementation
}
}Committable suggestion was skipped due to low confidence.
| broker_addr: &str, | ||
| msg: MessageExt, | ||
| check_request_header: CheckTransactionStateRequestHeader, | ||
| ) { | ||
| todo!() | ||
| let transaction_listener = self.transaction_listener.clone().unwrap(); | ||
| let mut producer_impl_inner = self.default_mqproducer_impl_inner.clone().unwrap(); | ||
| let broker_addr = broker_addr.to_string(); | ||
| self.check_runtime | ||
| .as_ref() | ||
| .unwrap() | ||
| .get_handle() | ||
| .spawn(async move { | ||
| let mut unique_key = | ||
| msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); | ||
| if unique_key.is_none() { | ||
| unique_key = Some(msg.msg_id.clone()); | ||
| } | ||
| let transaction_state = transaction_listener.check_local_transaction(&msg); | ||
| let request_header = EndTransactionRequestHeader { | ||
| topic: check_request_header.topic.clone().unwrap_or_default(), | ||
| producer_group: producer_impl_inner | ||
| .producer_config | ||
| .producer_group() | ||
| .to_string(), | ||
| tran_state_table_offset: check_request_header.commit_log_offset as u64, | ||
| commit_log_offset: check_request_header.commit_log_offset as u64, | ||
| commit_or_rollback: match transaction_state { | ||
| LocalTransactionState::CommitMessage => { | ||
| MessageSysFlag::TRANSACTION_COMMIT_TYPE | ||
| } | ||
| LocalTransactionState::RollbackMessage => { | ||
| MessageSysFlag::TRANSACTION_ROLLBACK_TYPE | ||
| } | ||
| LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE, | ||
| }, | ||
| from_transaction_check: true, | ||
| msg_id: unique_key.clone().unwrap_or_default(), | ||
| transaction_id: check_request_header.transaction_id.clone(), | ||
| rpc_request_header: RpcRequestHeader { | ||
| broker_name: check_request_header | ||
| .rpc_request_header | ||
| .unwrap_or_default() | ||
| .broker_name, | ||
| ..Default::default() | ||
| }, | ||
| }; | ||
| producer_impl_inner.do_execute_end_transaction_hook( | ||
| &msg.message, | ||
| unique_key.as_ref().unwrap(), | ||
| broker_addr.as_str(), | ||
| transaction_state, | ||
| true, | ||
| ); | ||
| let _ = producer_impl_inner | ||
| .client_instance | ||
| .as_mut() | ||
| .unwrap() | ||
| .mq_client_api_impl | ||
| .as_mut() | ||
| .unwrap() | ||
| .end_transaction_oneway( | ||
| broker_addr.as_str(), | ||
| request_header, | ||
| "".to_string(), | ||
| 3000, | ||
| ) | ||
| .await; | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid potential panic by checking for None before unwrapping
In check_transaction_state, the code calls self.transaction_listener.clone().unwrap() without checking if transaction_listener is Some. If transaction_listener is None, this will cause a panic. Ensure that transaction_listener is initialized before use or add a check to handle the None case gracefully.
Apply this diff to add a check:
let transaction_listener = self.transaction_listener.clone().unwrap();
+ if transaction_listener.is_none() {
+ warn!("Transaction listener is not set.");
+ return;
+ }Committable suggestion was skipped due to low confidence.
| pub async fn send_message_in_transaction( | ||
| &mut self, | ||
| mut msg: M, | ||
| selector: MessageQueueSelectorFn, | ||
| arg: T, | ||
| timeout: u64, | ||
| send_callback: Option<SendMessageCallback>, | ||
| ) -> Result<SendResult> | ||
| where | ||
| M: MessageTrait + Clone + Send + Sync, | ||
| T: std::any::Any + Sync + Send, | ||
| { | ||
| let begin_start_time = Instant::now(); | ||
| self.make_sure_state_ok()?; | ||
| mut msg: Message, | ||
| arg: Option<Box<dyn Any + Send + Sync>>, | ||
| ) -> Result<TransactionSendResult> { | ||
| // ignore DelayTimeLevel parameter | ||
| if msg.get_delay_time_level() != 0 { | ||
| MessageAccessor::clear_property(&mut msg, MessageConst::PROPERTY_DELAY_TIME_LEVEL); | ||
| } | ||
| Validators::check_message(Some(&msg), self.producer_config.as_ref())?; | ||
| let topic_publish_info = self.try_to_find_topic_publish_info(msg.get_topic()).await; | ||
| if let Some(topic_publish_info) = topic_publish_info { | ||
| if topic_publish_info.ok() { | ||
| let message_queue_list = self | ||
| .client_instance | ||
| .as_mut() | ||
| .unwrap() | ||
| .mq_admin_impl | ||
| .parse_publish_message_queues( | ||
| &topic_publish_info.message_queue_list, | ||
| &mut self.client_config, | ||
| MessageAccessor::put_property( | ||
| &mut msg, | ||
| MessageConst::PROPERTY_TRANSACTION_PREPARED, | ||
| "true", | ||
| ); | ||
| MessageAccessor::put_property( | ||
| &mut msg, | ||
| MessageConst::PROPERTY_PRODUCER_GROUP, | ||
| self.producer_config.producer_group(), | ||
| ); | ||
| let result = self.send(&mut msg).await; | ||
| if let Err(e) = result { | ||
| return Err(MQClientErr( | ||
| -1, | ||
| format!("send message in transaction error, {}", e), | ||
| )); | ||
| } | ||
| let send_result = result.unwrap().expect("send result is none"); | ||
| let local_transaction_state = match send_result.send_status { | ||
| SendStatus::SendOk => { | ||
| if let Some(ref transaction_id) = send_result.transaction_id { | ||
| msg.put_user_property( | ||
| MessageConst::PROPERTY_TRANSACTION_ID, | ||
| transaction_id.as_str(), | ||
| ); | ||
| let mut user_message = msg.clone(); | ||
| let user_topic = NamespaceUtil::without_namespace_with_namespace( | ||
| user_message.get_topic(), | ||
| self.client_config | ||
| .get_namespace() | ||
| .unwrap_or("".to_string()) | ||
| .as_str(), | ||
| ); | ||
| user_message.set_topic(user_topic.as_str()); | ||
| let message_queue = selector(&message_queue_list, &msg, &arg); | ||
| let cost_time = begin_start_time.elapsed().as_millis() as u64; | ||
| if timeout < cost_time { | ||
| return Err(MQClientError::RemotingTooMuchRequestException( | ||
| "sendSelectImpl call timeout".to_string(), | ||
| )); | ||
| } | ||
| if message_queue.is_some() { | ||
| let result = self | ||
| .send_kernel_impl( | ||
| &mut msg, | ||
| message_queue.as_ref().unwrap(), | ||
| CommunicationMode::Sync, | ||
| send_callback, | ||
| None, | ||
| timeout - cost_time, | ||
| ) | ||
| .await?; | ||
| return Ok(result.unwrap()); | ||
| let transaction_id = | ||
| msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); | ||
| if let Some(transaction_id) = transaction_id { | ||
| msg.set_transaction_id(transaction_id.as_str()); | ||
| } | ||
| return Err(MQClientError::MQClientException( | ||
| -1, | ||
| "select message queue return None.".to_string(), | ||
| )); | ||
| self.transaction_listener | ||
| .as_ref() | ||
| .unwrap() | ||
| .execute_local_transaction(&msg, arg.as_deref()) | ||
| } | ||
| SendStatus::FlushDiskTimeout | ||
| | SendStatus::FlushSlaveTimeout | ||
| | SendStatus::SlaveNotAvailable => LocalTransactionState::RollbackMessage, | ||
| }; | ||
| if let Err(e) = self | ||
| .end_transaction(&msg, &send_result, local_transaction_state) | ||
| .await | ||
| { | ||
| warn!( | ||
| "local transaction execute {}, but end broker transaction failed,{}", | ||
| local_transaction_state, | ||
| e.to_string() | ||
| ); | ||
| } | ||
| self.validate_name_server_setting()?; | ||
| Err(MQClientError::MQClientException( | ||
| -1, | ||
| "No route info for this topic, ".to_string(), | ||
| )) | ||
| }*/ | ||
| let transaction_send_result = TransactionSendResult { | ||
| local_transaction_state: Some(local_transaction_state), | ||
| send_result: Some(send_result), | ||
| }; | ||
| Ok(transaction_send_result) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Verify the transaction listener is set before executing
In send_message_in_transaction, the method calls self.transaction_listener.as_ref().unwrap() without checking if transaction_listener is Some, which can cause a panic if it's None.
Add a check to ensure transaction_listener is initialized:
+ if self.transaction_listener.is_none() {
+ return Err(MQClientErr(
+ -1,
+ "Transaction listener is not set.".to_string(),
+ ));
+ }
let local_transaction_state = self
.transaction_listener
.as_ref()
.unwrap()
.execute_local_transaction(&msg, arg.as_deref());📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub async fn send_message_in_transaction( | |
| &mut self, | |
| mut msg: M, | |
| selector: MessageQueueSelectorFn, | |
| arg: T, | |
| timeout: u64, | |
| send_callback: Option<SendMessageCallback>, | |
| ) -> Result<SendResult> | |
| where | |
| M: MessageTrait + Clone + Send + Sync, | |
| T: std::any::Any + Sync + Send, | |
| { | |
| let begin_start_time = Instant::now(); | |
| self.make_sure_state_ok()?; | |
| mut msg: Message, | |
| arg: Option<Box<dyn Any + Send + Sync>>, | |
| ) -> Result<TransactionSendResult> { | |
| // ignore DelayTimeLevel parameter | |
| if msg.get_delay_time_level() != 0 { | |
| MessageAccessor::clear_property(&mut msg, MessageConst::PROPERTY_DELAY_TIME_LEVEL); | |
| } | |
| Validators::check_message(Some(&msg), self.producer_config.as_ref())?; | |
| let topic_publish_info = self.try_to_find_topic_publish_info(msg.get_topic()).await; | |
| if let Some(topic_publish_info) = topic_publish_info { | |
| if topic_publish_info.ok() { | |
| let message_queue_list = self | |
| .client_instance | |
| .as_mut() | |
| .unwrap() | |
| .mq_admin_impl | |
| .parse_publish_message_queues( | |
| &topic_publish_info.message_queue_list, | |
| &mut self.client_config, | |
| MessageAccessor::put_property( | |
| &mut msg, | |
| MessageConst::PROPERTY_TRANSACTION_PREPARED, | |
| "true", | |
| ); | |
| MessageAccessor::put_property( | |
| &mut msg, | |
| MessageConst::PROPERTY_PRODUCER_GROUP, | |
| self.producer_config.producer_group(), | |
| ); | |
| let result = self.send(&mut msg).await; | |
| if let Err(e) = result { | |
| return Err(MQClientErr( | |
| -1, | |
| format!("send message in transaction error, {}", e), | |
| )); | |
| } | |
| let send_result = result.unwrap().expect("send result is none"); | |
| let local_transaction_state = match send_result.send_status { | |
| SendStatus::SendOk => { | |
| if let Some(ref transaction_id) = send_result.transaction_id { | |
| msg.put_user_property( | |
| MessageConst::PROPERTY_TRANSACTION_ID, | |
| transaction_id.as_str(), | |
| ); | |
| let mut user_message = msg.clone(); | |
| let user_topic = NamespaceUtil::without_namespace_with_namespace( | |
| user_message.get_topic(), | |
| self.client_config | |
| .get_namespace() | |
| .unwrap_or("".to_string()) | |
| .as_str(), | |
| ); | |
| user_message.set_topic(user_topic.as_str()); | |
| let message_queue = selector(&message_queue_list, &msg, &arg); | |
| let cost_time = begin_start_time.elapsed().as_millis() as u64; | |
| if timeout < cost_time { | |
| return Err(MQClientError::RemotingTooMuchRequestException( | |
| "sendSelectImpl call timeout".to_string(), | |
| )); | |
| } | |
| if message_queue.is_some() { | |
| let result = self | |
| .send_kernel_impl( | |
| &mut msg, | |
| message_queue.as_ref().unwrap(), | |
| CommunicationMode::Sync, | |
| send_callback, | |
| None, | |
| timeout - cost_time, | |
| ) | |
| .await?; | |
| return Ok(result.unwrap()); | |
| let transaction_id = | |
| msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); | |
| if let Some(transaction_id) = transaction_id { | |
| msg.set_transaction_id(transaction_id.as_str()); | |
| } | |
| return Err(MQClientError::MQClientException( | |
| -1, | |
| "select message queue return None.".to_string(), | |
| )); | |
| self.transaction_listener | |
| .as_ref() | |
| .unwrap() | |
| .execute_local_transaction(&msg, arg.as_deref()) | |
| } | |
| SendStatus::FlushDiskTimeout | |
| | SendStatus::FlushSlaveTimeout | |
| | SendStatus::SlaveNotAvailable => LocalTransactionState::RollbackMessage, | |
| }; | |
| if let Err(e) = self | |
| .end_transaction(&msg, &send_result, local_transaction_state) | |
| .await | |
| { | |
| warn!( | |
| "local transaction execute {}, but end broker transaction failed,{}", | |
| local_transaction_state, | |
| e.to_string() | |
| ); | |
| } | |
| self.validate_name_server_setting()?; | |
| Err(MQClientError::MQClientException( | |
| -1, | |
| "No route info for this topic, ".to_string(), | |
| )) | |
| }*/ | |
| let transaction_send_result = TransactionSendResult { | |
| local_transaction_state: Some(local_transaction_state), | |
| send_result: Some(send_result), | |
| }; | |
| Ok(transaction_send_result) | |
| } | |
| pub async fn send_message_in_transaction( | |
| &mut self, | |
| mut msg: Message, | |
| arg: Option<Box<dyn Any + Send + Sync>>, | |
| ) -> Result<TransactionSendResult> { | |
| // ignore DelayTimeLevel parameter | |
| if msg.get_delay_time_level() != 0 { | |
| MessageAccessor::clear_property(&mut msg, MessageConst::PROPERTY_DELAY_TIME_LEVEL); | |
| } | |
| Validators::check_message(Some(&msg), self.producer_config.as_ref())?; | |
| MessageAccessor::put_property( | |
| &mut msg, | |
| MessageConst::PROPERTY_TRANSACTION_PREPARED, | |
| "true", | |
| ); | |
| MessageAccessor::put_property( | |
| &mut msg, | |
| MessageConst::PROPERTY_PRODUCER_GROUP, | |
| self.producer_config.producer_group(), | |
| ); | |
| let result = self.send(&mut msg).await; | |
| if let Err(e) = result { | |
| return Err(MQClientErr( | |
| -1, | |
| format!("send message in transaction error, {}", e), | |
| )); | |
| } | |
| let send_result = result.unwrap().expect("send result is none"); | |
| let local_transaction_state = match send_result.send_status { | |
| SendStatus::SendOk => { | |
| if let Some(ref transaction_id) = send_result.transaction_id { | |
| msg.put_user_property( | |
| MessageConst::PROPERTY_TRANSACTION_ID, | |
| transaction_id.as_str(), | |
| ); | |
| } | |
| let transaction_id = | |
| msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); | |
| if let Some(transaction_id) = transaction_id { | |
| msg.set_transaction_id(transaction_id.as_str()); | |
| } | |
| if self.transaction_listener.is_none() { | |
| return Err(MQClientErr( | |
| -1, | |
| "Transaction listener is not set.".to_string(), | |
| )); | |
| } | |
| self.transaction_listener | |
| .as_ref() | |
| .unwrap() | |
| .execute_local_transaction(&msg, arg.as_deref()) | |
| } | |
| SendStatus::FlushDiskTimeout | |
| | SendStatus::FlushSlaveTimeout | |
| | SendStatus::SlaveNotAvailable => LocalTransactionState::RollbackMessage, | |
| }; | |
| if let Err(e) = self | |
| .end_transaction(&msg, &send_result, local_transaction_state) | |
| .await | |
| { | |
| warn!( | |
| "local transaction execute {}, but end broker transaction failed,{}", | |
| local_transaction_state, | |
| e.to_string() | |
| ); | |
| } | |
| let transaction_send_result = TransactionSendResult { | |
| local_transaction_state: Some(local_transaction_state), | |
| send_result: Some(send_result), | |
| }; | |
| Ok(transaction_send_result) | |
| } |
Handle errors more precisely in send_message_in_transaction
In the method send_message_in_transaction, when self.send(&mut msg).await returns an error, a generic error is created with a custom message. To provide more context and aid in debugging, consider propagating the original error instead of creating a new one.
Apply this diff to propagate the original error:
let result = self.send(&mut msg).await;
if let Err(e) = result {
- return Err(MQClientErr(
- -1,
- format!("send message in transaction error, {}", e),
- ));
+ return Err(e);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub async fn send_message_in_transaction( | |
| &mut self, | |
| mut msg: M, | |
| selector: MessageQueueSelectorFn, | |
| arg: T, | |
| timeout: u64, | |
| send_callback: Option<SendMessageCallback>, | |
| ) -> Result<SendResult> | |
| where | |
| M: MessageTrait + Clone + Send + Sync, | |
| T: std::any::Any + Sync + Send, | |
| { | |
| let begin_start_time = Instant::now(); | |
| self.make_sure_state_ok()?; | |
| mut msg: Message, | |
| arg: Option<Box<dyn Any + Send + Sync>>, | |
| ) -> Result<TransactionSendResult> { | |
| // ignore DelayTimeLevel parameter | |
| if msg.get_delay_time_level() != 0 { | |
| MessageAccessor::clear_property(&mut msg, MessageConst::PROPERTY_DELAY_TIME_LEVEL); | |
| } | |
| Validators::check_message(Some(&msg), self.producer_config.as_ref())?; | |
| let topic_publish_info = self.try_to_find_topic_publish_info(msg.get_topic()).await; | |
| if let Some(topic_publish_info) = topic_publish_info { | |
| if topic_publish_info.ok() { | |
| let message_queue_list = self | |
| .client_instance | |
| .as_mut() | |
| .unwrap() | |
| .mq_admin_impl | |
| .parse_publish_message_queues( | |
| &topic_publish_info.message_queue_list, | |
| &mut self.client_config, | |
| MessageAccessor::put_property( | |
| &mut msg, | |
| MessageConst::PROPERTY_TRANSACTION_PREPARED, | |
| "true", | |
| ); | |
| MessageAccessor::put_property( | |
| &mut msg, | |
| MessageConst::PROPERTY_PRODUCER_GROUP, | |
| self.producer_config.producer_group(), | |
| ); | |
| let result = self.send(&mut msg).await; | |
| if let Err(e) = result { | |
| return Err(MQClientErr( | |
| -1, | |
| format!("send message in transaction error, {}", e), | |
| )); | |
| } | |
| let send_result = result.unwrap().expect("send result is none"); | |
| let local_transaction_state = match send_result.send_status { | |
| SendStatus::SendOk => { | |
| if let Some(ref transaction_id) = send_result.transaction_id { | |
| msg.put_user_property( | |
| MessageConst::PROPERTY_TRANSACTION_ID, | |
| transaction_id.as_str(), | |
| ); | |
| let mut user_message = msg.clone(); | |
| let user_topic = NamespaceUtil::without_namespace_with_namespace( | |
| user_message.get_topic(), | |
| self.client_config | |
| .get_namespace() | |
| .unwrap_or("".to_string()) | |
| .as_str(), | |
| ); | |
| user_message.set_topic(user_topic.as_str()); | |
| let message_queue = selector(&message_queue_list, &msg, &arg); | |
| let cost_time = begin_start_time.elapsed().as_millis() as u64; | |
| if timeout < cost_time { | |
| return Err(MQClientError::RemotingTooMuchRequestException( | |
| "sendSelectImpl call timeout".to_string(), | |
| )); | |
| } | |
| if message_queue.is_some() { | |
| let result = self | |
| .send_kernel_impl( | |
| &mut msg, | |
| message_queue.as_ref().unwrap(), | |
| CommunicationMode::Sync, | |
| send_callback, | |
| None, | |
| timeout - cost_time, | |
| ) | |
| .await?; | |
| return Ok(result.unwrap()); | |
| let transaction_id = | |
| msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); | |
| if let Some(transaction_id) = transaction_id { | |
| msg.set_transaction_id(transaction_id.as_str()); | |
| } | |
| return Err(MQClientError::MQClientException( | |
| -1, | |
| "select message queue return None.".to_string(), | |
| )); | |
| self.transaction_listener | |
| .as_ref() | |
| .unwrap() | |
| .execute_local_transaction(&msg, arg.as_deref()) | |
| } | |
| SendStatus::FlushDiskTimeout | |
| | SendStatus::FlushSlaveTimeout | |
| | SendStatus::SlaveNotAvailable => LocalTransactionState::RollbackMessage, | |
| }; | |
| if let Err(e) = self | |
| .end_transaction(&msg, &send_result, local_transaction_state) | |
| .await | |
| { | |
| warn!( | |
| "local transaction execute {}, but end broker transaction failed,{}", | |
| local_transaction_state, | |
| e.to_string() | |
| ); | |
| } | |
| self.validate_name_server_setting()?; | |
| Err(MQClientError::MQClientException( | |
| -1, | |
| "No route info for this topic, ".to_string(), | |
| )) | |
| }*/ | |
| let transaction_send_result = TransactionSendResult { | |
| local_transaction_state: Some(local_transaction_state), | |
| send_result: Some(send_result), | |
| }; | |
| Ok(transaction_send_result) | |
| } | |
| pub async fn send_message_in_transaction( | |
| &mut self, | |
| mut msg: Message, | |
| arg: Option<Box<dyn Any + Send + Sync>>, | |
| ) -> Result<TransactionSendResult> { | |
| // ignore DelayTimeLevel parameter | |
| if msg.get_delay_time_level() != 0 { | |
| MessageAccessor::clear_property(&mut msg, MessageConst::PROPERTY_DELAY_TIME_LEVEL); | |
| } | |
| Validators::check_message(Some(&msg), self.producer_config.as_ref())?; | |
| MessageAccessor::put_property( | |
| &mut msg, | |
| MessageConst::PROPERTY_TRANSACTION_PREPARED, | |
| "true", | |
| ); | |
| MessageAccessor::put_property( | |
| &mut msg, | |
| MessageConst::PROPERTY_PRODUCER_GROUP, | |
| self.producer_config.producer_group(), | |
| ); | |
| let result = self.send(&mut msg).await; | |
| if let Err(e) = result { | |
| return Err(e); | |
| } | |
| let send_result = result.unwrap().expect("send result is none"); | |
| let local_transaction_state = match send_result.send_status { | |
| SendStatus::SendOk => { | |
| if let Some(ref transaction_id) = send_result.transaction_id { | |
| msg.put_user_property( | |
| MessageConst::PROPERTY_TRANSACTION_ID, | |
| transaction_id.as_str(), | |
| ); | |
| } | |
| let transaction_id = | |
| msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); | |
| if let Some(transaction_id) = transaction_id { | |
| msg.set_transaction_id(transaction_id.as_str()); | |
| } | |
| self.transaction_listener | |
| .as_ref() | |
| .unwrap() | |
| .execute_local_transaction(&msg, arg.as_deref()) | |
| } | |
| SendStatus::FlushDiskTimeout | |
| | SendStatus::FlushSlaveTimeout | |
| | SendStatus::SlaveNotAvailable => LocalTransactionState::RollbackMessage, | |
| }; | |
| if let Err(e) = self | |
| .end_transaction(&msg, &send_result, local_transaction_state) | |
| .await | |
| { | |
| warn!( | |
| "local transaction execute {}, but end broker transaction failed,{}", | |
| local_transaction_state, | |
| e.to_string() | |
| ); | |
| } | |
| let transaction_send_result = TransactionSendResult { | |
| local_transaction_state: Some(local_transaction_state), | |
| send_result: Some(send_result), | |
| }; | |
| Ok(transaction_send_result) | |
| } |
Which Issue(s) This PR Fixes(Closes)
Fixes #1078
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Release Notes
New Features
transaction-producerexample for RocketMQ transaction management.TransactionMQProducerandTransactionMQProducerBuilderfor enhanced transactional message production.EndTransactionRequestHeaderfor better transaction request handling.Improvements
bodyfield inRemotingCommandallows for direct modifications.TransactionListenerto accept optional arguments in transaction execution.Bug Fixes
Cargo.tomlfor better readability.These updates provide users with improved functionality and usability in managing transactions within the RocketMQ client.