Skip to content

Commit c5c4cfe

Browse files
[HUDI-3507] Support export command based on Call Produce Command (#5901)
1 parent fec49dc commit c5c4cfe

4 files changed

Lines changed: 309 additions & 19 deletions

File tree

hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919
package org.apache.hudi.cli.commands;
2020

21+
import org.apache.avro.generic.GenericRecord;
22+
import org.apache.avro.generic.IndexedRecord;
23+
import org.apache.avro.specific.SpecificData;
24+
import org.apache.hadoop.fs.FileStatus;
25+
import org.apache.hadoop.fs.FileSystem;
26+
import org.apache.hadoop.fs.Path;
2127
import org.apache.hudi.avro.HoodieAvroUtils;
2228
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
2329
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -36,14 +42,6 @@
3642
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
3743
import org.apache.hudi.common.util.ClosableIterator;
3844
import org.apache.hudi.exception.HoodieException;
39-
40-
import org.apache.avro.generic.GenericRecord;
41-
import org.apache.avro.generic.IndexedRecord;
42-
import org.apache.avro.specific.SpecificData;
43-
import org.apache.hadoop.fs.FileStatus;
44-
import org.apache.hadoop.fs.Path;
45-
import org.apache.log4j.LogManager;
46-
import org.apache.log4j.Logger;
4745
import org.springframework.shell.core.CommandMarker;
4846
import org.springframework.shell.core.annotation.CliCommand;
4947
import org.springframework.shell.core.annotation.CliOption;
@@ -60,10 +58,10 @@
6058

6159
/**
6260
* CLI commands to export various information from a HUDI dataset.
63-
*
61+
* <p>
6462
* "export instants": Export Instants and their metadata from the Timeline to a local
65-
* directory specified by the parameter --localFolder
66-
* The instants are exported in the json format.
63+
* directory specified by the parameter --localFolder
64+
* The instants are exported in the json format.
6765
*/
6866
@Component
6967
public class ExportCommand implements CommandMarker {
@@ -83,7 +81,7 @@ public String exportInstants(
8381
int numExports = limit == -1 ? Integer.MAX_VALUE : limit;
8482
int numCopied = 0;
8583

86-
if (! new File(localFolder).isDirectory()) {
84+
if (!new File(localFolder).isDirectory()) {
8785
throw new HoodieException(localFolder + " is not a valid local directory");
8886
}
8987

@@ -94,7 +92,7 @@ public String exportInstants(
9492

9593
// Archived instants are in the commit archive files
9694
FileStatus[] statuses = FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
97-
List<FileStatus> archivedStatuses = Arrays.stream(statuses).sorted((f1, f2) -> (int)(f1.getModificationTime() - f2.getModificationTime())).collect(Collectors.toList());
95+
List<FileStatus> archivedStatuses = Arrays.stream(statuses).sorted((f1, f2) -> (int) (f1.getModificationTime() - f2.getModificationTime())).collect(Collectors.toList());
9896

9997
if (descending) {
10098
Collections.reverse(nonArchivedInstants);
@@ -115,11 +113,11 @@ public String exportInstants(
115113

116114
private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSet, int limit, String localFolder) throws Exception {
117115
int copyCount = 0;
116+
FileSystem fileSystem = FSUtils.getFs(HoodieCLI.getTableMetaClient().getBasePath(), HoodieCLI.conf);
118117

119118
for (FileStatus fs : statuses) {
120119
// read the archived file
121-
Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(HoodieCLI.getTableMetaClient().getBasePath(), HoodieCLI.conf),
122-
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
120+
Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
123121

124122
// read the avro blocks
125123
while (reader.hasNext() && copyCount < limit) {
@@ -130,7 +128,7 @@ private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSe
130128
// Archived instants are saved as Avro encoded HoodieArchivedMetaEntry records. We need to get the
131129
// metadata record from the entry and convert it to json.
132130
HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
133-
.deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
131+
.deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
134132
final String action = archiveEntryRecord.get("actionType").toString();
135133
if (!actionSet.contains(action)) {
136134
continue;
@@ -157,7 +155,7 @@ private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSe
157155
default:
158156
throw new HoodieException("Unknown type of action " + action);
159157
}
160-
158+
161159
final String instantTime = archiveEntryRecord.get("commitTime").toString();
162160
final String outPath = localFolder + Path.SEPARATOR + instantTime + "." + action;
163161
writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
@@ -178,9 +176,8 @@ private int copyNonArchivedInstants(List<HoodieInstant> instants, int limit, Str
178176
int copyCount = 0;
179177

180178
if (instants.isEmpty()) {
181-
return limit;
179+
return copyCount;
182180
}
183-
final Logger LOG = LogManager.getLogger(ExportCommand.class);
184181

185182
final HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
186183
final HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
@@ -221,6 +218,7 @@ private int copyNonArchivedInstants(List<HoodieInstant> instants, int limit, Str
221218

222219
if (data != null) {
223220
writeToFile(localPath, data);
221+
copyCount = copyCount + 1;
224222
}
225223
}
226224

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hudi.command.procedures
19+
20+
import org.apache.avro.generic.GenericRecord
21+
import org.apache.avro.specific.SpecificData
22+
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
23+
import org.apache.hudi.avro.HoodieAvroUtils
24+
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
25+
import org.apache.hudi.common.fs.FSUtils
26+
import org.apache.hudi.common.model.HoodieLogFile
27+
import org.apache.hudi.common.table.HoodieTableMetaClient
28+
import org.apache.hudi.common.table.log.HoodieLogFormat
29+
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
30+
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, TimelineMetadataUtils}
31+
import org.apache.hudi.exception.HoodieException
32+
import org.apache.spark.internal.Logging
33+
import org.apache.spark.sql.Row
34+
import org.apache.spark.sql.catalyst.TableIdentifier
35+
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
36+
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
37+
38+
import java.io.File
39+
import java.util
40+
import java.util.Collections
41+
import java.util.function.Supplier
42+
import scala.collection.JavaConverters._
43+
import scala.util.control.Breaks.break
44+
45+
/**
 * Spark SQL procedure `export_instants`: exports instants (active and archived)
 * from a Hudi table's timeline to a local directory as json files, mirroring the
 * CLI `ExportCommand`.
 *
 * Parameters:
 *  - table (required): Hudi table name
 *  - localFolder (required): existing local directory to write `<instantTime>.<action>` files into
 *  - limit (optional, default -1 = unlimited): maximum number of instants to export
 *  - actions (optional): comma-separated action filter, default "clean,commit,deltacommit,rollback,savepoint,restore"
 *  - desc (optional, default false): when true, export newest-first (active timeline before archive)
 *
 * Returns a single row describing how many instants were exported.
 */
class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
  var sortByFieldParameter: ProcedureParameter = _

  val defaultActions = "clean,commit,deltacommit,rollback,savepoint,restore"

  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.required(1, "localFolder", DataTypes.StringType, None),
    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, -1),
    ProcedureParameter.optional(3, "actions", DataTypes.StringType, defaultActions),
    ProcedureParameter.optional(4, "desc", DataTypes.BooleanType, false)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("export_detail", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
    val localFolder = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
    val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
    val actions: String = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
    val desc = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]

    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
    val basePath = hoodieCatalogTable.tableLocation
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val archivePath = new Path(basePath + "/.hoodie/.commits_.archive*")
    val actionSet: util.Set[String] = Set(actions.split(","): _*).asJava
    val numExports = if (limit == -1) Integer.MAX_VALUE else limit
    var numCopied = 0

    if (!new File(localFolder).isDirectory) throw new HoodieException(localFolder + " is not a valid local directory")

    // The non-archived instants can be listed from the Timeline. Copy into a
    // mutable java.util.ArrayList: Collections.reverse below would throw
    // UnsupportedOperationException on an asJava view of an immutable Scala list.
    val nonArchivedInstants: util.List[HoodieInstant] = new util.ArrayList[HoodieInstant](
      metaClient
        .getActiveTimeline
        .filterCompletedInstants.getInstants.iterator().asScala
        .filter((i: HoodieInstant) => actionSet.contains(i.getAction))
        .toList.asJava)

    // Archived instants are in the commit archive files, ordered oldest-first by
    // modification time (matching the Java ExportCommand). Compare the Long
    // timestamps directly instead of truncating their difference to Int.
    val statuses: Array[FileStatus] = FSUtils.getFs(basePath, jsc.hadoopConfiguration()).globStatus(archivePath)
    val archivedStatuses: util.List[FileStatus] = new util.ArrayList[FileStatus](
      List(statuses: _*)
        .sortWith((f1, f2) => f1.getModificationTime < f2.getModificationTime).asJava)

    if (desc) {
      // Newest first: active timeline instants, then the archive (both reversed).
      Collections.reverse(nonArchivedInstants)
      numCopied = copyNonArchivedInstants(metaClient, nonArchivedInstants, numExports, localFolder)
      if (numCopied < numExports) {
        Collections.reverse(archivedStatuses)
        numCopied += copyArchivedInstants(basePath, archivedStatuses, actionSet, numExports - numCopied, localFolder)
      }
    } else {
      // Oldest first: archive, then the active timeline.
      numCopied = copyArchivedInstants(basePath, archivedStatuses, actionSet, numExports, localFolder)
      if (numCopied < numExports) numCopied += copyNonArchivedInstants(metaClient, nonArchivedInstants, numExports - numCopied, localFolder)
    }

    Seq(Row("Exported " + numCopied + " Instants to " + localFolder))
  }

  /**
   * Exports up to `limit` archived instants whose action is in `actionSet` from the
   * given archive log files into `localFolder`. Returns the number of instants copied.
   *
   * The original auto-converted code called `break()`/`break` without an enclosing
   * `breakable {}`, which throws an uncaught BreakControl at runtime — and used
   * `break()` where *continue* semantics were intended. Both are replaced with
   * explicit loop conditions and an action-filter guard.
   */
  @throws[Exception]
  private def copyArchivedInstants(basePath: String, statuses: util.List[FileStatus], actionSet: util.Set[String], limit: Int, localFolder: String): Int = {
    var copyCount = 0
    val fileSystem = FSUtils.getFs(basePath, jsc.hadoopConfiguration())
    val statusItr = statuses.iterator()
    while (statusItr.hasNext && copyCount < limit) {
      val fs = statusItr.next()
      // read the archived file
      val reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(fs.getPath), HoodieArchivedMetaEntry.getClassSchema)
      try {
        // read the avro blocks
        while (reader.hasNext && copyCount < limit) {
          val blk = reader.next.asInstanceOf[HoodieAvroDataBlock]
          val recordItr = blk.getRecordIterator
          try {
            while (recordItr.hasNext && copyCount < limit) {
              val ir = recordItr.next
              // Archived instants are saved as Avro encoded HoodieArchivedMetaEntry records. We need to get the
              // metadata record from the entry and convert it to json.
              val archiveEntryRecord = SpecificData.get.deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir).asInstanceOf[HoodieArchivedMetaEntry]
              val action = archiveEntryRecord.get("actionType").toString
              // Skip (continue) records whose action is not requested.
              if (actionSet.contains(action)) {
                val metadata: GenericRecord = action match {
                  case HoodieTimeline.CLEAN_ACTION =>
                    archiveEntryRecord.getHoodieCleanMetadata
                  case HoodieTimeline.COMMIT_ACTION | HoodieTimeline.DELTA_COMMIT_ACTION =>
                    archiveEntryRecord.getHoodieCommitMetadata
                  case HoodieTimeline.ROLLBACK_ACTION =>
                    archiveEntryRecord.getHoodieRollbackMetadata
                  case HoodieTimeline.SAVEPOINT_ACTION =>
                    archiveEntryRecord.getHoodieSavePointMetadata
                  case HoodieTimeline.COMPACTION_ACTION =>
                    archiveEntryRecord.getHoodieCompactionMetadata
                  case _ =>
                    logInfo("Unknown type of action " + action)
                    null
                }
                val instantTime = archiveEntryRecord.get("commitTime").toString
                val outPath = localFolder + Path.SEPARATOR + instantTime + "." + action
                if (metadata != null) writeToFile(fileSystem, outPath, HoodieAvroUtils.avroToJson(metadata, true))
                copyCount += 1
              }
            }
          } finally {
            // Close the record iterator even if json conversion/writing fails.
            if (recordItr != null) recordItr.close()
          }
        }
      } finally {
        // Close the log reader on all paths (the original leaked it on exceptions).
        reader.close()
      }
    }
    copyCount
  }

  /**
   * Exports up to `limit` instants from the active timeline into `localFolder`,
   * returning the number of files written. Commit-like instants are already json;
   * clean/rollback/savepoint metadata is deserialized from Avro and converted.
   * The original ignored `limit` entirely; the loop now honors it.
   */
  @throws[Exception]
  private def copyNonArchivedInstants(metaClient: HoodieTableMetaClient, instants: util.List[HoodieInstant], limit: Int, localFolder: String): Int = {
    var copyCount = 0
    if (!instants.isEmpty) {
      val timeline = metaClient.getActiveTimeline
      val fileSystem = FSUtils.getFs(metaClient.getBasePath, jsc.hadoopConfiguration())
      val instantItr = instants.iterator()
      while (instantItr.hasNext && copyCount < limit) {
        val instant = instantItr.next()
        val localPath = localFolder + Path.SEPARATOR + instant.getFileName
        val data: Array[Byte] = instant.getAction match {
          case HoodieTimeline.CLEAN_ACTION =>
            val metadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(instant).get)
            HoodieAvroUtils.avroToJson(metadata, true)

          case HoodieTimeline.DELTA_COMMIT_ACTION | HoodieTimeline.COMMIT_ACTION | HoodieTimeline.COMPACTION_ACTION =>
            // Already in json format
            timeline.getInstantDetails(instant).get

          case HoodieTimeline.ROLLBACK_ACTION =>
            val metadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(instant).get)
            HoodieAvroUtils.avroToJson(metadata, true)

          case HoodieTimeline.SAVEPOINT_ACTION =>
            val metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(timeline.getInstantDetails(instant).get)
            HoodieAvroUtils.avroToJson(metadata, true)

          case _ => null
        }
        if (data != null) {
          writeToFile(fileSystem, localPath, data)
          copyCount = copyCount + 1
        }
      }
    }
    copyCount
  }

  /** Writes `data` to `path` on `fs`, closing the stream on all paths. */
  @throws[Exception]
  private def writeToFile(fs: FileSystem, path: String, data: Array[Byte]): Unit = {
    val out = fs.create(new Path(path))
    try {
      out.write(data)
      out.flush()
    } finally {
      out.close()
    }
  }

  override def build = new ExportInstantsProcedure()
}
230+
231+
/** Registration constants for the `export_instants` procedure. */
object ExportInstantsProcedure {
  // Name under which the procedure is registered with HoodieProcedures.
  val NAME = "export_instants"

  // Supplier is a Java SAM interface, so a lambda is sufficient here.
  def builder: Supplier[ProcedureBuilder] = () => new ExportInstantsProcedure()
}
238+
239+

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ object HoodieProcedures {
4747
mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
4848
mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
4949
mapBuilder.put(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
50+
mapBuilder.put(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
5051
mapBuilder.build
5152
}
5253
}

0 commit comments

Comments (0)