Skip to content

Commit a290a3c

Browse files
committed
Add configurable block replayer (#2863)
## Issue Addressed

Successor to #2431

## Proposed Changes

* Add a `BlockReplayer` struct to abstract over the intricacies of calling `per_slot_processing` and `per_block_processing` while avoiding unnecessary tree hashing.
* Add a variant of the forwards state root iterator that does not require an `end_state`.
* Use the `BlockReplayer` when reconstructing states in the database. Use the efficient forwards iterator for frozen states.
* Refactor the iterators to remove `Arc<HotColdDB>` (this seems to be neater than making _everything_ an `Arc<HotColdDB>` as I did in #2431).

Supplying the state roots allows us to avoid building a tree hash cache at all when reconstructing historic states, which saves around 1 second flat (regardless of `slots-per-restore-point`). This is a small percentage of worst-case state load times with 200K validators and SPRP=2048 (~15s vs ~16s) but a significant speed-up for more frequent restore points: state loads with SPRP=32 should now be consistently <500ms instead of 1.5s (a ~3x speedup).

## Additional Info

Required by #2628
1 parent 56d596e commit a290a3c

25 files changed

+955
-443
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 116 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ use state_processing::{
6969
per_block_processing::{errors::AttestationValidationError, is_merge_transition_complete},
7070
per_slot_processing,
7171
state_advance::{complete_state_advance, partial_state_advance},
72-
BlockSignatureStrategy, SigVerifiedOp,
72+
BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot,
7373
};
7474
use std::borrow::Cow;
7575
use std::cmp::Ordering;
@@ -488,7 +488,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
488488
pub fn forwards_iter_block_roots(
489489
&self,
490490
start_slot: Slot,
491-
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
491+
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
492492
let oldest_block_slot = self.store.get_oldest_block_slot();
493493
if start_slot < oldest_block_slot {
494494
return Err(Error::HistoricalBlockError(
@@ -501,8 +501,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
501501

502502
let local_head = self.head()?;
503503

504-
let iter = HotColdDB::forwards_block_roots_iterator(
505-
self.store.clone(),
504+
let iter = self.store.forwards_block_roots_iterator(
506505
start_slot,
507506
local_head.beacon_state,
508507
local_head.beacon_block_root,
@@ -512,6 +511,43 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
512511
Ok(iter.map(|result| result.map_err(Into::into)))
513512
}
514513

514+
/// Even more efficient variant of `forwards_iter_block_roots` that will avoid cloning the head
515+
/// state if it isn't required for the requested range of blocks.
516+
pub fn forwards_iter_block_roots_until(
517+
&self,
518+
start_slot: Slot,
519+
end_slot: Slot,
520+
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
521+
let oldest_block_slot = self.store.get_oldest_block_slot();
522+
if start_slot < oldest_block_slot {
523+
return Err(Error::HistoricalBlockError(
524+
HistoricalBlockError::BlockOutOfRange {
525+
slot: start_slot,
526+
oldest_block_slot,
527+
},
528+
));
529+
}
530+
531+
self.with_head(move |head| {
532+
let iter = self.store.forwards_block_roots_iterator_until(
533+
start_slot,
534+
end_slot,
535+
|| {
536+
(
537+
head.beacon_state.clone_with_only_committee_caches(),
538+
head.beacon_block_root,
539+
)
540+
},
541+
&self.spec,
542+
)?;
543+
Ok(iter
544+
.map(|result| result.map_err(Into::into))
545+
.take_while(move |result| {
546+
result.as_ref().map_or(true, |(_, slot)| *slot <= end_slot)
547+
}))
548+
})
549+
}
550+
515551
/// Traverse backwards from `block_root` to find the block roots of its ancestors.
516552
///
517553
/// ## Notes
@@ -524,14 +560,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
524560
pub fn rev_iter_block_roots_from(
525561
&self,
526562
block_root: Hash256,
527-
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
563+
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
528564
let block = self
529565
.get_block(&block_root)?
530566
.ok_or(Error::MissingBeaconBlock(block_root))?;
531567
let state = self
532568
.get_state(&block.state_root(), Some(block.slot()))?
533569
.ok_or_else(|| Error::MissingBeaconState(block.state_root()))?;
534-
let iter = BlockRootsIterator::owned(self.store.clone(), state);
570+
let iter = BlockRootsIterator::owned(&self.store, state);
535571
Ok(std::iter::once(Ok((block_root, block.slot())))
536572
.chain(iter)
537573
.map(|result| result.map_err(|e| e.into())))
@@ -618,12 +654,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
618654
/// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot
619655
/// returned may be earlier than the wall-clock slot.
620656
pub fn rev_iter_state_roots_from<'a>(
621-
&self,
657+
&'a self,
622658
state_root: Hash256,
623659
state: &'a BeaconState<T::EthSpec>,
624660
) -> impl Iterator<Item = Result<(Hash256, Slot), Error>> + 'a {
625661
std::iter::once(Ok((state_root, state.slot())))
626-
.chain(StateRootsIterator::new(self.store.clone(), state))
662+
.chain(StateRootsIterator::new(&self.store, state))
627663
.map(|result| result.map_err(Into::into))
628664
}
629665

@@ -637,11 +673,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
637673
pub fn forwards_iter_state_roots(
638674
&self,
639675
start_slot: Slot,
640-
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
676+
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
641677
let local_head = self.head()?;
642678

643-
let iter = HotColdDB::forwards_state_roots_iterator(
644-
self.store.clone(),
679+
let iter = self.store.forwards_state_roots_iterator(
645680
start_slot,
646681
local_head.beacon_state_root(),
647682
local_head.beacon_state,
@@ -651,6 +686,36 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
651686
Ok(iter.map(|result| result.map_err(Into::into)))
652687
}
653688

689+
/// Super-efficient forwards state roots iterator that avoids cloning the head if the state
690+
/// roots lie entirely within the freezer database.
691+
///
692+
/// The iterator returned will include roots for `start_slot..=end_slot`, i.e. it
693+
/// is endpoint inclusive.
694+
pub fn forwards_iter_state_roots_until(
695+
&self,
696+
start_slot: Slot,
697+
end_slot: Slot,
698+
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
699+
self.with_head(move |head| {
700+
let iter = self.store.forwards_state_roots_iterator_until(
701+
start_slot,
702+
end_slot,
703+
|| {
704+
(
705+
head.beacon_state.clone_with_only_committee_caches(),
706+
head.beacon_state_root(),
707+
)
708+
},
709+
&self.spec,
710+
)?;
711+
Ok(iter
712+
.map(|result| result.map_err(Into::into))
713+
.take_while(move |result| {
714+
result.as_ref().map_or(true, |(_, slot)| *slot <= end_slot)
715+
}))
716+
})
717+
}
718+
654719
/// Returns the block at the given slot, if any. Only returns blocks in the canonical chain.
655720
///
656721
/// Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot.
@@ -708,18 +773,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
708773
return Ok(Some(root));
709774
}
710775

711-
process_results(self.forwards_iter_state_roots(request_slot)?, |mut iter| {
712-
if let Some((root, slot)) = iter.next() {
713-
if slot == request_slot {
714-
Ok(Some(root))
776+
process_results(
777+
self.forwards_iter_state_roots_until(request_slot, request_slot)?,
778+
|mut iter| {
779+
if let Some((root, slot)) = iter.next() {
780+
if slot == request_slot {
781+
Ok(Some(root))
782+
} else {
783+
// Sanity check.
784+
Err(Error::InconsistentForwardsIter { request_slot, slot })
785+
}
715786
} else {
716-
// Sanity check.
717-
Err(Error::InconsistentForwardsIter { request_slot, slot })
787+
Ok(None)
718788
}
719-
} else {
720-
Ok(None)
721-
}
722-
})?
789+
},
790+
)?
723791
}
724792

725793
/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.
@@ -790,11 +858,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
790858
return Ok(root_opt);
791859
}
792860

793-
if let Some(((prev_root, _), (curr_root, curr_slot))) =
794-
process_results(self.forwards_iter_block_roots(prev_slot)?, |iter| {
795-
iter.tuple_windows().next()
796-
})?
797-
{
861+
if let Some(((prev_root, _), (curr_root, curr_slot))) = process_results(
862+
self.forwards_iter_block_roots_until(prev_slot, request_slot)?,
863+
|iter| iter.tuple_windows().next(),
864+
)? {
798865
// Sanity check.
799866
if curr_slot != request_slot {
800867
return Err(Error::InconsistentForwardsIter {
@@ -842,18 +909,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
842909
return Ok(Some(root));
843910
}
844911

845-
process_results(self.forwards_iter_block_roots(request_slot)?, |mut iter| {
846-
if let Some((root, slot)) = iter.next() {
847-
if slot == request_slot {
848-
Ok(Some(root))
912+
process_results(
913+
self.forwards_iter_block_roots_until(request_slot, request_slot)?,
914+
|mut iter| {
915+
if let Some((root, slot)) = iter.next() {
916+
if slot == request_slot {
917+
Ok(Some(root))
918+
} else {
919+
// Sanity check.
920+
Err(Error::InconsistentForwardsIter { request_slot, slot })
921+
}
849922
} else {
850-
// Sanity check.
851-
Err(Error::InconsistentForwardsIter { request_slot, slot })
923+
Ok(None)
852924
}
853-
} else {
854-
Ok(None)
855-
}
856-
})?
925+
},
926+
)?
857927
}
858928

859929
/// Returns the block at the given root, if any.
@@ -1112,12 +1182,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
11121182
Ok(state)
11131183
}
11141184
Ordering::Less => {
1115-
let state_root = process_results(self.forwards_iter_state_roots(slot)?, |iter| {
1116-
iter.take_while(|(_, current_slot)| *current_slot >= slot)
1117-
.find(|(_, current_slot)| *current_slot == slot)
1118-
.map(|(root, _slot)| root)
1119-
})?
1120-
.ok_or(Error::NoStateForSlot(slot))?;
1185+
let state_root =
1186+
process_results(self.forwards_iter_state_roots_until(slot, slot)?, |iter| {
1187+
iter.take_while(|(_, current_slot)| *current_slot >= slot)
1188+
.find(|(_, current_slot)| *current_slot == slot)
1189+
.map(|(root, _slot)| root)
1190+
})?
1191+
.ok_or(Error::NoStateForSlot(slot))?;
11211192

11221193
Ok(self
11231194
.get_state(&state_root, Some(slot))?
@@ -1256,7 +1327,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
12561327
beacon_block_root: Hash256,
12571328
state: &BeaconState<T::EthSpec>,
12581329
) -> Result<Option<Hash256>, Error> {
1259-
let iter = BlockRootsIterator::new(self.store.clone(), state);
1330+
let iter = BlockRootsIterator::new(&self.store, state);
12601331
let iter_with_head = std::iter::once(Ok((beacon_block_root, state.slot())))
12611332
.chain(iter)
12621333
.map(|result| result.map_err(|e| e.into()));
@@ -2983,6 +3054,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29833054
&block,
29843055
None,
29853056
BlockSignatureStrategy::VerifyRandao,
3057+
VerifyBlockRoot::True,
29863058
&self.spec,
29873059
)?;
29883060
drop(process_timer);
@@ -3324,7 +3396,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
33243396
.epoch
33253397
.start_slot(T::EthSpec::slots_per_epoch());
33263398
let new_finalized_state_root = process_results(
3327-
StateRootsIterator::new(self.store.clone(), &head.beacon_state),
3399+
StateRootsIterator::new(&self.store, &head.beacon_state),
33283400
|mut iter| {
33293401
iter.find_map(|(state_root, slot)| {
33303402
if slot == new_finalized_slot {

beacon_node/beacon_chain/src/block_verification.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ use state_processing::{
6565
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
6666
per_block_processing, per_slot_processing,
6767
state_advance::partial_state_advance,
68-
BlockProcessingError, BlockSignatureStrategy, SlotProcessingError,
68+
BlockProcessingError, BlockSignatureStrategy, SlotProcessingError, VerifyBlockRoot,
6969
};
7070
use std::borrow::Cow;
7171
use std::fs;
@@ -1185,6 +1185,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
11851185
Some(block_root),
11861186
// Signatures were verified earlier in this function.
11871187
BlockSignatureStrategy::NoVerification,
1188+
VerifyBlockRoot::True,
11881189
&chain.spec,
11891190
) {
11901191
match err {

beacon_node/beacon_chain/src/errors.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use state_processing::{
2020
},
2121
signature_sets::Error as SignatureSetError,
2222
state_advance::Error as StateAdvanceError,
23-
BlockProcessingError, SlotProcessingError,
23+
BlockProcessingError, BlockReplayError, SlotProcessingError,
2424
};
2525
use std::time::Duration;
2626
use task_executor::ShutdownReason;
@@ -86,6 +86,7 @@ pub enum BeaconChainError {
8686
ValidatorPubkeyCacheIncomplete(usize),
8787
SignatureSetError(SignatureSetError),
8888
BlockSignatureVerifierError(state_processing::block_signature_verifier::Error),
89+
BlockReplayError(BlockReplayError),
8990
DuplicateValidatorPublicKey,
9091
ValidatorPubkeyCacheFileError(String),
9192
ValidatorIndexUnknown(usize),
@@ -160,6 +161,7 @@ easy_from_to!(ArithError, BeaconChainError);
160161
easy_from_to!(ForkChoiceStoreError, BeaconChainError);
161162
easy_from_to!(HistoricalBlockError, BeaconChainError);
162163
easy_from_to!(StateAdvanceError, BeaconChainError);
164+
easy_from_to!(BlockReplayError, BeaconChainError);
163165

164166
#[derive(Debug)]
165167
pub enum BlockProductionError {

beacon_node/beacon_chain/src/fork_revert.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use fork_choice::{ForkChoice, PayloadVerificationStatus};
33
use itertools::process_results;
44
use slog::{info, warn, Logger};
55
use state_processing::state_advance::complete_state_advance;
6-
use state_processing::{per_block_processing, per_block_processing::BlockSignatureStrategy};
6+
use state_processing::{
7+
per_block_processing, per_block_processing::BlockSignatureStrategy, VerifyBlockRoot,
8+
};
79
use std::sync::Arc;
810
use std::time::Duration;
911
use store::{iter::ParentRootBlockIterator, HotColdDB, ItemStore};
@@ -161,6 +163,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
161163
&block,
162164
None,
163165
BlockSignatureStrategy::NoVerification,
166+
VerifyBlockRoot::True,
164167
spec,
165168
)
166169
.map_err(|e| format!("Error replaying block: {:?}", e))?;

beacon_node/beacon_chain/src/migrate.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -360,13 +360,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
360360
new_finalized_slot,
361361
(new_finalized_block_hash, new_finalized_state_hash),
362362
)))
363-
.chain(
364-
RootsIterator::new(store.clone(), new_finalized_state).map(|res| {
365-
res.map(|(block_root, state_root, slot)| {
366-
(slot, (block_root.into(), state_root.into()))
367-
})
368-
}),
369-
)
363+
.chain(RootsIterator::new(&store, new_finalized_state).map(|res| {
364+
res.map(|(block_root, state_root, slot)| {
365+
(slot, (block_root.into(), state_root.into()))
366+
})
367+
}))
370368
.take_while(|res| {
371369
res.as_ref()
372370
.map_or(true, |(slot, _)| *slot >= old_finalized_slot)
@@ -416,7 +414,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
416414

417415
// Iterate backwards from this head, staging blocks and states for deletion.
418416
let iter = std::iter::once(Ok((head_hash, head_state_root, head_slot)))
419-
.chain(RootsIterator::from_block(store.clone(), head_hash)?);
417+
.chain(RootsIterator::from_block(&store, head_hash)?);
420418

421419
for maybe_tuple in iter {
422420
let (block_root, state_root, slot) = maybe_tuple?;

beacon_node/beacon_chain/src/schema_change/migration_schema_v7.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ fn map_relevant_epochs_to_roots<T: BeaconChainTypes>(
189189

190190
// Iterate backwards from the given `head_root` and `head_slot` and find the block root at each epoch.
191191
let mut iter = std::iter::once(Ok((head_root, head_slot)))
192-
.chain(BlockRootsIterator::from_block(db, head_root).map_err(|e| format!("{:?}", e))?);
192+
.chain(BlockRootsIterator::from_block(&db, head_root).map_err(|e| format!("{:?}", e))?);
193193
let mut roots_by_epoch = HashMap::new();
194194
for epoch in relevant_epochs {
195195
let start_slot = epoch.start_slot(T::EthSpec::slots_per_epoch());

0 commit comments

Comments
 (0)