From 0a32af5f74d91e675f255e1aecfd539d2b810837 Mon Sep 17 00:00:00 2001 From: John Fallows Date: Fri, 16 Aug 2024 11:38:19 -0700 Subject: [PATCH] Ensure id encoding is consistent for encode and decode --- .../internal/ApicurioCatalogHandler.java | 58 +++-- .../src/main/zilla/internal.idl | 8 +- .../internal/ApicurioCatalogHandlerTest.java | 229 ++++++++++++++++++ .../apicurio/internal/ApicurioCatalogIT.java | 80 ------ 4 files changed, 280 insertions(+), 95 deletions(-) create mode 100644 runtime/catalog-apicurio/src/test/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogHandlerTest.java diff --git a/runtime/catalog-apicurio/src/main/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogHandler.java b/runtime/catalog-apicurio/src/main/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogHandler.java index ebbc32c72e..0f435119af 100644 --- a/runtime/catalog-apicurio/src/main/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogHandler.java +++ b/runtime/catalog-apicurio/src/main/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogHandler.java @@ -38,12 +38,14 @@ import jakarta.json.JsonReader; import jakarta.json.stream.JsonParsingException; +import org.agrona.BitUtil; import org.agrona.DirectBuffer; import org.agrona.collections.Int2ObjectCache; import org.agrona.concurrent.UnsafeBuffer; import io.aklivity.zilla.runtime.catalog.apicurio.config.ApicurioOptionsConfig; -import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.ApicurioPrefixFW; +import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.ApicurioDefaultIdFW; +import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.ApicurioLegacyIdFW; import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; @@ -60,9 +62,12 @@ public class ApicurioCatalogHandler implements CatalogHandler private static final long RESET_RETRY_DELAY_MS_DEFAULT = 0L; private static final long RETRY_INITIAL_DELAY_MS_DEFAULT = 1000L; - private final ApicurioPrefixFW.Builder prefixRW = new ApicurioPrefixFW.Builder() + private final ApicurioLegacyIdFW.Builder legacyIdRW = new ApicurioLegacyIdFW.Builder() .wrap(new UnsafeBuffer(new byte[5]), 0, 5); + private final ApicurioDefaultIdFW.Builder defaultIdRW = new ApicurioDefaultIdFW.Builder() + .wrap(new UnsafeBuffer(new byte[9]), 0, 9); + private final HttpClient client; private final String baseUrl; private final CRC32C crc32c; @@ -73,13 +78,14 @@ public class ApicurioCatalogHandler implements CatalogHandler private final long catalogId; private final String groupId; private final String useId; + private final IdDecoder decodeId; private final IdEncoder encodeId; - private final int idSize; + private final int sizeofId; + private final int encodePadding; private final String artifactPath; private final ConcurrentMap> cachedArtifacts; private final ConcurrentMap> cachedArtifactIds; - public ApicurioCatalogHandler( ApicurioOptionsConfig config, EngineContext context, @@ -87,6 +93,7 @@ public ApicurioCatalogHandler( { this(config, context, catalogId, new ApicurioCache()); } + public ApicurioCatalogHandler( ApicurioOptionsConfig config, EngineContext context, @@ -101,8 +108,10 @@ public ApicurioCatalogHandler( this.maxAgeMillis = config.maxAge.toMillis(); this.groupId = config.groupId; this.useId = config.useId; + this.decodeId = config.idEncoding.equals(LEGACY_ID_ENCODING) ? this::decodeLegacyId : this::decodeDefaultId; this.encodeId = config.idEncoding.equals(LEGACY_ID_ENCODING) ? this::encodeLegacyId : this::encodeDefaultId; - this.idSize = config.idEncoding.equals(LEGACY_ID_ENCODING) ? SIZE_OF_INT : SIZE_OF_LONG; + this.sizeofId = config.idEncoding.equals(LEGACY_ID_ENCODING) ? SIZE_OF_INT : SIZE_OF_LONG; + this.encodePadding = BitUtil.SIZE_OF_BYTE + sizeofId; this.artifactPath = useId.equals(CONTENT_ID) ? ARTIFACT_BY_CONTENT_ID_PATH : ARTIFACT_BY_GLOBAL_ID_PATH; this.event = new ApicurioEventContext(context); this.catalogId = catalogId; @@ -325,7 +334,7 @@ public int resolve( int schemaId = NO_SCHEMA_ID; if (data.getByte(index) == MAGIC_BYTE) { - schemaId = encodeId.encode(data, index + SIZE_OF_BYTE); + schemaId = decodeId.decode(data, index + SIZE_OF_BYTE); } return schemaId; } @@ -346,8 +355,8 @@ public int decode( if (data.getByte(index) == MAGIC_BYTE) { progress += SIZE_OF_BYTE; - schemaId = encodeId.encode(data, index + progress); - progress += idSize; + schemaId = decodeId.decode(data, index + progress); + progress += sizeofId; } if (schemaId > NO_SCHEMA_ID) @@ -368,17 +377,16 @@ public int encode( ValueConsumer next, Encoder encoder) { - ApicurioPrefixFW prefix = prefixRW.rewrap().schemaId(schemaId).build(); - next.accept(prefix.buffer(), prefix.offset(), prefix.sizeof()); + int prefixLen = encodeId.encode(schemaId, next); int valLength = encoder.accept(traceId, bindingId, schemaId, data, index, length, next); - return valLength > 0 ? prefix.sizeof() + valLength : -1; + return valLength > 0 ? prefixLen + valLength : -1; } @Override public int encodePadding( int length) { - return MAX_PADDING_LENGTH; + return encodePadding; } private URI toURI( @@ -415,13 +423,29 @@ private int resolveId( } private int encodeDefaultId( + int schemaId, ValueConsumer next) + { + ApicurioDefaultIdFW prefix = defaultIdRW.rewrap().schemaId(schemaId).build(); + next.accept(prefix.buffer(), prefix.offset(), prefix.sizeof()); + return prefix.sizeof(); + } + + private int encodeLegacyId( + int schemaId, ValueConsumer next) + { + ApicurioLegacyIdFW prefix = legacyIdRW.rewrap().schemaId(schemaId).build(); + next.accept(prefix.buffer(), prefix.offset(), prefix.sizeof()); + return prefix.sizeof(); + } + + private int decodeDefaultId( DirectBuffer data, int index) { return (int) data.getLong(index, ByteOrder.BIG_ENDIAN); } - private int encodeLegacyId( + private int decodeLegacyId( DirectBuffer data, int index) { @@ -431,6 +455,12 @@ private int encodeLegacyId( @FunctionalInterface private interface IdEncoder { - int encode(DirectBuffer data, int index); + int encode(int schemaId, ValueConsumer next); + } + + @FunctionalInterface + private interface IdDecoder + { + int decode(DirectBuffer data, int index); } } diff --git a/runtime/catalog-apicurio/src/main/zilla/internal.idl b/runtime/catalog-apicurio/src/main/zilla/internal.idl index 6e106c2a6a..b7f3a6b3aa 100644 --- a/runtime/catalog-apicurio/src/main/zilla/internal.idl +++ b/runtime/catalog-apicurio/src/main/zilla/internal.idl @@ -16,7 +16,13 @@ scope internal { option byteorder network; - struct ApicurioPrefix + struct ApicurioDefaultId + { + uint8 magic = 0; + int64 schemaId; + } + + struct ApicurioLegacyId { uint8 magic = 0; int32 schemaId; diff --git a/runtime/catalog-apicurio/src/test/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogHandlerTest.java b/runtime/catalog-apicurio/src/test/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogHandlerTest.java new file mode 100644 index 0000000000..1a50a002fb --- /dev/null +++ b/runtime/catalog-apicurio/src/test/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogHandlerTest.java @@ -0,0 +1,229 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.catalog.apicurio.internal; + +import static org.agrona.BitUtil.SIZE_OF_BYTE; +import static org.agrona.BitUtil.SIZE_OF_INT; +import static org.agrona.BitUtil.SIZE_OF_LONG; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +import java.time.Duration; + +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; +import org.agrona.concurrent.UnsafeBuffer; +import org.junit.Test; + +import io.aklivity.zilla.runtime.catalog.apicurio.config.ApicurioOptionsConfig; +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; +import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; + +public class ApicurioCatalogHandlerTest +{ + private static final int SIZE_OF_DEFAULT_PREFIX = SIZE_OF_BYTE + SIZE_OF_LONG; + private static final int SIZE_OF_LEGACY_PREFIX = SIZE_OF_BYTE + SIZE_OF_INT; + + private EngineContext context = mock(EngineContext.class); + + @Test + public void shouldVerifyDefaultEncodePadding() + { + ApicurioOptionsConfig config = ApicurioOptionsConfig.builder() + .url("http://localhost:8080") + .groupId("groupId") + .maxAge(Duration.ofSeconds(1)) + .build(); + + ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); + + int actual = catalog.encodePadding(0); + + assertEquals(SIZE_OF_DEFAULT_PREFIX, actual); + } + + @Test + public void shouldVerifyLegacyEncodePadding() + { + ApicurioOptionsConfig config = ApicurioOptionsConfig.builder() + .url("http://localhost:8080") + .groupId("groupId") + .maxAge(Duration.ofSeconds(1)) + .idEncoding("legacy") + .build(); + + ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); + + int actual = catalog.encodePadding(0); + + assertEquals(SIZE_OF_LEGACY_PREFIX, actual); + } + + @Test + public void shouldEncodeDefaultSchemaId() + { + ApicurioOptionsConfig config = ApicurioOptionsConfig.builder() + .url("http://localhost:8080") + .groupId("groupId") + .maxAge(Duration.ofSeconds(1)) + .build(); + + ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); + + DirectBuffer data = new UnsafeBuffer(); + + byte[] bytes = { + 0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x69, 0x76, 0x65 }; + + data.wrap(bytes, 0, bytes.length); + + int actual = catalog.encode(0L, 0L, 1, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Encoder.IDENTITY); + + assertEquals(SIZE_OF_DEFAULT_PREFIX + bytes.length, actual); + } + + @Test + public void shouldEncodeLegacySchemaId() + { + ApicurioOptionsConfig config = ApicurioOptionsConfig.builder() + .url("http://localhost:8080") + .groupId("groupId") + .maxAge(Duration.ofSeconds(1)) + .idEncoding("legacy") + .build(); + + ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); + + DirectBuffer data = new UnsafeBuffer(); + + byte[] bytes = { + 0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x69, 0x76, 0x65 }; + + data.wrap(bytes, 0, bytes.length); + + int actual = catalog.encode(0L, 0L, 1, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Encoder.IDENTITY); + + assertEquals(SIZE_OF_LEGACY_PREFIX + bytes.length, actual); + } + + @Test + public void shouldDecodeDefaultSchemaId() + { + ApicurioOptionsConfig config = ApicurioOptionsConfig.builder() + .url("http://localhost:8080") + .groupId("groupId") + .maxAge(Duration.ofSeconds(1)) + .build(); + + ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); + + DirectBuffer data = new UnsafeBuffer(); + + byte[] bytes = { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, + 0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x69, 0x76, 0x65 }; + + data.wrap(bytes, 0, bytes.length); + + int actual = catalog.decode(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY); + + assertEquals(data.capacity() - SIZE_OF_DEFAULT_PREFIX, actual); + } + + @Test + public void shouldDecodeLegacySchemaId() + { + ApicurioOptionsConfig config = ApicurioOptionsConfig.builder() + .url("http://localhost:8080") + .groupId("groupId") + .maxAge(Duration.ofSeconds(1)) + .idEncoding("legacy") + .build(); + + ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); + + DirectBuffer data = new UnsafeBuffer(); + + byte[] bytes = { + 0x00, 0x00, 0x00, 0x00, 0x09, + 0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x69, 0x76, 0x65 }; + + data.wrap(bytes, 0, bytes.length); + + int actual = catalog.decode(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY); + + assertEquals(data.capacity() - SIZE_OF_LEGACY_PREFIX, actual); + } + + @Test + public void shouldResolveDefaultSchemaId() + { + ApicurioOptionsConfig config = ApicurioOptionsConfig.builder() + .url("http://localhost:8080") + .groupId("groupId") + .maxAge(Duration.ofSeconds(1)) + .build(); + + ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); + + MutableDirectBuffer data = new UnsafeBuffer(new byte[SIZE_OF_DEFAULT_PREFIX + 13]); + + data.putByte(0, (byte) 0); + + byte[] bytes = { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, + 0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x69, 0x76, 0x65 }; + + data.wrap(bytes, 0, bytes.length); + + int actual = catalog.resolve(data, 0, data.capacity()); + + assertEquals(9, actual); + } + + @Test + public void shouldResolveLegacySchemaId() + { + ApicurioOptionsConfig config = ApicurioOptionsConfig.builder() + .url("http://localhost:8080") + .groupId("groupId") + .maxAge(Duration.ofSeconds(1)) + .idEncoding("legacy") + .build(); + + ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); + + MutableDirectBuffer data = new UnsafeBuffer(new byte[SIZE_OF_LEGACY_PREFIX + 13]); + + data.putByte(0, (byte) 0); + + byte[] bytes = { + 0x00, 0x00, 0x00, 0x00, 0x09, + 0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x69, 0x76, 0x65 }; + + data.wrap(bytes, 0, bytes.length); + + int actual = catalog.resolve(data, 0, data.capacity()); + + assertEquals(9, actual); + } +} diff --git a/runtime/catalog-apicurio/src/test/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogIT.java b/runtime/catalog-apicurio/src/test/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogIT.java index ca55dc20c0..010561c152 100644 --- a/runtime/catalog-apicurio/src/test/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogIT.java +++ b/runtime/catalog-apicurio/src/test/java/io/aklivity/zilla/runtime/catalog/apicurio/internal/ApicurioCatalogIT.java @@ -15,15 +15,8 @@ package io.aklivity.zilla.runtime.catalog.apicurio.internal; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.junit.Assert.assertEquals; import static org.junit.rules.RuleChain.outerRule; -import static org.mockito.Mockito.mock; -import java.time.Duration; - -import org.agrona.DirectBuffer; -import org.agrona.concurrent.UnsafeBuffer; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.DisableOnDebug; @@ -32,10 +25,6 @@ import io.aklivity.k3po.runtime.junit.annotation.Specification; import io.aklivity.k3po.runtime.junit.rules.K3poRule; -import io.aklivity.zilla.runtime.catalog.apicurio.config.ApicurioOptionsConfig; -import io.aklivity.zilla.runtime.engine.EngineContext; -import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; -import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; import io.aklivity.zilla.runtime.engine.test.EngineRule; import io.aklivity.zilla.runtime.engine.test.annotation.Configuration; @@ -58,19 +47,6 @@ public class ApicurioCatalogIT @Rule public final TestRule chain = outerRule(engine).around(k3po).around(timeout); - private ApicurioOptionsConfig config; - private EngineContext context = mock(EngineContext.class); - - @Before - public void setup() - { - config = ApicurioOptionsConfig.builder() - .url("http://localhost:8080") - .groupId("groupId") - .maxAge(Duration.ofSeconds(1)) - .build(); - } - @Test @Configuration("resolve/artifact/global/id/zilla.yaml") @Specification({ @@ -158,60 +134,4 @@ public void shouldResolveArtifactIdViaSubjectAndVersionFailed() throws Exception { k3po.finish(); } - - @Test - public void shouldVerifyMaxPadding() - { - ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); - - assertEquals(9, catalog.encodePadding(0)); - } - - @Test - public void shouldVerifyEncodedData() - { - ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); - - DirectBuffer data = new UnsafeBuffer(); - - byte[] bytes = {0x06, 0x69, 0x64, - 0x30, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65}; - data.wrap(bytes, 0, bytes.length); - - assertEquals(18, catalog.encode(0L, 0L, 1, data, 0, data.capacity(), - ValueConsumer.NOP, CatalogHandler.Encoder.IDENTITY)); - } - - @Test - public void shouldResolveSchemaIdAndProcessData() - { - - ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); - - DirectBuffer data = new UnsafeBuffer(); - - byte[] bytes = {0x00, 0x00, 0x00, 0x00, 0x09, 0x06, 0x69, 0x64, - 0x30, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65}; - data.wrap(bytes, 0, bytes.length); - - int valLength = catalog.decode(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY); - - assertEquals(data.capacity() - 9, valLength); - } - - @Test - public void shouldResolveSchemaIdFromData() - { - ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L); - - DirectBuffer data = new UnsafeBuffer(); - - byte[] bytes = {0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x06, 0x69, 0x64, - 0x30, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65}; - data.wrap(bytes, 0, bytes.length); - - int schemaId = catalog.resolve(data, 0, data.capacity()); - - assertEquals(9, schemaId); - } }