Skip to content

Commit bbe369e

Browse files
fix(bitcoind_rpc): FilterIter detects reorgs
Co-authored-by: valued mammal <valuedmammal@protonmail.com>
1 parent 10fa62a commit bbe369e

File tree

2 files changed

+119
-71
lines changed

2 files changed

+119
-71
lines changed

crates/bitcoind_rpc/src/bip158.rs

Lines changed: 119 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
77
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
88
9-
use bdk_core::collections::BTreeMap;
9+
use bdk_core::collections::{BTreeMap, BTreeSet};
1010
use core::fmt;
1111

1212
use bdk_core::bitcoin;
@@ -33,20 +33,29 @@ pub struct FilterIter<'c, C> {
3333
cp: Option<CheckPoint>,
3434
// blocks map
3535
blocks: BTreeMap<Height, BlockHash>,
36+
// set of heights with filters that matched any watched SPK
37+
matched: BTreeSet<Height>,
38+
// initial height
39+
start: Height,
3640
// best height counter
3741
height: Height,
3842
// stop height
3943
stop: Height,
4044
}
4145

4246
impl<'c, C: RpcApi> FilterIter<'c, C> {
47+
/// Hard cap on how far to walk back when a reorg is detected.
48+
const MAX_REORG_DEPTH: u32 = 100;
49+
4350
/// Construct [`FilterIter`] from a given `client` and start `height`.
4451
pub fn new_with_height(client: &'c C, height: u32) -> Self {
4552
Self {
4653
client,
4754
spks: vec![],
4855
cp: None,
4956
blocks: BTreeMap::new(),
57+
matched: BTreeSet::new(),
58+
start: height,
5059
height,
5160
stop: 0,
5261
}
@@ -69,57 +78,28 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
6978
self.spks.push(spk);
7079
}
7180

72-
/// Get the next filter and increment the current best height.
73-
///
74-
/// Returns `Ok(None)` when the stop height is exceeded.
75-
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> {
76-
if self.height > self.stop {
77-
return Ok(None);
78-
}
79-
let height = self.height;
80-
let hash = match self.blocks.get(&height) {
81-
Some(h) => *h,
82-
None => self.client.get_block_hash(height as u64)?,
83-
};
84-
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
85-
let filter = BlockFilter::new(&filter_bytes);
86-
self.height += 1;
87-
Ok(Some((BlockId { height, hash }, filter)))
81+
/// Get the block hash by `height` if it is found in the blocks map.
82+
fn get_block_hash(&self, height: &Height) -> Option<BlockHash> {
83+
self.blocks.get(height).copied()
8884
}
8985

9086
/// Get the remote tip.
9187
///
92-
/// Returns `None` if the remote height is not strictly greater than the height of this
93-
/// [`FilterIter`].
88+
/// Returns `None` if the remote height is less than the height of this [`FilterIter`].
9489
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> {
9590
let tip_hash = self.client.get_best_block_hash()?;
96-
let mut header = self.client.get_block_header_info(&tip_hash)?;
91+
let header = self.client.get_block_header_info(&tip_hash)?;
9792
let tip_height = header.height as u32;
98-
if self.height >= tip_height {
93+
// Allow returning tip if we're exactly at it. Return `None`` if we've already scanned past.
94+
if self.height > tip_height {
9995
// nothing to do
10096
return Ok(None);
10197
}
102-
self.blocks.insert(tip_height, tip_hash);
10398

104-
// if we have a checkpoint we use a lookback of ten blocks
105-
// to ensure consistency of the local chain
99+
// start scanning from point of agreement + 1
106100
if let Some(cp) = self.cp.as_ref() {
107-
// adjust start height to point of agreement + 1
108101
let base = self.find_base_with(cp.clone())?;
109-
self.height = base.height + 1;
110-
111-
for _ in 0..9 {
112-
let hash = match header.previous_block_hash {
113-
Some(hash) => hash,
114-
None => break,
115-
};
116-
header = self.client.get_block_header_info(&hash)?;
117-
let height = header.height as u32;
118-
if height < self.height {
119-
break;
120-
}
121-
self.blocks.insert(height, hash);
122-
}
102+
self.height = base.height.saturating_add(1);
123103
}
124104

125105
self.stop = tip_height;
@@ -131,9 +111,6 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
131111
}
132112
}
133113

134-
/// Alias for a compact filter and associated block id.
135-
type NextFilter = (BlockId, BlockFilter);
136-
137114
/// Event inner type
138115
#[derive(Debug, Clone)]
139116
pub struct EventInner {
@@ -171,27 +148,86 @@ impl<C: RpcApi> Iterator for FilterIter<'_, C> {
171148
type Item = Result<Event, Error>;
172149

173150
fn next(&mut self) -> Option<Self::Item> {
174-
(|| -> Result<_, Error> {
175-
// if the next filter matches any of our watched spks, get the block
176-
// and return it, inserting relevant block ids along the way
177-
self.next_filter()?.map_or(Ok(None), |(block, filter)| {
178-
let height = block.height;
179-
let hash = block.hash;
180-
181-
if self.spks.is_empty() {
182-
Err(Error::NoScripts)
183-
} else if filter
184-
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
185-
.map_err(Error::Bip158)?
186-
{
187-
let block = self.client.get_block(&hash)?;
188-
self.blocks.insert(height, hash);
189-
let inner = EventInner { height, block };
190-
Ok(Some(Event::Block(inner)))
191-
} else {
192-
Ok(Some(Event::NoMatch(height)))
151+
(|| -> Result<Option<_>, Error> {
152+
if self.height > self.stop {
153+
return Ok(None);
154+
}
155+
// Fetch next header.
156+
let mut height = self.height;
157+
let mut hash = self.client.get_block_hash(height as _)?;
158+
let mut header = self.client.get_block_header(&hash)?;
159+
160+
// Detect and resolve reorgs: either block at height changed, or its parent changed.
161+
let stored_hash = self.blocks.get(&height).copied();
162+
let prev_hash = height
163+
.checked_sub(1)
164+
.and_then(|height| self.blocks.get(&height).copied());
165+
166+
// If we've seen this height before but the hash has changed, or parent changed, trigger
167+
// reorg.
168+
let reorg_detected = if let Some(old_hash) = stored_hash {
169+
old_hash != hash
170+
} else if let Some(expected_prev) = prev_hash {
171+
header.prev_blockhash != expected_prev
172+
} else {
173+
false
174+
};
175+
176+
// Reorg detected, rewind to last known-good ancestor.
177+
if reorg_detected {
178+
self.blocks.split_off(&height);
179+
self.matched = self.matched.split_off(&height);
180+
181+
let mut reorg_depth = 0;
182+
loop {
183+
if reorg_depth >= Self::MAX_REORG_DEPTH || height == 0 {
184+
return Err(Error::ReorgDepthExceeded);
185+
}
186+
187+
height = height.saturating_sub(1);
188+
hash = self.client.get_block_hash(height as _)?;
189+
header = self.client.get_block_header(&hash)?;
190+
191+
let prev_height = height.saturating_sub(1);
192+
let prev_hash = self.client.get_block_hash(prev_height as _)?;
193+
self.blocks.insert(prev_height, prev_hash);
194+
195+
if let Some(prev_hash) = self.blocks.get(&prev_height) {
196+
if header.prev_blockhash == *prev_hash {
197+
break;
198+
}
199+
}
200+
201+
reorg_depth += 1;
193202
}
194-
})
203+
204+
// Update self.height so we reprocess this height
205+
self.height = height;
206+
}
207+
208+
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
209+
let filter = BlockFilter::new(&filter_bytes);
210+
211+
// record the scanned block
212+
self.blocks.insert(height, hash);
213+
// increment best height
214+
self.height = height.saturating_add(1);
215+
216+
// If the filter matches any of our watched SPKs, fetch the full
217+
// block, and record the matching block entry.
218+
if self.spks.is_empty() {
219+
Err(Error::NoScripts)
220+
} else if filter
221+
.match_any(&hash, self.spks.iter().map(|s| s.as_bytes()))
222+
.map_err(Error::Bip158)?
223+
{
224+
let block = self.client.get_block(&hash)?;
225+
self.matched.insert(height);
226+
let inner = EventInner { height, block };
227+
Ok(Some(Event::Block(inner)))
228+
} else {
229+
Ok(Some(Event::NoMatch(height)))
230+
}
195231
})()
196232
.transpose()
197233
}
@@ -202,8 +238,8 @@ impl<C: RpcApi> FilterIter<'_, C> {
202238
fn find_base_with(&mut self, mut cp: CheckPoint) -> Result<BlockId, Error> {
203239
loop {
204240
let height = cp.height();
205-
let fetched_hash = match self.blocks.get(&height) {
206-
Some(hash) => *hash,
241+
let fetched_hash = match self.get_block_hash(&height) {
242+
Some(hash) => hash,
207243
None if height == 0 => cp.hash(),
208244
_ => self.client.get_block_hash(height as _)?,
209245
};
@@ -221,17 +257,27 @@ impl<C: RpcApi> FilterIter<'_, C> {
221257
/// Returns a chain update from the newly scanned blocks.
222258
///
223259
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or
224-
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip).
260+
/// if not all events have been emitted (by calling `next`).
225261
pub fn chain_update(&mut self) -> Option<CheckPoint> {
226-
if self.cp.is_none() || self.blocks.is_empty() {
262+
if self.cp.is_none() || self.blocks.is_empty() || self.height <= self.stop {
227263
return None;
228264
}
229265

230-
// note: to connect with the local chain we must guarantee that `self.blocks.first()`
231-
// is also the point of agreement with `self.cp`.
266+
// We return blocks up to and including the initial height, all of the matching blocks,
267+
// and blocks in the terminal range.
268+
let tail_range = self.stop.saturating_sub(9)..=self.stop;
232269
Some(
233-
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from))
234-
.expect("blocks must be in order"),
270+
CheckPoint::from_block_ids(self.blocks.iter().filter_map(|(&height, &hash)| {
271+
if height <= self.start
272+
|| self.matched.contains(&height)
273+
|| tail_range.contains(&height)
274+
{
275+
Some(BlockId { height, hash })
276+
} else {
277+
None
278+
}
279+
}))
280+
.expect("blocks must be in order"),
235281
)
236282
}
237283
}
@@ -245,6 +291,8 @@ pub enum Error {
245291
NoScripts,
246292
/// `bitcoincore_rpc` error
247293
Rpc(bitcoincore_rpc::Error),
294+
/// `MAX_REORG_DEPTH` exceeded
295+
ReorgDepthExceeded,
248296
}
249297

250298
impl From<bitcoincore_rpc::Error> for Error {
@@ -259,6 +307,7 @@ impl fmt::Display for Error {
259307
Self::Bip158(e) => e.fmt(f),
260308
Self::NoScripts => write!(f, "no script pubkeys were provided to match with"),
261309
Self::Rpc(e) => e.fmt(f),
310+
Self::ReorgDepthExceeded => write!(f, "maximum reorg depth exceeded"),
262311
}
263312
}
264313
}

crates/bitcoind_rpc/tests/test_filter_iter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,6 @@ fn filter_iter_handles_reorg() -> anyhow::Result<()> {
400400

401401
// Test that while a reorg is detected we delay incrementing the best height
402402
#[test]
403-
#[ignore]
404403
fn repeat_reorgs() -> anyhow::Result<()> {
405404
const MINE_TO: u32 = 11;
406405

0 commit comments

Comments
 (0)