diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 0aa3780372d..4b607d7e2e1 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -142,6 +142,8 @@ typedef struct FixedParallelState
  * worker will get a different parallel worker number.
  */
 int         ParallelWorkerNumber = -1;
+int         ParallelWorkerNumberOfSlice = -1;
+int         TotalParallelWorkerNumberOfSlice = 0;
 
 /* Is there a parallel message pending which we need to receive? */
 volatile bool ParallelMessagePending = false;
@@ -1746,6 +1748,8 @@ void GpDestroyParallelDSMEntry()
             ParallelSession->area = NULL;
         }
         LWLockRelease(GpParallelDSMHashLock);
+        ParallelWorkerNumberOfSlice = -1;
+        TotalParallelWorkerNumberOfSlice = 0;
 }
 
 void
@@ -1835,6 +1839,8 @@ GpInsertParallelDSMHash(PlanState *planstate)
         entry->tolaunch = parallel_workers - 1;
         entry->parallel_workers = parallel_workers;
         entry->temp_worker_id = 0;
+        ParallelWorkerNumberOfSlice = 0;    /* The first worker. */
+        Assert(TotalParallelWorkerNumberOfSlice == parallel_workers);
 
         /* Create a DSA area that can be used by the leader and all workers. */
         char       *area_space = shm_toc_allocate(entry->toc, dsa_minsize);
@@ -1894,7 +1900,7 @@ GpInsertParallelDSMHash(PlanState *planstate)
             .nworkers = parallel_workers,
             .worker_id = entry->temp_worker_id,
         };
-
+        ParallelWorkerNumberOfSlice = ctx.worker_id;
         InitializeGpParallelWorkers(planstate, &ctx);
     }
 
diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c
index 60d4304bbf2..4b832f0cbee 100644
--- a/src/backend/cdb/cdbpath.c
+++ b/src/backend/cdb/cdbpath.c
@@ -44,6 +44,7 @@
 #include "cdb/cdbutil.h"
 #include "cdb/cdbvars.h"
 
+#include "port/pg_bitutils.h"
 
 typedef struct
 {
@@ -2998,6 +2999,8 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
     CdbPathLocus_MakeNull(&inner.move_to);
     outer.isouter = true;
     inner.isouter = false;
+    int         outerParallel = outer.locus.parallel_workers;
+    int         innerParallel = inner.locus.parallel_workers;
 
     Assert(cdbpathlocus_is_valid(outer.locus));
     Assert(cdbpathlocus_is_valid(inner.locus));
@@ -3091,6 +3094,9 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
         case JOIN_INNER:
             break;
         case JOIN_SEMI:
+            if (!enable_parallel_semi_join)
+                goto fail;
+            /* FALLTHROUGH */
         case JOIN_ANTI:
         case JOIN_LEFT:
         case JOIN_LASJ_NOTIN:
@@ -3100,10 +3106,110 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
         case JOIN_UNIQUE_INNER:
         case JOIN_RIGHT:
         case JOIN_FULL:
-        case JOIN_DEDUP_SEMI:
-        case JOIN_DEDUP_SEMI_REVERSE:
             /* Join types are not supported in parallel yet. */
             goto fail;
+        case JOIN_DEDUP_SEMI:
+            if (!enable_parallel_dedup_semi_join)
+                goto fail;
+
+            if (!CdbPathLocus_IsPartitioned(inner.locus))
+                goto fail;
+
+            if (CdbPathLocus_IsPartitioned(outer.locus) ||
+                CdbPathLocus_IsBottleneck(outer.locus))
+            {
+                /* ok */
+            }
+            else if (CdbPathLocus_IsGeneral(outer.locus))
+            {
+                CdbPathLocus_MakeSingleQE(&outer.locus,
+                                          CdbPathLocus_NumSegments(inner.locus));
+                outer.path->locus = outer.locus;
+            }
+            else if (CdbPathLocus_IsSegmentGeneral(outer.locus))
+            {
+                CdbPathLocus_MakeSingleQE(&outer.locus,
+                                          CdbPathLocus_CommonSegments(inner.locus,
+                                                                      outer.locus));
+                outer.path->locus = outer.locus;
+            }
+            else if (CdbPathLocus_IsSegmentGeneralWorkers(outer.locus))
+            {
+                /* CBDB_PARALLEL_FIXME: Consider gather from SegmentGeneralWorkers. */
+                goto fail;
+            }
+            else
+                goto fail;
+            inner.ok_to_replicate = false;
+
+            /*
+             * CBDB_PARALLEL:
+             * rowidexpr packs a row counter into the low 48 bits of a 64-bit int.
+             * In parallel mode we must check the total number of bits needed
+             * in the upper 16 bits for segments and parallel workers.
+             * The formula is:
+             *   parallel_bits + seg_bits
+             * where segs is max(dbid) across the cluster, in case segment
+             * dbids are non-contiguous.
+             * This keeps enough room to guarantee that no duplicate rowids
+             * are produced at execution time.
+             */
+            if (outerParallel > 1)
+            {
+                int         segs = cdbcomponent_get_maxdbid();
+                int         parallel_bits = pg_leftmost_one_pos32(outerParallel) + 1;
+                int         seg_bits = pg_leftmost_one_pos32(segs) + 1;
+                if (parallel_bits + seg_bits > 16)
+                    goto fail;
+            }
+            outer.path = add_rowid_to_path(root, outer.path, p_rowidexpr_id);
+            *p_outer_path = outer.path;
+            break;
+
+        case JOIN_DEDUP_SEMI_REVERSE:
+            if (!enable_parallel_dedup_semi_reverse_join)
+                goto fail;
+            /* same as JOIN_DEDUP_SEMI, but with inner and outer reversed */
+            if (!CdbPathLocus_IsPartitioned(outer.locus))
+                goto fail;
+            if (CdbPathLocus_IsPartitioned(inner.locus) ||
+                CdbPathLocus_IsBottleneck(inner.locus))
+            {
+                /* ok */
+            }
+            else if (CdbPathLocus_IsGeneral(inner.locus))
+            {
+                CdbPathLocus_MakeSingleQE(&inner.locus,
+                                          CdbPathLocus_NumSegments(outer.locus));
+                inner.path->locus = inner.locus;
+            }
+            else if (CdbPathLocus_IsSegmentGeneral(inner.locus))
+            {
+                CdbPathLocus_MakeSingleQE(&inner.locus,
+                                          CdbPathLocus_CommonSegments(outer.locus,
+                                                                      inner.locus));
+                inner.path->locus = inner.locus;
+            }
+            else if (CdbPathLocus_IsSegmentGeneralWorkers(inner.locus))
+            {
+                /* CBDB_PARALLEL_FIXME: Consider gather from SegmentGeneralWorkers. */
+                goto fail;
+            }
+            else
+                goto fail;
+            outer.ok_to_replicate = false;
+            if (innerParallel > 1)
+            {
+                int         segs = cdbcomponent_get_maxdbid();
+                int         parallel_bits = pg_leftmost_one_pos32(innerParallel) + 1;
+                int         seg_bits = pg_leftmost_one_pos32(segs) + 1;
+                if (parallel_bits + seg_bits > 16)
+                    goto fail;
+            }
+            inner.path = add_rowid_to_path(root, inner.path, p_rowidexpr_id);
+            *p_inner_path = inner.path;
+            break;
+
         default:
             elog(ERROR, "unexpected join type %d", jointype);
     }
@@ -3111,8 +3217,6 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
     /* Get rel sizes. */
     outer.bytes = outer.path->rows * outer.path->pathtarget->width;
     inner.bytes = inner.path->rows * inner.path->pathtarget->width;
-    int         outerParallel = outer.locus.parallel_workers;
-    int         innerParallel = inner.locus.parallel_workers;
 
     if (join_quals_contain_outer_references ||
         CdbPathLocus_IsOuterQuery(outer.locus) ||
diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c
index a26db1b75ea..428b1be04bf 100644
--- a/src/backend/executor/execExpr.c
+++ b/src/backend/executor/execExpr.c
@@ -58,6 +58,7 @@
 #include "cdb/cdbvars.h"
 #include "utils/pg_locale.h"
 
+#include "port/pg_bitutils.h"
 
 typedef struct LastAttnumInfo
 {
@@ -1139,7 +1140,21 @@ ExecInitExprRec(Expr *node, ExprState *state,
              * value.
              */
            scratch.opcode = EEOP_ROWIDEXPR;
-           scratch.d.rowidexpr.rowcounter = ((int64) GpIdentity.dbid) << 48;
+
+           /*
+            * CBDB_PARALLEL
+            * The planner has ensured enough bit space for the number of segments
+            * and parallel workers. ParallelWorkerNumberOfSlice is not set yet
+            * here, so use TotalParallelWorkerNumberOfSlice and reserve bits for it.
+            */
+           if (TotalParallelWorkerNumberOfSlice > 0)
+           {
+               int         parallel_bits = pg_leftmost_one_pos32(TotalParallelWorkerNumberOfSlice) + 1;
+               /* Planner has checked that there is enough room. */
+               scratch.d.rowidexpr.rowcounter = ((int64) GpIdentity.dbid) << (48 + parallel_bits);
+           }
+           else
+               scratch.d.rowidexpr.rowcounter = ((int64) GpIdentity.dbid) << 48;
 
            ExprEvalPushStep(state, &scratch);
            break;
diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c
index 8985b03fc5f..2e8a979a36b 100644
--- a/src/backend/executor/execExprInterp.c
+++ b/src/backend/executor/execExprInterp.c
@@ -1603,6 +1603,12 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
        EEO_CASE(EEOP_ROWIDEXPR)
        {
            int64       rowcounter = ++op->d.rowidexpr.rowcounter;
+
+           /*
+            * CBDB_PARALLEL_FIXME
+            * Fold ParallelWorkerNumberOfSlice in just once, at initialization.
+            */
+           if (IsParallelWorkerOfSlice())
+               rowcounter |= (((int64) ParallelWorkerNumberOfSlice) << 48);
 
            *op->resvalue = Int64GetDatum(rowcounter);
            *op->resnull = false;
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index bc16a771fb2..6f85dc0a8bb 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1925,6 +1925,12 @@ InitPlan(QueryDesc *queryDesc, int eflags)
        ExecSlice  *sendSlice = &estate->es_sliceTable->slices[m->motionID];
        estate->currentSliceId = sendSlice->parentIndex;
        estate->useMppParallelMode = sendSlice->useMppParallelMode;
+       /*
+        * CBDB_PARALLEL
+        * Remember: parallel_workers is set to at least 1 when the gang is
+        * filled, for convenience in Motion execution.
+        */
+       TotalParallelWorkerNumberOfSlice = sendSlice->parallel_workers > 1 ? sendSlice->parallel_workers : 0;
    }
 
    /* Compute SubPlans' root plan nodes for SubPlans reachable from this plan root */
    estate->locallyExecutableSubplans = getLocallyExecutableSubplans(plannedstmt, start_plan_node);
@@ -1961,9 +1967,10 @@ InitPlan(QueryDesc *queryDesc, int eflags)
            bool        save_useMppParallelMode = estate->useMppParallelMode;
 
            estate->currentSliceId = estate->es_plannedstmt->subplan_sliceIds[subplan_id - 1];
-           /* FIXME: test whether mpp parallel style exists for subplan case */
+           /* CBDB_PARALLEL_FIXME: test whether mpp parallel style exists for subplan case */
            estate->useMppParallelMode = false;
+           /* CBDB_PARALLEL_FIXME: update TotalParallelWorkerNumberOfSlice for subplan; is that possible? */
 
            Plan       *subplan = (Plan *) lfirst(l);
 
            subplanstate = ExecInitNode(subplan, estate, sp_eflags);
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 48727a544e1..83fd08d3486 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -6784,7 +6784,7 @@ get_parallel_divisor(Path *path)
     * parallel plan.
     */
    /*
-    * GPDB parallel: We don't have a leader like upstream.
+    * CBDB_PARALLEL: We don't have a leader like upstream.
     * parallel_divisor is usually used to estimate rows.
     * Since we don't have a leader in GP parallel style, set it the same
     * as path's parallel_workers which may be 0 sometimes.
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index adaa8dfd541..9f0caa11a90 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1327,8 +1327,6 @@ sort_inner_and_outer(PlannerInfo *root,
    if (joinrel->consider_parallel &&
        save_jointype != JOIN_UNIQUE_OUTER &&
        save_jointype != JOIN_FULL &&
-       save_jointype != JOIN_DEDUP_SEMI &&
-       save_jointype != JOIN_DEDUP_SEMI_REVERSE &&
        save_jointype != JOIN_RIGHT &&
        outerrel->partial_pathlist != NIL &&
        bms_is_empty(joinrel->lateral_relids))
@@ -1936,8 +1934,6 @@ match_unsorted_outer(PlannerInfo *root,
    if (joinrel->consider_parallel &&
        save_jointype != JOIN_UNIQUE_OUTER &&
        save_jointype != JOIN_FULL &&
-       save_jointype != JOIN_DEDUP_SEMI &&
-       save_jointype != JOIN_DEDUP_SEMI_REVERSE &&
        save_jointype != JOIN_RIGHT &&
        outerrel->partial_pathlist != NIL &&
        bms_is_empty(joinrel->lateral_relids))
@@ -2032,6 +2028,9 @@ consider_parallel_nestloop(PlannerInfo *root,
 
    if (jointype == JOIN_UNIQUE_INNER)
        jointype = JOIN_INNER;
+
+   if (jointype == JOIN_DEDUP_SEMI || jointype == JOIN_DEDUP_SEMI_REVERSE)
+       jointype = JOIN_INNER;
 
    foreach(lc1, outerrel->partial_pathlist)
    {
@@ -2309,8 +2308,6 @@ hash_inner_and_outer(PlannerInfo *root,
        save_jointype != JOIN_UNIQUE_OUTER &&
        save_jointype != JOIN_FULL &&
        save_jointype != JOIN_RIGHT &&
-       save_jointype != JOIN_DEDUP_SEMI &&
-       save_jointype != JOIN_DEDUP_SEMI_REVERSE &&
        outerrel->partial_pathlist != NIL &&
        bms_is_empty(joinrel->lateral_relids))
    {
diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c
index 92775541695..580eb196121 100644
--- a/src/backend/utils/misc/guc_gp.c
+++ b/src/backend/utils/misc/guc_gp.c
@@ -144,6 +144,9 @@ bool       gp_appendonly_verify_write_block = false;
 bool       gp_appendonly_compaction = true;
 int        gp_appendonly_compaction_threshold = 0;
 bool       enable_parallel = false;
+bool       enable_parallel_semi_join = true;
+bool       enable_parallel_dedup_semi_join = true;
+bool       enable_parallel_dedup_semi_reverse_join = true;
 int        gp_appendonly_insert_files = 0;
 int        gp_appendonly_insert_files_tuples_range = 0;
 int        gp_random_insert_segments = 0;
@@ -3041,6 +3044,36 @@ struct config_bool ConfigureNamesBool_gp[] =
        false,
        NULL, NULL, NULL
    },
+   {
+       {"enable_parallel_semi_join", PGC_USERSET, DEVELOPER_OPTIONS,
+           gettext_noop("Allow use of parallel semi join."),
+           NULL,
+           GUC_EXPLAIN
+       },
+       &enable_parallel_semi_join,
+       true,
+       NULL, NULL, NULL
+   },
+   {
+       {"enable_parallel_dedup_semi_join", PGC_USERSET, DEVELOPER_OPTIONS,
+           gettext_noop("Allow use of parallel dedup semi join."),
+           NULL,
+           GUC_EXPLAIN
+       },
+       &enable_parallel_dedup_semi_join,
+       true,
+       NULL, NULL, NULL
+   },
+   {
+       {"enable_parallel_dedup_semi_reverse_join", PGC_USERSET, DEVELOPER_OPTIONS,
+           gettext_noop("Allow use of parallel dedup semi reverse join."),
+           NULL,
+           GUC_EXPLAIN
+       },
+       &enable_parallel_dedup_semi_reverse_join,
+       true,
+       NULL, NULL, NULL
+   },
    {
        {"gp_internal_is_singlenode", PGC_POSTMASTER, UNGROUPED,
            gettext_noop("Is in SingleNode mode (no segments). WARNING: user SHOULD NOT set this by any means."),
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 58a1ab12665..1704cab7ab1 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -63,6 +63,10 @@ extern volatile bool ParallelMessagePending;
 extern PGDLLIMPORT int ParallelWorkerNumber;
 extern PGDLLIMPORT bool InitializingParallelWorker;
 
+/* CBDB_PARALLEL: worker number within the current slice (-1 if none), and total parallel workers of the slice including myself (0 for no parallel) */
+extern PGDLLIMPORT int ParallelWorkerNumberOfSlice;
+extern PGDLLIMPORT int TotalParallelWorkerNumberOfSlice;
+
 typedef struct ParallelEntryTag
 {
    int         cid;
@@ -90,6 +94,15 @@ typedef struct GpParallelDSMEntry
    Barrier     build_barrier;  /* synchronization for the build dsm phases */
 } GpParallelDSMEntry;
 
+/*
+ * CBDB_PARALLEL
+ * Postgres uses ParallelWorkerNumber to handle background workers, including
+ * parallel workers under a Gather node.
+ * To avoid mixing the two up and tripping assertions, we use
+ * ParallelWorkerNumberOfSlice to identify CBDB-style parallel mode.
+ */
+#define IsParallelWorkerOfSlice() (ParallelWorkerNumberOfSlice >= 0)
+
 #define IsParallelWorker()      (ParallelWorkerNumber >= 0)
 
 extern ParallelContext *CreateParallelContext(const char *library_name,
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 237d5ec844f..1f0471e5584 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -290,6 +290,9 @@ extern bool gp_appendonly_verify_block_checksums;
 extern bool gp_appendonly_verify_write_block;
 extern bool gp_appendonly_compaction;
 extern bool enable_parallel;
+extern bool enable_parallel_semi_join;
+extern bool enable_parallel_dedup_semi_join;
+extern bool enable_parallel_dedup_semi_reverse_join;
 extern int gp_appendonly_insert_files;
 extern int gp_appendonly_insert_files_tuples_range;
 extern int gp_random_insert_segments;
diff --git a/src/include/utils/unsync_guc_name.h b/src/include/utils/unsync_guc_name.h
index aa8be6f4ec7..3b5095f543b 100644
--- a/src/include/utils/unsync_guc_name.h
+++ b/src/include/utils/unsync_guc_name.h
@@ -125,6 +125,9 @@
        "enable_nestloop",
        "enable_parallel_append",
+       "enable_parallel_dedup_semi_join",
+       "enable_parallel_dedup_semi_reverse_join",
        "enable_parallel_hash",
+       "enable_parallel_semi_join",
        "enable_partition_pruning",
        "enable_partitionwise_aggregate",
        "enable_partitionwise_join",
diff --git a/src/test/regress/expected/cbdb_parallel.out b/src/test/regress/expected/cbdb_parallel.out
index e4700506f19..8935ce25f62 100644
--- a/src/test/regress/expected/cbdb_parallel.out
+++ b/src/test/regress/expected/cbdb_parallel.out
@@ -37,6 +37,309 @@ create schema test_parallel;
 set search_path to test_parallel;
 -- set this to default in case regress change it by gpstop.
 set gp_appendonly_insert_files = 4;
+--
+-- Parallel Join: DEDUP_SEMI and DEDUP_SEMI_REVERSE
+--
+begin;
+create table foo(a int) with(parallel_workers=2) distributed randomly;
+create table bar(b int) with(parallel_workers=2) distributed randomly;
+insert into foo select i from generate_series(1, 1000)i;
+insert into bar select i from generate_series(1, 20000)i;
+analyze foo;
+analyze bar;
+-- non-parallel
+set local enable_parallel = off;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+                                       QUERY PLAN                                        
+-----------------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 3:1  (slice1; segments: 3)
+         ->  Partial Aggregate
+               ->  HashAggregate
+                     Group Key: (RowIdExpr)
+                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                           Hash Key: (RowIdExpr)
+                           ->  Hash Join
+                                 Hash Cond: (bar.b = foo.a)
+                                 ->  Seq Scan on bar
+                                 ->  Hash
+                                       ->  Broadcast Motion 3:3  (slice3; segments: 3)
+                                             ->  Seq Scan on foo
+ Optimizer: Postgres query optimizer
+(14 rows)
+
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+  sum   
+--------
+ 500500
+(1 row)
+
+set local enable_parallel = on;
+-- Parallel DEDUP_SEMI
+set local enable_parallel_semi_join = off;
+set local enable_parallel_dedup_semi_reverse_join = off;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+                                        QUERY PLAN                                         
+-------------------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 6:1  (slice1; segments: 6)
+         ->  Partial Aggregate
+               ->  HashAggregate
+                     Group Key: (RowIdExpr)
+                     ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                           Hash Key: (RowIdExpr)
+                           Hash Module: 3
+                           ->  Parallel Hash Join
+                                 Hash Cond: (foo.a = bar.b)
+                                 ->  Broadcast Workers Motion 6:6  (slice3; segments: 6)
+                                       ->  Parallel Seq Scan on foo
+                                 ->  Parallel Hash
+                                       ->  Parallel Seq Scan on bar
+ Optimizer: Postgres query optimizer
+(15 rows)
+
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+  sum   
+--------
+ 500500
+(1 row)
+
+-- Parallel DEDUP_SEMI_REVERSE
+set local enable_parallel_semi_join = off;
+set local enable_parallel_dedup_semi_reverse_join = on;
+set local enable_parallel_dedup_semi_join = on;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+                                           QUERY PLAN                                            
+-------------------------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 6:1  (slice1; segments: 6)
+         ->  Partial Aggregate
+               ->  HashAggregate
+                     Group Key: (RowIdExpr)
+                     ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                           Hash Key: (RowIdExpr)
+                           Hash Module: 3
+                           ->  Parallel Hash Join
+                                 Hash Cond: (bar.b = foo.a)
+                                 ->  Parallel Seq Scan on bar
+                                 ->  Parallel Hash
+                                       ->  Broadcast Workers Motion 6:6  (slice3; segments: 6)
+                                             ->  Parallel Seq Scan on foo
+ Optimizer: Postgres query optimizer
+(15 rows)
+
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+  sum   
+--------
+ 500500
+(1 row)
+
+-- Parallel oblivious
+set local enable_parallel_hash = off;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+                                       QUERY PLAN                                        
+-----------------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 6:1  (slice1; segments: 6)
+         ->  Partial Aggregate
+               ->  HashAggregate
+                     Group Key: (RowIdExpr)
+                     ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                           Hash Key: (RowIdExpr)
+                           Hash Module: 3
+                           ->  Hash Join
+                                 Hash Cond: (bar.b = foo.a)
+                                 ->  Parallel Seq Scan on bar
+                                 ->  Hash
+                                       ->  Broadcast Motion 3:6  (slice3; segments: 3)
+                                             ->  Seq Scan on foo
+ Optimizer: Postgres query optimizer
+(15 rows)
+
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+  sum   
+--------
+ 500500
+(1 row)
+
+set local enable_parallel_dedup_semi_reverse_join = off;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+                                       QUERY PLAN                                        
+-----------------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 3:1  (slice1; segments: 3)
+         ->  Partial Aggregate
+               ->  HashAggregate
+                     Group Key: (RowIdExpr)
+                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                           Hash Key: (RowIdExpr)
+                           ->  Hash Join
+                                 Hash Cond: (bar.b = foo.a)
+                                 ->  Seq Scan on bar
+                                 ->  Hash
+                                       ->  Broadcast Motion 3:3  (slice3; segments: 3)
+                                             ->  Seq Scan on foo
+ Optimizer: Postgres query optimizer
+(14 rows)
+
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+  sum   
+--------
+ 500500
+(1 row)
+
+-- Parallel Nestloop DEDUP_SEMI
+set local min_parallel_table_scan_size = 0;
+set local enable_parallel_semi_join = off;
+set local enable_nestloop = on;
+set local enable_hashjoin = off;
+set local enable_mergejoin = off;
+set local enable_parallel_semi_join = off;
+set local enable_parallel_dedup_semi_reverse_join = off;
+set local enable_parallel_dedup_semi_join = on;
+explain (costs off)
+select sum(bar.b) from bar where exists (select 1 from foo where foo.a = bar.b);
+                                         QUERY PLAN                                         
+--------------------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 6:1  (slice1; segments: 6)
+         ->  Partial Aggregate
+               ->  HashAggregate
+                     Group Key: (RowIdExpr)
+                     ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                           Hash Key: (RowIdExpr)
+                           Hash Module: 3
+                           ->  Nested Loop
+                                 Join Filter: (bar.b = foo.a)
+                                 ->  Redistribute Motion 6:6  (slice3; segments: 6)
+                                       Hash Key: bar.b
+                                       Hash Module: 3
+                                       ->  Parallel Seq Scan on bar
+                                 ->  Materialize
+                                       ->  Redistribute Motion 3:6  (slice4; segments: 3)
+                                             Hash Key: foo.a
+                                             Hash Module: 3
+                                             ->  Seq Scan on foo
+ Optimizer: Postgres query optimizer
+(20 rows)
+
+select sum(bar.b) from bar where exists (select 1 from foo where foo.a = bar.b);
+  sum   
+--------
+ 500500
+(1 row)
+
+-- Parallel Nestloop DEDUP_SEMI_REVERSE
+set local enable_parallel_dedup_semi_reverse_join = on;
+set local enable_parallel_dedup_semi_join = off;
+explain (costs off)
+select sum(bar.b) from bar where exists (select 1 from foo where foo.a = bar.b);
+                                         QUERY PLAN                                         
+--------------------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 6:1  (slice1; segments: 6)
+         ->  Partial Aggregate
+               ->  HashAggregate
+                     Group Key: (RowIdExpr)
+                     ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                           Hash Key: (RowIdExpr)
+                           Hash Module: 3
+                           ->  Nested Loop
+                                 Join Filter: (bar.b = foo.a)
+                                 ->  Redistribute Motion 6:6  (slice3; segments: 6)
+                                       Hash Key: foo.a
+                                       Hash Module: 3
+                                       ->  Parallel Seq Scan on foo
+                                 ->  Materialize
+                                       ->  Redistribute Motion 3:6  (slice4; segments: 3)
+                                             Hash Key: bar.b
+                                             Hash Module: 3
+                                             ->  Seq Scan on bar
+ Optimizer: Postgres query optimizer
+(20 rows)
+
+select sum(bar.b) from bar where exists (select 1 from foo where foo.a = bar.b);
+  sum   
+--------
+ 500500
+(1 row)
+
+-- Parallel Mergejoin DEDUP_SEMI
+set local enable_parallel_semi_join = off;
+set local enable_hashjoin = off;
+set local enable_mergejoin = on;
+set local enable_nestloop = off;
+set local enable_parallel_semi_join = off;
+set local enable_parallel_dedup_semi_reverse_join = off;
+set local enable_parallel_dedup_semi_join = on;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+                                       QUERY PLAN                                        
+-----------------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 3:1  (slice1; segments: 3)
+         ->  Partial Aggregate
+               ->  HashAggregate
+                     Group Key: (RowIdExpr)
+                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                           Hash Key: (RowIdExpr)
+                           ->  Merge Join
+                                 Merge Cond: (foo.a = bar.b)
+                                 ->  Sort
+                                       Sort Key: foo.a
+                                       ->  Broadcast Motion 6:3  (slice3; segments: 6)
+                                             ->  Parallel Seq Scan on foo
+                                 ->  Sort
+                                       Sort Key: bar.b
+                                       ->  Seq Scan on bar
+ Optimizer: Postgres query optimizer
+(17 rows)
+
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+  sum   
+--------
+ 500500
+(1 row)
+
+-- Parallel Mergejoin DEDUP_SEMI_REVERSE
+set local enable_parallel_dedup_semi_reverse_join = on;
+set local enable_parallel_dedup_semi_join = off;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+                                       QUERY PLAN                                        
+-----------------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 6:1  (slice1; segments: 6)
+         ->  Partial Aggregate
+               ->  HashAggregate
+                     Group Key: (RowIdExpr)
+                     ->  Redistribute Motion 6:6  (slice2; segments: 6)
+                           Hash Key: (RowIdExpr)
+                           Hash Module: 3
+                           ->  Merge Join
+                                 Merge Cond: (bar.b = foo.a)
+                                 ->  Sort
+                                       Sort Key: bar.b
+                                       ->  Parallel Seq Scan on bar
+                                 ->  Sort
+                                       Sort Key: foo.a
+                                       ->  Broadcast Motion 3:6  (slice3; segments: 3)
+                                             ->  Seq Scan on foo
+ Optimizer: Postgres query optimizer
+(18 rows)
+
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+  sum   
+--------
+ 500500
+(1 row)
+
+abort;
 -- CBDB(#131): test parallel_workers during create AO/AOCO table take effect
 begin;
 set local enable_parallel = on;
diff --git a/src/test/regress/expected/rangefuncs_cdb.out b/src/test/regress/expected/rangefuncs_cdb.out
index 8cc293660b2..82c79012113 100644
--- a/src/test/regress/expected/rangefuncs_cdb.out
+++ b/src/test/regress/expected/rangefuncs_cdb.out
@@ -2,7 +2,12 @@
 -- Will run in parallel mode with enable_parallel=on and non-parallel mode.
 -- Filter this gucs to pass regression.
 --
-SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%' and name not in ('enable_parallel', 'enable_answer_query_using_materialized_views');
+SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%' AND
+name NOT IN ('enable_parallel',
+             'enable_answer_query_using_materialized_views',
+             'enable_parallel_dedup_semi_join',
+             'enable_parallel_dedup_semi_reverse_join',
+             'enable_parallel_semi_join');
              name               | setting 
 --------------------------------+---------
  enable_async_append            | on
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 891e1bdd9bc..6122b670526 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -100,7 +100,12 @@ select count(*) = 0 as ok from pg_stat_wal_receiver;
 -- This is to record the prevailing planner enable_foo settings during
 -- a regression test run.
 -- GP parallel tests will run another with enable_parallel=on, filter this to pass regression.
-select name, setting from pg_settings where name like 'enable%' and name not in ('enable_parallel', 'enable_answer_query_using_materialized_views');
+SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%' AND
+name NOT IN ('enable_parallel',
+             'enable_answer_query_using_materialized_views',
+             'enable_parallel_dedup_semi_join',
+             'enable_parallel_dedup_semi_reverse_join',
+             'enable_parallel_semi_join');
              name               | setting 
 --------------------------------+---------
  enable_async_append            | on
diff --git a/src/test/regress/sql/cbdb_parallel.sql b/src/test/regress/sql/cbdb_parallel.sql
index e32dd9a1bc3..351422d82c3 100644
--- a/src/test/regress/sql/cbdb_parallel.sql
+++ b/src/test/regress/sql/cbdb_parallel.sql
@@ -39,6 +39,92 @@ set search_path to test_parallel;
 -- set this to default in case regress change it by gpstop.
 set gp_appendonly_insert_files = 4;
 
+--
+-- Parallel Join: DEDUP_SEMI and DEDUP_SEMI_REVERSE
+--
+begin;
+create table foo(a int) with(parallel_workers=2) distributed randomly;
+create table bar(b int) with(parallel_workers=2) distributed randomly;
+insert into foo select i from generate_series(1, 1000)i;
+insert into bar select i from generate_series(1, 20000)i;
+analyze foo;
+analyze bar;
+
+-- non-parallel
+set local enable_parallel = off;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+set local enable_parallel = on;
+
+-- Parallel DEDUP_SEMI
+set local enable_parallel_semi_join = off;
+set local enable_parallel_dedup_semi_reverse_join = off;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+
+-- Parallel DEDUP_SEMI_REVERSE
+set local enable_parallel_semi_join = off;
+set local enable_parallel_dedup_semi_reverse_join = on;
+set local enable_parallel_dedup_semi_join = on;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+
+-- Parallel oblivious
+set local enable_parallel_hash = off;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+
+set local enable_parallel_dedup_semi_reverse_join = off;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+
+-- Parallel Nestloop DEDUP_SEMI
+set local min_parallel_table_scan_size = 0;
+set local enable_parallel_semi_join = off;
+set local enable_nestloop = on;
+set local enable_hashjoin = off;
+set local enable_mergejoin = off;
+set local enable_parallel_semi_join = off;
+set local enable_parallel_dedup_semi_reverse_join = off;
+set local enable_parallel_dedup_semi_join = on;
+explain (costs off)
+select sum(bar.b) from bar where exists (select 1 from foo where foo.a = bar.b);
+select sum(bar.b) from bar where exists (select 1 from foo where foo.a = bar.b);
+
+-- Parallel Nestloop DEDUP_SEMI_REVERSE
+set local enable_parallel_dedup_semi_reverse_join = on;
+set local enable_parallel_dedup_semi_join = off;
+explain (costs off)
+select sum(bar.b) from bar where exists (select 1 from foo where foo.a = bar.b);
+select sum(bar.b) from bar where exists (select 1 from foo where foo.a = bar.b);
+
+-- Parallel Mergejoin DEDUP_SEMI
+set local enable_parallel_semi_join = off;
+set local enable_hashjoin = off;
+set local enable_mergejoin = on;
+set local enable_nestloop = off;
+set local enable_parallel_semi_join = off;
+set local enable_parallel_dedup_semi_reverse_join = off;
+set local enable_parallel_dedup_semi_join = on;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+
+-- Parallel Mergejoin DEDUP_SEMI_REVERSE
+set local enable_parallel_dedup_semi_reverse_join = on;
+set local enable_parallel_dedup_semi_join = off;
+explain (costs off)
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b);
+
+abort;
+
+
 -- CBDB(#131): test parallel_workers during create AO/AOCO table take effect
 begin;
 set local enable_parallel = on;
diff --git a/src/test/regress/sql/rangefuncs_cdb.sql b/src/test/regress/sql/rangefuncs_cdb.sql
index d34502b9bcf..4915c875345 100644
--- a/src/test/regress/sql/rangefuncs_cdb.sql
+++ b/src/test/regress/sql/rangefuncs_cdb.sql
@@ -2,7 +2,12 @@
 -- Will run in parallel mode with enable_parallel=on and non-parallel mode.
 -- Filter this gucs to pass regression.
 --
-SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%' and name not in ('enable_parallel', 'enable_answer_query_using_materialized_views');
+SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%' AND
+name NOT IN ('enable_parallel',
+             'enable_answer_query_using_materialized_views',
+             'enable_parallel_dedup_semi_join',
+             'enable_parallel_dedup_semi_reverse_join',
+             'enable_parallel_semi_join');
 -- start_ignore
 create schema rangefuncs_cdb;
 set search_path to rangefuncs_cdb, public;
diff --git a/src/test/regress/sql/sysviews.sql b/src/test/regress/sql/sysviews.sql
index 54ef918640f..feef1db29ab 100644
--- a/src/test/regress/sql/sysviews.sql
+++ b/src/test/regress/sql/sysviews.sql
@@ -49,7 +49,12 @@ select count(*) = 0 as ok from pg_stat_wal_receiver;
 -- This is to record the prevailing planner enable_foo settings during
 -- a regression test run.
 -- GP parallel tests will run another with enable_parallel=on, filter this to pass regression.
-select name, setting from pg_settings where name like 'enable%' and name not in ('enable_parallel', 'enable_answer_query_using_materialized_views');
+SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%' AND
+name NOT IN ('enable_parallel',
+             'enable_answer_query_using_materialized_views',
+             'enable_parallel_dedup_semi_join',
+             'enable_parallel_dedup_semi_reverse_join',
+             'enable_parallel_semi_join');
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working. We can't test their contents in any great detail
diff --git a/src/test/singlenode_regress/expected/sysviews.out b/src/test/singlenode_regress/expected/sysviews.out
index 13feb143ea4..8ab72a9e326 100644
--- a/src/test/singlenode_regress/expected/sysviews.out
+++ b/src/test/singlenode_regress/expected/sysviews.out
@@ -99,7 +99,12 @@ select count(*) = 0 as ok from pg_stat_wal_receiver;
 -- This is to record the prevailing planner enable_foo settings during
 -- a regression test run.
-select name, setting from pg_settings where name like 'enable%' and name not in ('enable_parallel', 'enable_answer_query_using_materialized_views');
+SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%' AND
+name NOT IN ('enable_parallel',
+             'enable_answer_query_using_materialized_views',
+             'enable_parallel_dedup_semi_join',
+             'enable_parallel_dedup_semi_reverse_join',
+             'enable_parallel_semi_join');
              name               | setting 
 --------------------------------+---------
  enable_async_append            | on
diff --git a/src/test/singlenode_regress/sql/sysviews.sql b/src/test/singlenode_regress/sql/sysviews.sql
index f8867ffbe3b..bd46e2a5e66 100644
--- a/src/test/singlenode_regress/sql/sysviews.sql
+++ b/src/test/singlenode_regress/sql/sysviews.sql
@@ -48,7 +48,12 @@ select count(*) = 0 as ok from pg_stat_wal_receiver;
 -- This is to record the prevailing planner enable_foo settings during
 -- a regression test run.
-select name, setting from pg_settings where name like 'enable%' and name not in ('enable_parallel', 'enable_answer_query_using_materialized_views');
+SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%' AND
+name NOT IN ('enable_parallel',
+             'enable_answer_query_using_materialized_views',
+             'enable_parallel_dedup_semi_join',
+             'enable_parallel_dedup_semi_reverse_join',
+             'enable_parallel_semi_join');
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working. We can't test their contents in any great detail
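
Reviewer note on the rowid layout (an illustrative sketch, not part of the patch): the planner check "parallel_bits + seg_bits <= 16" in cdbpath.c, the counter initialization in execExpr.c, and the per-row OR of ParallelWorkerNumberOfSlice in execExprInterp.c together define a single 64-bit layout. The standalone C program below exercises that layout under stated assumptions; make_rowid() and leftmost_one_pos32() are hypothetical stand-ins for the executor logic and pg_leftmost_one_pos32(), and the sketch assumes the planner check has already passed.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical stand-in for pg_leftmost_one_pos32(): index of the highest
 * set bit; caller must pass x > 0.
 */
static int
leftmost_one_pos32(uint32_t x)
{
    int pos = -1;

    while (x)
    {
        x >>= 1;
        pos++;
    }
    return pos;
}

/*
 * Pack dbid, worker number, and row counter the way the patch does:
 *   bits  0..47                   row counter            (execExprInterp.c)
 *   bits 48..48+parallel_bits-1   worker number of slice (execExprInterp.c)
 *   bits 48+parallel_bits..63     GpIdentity.dbid        (execExpr.c)
 */
static int64_t
make_rowid(int dbid, int worker, int total_workers, int64_t counter)
{
    int     parallel_bits = leftmost_one_pos32((uint32_t) total_workers) + 1;
    int64_t rowid = ((int64_t) dbid) << (48 + parallel_bits);

    rowid |= ((int64_t) worker) << 48;
    return rowid + counter;     /* counter assumed to stay within 48 bits */
}

int
main(void)
{
    /* e.g. max dbid 4 (seg_bits = 3) and 2 workers (parallel_bits = 2) */
    int parallel_bits = leftmost_one_pos32(2) + 1;
    int seg_bits = leftmost_one_pos32(4) + 1;

    assert(parallel_bits + seg_bits <= 16);     /* the planner's check */

    /* Two workers of one segment, same counter value: distinct rowids. */
    assert(make_rowid(4, 0, 2, 1) != make_rowid(4, 1, 2, 1));

    /* Same worker and counter on different segments: distinct rowids. */
    assert(make_rowid(1, 1, 2, 7) != make_rowid(4, 1, 2, 7));

    printf("rowid layout holds\n");
    return 0;
}

Compiled standalone (cc rowid_sketch.c && ./a.out), the assertions illustrate the property the patch relies on: as long as the 48-bit counter does not overflow, rows produced by distinct (dbid, worker) pairs can never collide, which is exactly why the planner refuses the dedup paths when parallel_bits + seg_bits would exceed 16.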