Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 136 additions & 19 deletions gpMgmt/bin/gpdirtableload
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,32 @@ def parseargs():

parser.add_argument('--database', '-d', default="gpadmin",
help='Database to connect to')
parser.add_argument('--dest-path', help='Path relative to the table root directory')
parser.add_argument('--mode', choices=['upload', 'download'], default="upload",
help='Upload or download file to/from directory table')
parser.add_argument('--dest-path', help='In upload mode, this means path relative to '
'the table root directory, while in download '
'mode, means directory to download')

parser.add_argument('--force-password-auth', default=False, action='store_true',
help='Force a password prompt')

parser.add_argument('--host', default="localhost",
help='Host to connect to')
parser.add_argument('--input-file', help='Input files or directory')
parser.add_argument('--input-file', help='In upload mode, this means input files or '
'directory, while in download mode, means '
'which directory table to download')

parser.add_argument('--logfile', help='Log output to logfile')

parser.add_argument('--tag', help='In download mode, only download the same tag files')
parser.add_argument('--force-write', default=False, action='store_true',
help='In download mode, force write files when files have existed')

parser.add_argument('--port', '-p', type=int, default="5432",
help='Port to connect to')
parser.add_argument('--stop-on-error', default=False,
help='Stop loading files when an error occurs')
parser.add_argument('--table', '-t', help='Directory table to load to')
parser.add_argument('--tag', help='Tag name')
parser.add_argument('--tasks', '-T', type=int, default="1",
help='The maximum number of files that concurrently loads')
parser.add_argument('--user', '-U', default="gpadmin",
Expand Down Expand Up @@ -155,13 +164,21 @@ class gpdirtableload:
self.options.qv = self.INFO
self.startTimestamp = time.time()
self.pool = None
self.upload = True

# set default log level
if self.options.verbose is not None:
self.options.qv = self.DEBUG
else:
self.options.qv = self.INFO

# set load from/to
if self.options.mode is not None and self.options.mode == 'download':
self.upload = False

if self.options.dest_path is None:
self.log(self.ERROR, '--dest-path must be set')

# default to gpAdminLogs for a log file, may be overwritten
if self.options.logfile is None:
self.options.logfile = os.path.join(os.environ.get('HOME', '.'), 'gpAdminLogs')
Expand Down Expand Up @@ -334,16 +351,32 @@ class gpdirtableload:
self.allFiles.append(filepath)
self.numFiles = 1

def collectAllFilesToDownload(self):
self.allFilesToDownload = []
self.numFiles = 0

qry = "SELECT relative_path FROM %s " % self.options.table

if self.options.tag:
qry += "WHERE tag = \'%s\'" % self.options.tag

self.allFilesToDownload = [s[0] for s in
self.db.query(qry).getresult()]
self.numFiles = len(self.allFilesToDownload)

def confirmWorkers(self):
if self.numFiles < self.options.tasks:
self.numWorkers = self.numFiles
else:
self.numWorkers = self.options.tasks

def startLoadFiles(self):
def startUploadFiles(self):
"""
startLoadFiles
startUploadFiles
"""
if self.options.input_file is None:
self.log(self.ERROR, '--input-file must be set in upload mode')

self.pool = WorkerPool(numWorkers=self.numWorkers, should_stop=self.options.stop_on_error)

srcfile = None
Expand All @@ -357,7 +390,7 @@ class gpdirtableload:
self.log(self.ERROR, 'cannot find greenplum environment ' +
'file: environment misconfigured')

cmdstrbase = "source %s ;"
cmdstrbase = "source %s ;" % srcfile

cmdstrbase += "export PGPASSWORD=%s ; psql " % self.options.password

Expand Down Expand Up @@ -401,13 +434,94 @@ class gpdirtableload:
self.pool.haltWork()
self.pool.joinWorkers()

def run2(self):
def startDownloadFiles(self):
"""
startDownloadFiles
"""
self.pool = WorkerPool(numWorkers=self.numWorkers, should_stop=self.options.stop_on_error)

if not self.options.dest_path:
self.log(self.ERROR, 'dest-path is not set.')
if (not os.path.exists(self.options.dest_path)):
self.log(self.ERROR, 'Directory %s does not exist.' % self.options.dest_path)
if (not os.path.isdir(self.options.dest_path)):
self.log(self.ERROR, 'File path %s is not a directory.' %self.options.dest_path)

srcfile = None
if os.environ.get('GPHOME_LOADERS'):
srcfile = os.path.join(os.environ.get('GPHOME_LOADERS'),
'greenplum_loaders_path.sh')
elif os.environ.get('GPHOME'):
srcfile = os.path.join(os.environ.get('GPHOME'),
'greenplum_path.sh')
if (not (srcfile and os.path.exists(srcfile))):
self.log(self.ERROR, 'cannot find greenplum environment ' +
'file: environment misconfigured')

cmdstrbase = "source %s ;" % srcfile

cmdstrbase += "export PGPASSWORD=%s ; psql " % self.options.password

if self.options.database != None:
cmdstrbase += "-d %s " % self.options.database
if self.options.host != None:
cmdstrbase += "-h %s " % self.options.host
if self.options.port != 0:
cmdstrbase += "-p %d " % self.options.port
if self.options.user != None:
cmdstrbase += "-U %s " % self.options.user

try:
for file in self.allFilesToDownload:
fullpath = self.options.dest_path + '/' + file
if (os.path.exists(fullpath) and not self.options.force_write):
if (not os.path.isdir(fullpath)):
continue
else:
self.log(self.ERROR, 'file directory %s has existed' % fullpath)

filedir = os.path.dirname(fullpath)
if (not os.path.exists(filedir)):
os.makedirs(filedir, exist_ok=True)

cmdstr = cmdstrbase
cmdstr += '-c \"copy binary directory table %s \'%s\' to \'%s\' \"' % (self.options.table, file, fullpath)

cmd = Command(name='download directory table', ctxt=LOCAL, cmdStr=cmdstr)
self.pool.addCommand(cmd)
self.pool.join()
items = self.pool.getCompletedItems()
for i in items:
if not i.was_successful():
self.log(self.ERROR, 'failed download directory table %s to %s, msg:%s' %
(self.options.table, self.options.dest_path, i.get_results().stderr))
self.pool.check_results()
except Exception as err:
self.log(self.ERROR, 'errors in job:')
self.log(self.ERROR, err.__str__())
self.log(self.ERROR, 'exiting early')
finally:
self.pool.haltWork()
self.pool.joinWorkers()

def run_upload(self):
try:
start = time.time()
self.collectAllFiles()
self.confirmWorkers()
self.setup_connection()
self.startLoadFiles()
self.startUploadFiles()
self.log(self.INFO, 'running time: %.2f seconds' % (time.time() - start))
except Exception as e:
raise

def run_download(self):
try:
start = time.time()
self.setup_connection()
self.collectAllFilesToDownload()
self.confirmWorkers()
self.startDownloadFiles()
self.log(self.INFO, 'running time: %.2f seconds' % (time.time() - start))
except Exception as e:
raise
Expand All @@ -422,24 +536,27 @@ class gpdirtableload:
signal.signal(signal.SIGHUP, signal.SIG_IGN)

try:
try:
self.run2()
except Exception:
traceback.print_exc(file=self.logfile)
self.logfile.flush()
self.exitValue = 2
if (self.options.qv > self.INFO):
traceback.print_exc()
else:
self.log(self.ERROR, "unexpected error -- backtrace " +
"written to log file")
if self.upload == True:
self.run_upload()
else:
self.run_download()
except (Exception, SystemExit):
traceback.print_exc(file=self.logfile)
self.logfile.flush()
self.exitValue = 2
if (self.options.qv > self.INFO):
traceback.print_exc()
else:
self.log(self.ERROR, "unexpected error -- backtrace " +
"written to log file")
finally:
if self.exitValue == 0:
self.log(self.INFO, 'gpdirtableload succeeded')
elif self.exitValue == 1:
self.log(self.INFO, 'gpdirtableload succeeded with warnings')
else:
self.log(self.INFO, 'gpdirtableload failed')
os._exit(self.exitValue)


if __name__ == '__main__':
Expand Down
15 changes: 12 additions & 3 deletions src/backend/commands/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,18 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
*/
PG_TRY();
{
cstate = BeginCopyTo(pstate, rel, query, relid,
stmt->filename, stmt->is_program,
stmt->attlist, options);
if (rel && rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE)
{
cstate = BeginCopyToDirectoryTable(pstate, stmt->filename, stmt->dirfilename,
rel, stmt->is_program, options);
}

else
{
cstate = BeginCopyTo(pstate, rel, query, relid,
stmt->filename, stmt->is_program,
stmt->attlist, options);
}

/*
* "copy t to file on segment" CopyDispatchOnSegment
Expand Down
Loading