Skip to content

Commit 65bde5e

Browse files
boicotinhoFabio Fernandes
authored andcommitted
Persist name records for Peer Discovery
1 parent ad72e75 commit 65bde5e

File tree

10 files changed

+113
-8
lines changed

10 files changed

+113
-8
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

debian/usr/lib/systemd/system/monad-bft.service

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ ExecStart=/usr/local/bin/monad-node \
1919
--statesync-sq-thread-cpu 8 \
2020
--record-metrics-interval-seconds 1 \
2121
--validators-path /home/monad/monad-bft/config/validators/validators.toml \
22+
--persisted-peers-path /home/monad/monad-bft/peers.toml \
2223
--keystore-password "${KEYSTORE_PASSWORD}"
2324
EnvironmentFile=-/home/monad/.env
2425
Environment="RUST_LOG=debug,h2=warn,tower=warn,opentelemetry_sdk=warn,opentelemetry-otlp=warn"

docker/single-node/nets/compose.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ services:
7474
--node-config /monad/config/node.toml
7575
--forkpoint-config /monad/config/forkpoint.genesis.toml
7676
--validators-path /monad/config/validators.toml
77+
--persisted-peers-path /monad/peers.toml
7778
--statesync-ipc-path /monad/statesync.sock
7879
--wal-path /monad/wal
7980
--mempool-ipc-path /monad/mempool.sock

monad-node/src/cli.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,8 @@ pub struct Cli {
9595

9696
#[arg(long)]
9797
pub manytrace_socket: Option<String>,
98+
99+
/// Set the path for the file that will persist peer discovery records across restarts
100+
#[arg(long)]
101+
pub persisted_peers_path: PathBuf,
98102
}

monad-node/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::{
1717
collections::{BTreeMap, BTreeSet, HashMap},
1818
marker::PhantomData,
1919
net::{IpAddr, SocketAddr, ToSocketAddrs},
20+
path::PathBuf,
2021
process,
2122
sync::Arc,
2223
time::{Duration, Instant},
@@ -171,6 +172,7 @@ async fn run(node_state: NodeState) -> Result<(), ()> {
171172
locked_epoch_validators.clone(),
172173
current_epoch,
173174
current_round,
175+
node_state.persisted_peers_path,
174176
);
175177

176178
let statesync_threshold: usize = node_state.node_config.statesync_threshold.into();
@@ -507,6 +509,7 @@ fn build_raptorcast_router<ST, SCT, M, OM>(
507509
locked_epoch_validators: Vec<ValidatorSetDataWithEpoch<SCT>>,
508510
current_epoch: Epoch,
509511
current_round: Round,
512+
persisted_peers_path: PathBuf,
510513
) -> MultiRouter<ST, M, OM, MonadEvent<ST, SCT, ExecutionProtocolType>, PeerDiscovery<ST>>
511514
where
512515
ST: CertificateSignatureRecoverable,
@@ -657,6 +660,7 @@ where
657660
enable_publisher: node_config.fullnode_raptorcast.enable_publisher,
658661
enable_client: node_config.fullnode_raptorcast.enable_client,
659662
rng: ChaCha8Rng::from_entropy(),
663+
persisted_peers_path,
660664
};
661665

662666
MultiRouter::new(

monad-node/src/state.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub struct NodeState {
7272
// should be kept as long as node is alive, tracing listener is stopped when handle is dropped
7373
#[allow(unused)]
7474
manytrace_agent: Option<agent::Agent>,
75+
pub persisted_peers_path: PathBuf,
7576
}
7677

7778
impl NodeState {
@@ -95,6 +96,7 @@ impl NodeState {
9596
record_metrics_interval_seconds,
9697
pprof,
9798
manytrace_socket,
99+
persisted_peers_path,
98100
} = Cli::from_arg_matches_mut(&mut cmd.get_matches_mut())?;
99101

100102
let (reload_handle, agent) = NodeState::setup_tracing(manytrace_socket)?;
@@ -200,6 +202,7 @@ impl NodeState {
200202
pprof,
201203
reload_handle,
202204
manytrace_agent: agent,
205+
persisted_peers_path,
203206
})
204207
}
205208

monad-peer-disc-swarm/tests/peer_discovery.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ fn setup_keys_and_swarm_builder(
218218
enable_publisher: secondary_raptorcast_enabled,
219219
enable_client: secondary_raptorcast_enabled,
220220
rng: ChaCha8Rng::seed_from_u64(123456), // fixed seed for reproducibility
221+
persisted_peers_path: Default::default(),
221222
},
222223
router_scheduler: NoSerRouterConfig::new(all_peers.keys().cloned().collect())
223224
.build(),
@@ -396,6 +397,7 @@ fn test_update_name_record() {
396397
enable_publisher: false,
397398
enable_client: false,
398399
rng: ChaCha8Rng::seed_from_u64(123456),
400+
persisted_peers_path: Default::default(),
399401
},
400402
router_scheduler: NoSerRouterConfig::new(node_ids.iter().cloned().collect()).build(),
401403
seed: 1,

monad-peer-discovery/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ arrayvec = { workspace = true }
1414
futures = { workspace = true }
1515
rand = { workspace = true }
1616
rand_chacha = { workspace = true }
17+
serde = { workspace = true }
1718
tracing = { workspace = true }
1819
tokio-util = { workspace = true, features = ["time"] }
20+
toml = { workspace = true }
1921

2022
[dev-dependencies]
2123
monad-keystore = { workspace = true }
@@ -28,4 +30,3 @@ hex = { workspace = true }
2830
insta = { workspace = true }
2931
rstest = { workspace = true }
3032
test-case = { workspace = true }
31-
toml = { workspace = true }

monad-peer-discovery/src/discovery.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use std::{
1717
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
1818
net::SocketAddrV4,
19+
path::PathBuf,
1920
time::Duration,
2021
};
2122

@@ -165,6 +166,7 @@ pub struct PeerDiscovery<ST: CertificateSignatureRecoverable> {
165166
// secondary raptorcast setting: enable client mode when self is a full node
166167
pub enable_client: bool,
167168
pub rng: ChaCha8Rng,
169+
pub persisted_peers_path: PathBuf,
168170
}
169171

170172
pub struct PeerDiscoveryBuilder<ST: CertificateSignatureRecoverable> {
@@ -186,6 +188,7 @@ pub struct PeerDiscoveryBuilder<ST: CertificateSignatureRecoverable> {
186188
pub enable_publisher: bool,
187189
pub enable_client: bool,
188190
pub rng: ChaCha8Rng,
191+
pub persisted_peers_path: PathBuf,
189192
}
190193

191194
impl<ST: CertificateSignatureRecoverable> PeerDiscoveryAlgoBuilder for PeerDiscoveryBuilder<ST> {
@@ -258,6 +261,7 @@ impl<ST: CertificateSignatureRecoverable> PeerDiscoveryAlgoBuilder for PeerDisco
258261
enable_publisher: self.enable_publisher,
259262
enable_client: self.enable_client,
260263
rng: self.rng,
264+
persisted_peers_path: self.persisted_peers_path,
261265
};
262266

263267
let mut cmds = Vec::new();
@@ -269,6 +273,58 @@ impl<ST: CertificateSignatureRecoverable> PeerDiscoveryAlgoBuilder for PeerDisco
269273
}
270274
});
271275

276+
// Attempt to load persisted peers from JSON file and restore into pending queue
277+
match std::fs::read_to_string(&state.persisted_peers_path) {
278+
Ok(contents) => {
279+
match toml::from_str::<
280+
BTreeMap<NodeId<CertificateSignaturePubKey<ST>>, MonadNameRecord<ST>>,
281+
>(&contents)
282+
{
283+
Ok(loaded) => {
284+
debug!(path =? state.persisted_peers_path, loaded_len = loaded.len(), "loaded persisted peers");
285+
for (peer_id, name_record) in loaded.into_iter() {
286+
if peer_id == state.self_id {
287+
continue;
288+
}
289+
290+
match name_record.recover_pubkey() {
291+
Ok(recovered) if recovered == peer_id => {}
292+
_ => {
293+
warn!(
294+
?peer_id,
295+
"skipping persisted peer with invalid signature"
296+
);
297+
continue;
298+
}
299+
}
300+
301+
match state.insert_peer_to_pending(peer_id, name_record) {
302+
Ok(cmds_from_insert) => {
303+
cmds.extend(cmds_from_insert);
304+
debug!(?peer_id, "inserted persisted peer");
305+
}
306+
Err(err) => {
307+
warn!(
308+
?err,
309+
"skipping persisted peer due to IP/validation error"
310+
);
311+
continue;
312+
}
313+
}
314+
}
315+
}
316+
Err(err) => {
317+
warn!(path =? state.persisted_peers_path, ?err, "failed to deserialize persisted peers");
318+
}
319+
}
320+
}
321+
Err(err) => {
322+
debug!(path =? state.persisted_peers_path, ?err, "Skipped loading persisted peers");
323+
}
324+
}
325+
326+
state.persist_peers(); // Early test before timer cycle
327+
272328
cmds.extend(state.refresh());
273329

274330
(state, cmds)
@@ -608,6 +664,26 @@ impl<ST: CertificateSignatureRecoverable> PeerDiscovery<ST> {
608664
fn is_pinned_node(&self, peer_id: &NodeId<CertificateSignaturePubKey<ST>>) -> bool {
609665
self.check_validator_membership(peer_id) || self.pinned_full_nodes.contains(peer_id)
610666
}
667+
668+
fn persist_peers(&self) {
669+
// Persist both routing_info and pending_queue to TOML file
670+
let mut routing_info = self.routing_info.clone();
671+
for (peer_id, conn_info) in self.pending_queue.iter() {
672+
routing_info.insert(*peer_id, conn_info.name_record.clone());
673+
}
674+
match toml::to_string(&routing_info) {
675+
Ok(serialized) => {
676+
if let Err(e) = std::fs::write(&self.persisted_peers_path, serialized) {
677+
warn!(path =? self.persisted_peers_path, ?e, "failed to persist peers");
678+
} else {
679+
debug!(path =? self.persisted_peers_path, routing_len =? self.routing_info.len(), "persisted peers to disk (initial)");
680+
}
681+
}
682+
Err(err) => {
683+
warn!(path =? self.persisted_peers_path, ?err, "failed to serialize peers");
684+
}
685+
}
686+
}
611687
}
612688

613689
impl<ST> PeerDiscoveryAlgo for PeerDiscovery<ST>
@@ -1291,6 +1367,8 @@ where
12911367
self.metrics[GAUGE_PEER_DISC_NUM_PEERS] = self.routing_info.len() as u64;
12921368
self.metrics[GAUGE_PEER_DISC_NUM_PENDING_PEERS] = self.pending_queue.len() as u64;
12931369

1370+
self.persist_peers();
1371+
12941372
// reset timer to schedule for the next refresh
12951373
cmds.extend(self.reset_refresh_timer());
12961374

@@ -1625,6 +1703,7 @@ mod tests {
16251703
enable_publisher: false,
16261704
enable_client: false,
16271705
rng: ChaCha8Rng::seed_from_u64(123456),
1706+
persisted_peers_path: Default::default(),
16281707
}
16291708
}
16301709

monad-peer-discovery/src/lib.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use monad_crypto::{
3131
use monad_executor::ExecutorMetrics;
3232
use monad_executor_glue::PeerEntry;
3333
use monad_types::{Epoch, NodeId, Round};
34+
use serde::{Deserialize, Serialize};
3435
use tracing::{debug, warn};
3536

3637
pub mod discovery;
@@ -47,7 +48,8 @@ pub enum PortTag {
4748
UDP = 1,
4849
}
4950

50-
#[derive(Debug, Clone, Copy, PartialEq, Eq, RlpEncodable, RlpDecodable)]
51+
#[derive(Debug, Clone, Copy, PartialEq, Eq, RlpEncodable, RlpDecodable, Deserialize, Serialize)]
52+
#[serde(deny_unknown_fields)]
5153
pub struct Port {
5254
pub tag: u8,
5355
pub port: u16,
@@ -70,7 +72,8 @@ impl Port {
7072
}
7173
}
7274

73-
#[derive(Debug, Clone, PartialEq, Eq)]
75+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
76+
#[serde(deny_unknown_fields)]
7477
struct WireNameRecordV1 {
7578
pub ip: Ipv4Addr,
7679
pub port: u16,
@@ -112,7 +115,8 @@ impl WireNameRecordV1 {
112115
}
113116
}
114117

115-
#[derive(Debug, Clone, PartialEq, Eq)]
118+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
119+
#[serde(deny_unknown_fields)]
116120
struct PortList<const N: usize>(ArrayVec<Port, N>);
117121

118122
impl<const N: usize> Encodable for PortList<N> {
@@ -182,7 +186,8 @@ impl<const N: usize> PortList<N> {
182186
}
183187
}
184188

185-
#[derive(Debug, Clone, PartialEq, Eq)]
189+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
190+
#[serde(deny_unknown_fields)]
186191
struct WireNameRecordV2 {
187192
pub ip: Ipv4Addr,
188193
pub ports: PortList<8>,
@@ -256,13 +261,15 @@ impl WireNameRecordV2 {
256261
}
257262
}
258263

259-
#[derive(Debug, Clone, PartialEq, Eq)]
264+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
265+
#[serde(deny_unknown_fields)]
260266
enum VersionedNameRecord {
261267
V1(WireNameRecordV1),
262268
V2(WireNameRecordV2),
263269
}
264270

265-
#[derive(Debug, Clone, PartialEq, Eq)]
271+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
272+
#[serde(deny_unknown_fields)]
266273
pub struct NameRecord {
267274
record: VersionedNameRecord,
268275
}
@@ -384,7 +391,9 @@ impl Decodable for NameRecord {
384391
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
385392
pub enum Capability {}
386393

387-
#[derive(Debug, Clone, PartialEq, RlpEncodable, RlpDecodable, Eq)]
394+
#[derive(Debug, Clone, PartialEq, RlpEncodable, RlpDecodable, Eq, Deserialize, Serialize)]
395+
#[serde(deny_unknown_fields)]
396+
#[serde(bound = "ST: CertificateSignatureRecoverable")]
388397
pub struct MonadNameRecord<ST: CertificateSignatureRecoverable> {
389398
pub name_record: NameRecord,
390399
pub signature: ST,

0 commit comments

Comments
 (0)