Skip to content

Commit 4e6b5da

Browse files
committed
feat(bip158): FilterIter detects reorgs
1 parent 41f8ebc commit 4e6b5da

File tree

2 files changed

+170
-72
lines changed

2 files changed

+170
-72
lines changed

crates/bitcoind_rpc/src/bip158.rs

Lines changed: 96 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
77
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
88
9-
use bdk_core::collections::BTreeMap;
9+
use bdk_core::collections::{BTreeMap, BTreeSet};
1010
use core::fmt;
1111

1212
use bdk_core::bitcoin;
@@ -33,21 +33,30 @@ pub struct FilterIter<'c, C> {
3333
cp: Option<CheckPoint>,
3434
// blocks map
3535
blocks: BTreeMap<Height, BlockHash>,
36+
// heights of matching blocks
37+
matched: BTreeSet<Height>,
3638
// best height counter
3739
height: Height,
40+
// initial height
41+
start: Height,
3842
// stop height
3943
stop: Height,
4044
}
4145

4246
impl<'c, C: RpcApi> FilterIter<'c, C> {
47+
/// Hard cap on how far to walk back when a reorg is detected.
48+
const MAX_REORG_DEPTH: u32 = 100;
49+
4350
/// Construct [`FilterIter`] from a given `client` and start `height`.
4451
pub fn new_with_height(client: &'c C, height: u32) -> Self {
4552
Self {
4653
client,
4754
spks: vec![],
4855
cp: None,
4956
blocks: BTreeMap::new(),
57+
matched: BTreeSet::new(),
5058
height,
59+
start: height,
5160
stop: 0,
5261
}
5362
}
@@ -69,57 +78,21 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
6978
self.spks.push(spk);
7079
}
7180

72-
/// Get the next filter and increment the current best height.
73-
///
74-
/// Returns `Ok(None)` when the stop height is exceeded.
75-
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> {
76-
if self.height > self.stop {
77-
return Ok(None);
78-
}
79-
let height = self.height;
80-
let hash = match self.blocks.get(&height) {
81-
Some(h) => *h,
82-
None => self.client.get_block_hash(height as u64)?,
83-
};
84-
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
85-
let filter = BlockFilter::new(&filter_bytes);
86-
self.height += 1;
87-
Ok(Some((BlockId { height, hash }, filter)))
88-
}
89-
9081
/// Get the remote tip.
9182
///
92-
/// Returns `None` if the remote height is not strictly greater than the height of this
93-
/// [`FilterIter`].
83+
/// Returns `None` if the remote height is less than the height of this [`FilterIter`].
9484
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> {
9585
let tip_hash = self.client.get_best_block_hash()?;
96-
let mut header = self.client.get_block_header_info(&tip_hash)?;
86+
let header = self.client.get_block_header_info(&tip_hash)?;
9787
let tip_height = header.height as u32;
98-
if self.height >= tip_height {
99-
// nothing to do
88+
if self.height > tip_height {
10089
return Ok(None);
10190
}
102-
self.blocks.insert(tip_height, tip_hash);
10391

104-
// if we have a checkpoint we use a lookback of ten blocks
105-
// to ensure consistency of the local chain
92+
// start scanning from point of agreement + 1
10693
if let Some(cp) = self.cp.as_ref() {
107-
// adjust start height to point of agreement + 1
10894
let base = self.find_base_with(cp.clone())?;
109-
self.height = base.height + 1;
110-
111-
for _ in 0..9 {
112-
let hash = match header.previous_block_hash {
113-
Some(hash) => hash,
114-
None => break,
115-
};
116-
header = self.client.get_block_header_info(&hash)?;
117-
let height = header.height as u32;
118-
if height < self.height {
119-
break;
120-
}
121-
self.blocks.insert(height, hash);
122-
}
95+
self.height = base.height.saturating_add(1);
12396
}
12497

12598
self.stop = tip_height;
@@ -131,9 +104,6 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
131104
}
132105
}
133106

134-
/// Alias for a compact filter and associated block id.
135-
type NextFilter = (BlockId, BlockFilter);
136-
137107
/// Event inner type
138108
#[derive(Debug, Clone)]
139109
pub struct EventInner {
@@ -171,27 +141,71 @@ impl<C: RpcApi> Iterator for FilterIter<'_, C> {
171141
type Item = Result<Event, Error>;
172142

173143
fn next(&mut self) -> Option<Self::Item> {
174-
(|| -> Result<_, Error> {
175-
// if the next filter matches any of our watched spks, get the block
176-
// and return it, inserting relevant block ids along the way
177-
self.next_filter()?.map_or(Ok(None), |(block, filter)| {
178-
let height = block.height;
179-
let hash = block.hash;
180-
181-
if self.spks.is_empty() {
182-
Err(Error::NoScripts)
183-
} else if filter
184-
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
185-
.map_err(Error::Bip158)?
186-
{
187-
let block = self.client.get_block(&hash)?;
188-
self.blocks.insert(height, hash);
189-
let inner = EventInner { height, block };
190-
Ok(Some(Event::Block(inner)))
191-
} else {
192-
Ok(Some(Event::NoMatch(height)))
144+
(|| -> Result<Option<_>, Error> {
145+
if self.height > self.stop {
146+
return Ok(None);
147+
}
148+
// Fetch next header.
149+
let mut height = self.height;
150+
let mut hash = self.client.get_block_hash(height as u64)?;
151+
152+
let mut reorg_depth = 0;
153+
154+
loop {
155+
if reorg_depth >= Self::MAX_REORG_DEPTH {
156+
return Err(Error::ReorgDepthExceeded);
193157
}
194-
})
158+
159+
let header = self.client.get_block_header(&hash)?;
160+
161+
let prev_height = height.saturating_sub(1);
162+
match self.blocks.get(&prev_height).copied() {
163+
// Not enough data.
164+
None => break,
165+
// Ok, the chain is consistent.
166+
Some(prev_hash) if prev_hash == header.prev_blockhash => break,
167+
_ => {
168+
// Reorg detected, keep backtracking.
169+
height = height.saturating_sub(1);
170+
hash = self.client.get_block_hash(height as u64)?;
171+
reorg_depth += 1;
172+
}
173+
}
174+
}
175+
176+
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
177+
let filter = BlockFilter::new(&filter_bytes);
178+
179+
// If the filter matches any of our watched SPKs, fetch the full
180+
// block and prepare the next event.
181+
let next_event = if self.spks.is_empty() {
182+
Err(Error::NoScripts)
183+
} else if filter
184+
.match_any(&hash, self.spks.iter().map(|s| s.as_bytes()))
185+
.map_err(Error::Bip158)?
186+
{
187+
let block = self.client.get_block(&hash)?;
188+
let inner = EventInner { height, block };
189+
Ok(Some(Event::Block(inner)))
190+
} else {
191+
Ok(Some(Event::NoMatch(height)))
192+
};
193+
194+
// In case of a reorg, throw out any stale entries.
195+
if reorg_depth > 0 {
196+
self.blocks.split_off(&height);
197+
self.matched.split_off(&height);
198+
}
199+
// Record the scanned block
200+
self.blocks.insert(height, hash);
201+
// Record the matching block
202+
if let Ok(Some(Event::Block(..))) = next_event {
203+
self.matched.insert(height);
204+
}
205+
// Increment next height
206+
self.height = height.saturating_add(1);
207+
208+
next_event
195209
})()
196210
.transpose()
197211
}
@@ -220,17 +234,27 @@ impl<C: RpcApi> FilterIter<'_, C> {
220234
/// Returns a chain update from the newly scanned blocks.
221235
///
222236
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or
223-
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip).
224-
pub fn chain_update(&mut self) -> Option<CheckPoint> {
225-
if self.cp.is_none() || self.blocks.is_empty() {
237+
/// if not all events have been emitted (by calling `next`).
238+
pub fn chain_update(&self) -> Option<CheckPoint> {
239+
if self.cp.is_none() || self.blocks.is_empty() || self.height <= self.stop {
226240
return None;
227241
}
228242

229-
// note: to connect with the local chain we must guarantee that `self.blocks.first()`
230-
// is also the point of agreement with `self.cp`.
243+
// We return blocks up to and including the initial height, all of the matching blocks,
244+
// and blocks in the terminal range.
245+
let tail_range = self.stop.saturating_sub(9)..=self.stop;
231246
Some(
232-
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from))
233-
.expect("blocks must be in order"),
247+
CheckPoint::from_block_ids(self.blocks.iter().filter_map(|(&height, &hash)| {
248+
if height <= self.start
249+
|| self.matched.contains(&height)
250+
|| tail_range.contains(&height)
251+
{
252+
Some(BlockId { height, hash })
253+
} else {
254+
None
255+
}
256+
}))
257+
.expect("blocks must be in order"),
234258
)
235259
}
236260
}

crates/bitcoind_rpc/tests/test_filter_iter.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ fn get_tip_and_chain_update() -> anyhow::Result<()> {
9999
let cp = CheckPoint::from_block_ids(test.chain).unwrap();
100100
let mut iter = FilterIter::new_with_checkpoint(env.rpc_client(), cp);
101101
assert_eq!(iter.get_tip().unwrap(), Some(new_tip));
102+
for _res in iter.by_ref() {}
102103
let update_cp = iter.chain_update().unwrap();
103104
let mut update_blocks: Vec<_> = update_cp.iter().map(|cp| cp.block_id()).collect();
104105
update_blocks.reverse();
@@ -161,6 +162,45 @@ fn filter_iter_error_no_scripts() -> anyhow::Result<()> {
161162
Ok(())
162163
}
163164

165+
// Test that while a reorg is detected we delay incrementing the best height
166+
#[test]
167+
fn repeat_reorgs() -> anyhow::Result<()> {
168+
const MINE_TO: u32 = 11;
169+
170+
let env = testenv()?;
171+
let rpc = env.rpc_client();
172+
while rpc.get_block_count()? < MINE_TO as u64 {
173+
let _ = env.mine_blocks(1, None)?;
174+
}
175+
176+
let spk = ScriptBuf::from_hex("0014446906a6560d8ad760db3156706e72e171f3a2aa")?;
177+
178+
let mut iter = FilterIter::new_with_height(env.rpc_client(), 1);
179+
iter.add_spk(spk);
180+
assert_eq!(iter.get_tip()?.unwrap().height, MINE_TO);
181+
182+
// Process events to height (MINE_TO - 1)
183+
loop {
184+
if iter.next().unwrap()?.height() == MINE_TO - 1 {
185+
break;
186+
}
187+
}
188+
189+
for _ in 0..3 {
190+
// Invalidate 2 blocks and remine to height = MINE_TO
191+
let _ = env.reorg(2)?;
192+
193+
// Call next. If we detect a reorg, we'll see no change in the event height
194+
assert_eq!(iter.next().unwrap()?.height(), MINE_TO - 1);
195+
}
196+
197+
// If no reorg, then height should increment normally from here on
198+
assert_eq!(iter.next().unwrap()?.height(), MINE_TO);
199+
assert!(iter.next().is_none());
200+
201+
Ok(())
202+
}
203+
164204
#[test]
165205
fn filter_iter_error_wrong_network() -> anyhow::Result<()> {
166206
let env = testenv()?;
@@ -181,3 +221,37 @@ fn filter_iter_error_wrong_network() -> anyhow::Result<()> {
181221

182222
Ok(())
183223
}
224+
225+
#[test]
226+
fn filter_iter_max_reorg_depth() -> anyhow::Result<()> {
227+
use bdk_bitcoind_rpc::bip158::Error;
228+
229+
let env = testenv()?;
230+
let client = env.rpc_client();
231+
232+
const BASE_HEIGHT: u32 = 10;
233+
const REORG_LEN: u32 = 101;
234+
const STOP_HEIGHT: u32 = BASE_HEIGHT + REORG_LEN;
235+
236+
while client.get_block_count()? < STOP_HEIGHT as u64 {
237+
env.mine_blocks(1, None)?;
238+
}
239+
240+
let mut iter = FilterIter::new_with_height(client, BASE_HEIGHT);
241+
let spk = ScriptBuf::from_hex("0014446906a6560d8ad760db3156706e72e171f3a2aa")?;
242+
iter.add_spk(spk.clone());
243+
assert_eq!(iter.get_tip()?.unwrap().height, STOP_HEIGHT);
244+
245+
// Consume events up to final height - 1.
246+
loop {
247+
if iter.next().unwrap()?.height() == STOP_HEIGHT - 1 {
248+
break;
249+
}
250+
}
251+
252+
let _ = env.reorg(REORG_LEN as usize)?;
253+
254+
assert!(matches!(iter.next(), Some(Err(Error::ReorgDepthExceeded))));
255+
256+
Ok(())
257+
}

0 commit comments

Comments
 (0)