Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 110 additions & 17 deletions src/backend/cdb/cdbpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -1407,21 +1407,6 @@ cdbpath_motion_for_join(PlannerInfo *root,
}
}

/*
* Locus type Replicated can only be generated by join operation.
* And in the function cdbpathlocus_join there is a rule:
* <any locus type> join <Replicated> => any locus type
* Proof by contradiction, it shows that when code arrives here,
* is is impossible that any of the two input paths' locus
* is Replicated. So we add two asserts here.
*/
Assert(!CdbPathLocus_IsReplicated(outer.locus));
Assert(!CdbPathLocus_IsReplicated(inner.locus));

if (CdbPathLocus_IsReplicated(outer.locus) ||
CdbPathLocus_IsReplicated(inner.locus))
goto fail;

outer.has_wts = cdbpath_contains_wts(outer.path);
inner.has_wts = cdbpath_contains_wts(inner.path);

Expand Down Expand Up @@ -1566,6 +1551,21 @@ cdbpath_motion_for_join(PlannerInfo *root,
outer.bytes = outer.path->rows * outer.path->pathtarget->width;
inner.bytes = inner.path->rows * inner.path->pathtarget->width;

if (join_quals_contain_outer_references ||
CdbPathLocus_IsOuterQuery(outer.locus) ||
CdbPathLocus_IsOuterQuery(inner.locus))
{
/*
* CBDB_FIXME: Consider Replicated locus.
* Replicated join OuterQuery, make Replicated to OuterQuery locus may be wrong.
* OuterQuery will be finally be Broadcast or Gathered.
* If it's Gathered, we will insert/update/delete only on one segment for a replicated table, that's not right.
* Ex: insert into a replicated table join with OuterQuery subslect.
*/
if (CdbPathLocus_IsReplicated(outer.locus) || CdbPathLocus_IsReplicated(inner.locus))
goto fail;
}

if (join_quals_contain_outer_references)
{
if (CdbPathLocus_IsOuterQuery(outer.locus) &&
Expand Down Expand Up @@ -1593,6 +1593,98 @@ cdbpath_motion_for_join(PlannerInfo *root,
else
outer.move_to = inner.locus;
}
else if (CdbPathLocus_IsReplicated(outer.locus) ||
CdbPathLocus_IsReplicated(inner.locus))
{
/*
* Replicated locus could happen here before we add Motion for join.
* Ex: insert/update/delete a replicated table with returning and join with others.
* We must broadcast to all segments for replicated table, so the upper node have
* the Replicated locus.
*/

/*
* CBDB only allow to modify one CTE now limited by gramma, but in case that there
* are multiple references for writeable CTE. We couldn't handle that now.
*/
if ((CdbPathLocus_IsReplicated(outer.locus) && CdbPathLocus_IsReplicated(inner.locus)))
goto fail;

CdbpathMfjRel *replicated = &outer;
CdbpathMfjRel *other = &inner;
if (CdbPathLocus_IsReplicated(inner.locus))
{
replicated = &inner;
other = &outer;
}

if (CdbPathLocus_IsSegmentGeneral(other->locus))
{
/*
* If it's not ok to replicate(outer join) or the numsegments of SegmentGeneral is less than Replicated, gather them to SingleQE.
* Don't worry about operation on all segments for replicated table, there will be a Explicit Gather Motion to guarantee that.
*/
if(!replicated->ok_to_replicate ||
!other->ok_to_replicate ||
(CdbPathLocus_NumSegments(other->locus) < CdbPathLocus_NumSegments(replicated->locus)))
{
CdbPathLocus_MakeSingleQE(&replicated->move_to, CdbPathLocus_NumSegments(replicated->locus));
CdbPathLocus_MakeSingleQE(&other->move_to, CdbPathLocus_NumSegments(other->locus));
}
else
return cdbpathlocus_join(jointype, replicated->locus, other->locus);
}
else if (CdbPathLocus_IsGeneral(other->locus))
{
/*
* Quite similar to SegementGeneral and we don't need to care about num segments.
* And we must Gather segment to that as SingleQE to Entry Motion may be elided, see changes in cdbpathlocus_join.
*/
if(!replicated->ok_to_replicate || !other->ok_to_replicate)
{
CdbPathLocus_MakeSingleQE(&replicated->move_to, CdbPathLocus_NumSegments(replicated->locus));
CdbPathLocus_MakeSingleQE(&other->move_to, CdbPathLocus_NumSegments(replicated->locus));
}
else
return cdbpathlocus_join(jointype, replicated->locus, other->locus);
}
else if (CdbPathLocus_IsSingleQE(other->locus) || CdbPathLocus_IsEntry(other->locus))
{
/*
* Bring to SingleQE and we should guarantee not to be elided to Entry early.
* Let cdbpathlocus_join() do it after Motion added.
*/
CdbPathLocus_MakeSingleQE(&replicated->move_to, CdbPathLocus_NumSegments(replicated->locus));
}
else if (CdbPathLocus_IsPartitioned(other->locus))
{
/*
* Hashed, Strewn, HashedOJ are similar.
* Redistribute Partition to the num segments of Replicated if num segments are not matched.
*/
if (!replicated->ok_to_replicate)
{
CdbPathLocus_MakeSingleQE(&replicated->move_to, CdbPathLocus_NumSegments(replicated->locus));
CdbPathLocus_MakeSingleQE(&other->move_to, CdbPathLocus_NumSegments(other->locus));
}
else if (CdbPathLocus_NumSegments(other->locus) != CdbPathLocus_NumSegments(replicated->locus))
{
CdbPathLocus_MakeHashed(&other->move_to, other->locus.distkey,
CdbPathLocus_NumSegments(replicated->locus), 0);
}
else
{
/* Compatible! */
return other->locus;
}
}
else
{
Assert(false);
/* Shouldn't get here */
goto fail;
}
}
else if (CdbPathLocus_IsGeneral(outer.locus) ||
CdbPathLocus_IsGeneral(inner.locus))
{
Expand Down Expand Up @@ -2675,7 +2767,7 @@ create_split_update_path(PlannerInfo *root, Index rti, GpPolicy *policy, Path *s
* turn_volatile_seggen_to_singleqe
*
* This function is the key tool to build correct plan
* for general or segmentgeneral locus paths that contain
* for general, segmentgeneral, replicated locus paths that contain
* volatile functions.
*
* If we find such a pattern:
Expand All @@ -2695,8 +2787,9 @@ turn_volatile_seggen_to_singleqe(PlannerInfo *root, Path *path, Node *node)
{
if ((CdbPathLocus_IsSegmentGeneral(path->locus) ||
CdbPathLocus_IsGeneral(path->locus) ||
CdbPathLocus_IsReplicated(path->locus) ||
CdbPathLocus_IsSegmentGeneralWorkers(path->locus)) &&
(contain_volatile_functions(node) || IsA(path, LimitPath)))
(contain_volatile_functions(node) || IsA(path, LimitPath)))
{
CdbPathLocus singleQE;
Path *mpath;
Expand Down
40 changes: 25 additions & 15 deletions src/backend/cdb/cdbpathlocus.c
Original file line number Diff line number Diff line change
Expand Up @@ -774,46 +774,56 @@ cdbpathlocus_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b)
return resultlocus;
}

/*
* Could get here if Replicated join Entry and we Gather Replicated to SingleQE
* with cte1 as (insert into rpt_table values (1, 2) returning *)
* select * from cte1 join gp_segment_configuration g on g.dbid = cte1.c1;
* We return SingleQE to ensure not to be elided Motion.
*/
if ((CdbPathLocus_IsSingleQE(a) && CdbPathLocus_IsEntry(b)) ||
(CdbPathLocus_IsSingleQE(b) && CdbPathLocus_IsEntry(a)))
{
return CdbPathLocus_IsSingleQE(a) ? a : b;
}

if (CdbPathLocus_IsGeneral(a))
return b;

if (CdbPathLocus_IsGeneral(b))
return a;

/*
* If one rel is replicated, result stays with the other rel,
* If one rel is SegmentGeneral, result stays with the other rel,
* but need to ensure the result is on the common segments.
* NB: the code check SegmentGeneral and Replicated is quite similar,
* but we have to put check-segmentgeneral first here. Consider one
* is SegmentGeneral and the other is Replicated, only by this order
* we can be sure that this function return a locus of Replicated,
* else if we return SegmentGeneral, plan will allocate gangs on only
* one segment, which will insert/update/delelte rows on that segment
* for a replicated tables.
*/
if (CdbPathLocus_IsReplicated(a))
if (CdbPathLocus_IsSegmentGeneral(a))
{
b.numsegments = CdbPathLocus_CommonSegments(a, b);
return b;
}
if (CdbPathLocus_IsReplicated(b))
if (CdbPathLocus_IsSegmentGeneral(b))
{
a.numsegments = CdbPathLocus_CommonSegments(a, b);
return a;
}

/*
* If one rel is segmentgeneral, result stays with the other rel,
* If one rel is replicated, result stays with the other rel,
* but need to ensure the result is on the common segments.
*
* NB: the code check SegmentGeneral and replicated is quite similar,
* but we have to put check-segmentgeneral below. Consider one
* is segmentgeneral and the other is replicated, only by this order
* we can be sure that this function never return a locus of
* Replicated.
* update a replicated table join with a partitioned locus table will
* reach here.
*/

if (CdbPathLocus_IsSegmentGeneral(a))
if (CdbPathLocus_IsReplicated(a))
{
b.numsegments = CdbPathLocus_CommonSegments(a, b);
return b;
}
if (CdbPathLocus_IsSegmentGeneral(b))
if (CdbPathLocus_IsReplicated(b))
{
a.numsegments = CdbPathLocus_CommonSegments(a, b);
return a;
Expand Down
17 changes: 0 additions & 17 deletions src/backend/optimizer/util/pathnode.c
Original file line number Diff line number Diff line change
Expand Up @@ -6004,23 +6004,6 @@ adjust_modifytable_subpath(PlannerInfo *root, CmdType operation,
* currently, because a ModifyTable node can only be at the top of the
* plan, it won't make any difference to the overall plan.
*
* GPDB_96_MERGE_FIXME: it might with e.g. a INSERT RETURNING in a CTE
* I tried here, the locus setting is quite simple, but failed if it's not
* in a CTE and the locus is General. Haven't figured out how to create
* flow in that case.
* Example:
* CREATE TABLE cte_returning_locus(c1 int) DISTRIBUTED BY (c1);
* COPY cte_returning_locus FROM PROGRAM 'seq 1 100';
* EXPLAIN WITH aa AS (
* INSERT INTO cte_returning_locus SELECT generate_series(3,300) RETURNING c1
* )
* SELECT count(*) FROM aa,cte_returning_locus WHERE aa.c1 = cte_returning_locus.c1;
*
* The returning doesn't need a motion to be hash joined, works fine. But
* without the WITH, what is the proper flow? FLOW_SINGLETON returns
* nothing, FLOW_PARTITIONED without hashExprs(General locus has no
* distkeys) returns duplication.
*
* GPDB_90_MERGE_FIXME: I've hacked a basic implementation of the above for
* the case where all the subplans are POLICYTYPE_ENTRY, but it seems like
* there should be a more general way to do this.
Expand Down
Loading