Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 125 additions & 6 deletions src/backend/commands/copyfrom.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "cdb/cdbaocsam.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbhash.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
Expand Down Expand Up @@ -726,7 +728,7 @@ formDirTableSlot(CopyFromState cstate,
char *field[5];
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
List *attnumlist = cstate->qd_attnumlist;
List *attnumlist = cstate->attnumlist;
pg_time_t stampTime = (pg_time_t) time(NULL);
char lastModified[128];

Expand Down Expand Up @@ -783,6 +785,7 @@ CopyFromDirectoryTable(CopyFromState cstate)
char buffer[DIR_FILE_BUFF_SIZE];
int64 processed = 0;
int64 fileSize = 0;
UFile *file;
CdbCopy *cdbCopy = NULL;
char *dirTablePath;
char *orgiFileName;
Expand All @@ -795,6 +798,7 @@ CopyFromDirectoryTable(CopyFromState cstate)
DirectoryTable *dirTable;
pg_cryptohash_ctx *hashCtx;
uint8 md5Sum[MD5_DIGEST_LENGTH];
char errorMessage[256];
GpDistributionData *distData = NULL; /* distribution data used to compute target seg */

/*
Expand Down Expand Up @@ -989,8 +993,6 @@ CopyFromDirectoryTable(CopyFromState cstate)
List *recheckIndexes = NIL;
CommandId mycid = GetCurrentCommandId(true);
MemoryContext oldcontext = CurrentMemoryContext;
char errorMessage[256];
UFile *file;
bool has_tuple = false;
bool update_indexes;

Expand Down Expand Up @@ -1164,9 +1166,124 @@ CopyFromDirectoryTable(CopyFromState cstate)
}
else
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("This copy from dispatch mode is not supported.")));
List *recheckIndexes = NIL;
CommandId mycid = GetCurrentCommandId(true);
bool update_indexes;

formDirTableSlot(cstate,
dirTable->spcId,
relaFileName,
0,
NULL,
cstate->opts.tags,
myslot->tts_values,
myslot->tts_isnull);
ExecStoreVirtualTuple(myslot);

/* OK, store the tuple and create index entries for it */
table_tuple_insert(resultRelInfo->ri_RelationDesc,
myslot, mycid, 0, NULL);

recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
myslot,
estate,
false,
false,
NULL,
NIL);

/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, myslot,
recheckIndexes, cstate->transition_capture);

list_free(recheckIndexes);

CommandCounterIncrement();

if (UFileExists(dirTable->spcId, orgiFileName))
{
UFileUnlink(dirTable->spcId, orgiFileName);
}

if (orgiFileDir)
UFileEnsurePath(dirTable->spcId, orgiFileDir);

file = UFileOpen(dirTable->spcId,
orgiFileName,
O_CREAT | O_WRONLY,
errorMessage,
sizeof(errorMessage));

if (file == NULL)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("failed to open file \"%s\": %s", orgiFileName, errorMessage)));

/* Delete uploaded file when the transaction fails */
UFileAddPendingDelete(cstate->rel, dirTable->spcId, orgiFileName, false);

hashCtx = pg_cryptohash_create(PG_MD5);
if (hashCtx == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("failed to create md5hash context: out of memory")));
pg_cryptohash_init(hashCtx);

for (;;)
{
CHECK_FOR_INTERRUPTS();

bytesRead = CopyReadBinaryData(cstate, buffer, DIR_FILE_BUFF_SIZE);

if (bytesRead > 0)
{
if (UFileWrite(file, buffer, bytesRead) == -1)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("failed to write file \"%s\": %s", orgiFileName, UFileGetLastError(file))));

fileSize += bytesRead;
pg_cryptohash_update(hashCtx, (const uint8 *) buffer, bytesRead);
}

if (bytesRead != DIR_FILE_BUFF_SIZE)
{
break;
}
}

if (UFileSync(file) != 0)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("unable to sync file \"%s\": %s", glob_copystmt->dirfilename, UFileGetLastError(file))));

UFileClose(file);

pg_cryptohash_final(hashCtx, md5Sum, sizeof(md5Sum));
pg_cryptohash_free(hashCtx);
bytesToHex(md5Sum, hexMd5Sum);

myslot->tts_values[1] = Int64GetDatum(fileSize);
myslot->tts_values[3] = CStringGetTextDatum((char *) hexMd5Sum);
myslot->tts_isnull[3] = false;

simple_table_tuple_update(resultRelInfo->ri_RelationDesc, &myslot->tts_tid, myslot,
estate->es_snapshot, &update_indexes);

ExecClearTuple(myslot);

/*
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command. Update
* progress of the COPY command as well.
*
* MPP: incrementing this counter here only matters for utility
* mode. in dispatch mode only the dispatcher COPY collects row
* count, so this counter is meaningless.
*/
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
++processed);
}

cstate->filename = NULL;
Expand Down Expand Up @@ -1273,6 +1390,8 @@ BeginCopyFromDirectoryTable(ParseState *pstate,
cstate->dispatch_mode = COPY_DISPATCH;
else if (Gp_role == GP_ROLE_EXECUTE)
cstate->dispatch_mode = COPY_EXECUTOR;
else
cstate->dispatch_mode = COPY_DIRECT;

cstate->cur_relname = RelationGetRelationName(cstate->rel);
cstate->cur_lineno = 0;
Expand Down
1 change: 0 additions & 1 deletion src/backend/commands/tablecmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,6 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
if (relkind == RELKIND_DIRECTORY_TABLE)
{
schema = GetDirectoryTableSchema();
stmt->distributedBy = GetDirectoryTableDistributedBy();
}
else
{
Expand Down
22 changes: 6 additions & 16 deletions src/backend/parser/gram.y
Original file line number Diff line number Diff line change
Expand Up @@ -8414,7 +8414,7 @@ AlterStorageUserMappingStmt:

CreateDirectoryTableStmt:
CREATE DIRECTORY TABLE qualified_name
table_access_method_clause OptTableSpace OptDistributedBy OptTagOptList
table_access_method_clause OptTableSpace OptTagOptList
{
CreateDirectoryTableStmt *n = makeNode(CreateDirectoryTableStmt);
$4->relpersistence = RELPERSISTENCE_PERMANENT;
Expand All @@ -8427,20 +8427,15 @@ CreateDirectoryTableStmt:
/* TODO: support tablespace for data table ? */
n->base.tablespacename = NULL;
n->base.if_not_exists = false;
n->base.distributedBy = (DistributedBy *) $7;
if (n->base.distributedBy != NULL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Create directory table is not allowed to set distributed by."),
parser_errposition(@7)));
n->base.distributedBy = GetDirectoryTableDistributedBy();
n->base.relKind = RELKIND_DIRECTORY_TABLE;
n->base.tags = $8;
n->base.tags = $7;
n->tablespacename = $6;

$$ = (Node *) n;
}
| CREATE DIRECTORY TABLE IF_P NOT EXISTS qualified_name
table_access_method_clause OptTableSpace OptDistributedBy OptTagOptList
table_access_method_clause OptTableSpace OptTagOptList
{
CreateDirectoryTableStmt *n = makeNode(CreateDirectoryTableStmt);
$7->relpersistence = RELPERSISTENCE_PERMANENT;
Expand All @@ -8453,14 +8448,9 @@ CreateDirectoryTableStmt:
/* TODO: support tablespace for data table? */
n->base.tablespacename = NULL;
n->base.if_not_exists = true;
n->base.distributedBy = (DistributedBy *) $10;
if (n->base.distributedBy != NULL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Create directory table is not allowed to set distributed by."),
parser_errposition(@10)));
n->base.distributedBy = GetDirectoryTableDistributedBy();
n->base.relKind = RELKIND_DIRECTORY_TABLE;
n->base.tags = $11;
n->base.tags = $10;
n->tablespacename = $9;

$$ = (Node *) n;
Expand Down
6 changes: 3 additions & 3 deletions src/test/regress/output/directory_table.source
Original file line number Diff line number Diff line change
Expand Up @@ -511,15 +511,15 @@ SELECT relname, relisshared, relpersistence, relkind FROM pg_class WHERE relname
CREATE DIRECTORY TABLE dir_table1;
CREATE DIRECTORY TABLE dir_table2 TABLESPACE directory_tblspc;
CREATE DIRECTORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTED BY(relative_path); -- fail
ERROR: Create directory table is not allowed to set distributed by.
ERROR: syntax error at or near "DISTRIBUTED"
LINE 1: ...TORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTE...
^
CREATE DIRECTORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTED RANDOMLY; -- fail
ERROR: Create directory table is not allowed to set distributed by.
ERROR: syntax error at or near "DISTRIBUTED"
LINE 1: ...TORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTE...
^
CREATE DIRECTORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTED REPLICATED; -- fail
ERROR: Create directory table is not allowed to set distributed by.
ERROR: syntax error at or near "DISTRIBUTED"
LINE 1: ...TORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTE...
^
CREATE DIRECTORY TABLE dir_table3 TABLESPACE directory_tblspc;
Expand Down
6 changes: 3 additions & 3 deletions src/test/regress/output/directory_table_optimizer.source
Original file line number Diff line number Diff line change
Expand Up @@ -511,15 +511,15 @@ SELECT relname, relisshared, relpersistence, relkind FROM pg_class WHERE relname
CREATE DIRECTORY TABLE dir_table1;
CREATE DIRECTORY TABLE dir_table2 TABLESPACE directory_tblspc;
CREATE DIRECTORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTED BY(relative_path); -- fail
ERROR: Create directory table is not allowed to set distributed by.
ERROR: syntax error at or near "DISTRIBUTED"
LINE 1: ...TORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTE...
^
CREATE DIRECTORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTED RANDOMLY; -- fail
ERROR: Create directory table is not allowed to set distributed by.
ERROR: syntax error at or near "DISTRIBUTED"
LINE 1: ...TORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTE...
^
CREATE DIRECTORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTED REPLICATED; -- fail
ERROR: Create directory table is not allowed to set distributed by.
ERROR: syntax error at or near "DISTRIBUTED"
LINE 1: ...TORY TABLE dir_table3 TABLESPACE directory_tblspc DISTRIBUTE...
^
CREATE DIRECTORY TABLE dir_table3 TABLESPACE directory_tblspc;
Expand Down