Skip to content

Commit 64376f1

Browse files
authored
Merge pull request #1 from apache/master
deal with TsFileWriteWithRowBatch
2 parents fdc8d28 + 821a3b2 commit 64376f1

File tree

11 files changed

+87
-79
lines changed

11 files changed

+87
-79
lines changed

client/src/main/java/org/apache/iotdb/cli/tool/ExportCsv.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
1920
package org.apache.iotdb.cli.tool;
2021

2122
import java.io.BufferedReader;
@@ -40,6 +41,7 @@
4041
import org.apache.commons.cli.Option;
4142
import org.apache.commons.cli.Options;
4243
import org.apache.commons.cli.ParseException;
44+
import org.apache.iotdb.cli.client.AbstractClient;
4345
import org.apache.iotdb.cli.exception.ArgsErrorException;
4446
import org.apache.iotdb.jdbc.Config;
4547
import org.apache.iotdb.jdbc.IoTDBConnection;
@@ -70,7 +72,9 @@ public class ExportCsv extends AbstractCsvTool {
7072
private static String targetDirectory;
7173

7274
private static final int EXPORT_PER_LINE_COUNT = 10000;
73-
75+
76+
private static String TIMESTAMP_PRECISION = "ms";
77+
7478
/**
7579
* main function of export csv tool.
7680
*/
@@ -128,7 +132,7 @@ public static void main(String[] args) throws IOException, SQLException {
128132
}
129133
} catch (ClassNotFoundException e) {
130134
System.out.println("Failed to export data because cannot find IoTDB JDBC Driver, "
131-
+ "please check whether you have imported driver or not: " + e.getMessage());
135+
+ "please check whether you have imported driver or not: " + e.getMessage());
132136
} catch (TException e) {
133137
System.out.println("Encounter an error when connecting to server, because " + e.getMessage());
134138
} catch (SQLException e) {
@@ -149,7 +153,7 @@ private static void parseSpecialParams(CommandLine commandLine)
149153
throws ArgsErrorException {
150154
targetDirectory = checkRequiredArg(TARGET_DIR_ARGS, TARGET_DIR_NAME, commandLine);
151155
targetFile = commandLine.getOptionValue(TARGET_FILE_ARGS);
152-
if(targetFile == null){
156+
if (targetFile == null) {
153157
targetFile = DUMP_FILE_NAME_DEFAULT;
154158
}
155159
timeFormat = commandLine.getOptionValue(TIME_FORMAT_ARGS);
@@ -222,14 +226,15 @@ private static Options createOptions() {
222226
}
223227

224228
private static void dumpFromSqlFile(String filePath) throws IOException {
225-
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))){
229+
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
226230
String sql;
227231
int index = 0;
228232
while ((sql = reader.readLine()) != null) {
229233
try {
230234
dumpResult(sql, index);
231235
} catch (SQLException e) {
232-
System.out.println("Cannot dump data for statement " + sql + ", because : " + e.getMessage());
236+
System.out
237+
.println("Cannot dump data for statement " + sql + ", because : " + e.getMessage());
233238
}
234239
index++;
235240
}
@@ -246,18 +251,18 @@ private static void dumpFromSqlFile(String filePath) throws IOException {
246251
private static void dumpResult(String sql, int index)
247252
throws SQLException {
248253

249-
final String path = index > 0 ? targetDirectory + targetFile + ".csv" : targetDirectory + targetFile + index + ".csv";
254+
final String path = targetDirectory + targetFile + index + ".csv";
250255
File tf = new File(path);
251256
try {
252257
if (!tf.exists() && !tf.createNewFile()) {
253-
System.out.println("Could not create target file for sql statement: " + sql);
254-
return;
258+
System.out.println("Could not create target file for sql statement: " + sql);
259+
return;
255260
}
256261
} catch (IOException e) {
257-
System.out.println("Cannot create dump file "+ path + "because: " + e.getMessage());
262+
System.out.println("Cannot create dump file " + path + "because: " + e.getMessage());
258263
return;
259264
}
260-
System.out.println("Start to export data from sql statement: "+ sql);
265+
System.out.println("Start to export data from sql statement: " + sql);
261266
try (Statement statement = connection.createStatement();
262267
ResultSet rs = statement.executeQuery(sql);
263268
BufferedWriter bw = new BufferedWriter(new FileWriter(tf))) {
@@ -269,8 +274,10 @@ private static void dumpResult(String sql, int index)
269274
writeMetadata(bw, count, metadata);
270275

271276
int line = writeResultSet(rs, bw, count);
272-
System.out.println(String.format("Statement [%s] has dumped to file %s successfully! It costs "
273-
+ "%dms to export %d lines.", sql, path, System.currentTimeMillis() - startTime, line));
277+
System.out
278+
.println(String.format("Statement [%s] has dumped to file %s successfully! It costs "
279+
+ "%dms to export %d lines.", sql, path, System.currentTimeMillis() - startTime,
280+
line));
274281
} catch (IOException e) {
275282
System.out.println("Cannot dump result because: " + e.getMessage());
276283
}
@@ -300,9 +307,10 @@ private static int writeResultSet(ResultSet rs, BufferedWriter bw, int count)
300307
writeValue(rs, count, bw);
301308
}
302309
line++;
303-
if(line % EXPORT_PER_LINE_COUNT == 0){
310+
if (line % EXPORT_PER_LINE_COUNT == 0) {
304311
long tmp = System.currentTimeMillis();
305-
System.out.println(String.format("%d lines have been exported, it takes %dms", line, (tmp-timestamp)));
312+
System.out.println(
313+
String.format("%d lines have been exported, it takes %dms", line, (tmp - timestamp)));
306314
timestamp = tmp;
307315
}
308316
}
@@ -313,9 +321,11 @@ private static void writeTime(ResultSet rs, BufferedWriter bw) throws SQLExcepti
313321
ZonedDateTime dateTime;
314322
switch (timeFormat) {
315323
case "default":
316-
dateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(rs.getLong(1)),
317-
zoneId);
318-
bw.write(dateTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME) + ",");
324+
long timestamp = rs.getLong(1);
325+
String str = AbstractClient
326+
.parseLongToDateWithPrecision(DateTimeFormatter.ISO_OFFSET_DATE_TIME, timestamp, zoneId,
327+
TIMESTAMP_PRECISION);
328+
bw.write(str + ",");
319329
break;
320330
case "timestamp":
321331
case "long":

docs/Documentation/UserGuide/7-TsFile/2-Usage.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
208208
* It uses the interface:
209209
* public void addMeasurement(MeasurementSchema MeasurementSchema) throws WriteProcessException
210210
*/
211-
public class TsFileWrite {
211+
public class TsFileWriteWithTSRecord {
212212
213213
public static void main(String args[]) {
214214
try {
@@ -295,7 +295,7 @@ public class TsFileWriteWithRowBatch {
295295
RowBatch rowBatch = fileSchema.createRowBatch("device_1");
296296

297297
long[] timestamps = rowBatch.timestamps;
298-
Object[] sensors = rowBatch.sensors;
298+
Object[] values = rowBatch.values;
299299

300300
long timestamp = 1;
301301
long value = 1000000L;
@@ -304,11 +304,11 @@ public class TsFileWriteWithRowBatch {
304304
int row = rowBatch.batchSize++;
305305
timestamps[row] = timestamp++;
306306
for (int i = 0; i < sensorNum; i++) {
307-
long[] sensor = (long[]) sensors[i];
307+
long[] sensor = (long[]) values[i];
308308
sensor[row] = value;
309309
}
310310
// write RowBatch to TsFile
311-
if (rowBatch.batchSize == rowBatch.getBatchMaxSize()) {
311+
if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
312312
tsFileWriter.write(rowBatch);
313313
rowBatch.reset();
314314
}

tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithRowBatch.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public static void main(String[] args) {
5959
RowBatch rowBatch = fileSchema.createRowBatch("device_1");
6060

6161
long[] timestamps = rowBatch.timestamps;
62-
Object[] sensors = rowBatch.sensors;
62+
Object[] values = rowBatch.values;
6363

6464
long timestamp = 1;
6565
long value = 1000000L;
@@ -68,11 +68,11 @@ public static void main(String[] args) {
6868
int row = rowBatch.batchSize++;
6969
timestamps[row] = timestamp++;
7070
for (int i = 0; i < sensorNum; i++) {
71-
long[] sensor = (long[]) sensors[i];
71+
long[] sensor = (long[]) values[i];
7272
sensor[row] = value;
7373
}
7474
// write RowBatch to TsFile
75-
if (rowBatch.batchSize == rowBatch.getBatchMaxSize()) {
75+
if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
7676
tsFileWriter.write(rowBatch);
7777
rowBatch.reset();
7878
}

tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/DeltaBinaryEncoder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727
import org.slf4j.LoggerFactory;
2828

2929
/**
30-
* <p> DeltaBinaryEncoder is a encoder for compressing data in type of integer and long.We adapt a
30+
* <p> DeltaBinaryEncoder is a encoder for compressing data in type of integer and long. We adapt a
3131
* hypothesis that contiguous data points have similar values. Thus the difference value of two
32-
* adjacent points is smaller than those two point values. One integer in java takes 32-bits. If an
32+
* adjacent points is smaller than those two point values. One integer in java takes 32-bits. If a
3333
* positive number is less than 2^m, the bits of this integer which index from m to 31 are all 0.
3434
* Given an array which length is n, if all values in input data array are all positive and less
3535
* than 2^m, we need actually m*n, but not 32*n bits to store the array. </p> <p> DeltaBinaryEncoder
3636
* calculates difference between two adjacent points and record the minimum of those difference
37-
* values firstly. Then it save two_diff value that difference minus minimum of them, to make sure
37+
* values firstly. Then it saves two_diff value that difference minus minimum of them, to make sure
3838
* all two_diff values are positive. Then it statistics the longest bit length {@code m} it takes
3939
* for each two_diff value, which means the bit length that maximum two_diff value takes. Only the
4040
* low m bits are saved into result byte array for all two_diff values. </p>

tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,24 +91,25 @@ public void write(RowBatch rowBatch) throws WriteProcessException, IOException {
9191

9292
private void writeByDataType(
9393
RowBatch rowBatch, String measurementId, TSDataType dataType, int index) throws IOException {
94+
int batchSize = rowBatch.batchSize;
9495
switch (dataType) {
9596
case INT32:
96-
chunkWriters.get(measurementId).write(rowBatch.timestamps, (int[]) rowBatch.values[index]);
97+
chunkWriters.get(measurementId).write(rowBatch.timestamps, (int[]) rowBatch.values[index], batchSize);
9798
break;
9899
case INT64:
99-
chunkWriters.get(measurementId).write(rowBatch.timestamps, (long[]) rowBatch.values[index]);
100+
chunkWriters.get(measurementId).write(rowBatch.timestamps, (long[]) rowBatch.values[index], batchSize);
100101
break;
101102
case FLOAT:
102-
chunkWriters.get(measurementId).write(rowBatch.timestamps, (float[]) rowBatch.values[index]);
103+
chunkWriters.get(measurementId).write(rowBatch.timestamps, (float[]) rowBatch.values[index], batchSize);
103104
break;
104105
case DOUBLE:
105-
chunkWriters.get(measurementId).write(rowBatch.timestamps, (double[]) rowBatch.values[index]);
106+
chunkWriters.get(measurementId).write(rowBatch.timestamps, (double[]) rowBatch.values[index], batchSize);
106107
break;
107108
case BOOLEAN:
108-
chunkWriters.get(measurementId).write(rowBatch.timestamps, (boolean[]) rowBatch.values[index]);
109+
chunkWriters.get(measurementId).write(rowBatch.timestamps, (boolean[]) rowBatch.values[index], batchSize);
109110
break;
110111
case TEXT:
111-
chunkWriters.get(measurementId).write(rowBatch.timestamps, (Binary[]) rowBatch.values[index]);
112+
chunkWriters.get(measurementId).write(rowBatch.timestamps, (Binary[]) rowBatch.values[index], batchSize);
112113
break;
113114
default:
114115
throw new UnSupportedDataTypeException(

tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -200,85 +200,85 @@ public void write(long time, Binary value) {
200200
}
201201

202202
@Override
203-
public void write(long[] timestamps, int[] values) {
203+
public void write(long[] timestamps, int[] values, int batchSize) {
204204
this.time = timestamps[timestamps.length - 1];
205205
valueCountInOnePage += timestamps.length;
206206
if (minTimestamp == Long.MIN_VALUE) {
207207
minTimestamp = timestamps[0];
208208
}
209-
dataPageWriter.write(timestamps, values);
209+
dataPageWriter.write(timestamps, values, batchSize);
210210
pageStatistics.updateStats(values);
211211
checkPageSizeAndMayOpenANewPage();
212212
}
213213

214214
@Override
215-
public void write(long[] timestamps, long[] values) {
215+
public void write(long[] timestamps, long[] values, int batchSize) {
216216
this.time = timestamps[timestamps.length - 1];
217217
valueCountInOnePage += timestamps.length;
218218
if (minTimestamp == Long.MIN_VALUE) {
219219
minTimestamp = timestamps[0];
220220
}
221-
dataPageWriter.write(timestamps, values);
221+
dataPageWriter.write(timestamps, values, batchSize);
222222
pageStatistics.updateStats(values);
223223
checkPageSizeAndMayOpenANewPage();
224224
}
225225

226226
@Override
227-
public void write(long[] timestamps, boolean[] values) {
227+
public void write(long[] timestamps, boolean[] values, int batchSize) {
228228
this.time = timestamps[timestamps.length - 1];
229229
valueCountInOnePage += timestamps.length;
230230
if (minTimestamp == Long.MIN_VALUE) {
231231
minTimestamp = timestamps[0];
232232
}
233-
dataPageWriter.write(timestamps, values);
233+
dataPageWriter.write(timestamps, values, batchSize);
234234
pageStatistics.updateStats(values);
235235
checkPageSizeAndMayOpenANewPage();
236236
}
237237

238238
@Override
239-
public void write(long[] timestamps, float[] values) {
239+
public void write(long[] timestamps, float[] values, int batchSize) {
240240
this.time = timestamps[timestamps.length - 1];
241241
valueCountInOnePage += timestamps.length;
242242
if (minTimestamp == Long.MIN_VALUE) {
243243
minTimestamp = timestamps[0];
244244
}
245-
dataPageWriter.write(timestamps, values);
245+
dataPageWriter.write(timestamps, values, batchSize);
246246
pageStatistics.updateStats(values);
247247
checkPageSizeAndMayOpenANewPage();
248248
}
249249

250250
@Override
251-
public void write(long[] timestamps, double[] values) {
251+
public void write(long[] timestamps, double[] values, int batchSize) {
252252
this.time = timestamps[timestamps.length - 1];
253253
valueCountInOnePage += timestamps.length;
254254
if (minTimestamp == Long.MIN_VALUE) {
255255
minTimestamp = timestamps[0];
256256
}
257-
dataPageWriter.write(timestamps, values);
257+
dataPageWriter.write(timestamps, values, batchSize);
258258
pageStatistics.updateStats(values);
259259
checkPageSizeAndMayOpenANewPage();
260260
}
261261

262262
@Override
263-
public void write(long[] timestamps, BigDecimal[] values) {
263+
public void write(long[] timestamps, BigDecimal[] values, int batchSize) {
264264
this.time = timestamps[timestamps.length - 1];
265265
valueCountInOnePage += timestamps.length;
266266
if (minTimestamp == Long.MIN_VALUE) {
267267
minTimestamp = timestamps[0];
268268
}
269-
dataPageWriter.write(timestamps, values);
269+
dataPageWriter.write(timestamps, values, batchSize);
270270
pageStatistics.updateStats(values);
271271
checkPageSizeAndMayOpenANewPage();
272272
}
273273

274274
@Override
275-
public void write(long[] timestamps, Binary[] values) {
275+
public void write(long[] timestamps, Binary[] values, int batchSize) {
276276
this.time = timestamps[timestamps.length - 1];
277277
valueCountInOnePage += timestamps.length;
278278
if (minTimestamp == Long.MIN_VALUE) {
279279
minTimestamp = timestamps[0];
280280
}
281-
dataPageWriter.write(timestamps, values);
281+
dataPageWriter.write(timestamps, values, batchSize);
282282
pageStatistics.updateStats(values);
283283
checkPageSizeAndMayOpenANewPage();
284284
}

tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,37 +68,37 @@ public interface IChunkWriter {
6868
/**
6969
* write time series
7070
*/
71-
void write(long[] timestamps, int[] values);
71+
void write(long[] timestamps, int[] values, int batchSize);
7272

7373
/**
7474
* write time series
7575
*/
76-
void write(long[] timestamps, long[] values);
76+
void write(long[] timestamps, long[] values, int batchSize);
7777

7878
/**
7979
* write time series
8080
*/
81-
void write(long[] timestamps, boolean[] values);
81+
void write(long[] timestamps, boolean[] values, int batchSize);
8282

8383
/**
8484
* write time series
8585
*/
86-
void write(long[] timestamps, float[] values);
86+
void write(long[] timestamps, float[] values, int batchSize);
8787

8888
/**
8989
* write time series
9090
*/
91-
void write(long[] timestamps, double[] values);
91+
void write(long[] timestamps, double[] values, int batchSize);
9292

9393
/**
9494
* write time series
9595
*/
96-
void write(long[] timestamps, BigDecimal[] values);
96+
void write(long[] timestamps, BigDecimal[] values, int batchSize);
9797

9898
/**
9999
* write time series
100100
*/
101-
void write(long[] timestamps, Binary[] values);
101+
void write(long[] timestamps, Binary[] values, int batchSize);
102102

103103
/**
104104
* flush data to TsFileIOWriter.

0 commit comments

Comments
 (0)