Skip to content

Commit 7a09099

Browse files
authored
feat(repo): Add messages (#465)
* feat(repo): Add messages entity * feat(repo): Add messages to API service * feat(sv-consumer): Add STORE_ONLY_ENTITY flag * fix(sv-consumer): Lint errors
1 parent 7a421b7 commit 7a09099

File tree

29 files changed

+1125
-10
lines changed

29 files changed

+1125
-10
lines changed

crates/core/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub mod types {
1515
pub use fuel_streams_domains::{
1616
blocks::types::*,
1717
inputs::types::*,
18+
messages::types::*,
1819
outputs::types::*,
1920
predicates::types::*,
2021
receipts::types::*,
@@ -30,6 +31,7 @@ pub mod subjects {
3031
pub use fuel_streams_domains::{
3132
blocks::subjects::*,
3233
inputs::subjects::*,
34+
messages::subjects::*,
3335
outputs::subjects::*,
3436
predicates::subjects::*,
3537
receipts::subjects::*,
@@ -49,7 +51,9 @@ macro_rules! export_module {
4951

5052
export_module!(blocks);
5153
export_module!(inputs);
54+
export_module!(messages);
5255
export_module!(outputs);
56+
export_module!(predicates);
5357
export_module!(receipts);
5458
export_module!(transactions);
5559
export_module!(utxos);

crates/core/src/server/responses.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use fuel_streams_domains::{
1515
DbError,
1616
},
1717
inputs::InputDbItem,
18+
messages::{Message, MessageDbItem},
1819
outputs::OutputDbItem,
1920
predicates::{Predicate, PredicateDbItem},
2021
receipts::ReceiptDbItem,
@@ -46,6 +47,7 @@ pub enum MessagePayload {
4647
Receipt(Arc<Receipt>),
4748
Utxo(Arc<Utxo>),
4849
Predicate(Arc<Predicate>),
50+
Message(Arc<Message>),
4951
}
5052

5153
impl utoipa::ToSchema for MessagePayload {
@@ -105,6 +107,9 @@ impl MessagePayload {
105107
RecordEntity::Predicate => Ok(MessagePayload::Predicate(Arc::new(
106108
Predicate::decode_json(value)?,
107109
))),
110+
RecordEntity::Message => Ok(MessagePayload::Message(Arc::new(
111+
Message::decode_json(value)?,
112+
))),
108113
}
109114
}
110115

@@ -160,6 +165,13 @@ impl MessagePayload {
160165
_ => Err(MessagePayloadError::InvalidData("predicate".to_string())),
161166
}
162167
}
168+
169+
pub fn as_message(&self) -> Result<Arc<Message>, MessagePayloadError> {
170+
match self {
171+
MessagePayload::Message(message) => Ok(message.clone()),
172+
_ => Err(MessagePayloadError::InvalidData("message".to_string())),
173+
}
174+
}
163175
}
164176

165177
#[derive(thiserror::Error, Debug)]
@@ -299,6 +311,13 @@ impl TryFrom<&RecordPacket> for StreamResponse {
299311
response.set_propagation_ms(propagation_ms);
300312
Ok(response)
301313
}
314+
RecordEntity::Message => {
315+
let db_item = MessageDbItem::try_from(packet)?;
316+
let mut response =
317+
StreamResponse::try_from((subject_id, db_item))?;
318+
response.set_propagation_ms(propagation_ms);
319+
Ok(response)
320+
}
302321
}
303322
}
304323
}

crates/core/src/stream/fuel_streams.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use fuel_streams_domains::{
66
db::Db,
77
record::{RecordEntity, RecordPacket},
88
},
9+
messages::Message,
910
predicates::Predicate,
1011
Subjects,
1112
};
@@ -23,6 +24,7 @@ pub struct FuelStreams {
2324
pub receipts: Stream<Receipt>,
2425
pub utxos: Stream<Utxo>,
2526
pub predicates: Stream<Predicate>,
27+
pub messages: Stream<Message>,
2628
pub msg_broker: Arc<NatsMessageBroker>,
2729
pub db: Arc<Db>,
2830
}
@@ -37,6 +39,7 @@ impl FuelStreams {
3739
receipts: Stream::<Receipt>::get_or_init(broker, db).await,
3840
utxos: Stream::<Utxo>::get_or_init(broker, db).await,
3941
predicates: Stream::<Predicate>::get_or_init(broker, db).await,
42+
messages: Stream::<Message>::get_or_init(broker, db).await,
4043
msg_broker: Arc::clone(broker),
4144
db: Arc::clone(db),
4245
}
@@ -79,6 +82,9 @@ impl FuelStreams {
7982
RecordEntity::Predicate => {
8083
self.predicates.publish(&subject, &response).await
8184
}
85+
RecordEntity::Message => {
86+
self.messages.publish(&subject, &response).await
87+
}
8288
}
8389
}
8490

@@ -261,6 +267,12 @@ impl FuelStreams {
261267
.subscribe_dynamic(subject, deliver_policy, api_key_role)
262268
.await
263269
}
270+
Subjects::Messages(messages_subject) => {
271+
let subject = Arc::new(messages_subject);
272+
self.messages
273+
.subscribe_dynamic(subject, deliver_policy, api_key_role)
274+
.await
275+
}
264276
};
265277

266278
Ok(Box::new(stream))
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
-- ------------------------------------------------------------------------------
2+
-- Enum Types
3+
-- ------------------------------------------------------------------------------
4+
CREATE TYPE "message_type" AS ENUM (
5+
'imported',
6+
'consumed'
7+
);
8+
9+
-- ------------------------------------------------------------------------------
10+
-- Main Messages Table
11+
-- ------------------------------------------------------------------------------
12+
CREATE TABLE IF NOT EXISTS messages (
13+
"id" SERIAL PRIMARY KEY,
14+
"value" BYTEA NOT NULL,
15+
-- uniques
16+
"subject" TEXT UNIQUE NOT NULL,
17+
"block_height" BIGINT NOT NULL,
18+
"message_index" INTEGER NOT NULL,
19+
"cursor" TEXT NOT NULL, -- {block_height}-{message_index}
20+
-- fields matching fuel-core
21+
"type" message_type NOT NULL,
22+
"sender" TEXT NOT NULL,
23+
"recipient" TEXT NOT NULL,
24+
"nonce" TEXT NOT NULL,
25+
"amount" BIGINT NOT NULL,
26+
"data" TEXT NOT NULL,
27+
"da_height" BIGINT NOT NULL,
28+
-- timestamps
29+
"block_time" TIMESTAMPTZ NOT NULL,
30+
"created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
31+
"updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
32+
-- constraints
33+
FOREIGN KEY ("block_height") REFERENCES "blocks" ("block_height")
34+
);
35+
36+
-- ------------------------------------------------------------------------------
37+
-- Indexes
38+
-- ------------------------------------------------------------------------------
39+
CREATE INDEX IF NOT EXISTS idx_messages_subject ON messages (subject);
40+
CREATE INDEX IF NOT EXISTS idx_messages_type ON messages (type);
41+
CREATE INDEX IF NOT EXISTS idx_messages_sender ON messages (sender);
42+
CREATE INDEX IF NOT EXISTS idx_messages_recipient ON messages (recipient);
43+
CREATE INDEX IF NOT EXISTS idx_messages_nonce ON messages (nonce);
44+
CREATE INDEX IF NOT EXISTS idx_messages_da_height ON messages (da_height);
45+
CREATE INDEX IF NOT EXISTS idx_messages_block_height ON messages (block_height);
46+
CREATE INDEX IF NOT EXISTS idx_messages_cursor ON messages (cursor);
47+
CREATE INDEX IF NOT EXISTS idx_messages_block_time ON messages (block_time);
48+
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON messages (created_at);
49+
50+
-- Composite indexes for efficient querying
51+
CREATE INDEX IF NOT EXISTS idx_messages_type_block_height ON messages (type, block_height);
52+
CREATE INDEX IF NOT EXISTS idx_messages_sender_block_height ON messages (sender, block_height);
53+
CREATE INDEX IF NOT EXISTS idx_messages_recipient_block_height ON messages (recipient, block_height);

crates/domains/src/infra/record/record_entity.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub enum RecordEntity {
1616
Receipt,
1717
Utxo,
1818
Predicate,
19+
Message,
1920
}
2021

2122
impl std::fmt::Display for RecordEntity {
@@ -34,6 +35,7 @@ impl RecordEntity {
3435
Self::Receipt => "receipt",
3536
Self::Utxo => "utxo",
3637
Self::Predicate => "predicate",
38+
Self::Message => "message",
3739
}
3840
}
3941

@@ -69,6 +71,7 @@ impl TryFrom<&str> for RecordEntity {
6971
s if s.contains("receipt") => Ok(Self::Receipt),
7072
s if s.contains("utxo") => Ok(Self::Utxo),
7173
s if s.contains("predicate") => Ok(Self::Predicate),
74+
s if s.contains("message") => Ok(Self::Message),
7275
_ => Err(RecordEntityError::UnknownSubject(s.to_string())),
7376
}
7477
}

crates/domains/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod blocks;
22
pub mod infra;
33
pub mod inputs;
4+
pub mod messages;
45
mod msg_payload;
56
pub mod outputs;
67
pub mod predicates;
@@ -18,6 +19,7 @@ pub mod mocks {
1819
pub use crate::{
1920
blocks::types::MockBlock,
2021
inputs::types::MockInput,
22+
messages::types::MockMessage,
2123
outputs::types::MockOutput,
2224
receipts::types::MockReceipt,
2325
transactions::types::MockTransaction,
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use std::cmp::Ordering;
2+
3+
use fuel_data_parser::DataEncoder;
4+
use fuel_streams_types::*;
5+
use serde::{Deserialize, Serialize};
6+
7+
use super::{Message, MessageType, MessagesSubject};
8+
use crate::infra::{
9+
db::DbItem,
10+
record::{RecordEntity, RecordPacket, RecordPacketError, RecordPointer},
11+
Cursor,
12+
DbError,
13+
};
14+
15+
#[derive(
16+
Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow, Default,
17+
)]
18+
pub struct MessageDbItem {
19+
pub subject: String,
20+
pub value: Vec<u8>,
21+
pub block_height: BlockHeight,
22+
pub message_index: i32,
23+
pub cursor: String,
24+
// fields matching fuel-core
25+
pub r#type: MessageType,
26+
pub sender: String,
27+
pub recipient: String,
28+
pub nonce: String,
29+
pub amount: i64,
30+
pub data: String,
31+
pub da_height: DaBlockHeight,
32+
// timestamps
33+
pub block_time: BlockTimestamp,
34+
pub created_at: BlockTimestamp,
35+
}
36+
37+
impl DataEncoder for MessageDbItem {}
38+
39+
impl DbItem for MessageDbItem {
40+
fn cursor(&self) -> Cursor {
41+
Cursor::new(&[&self.block_height, &self.message_index])
42+
}
43+
44+
fn entity(&self) -> &RecordEntity {
45+
&RecordEntity::Message
46+
}
47+
48+
fn encoded_value(&self) -> Result<Vec<u8>, DbError> {
49+
Ok(self.value.clone())
50+
}
51+
52+
fn subject_str(&self) -> String {
53+
self.subject.clone()
54+
}
55+
56+
fn subject_id(&self) -> String {
57+
MessagesSubject::ID.to_string()
58+
}
59+
60+
fn created_at(&self) -> BlockTimestamp {
61+
self.created_at
62+
}
63+
64+
fn block_time(&self) -> BlockTimestamp {
65+
self.block_time
66+
}
67+
68+
fn block_height(&self) -> BlockHeight {
69+
self.block_height
70+
}
71+
}
72+
73+
impl TryFrom<&RecordPacket> for MessageDbItem {
74+
type Error = RecordPacketError;
75+
fn try_from(packet: &RecordPacket) -> Result<Self, Self::Error> {
76+
let message = Message::decode_json(&packet.value)?;
77+
let block_height = packet.pointer.block_height;
78+
let msg_index = message.message_index as i32;
79+
Ok(MessageDbItem {
80+
subject: packet.subject_str(),
81+
value: packet.value.to_owned(),
82+
block_height: packet.pointer.block_height,
83+
message_index: msg_index,
84+
cursor: format!("{}-{}", block_height, msg_index),
85+
r#type: message.r#type,
86+
sender: message.sender.to_string(),
87+
recipient: message.recipient.to_string(),
88+
nonce: message.nonce.to_string(),
89+
amount: message.amount.into_inner() as i64,
90+
data: message.data.to_string(),
91+
da_height: message.da_height,
92+
block_time: packet.block_timestamp,
93+
created_at: packet.block_timestamp,
94+
})
95+
}
96+
}
97+
98+
impl PartialOrd for MessageDbItem {
99+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
100+
Some(self.cmp(other))
101+
}
102+
}
103+
104+
impl Ord for MessageDbItem {
105+
fn cmp(&self, other: &Self) -> Ordering {
106+
self.block_height
107+
.cmp(&other.block_height)
108+
.then(self.message_index.cmp(&other.message_index))
109+
}
110+
}
111+
112+
impl From<MessageDbItem> for RecordPointer {
113+
fn from(val: MessageDbItem) -> Self {
114+
RecordPointer {
115+
block_height: val.block_height,
116+
..Default::default()
117+
}
118+
}
119+
}

crates/domains/src/messages/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
mod db_item;
2+
pub mod packets;
3+
mod query_params;
4+
pub mod repository;
5+
pub mod subjects;
6+
pub mod types;
7+
8+
pub use db_item::*;
9+
pub use query_params::*;
10+
pub use subjects::*;
11+
pub use types::*;

0 commit comments

Comments
 (0)