Skip to content

Commit c032b7a

Browse files
committed
Implement CachingHadoopFileIO
1 parent 910f271 commit c032b7a

File tree

7 files changed

+696
-3
lines changed

7 files changed

+696
-3
lines changed
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.hadoop;
21+
22+
import com.github.benmanes.caffeine.cache.Cache;
23+
import com.github.benmanes.caffeine.cache.Caffeine;
24+
import com.github.benmanes.caffeine.cache.Weigher;
25+
import java.time.Duration;
26+
import java.util.concurrent.TimeUnit;
27+
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.iceberg.io.InputFile;
29+
import org.apache.iceberg.util.SerializableSupplier;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
/**
34+
* Variant of {@link HadoopFileIO} that adds metadata content caching features.
35+
* <p>
36+
* This FileIO intend to speedup scan planning by caching most of the table metadata contents in memory and reduce
37+
* remote reads.
38+
*/
39+
public class CachingHadoopFileIO extends HadoopFileIO {
40+
private static final Logger LOG = LoggerFactory.getLogger(CachingHadoopFileIO.class);
41+
private static ContentCache sharedCache;
42+
43+
private static ContentCache createCache(Configuration conf) {
44+
long durationSeconds = conf.getTimeDuration(ConfigProperties.CACHE_EXPIRE_AFTER_ACCESS,
45+
DEFAULT_EXPIRE_AFTER_ACCESS, TimeUnit.SECONDS);
46+
long totalBytes = conf.getLongBytes(ConfigProperties.CACHE_MAX_TOTAL_BYTES, DEFAULT_MAX_TOTAL_BYTES);
47+
long contentLength = Math.max(0, conf.getLongBytes(ConfigProperties.CACHE_MAX_CONTENT_LENGTH,
48+
DEFAULT_MAX_CONTENT_LENGTH));
49+
50+
return new ContentCache(durationSeconds, totalBytes, contentLength);
51+
}
52+
53+
public static synchronized ContentCache getSharedCache(Configuration conf) {
54+
if (sharedCache == null) {
55+
sharedCache = createCache(conf);
56+
}
57+
return sharedCache;
58+
}
59+
60+
public static final long DEFAULT_MAX_CONTENT_LENGTH = 8 * 1024 * 1024; // 8 MB
61+
public static final long DEFAULT_MAX_TOTAL_BYTES = 100 * 1024 * 1024; // 100 MB
62+
public static final long DEFAULT_EXPIRE_AFTER_ACCESS = 60; // 60 seconds
63+
64+
private ContentCache fileContentCache;
65+
66+
/**
67+
* Constructor used for dynamic FileIO loading.
68+
* <p>
69+
* {@link Configuration Hadoop configuration} must be set through {@link HadoopFileIO#setConf(Configuration)}
70+
*/
71+
public CachingHadoopFileIO() {
72+
}
73+
74+
public CachingHadoopFileIO(Configuration hadoopConf) {
75+
this(new SerializableConfiguration(hadoopConf)::get);
76+
}
77+
78+
public CachingHadoopFileIO(SerializableSupplier<Configuration> hadoopConf) {
79+
super(hadoopConf);
80+
initCache();
81+
}
82+
83+
@Override
84+
public void setConf(Configuration conf) {
85+
super.setConf(conf);
86+
initCache();
87+
}
88+
89+
private void initCache() {
90+
boolean isSharedCache = conf().getBoolean(ConfigProperties.CACHE_SHARED, false);
91+
if (isSharedCache) {
92+
this.fileContentCache = getSharedCache(conf());
93+
LOG.info("CachingHadoopFileIO created with shared cache. {} ", this.fileContentCache);
94+
} else {
95+
this.fileContentCache = createCache(conf());
96+
LOG.info("CachingHadoopFileIO created. {} ", this.fileContentCache);
97+
}
98+
}
99+
100+
@Override
101+
public InputFile newInputFile(String path) {
102+
HadoopInputFile inFile = HadoopInputFile.fromLocation(path, conf());
103+
if (path.contains("/metadata/")) {
104+
// TODO: Currently we just believe that this is a metadata file if it has "metadata" dir on its path. But metadata
105+
// location can be set differently for different tables. A table might have different metadata path by
106+
// setting custom "write.metadata.path" TableProperties.
107+
// We might need to extend FileIO interface. Add method 'newMetadataInputFile' and let caller use it to
108+
// differentiate metadata files vs other files.
109+
inFile.setContentCache(fileContentCache);
110+
return inFile;
111+
}
112+
return inFile;
113+
}
114+
115+
@Override
116+
public void deleteFile(String path) {
117+
super.deleteFile(path);
118+
fileContentCache.cache().invalidate(path);
119+
}
120+
121+
public void invalidateAllCache() {
122+
fileContentCache.cache().invalidateAll();
123+
fileContentCache.cache().cleanUp();
124+
LOG.debug("Content cache invalidated. {}", fileContentCache);
125+
}
126+
127+
public long cacheEstimatedSize() {
128+
return fileContentCache.cache().estimatedSize();
129+
}
130+
131+
public long cacheHitCount() {
132+
return fileContentCache.cache().stats().hitCount();
133+
}
134+
135+
public long cacheMissCount() {
136+
return fileContentCache.cache().stats().missCount();
137+
}
138+
139+
public long cacheLoadSuccessCount() {
140+
return fileContentCache.cache().stats().loadSuccessCount();
141+
}
142+
143+
public long cacheLoadFailureCount() {
144+
return fileContentCache.cache().stats().loadFailureCount();
145+
}
146+
147+
public long cacheTotalLoadTime() {
148+
return fileContentCache.cache().stats().totalLoadTime();
149+
}
150+
151+
public long cacheEvictionCount() {
152+
return fileContentCache.cache().stats().evictionCount();
153+
}
154+
155+
public long cacheEvictionWeight() {
156+
return fileContentCache.cache().stats().evictionWeight();
157+
}
158+
159+
public static class ContentCache {
160+
private final long expireAfterAccess;
161+
private final long maxTotalBytes;
162+
private final long maxContentLength;
163+
private final Cache<String, byte[]> cache;
164+
165+
private ContentCache(long expireAfterAccessSecond, long maxTotalBytes, long maxContentLength) {
166+
this.expireAfterAccess = expireAfterAccessSecond;
167+
this.maxTotalBytes = maxTotalBytes;
168+
this.maxContentLength = maxContentLength;
169+
this.cache = Caffeine.newBuilder()
170+
.expireAfterAccess(Duration.ofSeconds(expireAfterAccessSecond))
171+
.maximumWeight(maxTotalBytes)
172+
.weigher((Weigher<String, byte[]>) (key, value) -> value.length)
173+
.recordStats()
174+
.build();
175+
}
176+
177+
public long expireAfterAccess() {
178+
return expireAfterAccess;
179+
}
180+
181+
public long maxContentLength() {
182+
return maxContentLength;
183+
}
184+
185+
public long maxTotalBytes() {
186+
return maxTotalBytes;
187+
}
188+
189+
public Cache<String, byte[]> cache() {
190+
return cache;
191+
}
192+
193+
@Override
194+
public String toString() {
195+
return getClass().getSimpleName() + '{' +
196+
"expireAfterAccess=" + expireAfterAccess + ", " +
197+
"maxContentLength=" + maxContentLength + ", " +
198+
"maxTotalBytes=" + maxTotalBytes + ", " +
199+
"cacheStats=" + cache.stats() + '}';
200+
}
201+
}
202+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.hadoop;
21+
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.iceberg.io.FileIO;
24+
25+
public class CachingHadoopTables extends HadoopTables {
26+
private final CachingHadoopFileIO fileIO;
27+
28+
public CachingHadoopTables() {
29+
this(new Configuration());
30+
}
31+
32+
public CachingHadoopTables(Configuration conf) {
33+
super(conf);
34+
fileIO = new CachingHadoopFileIO(getConf());
35+
}
36+
37+
@Override
38+
protected FileIO io() {
39+
return fileIO;
40+
}
41+
42+
public void invalidateAllCache() {
43+
fileIO.invalidateAllCache();
44+
}
45+
46+
public long cacheEstimatedSize() {
47+
return fileIO.cacheEstimatedSize();
48+
}
49+
50+
public long cacheHitCount() {
51+
return fileIO.cacheHitCount();
52+
}
53+
54+
public long cacheMissCount() {
55+
return fileIO.cacheMissCount();
56+
}
57+
58+
public long cacheLoadSuccessCount() {
59+
return fileIO.cacheLoadSuccessCount();
60+
}
61+
62+
public long cacheLoadFailureCount() {
63+
return fileIO.cacheLoadFailureCount();
64+
}
65+
66+
public long cacheTotalLoadTime() {
67+
return fileIO.cacheTotalLoadTime();
68+
}
69+
70+
public long cacheEvictionCount() {
71+
return fileIO.cacheEvictionCount();
72+
}
73+
74+
public long cacheEvictionWeight() {
75+
return fileIO.cacheEvictionWeight();
76+
}
77+
}

core/src/main/java/org/apache/iceberg/hadoop/ConfigProperties.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ private ConfigProperties() {
2626

2727
public static final String ENGINE_HIVE_ENABLED = "iceberg.engine.hive.enabled";
2828
public static final String KEEP_HIVE_STATS = "iceberg.hive.keep.stats";
29+
public static final String CACHE_MAX_CONTENT_LENGTH = "iceberg.cache.max-content-length";
30+
public static final String CACHE_MAX_TOTAL_BYTES = "iceberg.cache.max-total-bytes";
31+
public static final String CACHE_EXPIRE_AFTER_ACCESS = "iceberg.cache.expire-after-access";
32+
public static final String CACHE_SHARED = "iceberg.cache.shared";
2933
}

0 commit comments

Comments
 (0)