From ae516019ca9533051cb8dd20aed9a0d9bb2910c1 Mon Sep 17 00:00:00 2001
From: Mark Rousskov
Date: Sun, 26 Jan 2025 16:26:25 -0500
Subject: [PATCH] Introduce OnceVec primitive and use it for AllocId caches

This significantly reduces contention when running miri under -Zthreads,
allowing us to scale to 30ish cores (from ~7-8 without this patch).
---
 compiler/rustc_data_structures/src/sync.rs    |   3 +
 .../src/sync/once_vec.rs                      | 229 ++++++++++++++++++
 .../src/sync/once_vec/test.rs                 |  83 +++++++
 .../rustc_middle/src/mir/interpret/mod.rs     |  56 +++--
 compiler/rustc_middle/src/ty/context.rs       |   4 +-
 5 files changed, 351 insertions(+), 24 deletions(-)
 create mode 100644 compiler/rustc_data_structures/src/sync/once_vec.rs
 create mode 100644 compiler/rustc_data_structures/src/sync/once_vec/test.rs

diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs
index 7a9533031f4bd..0465a801e8fc5 100644
--- a/compiler/rustc_data_structures/src/sync.rs
+++ b/compiler/rustc_data_structures/src/sync.rs
@@ -59,6 +59,9 @@ pub use vec::{AppendOnlyIndexVec, AppendOnlyVec};
 
 mod vec;
 
+mod once_vec;
+pub use once_vec::OnceVec;
+
 mod freeze;
 pub use freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard};
 
diff --git a/compiler/rustc_data_structures/src/sync/once_vec.rs b/compiler/rustc_data_structures/src/sync/once_vec.rs
new file mode 100644
index 0000000000000..068ec23b52c9a
--- /dev/null
+++ b/compiler/rustc_data_structures/src/sync/once_vec.rs
@@ -0,0 +1,229 @@
+use std::alloc::Layout;
+use std::marker::PhantomData;
+use std::mem::MaybeUninit;
+use std::ptr::NonNull;
+use std::sync::Mutex;
+use std::sync::atomic::{AtomicPtr, AtomicU8, Ordering};
+
+/// Provides a singly-settable Vec.
+///
+/// This provides amortized, concurrent O(1) access to &T, expecting a densely numbered key space
+/// (all value slots are allocated up to the highest key inserted).
+pub struct OnceVec<T> {
+    // Provide storage for up to 2^35 elements, which we expect to be enough in practice -- but can
+    // be increased if needed. We may want to make the `slabs` list dynamic itself, likely by
+    // indirecting through one more pointer to reduce space consumption of OnceVec if this grows
+    // much larger.
+    //
+    // None of the code makes assumptions based on this size so bumping it up is easy.
+    slabs: [Slab<T>; 36],
+}
+
+impl<T> Default for OnceVec<T> {
+    fn default() -> Self {
+        OnceVec { slabs: [const { Slab::new() }; 36] }
+    }
+}
+
+unsafe impl<#[may_dangle] T> Drop for OnceVec<T> {
+    fn drop(&mut self) {
+        for (idx, slab) in self.slabs.iter_mut().enumerate() {
+            unsafe { slab.deallocate(1 << idx) }
+        }
+    }
+}
+
+impl<T> OnceVec<T> {
+    #[inline]
+    fn to_slab_args(idx: usize) -> (usize, usize, usize) {
+        let slab_idx = (idx + 1).ilog2() as usize;
+        let cap = 1 << slab_idx;
+        let idx_in_slab = idx - (cap - 1);
+        (slab_idx, cap, idx_in_slab)
+    }
+
+    pub fn insert(&self, idx: usize, value: T) -> Result<(), T> {
+        let (slab_idx, cap, idx_in_slab) = Self::to_slab_args(idx);
+        self.slabs[slab_idx].insert(cap, idx_in_slab, value)
+    }
+
+    pub fn get(&self, idx: usize) -> Option<&T> {
+        let (slab_idx, cap, idx_in_slab) = Self::to_slab_args(idx);
+        self.slabs[slab_idx].get(cap, idx_in_slab)
+    }
+}
+
+struct Slab<T> {
+    // If non-null, points to a contiguously allocated block which starts with a bitset
+    // (two bits per value, one for whether a value is present and the other for whether a value is
+    // currently being written) and then `[T]` (some of which may be missing).
+    //
+    // The capacity is implicit and passed with all accessors.
+    v: AtomicPtr<u8>,
+    _phantom: PhantomData<[T; 1]>,
+}
+
+impl<T> Slab<T> {
+    const fn new() -> Slab<T> {
+        Slab { v: AtomicPtr::new(std::ptr::null_mut()), _phantom: PhantomData }
+    }
+
+    fn initialize(&self, cap: usize) -> NonNull<u8> {
+        static LOCK: Mutex<()> = Mutex::new(());
+
+        if let Some(ptr) = NonNull::new(self.v.load(Ordering::Acquire)) {
+            return ptr;
+        }
+
+        // If we are initializing the bucket, then acquire a global lock.
+        //
+        // This path is quite cold, so it's cheap to use a global lock. This ensures that we never
+        // have multiple allocations for the same bucket.
+        let _allocator_guard = LOCK.lock().unwrap_or_else(|e| e.into_inner());
+
+        // Check the pointer again, since we might have been initialized while waiting on the lock.
+        if let Some(ptr) = NonNull::new(self.v.load(Ordering::Acquire)) {
+            return ptr;
+        }
+
+        let layout = Self::layout(cap).0;
+        assert!(layout.size() > 0);
+
+        // SAFETY: Checked above that layout is non-zero sized.
+        let Some(allocation) = NonNull::new(unsafe { std::alloc::alloc_zeroed(layout) }) else {
+            std::alloc::handle_alloc_error(layout);
+        };
+
+        self.v.store(allocation.as_ptr(), Ordering::Release);
+
+        allocation
+    }
+
+    fn bitset(ptr: NonNull<u8>, cap: usize) -> NonNull<[AtomicU8]> {
+        NonNull::slice_from_raw_parts(ptr.cast(), cap.div_ceil(4))
+    }
+
+    // SAFETY: Must be called on an `initialize`d `ptr` for this capacity.
+    unsafe fn slice(ptr: NonNull<u8>, cap: usize) -> NonNull<[MaybeUninit<T>]> {
+        let offset = Self::layout(cap).1;
+        // SAFETY: Passed up to caller.
+        NonNull::slice_from_raw_parts(unsafe { ptr.add(offset).cast() }, cap)
+    }
+
+    // idx is already compacted to within this slab
+    fn get(&self, cap: usize, idx: usize) -> Option<&T> {
+        // avoid initializing for get queries
+        let Some(ptr) = NonNull::new(self.v.load(Ordering::Acquire)) else {
+            return None;
+        };
+
+        let bitset = unsafe { Self::bitset(ptr, cap).as_ref() };
+
+        // Check if the entry is initialized.
+        //
+        // Bottom 4 bits are the "is initialized" bits, top 4 bits are used for the "is
+        // initializing" lock.
+        let word = bitset[idx / 4].load(Ordering::Acquire);
+        if word & (1 << (idx % 4)) == 0 {
+            return None;
+        }
+
+        // Avoid as_ref() since we don't want to assert shared refs to all slots (some are being
+        // concurrently updated).
+        //
+        // SAFETY: `ptr` is only written by `initialize`, so this is safe.
+        let slice = unsafe { Self::slice(ptr, cap) };
+        assert!(idx < slice.len());
+        // SAFETY: assertion above checks that we're in-bounds.
+        let slot = unsafe { slice.cast::<T>().add(idx) };
+
+        // SAFETY: We checked `bitset` and this value was initialized. Our Acquire load
+        // establishes the memory ordering with the release store which set the bit, so we're safe
+        // to read it.
+        Some(unsafe { slot.as_ref() })
+    }
+
+    // idx is already compacted to within this slab
+    fn insert(&self, cap: usize, idx: usize, value: T) -> Result<(), T> {
+        let ptr = self.initialize(cap);
+        let bitset = unsafe { Self::bitset(ptr, cap).as_ref() };
+
+        // Check if the entry is initialized, and lock it for writing.
+        let word = bitset[idx / 4].fetch_or(1 << (4 + idx % 4), Ordering::AcqRel);
+        if word & (1 << (idx % 4)) != 0 {
+            // Already fully initialized prior to us setting the "is writing" bit.
+            return Err(value);
+        }
+        if word & (1 << (4 + idx % 4)) != 0 {
+            // Someone else already acquired the lock for writing.
+            return Err(value);
+        }
+
+        let slice = unsafe { Self::slice(ptr, cap) };
+        assert!(idx < slice.len());
+        // SAFETY: assertion above checks that we're in-bounds.
+        let slot = unsafe { slice.cast::<T>().add(idx) };
+
+        // SAFETY: We locked this slot for writing with the fetch_or above, and were the first to
+        // do so (checked in 2nd `if` above).
+        unsafe {
+            slot.write(value);
+        }
+
+        // Set the is-present bit, indicating that we have finished writing this value.
+        // Acquire ensures we don't break synchronizes-with relationships in other bits (unclear if
+        // strictly necessary but definitely doesn't hurt).
+        bitset[idx / 4].fetch_or(1 << (idx % 4), Ordering::AcqRel);
+
+        Ok(())
+    }
+
+    /// Returns the layout for a Slab with capacity for `cap` elements, and the offset into the
+    /// allocation at which the T slice starts.
+    fn layout(cap: usize) -> (Layout, usize) {
+        Layout::array::<AtomicU8>(cap.div_ceil(4))
+            .unwrap()
+            .extend(Layout::array::<T>(cap).unwrap())
+            .unwrap()
+    }
+
+    // Drop, except passing the capacity
+    unsafe fn deallocate(&mut self, cap: usize) {
+        // avoid initializing just to Drop
+        let Some(ptr) = NonNull::new(self.v.load(Ordering::Acquire)) else {
+            return;
+        };
+
+        if std::mem::needs_drop::<T>() {
+            // SAFETY: `ptr` is only written by `initialize`, and zero-init'd so AtomicU8 is
+            // present in the bitset range.
+            let bitset = unsafe { Self::bitset(ptr, cap).as_ref() };
+            // SAFETY: `ptr` is only written by `initialize`, so satisfies the slice precondition.
+            let slice = unsafe { Self::slice(ptr, cap).cast::<T>() };
+
+            for (word_idx, word) in bitset.iter().enumerate() {
+                let word = word.load(Ordering::Acquire);
+                for word_offset in 0..4 {
+                    if word & (1 << word_offset) != 0 {
+                        // Was initialized, need to drop the value.
+                        let idx = word_idx * 4 + word_offset;
+                        unsafe {
+                            std::ptr::drop_in_place(slice.add(idx).as_ptr());
+                        }
+                    }
+                }
+            }
+        }
+
+        let layout = Self::layout(cap).0;
+
+        // SAFETY: Allocated with `alloc` and the same layout.
+        unsafe {
+            std::alloc::dealloc(ptr.as_ptr(), layout);
+        }
+    }
+}
+
+#[cfg(test)]
+mod test;
diff --git a/compiler/rustc_data_structures/src/sync/once_vec/test.rs b/compiler/rustc_data_structures/src/sync/once_vec/test.rs
new file mode 100644
index 0000000000000..d35a091c24315
--- /dev/null
+++ b/compiler/rustc_data_structures/src/sync/once_vec/test.rs
@@ -0,0 +1,83 @@
+use super::*;
+
+#[test]
+#[cfg(not(miri))]
+fn empty() {
+    let cache: OnceVec<u32> = OnceVec::default();
+    for key in 0..u32::MAX {
+        assert!(cache.get(key as usize).is_none());
+    }
+}
+
+#[test]
+fn insert_and_check() {
+    let cache: OnceVec<usize> = OnceVec::default();
+    for idx in 0..100 {
+        cache.insert(idx, idx).unwrap();
+    }
+    for idx in 0..100 {
+        assert_eq!(cache.get(idx), Some(&idx));
+    }
+}
+
+#[test]
+fn sparse_inserts() {
+    let cache: OnceVec<u32> = OnceVec::default();
+    let end = if cfg!(target_pointer_width = "64") && cfg!(target_os = "linux") {
+        // On 64-bit systems with paged virtual memory we should be able to sparsely allocate all
+        // of the pages needed for these inserts cheaply (without needing to actually have
+        // gigabytes of resident memory).
+        31
+    } else {
+        // Otherwise, still run the test but scaled back:
+        //
+        // Each slot is <5 bytes, so 2^25 entries (on non-virtual memory systems, like e.g. Windows)
+        // will mean 160 megabytes of allocated memory. Going beyond that is probably not
+        // reasonable for tests.
+        25
+    };
+    for shift in 0..end {
+        let key = 1u32 << shift;
+        cache.insert(key as usize, shift).unwrap();
+        assert_eq!(cache.get(key as usize), Some(&shift));
+    }
+}
+
+#[test]
+fn concurrent_stress_check() {
+    let cache: OnceVec<usize> = OnceVec::default();
+    std::thread::scope(|s| {
+        for idx in 0..100 {
+            let cache = &cache;
+            s.spawn(move || {
+                cache.insert(idx, idx).unwrap();
+            });
+        }
+    });
+
+    for idx in 0..100 {
+        assert_eq!(cache.get(idx), Some(&idx));
+    }
+}
+
+#[test]
+#[cfg(not(miri))]
+fn slot_index_exhaustive() {
+    let mut prev = None::<(usize, usize, usize)>;
+    for idx in 0..=u32::MAX as usize {
+        let slot_idx = OnceVec::<()>::to_slab_args(idx);
+        if let Some(p) = prev {
+            if p.0 == slot_idx.0 {
+                assert_eq!(p.2 + 1, slot_idx.2);
+            } else {
+                assert_eq!(slot_idx.2, 0);
+            }
+        } else {
+            assert_eq!(idx, 0);
+            assert_eq!(slot_idx.2, 0);
+            assert_eq!(slot_idx.0, 0);
+        }
+
+        prev = Some(slot_idx);
+    }
+}
diff --git a/compiler/rustc_middle/src/mir/interpret/mod.rs b/compiler/rustc_middle/src/mir/interpret/mod.rs
index b88137544bca3..280ec76b78b6f 100644
--- a/compiler/rustc_middle/src/mir/interpret/mod.rs
+++ b/compiler/rustc_middle/src/mir/interpret/mod.rs
@@ -10,6 +10,7 @@ mod value;
 
 use std::io::{Read, Write};
 use std::num::NonZero;
+use std::sync::atomic::AtomicU64;
 use std::{fmt, io};
 
 use rustc_abi::{AddressSpace, Align, Endian, HasDataLayout, Size};
@@ -389,17 +390,17 @@ pub const CTFE_ALLOC_SALT: usize = 0;
 
 pub(crate) struct AllocMap<'tcx> {
     /// Maps `AllocId`s to their corresponding allocations.
-    alloc_map: FxHashMap<AllocId, GlobalAlloc<'tcx>>,
+    alloc_map: rustc_data_structures::sync::OnceVec<GlobalAlloc<'tcx>>,
 
     /// Used to deduplicate global allocations: functions, vtables, string literals, ...
    ///
     /// The `usize` is a "salt" used by Miri to make deduplication imperfect, thus better emulating
     /// the actual guarantees.
-    dedup: FxHashMap<(GlobalAlloc<'tcx>, usize), AllocId>,
+    dedup: Lock<FxHashMap<(GlobalAlloc<'tcx>, usize), AllocId>>,
 
     /// The `AllocId` to assign to the next requested ID.
     /// Always incremented; never gets smaller.
-    next_id: AllocId,
+    next_id: AtomicU64,
 }
 
 impl<'tcx> AllocMap<'tcx> {
@@ -407,17 +408,14 @@ impl<'tcx> AllocMap<'tcx> {
         AllocMap {
            alloc_map: Default::default(),
             dedup: Default::default(),
-            next_id: AllocId(NonZero::new(1).unwrap()),
+            next_id: AtomicU64::new(1),
         }
     }
 
-    fn reserve(&mut self) -> AllocId {
-        let next = self.next_id;
-        self.next_id.0 = self.next_id.0.checked_add(1).expect(
-            "You overflowed a u64 by incrementing by 1... \
-             You've just earned yourself a free drink if we ever meet. \
-             Seriously, how did you do that?!",
-        );
-        next
+    fn reserve(&self) -> AllocId {
+        let next_id = self.next_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        AllocId(
+            NonZero::new(next_id).expect("Overflow of u64 should be impossible with +1 increments"),
+        )
     }
 }
 
@@ -428,26 +426,32 @@ impl<'tcx> TyCtxt<'tcx> {
     /// Make sure to call `set_alloc_id_memory` or `set_alloc_id_same_memory` before returning such
     /// an `AllocId` from a query.
     pub fn reserve_alloc_id(self) -> AllocId {
-        self.alloc_map.lock().reserve()
+        self.alloc_map.reserve()
     }
 
     /// Reserves a new ID *if* this allocation has not been dedup-reserved before.
     /// Should not be used for mutable memory.
     fn reserve_and_set_dedup(self, alloc: GlobalAlloc<'tcx>, salt: usize) -> AllocId {
-        let mut alloc_map = self.alloc_map.lock();
         if let GlobalAlloc::Memory(mem) = alloc {
             if mem.inner().mutability.is_mut() {
                 bug!("trying to dedup-reserve mutable memory");
             }
         }
         let alloc_salt = (alloc, salt);
-        if let Some(&alloc_id) = alloc_map.dedup.get(&alloc_salt) {
+        let mut dedup = self.alloc_map.dedup.lock();
+        if let Some(&alloc_id) = dedup.get(&alloc_salt) {
             return alloc_id;
         }
-        let id = alloc_map.reserve();
+        let id = self.alloc_map.reserve();
         debug!("creating alloc {:?} with id {id:?}", alloc_salt.0);
-        alloc_map.alloc_map.insert(id, alloc_salt.0.clone());
-        alloc_map.dedup.insert(alloc_salt, id);
+        // We just reserved, so should always be unique.
+        assert!(
+            self.alloc_map
+                .alloc_map
+                .insert(id.0.get().try_into().unwrap(), alloc_salt.0.clone())
+                .is_ok()
+        );
+        dedup.insert(alloc_salt, id);
         id
     }
 
@@ -497,7 +501,7 @@ impl<'tcx> TyCtxt<'tcx> {
     /// local dangling pointers and allocations in constants/statics.
     #[inline]
     pub fn try_get_global_alloc(self, id: AllocId) -> Option<GlobalAlloc<'tcx>> {
-        self.alloc_map.lock().alloc_map.get(&id).cloned()
+        self.alloc_map.alloc_map.get(id.0.get().try_into().unwrap()).cloned()
     }
 
     #[inline]
@@ -516,7 +520,12 @@ impl<'tcx> TyCtxt<'tcx> {
     /// Freezes an `AllocId` created with `reserve` by pointing it at an `Allocation`. Trying to
     /// call this function twice, even with the same `Allocation` will ICE the compiler.
     pub fn set_alloc_id_memory(self, id: AllocId, mem: ConstAllocation<'tcx>) {
-        if let Some(old) = self.alloc_map.lock().alloc_map.insert(id, GlobalAlloc::Memory(mem)) {
+        if let Err(_) = self
+            .alloc_map
+            .alloc_map
+            .insert(id.0.get().try_into().unwrap(), GlobalAlloc::Memory(mem))
+        {
+            let old = self.try_get_global_alloc(id).unwrap();
             bug!("tried to set allocation ID {id:?}, but it was already existing as {old:#?}");
         }
     }
@@ -524,9 +533,12 @@ impl<'tcx> TyCtxt<'tcx> {
     /// Freezes an `AllocId` created with `reserve` by pointing it at a static item. Trying to
     /// call this function twice, even with the same `DefId` will ICE the compiler.
     pub fn set_nested_alloc_id_static(self, id: AllocId, def_id: LocalDefId) {
-        if let Some(old) =
-            self.alloc_map.lock().alloc_map.insert(id, GlobalAlloc::Static(def_id.to_def_id()))
+        if let Err(_) = self
+            .alloc_map
+            .alloc_map
+            .insert(id.0.get().try_into().unwrap(), GlobalAlloc::Static(def_id.to_def_id()))
         {
+            let old = self.try_get_global_alloc(id).unwrap();
             bug!("tried to set allocation ID {id:?}, but it was already existing as {old:#?}");
         }
     }
diff --git a/compiler/rustc_middle/src/ty/context.rs b/compiler/rustc_middle/src/ty/context.rs
index aeb734ba3f650..b4b1ff8310e25 100644
--- a/compiler/rustc_middle/src/ty/context.rs
+++ b/compiler/rustc_middle/src/ty/context.rs
@@ -1348,7 +1348,7 @@ pub struct GlobalCtxt<'tcx> {
     pub data_layout: TargetDataLayout,
 
     /// Stores memory for globals (statics/consts).
-    pub(crate) alloc_map: Lock<interpret::AllocMap<'tcx>>,
+    pub(crate) alloc_map: interpret::AllocMap<'tcx>,
 }
 
 /// This is used to get a reference to a `GlobalCtxt` if one is available.
@@ -1538,7 +1538,7 @@ impl<'tcx> TyCtxt<'tcx> {
             new_solver_evaluation_cache: Default::default(),
             canonical_param_env_cache: Default::default(),
             data_layout,
-            alloc_map: Lock::new(interpret::AllocMap::new()),
+            alloc_map: interpret::AllocMap::new(),
         });
 
         let icx = tls::ImplicitCtxt::new(&gcx);
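
Note for reviewers (not part of the patch): the slab addressing is easier to see in isolation. The sketch below mirrors the index math of `OnceVec::to_slab_args` (the function body is copied from the patch; the `main` harness is illustrative only). Slab `s` has capacity `2^s` and holds the dense keys `2^s - 1 ..= 2^(s+1) - 2`, so capacity doubles per slab and an existing value never moves once written.

// Standalone sketch mirroring `OnceVec::to_slab_args`; not part of the diff.
fn to_slab_args(idx: usize) -> (usize, usize, usize) {
    let slab_idx = (idx + 1).ilog2() as usize; // which slab the key lands in
    let cap = 1 << slab_idx;                   // that slab's capacity
    let idx_in_slab = idx - (cap - 1);         // offset of the key within the slab
    (slab_idx, cap, idx_in_slab)
}

fn main() {
    // Key 0 is the sole entry of slab 0; keys 1..=2 live in slab 1; keys 3..=6 in slab 2; ...
    assert_eq!(to_slab_args(0), (0, 1, 0));
    assert_eq!(to_slab_args(1), (1, 2, 0));
    assert_eq!(to_slab_args(2), (1, 2, 1));
    assert_eq!(to_slab_args(3), (2, 4, 0));
    assert_eq!(to_slab_args(6), (2, 4, 3));
    assert_eq!(to_slab_args(7), (3, 8, 0));
}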
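
A second standalone sketch (again outside the patch, assuming only `std`; the variable names are mine): each `AtomicU8` in the `Slab` bitset covers four slots, with the low nibble holding the "is present" bits that `get` checks and the high nibble holding the "is writing" bits that `insert` races on via `fetch_or`. The first writer to flip the high bit owns the slot; publishing then sets the low bit so readers may take a shared reference.

use std::sync::atomic::{AtomicU8, Ordering};

fn main() {
    // One bitset byte covers slots 0..4; exercise slot 2 of this byte.
    let word = AtomicU8::new(0);
    let idx = 2usize;

    // Writer side: race for the slot by setting its "is writing" bit (high nibble).
    let prev = word.fetch_or(1 << (4 + idx % 4), Ordering::AcqRel);
    assert_eq!(prev & (1 << (idx % 4)), 0); // slot was not already published
    assert_eq!(prev & (1 << (4 + idx % 4)), 0); // we are the first (and only) writer

    // ... the value itself would be written into the slot here ...

    // Publish: set the "is present" bit (low nibble); readers may now see the value.
    word.fetch_or(1 << (idx % 4), Ordering::AcqRel);

    // Reader side: a slot is readable only once its "is present" bit is set.
    assert_ne!(word.load(Ordering::Acquire) & (1 << (idx % 4)), 0);
}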