-
Notifications
You must be signed in to change notification settings - Fork 404
fix(bitcoind_rpc): properly handle reorgs in FilterIter
#1985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
10fa62a
7e1cc26
c2ae4da
071fe40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ | |
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki | ||
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki | ||
|
||
use bdk_core::collections::BTreeMap; | ||
use bdk_core::collections::{BTreeMap, BTreeSet}; | ||
use core::fmt; | ||
|
||
use bdk_core::bitcoin; | ||
|
@@ -33,20 +33,29 @@ pub struct FilterIter<'c, C> { | |
cp: Option<CheckPoint>, | ||
// blocks map | ||
blocks: BTreeMap<Height, BlockHash>, | ||
// set of heights with filters that matched any watched SPK | ||
matched: BTreeSet<Height>, | ||
// initial height | ||
start: Height, | ||
// best height counter | ||
height: Height, | ||
// stop height | ||
stop: Height, | ||
} | ||
|
||
impl<'c, C: RpcApi> FilterIter<'c, C> { | ||
/// Hard cap on how far to walk back when a reorg is detected. | ||
const MAX_REORG_DEPTH: u32 = 100; | ||
|
||
/// Construct [`FilterIter`] from a given `client` and start `height`. | ||
pub fn new_with_height(client: &'c C, height: u32) -> Self { | ||
Self { | ||
client, | ||
spks: vec![], | ||
cp: None, | ||
blocks: BTreeMap::new(), | ||
matched: BTreeSet::new(), | ||
start: height, | ||
height, | ||
stop: 0, | ||
} | ||
|
@@ -69,57 +78,28 @@ impl<'c, C: RpcApi> FilterIter<'c, C> { | |
self.spks.push(spk); | ||
} | ||
|
||
/// Get the next filter and increment the current best height. | ||
/// | ||
/// Returns `Ok(None)` when the stop height is exceeded. | ||
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> { | ||
if self.height > self.stop { | ||
return Ok(None); | ||
} | ||
let height = self.height; | ||
let hash = match self.blocks.get(&height) { | ||
Some(h) => *h, | ||
None => self.client.get_block_hash(height as u64)?, | ||
}; | ||
let filter_bytes = self.client.get_block_filter(&hash)?.filter; | ||
let filter = BlockFilter::new(&filter_bytes); | ||
self.height += 1; | ||
Ok(Some((BlockId { height, hash }, filter))) | ||
/// Get the block hash by `height` if it is found in the blocks map. | ||
fn get_block_hash(&self, height: &Height) -> Option<BlockHash> { | ||
self.blocks.get(height).copied() | ||
} | ||
|
||
/// Get the remote tip. | ||
/// | ||
/// Returns `None` if the remote height is not strictly greater than the height of this | ||
/// [`FilterIter`]. | ||
/// Returns `None` if the remote height is less than the height of this [`FilterIter`]. | ||
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> { | ||
Comment on lines
+85
to
86
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe a little out of scope for this PR, but the documentation here is a little lacking.
Is this the case? If so, purpose @ValuedMammal are you able to provide some clarity, thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ValuedMammal Thank you for the update. The internal implementation of What do you think about changing the API to properly represent a "single-use" workflow? I.e. I would change the following:
(Separate PR of course) Let me know what you think. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this idea, and can make a follow up PR to implement this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree it makes sense to make the API simpler/safer by making it a one time use struct that takes spks in the constructor. @LagginTimes please open a new issue or PR so we can continue exact implementation changes there. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On the other hand I think it would make sense to move the logic of Also +1 for single-use workflow. |
||
let tip_hash = self.client.get_best_block_hash()?; | ||
let mut header = self.client.get_block_header_info(&tip_hash)?; | ||
let header = self.client.get_block_header_info(&tip_hash)?; | ||
let tip_height = header.height as u32; | ||
if self.height >= tip_height { | ||
// Allow returning tip if we're exactly at it. Return `None`` if we've already scanned past. | ||
if self.height > tip_height { | ||
LagginTimes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// nothing to do | ||
return Ok(None); | ||
} | ||
self.blocks.insert(tip_height, tip_hash); | ||
|
||
// if we have a checkpoint we use a lookback of ten blocks | ||
// to ensure consistency of the local chain | ||
// start scanning from point of agreement + 1 | ||
if let Some(cp) = self.cp.as_ref() { | ||
// adjust start height to point of agreement + 1 | ||
let base = self.find_base_with(cp.clone())?; | ||
self.height = base.height + 1; | ||
|
||
for _ in 0..9 { | ||
let hash = match header.previous_block_hash { | ||
Some(hash) => hash, | ||
None => break, | ||
}; | ||
header = self.client.get_block_header_info(&hash)?; | ||
let height = header.height as u32; | ||
if height < self.height { | ||
break; | ||
} | ||
self.blocks.insert(height, hash); | ||
} | ||
self.height = base.height.saturating_add(1); | ||
} | ||
|
||
self.stop = tip_height; | ||
|
@@ -131,9 +111,6 @@ impl<'c, C: RpcApi> FilterIter<'c, C> { | |
} | ||
} | ||
|
||
/// Alias for a compact filter and associated block id. | ||
type NextFilter = (BlockId, BlockFilter); | ||
|
||
/// Event inner type | ||
#[derive(Debug, Clone)] | ||
pub struct EventInner { | ||
|
@@ -171,27 +148,80 @@ impl<C: RpcApi> Iterator for FilterIter<'_, C> { | |
type Item = Result<Event, Error>; | ||
|
||
fn next(&mut self) -> Option<Self::Item> { | ||
(|| -> Result<_, Error> { | ||
// if the next filter matches any of our watched spks, get the block | ||
// and return it, inserting relevant block ids along the way | ||
self.next_filter()?.map_or(Ok(None), |(block, filter)| { | ||
let height = block.height; | ||
let hash = block.hash; | ||
|
||
if self.spks.is_empty() { | ||
Err(Error::NoScripts) | ||
} else if filter | ||
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes())) | ||
.map_err(Error::Bip158)? | ||
{ | ||
let block = self.client.get_block(&hash)?; | ||
self.blocks.insert(height, hash); | ||
let inner = EventInner { height, block }; | ||
Ok(Some(Event::Block(inner))) | ||
} else { | ||
Ok(Some(Event::NoMatch(height))) | ||
(|| -> Result<Option<_>, Error> { | ||
if self.height > self.stop { | ||
return Ok(None); | ||
} | ||
// Fetch next header. | ||
let mut height = self.height; | ||
let mut hash = self.client.get_block_hash(height as _)?; | ||
let mut header = self.client.get_block_header(&hash)?; | ||
|
||
// Detect and resolve reorgs: either block at height changed, or its parent changed. | ||
let stored_hash = self.blocks.get(&height).copied(); | ||
let prev_hash = height | ||
.checked_sub(1) | ||
.and_then(|height| self.blocks.get(&height).copied()); | ||
|
||
// If we've seen this height before but the hash has changed, or parent changed, trigger | ||
// reorg. | ||
let reorg_detected = if let Some(old_hash) = stored_hash { | ||
old_hash != hash | ||
} else if let Some(expected_prev) = prev_hash { | ||
header.prev_blockhash != expected_prev | ||
} else { | ||
false | ||
}; | ||
|
||
// Reorg detected, rewind to last known-good ancestor. | ||
if reorg_detected { | ||
let mut reorg_depth = 0; | ||
loop { | ||
if reorg_depth >= Self::MAX_REORG_DEPTH || height == 0 { | ||
return Err(Error::ReorgDepthExceeded); | ||
} | ||
|
||
height = height.saturating_sub(1); | ||
hash = self.client.get_block_hash(height as _)?; | ||
header = self.client.get_block_header(&hash)?; | ||
|
||
let prev_height = height.saturating_sub(1); | ||
if let Some(prev_hash) = self.blocks.get(&prev_height) { | ||
if header.prev_blockhash == *prev_hash { | ||
break; | ||
} | ||
} | ||
|
||
reorg_depth += 1; | ||
} | ||
}) | ||
|
||
self.blocks.split_off(&height); | ||
self.matched.split_off(&height); | ||
} | ||
LagginTimes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
let filter_bytes = self.client.get_block_filter(&hash)?.filter; | ||
let filter = BlockFilter::new(&filter_bytes); | ||
|
||
// record the scanned block | ||
self.blocks.insert(height, hash); | ||
// increment best height | ||
self.height = height.saturating_add(1); | ||
LagginTimes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// If the filter matches any of our watched SPKs, fetch the full | ||
// block, and record the matching block entry. | ||
if self.spks.is_empty() { | ||
Err(Error::NoScripts) | ||
} else if filter | ||
.match_any(&hash, self.spks.iter().map(|s| s.as_bytes())) | ||
.map_err(Error::Bip158)? | ||
{ | ||
let block = self.client.get_block(&hash)?; | ||
self.matched.insert(height); | ||
let inner = EventInner { height, block }; | ||
Ok(Some(Event::Block(inner))) | ||
} else { | ||
Ok(Some(Event::NoMatch(height))) | ||
} | ||
})() | ||
.transpose() | ||
} | ||
|
@@ -202,8 +232,8 @@ impl<C: RpcApi> FilterIter<'_, C> { | |
fn find_base_with(&mut self, mut cp: CheckPoint) -> Result<BlockId, Error> { | ||
loop { | ||
let height = cp.height(); | ||
let fetched_hash = match self.blocks.get(&height) { | ||
Some(hash) => *hash, | ||
let fetched_hash = match self.get_block_hash(&height) { | ||
Some(hash) => hash, | ||
None if height == 0 => cp.hash(), | ||
_ => self.client.get_block_hash(height as _)?, | ||
}; | ||
|
@@ -221,17 +251,27 @@ impl<C: RpcApi> FilterIter<'_, C> { | |
/// Returns a chain update from the newly scanned blocks. | ||
/// | ||
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or | ||
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip). | ||
/// if not all events have been emitted (by calling `next`). | ||
pub fn chain_update(&mut self) -> Option<CheckPoint> { | ||
if self.cp.is_none() || self.blocks.is_empty() { | ||
if self.cp.is_none() || self.blocks.is_empty() || self.height <= self.stop { | ||
return None; | ||
} | ||
|
||
// note: to connect with the local chain we must guarantee that `self.blocks.first()` | ||
// is also the point of agreement with `self.cp`. | ||
// We return blocks up to and including the initial height, all of the matching blocks, | ||
// and blocks in the terminal range. | ||
let tail_range = self.stop.saturating_sub(9)..=self.stop; | ||
Some( | ||
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from)) | ||
.expect("blocks must be in order"), | ||
CheckPoint::from_block_ids(self.blocks.iter().filter_map(|(&height, &hash)| { | ||
if height <= self.start | ||
|| self.matched.contains(&height) | ||
|| tail_range.contains(&height) | ||
{ | ||
Some(BlockId { height, hash }) | ||
} else { | ||
None | ||
} | ||
})) | ||
.expect("blocks must be in order"), | ||
) | ||
} | ||
} | ||
|
@@ -245,6 +285,8 @@ pub enum Error { | |
NoScripts, | ||
/// `bitcoincore_rpc` error | ||
Rpc(bitcoincore_rpc::Error), | ||
/// `MAX_REORG_DEPTH` exceeded | ||
ReorgDepthExceeded, | ||
} | ||
|
||
impl From<bitcoincore_rpc::Error> for Error { | ||
|
@@ -259,6 +301,7 @@ impl fmt::Display for Error { | |
Self::Bip158(e) => e.fmt(f), | ||
Self::NoScripts => write!(f, "no script pubkeys were provided to match with"), | ||
Self::Rpc(e) => e.fmt(f), | ||
Self::ReorgDepthExceeded => write!(f, "maximum reorg depth exceeded"), | ||
LagginTimes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.