
[WIP] Accelerate loading of frozen states #2431

Closed · wants to merge 1 commit

18 changes: 8 additions & 10 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -60,7 +60,7 @@ use state_processing::{
per_block_processing::errors::AttestationValidationError,
per_slot_processing,
state_advance::{complete_state_advance, partial_state_advance},
BlockSignatureStrategy, SigVerifiedOp,
SigVerifiedOp, VerificationStrategy,
};
use std::borrow::Cow;
use std::cmp::Ordering;
@@ -205,12 +205,10 @@ pub type BeaconForkChoice<T> = ForkChoice<
<T as BeaconChainTypes>::EthSpec,
>;

pub type BeaconStore<T> = Arc<
HotColdDB<
<T as BeaconChainTypes>::EthSpec,
<T as BeaconChainTypes>::HotStore,
<T as BeaconChainTypes>::ColdStore,
>,
pub type BeaconStore<T> = HotColdDB<
<T as BeaconChainTypes>::EthSpec,
<T as BeaconChainTypes>::HotStore,
<T as BeaconChainTypes>::ColdStore,
>;

/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
@@ -1013,7 +1011,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Note: supplying some `state_root` when it is known would be a cheap and easy
// optimization.
match per_slot_processing(&mut state, skip_state_root, &self.spec) {
match per_slot_processing(&mut state, skip_state_root, None, &self.spec) {
Ok(_) => (),
Err(e) => {
warn!(
@@ -2779,7 +2777,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&mut state,
&block,
None,
BlockSignatureStrategy::NoVerification,
VerificationStrategy::no_signatures(),
&self.spec,
)?;
drop(process_timer);
@@ -3483,7 +3481,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};

for (head_hash, _head_slot) in heads {
for maybe_pair in ParentRootBlockIterator::new(&*self.store, head_hash) {
for maybe_pair in ParentRootBlockIterator::new(&self.store, head_hash) {
let (block_hash, signed_beacon_block) = maybe_pair.unwrap();
if visited.contains(&block_hash) {
break;
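
A minimal sketch of the ownership pattern the reworked `BeaconStore` alias appears to rely on, assuming the store is made internally reference-counted somewhere in this WIP (the mechanism itself is not visible in this diff). `StoreHandle` and `StoreInner` are hypothetical names standing in for `HotColdDB`:

```rust
use std::sync::Arc;

// Illustrative stand-in for `HotColdDB`, not the real type: the alias change
// above drops the outer `Arc`, which suggests the handle itself is cheaply
// cloneable, e.g. by keeping its shared state behind an internal `Arc`.
#[derive(Clone)]
pub struct StoreHandle {
    inner: Arc<StoreInner>,
}

struct StoreInner {
    // Column handles, caches, and config would live here in the real store.
    label: String,
}

impl StoreHandle {
    pub fn open(label: &str) -> Self {
        Self {
            inner: Arc::new(StoreInner { label: label.to_string() }),
        }
    }

    pub fn label(&self) -> &str {
        &self.inner.label
    }
}

fn main() {
    // Call sites no longer write `Arc::new(store)`; cloning the handle is
    // cheap and every clone refers to the same underlying database.
    let store = StoreHandle::open("hot-cold");
    let for_fork_choice = store.clone();
    assert_eq!(store.label(), for_fork_choice.label());
}
```
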
7 changes: 3 additions & 4 deletions beacon_node/beacon_chain/src/beacon_fork_choice_store.rs
@@ -8,7 +8,6 @@ use crate::{metrics, BeaconSnapshot};
use fork_choice::ForkChoiceStore;
use ssz_derive::{Decode, Encode};
use std::marker::PhantomData;
use std::sync::Arc;
use store::{Error as StoreError, HotColdDB, ItemStore};
use types::{BeaconBlock, BeaconState, BeaconStateError, Checkpoint, EthSpec, Hash256, Slot};

@@ -156,7 +155,7 @@ impl BalancesCache {
/// `fork_choice::ForkChoice` struct.
#[derive(Debug)]
pub struct BeaconForkChoiceStore<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
store: Arc<HotColdDB<E, Hot, Cold>>,
store: HotColdDB<E, Hot, Cold>,
balances_cache: BalancesCache,
time: Slot,
finalized_checkpoint: Checkpoint,
@@ -201,7 +200,7 @@ where
///
/// It is assumed that `anchor` is already persisted in `store`.
pub fn get_forkchoice_store(
store: Arc<HotColdDB<E, Hot, Cold>>,
store: HotColdDB<E, Hot, Cold>,
anchor: &BeaconSnapshot<E>,
) -> Self {
let anchor_state = &anchor.beacon_state;
@@ -245,7 +244,7 @@ where
/// Restore `Self` from a previously-generated `PersistedForkChoiceStore`.
pub fn from_persisted(
persisted: PersistedForkChoiceStore,
store: Arc<HotColdDB<E, Hot, Cold>>,
store: HotColdDB<E, Hot, Cold>,
) -> Result<Self, Error> {
Ok(Self {
store,
10 changes: 6 additions & 4 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -60,7 +60,7 @@ use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing, per_slot_processing,
state_advance::partial_state_advance,
BlockProcessingError, BlockSignatureStrategy, SlotProcessingError,
BlockProcessingError, SlotProcessingError, VerificationStrategy,
};
use std::borrow::Cow;
use std::fs;
@@ -969,7 +969,9 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
state_root
};

if let Some(summary) = per_slot_processing(&mut state, Some(state_root), &chain.spec)? {
if let Some(summary) =
per_slot_processing(&mut state, Some(state_root), None, &chain.spec)?
{
// Expose Prometheus metrics.
if let Err(e) = summary.observe_metrics() {
error!(
@@ -979,7 +981,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
"error" => ?e
);
}
summaries.push(summary);
summaries.push(summary)
}
}

@@ -1041,7 +1043,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
&block,
Some(block_root),
// Signatures were verified earlier in this function.
BlockSignatureStrategy::NoVerification,
VerificationStrategy::no_signatures(),
&chain.spec,
) {
match err {
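
The diff replaces `BlockSignatureStrategy::NoVerification` with `VerificationStrategy::no_signatures()` (and, in `fork_revert.rs`, `no_verification()`) without showing the new type's definition. A hedged sketch of what such a strategy type could look like; the fields and their semantics are assumptions, not the PR's actual definition:

```rust
// Hypothetical sketch only: a strategy struct whose constructors bundle the
// verification toggles that `per_block_processing` consults. The real
// `VerificationStrategy` in this PR may carry different or additional options.
#[derive(Clone, Copy, Debug)]
pub struct VerificationStrategy {
    verify_signatures: bool,
    verify_block_root: bool,
}

impl VerificationStrategy {
    /// Skip signature checks (already verified earlier) but keep other checks.
    pub fn no_signatures() -> Self {
        Self {
            verify_signatures: false,
            verify_block_root: true,
        }
    }

    /// Skip all optional verification, e.g. when replaying known-good blocks.
    pub fn no_verification() -> Self {
        Self {
            verify_signatures: false,
            verify_block_root: false,
        }
    }
}

fn main() {
    println!("{:?}", VerificationStrategy::no_signatures());
    println!("{:?}", VerificationStrategy::no_verification());
}
```
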
9 changes: 4 additions & 5 deletions beacon_node/beacon_chain/src/builder.rs
@@ -1,4 +1,4 @@
use crate::beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
use crate::beacon_chain::{BeaconStore, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::head_tracker::HeadTracker;
@@ -63,8 +63,7 @@ where
///
/// See the tests for an example of a complete working example.
pub struct BeaconChainBuilder<T: BeaconChainTypes> {
#[allow(clippy::type_complexity)]
store: Option<Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>>,
store: Option<BeaconStore<T>>,
store_migrator_config: Option<MigratorConfig>,
pub genesis_time: Option<u64>,
genesis_block_root: Option<Hash256>,
@@ -153,7 +152,7 @@ where
/// Sets the store (database).
///
/// Should generally be called early in the build chain.
pub fn store(mut self, store: Arc<HotColdDB<TEthSpec, THotStore, TColdStore>>) -> Self {
pub fn store(mut self, store: HotColdDB<TEthSpec, THotStore, TColdStore>) -> Self {
self.store = Some(store);
self
}
@@ -792,7 +791,7 @@ mod test {

let chain = BeaconChainBuilder::new(MinimalEthSpec)
.logger(log.clone())
.store(Arc::new(store))
.store(store)
.genesis_state(genesis_state)
.expect("should build state using recent genesis")
.dummy_eth1_backend()
9 changes: 4 additions & 5 deletions beacon_node/beacon_chain/src/fork_revert.rs
@@ -3,8 +3,7 @@ use fork_choice::ForkChoice;
use itertools::process_results;
use slog::{info, warn, Logger};
use state_processing::state_advance::complete_state_advance;
use state_processing::{per_block_processing, per_block_processing::BlockSignatureStrategy};
use std::sync::Arc;
use state_processing::{per_block_processing, VerificationStrategy};
use store::{iter::ParentRootBlockIterator, HotColdDB, ItemStore};
use types::{BeaconState, ChainSpec, EthSpec, ForkName, Hash256, SignedBeaconBlock, Slot};

@@ -21,7 +20,7 @@ const CORRUPT_DB_MESSAGE: &str = "The database could be corrupt. Check its file
pub fn revert_to_fork_boundary<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
current_slot: Slot,
head_block_root: Hash256,
store: Arc<HotColdDB<E, Hot, Cold>>,
store: HotColdDB<E, Hot, Cold>,
spec: &ChainSpec,
log: &Logger,
) -> Result<(Hash256, SignedBeaconBlock<E>), String> {
@@ -87,7 +86,7 @@ pub fn revert_to_fork_boundary<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>
pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
head_block_root: Hash256,
head_state: &BeaconState<E>,
store: Arc<HotColdDB<E, Hot, Cold>>,
store: HotColdDB<E, Hot, Cold>,
spec: &ChainSpec,
) -> Result<ForkChoice<BeaconForkChoiceStore<E, Hot, Cold>, E>, String> {
// Fetch finalized block.
@@ -159,7 +158,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
&mut state,
&block,
None,
BlockSignatureStrategy::NoVerification,
VerificationStrategy::no_verification(),
spec,
)
.map_err(|e| format!("Error replaying block: {:?}", e))?;
12 changes: 6 additions & 6 deletions beacon_node/beacon_chain/src/migrate.rs
@@ -28,7 +28,7 @@ const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
/// The background migrator runs a thread to perform pruning and migrate state from the hot
/// to the cold database.
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
db: Arc<HotColdDB<E, Hot, Cold>>,
db: HotColdDB<E, Hot, Cold>,
#[allow(clippy::type_complexity)]
tx_thread: Option<Mutex<(mpsc::Sender<MigrationNotification>, thread::JoinHandle<()>)>>,
/// Genesis block root, for persisting the `PersistedBeaconChain`.
@@ -83,7 +83,7 @@ pub struct MigrationNotification {
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
/// Create a new `BackgroundMigrator` and spawn its thread if necessary.
pub fn new(
db: Arc<HotColdDB<E, Hot, Cold>>,
db: HotColdDB<E, Hot, Cold>,
config: MigratorConfig,
genesis_block_root: Hash256,
log: Logger,
@@ -153,7 +153,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}

/// Perform the actual work of `process_finalization`.
fn run_migration(db: Arc<HotColdDB<E, Hot, Cold>>, notif: MigrationNotification, log: &Logger) {
fn run_migration(db: HotColdDB<E, Hot, Cold>, notif: MigrationNotification, log: &Logger) {
let finalized_state_root = notif.finalized_state_root;

let finalized_state = match db.get_state(&finalized_state_root.into(), None) {
@@ -229,7 +229,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
///
/// Return a channel handle for sending new finalized states to the thread.
fn spawn_thread(
db: Arc<HotColdDB<E, Hot, Cold>>,
db: HotColdDB<E, Hot, Cold>,
log: Logger,
) -> (mpsc::Sender<MigrationNotification>, thread::JoinHandle<()>) {
let (tx, rx) = mpsc::channel();
@@ -258,7 +258,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
/// space.
#[allow(clippy::too_many_arguments)]
fn prune_abandoned_forks(
store: Arc<HotColdDB<E, Hot, Cold>>,
store: HotColdDB<E, Hot, Cold>,
head_tracker: Arc<HeadTracker>,
new_finalized_state_hash: BeaconStateHash,
new_finalized_state: &BeaconState<E>,
@@ -516,7 +516,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
/// Compact the database if it has been more than `COMPACTION_PERIOD_SECONDS` since it
/// was last compacted.
pub fn run_compaction(
db: Arc<HotColdDB<E, Hot, Cold>>,
db: HotColdDB<E, Hot, Cold>,
old_finalized_epoch: Epoch,
new_finalized_epoch: Epoch,
log: &Logger,
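
`BackgroundMigrator::new` and `spawn_thread` now take the store by value rather than as `Arc<HotColdDB<..>>`. A small sketch of that hand-off, assuming a cheaply cloneable handle as in the earlier sketch; the channel payload and thread body are simplified stand-ins, not the migrator's real logic:

```rust
use std::sync::{mpsc, Arc};
use std::thread;

// Simplified, hypothetical handle standing in for `HotColdDB`.
#[derive(Clone)]
struct StoreHandle {
    inner: Arc<String>,
}

// Stand-in for `MigrationNotification`.
struct Notification {
    finalized_epoch: u64,
}

// Mirrors the shape of `spawn_thread`: the handle is moved into the worker,
// while the caller keeps its own clone for foreground reads.
fn spawn_migrator(store: StoreHandle) -> (mpsc::Sender<Notification>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Notification>();
    let handle = thread::spawn(move || {
        for notif in rx {
            // The real migrator prunes abandoned forks and migrates states here.
            println!("migrating {} at epoch {}", store.inner, notif.finalized_epoch);
        }
    });
    (tx, handle)
}

fn main() {
    let store = StoreHandle { inner: Arc::new("hot-cold".to_string()) };
    let (tx, handle) = spawn_migrator(store.clone());
    tx.send(Notification { finalized_epoch: 42 }).unwrap();
    drop(tx); // close the channel so the worker exits
    handle.join().unwrap();
}
```
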
3 changes: 1 addition & 2 deletions beacon_node/beacon_chain/src/schema_change.rs
@@ -4,7 +4,6 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use operation_pool::{PersistedOperationPool, PersistedOperationPoolBase};
use std::fs;
use std::path::Path;
use std::sync::Arc;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::Error as StoreError;
@@ -13,7 +12,7 @@ const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz";

/// Migrate the database from one schema version to another, applying all requisite mutations.
pub fn migrate_schema<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
db: HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
datadir: &Path,
from: SchemaVersion,
to: SchemaVersion,
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/state_advance_timer.rs
@@ -230,7 +230,7 @@ fn advance_head<T: BeaconChainTypes>(
};

// Advance the state a single slot.
if let Some(summary) = per_slot_processing(&mut state, state_root, &beacon_chain.spec)
if let Some(summary) = per_slot_processing(&mut state, state_root, None, &beacon_chain.spec)
.map_err(BeaconChainError::from)?
{
// Expose Prometheus metrics.
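
`per_slot_processing` gains a third argument throughout this diff, passed as `None` at every call site shown; its concrete type and purpose are not visible here. The sketch below illustrates only the general call-shape of threading an optional pre-computed input through such a function; `EpochCache` and all names in it are invented for illustration:

```rust
// Hypothetical illustration of the signature change: the new `Option<_>`
// parameter lets a caller supply something it already has (a cached value,
// a pre-built summary, etc.) so the function can skip recomputing it.
struct State {
    slot: u64,
}

struct EpochCache {
    total_active_balance: u64,
}

fn per_slot_processing_sketch(
    state: &mut State,
    state_root: Option<[u8; 32]>,
    cache: Option<&EpochCache>,
) -> u64 {
    // Use the supplied value when present; fall back to recomputing otherwise.
    let balance = match cache {
        Some(c) => c.total_active_balance,
        None => expensive_recompute(state),
    };
    let _ = state_root; // a known root would be used instead of re-hashing
    state.slot += 1;
    balance
}

fn expensive_recompute(_state: &State) -> u64 {
    // Placeholder for the work a supplied cache would let us avoid.
    1_000
}

fn main() {
    let mut state = State { slot: 0 };
    // Existing call sites simply pass `None`, preserving the old behaviour.
    let balance = per_slot_processing_sketch(&mut state, None, None);
    assert_eq!(state.slot, 1);
    assert_eq!(balance, 1_000);
}
```
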
8 changes: 4 additions & 4 deletions beacon_node/beacon_chain/src/test_utils.rs
@@ -235,7 +235,7 @@ impl<E: EthSpec> BeaconChainHarness<EphemeralHarnessType<E>> {
) -> Self {
let spec = spec.unwrap_or_else(test_spec::<E>);
let log = test_logger();
let store = Arc::new(HotColdDB::open_ephemeral(store_config, spec.clone(), log).unwrap());
let store = HotColdDB::open_ephemeral(store_config, spec.clone(), log).unwrap();
Self::new_with_mutator(
eth_spec_instance,
spec,
@@ -263,7 +263,7 @@ impl<E: EthSpec> BeaconChainHarness<DiskHarnessType<E>> {
pub fn new_with_disk_store(
eth_spec_instance: E,
spec: Option<ChainSpec>,
store: Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>>,
store: HotColdDB<E, LevelDB<E>, LevelDB<E>>,
validator_keypairs: Vec<Keypair>,
) -> Self {
let spec = spec.unwrap_or_else(test_spec::<E>);
@@ -294,7 +294,7 @@ impl<E: EthSpec> BeaconChainHarness<DiskHarnessType<E>> {
pub fn resume_from_disk_store(
eth_spec_instance: E,
spec: Option<ChainSpec>,
store: Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>>,
store: HotColdDB<E, LevelDB<E>, LevelDB<E>>,
validator_keypairs: Vec<Keypair>,
) -> Self {
let spec = spec.unwrap_or_else(test_spec::<E>);
@@ -329,7 +329,7 @@ where
pub fn new_with_mutator(
eth_spec_instance: E,
spec: ChainSpec,
store: Arc<HotColdDB<E, Hot, Cold>>,
store: HotColdDB<E, Hot, Cold>,
validator_keypairs: Vec<Keypair>,
chain_config: ChainConfig,
mutator: impl FnOnce(
5 changes: 1 addition & 4 deletions beacon_node/beacon_chain/src/validator_pubkey_cache.rs
@@ -324,7 +324,6 @@ fn append_to_file(file: &mut File, index: usize, pubkey: &PublicKeyBytes) -> Res
mod test {
use super::*;
use crate::test_utils::{test_logger, BeaconChainHarness, EphemeralHarnessType};
use std::sync::Arc;
use store::{HotColdDB, StoreConfig};
use tempfile::tempdir;
use types::{
@@ -348,9 +347,7 @@ mod test {
}

fn get_store() -> BeaconStore<T> {
Arc::new(
HotColdDB::open_ephemeral(<_>::default(), E::default_spec(), test_logger()).unwrap(),
)
HotColdDB::open_ephemeral(<_>::default(), E::default_spec(), test_logger()).unwrap()
}

#[allow(clippy::needless_range_loop)]
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/tests/attestation_verification.rs
@@ -929,7 +929,7 @@ fn attestation_that_skips_epochs() {
.expect("should find state");

while state.slot() < current_slot {
per_slot_processing(&mut state, None, &harness.spec).expect("should process slot");
per_slot_processing(&mut state, None, None, &harness.spec).expect("should process slot");
}

let state_root = state.update_tree_hash_cache().unwrap();
10 changes: 5 additions & 5 deletions beacon_node/beacon_chain/tests/block_verification.rs
@@ -11,7 +11,7 @@ use slasher::{Config as SlasherConfig, Slasher};
use state_processing::{
common::get_indexed_attestation,
per_block_processing::{per_block_processing, BlockSignatureStrategy},
per_slot_processing, BlockProcessingError,
per_slot_processing, BlockProcessingError, VerificationStrategy,
};
use std::sync::Arc;
use store::config::StoreConfig;
@@ -977,13 +977,13 @@ fn add_base_block_to_altair_chain() {
// Ensure that it would be impossible to apply this block to `per_block_processing`.
{
let mut state = state;
per_slot_processing(&mut state, None, &harness.chain.spec).unwrap();
per_slot_processing(&mut state, None, None, &harness.chain.spec).unwrap();
assert!(matches!(
per_block_processing(
&mut state,
&base_block,
None,
BlockSignatureStrategy::NoVerification,
VerificationStrategy::no_signatures(),
&harness.chain.spec,
),
Err(BlockProcessingError::InconsistentBlockFork(
@@ -1097,13 +1097,13 @@ fn add_altair_block_to_base_chain() {
// Ensure that it would be impossible to apply this block to `per_block_processing`.
{
let mut state = state;
per_slot_processing(&mut state, None, &harness.chain.spec).unwrap();
per_slot_processing(&mut state, None, None, &harness.chain.spec).unwrap();
assert!(matches!(
per_block_processing(
&mut state,
&altair_block,
None,
BlockSignatureStrategy::NoVerification,
VerificationStrategy::no_signatures(),
&harness.chain.spec,
),
Err(BlockProcessingError::InconsistentBlockFork(