Skip to content

Commit c672e4d

Browse files
committed
fix(repo): Add unique constraint on subject columns
1 parent 4f4f8b5 commit c672e4d

File tree

9 files changed

+121
-36
lines changed

9 files changed

+121
-36
lines changed

cluster/docker/nats-config/core.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
port: 4222
22
server_name: core-server
3+
max_payload: 8388608
34

45
jetstream {
56
store_dir: "./data/core"

crates/fuel-streams-domains/src/blocks/record_impl.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,16 @@ impl Record for Block {
2828
{
2929
let db_item = BlockDbItem::try_from(packet)?;
3030
let record = sqlx::query_as::<_, BlockDbItem>(
31-
"INSERT INTO blocks (subject, producer_address, block_height, value)
32-
VALUES ($1, $2, $3, $4)
33-
RETURNING subject, producer_address, block_height, value"
31+
"WITH upsert AS (
32+
INSERT INTO blocks (subject, producer_address, block_height, value)
33+
VALUES ($1, $2, $3, $4)
34+
ON CONFLICT (subject) DO UPDATE SET
35+
producer_address = EXCLUDED.producer_address,
36+
block_height = EXCLUDED.block_height,
37+
value = EXCLUDED.value
38+
RETURNING *
39+
)
40+
SELECT * FROM upsert"
3441
)
3542
.bind(db_item.subject)
3643
.bind(db_item.producer_address)

crates/fuel-streams-domains/src/inputs/record_impl.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,28 @@ impl Record for Input {
2828
{
2929
let db_item = InputDbItem::try_from(packet)?;
3030
let record = sqlx::query_as::<_, InputDbItem>(
31-
"INSERT INTO inputs (
32-
subject, value, block_height, tx_id, tx_index,
33-
input_index, input_type, owner_id, asset_id,
34-
contract_id, sender_address, recipient_address
31+
"WITH upsert AS (
32+
INSERT INTO inputs (
33+
subject, value, block_height, tx_id, tx_index,
34+
input_index, input_type, owner_id, asset_id,
35+
contract_id, sender_address, recipient_address
36+
)
37+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
38+
ON CONFLICT (subject) DO UPDATE SET
39+
value = EXCLUDED.value,
40+
block_height = EXCLUDED.block_height,
41+
tx_id = EXCLUDED.tx_id,
42+
tx_index = EXCLUDED.tx_index,
43+
input_index = EXCLUDED.input_index,
44+
input_type = EXCLUDED.input_type,
45+
owner_id = EXCLUDED.owner_id,
46+
asset_id = EXCLUDED.asset_id,
47+
contract_id = EXCLUDED.contract_id,
48+
sender_address = EXCLUDED.sender_address,
49+
recipient_address = EXCLUDED.recipient_address
50+
RETURNING *
3551
)
36-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
37-
RETURNING subject, value, block_height, tx_id, tx_index,
38-
input_index, input_type, owner_id, asset_id,
39-
contract_id, sender_address, recipient_address",
52+
SELECT * FROM upsert",
4053
)
4154
.bind(db_item.subject)
4255
.bind(db_item.value)

crates/fuel-streams-domains/src/outputs/record_impl.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,25 @@ impl Record for Output {
2828
{
2929
let db_item = OutputDbItem::try_from(packet)?;
3030
let record = sqlx::query_as::<_, OutputDbItem>(
31-
"INSERT INTO outputs (
32-
subject, value, block_height, tx_id, tx_index,
33-
output_index, output_type, to_address, asset_id, contract_id
31+
"WITH upsert AS (
32+
INSERT INTO outputs (
33+
subject, value, block_height, tx_id, tx_index,
34+
output_index, output_type, to_address, asset_id, contract_id
35+
)
36+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
37+
ON CONFLICT (subject) DO UPDATE SET
38+
value = EXCLUDED.value,
39+
block_height = EXCLUDED.block_height,
40+
tx_id = EXCLUDED.tx_id,
41+
tx_index = EXCLUDED.tx_index,
42+
output_index = EXCLUDED.output_index,
43+
output_type = EXCLUDED.output_type,
44+
to_address = EXCLUDED.to_address,
45+
asset_id = EXCLUDED.asset_id,
46+
contract_id = EXCLUDED.contract_id
47+
RETURNING *
3448
)
35-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
36-
RETURNING subject, value, block_height, tx_id, tx_index,
37-
output_index, output_type, to_address, asset_id, contract_id",
49+
SELECT * FROM upsert",
3850
)
3951
.bind(db_item.subject)
4052
.bind(db_item.value)

crates/fuel-streams-domains/src/receipts/record_impl.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,31 @@ impl Record for Receipt {
2828
{
2929
let db_item = ReceiptDbItem::try_from(packet)?;
3030
let record = sqlx::query_as::<_, ReceiptDbItem>(
31-
"INSERT INTO receipts (
32-
subject, value, block_height, tx_id, tx_index, receipt_index,
33-
receipt_type, from_contract_id, to_contract_id, to_address,
34-
asset_id, contract_id, sub_id, sender_address, recipient_address
31+
"WITH upsert AS (
32+
INSERT INTO receipts (
33+
subject, value, block_height, tx_id, tx_index, receipt_index,
34+
receipt_type, from_contract_id, to_contract_id, to_address,
35+
asset_id, contract_id, sub_id, sender_address, recipient_address
36+
)
37+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
38+
ON CONFLICT (subject) DO UPDATE SET
39+
value = EXCLUDED.value,
40+
block_height = EXCLUDED.block_height,
41+
tx_id = EXCLUDED.tx_id,
42+
tx_index = EXCLUDED.tx_index,
43+
receipt_index = EXCLUDED.receipt_index,
44+
receipt_type = EXCLUDED.receipt_type,
45+
from_contract_id = EXCLUDED.from_contract_id,
46+
to_contract_id = EXCLUDED.to_contract_id,
47+
to_address = EXCLUDED.to_address,
48+
asset_id = EXCLUDED.asset_id,
49+
contract_id = EXCLUDED.contract_id,
50+
sub_id = EXCLUDED.sub_id,
51+
sender_address = EXCLUDED.sender_address,
52+
recipient_address = EXCLUDED.recipient_address
53+
RETURNING *
3554
)
36-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
37-
RETURNING subject, value, block_height, tx_id, tx_index, receipt_index,
38-
receipt_type, from_contract_id, to_contract_id, to_address,
39-
asset_id, contract_id, sub_id, sender_address, recipient_address"
55+
SELECT * FROM upsert"
4056
)
4157
.bind(db_item.subject)
4258
.bind(db_item.value)

crates/fuel-streams-domains/src/transactions/record_impl.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,22 @@ impl Record for Transaction {
2828
{
2929
let db_item = TransactionDbItem::try_from(packet)?;
3030
let record = sqlx::query_as::<_, TransactionDbItem>(
31-
"INSERT INTO transactions (
32-
subject, value, block_height, tx_id, tx_index, tx_status, kind
31+
"WITH upsert AS (
32+
INSERT INTO transactions (
33+
subject, value, block_height, tx_id, tx_index,
34+
tx_status, kind
35+
)
36+
VALUES ($1, $2, $3, $4, $5, $6, $7)
37+
ON CONFLICT (subject) DO UPDATE SET
38+
value = EXCLUDED.value,
39+
block_height = EXCLUDED.block_height,
40+
tx_id = EXCLUDED.tx_id,
41+
tx_index = EXCLUDED.tx_index,
42+
tx_status = EXCLUDED.tx_status,
43+
kind = EXCLUDED.kind
44+
RETURNING *
3345
)
34-
VALUES ($1, $2, $3, $4, $5, $6, $7)
35-
RETURNING subject, value, block_height, tx_id, tx_index, tx_status, kind"
46+
SELECT * FROM upsert",
3647
)
3748
.bind(db_item.subject)
3849
.bind(db_item.value)

crates/fuel-streams-domains/src/utxos/record_impl.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,23 @@ impl Record for Utxo {
2828
{
2929
let db_item = UtxoDbItem::try_from(packet)?;
3030
let record = sqlx::query_as::<_, UtxoDbItem>(
31-
"INSERT INTO utxos (
32-
subject, value, block_height, tx_id, tx_index,
33-
input_index, utxo_type, utxo_id
31+
"WITH upsert AS (
32+
INSERT INTO utxos (
33+
subject, value, block_height, tx_id, tx_index,
34+
input_index, utxo_type, utxo_id
35+
)
36+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
37+
ON CONFLICT (subject) DO UPDATE SET
38+
value = EXCLUDED.value,
39+
block_height = EXCLUDED.block_height,
40+
tx_id = EXCLUDED.tx_id,
41+
tx_index = EXCLUDED.tx_index,
42+
input_index = EXCLUDED.input_index,
43+
utxo_type = EXCLUDED.utxo_type,
44+
utxo_id = EXCLUDED.utxo_id
45+
RETURNING *
3446
)
35-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
36-
RETURNING subject, value, block_height, tx_id, tx_index,
37-
input_index, utxo_type, utxo_id",
47+
SELECT * FROM upsert",
3848
)
3949
.bind(db_item.subject)
4050
.bind(db_item.value)
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
-- Drop existing non-unique indexes
2+
-- DROP INDEX IF EXISTS idx_blocks_subject;
3+
-- DROP INDEX IF EXISTS idx_transactions_subject;
4+
-- DROP INDEX IF EXISTS idx_inputs_subject;
5+
-- DROP INDEX IF EXISTS idx_outputs_subject;
6+
-- DROP INDEX IF EXISTS idx_utxos_subject;
7+
-- DROP INDEX IF EXISTS idx_receipts_subject;
8+
9+
-- Add unique constraints
10+
ALTER TABLE blocks ADD CONSTRAINT blocks_subject_unique UNIQUE (subject);
11+
ALTER TABLE transactions ADD CONSTRAINT transactions_subject_unique UNIQUE (subject);
12+
ALTER TABLE inputs ADD CONSTRAINT inputs_subject_unique UNIQUE (subject);
13+
ALTER TABLE outputs ADD CONSTRAINT outputs_subject_unique UNIQUE (subject);
14+
ALTER TABLE utxos ADD CONSTRAINT utxos_subject_unique UNIQUE (subject);
15+
ALTER TABLE receipts ADD CONSTRAINT receipts_subject_unique UNIQUE (subject);

crates/fuel-streams-store/src/db/db_impl.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl Default for DbConnectionOpts {
6060
pool_size: Some(*DB_POOL_SIZE as u32),
6161
connection_str: dotenvy::var("DATABASE_URL")
6262
.expect("DATABASE_URL not set"),
63-
statement_timeout: Some(Duration::from_secs(60)),
63+
statement_timeout: Some(Duration::from_secs(120)),
6464
acquire_timeout: Some(Duration::from_secs(10)),
6565
idle_timeout: Some(Duration::from_secs(240)),
6666
min_connections: Some((*DB_POOL_SIZE as u32) / 4),

0 commit comments

Comments
 (0)