Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions crates/domains/migrations/20250608000000_update_cursor_padding.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
-- Splits our input on the delimiter
-- Leave the first element as it is
-- Pad all subsequential words to the given pad length
CREATE OR REPLACE FUNCTION split_and_pad(
input_text TEXT,
delimiter TEXT,
pad_length INTEGER,
pad_from INTEGER DEFAULT 0,
pad_char TEXT DEFAULT '0'
)
RETURNS TEXT AS $$
BEGIN
RETURN array_to_string(
ARRAY(
SELECT
CASE
WHEN row_number() OVER () = pad_from THEN elem
ELSE lpad(elem, pad_length, pad_char)
END
FROM unnest(string_to_array(input_text, delimiter)) WITH ORDINALITY AS t(elem, ord)
ORDER BY ord
),
delimiter
);
END;
$$ LANGUAGE plpgsql;

-- Update all tables with `cursor` and pad each part of the word
DO $$
DECLARE
-- Skip over the first element (block_height) in our case
-- We don't pad this element
PAD_FROM CONSTANT INTEGER := 1
-- The padding length for all subsequential words after
PAD_LENGTH CONSTANT INTEGER := 6;
DELIMINATOR CONSTANT TEXT := '-';
BEGIN
-- Update `inputs` table
update inputs
set cursor = split_and_pad(cursor, DELIMINATOR, PAD_LENGTH, PAD_FROM);

-- Update `messages` table
update messages
set cursor = split_and_pad(cursor, DELIMINATOR, PAD_LENGTH, PAD_FROM);

-- Update `transactions` table
update transactions
set cursor = split_and_pad(cursor, DELIMINATOR, PAD_LENGTH, PAD_FROM);

-- Update `receipts` table
update receipts
set cursor = split_and_pad(cursor, DELIMINATOR, PAD_LENGTH, PAD_FROM);

-- Update `utxos` table
update utxos
set cursor = split_and_pad(cursor, DELIMINATOR, PAD_LENGTH, PAD_FROM);

-- Update `predicate_transactions` table
update predicate_transactions
set cursor = split_and_pad(cursor, DELIMINATOR, PAD_LENGTH, PAD_FROM);

-- Update `outputs` table
update outputs
set cursor = split_and_pad(cursor, DELIMINATOR, PAD_LENGTH, PAD_FROM);
END $$;
17 changes: 16 additions & 1 deletion crates/domains/src/infra/db/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,26 @@ use serde::{Deserialize, Serialize};
pub struct Cursor(Cow<'static, str>);

impl Cursor {
// We don't pad the first element (`block_height`)
const CURSOR_PAD_FROM: usize = 1;
const CURSOR_PAD_LENGTH: usize = 6;

pub fn new(fields: &[&dyn ToString]) -> Self {
Self(Cow::Owned(
fields
.iter()
.map(|f| f.to_string())
.enumerate()
.map(|(i, f)| {
if i < Self::CURSOR_PAD_FROM {
f.to_string()
} else {
format!(
"{:0>width$}",
f.to_string(),
width = Self::CURSOR_PAD_LENGTH
)
}
})
.collect::<Vec<_>>()
.join("-"),
))
Expand Down
6 changes: 1 addition & 5 deletions crates/domains/src/messages/db_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub struct MessageDbItem {
pub value: Vec<u8>,
pub block_height: BlockHeight,
pub message_index: i32,
pub cursor: String,
// fields matching fuel-core
pub r#type: MessageType,
pub sender: String,
Expand Down Expand Up @@ -74,14 +73,11 @@ impl TryFrom<&RecordPacket> for MessageDbItem {
type Error = RecordPacketError;
fn try_from(packet: &RecordPacket) -> Result<Self, Self::Error> {
let message = Message::decode_json(&packet.value)?;
let block_height = packet.pointer.block_height;
let msg_index = message.message_index as i32;
Ok(MessageDbItem {
subject: packet.subject_str(),
value: packet.value.to_owned(),
block_height: packet.pointer.block_height,
message_index: msg_index,
cursor: format!("{}-{}", block_height, msg_index),
message_index: message.message_index as i32,
r#type: message.r#type,
sender: message.sender.to_string(),
recipient: message.recipient.to_string(),
Expand Down
9 changes: 6 additions & 3 deletions crates/domains/src/messages/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use fuel_streams_types::BlockTimestamp;
use sqlx::{Acquire, PgExecutor, Postgres};

use super::*;
use crate::infra::repository::{Repository, RepositoryError, RepositoryResult};
use crate::infra::{
repository::{Repository, RepositoryError, RepositoryResult},
DbItem,
};

#[async_trait]
impl Repository for Message {
Expand Down Expand Up @@ -62,7 +65,7 @@ impl Repository for Message {
.bind(&db_item.value)
.bind(db_item.block_height.into_inner() as i64)
.bind(db_item.message_index)
.bind(db_item.cursor.to_string())
.bind(db_item.cursor().to_string())
.bind(db_item.r#type)
.bind(&db_item.sender)
.bind(&db_item.recipient)
Expand Down Expand Up @@ -110,11 +113,11 @@ mod tests {
}

fn assert_result(result: &MessageDbItem, expected: &MessageDbItem) {
assert_eq!(result.cursor(), expected.cursor());
assert_eq!(result.subject, expected.subject);
assert_eq!(result.value, expected.value);
assert_eq!(result.block_height, expected.block_height);
assert_eq!(result.message_index, expected.message_index);
assert_eq!(result.cursor, expected.cursor);
assert_eq!(result.r#type, expected.r#type);
assert_eq!(result.sender, expected.sender);
assert_eq!(result.recipient, expected.recipient);
Expand Down
13 changes: 13 additions & 0 deletions crates/fuel-streams/src/networks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub enum FuelNetwork {
#[default]
Local,
Staging,
Testnet,
Mainnet,
}

Expand All @@ -27,6 +28,7 @@ impl FromStr for FuelNetwork {
match s {
"local" => Ok(FuelNetwork::Local),
"staging" => Ok(FuelNetwork::Staging),
"testnet" => Ok(FuelNetwork::Testnet),
"mainnet" => Ok(FuelNetwork::Mainnet),
_ => Err(format!("unknown network: {}", s)),
}
Expand All @@ -38,6 +40,7 @@ impl std::fmt::Display for FuelNetwork {
match self {
FuelNetwork::Local => write!(f, "local"),
FuelNetwork::Staging => write!(f, "staging"),
FuelNetwork::Testnet => write!(f, "testnet"),
FuelNetwork::Mainnet => write!(f, "mainnet"),
}
}
Expand All @@ -47,6 +50,7 @@ impl FuelNetwork {
pub fn load_from_env() -> Self {
match std::env::var("NETWORK").as_deref() {
Ok("mainnet") => FuelNetwork::Mainnet,
Ok("testnet") => FuelNetwork::Testnet,
Ok("staging") => FuelNetwork::Staging,
_ => FuelNetwork::Local,
}
Expand All @@ -56,6 +60,7 @@ impl FuelNetwork {
match self {
FuelNetwork::Local => "nats://localhost:4222",
FuelNetwork::Staging => "nats://stream-staging.fuel.network:4222",
FuelNetwork::Testnet => "nats://stream-testnet.fuel.network:4222",
FuelNetwork::Mainnet => "nats://stream-mainnet.fuel.network:4222",
}
.to_string()
Expand All @@ -70,6 +75,10 @@ impl FuelNetwork {
Url::parse("https://stream-staging.fuel.network")
.expect("working url")
}
FuelNetwork::Testnet => {
Url::parse("https://stream-testnet.fuel.network")
.expect("working url")
}
FuelNetwork::Mainnet => {
Url::parse("https://stream-mainnet.fuel.network")
.expect("working url")
Expand All @@ -86,6 +95,10 @@ impl FuelNetwork {
Url::parse("wss://stream-staging.fuel.network")
.expect("working url")
}
FuelNetwork::Testnet => {
Url::parse("wss://stream-testnet.fuel.network")
.expect("working url")
}
FuelNetwork::Mainnet => {
Url::parse("wss://stream-mainnet.fuel.network")
.expect("working url")
Expand Down
Loading