Skip to content

Commit 8f137b1

Browse files
authored
Merge branch 'main' into feat-gist-index
2 parents b133f89 + 049fe2f commit 8f137b1

File tree

11 files changed

+487
-115
lines changed

11 files changed

+487
-115
lines changed

src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
// limitations under the License.
1414

1515
mod grace_join;
16-
mod grace_memory;
16+
pub mod grace_memory;
1717
mod grace_state;
1818

1919
pub use grace_join::GraceHashJoin;
20+
pub use grace_memory::GraceMemoryJoin;
2021
pub use grace_state::*;

src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs

Lines changed: 122 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ use std::sync::Mutex;
1919
use std::sync::PoisonError;
2020
use std::sync::Weak;
2121

22-
use databend_common_catalog::table_context::TableContext;
2322
use databend_common_exception::ErrorCode;
2423
use databend_common_exception::Result;
2524
use databend_common_expression::FunctionContext;
2625
use databend_common_expression::HashMethodKind;
26+
use databend_common_pipeline_transforms::MemorySettings;
2727
use databend_common_sql::plans::JoinType;
2828

29+
use crate::pipelines::memory_settings::MemorySettingsExt;
2930
use crate::pipelines::processors::HashJoinDesc;
3031
use crate::pipelines::processors::transforms::BasicHashJoinState;
3132
use crate::pipelines::processors::transforms::GraceHashJoin;
@@ -39,6 +40,9 @@ use crate::pipelines::processors::transforms::memory::SemiRightHashJoin;
3940
use crate::pipelines::processors::transforms::memory::left_join::OuterLeftHashJoin;
4041
use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell;
4142
use crate::pipelines::processors::transforms::new_hash_join::grace::GraceHashJoinState;
43+
use crate::pipelines::processors::transforms::new_hash_join::grace::GraceMemoryJoin;
44+
use crate::pipelines::processors::transforms::new_hash_join::hybrid::HybridHashJoin;
45+
use crate::pipelines::processors::transforms::new_hash_join::hybrid::HybridHashJoinState;
4246
use crate::sessions::QueryContext;
4347

4448
pub struct HashJoinFactory {
@@ -49,6 +53,7 @@ pub struct HashJoinFactory {
4953
function_ctx: FunctionContext,
5054
grace_state: CStyleCell<HashMap<usize, Weak<GraceHashJoinState>>>,
5155
basic_state: CStyleCell<HashMap<usize, Weak<BasicHashJoinState>>>,
56+
hybrid_state: CStyleCell<HashMap<usize, Weak<HybridHashJoinState>>>,
5257
}
5358

5459
impl HashJoinFactory {
@@ -66,6 +71,7 @@ impl HashJoinFactory {
6671
mutex: Mutex::new(()),
6772
grace_state: CStyleCell::new(HashMap::new()),
6873
basic_state: CStyleCell::new(HashMap::new()),
74+
hybrid_state: CStyleCell::new(HashMap::new()),
6975
})
7076
}
7177

@@ -74,6 +80,7 @@ impl HashJoinFactory {
7480
let _locked = locked.unwrap_or_else(PoisonError::into_inner);
7581

7682
let ctx = self.ctx.clone();
83+
7784
match self.grace_state.as_mut().entry(id) {
7885
Entry::Occupied(v) => match v.get().upgrade() {
7986
Some(v) => Ok(v),
@@ -110,6 +117,27 @@ impl HashJoinFactory {
110117
}
111118
}
112119

120+
pub fn create_hybrid_state(self: &Arc<Self>, level: usize) -> Result<Arc<HybridHashJoinState>> {
121+
let locked = self.mutex.lock();
122+
let _locked = locked.unwrap_or_else(PoisonError::into_inner);
123+
124+
match self.hybrid_state.as_mut().entry(level) {
125+
Entry::Occupied(v) => match v.get().upgrade() {
126+
Some(v) => Ok(v),
127+
None => Err(ErrorCode::Internal(format!(
128+
"Error state: The level {} hybrid hash state has been destroyed.",
129+
level
130+
))),
131+
},
132+
Entry::Vacant(v) => {
133+
let ctx = self.ctx.clone();
134+
let hybrid_state = HybridHashJoinState::create(ctx, level, self.clone())?;
135+
v.insert(Arc::downgrade(&hybrid_state));
136+
Ok(hybrid_state)
137+
}
138+
}
139+
}
140+
113141
pub fn remove_basic_state(&self, id: usize) {
114142
let locked = self.mutex.lock();
115143
let _locked = locked.unwrap_or_else(PoisonError::into_inner);
@@ -124,64 +152,13 @@ impl HashJoinFactory {
124152
}
125153

126154
pub fn create_hash_join(self: &Arc<Self>, typ: JoinType, id: usize) -> Result<Box<dyn Join>> {
127-
let settings = self.ctx.get_settings();
155+
// let settings = self.ctx.get_settings();
156+
//
157+
// if settings.get_force_join_data_spill()? {
158+
// return self.create_grace_join(typ, id);
159+
// }
128160

129-
if settings.get_force_join_data_spill()? {
130-
return self.create_grace_join(typ, id);
131-
}
132-
133-
match typ {
134-
JoinType::Inner => Ok(Box::new(InnerHashJoin::create(
135-
&self.ctx,
136-
self.function_ctx.clone(),
137-
self.hash_method.clone(),
138-
self.desc.clone(),
139-
self.create_basic_state(id)?,
140-
)?)),
141-
JoinType::Left => Ok(Box::new(OuterLeftHashJoin::create(
142-
&self.ctx,
143-
self.function_ctx.clone(),
144-
self.hash_method.clone(),
145-
self.desc.clone(),
146-
self.create_basic_state(id)?,
147-
)?)),
148-
JoinType::LeftAnti => Ok(Box::new(AntiLeftHashJoin::create(
149-
&self.ctx,
150-
self.function_ctx.clone(),
151-
self.hash_method.clone(),
152-
self.desc.clone(),
153-
self.create_basic_state(id)?,
154-
)?)),
155-
JoinType::LeftSemi => Ok(Box::new(SemiLeftHashJoin::create(
156-
&self.ctx,
157-
self.function_ctx.clone(),
158-
self.hash_method.clone(),
159-
self.desc.clone(),
160-
self.create_basic_state(id)?,
161-
)?)),
162-
JoinType::Right => Ok(Box::new(OuterRightHashJoin::create(
163-
&self.ctx,
164-
self.function_ctx.clone(),
165-
self.hash_method.clone(),
166-
self.desc.clone(),
167-
self.create_basic_state(id)?,
168-
)?)),
169-
JoinType::RightSemi => Ok(Box::new(SemiRightHashJoin::create(
170-
&self.ctx,
171-
self.function_ctx.clone(),
172-
self.hash_method.clone(),
173-
self.desc.clone(),
174-
self.create_basic_state(id)?,
175-
)?)),
176-
JoinType::RightAnti => Ok(Box::new(AntiRightHashJoin::create(
177-
&self.ctx,
178-
self.function_ctx.clone(),
179-
self.hash_method.clone(),
180-
self.desc.clone(),
181-
self.create_basic_state(id)?,
182-
)?)),
183-
_ => unreachable!(),
184-
}
161+
Ok(Box::new(self.create_hybrid_join(typ, id)?))
185162
}
186163

187164
pub fn create_grace_join(self: &Arc<Self>, typ: JoinType, id: usize) -> Result<Box<dyn Join>> {
@@ -322,4 +299,91 @@ impl HashJoinFactory {
322299
_ => unreachable!(),
323300
}
324301
}
302+
303+
/// Create a basic memory join (used internally by HybridHashJoin)
304+
pub fn create_memory_join(
305+
self: &Arc<Self>,
306+
typ: JoinType,
307+
level: usize,
308+
) -> Result<Box<dyn GraceMemoryJoin>> {
309+
let basic_state = self.create_basic_state(level)?;
310+
match typ {
311+
JoinType::Inner => Ok(Box::new(InnerHashJoin::create(
312+
&self.ctx,
313+
self.function_ctx.clone(),
314+
self.hash_method.clone(),
315+
self.desc.clone(),
316+
basic_state,
317+
)?)),
318+
JoinType::Left => Ok(Box::new(OuterLeftHashJoin::create(
319+
&self.ctx,
320+
self.function_ctx.clone(),
321+
self.hash_method.clone(),
322+
self.desc.clone(),
323+
basic_state,
324+
)?)),
325+
JoinType::LeftAnti => Ok(Box::new(AntiLeftHashJoin::create(
326+
&self.ctx,
327+
self.function_ctx.clone(),
328+
self.hash_method.clone(),
329+
self.desc.clone(),
330+
basic_state,
331+
)?)),
332+
JoinType::LeftSemi => Ok(Box::new(SemiLeftHashJoin::create(
333+
&self.ctx,
334+
self.function_ctx.clone(),
335+
self.hash_method.clone(),
336+
self.desc.clone(),
337+
basic_state,
338+
)?)),
339+
JoinType::Right => Ok(Box::new(OuterRightHashJoin::create(
340+
&self.ctx,
341+
self.function_ctx.clone(),
342+
self.hash_method.clone(),
343+
self.desc.clone(),
344+
basic_state,
345+
)?)),
346+
JoinType::RightSemi => Ok(Box::new(SemiRightHashJoin::create(
347+
&self.ctx,
348+
self.function_ctx.clone(),
349+
self.hash_method.clone(),
350+
self.desc.clone(),
351+
basic_state,
352+
)?)),
353+
JoinType::RightAnti => Ok(Box::new(AntiRightHashJoin::create(
354+
&self.ctx,
355+
self.function_ctx.clone(),
356+
self.hash_method.clone(),
357+
self.desc.clone(),
358+
basic_state,
359+
)?)),
360+
_ => unreachable!(),
361+
}
362+
}
363+
364+
/// Create a HybridHashJoin at the specified level
365+
pub fn create_hybrid_join(
366+
self: &Arc<Self>,
367+
typ: JoinType,
368+
level: usize,
369+
) -> Result<HybridHashJoin> {
370+
let basic_state = self.create_basic_state(level)?;
371+
let memory_join = self.create_memory_join(typ, level)?;
372+
let memory_settings = MemorySettings::from_join_settings(&self.ctx)?;
373+
374+
// Use shared hybrid_state so all processors can coordinate spill
375+
let hybrid_state = self.create_hybrid_state(level)?;
376+
377+
Ok(HybridHashJoin::create(
378+
self.ctx.clone(),
379+
self.function_ctx.clone(),
380+
self.hash_method.clone(),
381+
self.desc.clone(),
382+
memory_settings,
383+
hybrid_state,
384+
basic_state,
385+
memory_join,
386+
typ,
387+
))
388+
}
325389
}

0 commit comments

Comments
 (0)