Skip to content

Commit da2ebd4

Browse files
Expose big ledger peers to dmq node
1 parent 16730d0 commit da2ebd4

File tree

5 files changed

+33
-10
lines changed

5 files changed

+33
-10
lines changed

dmq-node/app/Main.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{-# LANGUAGE DisambiguateRecordFields #-}
22
{-# LANGUAGE MultiWayIf #-}
3+
{-# LANGUAGE OverloadedRecordDot #-}
34
{-# LANGUAGE OverloadedStrings #-}
45
{-# LANGUAGE ScopedTypeVariables #-}
56
{-# LANGUAGE TemplateHaskell #-}
@@ -8,6 +9,7 @@
89

910
module Main where
1011

12+
import Control.Concurrent.Class.MonadSTM.Strict
1113
import Control.Exception (throwIO)
1214
import Control.Monad (void, when)
1315
import Control.Tracer (Tracer (..), nullTracer, traceWith)
@@ -173,6 +175,7 @@ runDMQ commandLineConfig = do
173175
(if localHandshakeTracer
174176
then WithEventType "Handshake" >$< tracer
175177
else nullTracer)
178+
$ readTVar $ nodeKernel.stakePools.ledgerPeersVar
176179
dmqDiffusionApplications =
177180
diffusionApplications nodeKernel
178181
dmqConfig

dmq-node/dmq-node.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ executable dmq-node
151151
cardano-ledger-core,
152152
contra-tracer >=0.1 && <0.3,
153153
dmq-node,
154+
io-classes:{strict-stm},
154155
kes-agent-crypto,
155156
optparse-applicative,
156157
ouroboros-network:{ouroboros-network, api, framework},

dmq-node/src/DMQ/Diffusion/Arguments.hs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import Control.Monad.Class.MonadST (MonadST)
2222
import Control.Monad.Class.MonadThrow (MonadCatch)
2323
import Control.Monad.Class.MonadTimer.SI (MonadDelay, MonadTimer)
2424
import Control.Tracer (Tracer)
25+
import Data.List.NonEmpty (NonEmpty)
2526
import Network.DNS (Resolver)
2627
import Network.Socket (Socket)
2728

@@ -35,7 +36,7 @@ import Ouroboros.Network.PeerSelection.Churn (peerChurnGovernor)
3536
import Ouroboros.Network.PeerSelection.Governor.Types
3637
(ExtraGuardedDecisions (..), PeerSelectionGovernorArgs (..))
3738
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
38-
(LedgerPeersConsensusInterface (..))
39+
(LedgerPeersConsensusInterface (..), PoolStake, LedgerRelayAccessPoint)
3940
import Ouroboros.Network.PeerSelection.RelayAccessPoint (SRVPrefix)
4041
import Ouroboros.Network.PeerSelection.Types (nullPublicExtraPeersAPI)
4142

@@ -49,6 +50,7 @@ diffusionArguments
4950
)
5051
=> Tracer m (NtN.HandshakeTr ntnAddr)
5152
-> Tracer m (NtC.HandshakeTr ntcAddr)
53+
-> STM m [(PoolStake, NonEmpty LedgerRelayAccessPoint)]
5254
-> Diffusion.Arguments
5355
NoExtraState NoExtraDebugState NoExtraFlags NoExtraPeers
5456
NoExtraAPI NoExtraChurnArgs NoExtraCounters NoExtraTracer
@@ -63,7 +65,8 @@ diffusionArguments
6365
NodeToClientVersion
6466
NodeToClientVersionData
6567
diffusionArguments handshakeNtNTracer
66-
handshakeNtCTracer =
68+
handshakeNtCTracer
69+
lpGetLedgerPeers =
6770
Diffusion.Arguments {
6871
Diffusion.daNtnDataFlow = DMQ.ntnDataFlow
6972
, Diffusion.daNtnPeerSharing = peerSharing
@@ -74,7 +77,7 @@ diffusionArguments handshakeNtNTracer
7477
, Diffusion.daLedgerPeersCtx =
7578
LedgerPeersConsensusInterface {
7679
lpGetLatestSlot = return minBound
77-
, lpGetLedgerPeers = return []
80+
, lpGetLedgerPeers
7881
, lpExtraAPI = NoExtraAPI
7982
}
8083
, Diffusion.daEmptyExtraState = NoExtraState

dmq-node/src/DMQ/Diffusion/NodeKernel.hs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import Data.Aeson qualified as Aeson
2020
import Data.Function (on)
2121
import Data.Functor.Contravariant ((>$<))
2222
import Data.Hashable
23+
import Data.List.NonEmpty (NonEmpty)
2324
import Data.Map.Strict (Map)
2425
import Data.Map.Strict qualified as Map
2526
import Data.Sequence (Seq)
@@ -42,6 +43,8 @@ import Ouroboros.Network.BlockFetch (FetchClientRegistry,
4243
import Ouroboros.Network.ConnectionId (ConnectionId (..))
4344
import Ouroboros.Network.PeerSelection.Governor.Types
4445
(makePublicPeerSelectionStateVar)
46+
import Ouroboros.Network.PeerSelection.LedgerPeers.Type (PoolStake,
47+
LedgerRelayAccessPoint)
4548
import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry,
4649
newPeerSharingAPI, newPeerSharingRegistry,
4750
ps_POLICY_PEER_SHARE_MAX_PEERS, ps_POLICY_PEER_SHARE_STICKY_TIME)
@@ -83,6 +86,8 @@ data StakePools m = StakePools {
8386
stakePoolsVar :: StrictTVar m (Map PoolId StakeSnapshot)
8487
-- | acquires validation context for signature validation
8588
, poolValidationCtx :: m PoolValidationCtx
89+
, ledgerPeersVar
90+
:: StrictTVar m [(PoolStake, NonEmpty LedgerRelayAccessPoint)]
8691
}
8792

8893
data PoolValidationCtx =
@@ -109,15 +114,15 @@ newNodeKernel evolutionConfig rng = do
109114
sigMempoolSem <- newTxMempoolSem
110115
let (rng', rng'') = Random.split rng
111116
sigSharedTxStateVar <- newSharedTxStateVar rng'
112-
nextEpochVar <- newTVarIO Nothing
113-
stakePoolsVar <- newTVarIO Map.empty
117+
(nextEpochVar, stakePoolsVar, ledgerPeersVar) <- atomically $
118+
(,,) <$> newTVar Nothing <*> newTVar Map.empty <*> newTVar []
114119
let poolValidationCtx = do
115120
(nextEpochBoundary, stakePools') <-
116121
atomically $ (,) <$> readTVar nextEpochVar <*> readTVar stakePoolsVar
117122
now <- getCurrentTime
118123
return $ DMQPoolValidationCtx now nextEpochBoundary stakePools'
119124

120-
stakePools = StakePools { stakePoolsVar, poolValidationCtx }
125+
stakePools = StakePools { stakePoolsVar, poolValidationCtx, ledgerPeersVar }
121126

122127
peerSharingAPI <-
123128
newPeerSharingAPI

dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol ()
3838
import Ouroboros.Network.Block
3939
import Ouroboros.Network.Magic
4040
import Ouroboros.Network.Mux qualified as Mx
41+
import Ouroboros.Network.PeerSelection.LedgerPeers.Type (LedgerPeerSnapshot (..))
4142
import Ouroboros.Network.Protocol.LocalStateQuery.Client
4243
import Ouroboros.Network.Protocol.LocalStateQuery.Type
4344

@@ -52,7 +53,8 @@ cardanoClient
5253
-> StakePools m
5354
-> StrictTVar m (Maybe UTCTime) -- ^ from node kernel
5455
-> LocalStateQueryClient (CardanoBlock crypto) (Point block) (Query block) m Void
55-
cardanoClient _tracer StakePools { stakePoolsVar } nextEpochVar = LocalStateQueryClient (idle Nothing)
56+
cardanoClient _tracer StakePools { stakePoolsVar, ledgerPeersVar } nextEpochVar =
57+
LocalStateQueryClient (idle Nothing)
5658
where
5759
idle mSystemStart = pure $ SendMsgAcquire ImmutableTip acquire
5860
where
@@ -103,9 +105,18 @@ cardanoClient _tracer StakePools { stakePoolsVar } nextEpochVar = LocalStateQuer
103105
atomically do
104106
writeTVar stakePoolsVar ssStakeSnapshots
105107
writeTVar nextEpochVar $ Just nextEpoch
106-
pure $ SendMsgRelease do
107-
threadDelay $ min (realToFrac toNextEpoch) 86400 -- TODO fuzz this?
108-
idle $ Just systemStart
108+
pure $
109+
SendMsgQuery (BlockQuery . QueryIfCurrentConway $ GetBigLedgerPeerSnapshot)
110+
$ wrappingMismatch handleLedgerPeers
111+
where
112+
handleLedgerPeers (LedgerPeerSnapshot snapshot) = do
113+
let bigRelays = fmap snd . snd $ snapshot
114+
atomically do
115+
writeTVar ledgerPeersVar bigRelays
116+
pure $ SendMsgRelease do
117+
threadDelay $ min (realToFrac toNextEpoch) 86400 -- TODO fuzz this?
118+
idle $ Just systemStart
119+
109120

110121
connectToCardanoNode :: Tracer IO (WithEventType String)
111122
-> LocalSnocket

0 commit comments

Comments
 (0)