Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.index.codec.customcodecs.Lucene103QatCodec;
import org.opensearch.index.codec.customcodecs.Lucene104QatCodec;
import org.opensearch.index.codec.customcodecs.QatZipperFactory;
import org.opensearch.test.rest.OpenSearchRestTestCase;

Expand Down Expand Up @@ -107,9 +107,9 @@ public void testCreateIndexWithQatSPICodecWithQatHardwareUnavailable() throws IO
.put(
"index.codec",
randomFrom(
Lucene103QatCodec.Mode.QAT_LZ4.getCodec(),
Lucene103QatCodec.Mode.QAT_DEFLATE.getCodec(),
Lucene103QatCodec.Mode.QAT_ZSTD.getCodec()
Lucene104QatCodec.Mode.QAT_LZ4.getCodec(),
Lucene104QatCodec.Mode.QAT_DEFLATE.getCodec(),
Lucene104QatCodec.Mode.QAT_ZSTD.getCodec()
)
)
.put("index.codec.compression_level", randomIntBetween(1, 6))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,39 +42,39 @@ public Map<String, Codec> getCodecs(MapperService mapperService, IndexSettings i
final int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
if (mapperService == null) {
codecs.put(ZSTD_CODEC, new Zstd103Codec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict103Codec(compressionLevel));
codecs.put(ZSTD_CODEC, new Zstd104Codec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict104Codec(compressionLevel));
if (QatZipperFactory.isQatAvailable()) {
codecs.put(
QAT_LZ4_CODEC,
new QatLz4103Codec(compressionLevel, () -> { return indexSettings.getValue(INDEX_CODEC_QAT_MODE_SETTING); })
new QatLz4104Codec(compressionLevel, () -> { return indexSettings.getValue(INDEX_CODEC_QAT_MODE_SETTING); })
);
codecs.put(QAT_DEFLATE_CODEC, new QatDeflate103Codec(compressionLevel, () -> {
codecs.put(QAT_DEFLATE_CODEC, new QatDeflate104Codec(compressionLevel, () -> {
return indexSettings.getValue(INDEX_CODEC_QAT_MODE_SETTING);
}));
codecs.put(
QAT_ZSTD_CODEC,
new QatZstd103Codec(compressionLevel, () -> { return indexSettings.getValue(INDEX_CODEC_QAT_MODE_SETTING); })
new QatZstd104Codec(compressionLevel, () -> { return indexSettings.getValue(INDEX_CODEC_QAT_MODE_SETTING); })
);
}
} else {
codecs.put(ZSTD_CODEC, new Zstd103Codec(compressionLevel, defaultCodec));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict103Codec(compressionLevel, defaultCodec));
codecs.put(ZSTD_CODEC, new Zstd104Codec(compressionLevel, defaultCodec));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict104Codec(compressionLevel, defaultCodec));
if (QatZipperFactory.isQatAvailable()) {
codecs.put(
QAT_LZ4_CODEC,
new QatLz4103Codec(
new QatLz4104Codec(
compressionLevel,
() -> { return indexSettings.getValue(INDEX_CODEC_QAT_MODE_SETTING); },
defaultCodec
)
);
codecs.put(QAT_DEFLATE_CODEC, new QatDeflate103Codec(compressionLevel, () -> {
codecs.put(QAT_DEFLATE_CODEC, new QatDeflate104Codec(compressionLevel, () -> {
return indexSettings.getValue(INDEX_CODEC_QAT_MODE_SETTING);
}, defaultCodec));
codecs.put(
QAT_ZSTD_CODEC,
new QatZstd103Codec(
new QatZstd104Codec(
compressionLevel,
() -> { return indexSettings.getValue(INDEX_CODEC_QAT_MODE_SETTING); },
defaultCodec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ private static boolean isQatCodec(String codecName) {
|| codecName.equals(Lucene101QatCodec.Mode.QAT_LZ4.getCodec())
|| codecName.equals(Lucene101QatCodec.Mode.QAT_DEFLATE.getCodec())
|| codecName.equals(Lucene101QatCodec.Mode.QAT_ZSTD.getCodec())
|| codecName.equals(Lucene103QatCodec.Mode.QAT_LZ4.getCodec())
|| codecName.equals(Lucene103QatCodec.Mode.QAT_DEFLATE.getCodec())
|| codecName.equals(Lucene103QatCodec.Mode.QAT_ZSTD.getCodec());
|| codecName.equals(Lucene104QatCodec.Mode.QAT_LZ4.getCodec())
|| codecName.equals(Lucene104QatCodec.Mode.QAT_DEFLATE.getCodec())
|| codecName.equals(Lucene104QatCodec.Mode.QAT_ZSTD.getCodec());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.lucene104.Lucene104Codec;

import java.util.Set;
import java.util.function.Supplier;

import static org.opensearch.index.codec.customcodecs.backward_codecs.lucene99.Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL;

/**
*
* Extends {@link FilterCodec} to reuse the functionality of Lucene Codec.
* Supports two modes zstd and zstd_no_dict.
* Uses Lucene104 as the delegate codec
*
* @opensearch.internal
*/
public abstract class Lucene104CustomCodec extends FilterCodec {

/** Each mode represents a compression algorithm. */
public enum Mode {
/**
* ZStandard mode with dictionary
*/
ZSTD("ZSTD104", Set.of("zstd")),
/**
* ZStandard mode without dictionary
*/
ZSTD_NO_DICT("ZSTDNODICT104", Set.of("zstd_no_dict"));

private final String codec;
private final Set<String> aliases;

Mode(String codec, Set<String> aliases) {
this.codec = codec;
this.aliases = aliases;
}

/**
* Returns the Codec that is registered with Lucene
*/
public String getCodec() {
return codec;
}

/**
* Returns the aliases of the Codec
*/
public Set<String> getAliases() {
return aliases;
}
}

private final StoredFieldsFormat storedFieldsFormat;

/**
* Creates a new compression codec with the default compression level.
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
*/
public Lucene104CustomCodec(Mode mode) {
this(mode, DEFAULT_COMPRESSION_LEVEL);
}

/**
* Creates a new compression codec with the given compression level. We use
* lowercase letters when registering the codec so that we remain consistent with
* the other compression codecs: default, lucene_default, and best_compression.
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
* @param compressionLevel The compression level.
*/
public Lucene104CustomCodec(Mode mode, int compressionLevel) {
super(mode.getCodec(), new Lucene104Codec());
this.storedFieldsFormat = new Lucene104CustomStoredFieldsFormat(mode, compressionLevel);
}

/**
* Creates a new compression codec with the given compression level. We use
* lowercase letters when registering the codec so that we remain consistent with
* the other compression codecs: default, lucene_default, and best_compression.
*
* @param mode The compression codec (ZSTD or ZSTDNODICT).
* @param compressionLevel The compression level.
* @param defaultCodecSupplier Default OpenSearch codec supplier
*/
public Lucene104CustomCodec(Mode mode, int compressionLevel, Supplier<Codec> defaultCodecSupplier) {
super(mode.getCodec(), defaultCodecSupplier.get());
this.storedFieldsFormat = new Lucene104CustomStoredFieldsFormat(mode, compressionLevel);
}

@Override
public StoredFieldsFormat storedFieldsFormat() {
return storedFieldsFormat;
}

@Override
public String toString() {
return getClass().getSimpleName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.customcodecs;

import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.codecs.compressing.CompressionMode;
import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;

import java.io.IOException;
import java.util.Objects;

import static org.opensearch.index.codec.customcodecs.backward_codecs.lucene99.Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL;

/** Stored field format used by pluggable codec */
public class Lucene104CustomStoredFieldsFormat extends StoredFieldsFormat {

/** A key that we use to map to a mode */
public static final String MODE_KEY = Lucene104CustomStoredFieldsFormat.class.getSimpleName() + ".mode";

protected static final int ZSTD_BLOCK_LENGTH = 10 * 48 * 1024;
protected static final int ZSTD_MAX_DOCS_PER_BLOCK = 4096;
protected static final int ZSTD_BLOCK_SHIFT = 10;

private final CompressionMode zstdCompressionMode;
private final CompressionMode zstdNoDictCompressionMode;

private final Lucene104CustomCodec.Mode mode;
private final int compressionLevel;

/** default constructor */
public Lucene104CustomStoredFieldsFormat() {
this(Lucene104CustomCodec.Mode.ZSTD, DEFAULT_COMPRESSION_LEVEL);
}

/**
* Creates a new instance.
*
* @param mode The mode represents ZSTD or ZSTDNODICT
*/
public Lucene104CustomStoredFieldsFormat(Lucene104CustomCodec.Mode mode) {
this(mode, DEFAULT_COMPRESSION_LEVEL);
}

/**
* Creates a new instance with the specified mode and compression level.
*
* @param mode The mode represents ZSTD or ZSTDNODICT
* @param compressionLevel The compression level for the mode.
*/
public Lucene104CustomStoredFieldsFormat(Lucene104CustomCodec.Mode mode, int compressionLevel) {
this.mode = Objects.requireNonNull(mode);
this.compressionLevel = compressionLevel;
zstdCompressionMode = new ZstdCompressionMode(compressionLevel);
zstdNoDictCompressionMode = new ZstdNoDictCompressionMode(compressionLevel);
}

/**
* Returns a {@link StoredFieldsReader} to load stored fields.
* @param directory The index directory.
* @param si The SegmentInfo that stores segment information.
* @param fn The fieldInfos.
* @param context The IOContext that holds additional details on the merge/search context.
*/
@Override
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException {
if (si.getAttribute(MODE_KEY) != null) {
String value = si.getAttribute(MODE_KEY);
Lucene104CustomCodec.Mode mode = Lucene104CustomCodec.Mode.valueOf(value);
return impl(mode).fieldsReader(directory, si, fn, context);
} else {
throw new IllegalStateException("missing value for " + MODE_KEY + " for segment: " + si.name);
}
}

/**
* Returns a {@link StoredFieldsReader} to write stored fields.
* @param directory The index directory.
* @param si The SegmentInfo that stores segment information.
* @param context The IOContext that holds additional details on the merge/search context.
*/
@Override
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context) throws IOException {
String previous = si.putAttribute(MODE_KEY, mode.name());
if (previous != null && previous.equals(mode.name()) == false) {
throw new IllegalStateException(
"found existing value for " + MODE_KEY + " for segment: " + si.name + " old = " + previous + ", new = " + mode.name()
);
}
return impl(mode).fieldsWriter(directory, si, context);
}

StoredFieldsFormat impl(Lucene104CustomCodec.Mode mode) {
switch (mode) {
case ZSTD:
return getCustomCompressingStoredFieldsFormat("CustomStoredFieldsZstd", this.zstdCompressionMode);
case ZSTD_NO_DICT:
return getCustomCompressingStoredFieldsFormat("CustomStoredFieldsZstdNoDict", this.zstdNoDictCompressionMode);
default:
throw new IllegalStateException("Unsupported compression mode: " + mode);
}
}

private StoredFieldsFormat getCustomCompressingStoredFieldsFormat(String formatName, CompressionMode compressionMode) {
return new Lucene90CompressingStoredFieldsFormat(
formatName,
compressionMode,
ZSTD_BLOCK_LENGTH,
ZSTD_MAX_DOCS_PER_BLOCK,
ZSTD_BLOCK_SHIFT
);
}

public Lucene104CustomCodec.Mode getMode() {
return mode;
}

/**
* Returns the compression level.
*/
public int getCompressionLevel() {
return compressionLevel;
}

public CompressionMode getCompressionMode() {
return mode == Lucene104CustomCodec.Mode.ZSTD_NO_DICT ? zstdNoDictCompressionMode : zstdCompressionMode;
}

}
Loading
Loading