diff --git a/Cargo.lock b/Cargo.lock index 07de5d2216855..7340dcba75df1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4257,6 +4257,7 @@ dependencies = [ "bitflags", "either", "gsgdt", + "parking_lot", "polonius-engine", "rustc_abi", "rustc_apfloat", diff --git a/compiler/rustc_codegen_ssa/src/base.rs b/compiler/rustc_codegen_ssa/src/base.rs index 8ab0b367f08a6..c1baa326ab9d4 100644 --- a/compiler/rustc_codegen_ssa/src/base.rs +++ b/compiler/rustc_codegen_ssa/src/base.rs @@ -10,7 +10,6 @@ use rustc_ast::expand::allocator::{ }; use rustc_data_structures::fx::{FxHashMap, FxIndexSet}; use rustc_data_structures::profiling::{get_resident_set_size, print_time_passes_entry}; -use rustc_data_structures::sync::{IntoDynSyncSend, par_map}; use rustc_data_structures::unord::UnordMap; use rustc_hir::attrs::{AttributeKind, DebuggerVisualizerType, OptimizeAttr}; use rustc_hir::def_id::{CRATE_DEF_ID, DefId, LOCAL_CRATE}; @@ -25,6 +24,7 @@ use rustc_middle::mir::BinOp; use rustc_middle::mir::interpret::ErrorHandled; use rustc_middle::mir::mono::{CodegenUnit, CodegenUnitNameBuilder, MonoItem, MonoItemPartitions}; use rustc_middle::query::Providers; +use rustc_middle::sync::{IntoDynSyncSend, par_map}; use rustc_middle::ty::layout::{HasTyCtxt, HasTypingEnv, LayoutOf, TyAndLayout}; use rustc_middle::ty::{self, Instance, Ty, TyCtxt}; use rustc_middle::{bug, span_bug}; diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 3881f3c2aa841..732bb1879b494 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -36,17 +36,17 @@ pub use parking_lot::{ }; pub use self::atomic::AtomicU64; +pub use self::branch_key::BranchKey; pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard}; #[doc(no_inline)] pub use self::lock::{Lock, LockGuard, Mode}; pub use self::mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode}; -pub use self::parallel::{ - broadcast, join, par_for_each_in, par_map, 
parallel_guard, scope, spawn, try_par_for_each_in, -}; +pub use self::parallel::{ParallelGuard, broadcast, parallel_guard, spawn}; pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec}; pub use self::worker_local::{Registry, WorkerLocal}; pub use crate::marker::*; +mod branch_key; mod freeze; mod lock; mod parallel; diff --git a/compiler/rustc_data_structures/src/sync/branch_key.rs b/compiler/rustc_data_structures/src/sync/branch_key.rs new file mode 100644 index 0000000000000..cde71f6c4b63b --- /dev/null +++ b/compiler/rustc_data_structures/src/sync/branch_key.rs @@ -0,0 +1,50 @@ +use std::cmp; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct BranchKey(u128); + +impl BranchKey { + pub const fn root() -> Self { + Self(0x80000000_00000000_00000000_00000000) + } + + fn bits_branch(self, branch_num: u128, bits: u32) -> Result { + let trailing_zeros = self.0.trailing_zeros(); + let allocated_shift = trailing_zeros.checked_sub(bits).ok_or(BranchNestingError(()))?; + Ok(BranchKey( + self.0 & !(1 << trailing_zeros) + | (1 << allocated_shift) + | (branch_num << (allocated_shift + 1)), + )) + } + + pub fn branch(self, branch_num: u128, branch_space: u128) -> BranchKey { + debug_assert!( + branch_num < branch_space, + "branch_num = {branch_num} should be less than branch_space = {branch_space}" + ); + // floor(log2(n - 1)) + 1 == ceil(log2(n)) + self.bits_branch(branch_num, (branch_space - 1).checked_ilog2().map_or(0, |b| b + 1)) + .expect("query branch space is exhausted") + } + + pub fn disjoint_cmp(self, other: Self) -> cmp::Ordering { + self.0.cmp(&other.0) + } + + pub fn nest(self, then: Self) -> Result { + let trailing_zeros = then.0.trailing_zeros(); + let branch_num = then.0.wrapping_shr(trailing_zeros + 1); + let bits = u128::BITS - trailing_zeros; + self.bits_branch(branch_num, bits) + } +} + +#[derive(Debug)] +pub struct BranchNestingError(()); + +impl Default for BranchKey { + fn default() -> Self { + BranchKey::root() + } +} diff 
--git a/compiler/rustc_data_structures/src/sync/parallel.rs b/compiler/rustc_data_structures/src/sync/parallel.rs index b515c0bee8a6e..15039567f6859 100644 --- a/compiler/rustc_data_structures/src/sync/parallel.rs +++ b/compiler/rustc_data_structures/src/sync/parallel.rs @@ -43,54 +43,6 @@ pub fn parallel_guard(f: impl FnOnce(&ParallelGuard) -> R) -> R { ret } -fn serial_join(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce() -> RA, - B: FnOnce() -> RB, -{ - let (a, b) = parallel_guard(|guard| { - let a = guard.run(oper_a); - let b = guard.run(oper_b); - (a, b) - }); - (a.unwrap(), b.unwrap()) -} - -/// Runs a list of blocks in parallel. The first block is executed immediately on -/// the current thread. Use that for the longest running block. -#[macro_export] -macro_rules! parallel { - (impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => { - parallel!(impl $fblock [$block, $($c,)*] [$($rest),*]) - }; - (impl $fblock:block [$($blocks:expr,)*] []) => { - $crate::sync::parallel_guard(|guard| { - $crate::sync::scope(|s| { - $( - let block = $crate::sync::FromDyn::from(|| $blocks); - s.spawn(move |_| { - guard.run(move || block.into_inner()()); - }); - )* - guard.run(|| $fblock); - }); - }); - }; - ($fblock:block, $($blocks:block),*) => { - if $crate::sync::is_dyn_thread_safe() { - // Reverse the order of the later blocks since Rayon executes them in reverse order - // when using a single thread. This ensures the execution order matches that - // of a single threaded rustc. - parallel!(impl $fblock [] [$($blocks),*]); - } else { - $crate::sync::parallel_guard(|guard| { - guard.run(|| $fblock); - $(guard.run(|| $blocks);)* - }); - } - }; - } - pub fn spawn(func: impl FnOnce() + DynSend + 'static) { if mode::is_dyn_thread_safe() { let func = FromDyn::from(func); @@ -102,140 +54,6 @@ pub fn spawn(func: impl FnOnce() + DynSend + 'static) { } } -// This function only works when `mode::is_dyn_thread_safe()`. 
-pub fn scope<'scope, OP, R>(op: OP) -> R -where - OP: FnOnce(&rustc_thread_pool::Scope<'scope>) -> R + DynSend, - R: DynSend, -{ - let op = FromDyn::from(op); - rustc_thread_pool::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner() -} - -#[inline] -pub fn join(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce() -> RA + DynSend, - B: FnOnce() -> RB + DynSend, -{ - if mode::is_dyn_thread_safe() { - let oper_a = FromDyn::from(oper_a); - let oper_b = FromDyn::from(oper_b); - let (a, b) = parallel_guard(|guard| { - rustc_thread_pool::join( - move || guard.run(move || FromDyn::from(oper_a.into_inner()())), - move || guard.run(move || FromDyn::from(oper_b.into_inner()())), - ) - }); - (a.unwrap().into_inner(), b.unwrap().into_inner()) - } else { - serial_join(oper_a, oper_b) - } -} - -fn par_slice( - items: &mut [I], - guard: &ParallelGuard, - for_each: impl Fn(&mut I) + DynSync + DynSend, -) { - struct State<'a, F> { - for_each: FromDyn, - guard: &'a ParallelGuard, - group: usize, - } - - fn par_rec( - items: &mut [I], - state: &State<'_, F>, - ) { - if items.len() <= state.group { - for item in items { - state.guard.run(|| (state.for_each)(item)); - } - } else { - let (left, right) = items.split_at_mut(items.len() / 2); - let mut left = state.for_each.derive(left); - let mut right = state.for_each.derive(right); - rustc_thread_pool::join(move || par_rec(*left, state), move || par_rec(*right, state)); - } - } - - let state = State { - for_each: FromDyn::from(for_each), - guard, - group: std::cmp::max(items.len() / 128, 1), - }; - par_rec(items, &state) -} - -pub fn par_for_each_in>( - t: T, - for_each: impl Fn(&I) + DynSync + DynSend, -) { - parallel_guard(|guard| { - if mode::is_dyn_thread_safe() { - let mut items: Vec<_> = t.into_iter().collect(); - par_slice(&mut items, guard, |i| for_each(&*i)) - } else { - t.into_iter().for_each(|i| { - guard.run(|| for_each(&i)); - }); - } - }); -} - -/// This runs `for_each` in parallel for each iterator item. 
If one or more of the -/// `for_each` calls returns `Err`, the function will also return `Err`. The error returned -/// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which -/// are all equivalent. -pub fn try_par_for_each_in( - t: T, - for_each: impl Fn(&::Item) -> Result<(), E> + DynSync + DynSend, -) -> Result<(), E> -where - ::Item: DynSend, -{ - parallel_guard(|guard| { - if mode::is_dyn_thread_safe() { - let mut items: Vec<_> = t.into_iter().collect(); - - let error = Mutex::new(None); - - par_slice(&mut items, guard, |i| { - if let Err(err) = for_each(&*i) { - *error.lock() = Some(err); - } - }); - - if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) } - } else { - t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and) - } - }) -} - -pub fn par_map, R: DynSend, C: FromIterator>( - t: T, - map: impl Fn(I) -> R + DynSync + DynSend, -) -> C { - parallel_guard(|guard| { - if mode::is_dyn_thread_safe() { - let map = FromDyn::from(map); - - let mut items: Vec<(Option, Option)> = - t.into_iter().map(|i| (Some(i), None)).collect(); - - par_slice(&mut items, guard, |i| { - i.1 = Some(map(i.0.take().unwrap())); - }); - - items.into_iter().filter_map(|i| i.1).collect() - } else { - t.into_iter().filter_map(|i| guard.run(|| map(i))).collect() - } - }) -} - pub fn broadcast(op: impl Fn(usize) -> R + DynSync) -> Vec { if mode::is_dyn_thread_safe() { let op = FromDyn::from(op); diff --git a/compiler/rustc_incremental/src/persist/save.rs b/compiler/rustc_incremental/src/persist/save.rs index 58fea3278a839..445001bc3fbda 100644 --- a/compiler/rustc_incremental/src/persist/save.rs +++ b/compiler/rustc_incremental/src/persist/save.rs @@ -2,10 +2,10 @@ use std::fs; use std::sync::Arc; use rustc_data_structures::fx::FxIndexMap; -use rustc_data_structures::sync::join; use rustc_middle::dep_graph::{ DepGraph, SerializedDepGraph, WorkProduct, WorkProductId, WorkProductMap, }; +use rustc_middle::sync::join; 
use rustc_middle::ty::TyCtxt; use rustc_serialize::Encodable as RustcEncodable; use rustc_serialize::opaque::{FileEncodeResult, FileEncoder}; diff --git a/compiler/rustc_interface/src/interface.rs b/compiler/rustc_interface/src/interface.rs index c0f8f33692e8c..49feac95b8e68 100644 --- a/compiler/rustc_interface/src/interface.rs +++ b/compiler/rustc_interface/src/interface.rs @@ -589,7 +589,7 @@ pub fn try_print_query_stack( if let Some(icx) = icx { ty::print::with_no_queries!(print_query_stack( QueryCtxt::new(icx.tcx), - icx.query, + icx.query.map(|i| i.id), dcx, limit_frames, file, diff --git a/compiler/rustc_interface/src/passes.rs b/compiler/rustc_interface/src/passes.rs index a0383b187de51..5e024d319d0e1 100644 --- a/compiler/rustc_interface/src/passes.rs +++ b/compiler/rustc_interface/src/passes.rs @@ -12,7 +12,7 @@ use rustc_codegen_ssa::{CodegenResults, CrateInfo}; use rustc_data_structures::jobserver::Proxy; use rustc_data_structures::steal::Steal; use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal}; -use rustc_data_structures::{parallel, thousands}; +use rustc_data_structures::thousands; use rustc_errors::timings::TimingSection; use rustc_expand::base::{ExtCtxt, LintStoreExpand}; use rustc_feature::Features; @@ -27,6 +27,7 @@ use rustc_metadata::EncodedMetadata; use rustc_metadata::creader::CStore; use rustc_middle::arena::Arena; use rustc_middle::dep_graph::DepsType; +use rustc_middle::parallel; use rustc_middle::ty::{self, CurrentGcx, GlobalCtxt, RegisteredTools, TyCtxt}; use rustc_middle::util::Providers; use rustc_parse::lexer::StripTokens; diff --git a/compiler/rustc_lint/src/late.rs b/compiler/rustc_lint/src/late.rs index ccfba715a1be3..9f0366781cd13 100644 --- a/compiler/rustc_lint/src/late.rs +++ b/compiler/rustc_lint/src/late.rs @@ -7,10 +7,10 @@ use std::any::Any; use std::cell::Cell; use rustc_data_structures::stack::ensure_sufficient_stack; -use rustc_data_structures::sync::join; use rustc_hir::def_id::{LocalDefId, 
LocalModDefId}; use rustc_hir::{self as hir, AmbigArg, HirId, intravisit as hir_visit}; use rustc_middle::hir::nested_filter; +use rustc_middle::sync::join; use rustc_middle::ty::{self, TyCtxt}; use rustc_session::Session; use rustc_session::lint::LintPass; diff --git a/compiler/rustc_metadata/src/rmeta/encoder.rs b/compiler/rustc_metadata/src/rmeta/encoder.rs index b15ed34fc3569..0863ce3f2b1bd 100644 --- a/compiler/rustc_metadata/src/rmeta/encoder.rs +++ b/compiler/rustc_metadata/src/rmeta/encoder.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use rustc_data_structures::fx::{FxIndexMap, FxIndexSet}; use rustc_data_structures::memmap::{Mmap, MmapMut}; -use rustc_data_structures::sync::{join, par_for_each_in}; use rustc_data_structures::temp_dir::MaybeTempDir; use rustc_data_structures::thousands::usize_with_underscores; use rustc_feature::Features; @@ -21,6 +20,7 @@ use rustc_middle::dep_graph::WorkProductId; use rustc_middle::middle::dependency_format::Linkage; use rustc_middle::mir::interpret; use rustc_middle::query::Providers; +use rustc_middle::sync::{join, par_for_each_in}; use rustc_middle::traits::specialization_graph; use rustc_middle::ty::AssocContainer; use rustc_middle::ty::codec::TyEncoder; diff --git a/compiler/rustc_middle/Cargo.toml b/compiler/rustc_middle/Cargo.toml index fbcce16cedca8..669d87182ae18 100644 --- a/compiler/rustc_middle/Cargo.toml +++ b/compiler/rustc_middle/Cargo.toml @@ -8,6 +8,7 @@ edition = "2024" bitflags = "2.4.1" either = "1.5.0" gsgdt = "0.1.2" +parking_lot = "0.12" polonius-engine = "0.13.0" rustc_abi = { path = "../rustc_abi" } rustc_apfloat = "0.2.0" diff --git a/compiler/rustc_middle/src/hir/map.rs b/compiler/rustc_middle/src/hir/map.rs index 5da762ef85650..e2fe506aede3d 100644 --- a/compiler/rustc_middle/src/hir/map.rs +++ b/compiler/rustc_middle/src/hir/map.rs @@ -7,7 +7,7 @@ use rustc_ast::visit::{VisitorResult, walk_list}; use rustc_data_structures::fingerprint::Fingerprint; use 
rustc_data_structures::stable_hasher::{HashStable, StableHasher}; use rustc_data_structures::svh::Svh; -use rustc_data_structures::sync::{DynSend, DynSync, par_for_each_in, try_par_for_each_in}; +use rustc_data_structures::sync::{DynSend, DynSync}; use rustc_hir::attrs::AttributeKind; use rustc_hir::def::{DefKind, Res}; use rustc_hir::def_id::{DefId, LOCAL_CRATE, LocalDefId, LocalModDefId}; @@ -15,6 +15,7 @@ use rustc_hir::definitions::{DefKey, DefPath, DefPathHash}; use rustc_hir::intravisit::Visitor; use rustc_hir::*; use rustc_hir_pretty as pprust_hir; +use rustc_middle::sync::{par_for_each_in, try_par_for_each_in}; use rustc_span::def_id::StableCrateId; use rustc_span::{ErrorGuaranteed, Ident, Span, Symbol, kw, with_metavar_spans}; diff --git a/compiler/rustc_middle/src/hir/mod.rs b/compiler/rustc_middle/src/hir/mod.rs index 217ecbab059ec..28864007edf2c 100644 --- a/compiler/rustc_middle/src/hir/mod.rs +++ b/compiler/rustc_middle/src/hir/mod.rs @@ -9,7 +9,6 @@ pub mod place; use rustc_data_structures::fingerprint::Fingerprint; use rustc_data_structures::sorted_map::SortedMap; use rustc_data_structures::stable_hasher::{HashStable, StableHasher}; -use rustc_data_structures::sync::{DynSend, DynSync, try_par_for_each_in}; use rustc_hir::def::{DefKind, Res}; use rustc_hir::def_id::{DefId, LocalDefId, LocalModDefId}; use rustc_hir::lints::DelayedLint; @@ -18,6 +17,7 @@ use rustc_macros::{Decodable, Encodable, HashStable}; use rustc_span::{ErrorGuaranteed, ExpnId, Span}; use crate::query::Providers; +use crate::sync::{DynSend, DynSync, try_par_for_each_in}; use crate::ty::TyCtxt; /// Gather the LocalDefId for each item-like within a module, including items contained within diff --git a/compiler/rustc_middle/src/lib.rs b/compiler/rustc_middle/src/lib.rs index ee3e89e57bd42..b19e0b04c1483 100644 --- a/compiler/rustc_middle/src/lib.rs +++ b/compiler/rustc_middle/src/lib.rs @@ -81,6 +81,7 @@ pub mod lint; pub mod metadata; pub mod middle; pub mod mir; +pub mod sync; pub 
mod thir; pub mod traits; pub mod ty; diff --git a/compiler/rustc_middle/src/query/plumbing.rs b/compiler/rustc_middle/src/query/plumbing.rs index 8d01d9482ed4b..e9775fd2e0faa 100644 --- a/compiler/rustc_middle/src/query/plumbing.rs +++ b/compiler/rustc_middle/src/query/plumbing.rs @@ -6,7 +6,6 @@ use rustc_hir::hir_id::OwnerId; use rustc_macros::HashStable; use rustc_query_system::HandleCycleError; use rustc_query_system::dep_graph::{DepNodeIndex, SerializedDepNodeIndex}; -pub(crate) use rustc_query_system::query::QueryJobId; use rustc_query_system::query::*; use rustc_span::{ErrorGuaranteed, Span}; pub use sealed::IntoQueryParam; diff --git a/compiler/rustc_middle/src/sync.rs b/compiler/rustc_middle/src/sync.rs new file mode 100644 index 0000000000000..55c4aef18fb9f --- /dev/null +++ b/compiler/rustc_middle/src/sync.rs @@ -0,0 +1,244 @@ +use parking_lot::Mutex; +pub use rustc_data_structures::marker::{DynSend, DynSync}; +pub use rustc_data_structures::sync::*; +use rustc_query_system::query::QueryInclusion; + +pub use crate::ty::tls; + +fn serial_join(oper_a: A, oper_b: B) -> (RA, RB) +where + A: FnOnce() -> RA, + B: FnOnce() -> RB, +{ + let (a, b) = parallel_guard(|guard| { + let a = guard.run(oper_a); + let b = guard.run(oper_b); + (a, b) + }); + (a.unwrap(), b.unwrap()) +} + +/// Runs a list of blocks in parallel. The first block is executed immediately on +/// the current thread. Use that for the longest running block. +#[macro_export] +macro_rules! 
parallel { + (impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => { + parallel!(impl $fblock [$block, $($c,)*] [$($rest),*]) + }; + (impl $fblock:block [$($blocks:expr,)*] []) => { + #[allow(unreachable_code)] + let n = 1 $(+ 'a: { break 'a 1; let _ = || $blocks; })*; + $crate::sync::parallel_guard(|guard| { + $crate::sync::scope(n, |mut s| { + $( + let block = $crate::sync::FromDyn::from(|| $blocks); + s.spawn(move || { + guard.run(move || block.into_inner()()); + }); + )* + guard.run(|| $fblock); + }); + }); + }; + ($fblock:block, $($blocks:block),*) => { + if $crate::sync::is_dyn_thread_safe() { + // Reverse the order of the later blocks since Rayon executes them in reverse order + // when using a single thread. This ensures the execution order matches that + // of a single threaded rustc. + parallel!(impl $fblock [] [$($blocks),*]); + } else { + $crate::sync::parallel_guard(|guard| { + guard.run(|| $fblock); + $(guard.run(|| $blocks);)* + }); + } + }; + } + +// This function only works when `is_dyn_thread_safe()`. 
+pub fn scope<'scope, OP, R>(spawn_limit: u128, op: OP) -> R +where + OP: for<'a, 'tcx> FnOnce(Scope<'a, 'scope>) -> R + DynSend, + R: DynSend, +{ + let op = FromDyn::from(op); + rustc_thread_pool::scope(|scope| { + FromDyn::from(op.into_inner()(Scope { scope, next_branch: 0, branch_limit: spawn_limit })) + }) + .into_inner() +} + +pub struct Scope<'a, 'scope> { + scope: &'a rustc_thread_pool::Scope<'scope>, + branch_limit: u128, + next_branch: u128, +} + +impl<'a, 'scope> Scope<'a, 'scope> { + pub fn spawn(&mut self, f: F) + where + F: FnOnce() + Send + 'scope, + { + if self.next_branch >= self.branch_limit { + panic!("number of spawns exceeded the spawn_limit = {}", self.branch_limit); + } + let query_branch = self.next_branch; + self.next_branch += 1; + branch_context(query_branch, self.branch_limit, || self.scope.spawn(|_| f())); + } +} + +#[inline] +pub fn join(oper_a: A, oper_b: B) -> (RA, RB) +where + A: FnOnce() -> RA + DynSend, + B: FnOnce() -> RB + DynSend, +{ + if is_dyn_thread_safe() { + let oper_a = FromDyn::from(oper_a); + let oper_b = FromDyn::from(oper_b); + let (a, b) = parallel_guard(|guard| { + raw_branched_join( + move || guard.run(move || FromDyn::from(oper_a.into_inner()())), + move || guard.run(move || FromDyn::from(oper_b.into_inner()())), + ) + }); + (a.unwrap().into_inner(), b.unwrap().into_inner()) + } else { + serial_join(oper_a, oper_b) + } +} + +fn par_slice( + items: &mut [I], + guard: &ParallelGuard, + for_each: impl Fn(&mut I) + DynSync + DynSend, +) { + struct State<'a, F> { + for_each: FromDyn, + guard: &'a ParallelGuard, + group: usize, + } + + fn par_rec( + items: &mut [I], + state: &State<'_, F>, + ) { + if items.len() <= state.group { + for item in items { + state.guard.run(|| (state.for_each)(item)); + } + } else { + let (left, right) = items.split_at_mut(items.len() / 2); + let mut left = state.for_each.derive(left); + let mut right = state.for_each.derive(right); + raw_branched_join(move || par_rec(*left, state), move || 
par_rec(*right, state)); + } + } + + let state = State { + for_each: FromDyn::from(for_each), + guard, + group: std::cmp::max(items.len() / 128, 1), + }; + par_rec(items, &state) +} + +pub fn par_for_each_in>( + t: T, + for_each: impl Fn(&I) + DynSync + DynSend, +) { + parallel_guard(|guard| { + if is_dyn_thread_safe() { + let mut items: Vec<_> = t.into_iter().collect(); + par_slice(&mut items, guard, |i| for_each(&*i)) + } else { + t.into_iter().for_each(|i| { + guard.run(|| for_each(&i)); + }); + } + }); +} + +/// This runs `for_each` in parallel for each iterator item. If one or more of the +/// `for_each` calls returns `Err`, the function will also return `Err`. The error returned +/// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which +/// are all equivalent. +pub fn try_par_for_each_in( + t: T, + for_each: impl Fn(&::Item) -> Result<(), E> + DynSync + DynSend, +) -> Result<(), E> +where + ::Item: DynSend, +{ + parallel_guard(|guard| { + if is_dyn_thread_safe() { + let mut items: Vec<_> = t.into_iter().collect(); + + let error = Mutex::new(None); + + par_slice(&mut items, guard, |i| { + if let Err(err) = for_each(&*i) { + *error.lock() = Some(err); + } + }); + + if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) } + } else { + t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and) + } + }) +} + +pub fn par_map, R: DynSend, C: FromIterator>( + t: T, + map: impl Fn(I) -> R + DynSync + DynSend, +) -> C { + parallel_guard(|guard| { + if is_dyn_thread_safe() { + let map = FromDyn::from(map); + + let mut items: Vec<(Option, Option)> = + t.into_iter().map(|i| (Some(i), None)).collect(); + + par_slice(&mut items, guard, |i| { + i.1 = Some(map(i.0.take().unwrap())); + }); + + items.into_iter().filter_map(|i| i.1).collect() + } else { + t.into_iter().filter_map(|i| guard.run(|| map(i))).collect() + } + }) +} + +fn raw_branched_join(oper_a: A, oper_b: B) -> (RA, RB) +where + A: FnOnce() -> RA 
+ Send, + B: FnOnce() -> RB + Send, +{ + rustc_thread_pool::join(|| branch_context(0, 2, oper_a), || branch_context(1, 2, oper_b)) +} + +fn branch_context(branch_num: u128, branch_space: u128, f: F) -> R +where + F: FnOnce() -> R, +{ + tls::with_context_opt(|icx| { + if let Some(icx) = icx + && let Some(QueryInclusion { id, branch, real_depth }) = icx.query + { + let icx = tls::ImplicitCtxt { + query: Some(QueryInclusion { + id, + branch: branch.branch(branch_num, branch_space), + real_depth, + }), + ..*icx + }; + tls::enter_context(&icx, f) + } else { + f() + } + }) +} diff --git a/compiler/rustc_middle/src/ty/context/tls.rs b/compiler/rustc_middle/src/ty/context/tls.rs index fa9995898ac20..58c284f5e45e8 100644 --- a/compiler/rustc_middle/src/ty/context/tls.rs +++ b/compiler/rustc_middle/src/ty/context/tls.rs @@ -1,10 +1,10 @@ use std::{mem, ptr}; use rustc_data_structures::sync; +use rustc_query_system::query::QueryInclusion; use super::{GlobalCtxt, TyCtxt}; use crate::dep_graph::TaskDepsRef; -use crate::query::plumbing::QueryJobId; /// This is the implicit state of rustc. It contains the current /// `TyCtxt` and query. It is updated when creating a local interner or @@ -18,7 +18,7 @@ pub struct ImplicitCtxt<'a, 'tcx> { /// The current query job, if any. This is updated by `JobOwner::start` in /// `ty::query::plumbing` when executing a query. - pub query: Option, + pub query: Option, /// Used to prevent queries from calling too deeply. 
pub query_depth: usize, diff --git a/compiler/rustc_monomorphize/src/collector.rs b/compiler/rustc_monomorphize/src/collector.rs index cd699436e0120..1d71cedd2c856 100644 --- a/compiler/rustc_monomorphize/src/collector.rs +++ b/compiler/rustc_monomorphize/src/collector.rs @@ -211,7 +211,6 @@ use std::cell::OnceCell; use std::ops::ControlFlow; use rustc_data_structures::fx::FxIndexMap; -use rustc_data_structures::sync::{MTLock, par_for_each_in}; use rustc_data_structures::unord::{UnordMap, UnordSet}; use rustc_hir as hir; use rustc_hir::attrs::InlineAttr; @@ -227,6 +226,7 @@ use rustc_middle::mir::mono::{ use rustc_middle::mir::visit::Visitor as MirVisitor; use rustc_middle::mir::{self, Body, Location, MentionedItem, traversal}; use rustc_middle::query::TyCtxtAt; +use rustc_middle::sync::{MTLock, par_for_each_in}; use rustc_middle::ty::adjustment::{CustomCoerceUnsized, PointerCoercion}; use rustc_middle::ty::layout::ValidityRequirement; use rustc_middle::ty::{ diff --git a/compiler/rustc_monomorphize/src/partitioning.rs b/compiler/rustc_monomorphize/src/partitioning.rs index 1c8d6db08c316..3ecddbbc731c1 100644 --- a/compiler/rustc_monomorphize/src/partitioning.rs +++ b/compiler/rustc_monomorphize/src/partitioning.rs @@ -99,14 +99,12 @@ use std::io::Write; use std::path::{Path, PathBuf}; use rustc_data_structures::fx::{FxIndexMap, FxIndexSet}; -use rustc_data_structures::sync; use rustc_data_structures::unord::{UnordMap, UnordSet}; use rustc_hir::LangItem; use rustc_hir::attrs::{InlineAttr, Linkage}; use rustc_hir::def::DefKind; use rustc_hir::def_id::{DefId, DefIdSet, LOCAL_CRATE}; use rustc_hir::definitions::DefPathDataName; -use rustc_middle::bug; use rustc_middle::middle::codegen_fn_attrs::CodegenFnAttrFlags; use rustc_middle::middle::exported_symbols::{SymbolExportInfo, SymbolExportLevel}; use rustc_middle::mir::mono::{ @@ -116,6 +114,7 @@ use rustc_middle::mir::mono::{ use rustc_middle::ty::print::{characteristic_def_id_of_type, with_no_trimmed_paths}; use 
rustc_middle::ty::{self, InstanceKind, TyCtxt}; use rustc_middle::util::Providers; +use rustc_middle::{bug, sync}; use rustc_session::CodegenUnits; use rustc_session::config::{DumpMonoStatsFormat, SwitchWithOptPath}; use rustc_span::Symbol; diff --git a/compiler/rustc_query_impl/src/plumbing.rs b/compiler/rustc_query_impl/src/plumbing.rs index 39b6fac4ebc0b..abc2ce71a2c79 100644 --- a/compiler/rustc_query_impl/src/plumbing.rs +++ b/compiler/rustc_query_impl/src/plumbing.rs @@ -20,6 +20,7 @@ use rustc_middle::query::Key; use rustc_middle::query::on_disk_cache::{ AbsoluteBytePos, CacheDecoder, CacheEncoder, EncodedDepNodeIndex, }; +use rustc_middle::sync::BranchKey; use rustc_middle::ty::codec::TyEncoder; use rustc_middle::ty::print::with_reduced_queries; use rustc_middle::ty::tls::{self, ImplicitCtxt}; @@ -27,7 +28,7 @@ use rustc_middle::ty::{self, TyCtxt}; use rustc_query_system::dep_graph::{DepNodeParams, HasDepContext}; use rustc_query_system::ich::StableHashingContext; use rustc_query_system::query::{ - QueryCache, QueryConfig, QueryContext, QueryJobId, QueryMap, QuerySideEffect, + QueryCache, QueryConfig, QueryContext, QueryInclusion, QueryJobId, QueryMap, QuerySideEffect, QueryStackDeferred, QueryStackFrame, QueryStackFrameExtra, force_query, }; use rustc_query_system::{QueryOverflow, QueryOverflowNote}; @@ -84,7 +85,7 @@ impl<'tcx> QueryContext for QueryCtxt<'tcx> { } #[inline] - fn current_query_job(self) -> Option { + fn current_query_inclusion(self) -> Option { tls::with_related_context(self.tcx, |icx| icx.query) } @@ -160,7 +161,14 @@ impl<'tcx> QueryContext for QueryCtxt<'tcx> { // Update the `ImplicitCtxt` to point to our new query job. 
let new_icx = ImplicitCtxt { tcx: self.tcx, - query: Some(token), + query: Some(QueryInclusion { + id: token, + branch: BranchKey::root(), + real_depth: NonZero::new( + current_icx.query.map_or(0, |q| q.real_depth.get()).wrapping_add(1), + ) + .expect("real query depth exceeded type bounds"), + }), query_depth: current_icx.query_depth + depth_limit as usize, task_deps: current_icx.task_deps, }; diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs index 7d9b594d501ff..5241734113f64 100644 --- a/compiler/rustc_query_system/src/query/job.rs +++ b/compiler/rustc_query_system/src/query/job.rs @@ -1,16 +1,20 @@ +use std::collections::{BTreeMap, hash_map}; use std::fmt::Debug; use std::hash::Hash; use std::io::Write; -use std::iter; use std::num::NonZero; -use std::sync::Arc; +use std::ops; +use std::sync::{Arc, Weak}; +use std::thread::ThreadId; use parking_lot::{Condvar, Mutex}; -use rustc_data_structures::fx::{FxHashMap, FxHashSet}; +use rustc_data_structures::fx::FxHashMap; +use rustc_data_structures::indexmap::{self, IndexMap}; +use rustc_data_structures::sync::BranchKey; use rustc_errors::{Diag, DiagCtxtHandle}; use rustc_hir::def::DefKind; use rustc_session::Session; -use rustc_span::{DUMMY_SP, Span}; +use rustc_span::Span; use super::QueryStackFrameExtra; use crate::dep_graph::DepContext; @@ -45,18 +49,6 @@ impl QueryJobId { fn query(self, map: &QueryMap) -> QueryStackFrame { map.get(&self).unwrap().query.clone() } - - fn span(self, map: &QueryMap) -> Span { - map.get(&self).unwrap().job.span - } - - fn parent(self, map: &QueryMap) -> Option { - map.get(&self).unwrap().job.parent - } - - fn latch(self, map: &QueryMap) -> Option<&QueryLatch> { - map.get(&self).unwrap().job.latch.as_ref() - } } #[derive(Clone, Debug)] @@ -74,30 +66,51 @@ pub struct QueryJob { pub span: Span, /// The parent query job which created this job and is implicitly waiting on it. 
- pub parent: Option, + pub parent: Option, + + /// Id of the query's execution thread. + pub thread_id: ThreadId, /// The latch that is used to wait on this job. - latch: Option>, + latch: Weak>, } impl Clone for QueryJob { fn clone(&self) -> Self { - Self { id: self.id, span: self.span, parent: self.parent, latch: self.latch.clone() } + Self { + id: self.id, + span: self.span, + parent: self.parent, + thread_id: self.thread_id, + latch: self.latch.clone(), + } } } impl QueryJob { /// Creates a new query job. #[inline] - pub fn new(id: QueryJobId, span: Span, parent: Option) -> Self { - QueryJob { id, span, parent, latch: None } + pub fn new( + id: QueryJobId, + span: Span, + parent: Option, + thread_id: ThreadId, + ) -> Self { + QueryJob { id, span, parent, thread_id, latch: Weak::new() } + } + + pub fn real_depth(&self) -> usize { + self.parent.as_ref().map_or(0, |i| i.real_depth.get()) } - pub(super) fn latch(&mut self) -> QueryLatch { - if self.latch.is_none() { - self.latch = Some(QueryLatch::new()); + pub(super) fn latch(&mut self) -> Arc> { + if let Some(latch) = self.latch.upgrade() { + latch + } else { + let latch = Arc::new(QueryLatch::new()); + self.latch = Arc::downgrade(&latch); + latch } - self.latch.as_ref().unwrap().clone() } /// Signals to waiters that the query is complete. 
@@ -106,7 +119,7 @@ impl QueryJob { /// as there are no concurrent jobs which could be waiting on us #[inline] pub fn signal_complete(self) { - if let Some(latch) = self.latch { + if let Some(latch) = self.latch.upgrade() { latch.set(); } } @@ -116,12 +129,11 @@ impl QueryJobId { pub(super) fn find_cycle_in_stack( &self, query_map: QueryMap, - current_job: &Option, + mut current_job: Option, span: Span, ) -> CycleError { // Find the waitee amongst `current_job` parents let mut cycle = Vec::new(); - let mut current_job = Option::clone(current_job); while let Some(job) = current_job { let info = query_map.get(&job).unwrap(); @@ -140,11 +152,11 @@ impl QueryJobId { .job .parent .as_ref() - .map(|parent| (info.job.span, parent.query(&query_map))); + .map(|parent| (info.job.span, parent.id.query(&query_map))); return CycleError { usage, cycle }; } - current_job = info.job.parent; + current_job = info.job.parent.map(|i| i.id); } panic!("did not find a cycle") @@ -156,29 +168,45 @@ impl QueryJobId { let mut depth = 1; let info = query_map.get(&self).unwrap(); let dep_kind = info.query.dep_kind; - let mut current_id = info.job.parent; + let mut current = info.job.parent; let mut last_layout = (info.clone(), depth); - while let Some(id) = current_id { - let info = query_map.get(&id).unwrap(); + while let Some(inclusion) = current { + let info = query_map.get(&inclusion.id).unwrap(); if info.query.dep_kind == dep_kind { depth += 1; last_layout = (info.clone(), depth); } - current_id = info.job.parent; + current = info.job.parent; } last_layout } } +#[derive(Clone, Copy, Debug)] +pub struct QueryInclusion { + pub id: QueryJobId, + pub branch: BranchKey, + pub real_depth: NonZero, +} + #[derive(Debug)] struct QueryWaiter { - query: Option, + query: Option, + thread_id: ThreadId, condvar: Condvar, + // remove this after making sure PR it's ok to do + #[allow(dead_code)] span: Span, cycle: Mutex>>, } +impl QueryWaiter { + fn real_depth(&self) -> usize { + 
self.query.as_ref().map_or(0, |i| i.real_depth.get()) + } +} + #[derive(Debug)] struct QueryLatchInfo { complete: bool, @@ -187,31 +215,28 @@ struct QueryLatchInfo { #[derive(Debug)] pub(super) struct QueryLatch { - info: Arc>>, -} - -impl Clone for QueryLatch { - fn clone(&self) -> Self { - Self { info: Arc::clone(&self.info) } - } + info: Mutex>, } impl QueryLatch { fn new() -> Self { - QueryLatch { - info: Arc::new(Mutex::new(QueryLatchInfo { complete: false, waiters: Vec::new() })), - } + QueryLatch { info: Mutex::new(QueryLatchInfo { complete: false, waiters: Vec::new() }) } } /// Awaits for the query job to complete. pub(super) fn wait_on( &self, qcx: impl QueryContext, - query: Option, + query: Option, span: Span, ) -> Result<(), CycleError> { - let waiter = - Arc::new(QueryWaiter { query, span, cycle: Mutex::new(None), condvar: Condvar::new() }); + let waiter = Arc::new(QueryWaiter { + query, + span, + thread_id: std::thread::current().id(), + cycle: Mutex::new(None), + condvar: Condvar::new(), + }); self.wait_on_inner(qcx, &waiter); // FIXME: Get rid of this lock. We have ownership of the QueryWaiter // although another thread may still have a Arc reference so we cannot @@ -257,298 +282,261 @@ impl QueryLatch { waiter.condvar.notify_one(); } } - - /// Removes a single waiter from the list of waiters. - /// This is used to break query cycles. - fn extract_waiter(&self, waiter: usize) -> Arc> { - let mut info = self.info.lock(); - debug_assert!(!info.complete); - // Remove the waiter from the list of waiters - info.waiters.remove(waiter) - } } -/// A resumable waiter of a query. The usize is the index into waiters in the query's latch -type Waiter = (QueryJobId, usize); - -/// Visits all the non-resumable and resumable waiters of a query. -/// Only waiters in a query are visited. -/// `visit` is called for every waiter and is passed a query waiting on `query_ref` -/// and a span indicating the reason the query waited on `query_ref`. 
-/// If `visit` returns Some, this function returns. -/// For visits of non-resumable waiters it returns the return value of `visit`. -/// For visits of resumable waiters it returns Some(Some(Waiter)) which has the -/// required information to resume the waiter. -/// If all `visit` calls returns None, this function also returns None. -fn visit_waiters( - query_map: &QueryMap, - query: QueryJobId, - mut visit: F, -) -> Option> -where - F: FnMut(Span, QueryJobId) -> Option>, -{ - // Visit the parent query which is a non-resumable waiter since it's on the same stack - if let Some(parent) = query.parent(query_map) - && let Some(cycle) = visit(query.span(query_map), parent) - { - return Some(cycle); +/// Detects query cycles by using depth first search over all active query jobs. +/// If a query cycle is found it will break the cycle by finding an edge which +/// uses a query latch and then resuming that waiter. +/// There may be multiple cycles involved in a deadlock, so this searches +/// all active queries for cycles before finally resuming all the waiters at once. +#[allow(rustc::potential_query_instability)] +pub fn break_query_cycles( + query_map: QueryMap, + registry: &rustc_thread_pool::Registry, +) { + #[derive(Debug)] + struct QueryStackIntermediate { + start: Option, + depth: ops::RangeInclusive, + wait: Option, } - // Visit the explicit waiters which use condvars and are resumable - if let Some(latch) = query.latch(query_map) { - for (i, waiter) in latch.info.lock().waiters.iter().enumerate() { - if let Some(waiter_query) = waiter.query { - if visit(waiter.span, waiter_query).is_some() { - // Return a value which indicates that this waiter can be resumed - return Some(Some((query, i))); - } - } + impl QueryStackIntermediate { + fn from_depth(depth: usize) -> Self { + QueryStackIntermediate { start: None, depth: depth..=depth, wait: None } } - } - - None -} -/// Look for query cycles by doing a depth first search starting at `query`. 
-/// `span` is the reason for the `query` to execute. This is initially DUMMY_SP. -/// If a cycle is detected, this initial value is replaced with the span causing -/// the cycle. -fn cycle_check( - query_map: &QueryMap, - query: QueryJobId, - span: Span, - stack: &mut Vec<(Span, QueryJobId)>, - visited: &mut FxHashSet, -) -> Option> { - if !visited.insert(query) { - return if let Some(p) = stack.iter().position(|q| q.1 == query) { - // We detected a query cycle, fix up the initial span and return Some - - // Remove previous stack entries - stack.drain(0..p); - // Replace the span for the first query with the cycle cause - stack[0].0 = span; - Some(None) - } else { - None - }; + fn update_depth(&mut self, depth: usize) { + let (start, end) = self.depth.clone().into_inner(); + if depth < start { + self.depth = depth..=end; + } + if end < depth { + self.depth = start..=depth + } + } } - // Query marked as visited is added it to the stack - stack.push((span, query)); - - // Visit all the waiters - let r = visit_waiters(query_map, query, |span, successor| { - cycle_check(query_map, successor, span, stack, visited) - }); - - // Remove the entry in our stack if we didn't find a cycle - if r.is_none() { - stack.pop(); + #[derive(Debug)] + enum QueryWait { + /// Waits on a running query + Waiter { waited_on: QueryJobId, waiter_idx: usize }, + /// Waits for other tasks inside of `join` or `scope` + Direct { waited_on: Vec }, } - r -} + let mut stacks = FxHashMap::::default(); + for query in query_map.values() { + let query_depth = query.job.real_depth(); + let entry = stacks.entry(query.job.thread_id); + let stack = match entry { + hash_map::Entry::Vacant(entry) => { + entry.insert(QueryStackIntermediate::from_depth(query_depth)) + } + hash_map::Entry::Occupied(entry) => { + let stack = entry.into_mut(); + stack.update_depth(query_depth); + stack + } + }; -/// Finds out if there's a path to the compiler root (aka. 
code which isn't in a query) -/// from `query` without going through any of the queries in `visited`. -/// This is achieved with a depth first search. -fn connected_to_root( - query_map: &QueryMap, - query: QueryJobId, - visited: &mut FxHashSet, -) -> bool { - // We already visited this or we're deliberately ignoring it - if !visited.insert(query) { - return false; - } + if query + .job + .parent + .is_none_or(|inclusion| query_map[&inclusion.id].job.thread_id != query.job.thread_id) + { + // Register the thread's query stack beginning + assert!(stack.start.is_none(), "found two active queries at a thread's begining"); + stack.start = Some(query.job.id); + } - // This query is connected to the root (it has no query parent), return true - if query.parent(query_map).is_none() { - return true; + let Some(latch) = query.job.latch.upgrade() else { + continue; + }; + let lock = latch.info.try_lock().unwrap(); + assert!(!lock.complete); + for (waiter_idx, waiter) in lock.waiters.iter().enumerate() { + let waiting_stack = stacks + .entry(waiter.thread_id) + .or_insert_with(|| QueryStackIntermediate::from_depth(waiter.real_depth() - 1)); + assert!( + waiting_stack.wait.is_none(), + "found two active queries a thread is waiting for" + ); + waiting_stack.wait = Some(QueryWait::Waiter { waited_on: query.job.id, waiter_idx }); + } } - visit_waiters(query_map, query, |_, successor| { - connected_to_root(query_map, successor, visited).then_some(None) - }) - .is_some() -} - -// Deterministically pick an query from a list -fn pick_query<'a, I: Clone, T, F>(query_map: &QueryMap, queries: &'a [T], f: F) -> &'a T -where - F: Fn(&T) -> (Span, QueryJobId), -{ - // Deterministically pick an entry point - // FIXME: Sort this instead - queries - .iter() - .min_by_key(|v| { - let (span, query) = f(v); - let hash = query.query(query_map).hash; - // Prefer entry points which have valid spans for nicer error messages - // We add an integer to the tuple ensuring that entry points - // with 
valid spans are picked first - let span_cmp = if span == DUMMY_SP { 1 } else { 0 }; - (span_cmp, hash) - }) - .unwrap() -} - -/// Looks for query cycles starting from the last query in `jobs`. -/// If a cycle is found, all queries in the cycle is removed from `jobs` and -/// the function return true. -/// If a cycle was not found, the starting query is removed from `jobs` and -/// the function returns false. -fn remove_cycle( - query_map: &QueryMap, - jobs: &mut Vec, - wakelist: &mut Vec>>, -) -> bool { - let mut visited = FxHashSet::default(); - let mut stack = Vec::new(); - // Look for a cycle starting with the last query in `jobs` - if let Some(waiter) = - cycle_check(query_map, jobs.pop().unwrap(), DUMMY_SP, &mut stack, &mut visited) - { - // The stack is a vector of pairs of spans and queries; reverse it so that - // the earlier entries require later entries - let (mut spans, queries): (Vec<_>, Vec<_>) = stack.into_iter().rev().unzip(); - - // Shift the spans so that queries are matched with the span for their waitee - spans.rotate_right(1); - - // Zip them back together - let mut stack: Vec<_> = iter::zip(spans, queries).collect(); - - // Remove the queries in our cycle from the list of jobs to look at - for r in &stack { - if let Some(pos) = jobs.iter().position(|j| j == &r.1) { - jobs.remove(pos); + // Figure out what queries leftover stacks are blocked on + let mut root_query = None; + let thread_ids: Vec<_> = stacks.keys().copied().collect(); + for thread_id in &thread_ids { + let stack = &stacks[thread_id]; + let start = stack.start.unwrap(); + if let Some(inclusion) = query_map[&start].job.parent { + let parent = &query_map[&inclusion.id]; + assert_eq!(inclusion.real_depth.get(), *stack.depth.start()); + let waiting_stack = stacks.get_mut(&parent.job.thread_id).unwrap(); + if *waiting_stack.depth.end() == (inclusion.real_depth.get() - 1) { + match &mut waiting_stack.wait { + None => waiting_stack.wait = Some(QueryWait::Direct { waited_on: vec![start] 
}), + Some(QueryWait::Direct { waited_on }) => { + assert!(!waited_on.contains(&start)); + waited_on.push(start); + } + Some(QueryWait::Waiter { .. }) => (), + } } + } else { + assert!(root_query.is_none(), "found multiple threads without start"); + root_query = Some(start); } + } - // Find the queries in the cycle which are - // connected to queries outside the cycle - let entry_points = stack - .iter() - .filter_map(|&(span, query)| { - if query.parent(query_map).is_none() { - // This query is connected to the root (it has no query parent) - Some((span, query, None)) - } else { - let mut waiters = Vec::new(); - // Find all the direct waiters who lead to the root - visit_waiters(query_map, query, |span, waiter| { - // Mark all the other queries in the cycle as already visited - let mut visited = FxHashSet::from_iter(stack.iter().map(|q| q.1)); - - if connected_to_root(query_map, waiter, &mut visited) { - waiters.push((span, waiter)); - } - - None - }); - if waiters.is_empty() { - None - } else { - // Deterministically pick one of the waiters to show to the user - let waiter = *pick_query(query_map, &waiters, |s| *s); - Some((span, query, Some(waiter))) - } + let root_query = root_query.expect("no root query was found"); + + for stack in stacks.values() { + match stack.wait.as_ref().expect("failed to figure out what active thread is waiting") { + QueryWait::Waiter { waited_on, waiter_idx } => { + assert_eq!( + query_map[waited_on] + .job + .latch + .upgrade() + .unwrap() + .info + .try_lock() + .unwrap() + .waiters[*waiter_idx] + .real_depth() + - 1, + *stack.depth.end() + ) + } + QueryWait::Direct { waited_on } => { + let waited_on_query = &query_map[&waited_on[0]]; + let query_inclusion = waited_on_query.job.parent.unwrap(); + let parent_id = query_inclusion.id; + for waited_on_id in &waited_on[1..] 
{ + assert_eq!(parent_id, query_map[waited_on_id].job.parent.unwrap().id); } - }) - .collect::)>>(); - - // Deterministically pick an entry point - let (_, entry_point, usage) = pick_query(query_map, &entry_points, |e| (e.0, e.1)); - - // Shift the stack so that our entry point is first - let entry_point_pos = stack.iter().position(|(_, query)| query == entry_point); - if let Some(pos) = entry_point_pos { - stack.rotate_left(pos); + assert_eq!(query_inclusion.real_depth.get() - 1, *stack.depth.end()); + } } + } - let usage = usage.as_ref().map(|(span, query)| (*span, query.query(query_map))); - - // Create the cycle error - let error = CycleError { - usage, - cycle: stack - .iter() - .map(|&(s, ref q)| QueryInfo { span: s, query: q.query(query_map) }) - .collect(), + let mut subqueries = FxHashMap::<_, BTreeMap>::default(); + for query in query_map.values() { + let Some(inclusion) = &query.job.parent else { + continue; }; - - // We unwrap `waiter` here since there must always be one - // edge which is resumable / waited using a query latch - let (waitee_query, waiter_idx) = waiter.unwrap(); - - // Extract the waiter we want to resume - let waiter = waitee_query.latch(query_map).unwrap().extract_waiter(waiter_idx); - - // Set the cycle error so it will be picked up when resumed - *waiter.cycle.lock() = Some(error); - - // Put the waiter on the list of things to resume - wakelist.push(waiter); - - true - } else { - false + let old = subqueries + .entry(inclusion.id) + .or_default() + .insert(inclusion.branch, (query.job.id, usize::MAX)); + assert!(old.is_none()); } -} -/// Detects query cycles by using depth first search over all active query jobs. -/// If a query cycle is found it will break the cycle by finding an edge which -/// uses a query latch and then resuming that waiter. -/// There may be multiple cycles involved in a deadlock, so this searches -/// all active queries for cycles before finally resuming all the waiters at once. 
-pub fn break_query_cycles( - query_map: QueryMap, - registry: &rustc_thread_pool::Registry, -) { - let mut wakelist = Vec::new(); - // It is OK per the comments: - // - https://github.com/rust-lang/rust/pull/131200#issuecomment-2798854932 - // - https://github.com/rust-lang/rust/pull/131200#issuecomment-2798866392 - #[allow(rustc::potential_query_instability)] - let mut jobs: Vec = query_map.keys().cloned().collect(); - - let mut found_cycle = false; - - while jobs.len() > 0 { - if remove_cycle(&query_map, &mut jobs, &mut wakelist) { - found_cycle = true; - } - } + for stack in stacks.values() { + let &QueryWait::Waiter { waited_on, waiter_idx } = stack.wait.as_ref().unwrap() else { + continue; + }; - // Check that a cycle was found. It is possible for a deadlock to occur without - // a query cycle if a query which can be waited on uses Rayon to do multithreading - // internally. Such a query (X) may be executing on 2 threads (A and B) and A may - // wait using Rayon on B. Rayon may then switch to executing another query (Y) - // which in turn will wait on X causing a deadlock. We have a false dependency from - // X to Y due to Rayon waiting and a true dependency from Y to X. The algorithm here - // only considers the true dependency and won't detect a cycle. 
- if !found_cycle { - panic!( - "deadlock detected as we're unable to find a query cycle to break\n\ - current query map:\n{:#?}", - query_map - ); + let inclusion = + query_map[&waited_on].job.latch.upgrade().unwrap().info.try_lock().unwrap().waiters + [waiter_idx] + .query + .unwrap(); + let old = subqueries + .entry(inclusion.id) + .or_default() + .insert(inclusion.branch, (waited_on, waiter_idx)); + assert!(old.is_none()); + } + + let mut visited = IndexMap::new(); + let mut last_usage = None; + let mut last_waiter_idx = usize::MAX; + let mut current = root_query; + while let indexmap::map::Entry::Vacant(entry) = visited.entry(current) { + entry.insert((last_usage, last_waiter_idx)); + last_usage = Some(current); + (current, last_waiter_idx) = *subqueries + .get(&current) + .unwrap_or_else(|| { + panic!( + "deadlock detected as we're unable to find a query cycle to break\n\ + current query map:\n{:#?}", + query_map + ) + }) + .first_key_value() + .unwrap() + .1; + } + let usage = visited[&current].0; + let mut iter = visited.keys().rev(); + let mut cycle = Vec::new(); + loop { + let query_id = *iter.next().unwrap(); + let query = &query_map[&query_id]; + cycle.push(QueryInfo { span: query.job.span, query: query.query.clone() }); + if query_id == current { + break; + } } - // Mark all the thread we're about to wake up as unblocked. This needs to be done before - // we wake the threads up as otherwise Rayon could detect a deadlock if a thread we - // resumed fell asleep and this thread had yet to mark the remaining threads as unblocked. 
- for _ in 0..wakelist.len() { - rustc_thread_pool::mark_unblocked(registry); - } + cycle.reverse(); + let cycle_error = CycleError { + usage: usage.map(|id| { + let query = &query_map[&id]; + (query.job.span, query.query.clone()) + }), + cycle, + }; - for waiter in wakelist.into_iter() { - waiter.condvar.notify_one(); - } + let (waited_on, waiter_idx) = if last_waiter_idx != usize::MAX { + (current, last_waiter_idx) + } else { + let (&waited_on, &(_, waiter_idx)) = + visited.iter().rev().find(|(_, (_, waiter_idx))| *waiter_idx != usize::MAX).unwrap(); + (waited_on, waiter_idx) + }; + let waited_on = &query_map[&waited_on]; + let latch = waited_on.job.latch.upgrade().unwrap(); + let latch_info_lock = latch.info.try_lock().unwrap(); + let waiter = &latch_info_lock.waiters[waiter_idx]; + let mut cycle_lock = waiter.cycle.try_lock().unwrap(); + assert!(cycle_lock.is_none()); + *cycle_lock = Some(cycle_error); + rustc_thread_pool::mark_unblocked(registry); + waiter.condvar.notify_one(); + + // // Check that a cycle was found. It is possible for a deadlock to occur without + // // a query cycle if a query which can be waited on uses Rayon to do multithreading + // // internally. Such a query (X) may be executing on 2 threads (A and B) and A may + // // wait using Rayon on B. Rayon may then switch to executing another query (Y) + // // which in turn will wait on X causing a deadlock. We have a false dependency from + // // X to Y due to Rayon waiting and a true dependency from Y to X. The algorithm here + // // only considers the true dependency and won't detect a cycle. + // if !found_cycle { + // panic!( + // "deadlock detected as we're unable to find a query cycle to break\n\ + // current query map:\n{:#?}", + // query_map + // ); + // } + + // // Mark all the thread we're about to wake up as unblocked. 
This needs to be done before + // // we wake the threads up as otherwise Rayon could detect a deadlock if a thread we + // // resumed fell asleep and this thread had yet to mark the remaining threads as unblocked. + // for _ in 0..wakelist.len() { + // rustc_thread_pool::mark_unblocked(registry); + // } + + // for waiter in wakelist.into_iter() { + // waiter.condvar.notify_one(); + // } } #[inline(never)] @@ -653,7 +641,7 @@ pub fn print_query_stack( ); } - current_query = query_info.job.parent; + current_query = query_info.job.parent.map(|i| i.id); count_total += 1; } diff --git a/compiler/rustc_query_system/src/query/mod.rs b/compiler/rustc_query_system/src/query/mod.rs index ce3456d532e69..39817e1e014d2 100644 --- a/compiler/rustc_query_system/src/query/mod.rs +++ b/compiler/rustc_query_system/src/query/mod.rs @@ -8,8 +8,8 @@ pub use self::plumbing::*; mod job; pub use self::job::{ - QueryInfo, QueryJob, QueryJobId, QueryJobInfo, QueryMap, break_query_cycles, print_query_stack, - report_cycle, + QueryInclusion, QueryInfo, QueryJob, QueryJobId, QueryJobInfo, QueryMap, break_query_cycles, + print_query_stack, report_cycle, }; mod caches; @@ -159,7 +159,7 @@ pub trait QueryContext: HasDepContext { fn next_job_id(self) -> QueryJobId; /// Get the query information from the TLS context. 
- fn current_query_job(self) -> Option; + fn current_query_inclusion(self) -> Option; fn collect_active_jobs( self, diff --git a/compiler/rustc_query_system/src/query/plumbing.rs b/compiler/rustc_query_system/src/query/plumbing.rs index dea47c8fa787e..801678f3b11f0 100644 --- a/compiler/rustc_query_system/src/query/plumbing.rs +++ b/compiler/rustc_query_system/src/query/plumbing.rs @@ -24,7 +24,9 @@ use crate::dep_graph::{DepContext, DepGraphData, DepNode, DepNodeIndex, DepNodeP use crate::ich::StableHashingContext; use crate::query::caches::QueryCache; use crate::query::job::{QueryInfo, QueryJob, QueryJobId, QueryJobInfo, QueryLatch, report_cycle}; -use crate::query::{QueryContext, QueryMap, QueryStackFrame, SerializedDepNodeIndex}; +use crate::query::{ + QueryContext, QueryInclusion, QueryMap, QueryStackFrame, SerializedDepNodeIndex, +}; #[inline] fn equivalent_key(k: &K) -> impl Fn(&(K, V)) -> bool + '_ { @@ -286,7 +288,11 @@ where // We need the complete map to ensure we find a cycle to break. let query_map = qcx.collect_active_jobs(false).ok().expect("failed to collect active queries"); - let error = try_execute.find_cycle_in_stack(query_map, &qcx.current_query_job(), span); + let error = try_execute.find_cycle_in_stack( + query_map, + qcx.current_query_inclusion().map(|i| i.id), + span, + ); (mk_cycle(query, qcx, error.lift(qcx)), None) } @@ -296,8 +302,8 @@ fn wait_for_query( qcx: Qcx, span: Span, key: Q::Key, - latch: QueryLatch, - current: Option, + latch: &QueryLatch, + current: Option, ) -> (Q::Value, Option) where Q: QueryConfig, @@ -369,14 +375,14 @@ where } } - let current_job_id = qcx.current_query_job(); + let current_inclusion = qcx.current_query_inclusion(); match state_lock.entry(key_hash, equivalent_key(&key), |(k, _)| sharded::make_hash(k)) { Entry::Vacant(entry) => { // Nothing has computed or is computing the query, so we start a new job and insert it in the // state map. 
let id = qcx.next_job_id(); - let job = QueryJob::new(id, span, current_job_id); + let job = QueryJob::new(id, span, current_inclusion, std::thread::current().id()); entry.insert((key, QueryResult::Started(job))); // Drop the lock before we start executing the query @@ -394,7 +400,7 @@ where // Only call `wait_for_query` if we're using a Rayon thread pool // as it will attempt to mark the worker thread as blocked. - return wait_for_query(query, qcx, span, key, latch, current_job_id); + return wait_for_query(query, qcx, span, key, &latch, current_inclusion); } let id = job.id;