|
| 1 | +# Nimbus |
| 2 | +# Copyright (c) 2024-2025 Status Research & Development GmbH |
| 3 | +# Licensed under either of |
| 4 | +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) |
| 5 | +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) |
| 6 | +# at your option. |
| 7 | +# This file may not be copied, modified, or distributed except according to |
| 8 | +# those terms. |
| 9 | + |
| 10 | +## Consensus to execution syncer prototype based on nrpc |
| 11 | + |
| 12 | +{.push raises: [].} |
| 13 | + |
| 14 | +import |
| 15 | + chronos, |
| 16 | + chronicles, |
| 17 | + web3, |
| 18 | + web3/[engine_api, primitives, conversions], |
| 19 | + beacon_chain/consensus_object_pools/blockchain_dag, |
| 20 | + beacon_chain/el/[el_manager, engine_api_conversions], |
| 21 | + beacon_chain/spec/[forks, presets, state_transition_block] |
| 22 | + |
| 23 | +logScope: |
| 24 | + topics = "elsync" |
| 25 | + |
| 26 | +proc getForkedBlock(dag: ChainDAGRef, slot: Slot): Opt[ForkedTrustedSignedBeaconBlock] = |
| 27 | + let bsi = ?dag.getBlockIdAtSlot(slot) |
| 28 | + if bsi.isProposed(): |
| 29 | + dag.getForkedBlock(bsi.bid) |
| 30 | + else: |
| 31 | + Opt.none(ForkedTrustedSignedBeaconBlock) |
| 32 | + |
| 33 | +proc blockNumber(blck: ForkedTrustedSignedBeaconBlock): uint64 = |
| 34 | + withBlck(blck): |
| 35 | + when consensusFork >= ConsensusFork.Bellatrix and |
| 36 | + consensusFork < ConsensusFork.Gloas: |
| 37 | + forkyBlck.message.body.execution_payload.block_number |
| 38 | + else: |
| 39 | + 0'u64 |
| 40 | + |
| 41 | +# Load the network configuration based on the network id |
| 42 | +proc loadNetworkConfig(cfg: RuntimeConfig): (uint64, uint64) = |
| 43 | + case cfg.CONFIG_NAME |
| 44 | + of "mainnet": |
| 45 | + (15537393'u64, 4700013'u64) |
| 46 | + of "sepolia": |
| 47 | + (1450408'u64, 115193'u64) |
| 48 | + of "holesky", "hoodi": |
| 49 | + (0'u64, 0'u64) |
| 50 | + else: |
| 51 | + notice "Loading custom network, assuming post-merge" |
| 52 | + (0'u64, 0'u64) |
| 53 | + |
| 54 | +# Slot Finding Mechanism |
| 55 | +# First it sets the initial lower bound to `firstSlotAfterMerge` + number of blocks after Era1 |
| 56 | +# Then it iterates over the slots to find the current slot number, along with reducing the |
| 57 | +# search space by calculating the difference between the `blockNumber` and the `block_number` from the executionPayload |
| 58 | +# of the slot, then adding the difference to the importedSlot. This pushes the lower bound more, |
| 59 | +# making the search way smaller |
| 60 | +proc findSlot( |
| 61 | + dag: ChainDAGRef, |
| 62 | + elBlockNumber: uint64, |
| 63 | + lastEra1Block: uint64, |
| 64 | + firstSlotAfterMerge: uint64, |
| 65 | +): Opt[uint64] = |
| 66 | + var importedSlot = (elBlockNumber - lastEra1Block) + firstSlotAfterMerge + 1 |
| 67 | + debug "Finding slot number corresponding to block", elBlockNumber, importedSlot |
| 68 | + |
| 69 | + var clNum = 0'u64 |
| 70 | + while clNum < elBlockNumber: |
| 71 | + # Check if we can get the block id - if not, this part of the chain is not |
| 72 | + # available from the CL |
| 73 | + let bsi = ?dag.getBlockIdAtSlot(Slot(importedSlot)) |
| 74 | + |
| 75 | + if not bsi.isProposed: |
| 76 | + importedSlot += 1 |
| 77 | + continue # Empty slot |
| 78 | + |
| 79 | + let blck = dag.getForkedBlock(bsi.bid).valueOr: |
| 80 | + return # Block unavailable |
| 81 | + |
| 82 | + clNum = blck.blockNumber |
| 83 | + # on the first iteration, the arithmetic helps skip the gap that has built |
| 84 | + # up due to empty slots - for all subsequent iterations, except the last, |
| 85 | + # we'll go one step at a time |
| 86 | + # iteration so that we don't start at "one slot early" |
| 87 | + importedSlot += max(elBlockNumber - clNum, 1) |
| 88 | + |
| 89 | + Opt.some importedSlot |
| 90 | + |
| 91 | +proc syncToEngineApi*(dag: ChainDAGRef, url: EngineApiUrl) {.async.} = |
| 92 | + # Takes blocks from the CL and sends them to the EL - the attempt is made |
| 93 | + # optimistically until something unexpected happens (reorg etc) at which point |
| 94 | + # the process ends |
| 95 | + |
| 96 | + let |
| 97 | + # Create the client for the engine api |
| 98 | + # And exchange the capabilities for a test communication |
| 99 | + web3 = await url.newWeb3() |
| 100 | + rpcClient = web3.provider |
| 101 | + (lastEra1Block, firstSlotAfterMerge) = dag.cfg.loadNetworkConfig() |
| 102 | + |
| 103 | + defer: |
| 104 | + try: |
| 105 | + await web3.close() |
| 106 | + except: |
| 107 | + discard |
| 108 | + |
| 109 | + # Load the EL state detials and create the beaconAPI client |
| 110 | + var elBlockNumber = uint64(await rpcClient.eth_blockNumber()) |
| 111 | + |
| 112 | + # Check for pre-merge situation |
| 113 | + if elBlockNumber <= lastEra1Block: |
| 114 | + debug "EL still pre-merge, no EL sync", |
| 115 | + blocknumber = elBlockNumber, lastPoWBlock = lastEra1Block |
| 116 | + return |
| 117 | + |
| 118 | + # Load the latest state from the CL |
| 119 | + var clBlockNumber = dag.getForkedBlock(dag.head.slot).expect("head block").blockNumber |
| 120 | + |
| 121 | + # Check if the EL is already in sync or about to become so (ie processing a |
| 122 | + # payload already, most likely) |
| 123 | + if clBlockNumber in [elBlockNumber, elBlockNumber + 1]: |
| 124 | + debug "EL in sync (or almost)", clBlockNumber, elBlockNumber |
| 125 | + return |
| 126 | + |
| 127 | + if clBlockNumber < elBlockNumber: |
| 128 | + # This happens often during initial sync when the light client information |
| 129 | + # allows the EL to sync ahead of the CL head - it can also happen during |
| 130 | + # reorgs |
| 131 | + debug "CL is behind EL, not activating", clBlockNumber, elBlockNumber |
| 132 | + return |
| 133 | + |
| 134 | + var importedSlot = findSlot(dag, elBlockNumber, lastEra1Block, firstSlotAfterMerge).valueOr: |
| 135 | + debug "Missing slot information for sync", elBlockNumber |
| 136 | + return |
| 137 | + |
| 138 | + notice "Found initial slot for EL sync", importedSlot, elBlockNumber, clBlockNumber |
| 139 | + |
| 140 | + while elBlockNumber < clBlockNumber: |
| 141 | + var isAvailable = false |
| 142 | + let curBlck = dag.getForkedBlock(Slot(importedSlot)).valueOr: |
| 143 | + importedSlot += 1 |
| 144 | + continue |
| 145 | + importedSlot += 1 |
| 146 | + let payloadResponse = withBlck(curBlck): |
| 147 | + # Don't include blocks before bellatrix, as it doesn't have payload |
| 148 | + when consensusFork >= ConsensusFork.Gloas: |
| 149 | + break |
| 150 | + elif consensusFork >= ConsensusFork.Bellatrix: |
| 151 | + # Load the execution payload for all blocks after the bellatrix upgrade |
| 152 | + let payload = |
| 153 | + forkyBlck.message.body.execution_payload.asEngineExecutionPayload() |
| 154 | + |
| 155 | + debug "Sending payload", payload |
| 156 | + |
| 157 | + when consensusFork >= ConsensusFork.Electra: |
| 158 | + let |
| 159 | + # Calculate the versioned hashes from the kzg commitments |
| 160 | + versioned_hashes = |
| 161 | + forkyBlck.message.body.blob_kzg_commitments.asEngineVersionedHashes() |
| 162 | + # Execution Requests for Electra |
| 163 | + execution_requests = |
| 164 | + forkyBlck.message.body.execution_requests.asEngineExecutionRequests() |
| 165 | + |
| 166 | + await rpcClient.engine_newPayloadV4( |
| 167 | + payload, |
| 168 | + versioned_hashes, |
| 169 | + forkyBlck.message.parent_root.to(Hash32), |
| 170 | + execution_requests, |
| 171 | + ) |
| 172 | + elif consensusFork >= ConsensusFork.Deneb: |
| 173 | + # Calculate the versioned hashes from the kzg commitments |
| 174 | + let versioned_hashes = |
| 175 | + forkyBlck.message.body.blob_kzg_commitments.asEngineVersionedHashes() |
| 176 | + await rpcClient.engine_newPayloadV3( |
| 177 | + payload, versioned_hashes, forkyBlck.message.parent_root.to(Hash32) |
| 178 | + ) |
| 179 | + elif consensusFork >= ConsensusFork.Capella: |
| 180 | + await rpcClient.engine_newPayloadV2(payload) |
| 181 | + else: |
| 182 | + await rpcClient.engine_newPayloadV1(payload) |
| 183 | + else: |
| 184 | + return |
| 185 | + |
| 186 | + if payloadResponse.status != PayloadExecutionStatus.valid: |
| 187 | + if payloadResponse.status notin |
| 188 | + [PayloadExecutionStatus.syncing, PayloadExecutionStatus.accepted]: |
| 189 | + # This would be highly unusual since it would imply a CL-valid but |
| 190 | + # EL-invalid block.. |
| 191 | + warn "Payload invalid", |
| 192 | + elBlockNumber, status = payloadResponse.status, curBlck = shortLog(curBlck) |
| 193 | + return |
| 194 | + |
| 195 | + debug "newPayload accepted", elBlockNumber, response = payloadResponse.status |
| 196 | + |
| 197 | + elBlockNumber += 1 |
| 198 | + |
| 199 | + if elBlockNumber mod 1024 == 0: |
| 200 | + let curElBlock = uint64(await rpcClient.eth_blockNumber()) |
| 201 | + if curElBlock != elBlockNumber: |
| 202 | + # If the EL starts syncing on its own, faster than we can feed it blocks |
| 203 | + # from here, it'll run ahead and we can stop this remote-drive attempt |
| 204 | + # TODO this happens because el-sync competes with the regular devp2p sync |
| 205 | + # when in fact it could be collaborating such that we don't do |
| 206 | + # redundant work |
| 207 | + debug "EL out of sync with EL syncer", curElBlock, elBlockNumber |
| 208 | + return |
0 commit comments