100 changes: 93 additions & 7 deletions src/backend/cdb/cdbgroupingpaths.c
@@ -67,6 +67,7 @@
#include "optimizer/tlist.h"
#include "parser/parse_clause.h"
#include "parser/parse_oper.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/selfuncs.h"

@@ -142,6 +143,9 @@ typedef struct
* partial_rel holds the partially aggregated results from the first stage.
*/
RelOptInfo *partial_rel;

/* Are we planning a DISTINCT query? */
bool is_distinct;
} cdb_agg_planning_context;

typedef struct
@@ -226,6 +230,11 @@ recognize_dqa_type(cdb_agg_planning_context *ctx);
static PathTarget *
strip_aggdistinct(PathTarget *target);

static void add_first_stage_group_agg_partial_path(PlannerInfo *root,
Path *path,
bool is_sorted,
cdb_agg_planning_context *ctx);

/*
* cdb_create_multistage_grouping_paths
*
@@ -300,6 +309,7 @@ cdb_create_multistage_grouping_paths(PlannerInfo *root,
* across subroutines.
*/
memset(&ctx, 0, sizeof(ctx));
ctx.is_distinct = false;
ctx.can_sort = can_sort;
ctx.can_hash = can_hash;
ctx.target = target;
@@ -582,6 +592,7 @@ cdb_create_twostage_distinct_paths(PlannerInfo *root,
memset(&zero_agg_costs, 0, sizeof(zero_agg_costs));

memset(&ctx, 0, sizeof(ctx));
ctx.is_distinct = true;
ctx.can_sort = allow_sort;
ctx.can_hash = allow_hash;
ctx.target = target;
@@ -652,7 +663,7 @@ cdb_create_twostage_distinct_paths(PlannerInfo *root,
/*
* All set, generate the two-stage paths.
*/
create_two_stage_paths(root, &ctx, input_rel, output_rel, NIL);
create_two_stage_paths(root, &ctx, input_rel, output_rel, input_rel->partial_pathlist);
}

/*
@@ -663,6 +674,7 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
RelOptInfo *input_rel, RelOptInfo *output_rel, List *partial_pathlist)
{
Path *cheapest_path = input_rel->cheapest_total_path;
Path *cheapest_partial_path = partial_pathlist ? (Path *) linitial(partial_pathlist) : NULL;

/*
* Consider ways to do the first Aggregate stage.
@@ -700,6 +712,24 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
if (path == cheapest_path || is_sorted)
add_first_stage_group_agg_path(root, path, is_sorted, ctx);
}

if (ctx->is_distinct && partial_pathlist)
{
foreach(lc, partial_pathlist)
{
Path *path = (Path *) lfirst(lc);
bool is_sorted;

if (cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles))
continue;

is_sorted = pathkeys_contained_in(ctx->partial_needed_pathkeys,
path->pathkeys);

if (path == cheapest_partial_path || is_sorted)
add_first_stage_group_agg_partial_path(root, path, is_sorted, ctx);
}
}
}

/*
@@ -721,15 +751,34 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,

if (partial_pathlist)
{
ListCell *lc;
ListCell *lc;

foreach(lc, partial_pathlist)
foreach (lc, partial_pathlist)
{
Path *path = (Path *) lfirst(lc);
Path *path = (Path *)lfirst(lc);

if (cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles))
continue;
add_partial_path(ctx->partial_rel, path);
if (!cdbpathlocus_collocates_tlist(root, path->locus, ctx->group_tles))
{
if (ctx->is_distinct && ctx->can_hash)
{
double dNumGroups = estimate_num_groups_on_segment(ctx->dNumGroupsTotal,
path->rows,
path->locus);

path = (Path *) create_agg_path(root,
ctx->partial_rel,
path,
ctx->partial_grouping_target,
AGG_HASHED,
ctx->hasAggs ? AGGSPLIT_INITIAL_SERIAL : AGGSPLIT_SIMPLE,
parallel_query_use_streaming_hashagg, /* streaming */
ctx->groupClause,
NIL,
ctx->agg_partial_costs,
dNumGroups);
}
add_partial_path(ctx->partial_rel, path);
}
}
}

@@ -849,6 +898,7 @@ create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
path->pathkeys);
else
is_sorted = false;

if (path == cheapest_first_stage_path || is_sorted)
{
add_second_stage_group_agg_path(root, path, is_sorted,
@@ -1249,6 +1299,7 @@ add_second_stage_hash_agg_path(PlannerInfo *root,
/*
* Calculate the number of groups in the second stage, per segment.
*/
/* TODO: consider parallel? */
if (CdbPathLocus_IsPartitioned(group_locus))
dNumGroups = clamp_row_est(ctx->dNumGroupsTotal /
CdbPathLocus_NumSegments(group_locus));
@@ -2720,3 +2771,38 @@ cdb_prepare_path_for_hashed_agg(PlannerInfo *root,

return subpath;
}

static void
add_first_stage_group_agg_partial_path(PlannerInfo *root,
Path *path,
bool is_sorted,
cdb_agg_planning_context *ctx)
{
if (ctx->agg_costs->distinctAggrefs ||
ctx->groupingSets)
return;

if (!is_sorted)
{
path = (Path *) create_sort_path(root,
ctx->partial_rel,
path,
ctx->partial_sort_pathkeys,
-1.0);
}

Assert(ctx->hasAggs || ctx->groupClause);
add_partial_path(ctx->partial_rel,
(Path *) create_agg_path(root,
ctx->partial_rel,
path,
ctx->partial_grouping_target,
ctx->groupClause ? AGG_SORTED : AGG_PLAIN,
ctx->hasAggs ? AGGSPLIT_INITIAL_SERIAL : AGGSPLIT_SIMPLE,
false, /* streaming */
ctx->groupClause,
NIL,
ctx->agg_partial_costs,
estimate_num_groups_on_segment(ctx->dNumGroupsTotal,
path->rows, path->locus)));
}
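
For context, a minimal sketch of the kind of query the new first-stage partial path is meant to serve, assuming a hypothetical table t(a); whether the planner actually picks the two-stage plan built on the partial pathlist depends on costing and the parallel settings:

-- hypothetical example; t(a) is a made-up table
SET enable_parallel = on;
EXPLAIN SELECT DISTINCT a FROM t;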
11 changes: 11 additions & 0 deletions src/backend/utils/misc/guc_gp.c
@@ -151,6 +151,7 @@ bool enable_parallel = false;
bool enable_parallel_semi_join = true;
bool enable_parallel_dedup_semi_join = true;
bool enable_parallel_dedup_semi_reverse_join = true;
bool parallel_query_use_streaming_hashagg = false;
int gp_appendonly_insert_files = 0;
int gp_appendonly_insert_files_tuples_range = 0;
int gp_random_insert_segments = 0;
@@ -3227,6 +3228,16 @@ struct config_bool ConfigureNamesBool_gp[] =
true,
NULL, NULL, NULL
},
{
{"parallel_query_use_streaming_hashagg", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("allow to use of streaming hashagg in parallel query for DISTINCT."),
NULL,
GUC_EXPLAIN
},
&parallel_query_use_streaming_hashagg,
true,
NULL, NULL, NULL
},
{
{"gp_internal_is_singlenode", PGC_POSTMASTER, UNGROUPED,
gettext_noop("Is in SingleNode mode (no segments). WARNING: user SHOULD NOT set this by any means."),
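A minimal usage sketch for the new setting (it is PGC_USERSET, so it can be changed per session); the table is hypothetical, and since the entry carries GUC_EXPLAIN the value should show up under EXPLAIN (SETTINGS) when it differs from the default:

SET parallel_query_use_streaming_hashagg = on;
EXPLAIN (SETTINGS) SELECT DISTINCT a FROM t;  -- hypothetical table t
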
1 change: 1 addition & 0 deletions src/include/utils/guc.h
@@ -293,6 +293,7 @@ extern bool enable_parallel;
extern bool enable_parallel_semi_join;
extern bool enable_parallel_dedup_semi_join;
extern bool enable_parallel_dedup_semi_reverse_join;
extern bool parallel_query_use_streaming_hashagg;
extern int gp_appendonly_insert_files;
extern int gp_appendonly_insert_files_tuples_range;
extern int gp_random_insert_segments;
1 change: 1 addition & 0 deletions src/include/utils/unsync_guc_name.h
@@ -498,6 +498,7 @@
"optimizer_use_gpdb_allocators",
"optimizer_xform_bind_threshold",
"parallel_leader_participation",
"parallel_query_use_streaming_hashagg",
"parallel_setup_cost",
"parallel_tuple_cost",
"password_encryption",