diff --git a/src/backend/cdb/cdbgroupingpaths.c b/src/backend/cdb/cdbgroupingpaths.c index a54c6452471..33d5024579e 100644 --- a/src/backend/cdb/cdbgroupingpaths.c +++ b/src/backend/cdb/cdbgroupingpaths.c @@ -67,6 +67,7 @@ #include "optimizer/tlist.h" #include "parser/parse_clause.h" #include "parser/parse_oper.h" +#include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/selfuncs.h" @@ -142,6 +143,9 @@ typedef struct * partial_rel holds the partially aggregated results from the first stage. */ RelOptInfo *partial_rel; + + /* is a DISTINCT?*/ + bool is_distinct; } cdb_agg_planning_context; typedef struct @@ -226,6 +230,11 @@ recognize_dqa_type(cdb_agg_planning_context *ctx); static PathTarget * strip_aggdistinct(PathTarget *target); +static void add_first_stage_group_agg_partial_path(PlannerInfo *root, + Path *path, + bool is_sorted, + cdb_agg_planning_context *ctx); + /* * cdb_create_multistage_grouping_paths * @@ -300,6 +309,7 @@ cdb_create_multistage_grouping_paths(PlannerInfo *root, * across subroutines. */ memset(&ctx, 0, sizeof(ctx)); + ctx.is_distinct = false; ctx.can_sort = can_sort; ctx.can_hash = can_hash; ctx.target = target; @@ -582,6 +592,7 @@ cdb_create_twostage_distinct_paths(PlannerInfo *root, memset(&zero_agg_costs, 0, sizeof(zero_agg_costs)); memset(&ctx, 0, sizeof(ctx)); + ctx.is_distinct = true; ctx.can_sort = allow_sort; ctx.can_hash = allow_hash; ctx.target = target; @@ -652,7 +663,7 @@ cdb_create_twostage_distinct_paths(PlannerInfo *root, /* * All set, generate the two-stage paths. */ - create_two_stage_paths(root, &ctx, input_rel, output_rel, NIL); + create_two_stage_paths(root, &ctx, input_rel, output_rel, input_rel->partial_pathlist); } /* @@ -663,6 +674,7 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx, RelOptInfo *input_rel, RelOptInfo *output_rel, List *partial_pathlist) { Path *cheapest_path = input_rel->cheapest_total_path; + Path *cheapest_partial_path = partial_pathlist ? 
(Path *) linitial(partial_pathlist) : NULL; /* * Consider ways to do the first Aggregate stage. @@ -700,6 +712,24 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx, if (path == cheapest_path || is_sorted) add_first_stage_group_agg_path(root, path, is_sorted, ctx); } + + if (ctx->is_distinct && partial_pathlist) + { + foreach(lc, partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + bool is_sorted; + + if (cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles)) + continue; + + is_sorted = pathkeys_contained_in(ctx->partial_needed_pathkeys, + path->pathkeys); + + if (path == cheapest_partial_path|| is_sorted) + add_first_stage_group_agg_partial_path(root, path, is_sorted, ctx); + } + } } /* @@ -721,15 +751,34 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx, if (partial_pathlist) { - ListCell *lc; + ListCell *lc; - foreach(lc, partial_pathlist) + foreach (lc, partial_pathlist) { - Path *path = (Path *) lfirst(lc); + Path *path = (Path *)lfirst(lc); - if (cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles)) - continue; - add_partial_path(ctx->partial_rel, path); + if (!cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles)) + { + if (ctx->is_distinct && ctx->can_hash) + { + double dNumGroups = estimate_num_groups_on_segment(ctx->dNumGroupsTotal, + path->rows, + path->locus); + + path = (Path *) create_agg_path(root, + ctx->partial_rel, + path, + ctx->partial_grouping_target, + AGG_HASHED, + ctx->hasAggs ? 
AGGSPLIT_INITIAL_SERIAL : AGGSPLIT_SIMPLE, + parallel_query_use_streaming_hashagg, /* streaming */ + ctx->groupClause, + NIL, + ctx->agg_partial_costs, + dNumGroups); + } + add_partial_path(ctx->partial_rel, path); + } } } @@ -849,6 +898,7 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx, path->pathkeys); else is_sorted = false; + if (path == cheapest_first_stage_path || is_sorted) { add_second_stage_group_agg_path(root, path, is_sorted, @@ -1249,6 +1299,7 @@ add_second_stage_hash_agg_path(PlannerInfo *root, /* * Calculate the number of groups in the second stage, per segment. */ + // consider parallel? if (CdbPathLocus_IsPartitioned(group_locus)) dNumGroups = clamp_row_est(ctx->dNumGroupsTotal / CdbPathLocus_NumSegments(group_locus)); @@ -2720,3 +2771,38 @@ cdb_prepare_path_for_hashed_agg(PlannerInfo *root, return subpath; } + +static void add_first_stage_group_agg_partial_path(PlannerInfo *root, + Path *path, + bool is_sorted, + cdb_agg_planning_context *ctx) +{ + + if (ctx->agg_costs->distinctAggrefs || + ctx->groupingSets) + return; + + if (!is_sorted) + { + path = (Path *) create_sort_path(root, + ctx->partial_rel, + path, + ctx->partial_sort_pathkeys, + -1.0); + } + + Assert(ctx->hasAggs || ctx->groupClause); + add_partial_path(ctx->partial_rel, + (Path *) create_agg_path(root, + ctx->partial_rel, + path, + ctx->partial_grouping_target, + ctx->groupClause ? AGG_SORTED : AGG_PLAIN, + ctx->hasAggs ? 
AGGSPLIT_INITIAL_SERIAL : AGGSPLIT_SIMPLE, + false, /* streaming */ + ctx->groupClause, + NIL, + ctx->agg_partial_costs, + estimate_num_groups_on_segment(ctx->dNumGroupsTotal, + path->rows, path->locus))); +} diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index 6c3b7dae365..1f6e13bb16b 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -151,6 +151,7 @@ bool enable_parallel = false; bool enable_parallel_semi_join = true; bool enable_parallel_dedup_semi_join = true; bool enable_parallel_dedup_semi_reverse_join = true; +bool parallel_query_use_streaming_hashagg = true; int gp_appendonly_insert_files = 0; int gp_appendonly_insert_files_tuples_range = 0; int gp_random_insert_segments = 0; @@ -3227,6 +3228,16 @@ struct config_bool ConfigureNamesBool_gp[] = true, NULL, NULL, NULL }, + { + {"parallel_query_use_streaming_hashagg", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Allow use of streaming hashagg in parallel query for DISTINCT."), + NULL, + GUC_EXPLAIN + }, + &parallel_query_use_streaming_hashagg, + true, + NULL, NULL, NULL + }, { {"gp_internal_is_singlenode", PGC_POSTMASTER, UNGROUPED, gettext_noop("Is in SingleNode mode (no segments). 
WARNING: user SHOULD NOT set this by any means."), diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index c0e0e293428..de0f931c797 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -293,6 +293,7 @@ extern bool enable_parallel; extern bool enable_parallel_semi_join; extern bool enable_parallel_dedup_semi_join; extern bool enable_parallel_dedup_semi_reverse_join; +extern bool parallel_query_use_streaming_hashagg; extern int gp_appendonly_insert_files; extern int gp_appendonly_insert_files_tuples_range; extern int gp_random_insert_segments; diff --git a/src/include/utils/unsync_guc_name.h b/src/include/utils/unsync_guc_name.h index 2684fd7cb6e..a2a325bd9ed 100644 --- a/src/include/utils/unsync_guc_name.h +++ b/src/include/utils/unsync_guc_name.h @@ -498,6 +498,7 @@ "optimizer_use_gpdb_allocators", "optimizer_xform_bind_threshold", "parallel_leader_participation", + "parallel_query_use_streaming_hashagg", "parallel_setup_cost", "parallel_tuple_cost", "password_encryption", diff --git a/src/test/regress/expected/cbdb_parallel.out b/src/test/regress/expected/cbdb_parallel.out index b8639f9fb9c..6ed0b1b6bb0 100644 --- a/src/test/regress/expected/cbdb_parallel.out +++ b/src/test/regress/expected/cbdb_parallel.out @@ -3069,6 +3069,201 @@ select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_ant (4 rows) abort; +-- +-- Test Parallel DISTINCT +-- +drop table if exists t_distinct_0; +NOTICE: table "t_distinct_0" does not exist, skipping +create table t_distinct_0(a int, b int) using ao_column distributed randomly; +insert into t_distinct_0 select i, i+1 from generate_series(1, 1000) i; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into 
t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +analyze t_distinct_0; +explain(costs off) +select distinct a from t_distinct_0; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> HashAggregate + Group Key: a + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: a + -> HashAggregate + Group Key: a + -> Seq Scan on t_distinct_0 + Optimizer: Postgres query optimizer +(9 rows) + +set enable_parallel = on; +-- first stage HashAgg, second stage GroupAgg +explain(costs off) +select distinct a from t_distinct_0; + QUERY PLAN +------------------------------------------------------------------ + Gather Motion 6:1 (slice1; segments: 6) + Merge Key: a + -> GroupAggregate + Group Key: a + -> Sort + Sort Key: a + -> Redistribute Motion 6:6 (slice2; segments: 6) + Hash Key: a + Hash Module: 3 + -> Streaming HashAggregate + Group Key: a + -> Parallel Seq Scan on t_distinct_0 + Optimizer: Postgres query optimizer +(13 rows) + +set parallel_query_use_streaming_hashagg = off; +explain(costs off) +select distinct a from t_distinct_0; + QUERY PLAN +------------------------------------------------------------------ + Gather Motion 6:1 (slice1; segments: 6) + Merge Key: a + -> GroupAggregate + Group Key: a + -> Sort + Sort Key: a + -> Redistribute Motion 6:6 (slice2; segments: 6) + Hash Key: a + Hash Module: 3 + -> HashAggregate + Group Key: a + -> Parallel Seq Scan on t_distinct_0 + Optimizer: Postgres query optimizer +(13 rows) + +-- GroupAgg +set enable_hashagg = off; +explain(costs off) 
+select distinct a from t_distinct_0; + QUERY PLAN +----------------------------------------------------------------------- + Gather Motion 6:1 (slice1; segments: 6) + Merge Key: a + -> GroupAggregate + Group Key: a + -> Sort + Sort Key: a + -> Redistribute Motion 6:6 (slice2; segments: 6) + Hash Key: a + Hash Module: 3 + -> GroupAggregate + Group Key: a + -> Sort + Sort Key: a + -> Parallel Seq Scan on t_distinct_0 + Optimizer: Postgres query optimizer +(15 rows) + +-- HashAgg +set enable_hashagg = on; +set enable_groupagg = off; +explain(costs off) +select distinct a from t_distinct_0; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 6:1 (slice1; segments: 6) + -> HashAggregate + Group Key: a + -> Redistribute Motion 6:6 (slice2; segments: 6) + Hash Key: a + Hash Module: 3 + -> HashAggregate + Group Key: a + -> Parallel Seq Scan on t_distinct_0 + Optimizer: Postgres query optimizer +(10 rows) + +set parallel_query_use_streaming_hashagg = on; +explain(costs off) +select distinct a from t_distinct_0; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 6:1 (slice1; segments: 6) + -> HashAggregate + Group Key: a + -> Redistribute Motion 6:6 (slice2; segments: 6) + Hash Key: a + Hash Module: 3 + -> Streaming HashAggregate + Group Key: a + -> Parallel Seq Scan on t_distinct_0 + Optimizer: Postgres query optimizer +(10 rows) + +-- multi DISTINCT tlist +explain(costs off) +select distinct a, b from t_distinct_0; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 6:1 (slice1; segments: 6) + -> HashAggregate + Group Key: a, b + -> Redistribute Motion 6:6 (slice2; segments: 6) + Hash Key: a, b + Hash Module: 3 + -> Streaming HashAggregate + Group Key: a, b + -> Parallel Seq Scan on t_distinct_0 + Optimizer: Postgres query optimizer +(10 rows) + +-- DISTINCT on distribution key +drop table if exists t_distinct_1; +NOTICE: table "t_distinct_1" does not 
exist, skipping +create table t_distinct_1(a int, b int) using ao_column; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +insert into t_distinct_1 select * from t_distinct_0; +analyze t_distinct_1; +set enable_parallel = off; +explain(costs off) +select distinct a from t_distinct_1; + QUERY PLAN +------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> HashAggregate + Group Key: a + -> Seq Scan on t_distinct_1 + Optimizer: Postgres query optimizer +(5 rows) + +set enable_parallel = on; +explain(costs off) +select distinct a from t_distinct_1; + QUERY PLAN +------------------------------------------------------------ + Gather Motion 6:1 (slice1; segments: 6) + -> HashAggregate + Group Key: a + -> Redistribute Motion 6:6 (slice2; segments: 6) + Hash Key: a + Hash Module: 3 + -> Streaming HashAggregate + Group Key: a + -> Parallel Seq Scan on t_distinct_1 + Optimizer: Postgres query optimizer +(10 rows) + +-- +-- End of test Parallel DISTINCT +-- -- start_ignore drop schema test_parallel cascade; -- end_ignore diff --git a/src/test/regress/sql/cbdb_parallel.sql b/src/test/regress/sql/cbdb_parallel.sql index b3fab79dd09..74a60e6ed2d 100644 --- a/src/test/regress/sql/cbdb_parallel.sql +++ b/src/test/regress/sql/cbdb_parallel.sql @@ -986,6 +986,69 @@ select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_ant select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_anti.a where t2_anti.a is null; abort; +-- +-- Test Parallel DISTINCT +-- +drop table if exists t_distinct_0; +create table t_distinct_0(a int, b int) using ao_column distributed randomly; +insert into t_distinct_0 select i, i+1 from generate_series(1, 1000) i; +insert into t_distinct_0 select 
* from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +insert into t_distinct_0 select * from t_distinct_0; +analyze t_distinct_0; +explain(costs off) +select distinct a from t_distinct_0; +set enable_parallel = on; +-- first stage HashAgg, second stage GroupAgg +explain(costs off) +select distinct a from t_distinct_0; +set parallel_query_use_streaming_hashagg = off; +explain(costs off) +select distinct a from t_distinct_0; +-- GroupAgg +set enable_hashagg = off; +explain(costs off) +select distinct a from t_distinct_0; +-- HashAgg +set enable_hashagg = on; +set enable_groupagg = off; +explain(costs off) +select distinct a from t_distinct_0; +set parallel_query_use_streaming_hashagg = on; +explain(costs off) +select distinct a from t_distinct_0; +-- multi DISTINCT tlist +explain(costs off) +select distinct a, b from t_distinct_0; + +-- DISTINCT on distribution key +drop table if exists t_distinct_1; +create table t_distinct_1(a int, b int) using ao_column; +insert into t_distinct_1 select * from t_distinct_0; +analyze t_distinct_1; +set enable_parallel = off; +explain(costs off) +select distinct a from t_distinct_1; +set enable_parallel = on; +explain(costs off) +select distinct a from t_distinct_1; + +-- +-- End of test Parallel DISTINCT +-- + -- start_ignore drop schema test_parallel cascade; -- end_ignore