Skip to content

Commit fdc8d28

Browse files
yuyuankangJialin Qiao
authored andcommitted
implement rpc compression (#323)
* add rpc compression parameter
1 parent 1c83237 commit fdc8d28

File tree

8 files changed

+60
-7
lines changed

8 files changed

+60
-7
lines changed

jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,6 @@ private Config(){}
5252

5353
public static final String JDBC_DRIVER_NAME = "org.apache.iotdb.jdbc.IoTDBDriver";
5454

55+
public static boolean rpcThriftCompressionEnable = false;
56+
5557
}

jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
5858
import org.apache.thrift.TException;
5959
import org.apache.thrift.protocol.TBinaryProtocol;
60+
import org.apache.thrift.protocol.TCompactProtocol;
6061
import org.apache.thrift.transport.TSocket;
6162
import org.apache.thrift.transport.TTransportException;
6263
import org.slf4j.Logger;
@@ -89,7 +90,12 @@ public IoTDBConnection(String url, Properties info) throws SQLException, TTransp
8990
supportedProtocols.add(TSProtocolVersion.TSFILE_SERVICE_PROTOCOL_V1);
9091

9192
openTransport();
92-
client = new TSIService.Client(new TBinaryProtocol(transport));
93+
if(Config.rpcThriftCompressionEnable) {
94+
client = new TSIService.Client(new TCompactProtocol(transport));
95+
}
96+
else {
97+
client = new TSIService.Client(new TBinaryProtocol(transport));
98+
}
9399
// open client session
94100
openSession();
95101
// Wrap the client with a thread-safe proxy to serialize the RPC calls
@@ -463,7 +469,12 @@ public boolean reconnect() {
463469
if (transport != null) {
464470
transport.close();
465471
openTransport();
466-
client = new TSIService.Client(new TBinaryProtocol(transport));
472+
if(Config.rpcThriftCompressionEnable) {
473+
client = new TSIService.Client(new TCompactProtocol(transport));
474+
}
475+
else {
476+
client = new TSIService.Client(new TBinaryProtocol(transport));
477+
}
467478
openSession();
468479
client = newSynchronizedClient(client);
469480
flag = true;

server/src/assembly/resources/conf/iotdb-engine.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ rpc_address=0.0.0.0
2525

2626
rpc_port=6667
2727

28+
rpc_thrift_compression_enable=false
29+
2830
####################
2931
### Dynamic Parameter Adapter Configuration
3032
####################

server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public class IoTDBConfig {
3838

3939
private String rpcAddress = "0.0.0.0";
4040

41+
/**
42+
* whether to use thrift compression.
43+
*/
44+
private boolean rpcThriftCompressionEnable = false;
45+
4146
/**
4247
* Port which the JDBC server listens to.
4348
*/
@@ -633,6 +638,14 @@ public void setMemtableSizeThreshold(long memtableSizeThreshold) {
633638
this.memtableSizeThreshold = memtableSizeThreshold;
634639
}
635640

641+
public boolean isRpcThriftCompressionEnable() {
642+
return rpcThriftCompressionEnable;
643+
}
644+
645+
public void setRpcThriftCompressionEnable(boolean rpcThriftCompressionEnable) {
646+
this.rpcThriftCompressionEnable = rpcThriftCompressionEnable;
647+
}
648+
636649
public boolean isMetaDataCacheEnable() {
637650
return metaDataCacheEnable;
638651
}

server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ private void loadProps() {
120120

121121
conf.setRpcAddress(properties.getProperty("rpc_address", conf.getRpcAddress()));
122122

123+
conf.setRpcThriftCompressionEnable(Boolean.parseBoolean(properties.getProperty("rpc_thrift_compression_enable",
124+
Boolean.toString(conf.isRpcThriftCompressionEnable()))));
125+
123126
conf.setRpcPort(Integer.parseInt(properties.getProperty("rpc_port",
124127
Integer.toString(conf.getRpcPort()))));
125128

server/src/main/java/org/apache/iotdb/db/service/JDBCService.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
3131
import org.apache.thrift.protocol.TBinaryProtocol;
3232
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
33+
import org.apache.thrift.protocol.TCompactProtocol;
34+
import org.apache.thrift.protocol.TProtocolFactory;
3335
import org.apache.thrift.server.TServer;
3436
import org.apache.thrift.server.TThreadPoolServer;
3537
import org.apache.thrift.transport.TServerSocket;
@@ -49,7 +51,7 @@ public class JDBCService implements JDBCServiceMBean, IService {
4951
.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
5052
getID().getJmxName());
5153
private Thread jdbcServiceThread;
52-
private Factory protocolFactory;
54+
private TProtocolFactory protocolFactory;
5355
private Processor<TSIService.Iface> processor;
5456
private TThreadPoolServer.Args poolArgs;
5557
private TSServiceImpl impl;
@@ -188,7 +190,12 @@ private class JDBCServiceThread extends Thread {
188190

189191
public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
190192
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
191-
protocolFactory = new TBinaryProtocol.Factory();
193+
if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
194+
protocolFactory = new TCompactProtocol.Factory();
195+
}
196+
else {
197+
protocolFactory = new TBinaryProtocol.Factory();
198+
}
192199
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
193200
impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
194201
processor = new TSIService.Processor<>(impl);

server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
3232
import org.apache.thrift.protocol.TBinaryProtocol;
3333
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
34+
import org.apache.thrift.protocol.TCompactProtocol;
35+
import org.apache.thrift.protocol.TProtocolFactory;
3436
import org.apache.thrift.server.TServer;
3537
import org.apache.thrift.server.TThreadPoolServer;
3638
import org.apache.thrift.transport.TServerSocket;
@@ -98,7 +100,7 @@ private class SyncServiceThread extends Thread {
98100

99101
private TServerSocket serverTransport;
100102
private TServer poolServer;
101-
private Factory protocolFactory;
103+
private TProtocolFactory protocolFactory;
102104
private Processor<SyncService.Iface> processor;
103105
private TThreadPoolServer.Args poolArgs;
104106

@@ -111,7 +113,12 @@ public void run() {
111113
try {
112114
serverTransport = new TServerSocket(
113115
new InetSocketAddress(conf.getRpcAddress(), conf.getSyncServerPort()));
114-
protocolFactory = new TBinaryProtocol.Factory();
116+
if(conf.isRpcThriftCompressionEnable()) {
117+
protocolFactory = new TCompactProtocol.Factory();
118+
}
119+
else {
120+
protocolFactory = new TBinaryProtocol.Factory();
121+
}
115122
processor = new SyncService.Processor<>(new SyncServiceImpl());
116123
poolArgs = new TThreadPoolServer.Args(serverTransport);
117124
poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,

server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.commons.io.FileUtils;
4747
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
4848
import org.apache.iotdb.db.concurrent.ThreadName;
49+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
4950
import org.apache.iotdb.db.exception.SyncConnectionException;
5051
import org.apache.iotdb.db.sync.conf.Constans;
5152
import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
@@ -55,6 +56,7 @@
5556
import org.apache.iotdb.service.sync.thrift.SyncService;
5657
import org.apache.thrift.TException;
5758
import org.apache.thrift.protocol.TBinaryProtocol;
59+
import org.apache.thrift.protocol.TCompactProtocol;
5860
import org.apache.thrift.protocol.TProtocol;
5961
import org.apache.thrift.transport.TSocket;
6062
import org.apache.thrift.transport.TTransport;
@@ -265,7 +267,13 @@ public void syncAllData() throws SyncConnectionException {
265267
@Override
266268
public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
267269
transport = new TSocket(serverIp, serverPort);
268-
TProtocol protocol = new TBinaryProtocol(transport);
270+
TProtocol protocol;
271+
if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
272+
protocol = new TCompactProtocol(transport);
273+
}
274+
else {
275+
protocol = new TBinaryProtocol(transport);
276+
}
269277
serviceClient = new SyncService.Client(protocol);
270278
try {
271279
transport.open();

0 commit comments

Comments
 (0)