Skip to content

Commit 02f3b25

Browse files
authored
Merge pull request #9104 from zhang2014/fix/broken_pipe
fix(cluster): try fix broken pipe or connect reset
2 parents 49103f9 + d16a15a commit 02f3b25

File tree

7 files changed

+60
-19
lines changed

7 files changed

+60
-19
lines changed

scripts/ci/ci-run-stateless-tests-cluster.sh

+1-2
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,4 @@ cd "$SCRIPT_PATH/../../tests" || exit
1515

1616
echo "Starting databend-test"
1717
# 13_0004_q4: https://github.com/datafuselabs/databend/issues/8107
18-
# 13_0005_q5: https://github.com/datafuselabs/databend/issues/7986
19-
./databend-test --mode 'cluster' --run-dir 0_stateless --skip '13_0004_q4' --skip '13_0005_q5'
18+
./databend-test --mode 'cluster' --run-dir 0_stateless --skip '13_0004_q4'

src/query/service/src/api/rpc/exchange/exchange_transform.rs

+1
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ impl Processor for ExchangeTransform {
264264
DataPacket::ProgressAndPrecommit { .. } => unreachable!(),
265265
DataPacket::FetchProgressAndPrecommit => unreachable!(),
266266
DataPacket::FragmentData(v) => self.on_recv_data(v),
267+
DataPacket::ClosingClient => Ok(()),
267268
};
268269
}
269270

src/query/service/src/api/rpc/exchange/exchange_transform_source.rs

+1
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ impl Processor for ExchangeSourceTransform {
130130
DataPacket::FragmentData(v) => self.on_recv_data(v),
131131
DataPacket::FetchProgressAndPrecommit => unreachable!(),
132132
DataPacket::ProgressAndPrecommit { .. } => unreachable!(),
133+
DataPacket::ClosingClient => Ok(()),
133134
};
134135
}
135136

src/query/service/src/api/rpc/exchange/statistics_receiver.rs

+1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ impl StatisticsReceiver {
139139
fn recv_data(ctx: &Arc<QueryContext>, recv_data: Result<Option<DataPacket>>) -> Result<bool> {
140140
match recv_data {
141141
Ok(None) => Ok(true),
142+
Ok(Some(DataPacket::ClosingClient)) => Ok(true),
142143
Err(transport_error) => Err(transport_error),
143144
Ok(Some(DataPacket::ErrorCode(error))) => Err(error),
144145
Ok(Some(DataPacket::FragmentData(_))) => unreachable!(),

src/query/service/src/api/rpc/exchange/statistics_sender.rs

+10
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ impl StatisticsSender {
9090
notified = right;
9191
recv = Box::pin(flight_exchange.recv());
9292

93+
if matches!(command, DataPacket::ClosingClient) {
94+
ctx.get_exchange_manager().shutdown_query(&query_id);
95+
return;
96+
}
97+
9398
if let Err(_cause) = Self::on_command(&ctx, command, &flight_exchange).await
9499
{
95100
ctx.get_exchange_manager().shutdown_query(&query_id);
@@ -100,6 +105,10 @@ impl StatisticsSender {
100105
}
101106

102107
if let Ok(Some(command)) = flight_exchange.recv().await {
108+
if matches!(command, DataPacket::ClosingClient) {
109+
return;
110+
}
111+
103112
if let Err(error) = Self::on_command(&ctx, command, &flight_exchange).await {
104113
tracing::warn!("Statistics send has error, cause: {:?}.", error);
105114
}
@@ -140,6 +149,7 @@ impl StatisticsSender {
140149
})
141150
.await
142151
}
152+
DataPacket::ClosingClient => unreachable!(),
143153
}
144154
}
145155

src/query/service/src/api/rpc/flight_client.rs

+27-17
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,18 @@ impl FlightExchange {
135135
) -> FlightExchange {
136136
let mut streaming = streaming.into_inner();
137137
let (tx, rx) = async_channel::bounded(1);
138+
138139
common_base::base::tokio::spawn(async move {
139140
while let Some(message) = streaming.next().await {
140-
if let Err(_cause) = tx.send(message).await {
141-
break;
141+
match message {
142+
Ok(message) if DataPacket::is_closing_client(&message) => {
143+
break;
144+
}
145+
other => {
146+
if let Err(_c) = tx.send(other).await {
147+
break;
148+
}
149+
}
142150
}
143151
}
144152
});
@@ -159,8 +167,15 @@ impl FlightExchange {
159167
let (tx, request_rx) = async_channel::bounded(1);
160168
common_base::base::tokio::spawn(async move {
161169
while let Some(message) = streaming.next().await {
162-
if let Err(_cause) = tx.send(message).await {
163-
break;
170+
match message {
171+
Ok(flight_data) if DataPacket::is_closing_client(&flight_data) => {
172+
break;
173+
}
174+
other => {
175+
if let Err(_cause) = tx.send(other).await {
176+
break;
177+
}
178+
}
164179
}
165180
}
166181
});
@@ -282,7 +297,9 @@ impl ClientFlightExchange {
282297
if !self.is_closed_response.fetch_or(true, Ordering::SeqCst)
283298
&& self.state.response_count.fetch_sub(1, Ordering::AcqRel) == 1
284299
{
285-
self.response_tx.close();
300+
let _ = self
301+
.response_tx
302+
.send_blocking(FlightData::from(DataPacket::ClosingClient));
286303
}
287304
}
288305
}
@@ -304,17 +321,8 @@ impl Clone for ClientFlightExchange {
304321

305322
impl Drop for ClientFlightExchange {
306323
fn drop(&mut self) {
307-
if !self.is_closed_request.fetch_or(true, Ordering::SeqCst)
308-
&& self.state.request_count.fetch_sub(1, Ordering::AcqRel) == 1
309-
{
310-
self.request_rx.close();
311-
}
312-
313-
if !self.is_closed_response.fetch_or(true, Ordering::SeqCst)
314-
&& self.state.response_count.fetch_sub(1, Ordering::AcqRel) == 1
315-
{
316-
self.response_tx.close();
317-
}
324+
self.close_input();
325+
self.close_output();
318326
}
319327
}
320328

@@ -398,7 +406,9 @@ impl ServerFlightExchange {
398406
if !self.is_closed_response.fetch_or(true, Ordering::SeqCst)
399407
&& self.state.response_count.fetch_sub(1, Ordering::AcqRel) == 1
400408
{
401-
self.response_tx.close();
409+
let _ = self
410+
.response_tx
411+
.send_blocking(Ok(FlightData::from(DataPacket::ClosingClient)));
402412
}
403413
}
404414
}

src/query/service/src/api/rpc/packets/packet_data.rs

+19
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,18 @@ pub enum DataPacket {
5656
progress: Vec<ProgressInfo>,
5757
precommit: Vec<PrecommitBlock>,
5858
},
59+
// NOTE: Unknown reason. This may be tonic's bug.
60+
// when we use two-way streaming grpc for data exchange,
61+
// if the client side is closed and the server side reads data immediately.
62+
// we will get a broken pipe or connect reset error.
63+
// we use the ClosingClient to notify the server side to close the connection for avoid errors.
64+
ClosingClient,
65+
}
66+
67+
impl DataPacket {
68+
pub fn is_closing_client(data: &FlightData) -> bool {
69+
data.app_metadata.last() == Some(&0x05)
70+
}
5971
}
6072

6173
impl From<DataPacket> for FlightData {
@@ -103,6 +115,12 @@ impl From<DataPacket> for FlightData {
103115
app_metadata: vec![0x04],
104116
}
105117
}
118+
DataPacket::ClosingClient => FlightData {
119+
data_body: vec![],
120+
data_header: vec![],
121+
flight_descriptor: None,
122+
app_metadata: vec![0x05],
123+
},
106124
}
107125
}
108126
}
@@ -155,6 +173,7 @@ impl TryFrom<FlightData> for DataPacket {
155173
progress: progress_info,
156174
})
157175
}
176+
0x05 => Ok(DataPacket::ClosingClient),
158177
_ => Err(ErrorCode::BadBytes("Unknown flight data packet type.")),
159178
}
160179
}

0 commit comments

Comments
 (0)