Skip to content

perf: Request cancellation while processing changed files #19757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

163 changes: 87 additions & 76 deletions crates/rust-analyzer/src/global_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//!
//! Each tick provides an immutable snapshot of the state as `WorldSnapshot`.

use std::{ops::Not as _, time::Instant};
use std::{ops::Not as _, panic::AssertUnwindSafe, time::Instant};

use crossbeam_channel::{Receiver, Sender, unbounded};
use hir::ChangeWithProcMacros;
Expand All @@ -19,6 +19,7 @@ use parking_lot::{
use proc_macro_api::ProcMacroClient;
use project_model::{ManifestPath, ProjectWorkspace, ProjectWorkspaceKind, WorkspaceBuildScripts};
use rustc_hash::{FxHashMap, FxHashSet};
use stdx::thread;
use tracing::{Level, span, trace};
use triomphe::Arc;
use vfs::{AbsPathBuf, AnchoredPathBuf, ChangeKind, Vfs, VfsPath};
Expand Down Expand Up @@ -78,6 +79,7 @@ pub(crate) struct GlobalState {

pub(crate) task_pool: Handle<TaskPool<Task>, Receiver<Task>>,
pub(crate) fmt_pool: Handle<TaskPool<Task>, Receiver<Task>>,
pub(crate) cancellation_pool: thread::Pool,

pub(crate) config: Arc<Config>,
pub(crate) config_errors: Option<ConfigErrors>,
Expand Down Expand Up @@ -210,6 +212,7 @@ impl GlobalState {
let handle = TaskPool::new_with_threads(sender, 1);
Handle { handle, receiver }
};
let cancellation_pool = thread::Pool::new(1);

let task_queue = {
let (sender, receiver) = unbounded();
Expand All @@ -230,6 +233,7 @@ impl GlobalState {
req_queue: ReqQueue::default(),
task_pool,
fmt_pool,
cancellation_pool,
loader,
config: Arc::new(config.clone()),
analysis_host,
Expand Down Expand Up @@ -290,74 +294,82 @@ impl GlobalState {

pub(crate) fn process_changes(&mut self) -> bool {
let _p = span!(Level::INFO, "GlobalState::process_changes").entered();

// We cannot directly resolve a change in a ratoml file to a format
// that can be used by the config module because config talks
// in `SourceRootId`s instead of `FileId`s and `FileId` -> `SourceRootId`
// mapping is not ready until `AnalysisHost::apply_changes` has been called.
let mut modified_ratoml_files: FxHashMap<FileId, (ChangeKind, vfs::VfsPath)> =
FxHashMap::default();

let (change, modified_rust_files, workspace_structure_change) = {
let mut change = ChangeWithProcMacros::default();
let mut guard = self.vfs.write();
let changed_files = guard.0.take_changes();
if changed_files.is_empty() {
return false;
}
let mut change = ChangeWithProcMacros::default();
let mut guard = self.vfs.write();
let changed_files = guard.0.take_changes();
if changed_files.is_empty() {
return false;
}

// downgrade to read lock to allow more readers while we are normalizing text
let guard = RwLockWriteGuard::downgrade_to_upgradable(guard);
let vfs: &Vfs = &guard.0;

let mut workspace_structure_change = None;
// A file was added or deleted
let mut has_structure_changes = false;
let mut bytes = vec![];
let mut modified_rust_files = vec![];
for file in changed_files.into_values() {
let vfs_path = vfs.file_path(file.file_id);
if let Some(("rust-analyzer", Some("toml"))) = vfs_path.name_and_extension() {
// Remember ids to use them after `apply_changes`
modified_ratoml_files.insert(file.file_id, (file.kind(), vfs_path.clone()));
}
let (change, modified_rust_files, workspace_structure_change) =
self.cancellation_pool.scoped(|s| {
// start cancellation in parallel, this will kick off lru eviction
// allowing us to do meaningful work while waiting
let analysis_host = AssertUnwindSafe(&mut self.analysis_host);
s.spawn(thread::ThreadIntent::LatencySensitive, || {
{ analysis_host }.0.request_cancellation()
});

// downgrade to read lock to allow more readers while we are normalizing text
let guard = RwLockWriteGuard::downgrade_to_upgradable(guard);
let vfs: &Vfs = &guard.0;

let mut workspace_structure_change = None;
// A file was added or deleted
let mut has_structure_changes = false;
let mut bytes = vec![];
let mut modified_rust_files = vec![];
for file in changed_files.into_values() {
let vfs_path = vfs.file_path(file.file_id);
if let Some(("rust-analyzer", Some("toml"))) = vfs_path.name_and_extension() {
// Remember ids to use them after `apply_changes`
modified_ratoml_files.insert(file.file_id, (file.kind(), vfs_path.clone()));
}

if let Some(path) = vfs_path.as_path() {
has_structure_changes |= file.is_created_or_deleted();
if let Some(path) = vfs_path.as_path() {
has_structure_changes |= file.is_created_or_deleted();

if file.is_modified() && path.extension() == Some("rs") {
modified_rust_files.push(file.file_id);
}
if file.is_modified() && path.extension() == Some("rs") {
modified_rust_files.push(file.file_id);
}

let additional_files = self
.config
.discover_workspace_config()
.map(|cfg| {
cfg.files_to_watch.iter().map(String::as_str).collect::<Vec<&str>>()
})
.unwrap_or_default();

let path = path.to_path_buf();
if file.is_created_or_deleted() {
workspace_structure_change.get_or_insert((path, false)).1 |=
self.crate_graph_file_dependencies.contains(vfs_path);
} else if reload::should_refresh_for_change(
&path,
file.kind(),
&additional_files,
) {
trace!(?path, kind = ?file.kind(), "refreshing for a change");
workspace_structure_change.get_or_insert((path.clone(), false));
let additional_files = self
.config
.discover_workspace_config()
.map(|cfg| {
cfg.files_to_watch.iter().map(String::as_str).collect::<Vec<&str>>()
})
.unwrap_or_default();

let path = path.to_path_buf();
if file.is_created_or_deleted() {
workspace_structure_change.get_or_insert((path, false)).1 |=
self.crate_graph_file_dependencies.contains(vfs_path);
} else if reload::should_refresh_for_change(
&path,
file.kind(),
&additional_files,
) {
trace!(?path, kind = ?file.kind(), "refreshing for a change");
workspace_structure_change.get_or_insert((path.clone(), false));
}
}
}

// Clear native diagnostics when their file gets deleted
if !file.exists() {
self.diagnostics.clear_native_for(file.file_id);
}
// Clear native diagnostics when their file gets deleted
if !file.exists() {
self.diagnostics.clear_native_for(file.file_id);
}

let text =
if let vfs::Change::Create(v, _) | vfs::Change::Modify(v, _) = file.change {
let text = if let vfs::Change::Create(v, _) | vfs::Change::Modify(v, _) =
file.change
{
String::from_utf8(v).ok().map(|text| {
// FIXME: Consider doing normalization in the `vfs` instead? That allows
// getting rid of some locking
Expand All @@ -367,29 +379,28 @@ impl GlobalState {
} else {
None
};
// delay `line_endings_map` changes until we are done normalizing the text
// this allows delaying the re-acquisition of the write lock
bytes.push((file.file_id, text));
}
let (vfs, line_endings_map) = &mut *RwLockUpgradableReadGuard::upgrade(guard);
bytes.into_iter().for_each(|(file_id, text)| {
let text = match text {
None => None,
Some((text, line_endings)) => {
line_endings_map.insert(file_id, line_endings);
Some(text)
}
};
change.change_file(file_id, text);
// delay `line_endings_map` changes until we are done normalizing the text
// this allows delaying the re-acquisition of the write lock
bytes.push((file.file_id, text));
}
let (vfs, line_endings_map) = &mut *RwLockUpgradableReadGuard::upgrade(guard);
bytes.into_iter().for_each(|(file_id, text)| {
let text = match text {
None => None,
Some((text, line_endings)) => {
line_endings_map.insert(file_id, line_endings);
Some(text)
}
};
change.change_file(file_id, text);
});
if has_structure_changes {
let roots = self.source_root_config.partition(vfs);
change.set_roots(roots);
}
(change, modified_rust_files, workspace_structure_change)
});
if has_structure_changes {
let roots = self.source_root_config.partition(vfs);
change.set_roots(roots);
}
(change, modified_rust_files, workspace_structure_change)
};

let _p = span!(Level::INFO, "GlobalState::process_changes/apply_change").entered();
self.analysis_host.apply_change(change);
if !modified_ratoml_files.is_empty()
|| !self.config.same_source_root_parent_map(&self.local_roots_parent_map)
Expand Down
1 change: 1 addition & 0 deletions crates/stdx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jod-thread = "1.0.0"
crossbeam-channel.workspace = true
itertools.workspace = true
tracing.workspace = true
crossbeam-utils = "0.8.21"
# Think twice before adding anything here

[target.'cfg(unix)'.dependencies]
Expand Down
51 changes: 47 additions & 4 deletions crates/stdx/src/thread/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! the threading utilities in [`crate::thread`].

use std::{
marker::PhantomData,
panic::{self, UnwindSafe},
sync::{
Arc,
Expand All @@ -16,8 +17,9 @@ use std::{
};

use crossbeam_channel::{Receiver, Sender};
use crossbeam_utils::sync::WaitGroup;

use super::{Builder, JoinHandle, ThreadIntent};
use crate::thread::{Builder, JoinHandle, ThreadIntent};

pub struct Pool {
// `_handles` is never read: the field is present
Expand Down Expand Up @@ -79,9 +81,6 @@ impl Pool {
Self { _handles: handles.into_boxed_slice(), extant_tasks, job_sender }
}

/// # Panics
///
/// Panics if job panics
pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
where
F: FnOnce() + Send + UnwindSafe + 'static,
Expand All @@ -97,6 +96,17 @@ impl Pool {
self.job_sender.send(job).unwrap();
}

pub fn scoped<'pool, 'scope, F, R>(&'pool self, f: F) -> R
where
F: FnOnce(&Scope<'pool, 'scope>) -> R,
{
let wg = WaitGroup::new();
let scope = Scope { pool: self, wg, _marker: PhantomData };
let r = f(&scope);
scope.wg.wait();
r
}

#[must_use]
pub fn len(&self) -> usize {
self.extant_tasks.load(Ordering::SeqCst)
Expand All @@ -107,3 +117,36 @@ impl Pool {
self.len() == 0
}
}

pub struct Scope<'pool, 'scope> {
pool: &'pool Pool,
wg: WaitGroup,
_marker: PhantomData<fn(&'scope ()) -> &'scope ()>,
}

impl<'scope> Scope<'_, 'scope> {
pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
where
F: 'scope + FnOnce() + Send + UnwindSafe,
{
let wg = self.wg.clone();
let f = Box::new(move || {
if cfg!(debug_assertions) {
intent.assert_is_used_on_current_thread();
}
f();
drop(wg);
});

let job = Job {
requested_intent: intent,
f: unsafe {
std::mem::transmute::<
Box<dyn 'scope + FnOnce() + Send + UnwindSafe>,
Box<dyn 'static + FnOnce() + Send + UnwindSafe>,
>(f)
},
};
self.pool.job_sender.send(job).unwrap();
}
}