Skip to content

Commit 5bc375e

Browse files
author
Song binghua
committed
fix crlf
1 parent 6d353b6 commit 5bc375e

File tree

8 files changed

+3716
-3716
lines changed

8 files changed

+3716
-3716
lines changed

cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java

Lines changed: 461 additions & 461 deletions
Large diffs are not rendered by default.
Lines changed: 101 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,101 +1,101 @@
1-
/*
2-
* Licensed to the Apache Software Foundation (ASF) under one
3-
* or more contributor license agreements. See the NOTICE file
4-
* distributed with this work for additional information
5-
* regarding copyright ownership. The ASF licenses this file
6-
* to you under the Apache License, Version 2.0 (the
7-
* "License"); you may not use this file except in compliance
8-
* with the License. You may obtain a copy of the License at
9-
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
12-
* Unless required by applicable law or agreed to in writing, software
13-
* distributed under the License is distributed on an "AS IS" BASIS,
14-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15-
* See the License for the specific language governing permissions and
16-
* limitations under the License.
17-
*/
18-
19-
package org.apache.iotdb.flink;
20-
21-
import org.apache.iotdb.flink.options.IoTDBSourceOptions;
22-
import org.apache.iotdb.rpc.IoTDBConnectionException;
23-
import org.apache.iotdb.rpc.StatementExecutionException;
24-
import org.apache.iotdb.session.Session;
25-
import org.apache.iotdb.session.SessionDataSet;
26-
import org.apache.iotdb.tsfile.read.common.RowRecord;
27-
28-
import org.apache.flink.configuration.Configuration;
29-
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
30-
import org.apache.thrift.transport.TTransportException;
31-
import org.slf4j.Logger;
32-
import org.slf4j.LoggerFactory;
33-
34-
public abstract class IoTDBSource<T> extends RichSourceFunction<T> {
35-
36-
private static final Logger LOG = LoggerFactory.getLogger(IoTDBSource.class);
37-
private static final long serialVersionUID = 1L;
38-
private IoTDBSourceOptions sourceOptions;
39-
40-
private transient Session session;
41-
private transient SessionDataSet dataSet;
42-
43-
protected IoTDBSource(IoTDBSourceOptions ioTDBSourceOptions) {
44-
this.sourceOptions = ioTDBSourceOptions;
45-
}
46-
47-
@Override
48-
public void open(Configuration parameters) throws Exception {
49-
super.open(parameters);
50-
initSession();
51-
}
52-
53-
/**
54-
* Convert raw data (in form of RowRecord) extracted from IoTDB to user-defined data type
55-
*
56-
* @param rowRecord row record from IoTDB
57-
* @return object in user-defined form
58-
*/
59-
public abstract T convert(RowRecord rowRecord);
60-
61-
@Override
62-
public void run(SourceContext<T> sourceContext) throws Exception {
63-
dataSet = session.executeQueryStatement(sourceOptions.getSql());
64-
dataSet.setFetchSize(sourceOptions.getFetchSize());
65-
while (dataSet.hasNext()) {
66-
sourceContext.collect(convert(dataSet.next()));
67-
}
68-
dataSet.closeOperationHandle();
69-
}
70-
71-
@Override
72-
public void cancel() {
73-
try {
74-
dataSet.closeOperationHandle();
75-
} catch (StatementExecutionException | IoTDBConnectionException e) {
76-
LOG.error(e.getMessage());
77-
}
78-
}
79-
80-
@Override
81-
public void close() throws Exception {
82-
super.close();
83-
try {
84-
dataSet.closeOperationHandle();
85-
} catch (StatementExecutionException | IoTDBConnectionException e) {
86-
throw e;
87-
} finally {
88-
session.close();
89-
}
90-
}
91-
92-
void initSession() throws IoTDBConnectionException, TTransportException {
93-
session =
94-
new Session(
95-
sourceOptions.getHost(),
96-
sourceOptions.getPort(),
97-
sourceOptions.getUser(),
98-
sourceOptions.getPassword());
99-
session.open();
100-
}
101-
}
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.iotdb.flink;
20+
21+
import org.apache.iotdb.flink.options.IoTDBSourceOptions;
22+
import org.apache.iotdb.rpc.IoTDBConnectionException;
23+
import org.apache.iotdb.rpc.StatementExecutionException;
24+
import org.apache.iotdb.session.Session;
25+
import org.apache.iotdb.session.SessionDataSet;
26+
import org.apache.iotdb.tsfile.read.common.RowRecord;
27+
28+
import org.apache.flink.configuration.Configuration;
29+
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
30+
import org.apache.thrift.transport.TTransportException;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
public abstract class IoTDBSource<T> extends RichSourceFunction<T> {
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(IoTDBSource.class);
37+
private static final long serialVersionUID = 1L;
38+
private IoTDBSourceOptions sourceOptions;
39+
40+
private transient Session session;
41+
private transient SessionDataSet dataSet;
42+
43+
protected IoTDBSource(IoTDBSourceOptions ioTDBSourceOptions) {
44+
this.sourceOptions = ioTDBSourceOptions;
45+
}
46+
47+
@Override
48+
public void open(Configuration parameters) throws Exception {
49+
super.open(parameters);
50+
initSession();
51+
}
52+
53+
/**
54+
* Convert raw data (in form of RowRecord) extracted from IoTDB to user-defined data type
55+
*
56+
* @param rowRecord row record from IoTDB
57+
* @return object in user-defined form
58+
*/
59+
public abstract T convert(RowRecord rowRecord);
60+
61+
@Override
62+
public void run(SourceContext<T> sourceContext) throws Exception {
63+
dataSet = session.executeQueryStatement(sourceOptions.getSql());
64+
dataSet.setFetchSize(sourceOptions.getFetchSize());
65+
while (dataSet.hasNext()) {
66+
sourceContext.collect(convert(dataSet.next()));
67+
}
68+
dataSet.closeOperationHandle();
69+
}
70+
71+
@Override
72+
public void cancel() {
73+
try {
74+
dataSet.closeOperationHandle();
75+
} catch (StatementExecutionException | IoTDBConnectionException e) {
76+
LOG.error(e.getMessage());
77+
}
78+
}
79+
80+
@Override
81+
public void close() throws Exception {
82+
super.close();
83+
try {
84+
dataSet.closeOperationHandle();
85+
} catch (StatementExecutionException | IoTDBConnectionException e) {
86+
throw e;
87+
} finally {
88+
session.close();
89+
}
90+
}
91+
92+
void initSession() throws IoTDBConnectionException, TTransportException {
93+
session =
94+
new Session(
95+
sourceOptions.getHost(),
96+
sourceOptions.getPort(),
97+
sourceOptions.getUser(),
98+
sourceOptions.getPassword());
99+
session.open();
100+
}
101+
}

0 commit comments

Comments
 (0)