Skip to content

Commit fe0ccb3

Browse files
authored
[ISSUE #1078]🚀Support send transaction message for client🍻 (#1088)
1 parent 18afaac commit fe0ccb3

25 files changed

+1697
-130
lines changed

rocketmq-client/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,8 @@ path = "examples/ordermessage/ordermessage_producer.rs"
8181

8282
[[example]]
8383
name = "ordermessage-consumer"
84-
path = "examples/ordermessage/ordermessage_consumer.rs"
84+
path = "examples/ordermessage/ordermessage_consumer.rs"
85+
86+
[[example]]
87+
name = "transaction-producer"
88+
path = "examples/transaction/transaction_producer.rs"
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::any::Any;
18+
use std::collections::HashMap;
19+
use std::sync::atomic::AtomicI32;
20+
use std::sync::Arc;
21+
22+
use parking_lot::Mutex;
23+
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
24+
use rocketmq_client::producer::local_transaction_state::LocalTransactionState;
25+
use rocketmq_client::producer::mq_producer::MQProducer;
26+
use rocketmq_client::producer::transaction_listener::TransactionListener;
27+
use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer;
28+
use rocketmq_client::Result;
29+
use rocketmq_common::common::message::message_ext::MessageExt;
30+
use rocketmq_common::common::message::message_single::Message;
31+
use rocketmq_common::common::message::MessageTrait;
32+
use rocketmq_rust::rocketmq;
33+
34+
pub const MESSAGE_COUNT: usize = 1;
35+
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
36+
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
37+
pub const TOPIC: &str = "TopicTest";
38+
pub const TAG: &str = "TagA";
39+
40+
#[rocketmq::main]
41+
pub async fn main() -> Result<()> {
42+
//init logger
43+
rocketmq_common::log::init_logger();
44+
45+
// create a producer builder with default configuration
46+
let builder = TransactionMQProducer::builder();
47+
48+
let mut producer = builder
49+
.producer_group(PRODUCER_GROUP.to_string())
50+
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
51+
.topics(vec![TOPIC.to_string()])
52+
.transaction_listener(TransactionListenerImpl::default())
53+
.build();
54+
55+
producer.start().await?;
56+
57+
for _ in 0..10 {
58+
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
59+
let send_result = producer
60+
.send_message_in_transaction::<()>(message, None)
61+
.await?;
62+
println!("send result: {}", send_result);
63+
}
64+
let _ = tokio::signal::ctrl_c().await;
65+
producer.shutdown().await;
66+
67+
Ok(())
68+
}
69+
70+
struct TransactionListenerImpl {
71+
local_trans: Arc<Mutex<HashMap<String, i32>>>,
72+
transaction_index: AtomicI32,
73+
}
74+
75+
impl Default for TransactionListenerImpl {
76+
fn default() -> Self {
77+
Self {
78+
local_trans: Arc::new(Default::default()),
79+
transaction_index: Default::default(),
80+
}
81+
}
82+
}
83+
84+
impl TransactionListener for TransactionListenerImpl {
85+
fn execute_local_transaction(
86+
&self,
87+
msg: &Message,
88+
arg: Option<&(dyn Any + Send + Sync)>,
89+
) -> LocalTransactionState {
90+
let value = self
91+
.transaction_index
92+
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
93+
let status = value % 3;
94+
let mut guard = self.local_trans.lock();
95+
guard.insert(msg.get_transaction_id().to_string(), status);
96+
LocalTransactionState::Unknown
97+
}
98+
99+
fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
100+
let mut guard = self.local_trans.lock();
101+
let status = guard.get(msg.get_transaction_id()).unwrap_or(&-1);
102+
match status {
103+
1 => LocalTransactionState::CommitMessage,
104+
2 => LocalTransactionState::RollbackMessage,
105+
_ => LocalTransactionState::Unknown,
106+
}
107+
}
108+
}

rocketmq-client/src/factory/mq_client_instance.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,21 +59,21 @@ use crate::implementation::mq_admin_impl::MQAdminImpl;
5959
use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
6060
use crate::producer::default_mq_producer::DefaultMQProducer;
6161
use crate::producer::default_mq_producer::ProducerConfig;
62-
use crate::producer::producer_impl::mq_producer_inner::MQProducerInner;
62+
use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl;
6363
use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
6464
use crate::Result;
6565

6666
const LOCK_TIMEOUT_MILLIS: u64 = 3000;
6767

6868
pub struct MQClientInstance {
69-
pub(crate) client_config: Arc<ClientConfig>,
69+
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
7070
pub(crate) client_id: String,
7171
boot_timestamp: u64,
7272
/**
7373
* The container of the producer in the current client. The key is the name of
7474
* producerGroup.
7575
*/
76-
producer_table: Arc<RwLock<HashMap<String, Box<dyn MQProducerInner>>>>,
76+
producer_table: Arc<RwLock<HashMap<String, MQProducerInnerImpl>>>,
7777
/**
7878
* The container of the consumer in the current client. The key is the name of
7979
* consumer_group.
@@ -177,7 +177,7 @@ impl MQClientInstance {
177177
) -> ArcRefCellWrapper<MQClientInstance> {
178178
let broker_addr_table = Arc::new(Default::default());
179179
let mut instance = ArcRefCellWrapper::new(MQClientInstance {
180-
client_config: Arc::new(client_config.clone()),
180+
client_config: ArcRefCellWrapper::new(client_config.clone()),
181181
client_id,
182182
boot_timestamp: get_current_millis(),
183183
producer_table: Arc::new(RwLock::new(HashMap::new())),
@@ -326,7 +326,7 @@ impl MQClientInstance {
326326

327327
pub async fn shutdown(&mut self) {}
328328

329-
pub async fn register_producer(&mut self, group: &str, producer: impl MQProducerInner) -> bool {
329+
pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool {
330330
if group.is_empty() {
331331
return false;
332332
}
@@ -335,7 +335,7 @@ impl MQClientInstance {
335335
warn!("the producer group[{}] exist already.", group);
336336
return false;
337337
}
338-
producer_table.insert(group.to_string(), Box::new(producer));
338+
producer_table.insert(group.to_string(), producer);
339339
true
340340
}
341341

@@ -1055,6 +1055,11 @@ impl MQClientInstance {
10551055
consumer_table.get(group).cloned()
10561056
}
10571057

1058+
pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
1059+
let producer_table = self.producer_table.read().await;
1060+
producer_table.get(group).cloned()
1061+
}
1062+
10581063
pub async fn unregister_consumer(&mut self, group: impl Into<String>) {
10591064
self.unregister_client(None, Some(group.into())).await;
10601065
}

rocketmq-client/src/hook/end_transaction_context.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,16 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
pub struct EndTransactionContext {}
17+
use rocketmq_common::common::message::message_single::Message;
18+
19+
use crate::producer::local_transaction_state::LocalTransactionState;
20+
21+
pub struct EndTransactionContext<'a> {
22+
pub producer_group: String,
23+
pub broker_addr: String,
24+
pub message: &'a Message,
25+
pub msg_id: String,
26+
pub transaction_id: String,
27+
pub transaction_state: LocalTransactionState,
28+
pub from_transaction_check: bool,
29+
}

rocketmq-client/src/hook/send_message_context.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::Arc;
2121
use rocketmq_common::common::message::message_enum::MessageType;
2222
use rocketmq_common::common::message::message_queue::MessageQueue;
2323
use rocketmq_common::common::message::MessageTrait;
24+
use rocketmq_common::ArcRefCellWrapper;
2425

2526
use crate::implementation::communication_mode::CommunicationMode;
2627
use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl;
@@ -38,7 +39,7 @@ pub struct SendMessageContext<'a> {
3839
pub exception: Option<Arc<Box<dyn Error + Send + Sync>>>,
3940
pub mq_trace_context: Option<Arc<Box<dyn std::any::Any + Send + Sync>>>,
4041
pub props: HashMap<String, String>,
41-
pub producer: Option<DefaultMQProducerImpl>,
42+
pub producer: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>,
4243
pub msg_type: Option<MessageType>,
4344
pub namespace: Option<String>,
4445
}

rocketmq-client/src/implementation/client_remoting_processor.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use bytes::Bytes;
2020
use rocketmq_common::common::compression::compressor_factory::CompressorFactory;
2121
use rocketmq_common::common::message::message_ext::MessageExt;
2222
use rocketmq_common::common::message::MessageConst;
23+
use rocketmq_common::common::message::MessageTrait;
2324
use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
2425
use rocketmq_common::MessageAccessor::MessageAccessor;
2526
use rocketmq_common::MessageDecoder;
@@ -28,8 +29,10 @@ use rocketmq_common::WeakCellWrapper;
2829
use rocketmq_remoting::code::request_code::RequestCode;
2930
use rocketmq_remoting::code::response_code::ResponseCode;
3031
use rocketmq_remoting::net::channel::Channel;
32+
use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader;
3133
use rocketmq_remoting::protocol::header::notify_consumer_ids_changed_request_header::NotifyConsumerIdsChangedRequestHeader;
3234
use rocketmq_remoting::protocol::header::reply_message_request_header::ReplyMessageRequestHeader;
35+
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
3336
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
3437
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
3538
use rocketmq_remoting::runtime::processor::RequestProcessor;
@@ -62,10 +65,26 @@ impl RequestProcessor for ClientRemotingProcessor {
6265
let request_code = RequestCode::from(request.code());
6366
info!("process_request: {:?}", request_code);
6467
match request_code {
68+
RequestCode::CheckTransactionState => {
69+
self.check_transaction_state(channel, ctx, request).await
70+
}
71+
RequestCode::ResetConsumerClientOffset => {
72+
unimplemented!("ResetConsumerClientOffset")
73+
}
74+
RequestCode::GetConsumerStatusFromClient => {
75+
unimplemented!("GetConsumerStatusFromClient")
76+
}
77+
RequestCode::GetConsumerRunningInfo => {
78+
unimplemented!("GetConsumerRunningInfo")
79+
}
80+
RequestCode::ConsumeMessageDirectly => {
81+
unimplemented!("ConsumeMessageDirectly")
82+
}
6583
RequestCode::PushReplyMessageToClient => self.receive_reply_message(ctx, request).await,
6684
RequestCode::NotifyConsumerIdsChanged => {
6785
self.notify_consumer_ids_changed(channel, ctx, request)
6886
}
87+
6988
_ => {
7089
info!("Unknown request code: {:?}", request_code);
7190
Ok(None)
@@ -190,4 +209,64 @@ impl ClientRemotingProcessor {
190209
}
191210
Ok(None)
192211
}
212+
213+
async fn check_transaction_state(
214+
&mut self,
215+
channel: Channel,
216+
ctx: ConnectionHandlerContext,
217+
mut request: RemotingCommand,
218+
) -> Result<Option<RemotingCommand>> {
219+
let request_header = request
220+
.decode_command_custom_header::<CheckTransactionStateRequestHeader>()
221+
.unwrap();
222+
let message_ext = MessageDecoder::decode(
223+
request.get_body_mut().unwrap(),
224+
true,
225+
true,
226+
false,
227+
false,
228+
false,
229+
);
230+
if let Some(mut message_ext) = message_ext {
231+
if let Some(mut client_instance) = self.client_instance.upgrade() {
232+
if let Some(ref namespace) = client_instance.client_config.get_namespace() {
233+
let topic = NamespaceUtil::without_namespace_with_namespace(
234+
message_ext.get_topic(),
235+
client_instance
236+
.client_config
237+
.get_namespace()
238+
.unwrap_or_default()
239+
.as_str(),
240+
);
241+
message_ext.set_topic(topic.as_str());
242+
}
243+
let transaction_id =
244+
message_ext.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
245+
if let Some(transaction_id) = transaction_id {
246+
if !transaction_id.is_empty() {
247+
message_ext.set_transaction_id(transaction_id.as_str());
248+
}
249+
}
250+
let group = message_ext.get_property(MessageConst::PROPERTY_PRODUCER_GROUP);
251+
if let Some(group) = group {
252+
let producer = client_instance.select_producer(&group).await;
253+
if let Some(producer) = producer {
254+
let addr = channel.remote_address().to_string();
255+
producer.check_transaction_state(
256+
addr.as_str(),
257+
message_ext,
258+
request_header,
259+
);
260+
} else {
261+
warn!("checkTransactionState, pick producer group failed");
262+
}
263+
} else {
264+
warn!("checkTransactionState, pick producer group failed");
265+
}
266+
}
267+
} else {
268+
warn!("checkTransactionState, decode message failed");
269+
};
270+
Ok(None)
271+
}
193272
}

rocketmq-client/src/implementation/mq_client_api_impl.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockB
4646
use rocketmq_remoting::protocol::body::unlock_batch_request_body::UnlockBatchRequestBody;
4747
use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader;
4848
use rocketmq_remoting::protocol::header::consumer_send_msg_back_request_header::ConsumerSendMsgBackRequestHeader;
49+
use rocketmq_remoting::protocol::header::end_transaction_request_header::EndTransactionRequestHeader;
4950
use rocketmq_remoting::protocol::header::get_consumer_listby_group_request_header::GetConsumerListByGroupRequestHeader;
5051
use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader;
5152
use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader;
@@ -1130,4 +1131,21 @@ impl MQClientAPIImpl {
11301131
))
11311132
}
11321133
}
1134+
1135+
pub async fn end_transaction_oneway(
1136+
&mut self,
1137+
addr: &str,
1138+
request_header: EndTransactionRequestHeader,
1139+
remark: String,
1140+
timeout_millis: u64,
1141+
) -> Result<()> {
1142+
let request =
1143+
RemotingCommand::create_request_command(RequestCode::EndTransaction, request_header)
1144+
.set_remark(Some(remark));
1145+
1146+
self.remoting_client
1147+
.invoke_oneway(addr.to_string(), request, timeout_millis)
1148+
.await;
1149+
Ok(())
1150+
}
11331151
}

rocketmq-client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
#![allow(dead_code)]
1818
#![allow(unused_variables)]
19+
#![recursion_limit = "256"]
20+
1921
extern crate core;
2022

2123
use crate::error::MQClientError;

rocketmq-client/src/producer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,6 @@ pub mod send_callback;
2828
pub mod send_result;
2929
pub mod send_status;
3030
pub mod transaction_listener;
31+
pub mod transaction_mq_produce_builder;
32+
pub mod transaction_mq_producer;
3133
pub mod transaction_send_result;

0 commit comments

Comments
 (0)