7 changes: 0 additions & 7 deletions src/backend/cdb/cdbpath.c
@@ -2980,13 +2980,6 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
outer.ok_to_replicate = !outer.has_wts;
inner.ok_to_replicate = true;

/*
* For parallel mode, join is executed by each batches.
* It is hard to tell whether null exists in the whole table.
*/
if (parallel_aware && jointype == JOIN_LASJ_NOTIN)
goto fail;

switch (jointype)
{
case JOIN_INNER:
59 changes: 56 additions & 3 deletions src/backend/executor/nodeHash.c
@@ -324,6 +324,14 @@ MultiExecParallelHash(HashState *node)
{
bool hashkeys_null = false;

/* CBDB_PARALLEL: A sibling worker must have already found a NULL value. */
if (pstate->phs_lasj_has_null)
{
node->hs_hashkeys_null = true;
ExecSquelchNode(outerNode);
break;
}

slot = ExecProcNode(outerNode);
if (TupIsNull(slot))
break;
@@ -333,14 +341,40 @@
&hashvalue, &hashkeys_null))
ExecParallelHashTableInsert(hashtable, slot, hashvalue);
hashtable->partialTuples++;

if (node->hs_quit_if_hashkeys_null && hashkeys_null)
{
/* CBDB_PARALLEL:
* If we are a LASJ (NOT IN) join and have found a NULL value ourselves, or a
* sibling process has already found one, quit and tell the siblings to quit
* if possible.
*
* It is safe to read and set phs_lasj_has_null without a lock here and
* elsewhere: it is an atomic boolean value, and we want to avoid extra locks
* in the hash join implementation. If another process misses the flag here
* while someone else is setting it, it simply carries on and may pick it up
* at the next hash batch. If it is missed across all batches, we will see it
* when PHJ_BUILD_HASHING_INNER ends, with the help of build_barrier. If we
* never participated in building the hash table, we check it once the hash
* table creation is finished.
*/
pstate->phs_lasj_has_null = true;
pg_write_barrier();
node->hs_hashkeys_null = true;
ExecSquelchNode(outerNode);
break;
}
}

/* CBDB_PARALLEL: No need to flush tuples if phs_lasj_has_null. */
/*
* Make sure that any tuples we wrote to disk are visible to
* others before anyone tries to load them.
*/
for (i = 0; i < hashtable->nbatch; ++i)
sts_end_write(hashtable->batches[i].inner_tuples);
if (!pstate->phs_lasj_has_null)
{
for (i = 0; i < hashtable->nbatch; ++i)
sts_end_write(hashtable->batches[i].inner_tuples);
}

/*
* Update shared counters. We need an accurate total tuple count
@@ -366,9 +400,23 @@ MultiExecParallelHash(HashState *node)
* skew).
*/
pstate->growth = PHJ_GROWTH_DISABLED;
/* In case we didn't find any NULL values ourselves. */
if (pstate->phs_lasj_has_null)
{
node->hs_hashkeys_null = true;
return;
}
}
}

/* In case we didn't participate in PHJ_BUILD_HASHING_INNER */
pg_memory_barrier();
if (pstate->phs_lasj_has_null)
{
node->hs_hashkeys_null = true;
return;
}

/*
* We're not yet attached to a batch. We all agree on the dimensions and
* number of inner tuples (for the empty table optimization).
@@ -3779,7 +3827,12 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);

/* Detach from the batch we were last working on. */
if (BarrierArriveAndDetach(&batch->batch_barrier))
/*
* CBDB_PARALLEL: parallel-aware Hash Left Anti Semi (Not-In) Join.
* If phs_lasj_has_null is true, a NULL value was found while building the
* hash table, so there are no batches to detach from.
*/
if (!hashtable->parallel_state->phs_lasj_has_null && BarrierArriveAndDetach(&batch->batch_barrier))
{
/*
* Technically we shouldn't access the barrier because we're no
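
The comments above describe a lock-free early-quit protocol: whichever worker first hits a NULL join key sets a single shared flag, and every worker polls that flag between tuples, relying on memory barriers (with build_barrier as a backstop) instead of a lock. Below is a minimal standalone sketch of that pattern, not the patch itself: it uses C11 acquire/release atomics in place of the PostgreSQL pg_write_barrier()/pg_memory_barrier() primitives, and SharedBuildState, build_worker, and the negative-value stand-in for NULL are illustrative assumptions only.

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef struct SharedBuildState
{
    atomic_bool has_null;   /* set by whichever worker first sees a NULL key */
} SharedBuildState;

/*
 * One worker's build loop over its slice of the inner relation.
 * A negative key is the illustrative stand-in for a NULL join key.
 */
static void
build_worker(SharedBuildState *shared, const int *keys, size_t nkeys)
{
    for (size_t i = 0; i < nkeys; i++)
    {
        /* A sibling already found NULL: the NOT IN result is empty, quit early. */
        if (atomic_load_explicit(&shared->has_null, memory_order_acquire))
            return;

        if (keys[i] < 0)
        {
            /* Publish the finding for the siblings, then quit ourselves. */
            atomic_store_explicit(&shared->has_null, true, memory_order_release);
            return;
        }

        /* ... otherwise insert keys[i] into the shared hash table ... */
    }
}

int
main(void)
{
    SharedBuildState shared = { .has_null = false };
    const int slice[] = { 1, 2, -1, 4 };    /* -1 plays the role of NULL */

    build_worker(&shared, slice, sizeof(slice) / sizeof(slice[0]));
    printf("has_null = %d\n", (int) atomic_load(&shared.has_null));
    return 0;
}

The acquire/release pair here plays the role of the explicit pg_write_barrier()/pg_memory_barrier() calls in the patch: a worker that observes the flag knows the hash table contents are no longer needed, which is why the patch can skip the sts_end_write() flush when phs_lasj_has_null is set.
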
1 change: 1 addition & 0 deletions src/backend/executor/nodeHashjoin.c
@@ -2065,6 +2065,7 @@ ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)

BarrierInit(&pstate->sync_barrier, pcxt->nworkers);
BarrierInit(&pstate->batch0_barrier, pcxt->nworkers);
pstate->phs_lasj_has_null = false;

/* Set up the space we'll use for shared temporary files. */
SharedFileSetInit(&pstate->fileset, pcxt->seg);
1 change: 0 additions & 1 deletion src/backend/optimizer/path/joinpath.c
@@ -2318,7 +2318,6 @@ hash_inner_and_outer(PlannerInfo *root,
*/
if (innerrel->partial_pathlist != NIL &&
save_jointype != JOIN_UNIQUE_INNER &&
save_jointype != JOIN_LASJ_NOTIN &&
enable_parallel_hash)
{
cheapest_partial_inner =
1 change: 1 addition & 0 deletions src/include/executor/hashjoin.h
@@ -286,6 +286,7 @@ typedef struct ParallelHashJoinState
Barrier grow_buckets_barrier;
Barrier sync_barrier;
Barrier batch0_barrier;
volatile bool phs_lasj_has_null; /* LASJ found a NULL value; signals early quit */
pg_atomic_uint32 distributor; /* counter for load balancing */

SharedFileSet fileset; /* space for shared temporary files */
88 changes: 83 additions & 5 deletions src/test/regress/expected/gp_parallel.out
@@ -1728,10 +1728,12 @@ abort;
create table t1(c1 int, c2 int) using ao_row distributed by (c1);
create table t2(c1 int, c2 int) using ao_row distributed by (c1);
create table t3_null(c1 int, c2 int) using ao_row distributed by (c1);
set enable_parallel = on;
set gp_appendonly_insert_files = 2;
set gp_appendonly_insert_files_tuples_range = 100;
set max_parallel_workers_per_gather = 2;
begin;
set local enable_parallel = on;
set local gp_appendonly_insert_files = 2;
set local gp_appendonly_insert_files_tuples_range = 100;
set local max_parallel_workers_per_gather = 2;
set local enable_parallel_hash = off;
insert into t1 select i, i from generate_series(1, 5000000) i;
insert into t2 select i+1, i from generate_series(1, 1200) i;
insert into t3_null select i+1, i from generate_series(1, 1200) i;
@@ -1779,7 +1781,7 @@ select * from t1 where c1 not in (select c1 from t3_null);
(0 rows)

-- non-parallel results.
set enable_parallel = off;
set local enable_parallel = off;
select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
sum
----------------
@@ -1791,13 +1793,89 @@
----+----
(0 rows)

end;
drop table t1;
drop table t2;
drop table t3_null;
--
-- End of Test Parallel Hash Left Anti Semi (Not-In) Join.
--
--
-- Test Parallel-aware Hash Left Anti Semi (Not-In) Join.
--
begin;
create table t1(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
create table t2(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
create table t3_null(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
set local enable_parallel = on;
set local max_parallel_workers_per_gather = 2;
insert into t1 select i, i from generate_series(1, 500000) i;
insert into t2 select i, i+1 from generate_series(1, 500000) i;
insert into t3_null select i, i+1 from generate_series(1, 500000) i;
insert into t3_null values(NULL, NULL);
analyze t1;
analyze t2;
analyze t3_null;
explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
QUERY PLAN
------------------------------------------------------------------------------------
Finalize Aggregate
-> Gather Motion 6:1 (slice1; segments: 6)
-> Partial Aggregate
-> Parallel Hash Left Anti Semi (Not-In) Join
Hash Cond: (t1.c1 = t2.c2)
-> Parallel Seq Scan on t1
-> Parallel Hash
-> Parallel Broadcast Motion 6:6 (slice2; segments: 6)
-> Parallel Seq Scan on t2
Optimizer: Postgres query optimizer
(10 rows)

select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
sum
-----
1
(1 row)

explain(costs off) select * from t1 where c1 not in (select c2 from t3_null);
QUERY PLAN
------------------------------------------------------------------------
Gather Motion 6:1 (slice1; segments: 6)
-> Parallel Hash Left Anti Semi (Not-In) Join
Hash Cond: (t1.c1 = t3_null.c2)
-> Parallel Seq Scan on t1
-> Parallel Hash
-> Parallel Broadcast Motion 6:6 (slice2; segments: 6)
-> Parallel Seq Scan on t3_null
Optimizer: Postgres query optimizer
(8 rows)

select * from t1 where c1 not in (select c2 from t3_null);
c1 | c2
----+----
(0 rows)

-- non-parallel results.
set local enable_parallel = off;
select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
sum
-----
1
(1 row)

select * from t1 where c1 not in (select c2 from t3_null);
c1 | c2
----+----
(0 rows)

drop table t1;
drop table t2;
drop table t3_null;
end;
--
-- End of Test Parallel-aware Hash Left Anti Semi (Not-In) Join.
--
--
-- Test alter ao/aocs table parallel_workers options
--
begin;
45 changes: 40 additions & 5 deletions src/test/regress/sql/gp_parallel.sql
@@ -491,10 +491,12 @@ abort;
create table t1(c1 int, c2 int) using ao_row distributed by (c1);
create table t2(c1 int, c2 int) using ao_row distributed by (c1);
create table t3_null(c1 int, c2 int) using ao_row distributed by (c1);
set enable_parallel = on;
set gp_appendonly_insert_files = 2;
set gp_appendonly_insert_files_tuples_range = 100;
set max_parallel_workers_per_gather = 2;
begin;
set local enable_parallel = on;
set local gp_appendonly_insert_files = 2;
set local gp_appendonly_insert_files_tuples_range = 100;
set local max_parallel_workers_per_gather = 2;
set local enable_parallel_hash = off;
insert into t1 select i, i from generate_series(1, 5000000) i;
insert into t2 select i+1, i from generate_series(1, 1200) i;
insert into t3_null select i+1, i from generate_series(1, 1200) i;
@@ -507,16 +509,49 @@ select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
explain(costs off) select * from t1 where c1 not in (select c1 from t3_null);
select * from t1 where c1 not in (select c1 from t3_null);
-- non-parallel results.
set enable_parallel = off;
set local enable_parallel = off;
select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
select * from t1 where c1 not in (select c1 from t3_null);
end;
drop table t1;
drop table t2;
drop table t3_null;
--
-- End of Test Parallel Hash Left Anti Semi (Not-In) Join.
--

--
-- Test Parallel-aware Hash Left Anti Semi (Not-In) Join.
--
begin;
create table t1(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
create table t2(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
create table t3_null(c1 int, c2 int) with(parallel_workers=2) distributed by (c1);
set local enable_parallel = on;
set local max_parallel_workers_per_gather = 2;
insert into t1 select i, i from generate_series(1, 500000) i;
insert into t2 select i, i+1 from generate_series(1, 500000) i;
insert into t3_null select i, i+1 from generate_series(1, 500000) i;
insert into t3_null values(NULL, NULL);
analyze t1;
analyze t2;
analyze t3_null;
explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
explain(costs off) select * from t1 where c1 not in (select c2 from t3_null);
select * from t1 where c1 not in (select c2 from t3_null);
-- non-parallel results.
set local enable_parallel = off;
select sum(t1.c1) from t1 where c1 not in (select c2 from t2);
select * from t1 where c1 not in (select c2 from t3_null);
drop table t1;
drop table t2;
drop table t3_null;
end;
--
-- End of Test Parallel-aware Hash Left Anti Semi (Not-In) Join.
--

--
-- Test alter ao/aocs table parallel_workers options
--