Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions src/backend/cdb/dispatcher/cdbdisp_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,17 @@
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbcopy.h"
#include "executor/execUtils.h"
#include "cdb/cdbpq.h"

#define QUERY_STRING_TRUNCATE_SIZE (1024)

extern bool Test_print_direct_dispatch_info;

extern bool gp_print_create_gang_time;

ExtendProtocolDataStore epd_storage = {0};
ExtendProtocolData epd = &epd_storage;

typedef struct ParamWalkerContext
{
plan_tree_base_prefix base; /* Required prefix for
Expand Down Expand Up @@ -1678,3 +1683,104 @@ findParamType(List *params, int paramid)

return InvalidOid;
}

/*
* process data in pointer cosume_p, ex: copy everything interested in.
* For EP_TAG_I, EP_TAG_U, EP_TAG_D, consume_p is a Bitmapset**, we just
* copy the content and process them later.
*/
void
ConsumeExtendProtocolData(ExtendProtocolSubTag subtag, void *consume_p)
{
Assert(epd);

if ((epd->consumed_bitmap & (1 << subtag)) == 0)
return;

switch (subtag)
{
case EP_TAG_I:
case EP_TAG_U:
case EP_TAG_D:
Assert(consume_p != NULL);
*((Bitmapset **) consume_p) = bms_copy(list_nth(epd->subtagdata, subtag));
bms_free(list_nth(epd->subtagdata, subtag)); /* clean up */
break;
default:
Assert(false);
}

/* Mark subtag consumed. */
epd->consumed_bitmap &= ~(1 << subtag);
}

/*
* Contents must be allocated in TopTransactionMemoryContext.
*/
void InitExtendProtocolData(void)
{
MemoryContext oldctx = MemoryContextSwitchTo(TopTransactionContext);

/* Make bitmapset allocated under the context we set. */
Bitmapset *inserted = bms_make_singleton(0);
inserted = bms_del_member(inserted, 0);
Bitmapset *updated = bms_make_singleton(0);
updated = bms_del_member(updated, 0);
Bitmapset *deleted = bms_make_singleton(0);
deleted = bms_del_member(deleted, 0);

epd->subtagdata = list_make1(inserted);
epd->subtagdata = lappend(epd->subtagdata, updated);
epd->subtagdata = lappend(epd->subtagdata, deleted);

epd->consumed_bitmap = 0;

MemoryContextSwitchTo(oldctx);
}

/*
* Handle extend protocol aside from upstream.
* Store everything in TopTransactionMemoryContext.
* Do not error here, let libpq work.
* End of each process when we reached a EP_TAG_MAX, callers
* should follow this behaviour!.
*/
bool HandleExtendProtocol(PGconn *conn)
{
int subtag;
int num;
if (Gp_role != GP_ROLE_DISPATCH)
return false;

for (;;)
{
if (pqGetInt(&subtag, 4, conn))
return false;
switch (subtag)
{
case EP_TAG_I:
case EP_TAG_U:
case EP_TAG_D:
if (pqGetInt(&num, 4, conn))
return false;
for (int i = 0; i < num; i++)
{
Bitmapset *relids = (Bitmapset *) list_nth(epd->subtagdata, subtag);
int relid;
if (pqGetInt(&relid, 4, conn))
return false;
relids = bms_add_member(relids, relid);
list_nth_replace(epd->subtagdata, subtag, relids);
/* Mark subtag to be consumed. */
epd->consumed_bitmap |= 1 << subtag;
}
break;
case EP_TAG_MAX:
/* End of this run. */
return true;
default:
Assert(false);
return false;
}
}
}
172 changes: 132 additions & 40 deletions src/backend/executor/execMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ static void EvalPlanQualStart(EPQState *epqstate, Plan *planTree);

static void AdjustReplicatedTableCounts(EState *estate);

static void
MaintainMaterializedViewStatus(QueryDesc *queryDesc, CmdType operation);

/* end of local decls */


Expand Down Expand Up @@ -248,6 +251,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
bool shouldDispatch;
bool needDtx;
List *volatile toplevelOidCache = NIL;
bool has_writable_operation = false;

/* sanity checks: queryDesc must not be started already */
Assert(queryDesc != NULL);
Expand Down Expand Up @@ -395,7 +399,10 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
*/
if (queryDesc->plannedstmt->rowMarks != NIL ||
queryDesc->plannedstmt->hasModifyingCTE)
{
estate->es_output_cid = GetCurrentCommandId(true);
has_writable_operation = true;
}

/*
* A SELECT without modifying CTEs can't possibly queue triggers,
Expand All @@ -411,6 +418,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
case CMD_DELETE:
case CMD_UPDATE:
estate->es_output_cid = GetCurrentCommandId(true);
has_writable_operation = true;
break;

default:
Expand Down Expand Up @@ -545,6 +553,11 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
else
shouldDispatch = false;

if (IS_QD_OR_SINGLENODE() && has_writable_operation)
{
InitExtendProtocolData();
}

/*
* We don't eliminate aliens if we don't have an MPP plan
* or we are executing on master.
Expand Down Expand Up @@ -1037,7 +1050,8 @@ standard_ExecutorRun(QueryDesc *queryDesc,
/* should never happen */
Assert(!"undefined parallel execution strategy");
}
if ((exec_identity == GP_IGNORE || exec_identity == GP_ROOT_SLICE) && operation != CMD_SELECT)
if ((exec_identity == GP_IGNORE || exec_identity == GP_ROOT_SLICE) &&
(operation != CMD_SELECT || (queryDesc->plannedstmt->hasModifyingCTE)))
es_processed = mppExecutorWait(queryDesc);

/*
Expand All @@ -1062,45 +1076,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
queryDesc->plannedstmt->hasModifyingCTE) &&
((es_processed > 0 || estate->es_processed > 0) || !queryDesc->plannedstmt->canSetTag))
{
List *rtable = queryDesc->plannedstmt->rtable;
int length = list_length(rtable);
ListCell *lc;
List *unique_result_relations = list_concat_unique_int(NIL, queryDesc->plannedstmt->resultRelations);

foreach(lc, unique_result_relations)
{

int varno = lfirst_int(lc);
RangeTblEntry *rte = rt_fetch(varno, rtable);

/* Avoid crash in case we don't find a rte. */
if (varno > length + 1)
{
ereport(WARNING, (errmsg("could not find rte of varno: %u ", varno)));
continue;
}

switch (operation)
{
case CMD_INSERT:
SetRelativeMatviewAuxStatus(rte->relid,
MV_DATA_STATUS_EXPIRED_INSERT_ONLY,
MV_DATA_STATUS_TRANSFER_DIRECTION_ALL);
break;
case CMD_UPDATE:
case CMD_DELETE:
SetRelativeMatviewAuxStatus(rte->relid,
MV_DATA_STATUS_EXPIRED,
MV_DATA_STATUS_TRANSFER_DIRECTION_ALL);
break;
default:
/* If there were writable CTE, just mark it as expired. */
if (queryDesc->plannedstmt->hasModifyingCTE)
SetRelativeMatviewAuxStatus(rte->relid, MV_DATA_STATUS_EXPIRED,
MV_DATA_STATUS_TRANSFER_DIRECTION_ALL);
break;
}
}
MaintainMaterializedViewStatus(queryDesc, operation);
}
}
PG_CATCH();
Expand Down Expand Up @@ -4291,3 +4267,119 @@ already_under_executor_run(void)
{
return executor_run_nesting_level > 0;
}

/*
* Maintain the status of Materialized Views in response to write operations on the underlying relations.
*
* For partitioned tables, changes are tracked using the relations of their leaf partitions rather than
* the parent tables themselves. This minimizes the impact on Materialized Views that depend on the
* partition tree, ensuring only relevant partitions are affected.
*
* In the case of cross-partition updates, an UPDATE operation on a parent table is decomposed into
* an INSERT on one leaf partition and a DELETE on another. As a result, the status transition follows
* an UP direction for both INSERT and DELETE operations, rather than an UP_AND_DOWN direction on the
* parent table. This approach optimizes performance and reduces unnecessary status changes avoding
* invalidations of unrelated materialized views.
*
* For non-partitioned tables, the status transition is handled based on the semantic relations.
*/
static void
MaintainMaterializedViewStatus(QueryDesc *queryDesc, CmdType operation)
{
Bitmapset *inserted = NULL;
Bitmapset *updated = NULL;
Bitmapset *deleted = NULL;
List *unique_result_relations = NIL;
List *rtable = queryDesc->plannedstmt->rtable;
int length = list_length(rtable);
ListCell *lc;
int relid = -1;

/*
* Process epd first to get the addected relations..
*/
ConsumeExtendProtocolData(EP_TAG_I, &inserted);
ConsumeExtendProtocolData(EP_TAG_U, &updated);
ConsumeExtendProtocolData(EP_TAG_D, &deleted);

relid = -1;
while((relid = bms_next_member(inserted, relid)) >= 0)
{
/* Only need to transfer to UP direction. */
SetRelativeMatviewAuxStatus(relid, MV_DATA_STATUS_EXPIRED_INSERT_ONLY,
MV_DATA_STATUS_TRANSFER_DIRECTION_UP);

}

relid = -1;
while((relid = bms_next_member(updated, relid)) >= 0)
{
SetRelativeMatviewAuxStatus(relid, MV_DATA_STATUS_EXPIRED,
MV_DATA_STATUS_TRANSFER_DIRECTION_UP);

}

relid = -1;
while((relid = bms_next_member(deleted, relid)) >= 0)
{
SetRelativeMatviewAuxStatus(relid, MV_DATA_STATUS_EXPIRED,
MV_DATA_STATUS_TRANSFER_DIRECTION_UP);

}

unique_result_relations = list_concat_unique_int(NIL, queryDesc->plannedstmt->resultRelations);
foreach(lc, unique_result_relations)
{
int varno = lfirst_int(lc);
RangeTblEntry *rte = rt_fetch(varno, rtable);

/* Avoid crash in case we don't find a rte. */
if (varno > length + 1)
{
ereport(WARNING, (errmsg("could not find rte of varno: %u ", varno)));
continue;
}

if (RELKIND_PARTITIONED_TABLE == rte->relkind)
{
/*
* There should be leaf paritions if we modifed a partitioned table
* Do a second check and fall back to partitioned table
* in case that if we failed to find a one.
*/
if (bms_is_empty(inserted) &&
bms_is_empty(updated) &&
bms_is_empty(deleted))
{
ereport(WARNING,
(errmsg("fail to find leafs of partitioned table: %u ", rte->relid)));
}
/* Should already be processed, just bypass. */
continue;
}
else
{
/* Do a normal update. */
switch (operation)
{
case CMD_INSERT:
SetRelativeMatviewAuxStatus(rte->relid,
MV_DATA_STATUS_EXPIRED_INSERT_ONLY,
MV_DATA_STATUS_TRANSFER_DIRECTION_ALL);
break;
case CMD_UPDATE:
case CMD_DELETE:
SetRelativeMatviewAuxStatus(rte->relid,
MV_DATA_STATUS_EXPIRED,
MV_DATA_STATUS_TRANSFER_DIRECTION_ALL);
break;
default:
/* If there were writable CTE, just mark it as expired. */
if (queryDesc->plannedstmt->hasModifyingCTE)
SetRelativeMatviewAuxStatus(rte->relid, MV_DATA_STATUS_EXPIRED,
MV_DATA_STATUS_TRANSFER_DIRECTION_ALL);
break;
}
}
}
}
Loading
Loading