Skip to content

Commit 41c4cd8

Browse files
committed
fix(sv-publisher): Restore tx_pointer for transactions
1 parent 41fb260 commit 41c4cd8

File tree

11 files changed

+145
-32
lines changed

11 files changed

+145
-32
lines changed

CONTRIBUTING.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ make run-publisher \
209209
PORT=4000 \
210210
TELEMETRY_PORT=9001 \
211211
NATS_URL=localhost:4222 \
212-
FROM_HEIGHT=0
212+
FROM_BLOCK=0
213213
```
214214

215215
- Use `testnet-dev` when developing features against testnet
@@ -221,7 +221,7 @@ make run-publisher \
221221
- `PORT`: Service port (default: 4000)
222222
- `TELEMETRY_PORT`: Metrics port (default: 9001)
223223
- `NATS_URL`: NATS server URL
224-
- `FROM_HEIGHT`: Starting block height
224+
- `FROM_BLOCK`: Starting block height
225225

226226
### Consumer Service
227227

Makefile

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -227,26 +227,26 @@ run-publisher: PORT="4000"
227227
run-publisher: TELEMETRY_PORT="9001"
228228
run-publisher: NATS_URL="localhost:4222"
229229
run-publisher: ARGS=""
230-
run-publisher: FROM_HEIGHT="0"
230+
run-publisher: FROM_BLOCK="0"
231231
run-publisher: check-network
232232
@./scripts/run_publisher.sh \
233233
--mode $(MODE) \
234234
--network $(NETWORK) \
235235
--telemetry-port $(TELEMETRY_PORT) \
236-
--from-height $(FROM_HEIGHT) \
236+
--from-block $(FROM_BLOCK) \
237237
--extra-args $(ARGS)
238238

239239
run-publisher-mainnet-dev:
240-
$(MAKE) run-publisher NETWORK=mainnet MODE=dev FROM_HEIGHT=0
240+
$(MAKE) run-publisher NETWORK=mainnet MODE=dev FROM_BLOCK=0
241241

242242
run-publisher-mainnet-profiling:
243-
$(MAKE) run-publisher NETWORK=mainnet MODE=profiling FROM_HEIGHT=0
243+
$(MAKE) run-publisher NETWORK=mainnet MODE=profiling FROM_BLOCK=0
244244

245245
run-publisher-testnet-dev:
246-
$(MAKE) run-publisher NETWORK=testnet MODE=dev FROM_HEIGHT=0
246+
$(MAKE) run-publisher NETWORK=testnet MODE=dev FROM_BLOCK=0
247247

248248
run-publisher-testnet-profiling:
249-
$(MAKE) run-publisher NETWORK=testnet MODE=profiling FROM_HEIGHT=0
249+
$(MAKE) run-publisher NETWORK=testnet MODE=profiling FROM_BLOCK=0
250250

251251
# ------------------------------------------------------------
252252
# Consumer Run Commands

cluster/charts/fuel-streams/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ apiVersion: v2
22
appVersion: "1.0"
33
description: A Helm chart for Kubernetes
44
name: fuel-streams
5-
version: 0.11.13
5+
version: 0.11.14
66
dependencies:
77
- name: nats
88
version: 1.3.2

cluster/charts/fuel-streams/values.yaml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -419,12 +419,6 @@ dune:
419419
pullPolicy: Always
420420
tag: latest
421421

422-
storage:
423-
size: 10Mi # Small size since we only store block height state
424-
storageClass: "gp3-generic"
425-
accessMode: ReadWriteOnce
426-
mountPath: /data/dune
427-
428422
cronjob:
429423
schedule: "0 * * * *" # Run every hour
430424
successfulJobsHistoryLimit: 3

crates/domains/src/transactions/db_item.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use fuel_streams_types::{
99
TransactionStatus,
1010
TransactionType,
1111
TxId,
12+
TxPointer,
1213
};
1314
use serde::{Deserialize, Serialize};
1415

@@ -115,7 +116,14 @@ impl TryFrom<&RecordPacket> for TransactionDbItem {
115116
let transaction = Transaction::decode_json(&packet.value)?;
116117
let tx_pointer = match transaction.tx_pointer {
117118
Some(tx_pointer) => Some(serde_json::to_vec(&tx_pointer)?),
118-
None => None,
119+
None => Some(serde_json::to_vec(&TxPointer {
120+
block_height: packet.pointer.block_height.to_owned(),
121+
tx_index: packet
122+
.pointer
123+
.tx_index
124+
.expect("tx_index should be defined")
125+
as u16,
126+
})?),
119127
};
120128

121129
let subject: Subjects = packet

crates/domains/src/transactions/repository.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ impl Repository for Transaction {
3232
let mut conn = executor.acquire().await?;
3333
let mut db_tx = conn.begin().await?;
3434
let created_at = BlockTimestamp::now();
35-
3635
let record = sqlx::query_as::<_, TransactionDbItem>(
3736
r#"
3837
INSERT INTO transactions (
@@ -171,7 +170,6 @@ impl Repository for Transaction {
171170
.map_err(RepositoryError::Insert)?;
172171

173172
let tx = Transaction::decode_json(&db_item.value)?;
174-
175173
if let Some(storage_slots) = &tx.storage_slots {
176174
for slot in storage_slots {
177175
let slot_item = TransactionStorageSlotDbItem {

scripts/run_publisher.sh

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ usage() {
1818
echo " --telemetry-port : Specify the telemetry port number"
1919
echo " Default: 8080"
2020
echo " --extra-args : Optional additional arguments to append (in quotes)"
21-
echo " --from-height : Specify the starting block height"
21+
echo " --from-block : Specify the starting block height"
2222
echo " Default: 0"
2323
echo ""
2424
echo "Examples:"
2525
echo " $0 # Runs with all defaults"
2626
echo " $0 --network mainnet # Runs mainnet with default settings"
2727
echo " $0 --port 9000 # Runs on port 9000"
2828
echo " $0 --network mainnet --port 4001 --telemetry-port 8080 --mode dev # Custom network, port, telemetry-port and mode"
29-
echo " $0 --from-height 1000 # Start from block height 1000"
29+
echo " $0 --from-block 1000 # Start from block height 1000"
3030
exit 1
3131
}
3232

@@ -35,7 +35,7 @@ NETWORK=${NETWORK:-"testnet"}
3535
MODE=${MODE:-"profiling"}
3636
PORT=${PORT:-"4004"}
3737
TELEMETRY_PORT=${TELEMETRY_PORT:-"9002"}
38-
FROM_HEIGHT=${FROM_HEIGHT:-"0"}
38+
FROM_BLOCK=${FROM_BLOCK:-"0"}
3939

4040
while [[ "$#" -gt 0 ]]; do
4141
case $1 in
@@ -59,8 +59,8 @@ while [[ "$#" -gt 0 ]]; do
5959
ARGS="$2"
6060
shift 2
6161
;;
62-
--from-height)
63-
FROM_HEIGHT="$2"
62+
--from-block)
63+
FROM_BLOCK="$2"
6464
shift 2
6565
;;
6666
--help)
@@ -89,7 +89,7 @@ echo " → Network: $NETWORK"
8989
echo " → Mode: $MODE"
9090
echo " → Fuel Block Consumer Port: $PORT"
9191
echo " → Telemetry Port: $TELEMETRY_PORT"
92-
echo " → From Height: $FROM_HEIGHT"
92+
echo " → From Height: $FROM_BLOCK"
9393
if [ -n "$ARGS" ]; then
9494
echo "→ Extra Arguments: $ARGS"
9595
fi
@@ -131,7 +131,7 @@ COMMON_ARGS=(
131131
# Application specific
132132
"--nats-url" "nats://localhost:4222"
133133
"--telemetry-port" "${TELEMETRY_PORT}"
134-
"--from-height" "${FROM_HEIGHT}"
134+
"--from-block" "${FROM_BLOCK}"
135135
)
136136

137137
# Execute based on mode

services/publisher/src/cli.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ pub struct Cli {
4242
/// Start from block height
4343
#[arg(
4444
long,
45-
value_name = "FROM_HEIGHT",
45+
value_name = "FROM_BLOCK",
4646
default_value = "0",
4747
help = "Start from block height"
4848
)]
49-
pub from_height: u64,
49+
pub from_block: u64,
5050
/// Use metrics
5151
#[arg(
5252
long,

services/publisher/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ pub mod error;
33
pub mod gaps;
44
pub mod metrics;
55
pub mod publish;
6+
pub mod recover;
67
pub mod state;

services/publisher/src/main.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use sv_publisher::{
1515
gaps::{find_next_block_to_save, BlockHeightGap},
1616
metrics::Metrics,
1717
publish::publish_block,
18+
recover::recover_tx_pointers,
1819
state::ServerState,
1920
};
2021
use tokio::{sync::Semaphore, task::JoinSet};
@@ -52,8 +53,9 @@ async fn main() -> anyhow::Result<()> {
5253
tokio::select! {
5354
result = async {
5455
tokio::join!(
56+
recover_tx_pointers(&db),
5557
process_historical_blocks(
56-
cli.from_height.into(),
58+
cli.from_block.into(),
5759
&message_broker,
5860
&fuel_core,
5961
&last_block_height,
@@ -72,6 +74,7 @@ async fn main() -> anyhow::Result<()> {
7274
} => {
7375
result.0?;
7476
result.1?;
77+
result.2?;
7578
}
7679
_ = shutdown.wait_for_shutdown() => {
7780
tracing::info!("Shutdown signal received, waiting for processing to complete...");
@@ -124,7 +127,7 @@ async fn process_live_blocks(
124127
}
125128

126129
fn process_historical_blocks(
127-
from_height: BlockHeight,
130+
from_block: BlockHeight,
128131
message_broker: &Arc<NatsMessageBroker>,
129132
fuel_core: &Arc<dyn FuelCoreLike>,
130133
last_block_height: &Arc<BlockHeight>,
@@ -145,7 +148,7 @@ fn process_historical_blocks(
145148
}
146149

147150
let Some(processed_gaps) =
148-
get_historical_block_range(from_height, &gaps, last_block_height)
151+
get_historical_block_range(from_block, &gaps, last_block_height)
149152
else {
150153
return Ok(());
151154
};
@@ -242,7 +245,7 @@ async fn process_blocks_with_join_set(
242245
}
243246

244247
fn get_historical_block_range(
245-
from_height: BlockHeight,
248+
from_block: BlockHeight,
246249
gaps: &[BlockHeightGap],
247250
last_block_height: BlockHeight,
248251
) -> Option<Vec<BlockHeightGap>> {
@@ -252,7 +255,7 @@ fn get_historical_block_range(
252255

253256
let mut processed_gaps = Vec::new();
254257
for gap in gaps {
255-
let start = std::cmp::max(from_height, gap.start);
258+
let start = std::cmp::max(from_block, gap.start);
256259
let end = std::cmp::min(gap.end, last_block_height);
257260

258261
if start <= end {

0 commit comments

Comments
 (0)