Skip to content

Commit df02639

Browse files
committed
De-duplicate attestations in the slasher (#2767)
## Issue Addressed Closes #2112 Closes #1861 ## Proposed Changes Collect attestations by validator index in the slasher, and use the magic of reference counting to automatically discard redundant attestations. This results in us storing only 1-2% of the attestations observed when subscribed to all subnets, which carries over to a 50-100x reduction in data stored 🎉 ## Additional Info There's some nuance to the configuration of the `slot-offset`. It has a profound effect on the effectiveness of de-duplication; see the docs added to the book for an explanation: https://github.com/michaelsproul/lighthouse/blob/5442e695e5256046b91d4b4f45b7d244b0d8ad12/book/src/slasher.md#slot-offset
1 parent fadb8b2 commit df02639

File tree

13 files changed

+252
-93
lines changed

13 files changed

+252
-93
lines changed

beacon_node/src/cli.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,18 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
451451
.requires("slasher")
452452
.takes_value(true)
453453
)
454+
.arg(
455+
Arg::with_name("slasher-slot-offset")
456+
.long("slasher-slot-offset")
457+
.help(
458+
"Set the delay from the start of the slot at which the slasher should ingest \
459+
attestations. Only effective if the slasher-update-period is a multiple of the \
460+
slot duration."
461+
)
462+
.value_name("SECONDS")
463+
.requires("slasher")
464+
.takes_value(true)
465+
)
454466
.arg(
455467
Arg::with_name("slasher-history-length")
456468
.long("slasher-history-length")

beacon_node/src/config.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,19 @@ pub fn get_config<E: EthSpec>(
448448
slasher_config.update_period = update_period;
449449
}
450450

451+
if let Some(slot_offset) =
452+
clap_utils::parse_optional::<f64>(cli_args, "slasher-slot-offset")?
453+
{
454+
if slot_offset.is_finite() {
455+
slasher_config.slot_offset = slot_offset;
456+
} else {
457+
return Err(format!(
458+
"invalid float for slasher-slot-offset: {}",
459+
slot_offset
460+
));
461+
}
462+
}
463+
451464
if let Some(history_length) =
452465
clap_utils::parse_optional(cli_args, "slasher-history-length")?
453466
{

book/src/slasher.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,31 @@ If the `time_taken` is substantially longer than the update period then it indic
102102
struggling under the load, and you should consider increasing the update period or lowering the
103103
resource requirements by tweaking the history length.
104104

105+
The update period should almost always be set to a multiple of the slot duration (12
106+
seconds), or in rare cases a divisor (e.g. 4 seconds).
107+
108+
### Slot Offset
109+
110+
* Flag: `--slasher-slot-offset SECONDS`
111+
* Argument: non-negative number of seconds (decimal allowed)
112+
* Default: 10.5 seconds
113+
114+
Set the offset from the start of the slot at which slasher processing should run. The default
115+
value of 10.5 seconds is chosen so that de-duplication can be maximally effective. The slasher
116+
will de-duplicate attestations from the same batch by storing only the attestations necessary
117+
to cover all seen validators. In other words, it will store aggregated attestations rather than
118+
unaggregated attestations if given the opportunity.
119+
120+
Aggregated attestations are published 8 seconds into the slot, so the default allows 2.5 seconds for
121+
them to arrive, and 1.5 seconds for them to be processed before a potential block proposal at the
122+
start of the next slot. If the batch processing time on your machine is significantly longer than
123+
1.5 seconds then you may want to lengthen the update period to 24 seconds, or decrease the slot
124+
offset to a value in the range 8.5-10.5s (lower values may result in more data being stored).
125+
126+
The slasher will run every `update-period` seconds after the first `slot_start + slot-offset`, which
127+
means the `slot-offset` will be ineffective if the `update-period` is not a multiple (or divisor) of
128+
the slot duration.
129+
105130
### Chunk Size and Validator Chunk Size
106131

107132
* Flags: `--slasher-chunk-size EPOCHS`, `--slasher-validator-chunk-size NUM_VALIDATORS`

lighthouse/tests/beacon_node.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,25 @@ fn slasher_update_period_flag() {
795795
});
796796
}
797797
#[test]
798+
fn slasher_slot_offset() {
799+
// TODO: check that the offset is actually stored, once the config is un-hacked
800+
// See: https://github.com/sigp/lighthouse/pull/2767#discussion_r741610402
801+
CommandLineTest::new()
802+
.flag("slasher", None)
803+
.flag("slasher-max-db-size", Some("16"))
804+
.flag("slasher-slot-offset", Some("11.25"))
805+
.run();
806+
}
807+
#[test]
808+
#[should_panic]
809+
fn slasher_slot_offset_nan() {
810+
CommandLineTest::new()
811+
.flag("slasher", None)
812+
.flag("slasher-max-db-size", Some("16"))
813+
.flag("slasher-slot-offset", Some("NaN"))
814+
.run();
815+
}
816+
#[test]
798817
fn slasher_history_length_flag() {
799818
CommandLineTest::new()
800819
.flag("slasher", None)

slasher/service/src/service.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,18 @@ impl<T: BeaconChainTypes> SlasherService<T> {
5555
// don't need to burden them with more work (we can wait).
5656
let (notif_sender, notif_receiver) = sync_channel(1);
5757
let update_period = slasher.config().update_period;
58+
let slot_offset = slasher.config().slot_offset;
5859
let beacon_chain = self.beacon_chain.clone();
5960
let network_sender = self.network_sender.clone();
6061

6162
executor.spawn(
62-
Self::run_notifier(beacon_chain.clone(), update_period, notif_sender, log),
63+
Self::run_notifier(
64+
beacon_chain.clone(),
65+
update_period,
66+
slot_offset,
67+
notif_sender,
68+
log,
69+
),
6370
"slasher_server_notifier",
6471
);
6572

@@ -75,12 +82,19 @@ impl<T: BeaconChainTypes> SlasherService<T> {
7582
async fn run_notifier(
7683
beacon_chain: Arc<BeaconChain<T>>,
7784
update_period: u64,
85+
slot_offset: f64,
7886
notif_sender: SyncSender<Epoch>,
7987
log: Logger,
8088
) {
81-
// NOTE: could align each run to some fixed point in each slot, see:
82-
// https://github.com/sigp/lighthouse/issues/1861
83-
let mut interval = interval_at(Instant::now(), Duration::from_secs(update_period));
89+
let slot_offset = Duration::from_secs_f64(slot_offset);
90+
let start_instant =
91+
if let Some(duration_to_next_slot) = beacon_chain.slot_clock.duration_to_next_slot() {
92+
Instant::now() + duration_to_next_slot + slot_offset
93+
} else {
94+
error!(log, "Error aligning slasher to slot clock");
95+
Instant::now()
96+
};
97+
let mut interval = interval_at(start_instant, Duration::from_secs(update_period));
8498

8599
loop {
86100
interval.tick().await;

slasher/src/array.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::metrics::{self, SLASHER_COMPRESSION_RATIO, SLASHER_NUM_CHUNKS_UPDATED};
2-
use crate::{AttesterRecord, AttesterSlashingStatus, Config, Error, SlasherDB};
2+
use crate::{AttesterSlashingStatus, Config, Error, IndexedAttesterRecord, SlasherDB};
33
use flate2::bufread::{ZlibDecoder, ZlibEncoder};
44
use lmdb::{RwTransaction, Transaction};
55
use serde_derive::{Deserialize, Serialize};
@@ -486,7 +486,7 @@ pub fn update<E: EthSpec>(
486486
db: &SlasherDB<E>,
487487
txn: &mut RwTransaction<'_>,
488488
validator_chunk_index: usize,
489-
batch: Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>,
489+
batch: Vec<Arc<IndexedAttesterRecord<E>>>,
490490
current_epoch: Epoch,
491491
config: &Config,
492492
) -> Result<HashSet<AttesterSlashing<E>>, Error> {
@@ -496,7 +496,7 @@ pub fn update<E: EthSpec>(
496496
let mut chunk_attestations = BTreeMap::new();
497497
for attestation in batch {
498498
chunk_attestations
499-
.entry(config.chunk_index(attestation.0.data.source.epoch))
499+
.entry(config.chunk_index(attestation.indexed.data.source.epoch))
500500
.or_insert_with(Vec::new)
501501
.push(attestation);
502502
}
@@ -573,7 +573,7 @@ pub fn update_array<E: EthSpec, T: TargetArrayChunk>(
573573
db: &SlasherDB<E>,
574574
txn: &mut RwTransaction<'_>,
575575
validator_chunk_index: usize,
576-
chunk_attestations: &BTreeMap<usize, Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>>,
576+
chunk_attestations: &BTreeMap<usize, Vec<Arc<IndexedAttesterRecord<E>>>>,
577577
current_epoch: Epoch,
578578
config: &Config,
579579
) -> Result<HashSet<AttesterSlashing<E>>, Error> {
@@ -597,19 +597,19 @@ pub fn update_array<E: EthSpec, T: TargetArrayChunk>(
597597
for attestations in chunk_attestations.values() {
598598
for attestation in attestations {
599599
for validator_index in
600-
config.attesting_validators_in_chunk(&attestation.0, validator_chunk_index)
600+
config.attesting_validators_in_chunk(&attestation.indexed, validator_chunk_index)
601601
{
602602
let slashing_status = apply_attestation_for_validator::<E, T>(
603603
db,
604604
txn,
605605
&mut updated_chunks,
606606
validator_chunk_index,
607607
validator_index,
608-
&attestation.0,
608+
&attestation.indexed,
609609
current_epoch,
610610
config,
611611
)?;
612-
if let Some(slashing) = slashing_status.into_slashing(&attestation.0) {
612+
if let Some(slashing) = slashing_status.into_slashing(&attestation.indexed) {
613613
slashings.insert(slashing);
614614
}
615615
}

slasher/src/attestation_queue.rs

Lines changed: 66 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,104 @@
1-
use crate::{AttesterRecord, Config};
1+
use crate::{AttesterRecord, Config, IndexedAttesterRecord};
22
use parking_lot::Mutex;
3-
use std::collections::BTreeSet;
4-
use std::sync::Arc;
5-
use types::{EthSpec, IndexedAttestation};
3+
use std::collections::BTreeMap;
4+
use std::sync::{Arc, Weak};
5+
use types::{EthSpec, Hash256, IndexedAttestation};
66

77
/// Staging area for attestations received from the network.
88
///
9-
/// To be added to the database in batches, for efficiency and to prevent data races.
9+
/// Attestations are not grouped by validator index at this stage so that they can be easily
10+
/// filtered for timeliness.
1011
#[derive(Debug, Default)]
1112
pub struct AttestationQueue<E: EthSpec> {
12-
/// All attestations (unique) for storage on disk.
13-
pub queue: Mutex<AttestationBatch<E>>,
13+
pub queue: Mutex<SimpleBatch<E>>,
1414
}
1515

16-
/// Attestations grouped by validator index range.
17-
#[derive(Debug)]
18-
pub struct GroupedAttestations<E: EthSpec> {
19-
pub subqueues: Vec<AttestationBatch<E>>,
20-
}
16+
pub type SimpleBatch<E> = Vec<Arc<IndexedAttesterRecord<E>>>;
2117

22-
/// A queue of attestations for a range of validator indices.
18+
/// Attestations dequeued from the queue and in preparation for processing.
19+
///
20+
/// This struct is responsible for mapping validator indices to attestations and performing
21+
/// de-duplication to remove redundant attestations.
2322
#[derive(Debug, Default)]
2423
pub struct AttestationBatch<E: EthSpec> {
25-
pub attestations: Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>,
24+
/// Map from (`validator_index`, `attestation_data_hash`) to indexed attester record.
25+
///
26+
/// This mapping is used for de-duplication, see:
27+
///
28+
/// https://github.com/sigp/lighthouse/issues/2112
29+
pub attesters: BTreeMap<(u64, Hash256), Arc<IndexedAttesterRecord<E>>>,
30+
31+
/// Vec of all unique indexed attester records.
32+
///
33+
/// The weak references account for the fact that some records might prove useless after
34+
/// de-duplication.
35+
pub attestations: Vec<Weak<IndexedAttesterRecord<E>>>,
36+
}
37+
38+
/// Attestations grouped by validator index range.
39+
#[derive(Debug)]
40+
pub struct GroupedAttestations<E: EthSpec> {
41+
pub subqueues: Vec<SimpleBatch<E>>,
2642
}
2743

2844
impl<E: EthSpec> AttestationBatch<E> {
29-
pub fn len(&self) -> usize {
30-
self.attestations.len()
31-
}
45+
/// Add an attestation to the queue.
46+
pub fn queue(&mut self, indexed_record: Arc<IndexedAttesterRecord<E>>) {
47+
self.attestations.push(Arc::downgrade(&indexed_record));
3248

33-
pub fn is_empty(&self) -> bool {
34-
self.attestations.is_empty()
49+
let attestation_data_hash = indexed_record.record.attestation_data_hash;
50+
for &validator_index in &indexed_record.indexed.attesting_indices {
51+
self.attesters
52+
.entry((validator_index, attestation_data_hash))
53+
.and_modify(|existing_entry| {
54+
// If the new record is for the same attestation data but with more bits set
55+
// then replace the existing record so that we might avoid storing the
56+
// smaller indexed attestation. Single-bit attestations will usually be removed
57+
// completely by this process, and aggregates will only be retained if they
58+
// are not redundant with respect to a larger aggregate seen in the same batch.
59+
if existing_entry.indexed.attesting_indices.len()
60+
< indexed_record.indexed.attesting_indices.len()
61+
{
62+
*existing_entry = indexed_record.clone();
63+
}
64+
})
65+
.or_insert_with(|| indexed_record.clone());
66+
}
3567
}
3668

37-
/// Group the attestations by validator index.
38-
pub fn group_by_validator_index(self, config: &Config) -> GroupedAttestations<E> {
69+
/// Group the attestations by validator chunk index.
70+
pub fn group_by_validator_chunk_index(self, config: &Config) -> GroupedAttestations<E> {
3971
let mut grouped_attestations = GroupedAttestations { subqueues: vec![] };
4072

41-
for attestation in self.attestations {
42-
let subqueue_ids = attestation
43-
.0
44-
.attesting_indices
45-
.iter()
46-
.map(|validator_index| config.validator_chunk_index(*validator_index))
47-
.collect::<BTreeSet<_>>();
48-
49-
if let Some(max_subqueue_id) = subqueue_ids.iter().next_back() {
50-
if *max_subqueue_id >= grouped_attestations.subqueues.len() {
51-
grouped_attestations
52-
.subqueues
53-
.resize_with(max_subqueue_id + 1, AttestationBatch::default);
54-
}
55-
}
73+
for ((validator_index, _), indexed_record) in self.attesters {
74+
let subqueue_id = config.validator_chunk_index(validator_index);
5675

57-
for subqueue_id in subqueue_ids {
58-
grouped_attestations.subqueues[subqueue_id]
59-
.attestations
60-
.push(attestation.clone());
76+
if subqueue_id >= grouped_attestations.subqueues.len() {
77+
grouped_attestations
78+
.subqueues
79+
.resize_with(subqueue_id + 1, SimpleBatch::default);
6180
}
81+
82+
grouped_attestations.subqueues[subqueue_id].push(indexed_record);
6283
}
6384

6485
grouped_attestations
6586
}
6687
}
6788

6889
impl<E: EthSpec> AttestationQueue<E> {
69-
/// Add an attestation to the queue.
7090
pub fn queue(&self, attestation: IndexedAttestation<E>) {
7191
let attester_record = AttesterRecord::from(attestation.clone());
72-
self.queue
73-
.lock()
74-
.attestations
75-
.push(Arc::new((attestation, attester_record)));
92+
let indexed_record = IndexedAttesterRecord::new(attestation, attester_record);
93+
self.queue.lock().push(indexed_record);
7694
}
7795

78-
pub fn dequeue(&self) -> AttestationBatch<E> {
96+
pub fn dequeue(&self) -> SimpleBatch<E> {
7997
std::mem::take(&mut self.queue.lock())
8098
}
8199

82-
pub fn requeue(&self, batch: AttestationBatch<E>) {
83-
self.queue.lock().attestations.extend(batch.attestations);
100+
pub fn requeue(&self, batch: SimpleBatch<E>) {
101+
self.queue.lock().extend(batch);
84102
}
85103

86104
pub fn len(&self) -> usize {

slasher/src/attester_record.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use ssz_derive::{Decode, Encode};
2+
use std::sync::Arc;
23
use tree_hash::TreeHash as _;
34
use tree_hash_derive::TreeHash;
45
use types::{AggregateSignature, EthSpec, Hash256, IndexedAttestation, VariableList};
@@ -11,6 +12,21 @@ pub struct AttesterRecord {
1112
pub indexed_attestation_hash: Hash256,
1213
}
1314

15+
/// Bundling of an `IndexedAttestation` with an `AttesterRecord`.
16+
///
17+
/// This struct gets `Arc`d and passed around between each stage of queueing and processing.
18+
#[derive(Debug)]
19+
pub struct IndexedAttesterRecord<E: EthSpec> {
20+
pub indexed: IndexedAttestation<E>,
21+
pub record: AttesterRecord,
22+
}
23+
24+
impl<E: EthSpec> IndexedAttesterRecord<E> {
25+
pub fn new(indexed: IndexedAttestation<E>, record: AttesterRecord) -> Arc<Self> {
26+
Arc::new(IndexedAttesterRecord { indexed, record })
27+
}
28+
}
29+
1430
#[derive(Debug, Clone, Encode, Decode, TreeHash)]
1531
struct IndexedAttestationHeader<T: EthSpec> {
1632
pub attesting_indices: VariableList<u64, T::MaxValidatorsPerCommittee>,

0 commit comments

Comments
 (0)