Skip to content

Commit cfe1a42

Browse files
committed
fix(sv-consumer): Filter block execution using store only
1 parent 509071a commit cfe1a42

File tree

1 file changed

+29
-4
lines changed

1 file changed

+29
-4
lines changed

services/consumer/src/executor/block_executor.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,12 @@ impl BlockExecutor {
141141
let payload = msg.payload();
142142
let msg_payload = MsgPayload::decode_json(&payload)?.arc();
143143
let packets = Self::build_packets(&msg_payload);
144-
let cli = self.cli.as_ref().cloned();
144+
let cli = self.cli.clone();
145145
join_set.spawn({
146146
let semaphore = semaphore.clone();
147147
let packets = packets.clone();
148148
let msg_payload = msg_payload.clone();
149+
let cli = cli.clone();
149150
async move {
150151
let _permit = semaphore.acquire().await?;
151152
let result =
@@ -160,10 +161,16 @@ impl BlockExecutor {
160161
let packets = packets.clone();
161162
let msg_payload = msg_payload.clone();
162163
let fuel_streams = fuel_streams.clone();
164+
let cli = cli.clone();
163165
async move {
164166
let _permit = semaphore.acquire_owned().await?;
165-
let result =
166-
handle_streams(&fuel_streams, &packets, &msg_payload).await;
167+
let result = handle_streams(
168+
cli.as_ref(),
169+
&fuel_streams,
170+
&packets,
171+
&msg_payload,
172+
)
173+
.await;
167174
Ok(ProcessResult::Stream(result))
168175
}
169176
});
@@ -324,14 +331,32 @@ async fn handle_stores(
324331
}
325332

326333
async fn handle_streams(
334+
cli: Option<&Arc<Cli>>,
327335
fuel_streams: &Arc<FuelStreams>,
328336
packets: &Arc<Vec<RecordPacket>>,
329337
msg_payload: &Arc<MsgPayload>,
330338
) -> Result<BlockStats, ConsumerError> {
331339
let block_height = msg_payload.block_height();
332340
let stats = BlockStats::new(block_height.to_owned(), ActionType::Stream);
333341
let now = BlockTimestamp::now();
334-
let publish_futures = packets.iter().map(|packet| {
342+
343+
// Filter packets based on store_only_entity if specified
344+
let filtered_packets = packets.iter().filter(|packet| {
345+
let subject_id = packet.subject_id();
346+
if let Ok(entity) = RecordEntity::from_subject_id(&subject_id) {
347+
// Skip if store_only_entity is set and doesn't match current entity
348+
if let Some(cli_ref) = cli {
349+
if let Some(store_only) = &cli_ref.store_only_entity {
350+
return entity.as_str() == store_only;
351+
}
352+
}
353+
true
354+
} else {
355+
false
356+
}
357+
});
358+
359+
let publish_futures = filtered_packets.map(|packet| {
335360
let packet = packet.to_owned();
336361
let packet = packet.with_start_time(now);
337362
fuel_streams.publish_by_entity(packet.arc())

0 commit comments

Comments
 (0)