Skip to content

Commit cc4b0dd

Browse files
authored
fix(sv-consumer): Stream blocks before storing on db (#484)
1 parent f395620 commit cc4b0dd

File tree

1 file changed

+24
-9
lines changed

1 file changed

+24
-9
lines changed

services/consumer/src/executor/block_executor.rs

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,10 @@ impl BlockExecutor {
135135
let packets = Self::build_packets(&msg_payload);
136136

137137
join_set.spawn({
138-
let packets = packets.clone();
138+
let packets: Arc<Vec<RecordPacket>> = packets.clone();
139139
let msg_payload = msg_payload.clone();
140+
let _ = handle_streams_task(&fuel_streams, &packets, &msg_payload)
141+
.await;
140142
async move {
141143
let result = handle_stores(&db, &packets, &msg_payload).await;
142144
if let Ok(stats) = result {
@@ -159,14 +161,6 @@ impl BlockExecutor {
159161
stats,
160162
)));
161163
}
162-
let result_streams =
163-
handle_streams(&fuel_streams, &packets, &msg_payload).await;
164-
if let Ok(stream_stats) = result_streams {
165-
match &stream_stats.error {
166-
Some(error) => stream_stats.log_error(error),
167-
None => stream_stats.log_success(),
168-
}
169-
}
170164
Ok::<_, ConsumerError>(ProcessResult::Store(result))
171165
}
172166
});
@@ -311,11 +305,32 @@ async fn handle_stores(
311305
}
312306
}
313307

308+
fn handle_streams_task(
309+
fuel_streams: &Arc<FuelStreams>,
310+
packets: &Arc<Vec<RecordPacket>>,
311+
msg_payload: &Arc<MsgPayload>,
312+
) -> tokio::task::JoinHandle<()> {
313+
let packets = packets.clone();
314+
let msg_payload = msg_payload.clone();
315+
let fuel_streams = fuel_streams.clone();
316+
tokio::spawn(async move {
317+
let result_streams =
318+
handle_streams(&fuel_streams, &packets, &msg_payload).await;
319+
if let Ok(stream_stats) = result_streams {
320+
match &stream_stats.error {
321+
Some(error) => stream_stats.log_error(error),
322+
None => stream_stats.log_success(),
323+
}
324+
}
325+
})
326+
}
327+
314328
async fn handle_streams(
315329
fuel_streams: &Arc<FuelStreams>,
316330
packets: &Arc<Vec<RecordPacket>>,
317331
msg_payload: &Arc<MsgPayload>,
318332
) -> Result<BlockStats, ConsumerError> {
333+
tracing::info!("[#{}] Streaming packets", msg_payload.block_height());
319334
let block_height = msg_payload.block_height();
320335
let stats = BlockStats::new(block_height.to_owned(), ActionType::Stream);
321336
let now = BlockTimestamp::now();

0 commit comments

Comments
 (0)