diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index e05bfff82994..97ef3b3a064b 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -1685,3 +1685,35 @@ The connector supports redirection from Iceberg tables to Hive tables with the The connector supports configuring and using [file system caching](/object-storage/file-system-cache). + +### Manifest File Caching + +As of Iceberg version 1.1.0, Apache Iceberg provides a mechanism to cache the +contents of Iceberg manifest files in memory. This feature helps to reduce +repeated reads of small Iceberg manifest files from remote storage. + +Note that currently, manifest file caching is supported for Hive Metastore catalog. + +The following configuration properties are available: + +:::{list-table} Manifest File Caching configuration properties +:widths: 30, 58, 12 +:header-rows: 1 + +* - Property name + - Description + - Default +* - `iceberg.hive.manifest.cache-enabled` + - Enable or disable the manifest caching feature. + - `false` +* - `iceberg.hive.manifest.cache.max-total-size` + - Maximum size of cache size. + - `100MB` +* - `iceberg.hive.manifest.cache.expiration-interval-duration` + - Maximum time duration for which an entry stays in the manifest cache. + - `60s` +* - `iceberg.hive.manifest.cache.max-content-length` + - Maximum length of a manifest file to be considered for caching. Manifest files + with a length exceeding this size will not be cached. + - `8MB` +::: diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index c40982a8d02d..b5f20a9ddb2f 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -39,6 +39,11 @@ jackson-databind + + com.github.ben-manes.caffeine + caffeine + + com.google.errorprone error_prone_annotations diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index e3c1e391f7dd..953e6763ba37 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -13,6 +13,8 @@ */ package io.trino.plugin.iceberg; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; @@ -22,6 +24,8 @@ import io.airlift.slice.Slice; import io.airlift.slice.SliceUtf8; import io.airlift.slice.Slices; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; import io.trino.filesystem.FileEntry; import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; @@ -64,10 +68,12 @@ import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; +import org.apache.iceberg.SystemConfigs; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types.NestedField; @@ -89,6 +95,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -158,6 +165,10 @@ import static java.math.RoundingMode.UNNECESSARY; import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_ENABLED; +import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS; +import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH; +import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.FORMAT_VERSION; @@ -954,4 +965,37 @@ public static long getModificationTime(String path, TrinoFileSystem fileSystem) throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to get file modification time: " + path, e); } } + + public static Map loadManifestCachingProperties( + Map properties, + DataSize maxManifestCacheSize, + Duration manifestCacheExpireDuration, + DataSize manifestCacheMaxContentLength) + { + properties.put(IO_MANIFEST_CACHE_ENABLED, "true"); + properties.put(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, String.valueOf(maxManifestCacheSize.toBytes())); + properties.put(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, String.valueOf(manifestCacheExpireDuration.toMillis())); + properties.put(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, String.valueOf(manifestCacheMaxContentLength.toBytes())); + return properties; + } + + public static Optional> createFileIOCache(boolean isManifestCachingEnabled, Duration manifestCacheExpireDuration) + { + Optional> fileIOCache; + if (isManifestCachingEnabled) { + fileIOCache = Optional.of(Caffeine.newBuilder() + .maximumSize(getMaxFileIO()) + .expireAfterAccess(manifestCacheExpireDuration.toMillis(), TimeUnit.MILLISECONDS) + .build()); + } + else { + fileIOCache = Optional.empty(); + } + return fileIOCache; + } + + public static int getMaxFileIO() + { + return SystemConfigs.IO_MANIFEST_CACHE_MAX_FILEIO.defaultValue(); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java index b7a31bdbe817..442e2bb095fd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java @@ -13,17 +13,27 @@ */ package io.trino.plugin.iceberg.catalog.hms; +import com.github.benmanes.caffeine.cache.Cache; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreFactory; +import io.trino.plugin.iceberg.CatalogType; import io.trino.plugin.iceberg.catalog.IcebergTableOperations; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.fileio.ForwardingFileIo; import io.trino.spi.connector.ConnectorSession; +import org.apache.iceberg.io.FileIO; +import java.util.HashMap; import java.util.Optional; +import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; +import static io.trino.plugin.iceberg.IcebergUtil.createFileIOCache; +import static io.trino.plugin.iceberg.IcebergUtil.loadManifestCachingProperties; import static java.util.Objects.requireNonNull; public class HiveMetastoreTableOperationsProvider @@ -31,12 +41,26 @@ public class HiveMetastoreTableOperationsProvider { private final TrinoFileSystemFactory fileSystemFactory; private final ThriftMetastoreFactory thriftMetastoreFactory; + private final boolean isManifestCachingEnabled; + private final DataSize maxManifestCacheSize; + private final Duration manifestCacheExpireDuration; + private final DataSize manifestCacheMaxContentLength; + private final Optional> fileIOCache; @Inject - public HiveMetastoreTableOperationsProvider(TrinoFileSystemFactory fileSystemFactory, ThriftMetastoreFactory thriftMetastoreFactory) + public HiveMetastoreTableOperationsProvider( + TrinoFileSystemFactory fileSystemFactory, + ThriftMetastoreFactory thriftMetastoreFactory, + IcebergHiveMetastoreCatalogConfig icebergHiveMetastoreCatalogConfig) { this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.thriftMetastoreFactory = requireNonNull(thriftMetastoreFactory, "thriftMetastoreFactory is null"); + requireNonNull(icebergHiveMetastoreCatalogConfig, "icebergHiveMetastoreCatalogConfig is null"); + this.isManifestCachingEnabled = icebergHiveMetastoreCatalogConfig.isManifestCachingEnabled(); + this.maxManifestCacheSize = icebergHiveMetastoreCatalogConfig.getMaxManifestCacheSize(); + this.manifestCacheExpireDuration = icebergHiveMetastoreCatalogConfig.getManifestCacheExpireDuration(); + this.manifestCacheMaxContentLength = icebergHiveMetastoreCatalogConfig.getManifestCacheMaxContentLength(); + this.fileIOCache = createFileIOCache(isManifestCachingEnabled, manifestCacheExpireDuration); } @Override @@ -48,8 +72,24 @@ public IcebergTableOperations createTableOperations( Optional owner, Optional location) { + FileIO fileIO; + if (fileIOCache.isPresent()) { + fileIO = fileIOCache.get().get( + HIVE_METASTORE, + k -> new ForwardingFileIo( + fileSystemFactory.create(session), + loadManifestCachingProperties( + new HashMap<>(), + maxManifestCacheSize, + manifestCacheExpireDuration, + manifestCacheMaxContentLength))); + } + else { + fileIO = new ForwardingFileIo(fileSystemFactory.create(session), ImmutableMap.of()); + } + return new HiveMetastoreTableOperations( - new ForwardingFileIo(fileSystemFactory.create(session)), + fileIO, ((TrinoHiveCatalog) catalog).getMetastore(), thriftMetastoreFactory.createMetastore(Optional.of(session.getIdentity())), session, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogConfig.java new file mode 100644 index 000000000000..09e66cd99093 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogConfig.java @@ -0,0 +1,85 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.hms; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; + +import static io.airlift.units.DataSize.Unit.BYTE; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT; +import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT; +import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT; + +public class IcebergHiveMetastoreCatalogConfig +{ + private boolean manifestCachingEnabled; + private DataSize maxManifestCacheSize = DataSize.of(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT, BYTE); + private Duration manifestCacheExpireDuration = new Duration(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT, MILLISECONDS); + private DataSize manifestCacheMaxContentLength = DataSize.of(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT, BYTE); + + public boolean isManifestCachingEnabled() + { + return manifestCachingEnabled; + } + + @Config("iceberg.hive.manifest.cache-enabled") + @ConfigDescription("Enable/disable the manifest caching feature in Hive Metastore catalog") + public IcebergHiveMetastoreCatalogConfig setManifestCachingEnabled(boolean manifestCachingEnabled) + { + this.manifestCachingEnabled = manifestCachingEnabled; + return this; + } + + public DataSize getMaxManifestCacheSize() + { + return maxManifestCacheSize; + } + + @Config("iceberg.hive.manifest.cache.max-total-size") + @ConfigDescription("Maximum total amount to cache in the manifest cache of Hive Metastore catalog") + public IcebergHiveMetastoreCatalogConfig setMaxManifestCacheSize(DataSize maxManifestCacheSize) + { + this.maxManifestCacheSize = maxManifestCacheSize; + return this; + } + + public Duration getManifestCacheExpireDuration() + { + return manifestCacheExpireDuration; + } + + @Config("iceberg.hive.manifest.cache.expiration-interval-duration") + @ConfigDescription("Maximum duration for which an entry stays in the manifest cache of Hive Metastore catalog") + public IcebergHiveMetastoreCatalogConfig setManifestCacheExpireDuration(Duration manifestCacheExpireDuration) + { + this.manifestCacheExpireDuration = manifestCacheExpireDuration; + return this; + } + + public DataSize getManifestCacheMaxContentLength() + { + return manifestCacheMaxContentLength; + } + + @Config("iceberg.hive.manifest.cache.max-content-length") + @ConfigDescription("Maximum length of a manifest file to be considered for caching in Hive Metastore catalog") + public IcebergHiveMetastoreCatalogConfig setManifestCacheMaxContentLength(DataSize manifestCacheMaxContentLength) + { + this.manifestCacheMaxContentLength = manifestCacheMaxContentLength; + return this; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java index b254247f9828..f17b174a6dbd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java @@ -44,6 +44,7 @@ public class IcebergHiveMetastoreCatalogModule protected void setup(Binder binder) { install(new ThriftMetastoreModule()); + configBinder(binder).bindConfig(IcebergHiveMetastoreCatalogConfig.class); binder.bind(IcebergTableOperationsProvider.class).to(HiveMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); binder.bind(MetastoreValidator.class).asEagerSingleton(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestIcebergHiveMetastoreCatalogConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestIcebergHiveMetastoreCatalogConfig.java new file mode 100644 index 000000000000..07b1df6c4f08 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestIcebergHiveMetastoreCatalogConfig.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.hms; + +import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.airlift.units.DataSize.Unit.BYTE; +import static io.airlift.units.DataSize.Unit.GIGABYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT; +import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT; +import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT; + +public class TestIcebergHiveMetastoreCatalogConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(IcebergHiveMetastoreCatalogConfig.class) + .setManifestCachingEnabled(false) + .setMaxManifestCacheSize(DataSize.of(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT, BYTE)) + .setManifestCacheExpireDuration(new Duration(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT, MILLISECONDS)) + .setManifestCacheMaxContentLength(DataSize.of(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT, BYTE))); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("iceberg.hive.manifest.cache-enabled", "true") + .put("iceberg.hive.manifest.cache.max-total-size", "1GB") + .put("iceberg.hive.manifest.cache.expiration-interval-duration", "10m") + .put("iceberg.hive.manifest.cache.max-content-length", "10MB") + .buildOrThrow(); + + IcebergHiveMetastoreCatalogConfig expected = new IcebergHiveMetastoreCatalogConfig() + .setManifestCachingEnabled(true) + .setMaxManifestCacheSize(DataSize.of(1, GIGABYTE)) + .setManifestCacheExpireDuration(new Duration(10, MINUTES)) + .setManifestCacheMaxContentLength(DataSize.of(10, MEGABYTE)); + + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestTrinoHiveCatalogWithHiveMetastore.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestTrinoHiveCatalogWithHiveMetastore.java index b13affd3017d..eae8a2327428 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestTrinoHiveCatalogWithHiveMetastore.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestTrinoHiveCatalogWithHiveMetastore.java @@ -130,7 +130,7 @@ public ThriftMetastore createMetastore(Optional identity) { return thriftMetastore; } - }), + }, new IcebergHiveMetastoreCatalogConfig()), useUniqueTableLocations, false, false,