diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c
index a9a2b8ee1dd..93b249f2b95 100644
--- a/src/backend/cdb/cdbpath.c
+++ b/src/backend/cdb/cdbpath.c
@@ -2980,13 +2980,6 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
     outer.ok_to_replicate = !outer.has_wts;
     inner.ok_to_replicate = true;
 
-    /*
-     * For parallel mode, join is executed by each batches.
-     * It is hard to tell whether null exists in the whole table.
-     */
-    if (parallel_aware && jointype == JOIN_LASJ_NOTIN)
-        goto fail;
-
     switch (jointype)
     {
         case JOIN_INNER:
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 9c17ecbdfe3..0b1213e589e 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -324,6 +324,14 @@ MultiExecParallelHash(HashState *node)
         {
             bool        hashkeys_null = false;
 
+            /* CBDB_PARALLEL: a sibling must have found a NULL value. */
+            if (pstate->phs_lasj_has_null)
+            {
+                node->hs_hashkeys_null = true;
+                ExecSquelchNode(outerNode);
+                break;
+            }
+
             slot = ExecProcNode(outerNode);
             if (TupIsNull(slot))
                 break;
@@ -333,14 +341,40 @@ MultiExecParallelHash(HashState *node)
                                      &hashvalue, &hashkeys_null))
                 ExecParallelHashTableInsert(hashtable, slot, hashvalue);
             hashtable->partialTuples++;
+
+            if (node->hs_quit_if_hashkeys_null && hashkeys_null)
+            {
+                /* CBDB_PARALLEL:
+                 * If this is a LASJ and we found a NULL value ourselves, or
+                 * a sibling already found one, quit and tell siblings to quit.
+                 *
+                 * It is safe to read and set phs_lasj_has_null without a
+                 * lock, here and elsewhere: it is an atomic boolean, and we
+                 * want to avoid extra locking in the hash join code. If we
+                 * miss a concurrent store here, we carry on and may pick the
+                 * flag up at the next hash batch; if we miss it across all
+                 * batches, we will see it when PHJ_BUILD_HASHING_INNER ends,
+                 * thanks to build_barrier. If we never participated in
+                 * building the hash table, we check the flag when creation
+                 * finishes.
+                 */
+                pstate->phs_lasj_has_null = true;
+                pg_write_barrier();
+                node->hs_hashkeys_null = true;
+                ExecSquelchNode(outerNode);
+                break;
+            }
         }
 
+        /* CBDB_PARALLEL: no need to flush tuples to disk if phs_lasj_has_null is set. */
         /*
         * Make sure that any tuples we wrote to disk are visible to
         * others before anyone tries to load them.
         */
-        for (i = 0; i < hashtable->nbatch; ++i)
-            sts_end_write(hashtable->batches[i].inner_tuples);
+        if (!pstate->phs_lasj_has_null)
+        {
+            for (i = 0; i < hashtable->nbatch; ++i)
+                sts_end_write(hashtable->batches[i].inner_tuples);
+        }
 
         /*
         * Update shared counters. We need an accurate total tuple count
@@ -366,9 +400,23 @@ MultiExecParallelHash(HashState *node)
             * skew).
             */
            pstate->growth = PHJ_GROWTH_DISABLED;
+           /* In case we didn't find the NULL value ourselves. */
+           if (pstate->phs_lasj_has_null)
+           {
+               node->hs_hashkeys_null = true;
+               return;
+           }
        }
    }
 
+   /* In case we didn't participate in PHJ_BUILD_HASHING_INNER. */
+   pg_memory_barrier();
+   if (pstate->phs_lasj_has_null)
+   {
+       node->hs_hashkeys_null = true;
+       return;
+   }
+
    /*
    * We're not yet attached to a batch. We all agree on the dimensions and
    * number of inner tuples (for the empty table optimization).
@@ -3779,7 +3827,12 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
        sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
 
        /* Detach from the batch we were last working on. */
-       if (BarrierArriveAndDetach(&batch->batch_barrier))
+       /*
+        * CBDB_PARALLEL: parallel-aware Hash Left Anti Semi (Not-In) Join.
+        * If phs_lasj_has_null is true, we found a NULL value while building
+        * the hash table, so there are no batches to detach from.
+        */
+       if (!hashtable->parallel_state->phs_lasj_has_null && BarrierArriveAndDetach(&batch->batch_barrier))
        {
            /*
            * Technically we shouldn't access the barrier because we're no
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index e49a857becf..67255b717d4 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -2065,6 +2065,7 @@ ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
 
    BarrierInit(&pstate->sync_barrier, pcxt->nworkers);
    BarrierInit(&pstate->batch0_barrier, pcxt->nworkers);
+   pstate->phs_lasj_has_null = false;
 
    /* Set up the space we'll use for shared temporary files. */
    SharedFileSetInit(&pstate->fileset, pcxt->seg);
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index c12317b2a89..d3220af0b48 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -2318,7 +2318,6 @@ hash_inner_and_outer(PlannerInfo *root,
    */
    if (innerrel->partial_pathlist != NIL &&
        save_jointype != JOIN_UNIQUE_INNER &&
-       save_jointype != JOIN_LASJ_NOTIN &&
        enable_parallel_hash)
    {
        cheapest_partial_inner =
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 0ecbcc63da0..05500c34526 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -286,6 +286,7 @@ typedef struct ParallelHashJoinState
    Barrier     grow_buckets_barrier;
    Barrier     sync_barrier;
    Barrier     batch0_barrier;
+   volatile bool phs_lasj_has_null;    /* LASJ found a NULL value; signals early quit */
 
    pg_atomic_uint32 distributor;   /* counter for load balancing */
    SharedFileSet fileset;      /* space for shared temporary files */
diff --git a/src/test/regress/expected/gp_parallel.out b/src/test/regress/expected/gp_parallel.out
index 0ead2658f4c..daa16156db8 100644
--- a/src/test/regress/expected/gp_parallel.out
+++ b/src/test/regress/expected/gp_parallel.out
@@ -1728,10 +1728,12 @@ abort;
 create table t1(c1 int, c2 int) using ao_row distributed by (c1);
 create table t2(c1 int, c2 int) using ao_row distributed by (c1);
 create table t3_null(c1 int, c2 int) using ao_row distributed by (c1);
-set enable_parallel = on;
-set gp_appendonly_insert_files = 2;
-set gp_appendonly_insert_files_tuples_range = 100;
-set max_parallel_workers_per_gather = 2;
+begin;
+set local enable_parallel = on;
+set local gp_appendonly_insert_files = 2;
+set local gp_appendonly_insert_files_tuples_range = 100;
+set local max_parallel_workers_per_gather = 2;
+set local enable_parallel_hash = off;
 insert into t1 select i, i from generate_series(1, 5000000) i;
 insert into t2 select i+1, i from generate_series(1, 1200) i;
 insert into t3_null select i+1, i from generate_series(1, 1200) i;
@@ -1779,7 +1781,7 @@ select * from t1 where c1 not in (select c1 from t3_null);
 (0 rows)
 
 -- non-parallel results.
-set enable_parallel = off;
+set local enable_parallel = off;
 select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
       sum       
 ----------------
@@ -1791,6 +1793,7 @@ select * from t1 where c1 not in (select c1 from t3_null);
 ----+----
 (0 rows)
 
+end;
 drop table t1;
 drop table t2;
 drop table t3_null;
@@ -1798,6 +1801,81 @@ drop table t3_null;
 -- End of Test Parallel Hash Left Anti Semi (Not-In) Join.
 --
 --
+-- Test Parallel-aware Hash Left Anti Semi (Not-In) Join.
+--
+begin;
+create table t1(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
+create table t2(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
+create table t3_null(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
+set local enable_parallel = on;
+set local max_parallel_workers_per_gather = 2;
+insert into t1 select i, i from generate_series(1, 500000) i;
+insert into t2 select i, i+1 from generate_series(1, 500000) i;
+insert into t3_null select i, i+1 from generate_series(1, 500000) i;
+insert into t3_null values(NULL, NULL);
+analyze t1;
+analyze t2;
+analyze t3_null;
+explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
+                                     QUERY PLAN                                      
+------------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 6:1  (slice1; segments: 6)
+         ->  Partial Aggregate
+               ->  Parallel Hash Left Anti Semi (Not-In) Join
+                     Hash Cond: (t1.c1 = t2.c2)
+                     ->  Parallel Seq Scan on t1
+                     ->  Parallel Hash
+                           ->  Parallel Broadcast Motion 6:6  (slice2; segments: 6)
+                                 ->  Parallel Seq Scan on t2
+ Optimizer: Postgres query optimizer
+(10 rows)
+
+select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
+ sum 
+-----
+   1
+(1 row)
+
+explain(costs off) select * from t1 where c1 not in (select c2 from t3_null);
+                               QUERY PLAN                               
+------------------------------------------------------------------------
+ Gather Motion 6:1  (slice1; segments: 6)
+   ->  Parallel Hash Left Anti Semi (Not-In) Join
+         Hash Cond: (t1.c1 = t3_null.c2)
+         ->  Parallel Seq Scan on t1
+         ->  Parallel Hash
+               ->  Parallel Broadcast Motion 6:6  (slice2; segments: 6)
+                     ->  Parallel Seq Scan on t3_null
+ Optimizer: Postgres query optimizer
+(8 rows)
+
+select * from t1 where c1 not in (select c2 from t3_null);
+ c1 | c2 
+----+----
+(0 rows)
+
+-- non-parallel results.
+set local enable_parallel = off;
+select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
+ sum 
+-----
+   1
+(1 row)
+
+select * from t1 where c1 not in (select c2 from t3_null);
+ c1 | c2 
+----+----
+(0 rows)
+
+drop table t1;
+drop table t2;
+drop table t3_null;
+end;
+--
+-- End of Test Parallel-aware Hash Left Anti Semi (Not-In) Join.
+--
+--
 -- Test alter ao/aocs table parallel_workers options
 --
 begin;
diff --git a/src/test/regress/sql/gp_parallel.sql b/src/test/regress/sql/gp_parallel.sql
index 16d2ef34afa..0e9c794bb34 100644
--- a/src/test/regress/sql/gp_parallel.sql
+++ b/src/test/regress/sql/gp_parallel.sql
@@ -491,10 +491,12 @@ abort;
 create table t1(c1 int, c2 int) using ao_row distributed by (c1);
 create table t2(c1 int, c2 int) using ao_row distributed by (c1);
 create table t3_null(c1 int, c2 int) using ao_row distributed by (c1);
-set enable_parallel = on;
-set gp_appendonly_insert_files = 2;
-set gp_appendonly_insert_files_tuples_range = 100;
-set max_parallel_workers_per_gather = 2;
+begin;
+set local enable_parallel = on;
+set local gp_appendonly_insert_files = 2;
+set local gp_appendonly_insert_files_tuples_range = 100;
+set local max_parallel_workers_per_gather = 2;
+set local enable_parallel_hash = off;
 insert into t1 select i, i from generate_series(1, 5000000) i;
 insert into t2 select i+1, i from generate_series(1, 1200) i;
 insert into t3_null select i+1, i from generate_series(1, 1200) i;
@@ -507,9 +509,10 @@ select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
 explain(costs off) select * from t1 where c1 not in (select c1 from t3_null);
 select * from t1 where c1 not in (select c1 from t3_null);
 -- non-parallel results.
-set enable_parallel = off;
+set local enable_parallel = off;
 select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
 select * from t1 where c1 not in (select c1 from t3_null);
+end;
 drop table t1;
 drop table t2;
 drop table t3_null;
@@ -517,6 +520,38 @@ drop table t3_null;
 -- End of Test Parallel Hash Left Anti Semi (Not-In) Join.
 --
 
+--
+-- Test Parallel-aware Hash Left Anti Semi (Not-In) Join.
+--
+begin;
+create table t1(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
+create table t2(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
+create table t3_null(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
+set local enable_parallel = on;
+set local max_parallel_workers_per_gather = 2;
+insert into t1 select i, i from generate_series(1, 500000) i;
+insert into t2 select i, i+1 from generate_series(1, 500000) i;
+insert into t3_null select i, i+1 from generate_series(1, 500000) i;
+insert into t3_null values(NULL, NULL);
+analyze t1;
+analyze t2;
+analyze t3_null;
+explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
+select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
+explain(costs off) select * from t1 where c1 not in (select c2 from t3_null);
+select * from t1 where c1 not in (select c2 from t3_null);
+-- non-parallel results.
+set local enable_parallel = off;
+select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
+select * from t1 where c1 not in (select c2 from t3_null);
+drop table t1;
+drop table t2;
+drop table t3_null;
+end;
+--
+-- End of Test Parallel-aware Hash Left Anti Semi (Not-In) Join.
+--
+
 --
 -- Test alter ao/aocs table parallel_workers options
 --
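Note: the early-quit protocol implemented by the nodeHash.c hunks above is easiest to see in isolation. The following is a minimal standalone sketch, not Cloudberry code: several build workers scan disjoint chunks of the inner side, the first worker that sees a NULL join key publishes that fact through a shared flag, and the others poll the flag and stop early, since a NOT IN (LASJ) whose inner side contains a NULL can only produce an empty result. C11 acquire/release atomics stand in for the patch's volatile store plus pg_write_barrier()/pg_memory_barrier(), and the names shared_has_null, tuple_is_null, and build_inner are hypothetical.

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define NWORKERS 4
#define CHUNK    1000000L

static atomic_bool shared_has_null;   /* stands in for phs_lasj_has_null */

typedef struct { int id; long scanned; } Worker;

/* Fake data source: pretend worker 2 hits a NULL join key at row 123456. */
static bool tuple_is_null(int worker, long row)
{
    return worker == 2 && row == 123456;
}

static void *build_inner(void *arg)
{
    Worker *w = (Worker *) arg;

    for (long row = 0; row < CHUNK; row++)
    {
        /*
         * Cheap unlocked read: a sibling may already have found a NULL.
         * Missing an in-flight store is fine; we would catch it on a later
         * iteration, or at the latest after the final join in main().
         */
        if (atomic_load_explicit(&shared_has_null, memory_order_acquire))
            break;

        w->scanned++;
        if (tuple_is_null(w->id, row))
        {
            /*
             * Publish before quitting; release ordering plays the role of
             * pg_write_barrier() after the volatile store in the patch.
             */
            atomic_store_explicit(&shared_has_null, true, memory_order_release);
            break;
        }
        /* ... the real code would insert the tuple into the hash table ... */
    }
    return NULL;
}

int main(void)
{
    pthread_t tid[NWORKERS];
    Worker    w[NWORKERS] = {{0}};

    for (int i = 0; i < NWORKERS; i++)
    {
        w[i].id = i;
        pthread_create(&tid[i], NULL, build_inner, &w[i]);
    }

    /*
     * Joining all workers doubles as the build barrier: afterwards every
     * participant agrees on the flag even if it missed the store earlier.
     */
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(tid[i], NULL);

    printf("has_null = %d\n", (int) atomic_load(&shared_has_null));
    for (int i = 0; i < NWORKERS; i++)
        printf("worker %d scanned %ld rows\n", i, w[i].scanned);
    return 0;
}

Build with cc -pthread. As in the patch, correctness does not depend on workers observing the store immediately: a NULL on the inner side only ever shrinks the LASJ result to empty, so a late reader merely does some wasted scanning before the barrier settles the question.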