Skip to content

Commit 082d149

Browse files
committed
Persist name records for Peer Discovery
1 parent 10e2605 commit 082d149

File tree

10 files changed

+129
-10
lines changed

10 files changed

+129
-10
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docker/devnet/monad/config/node.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ unresponsive_prune_threshold = 5
4545
last_participation_prune_threshold = 5000
4646
min_num_peers = 0
4747
max_num_peers = 200
48+
persisted_peers_path = "/tmp/peers.toml"
4849

4950
[fullnode_dedicated]
5051
identities = []

monad-node-config/src/peers.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
// You should have received a copy of the GNU General Public License
1414
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1515

16+
use std::path::PathBuf;
17+
1618
use monad_crypto::certificate_signature::CertificateSignatureRecoverable;
1719
use monad_types::Round;
1820
use serde::{Deserialize, Serialize};
@@ -32,4 +34,5 @@ pub struct PeerDiscoveryConfig<ST: CertificateSignatureRecoverable> {
3234
pub last_participation_prune_threshold: Round,
3335
pub min_num_peers: usize,
3436
pub max_num_peers: usize,
37+
pub persisted_peers_path: PathBuf,
3538
}

monad-node/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,7 @@ where
643643
enable_publisher: node_config.fullnode_raptorcast.enable_publisher,
644644
enable_client: node_config.fullnode_raptorcast.enable_client,
645645
rng: ChaCha8Rng::from_entropy(),
646+
persisted_peers_path: peer_discovery_config.persisted_peers_path,
646647
};
647648

648649
MultiRouter::new(

monad-peer-disc-swarm/tests/peer_discovery.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ fn setup_keys_and_swarm_builder(
218218
enable_publisher: secondary_raptorcast_enabled,
219219
enable_client: secondary_raptorcast_enabled,
220220
rng: ChaCha8Rng::seed_from_u64(123456), // fixed seed for reproducibility
221+
persisted_peers_path: Default::default(),
221222
},
222223
router_scheduler: NoSerRouterConfig::new(all_peers.keys().cloned().collect())
223224
.build(),
@@ -396,6 +397,7 @@ fn test_update_name_record() {
396397
enable_publisher: false,
397398
enable_client: false,
398399
rng: ChaCha8Rng::seed_from_u64(123456),
400+
persisted_peers_path: Default::default(),
399401
},
400402
router_scheduler: NoSerRouterConfig::new(node_ids.iter().cloned().collect()).build(),
401403
seed: 1,

monad-peer-discovery/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ arrayvec = { workspace = true }
1414
futures = { workspace = true }
1515
rand = { workspace = true }
1616
rand_chacha = { workspace = true }
17+
serde = { workspace = true }
1718
tracing = { workspace = true }
1819
tokio-util = { workspace = true, features = ["time"] }
20+
toml = { workspace = true }
1921

2022
[dev-dependencies]
2123
monad-keystore = { workspace = true }
@@ -28,4 +30,3 @@ hex = { workspace = true }
2830
insta = { workspace = true }
2931
rstest = { workspace = true }
3032
test-case = { workspace = true }
31-
toml = { workspace = true }

monad-peer-discovery/src/discovery.rs

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use std::{
1717
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
1818
net::SocketAddrV4,
19+
path::PathBuf,
1920
time::Duration,
2021
};
2122

@@ -27,6 +28,7 @@ use monad_executor_glue::PeerEntry;
2728
use monad_types::{Epoch, NodeId, Round};
2829
use rand::{RngCore, seq::IteratorRandom};
2930
use rand_chacha::ChaCha8Rng;
31+
use serde::{Deserialize, Serialize};
3032
use tracing::{debug, info, trace, warn};
3133

3234
use crate::{
@@ -87,13 +89,24 @@ pub enum PeerDiscoveryRole {
8789
FullNodeClient, // full node set as Client in secondary raptorcast
8890
}
8991

90-
#[derive(Debug, Clone, PartialEq, Eq)]
92+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
93+
#[serde(deny_unknown_fields)]
94+
#[serde(bound = "ST: CertificateSignatureRecoverable")]
9195
pub struct ConnectionInfo<ST: CertificateSignatureRecoverable> {
9296
pub last_ping: Ping<ST>,
9397
pub unresponsive_pings: u32,
9498
pub name_record: MonadNameRecord<ST>,
9599
}
96100

101+
// Helper struct to deserialize both routing_info and pending_queue from TOML
102+
#[derive(Debug, Clone, Deserialize, Serialize)]
103+
#[serde(deny_unknown_fields)]
104+
#[serde(bound = "ST: CertificateSignatureRecoverable")]
105+
struct PersistedPeers<ST: CertificateSignatureRecoverable> {
106+
routing_info: BTreeMap<NodeId<CertificateSignaturePubKey<ST>>, MonadNameRecord<ST>>,
107+
pending_queue: BTreeMap<NodeId<CertificateSignaturePubKey<ST>>, ConnectionInfo<ST>>,
108+
}
109+
97110
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98111
pub enum SecondaryRaptorcastConnectionStatus {
99112
Connected,
@@ -165,6 +178,7 @@ pub struct PeerDiscovery<ST: CertificateSignatureRecoverable> {
165178
// secondary raptorcast setting: enable client mode when self is a full node
166179
pub enable_client: bool,
167180
pub rng: ChaCha8Rng,
181+
pub persisted_peers_path: PathBuf,
168182
}
169183

170184
pub struct PeerDiscoveryBuilder<ST: CertificateSignatureRecoverable> {
@@ -186,6 +200,7 @@ pub struct PeerDiscoveryBuilder<ST: CertificateSignatureRecoverable> {
186200
pub enable_publisher: bool,
187201
pub enable_client: bool,
188202
pub rng: ChaCha8Rng,
203+
pub persisted_peers_path: PathBuf,
189204
}
190205

191206
impl<ST: CertificateSignatureRecoverable> PeerDiscoveryAlgoBuilder for PeerDiscoveryBuilder<ST> {
@@ -258,6 +273,7 @@ impl<ST: CertificateSignatureRecoverable> PeerDiscoveryAlgoBuilder for PeerDisco
258273
enable_publisher: self.enable_publisher,
259274
enable_client: self.enable_client,
260275
rng: self.rng,
276+
persisted_peers_path: self.persisted_peers_path,
261277
};
262278

263279
let mut cmds = Vec::new();
@@ -269,6 +285,60 @@ impl<ST: CertificateSignatureRecoverable> PeerDiscoveryAlgoBuilder for PeerDisco
269285
}
270286
});
271287

288+
// Load persisted peers from JSON file (admin-friendly).
289+
if let Ok(toml_data) = std::fs::read_to_string(&state.persisted_peers_path) {
290+
match toml::from_str::<PersistedPeers<ST>>(&toml_data) {
291+
Ok(persisted) => {
292+
debug!(
293+
path = ?state.persisted_peers_path,
294+
routing_count = ?persisted.routing_info.len(),
295+
pending_count = ?persisted.pending_queue.len(),
296+
"loaded persisted peers from disk"
297+
);
298+
299+
for (peer_id, name_record) in persisted.routing_info {
300+
match state.insert_peer_to_pending(peer_id, name_record) {
301+
Ok(cmds_from_insert) => {
302+
debug!(?peer_id, "Persisted routing peer accepted");
303+
cmds.extend(cmds_from_insert);
304+
}
305+
Err(err) => {
306+
debug!(?peer_id, ?err, "Persisted routing peer rejected");
307+
}
308+
}
309+
}
310+
311+
for (peer_id, conn_info) in persisted.pending_queue {
312+
match validate_socket_ipv4_address(
313+
&conn_info.name_record.udp_address(),
314+
&state.self_record.udp_address(),
315+
) {
316+
Ok(_) => {
317+
state.pending_queue.insert(peer_id, conn_info);
318+
debug!(?peer_id, "Loaded persisted pending peer");
319+
}
320+
Err(err) => {
321+
debug!(?peer_id, ?err, "Persisted pending peer rejected");
322+
}
323+
}
324+
}
325+
state.metrics[GAUGE_PEER_DISC_NUM_PENDING_PEERS] =
326+
state.pending_queue.len() as u64;
327+
}
328+
Err(err) => {
329+
warn!(
330+
path = ?state.persisted_peers_path,
331+
?err,
332+
"failed to deserialize persisted peers as TOML"
333+
);
334+
}
335+
}
336+
} else {
337+
debug!(path = ?state.persisted_peers_path, "no persisted peers file found, continuing");
338+
}
339+
340+
state.persist_peers(); // Early test before timer cycle
341+
272342
cmds.extend(state.refresh());
273343

274344
(state, cmds)
@@ -1291,6 +1361,8 @@ where
12911361
self.metrics[GAUGE_PEER_DISC_NUM_PEERS] = self.routing_info.len() as u64;
12921362
self.metrics[GAUGE_PEER_DISC_NUM_PENDING_PEERS] = self.pending_queue.len() as u64;
12931363

1364+
self.persist_peers();
1365+
12941366
// reset timer to schedule for the next refresh
12951367
cmds.extend(self.reset_refresh_timer());
12961368

@@ -1302,6 +1374,27 @@ where
13021374
cmds
13031375
}
13041376

1377+
fn persist_peers(&self) {
1378+
// Persist both routing_info and pending_queue as PersistedPeers (TOML)
1379+
let persisted = PersistedPeers {
1380+
routing_info: self.routing_info.clone(),
1381+
pending_queue: self.pending_queue.clone(),
1382+
};
1383+
match toml::to_string(&persisted) {
1384+
Ok(serialized) => {
1385+
if let Err(e) = std::fs::write(&self.persisted_peers_path, serialized) {
1386+
warn!(path = ?self.persisted_peers_path, ?e, "failed to persist peers");
1387+
} else {
1388+
debug!(path = ?self.persisted_peers_path, routing_len =? self.routing_info.len(), pending_len =? self.pending_queue.len(),
1389+
"persisted peers to disk");
1390+
}
1391+
}
1392+
Err(err) => {
1393+
warn!(path = ?self.persisted_peers_path, ?err, "failed to serialize peers");
1394+
}
1395+
}
1396+
}
1397+
13051398
fn update_current_round(
13061399
&mut self,
13071400
round: Round,
@@ -1625,6 +1718,7 @@ mod tests {
16251718
enable_publisher: false,
16261719
enable_client: false,
16271720
rng: ChaCha8Rng::seed_from_u64(123456),
1721+
persisted_peers_path: Default::default(),
16281722
}
16291723
}
16301724

monad-peer-discovery/src/lib.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use monad_crypto::{
3131
use monad_executor::ExecutorMetrics;
3232
use monad_executor_glue::PeerEntry;
3333
use monad_types::{Epoch, NodeId, Round};
34+
use serde::{Deserialize, Serialize};
3435
use tracing::{debug, warn};
3536

3637
pub mod discovery;
@@ -47,7 +48,8 @@ pub enum PortTag {
4748
UDP = 1,
4849
}
4950

50-
#[derive(Debug, Clone, Copy, PartialEq, Eq, RlpEncodable, RlpDecodable)]
51+
#[derive(Debug, Clone, Copy, PartialEq, Eq, RlpEncodable, RlpDecodable, Deserialize, Serialize)]
52+
#[serde(deny_unknown_fields)]
5153
pub struct Port {
5254
pub tag: u8,
5355
pub port: u16,
@@ -70,7 +72,8 @@ impl Port {
7072
}
7173
}
7274

73-
#[derive(Debug, Clone, PartialEq, Eq)]
75+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
76+
#[serde(deny_unknown_fields)]
7477
struct WireNameRecordV1 {
7578
pub ip: Ipv4Addr,
7679
pub port: u16,
@@ -112,7 +115,8 @@ impl WireNameRecordV1 {
112115
}
113116
}
114117

115-
#[derive(Debug, Clone, PartialEq, Eq)]
118+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
119+
#[serde(deny_unknown_fields)]
116120
struct PortList<const N: usize>(ArrayVec<Port, N>);
117121

118122
impl<const N: usize> Encodable for PortList<N> {
@@ -182,7 +186,8 @@ impl<const N: usize> PortList<N> {
182186
}
183187
}
184188

185-
#[derive(Debug, Clone, PartialEq, Eq)]
189+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
190+
#[serde(deny_unknown_fields)]
186191
struct WireNameRecordV2 {
187192
pub ip: Ipv4Addr,
188193
pub ports: PortList<8>,
@@ -256,13 +261,15 @@ impl WireNameRecordV2 {
256261
}
257262
}
258263

259-
#[derive(Debug, Clone, PartialEq, Eq)]
264+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
265+
#[serde(deny_unknown_fields)]
260266
enum VersionedNameRecord {
261267
V1(WireNameRecordV1),
262268
V2(WireNameRecordV2),
263269
}
264270

265-
#[derive(Debug, Clone, PartialEq, Eq)]
271+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
272+
#[serde(deny_unknown_fields)]
266273
pub struct NameRecord {
267274
record: VersionedNameRecord,
268275
}
@@ -384,7 +391,9 @@ impl Decodable for NameRecord {
384391
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
385392
pub enum Capability {}
386393

387-
#[derive(Debug, Clone, PartialEq, RlpEncodable, RlpDecodable, Eq)]
394+
#[derive(Debug, Clone, PartialEq, RlpEncodable, RlpDecodable, Eq, Deserialize, Serialize)]
395+
#[serde(deny_unknown_fields)]
396+
#[serde(bound = "ST: CertificateSignatureRecoverable")]
388397
pub struct MonadNameRecord<ST: CertificateSignatureRecoverable> {
389398
pub name_record: NameRecord,
390399
pub signature: ST,
@@ -656,6 +665,8 @@ pub trait PeerDiscoveryAlgo {
656665
NodeId<CertificateSignaturePubKey<Self::SignatureType>>,
657666
MonadNameRecord<Self::SignatureType>,
658667
>;
668+
669+
fn persist_peers(&self);
659670
}
660671

661672
pub trait PeerDiscoveryAlgoBuilder {

monad-peer-discovery/src/message.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use monad_crypto::certificate_signature::{
1919
};
2020
use monad_executor_glue::Message;
2121
use monad_types::NodeId;
22+
use serde::{Deserialize, Serialize};
2223

2324
use crate::{MonadNameRecord, PeerDiscoveryEvent};
2425

@@ -114,7 +115,9 @@ impl<ST: CertificateSignatureRecoverable> Decodable for PeerDiscoveryMessage<ST>
114115
}
115116
}
116117

117-
#[derive(Debug, Clone, PartialEq, RlpDecodable, RlpEncodable)]
118+
#[derive(Debug, Clone, PartialEq, RlpDecodable, RlpEncodable, Deserialize, Serialize)]
119+
#[serde(deny_unknown_fields)]
120+
#[serde(bound = "ST: CertificateSignatureRecoverable")]
118121
#[rlp(trailing)]
119122
pub struct Ping<ST: CertificateSignatureRecoverable> {
120123
pub id: u32,

monad-peer-discovery/src/mock.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,4 +279,6 @@ where
279279
) -> HashMap<NodeId<CertificateSignaturePubKey<ST>>, MonadNameRecord<ST>> {
280280
HashMap::new()
281281
}
282+
283+
fn persist_peers(&self) {}
282284
}

0 commit comments

Comments
 (0)