Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
import jakarta.json.JsonReader;
import jakarta.json.stream.JsonParsingException;

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectCache;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.catalog.apicurio.config.ApicurioOptionsConfig;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.ApicurioPrefixFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.ApicurioDefaultIdFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.ApicurioLegacyIdFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
Expand All @@ -60,9 +62,12 @@ public class ApicurioCatalogHandler implements CatalogHandler
private static final long RESET_RETRY_DELAY_MS_DEFAULT = 0L;
private static final long RETRY_INITIAL_DELAY_MS_DEFAULT = 1000L;

private final ApicurioPrefixFW.Builder prefixRW = new ApicurioPrefixFW.Builder()
private final ApicurioLegacyIdFW.Builder legacyIdRW = new ApicurioLegacyIdFW.Builder()
.wrap(new UnsafeBuffer(new byte[5]), 0, 5);

private final ApicurioDefaultIdFW.Builder defaultIdRW = new ApicurioDefaultIdFW.Builder()
.wrap(new UnsafeBuffer(new byte[9]), 0, 9);

private final HttpClient client;
private final String baseUrl;
private final CRC32C crc32c;
Expand All @@ -73,20 +78,22 @@ public class ApicurioCatalogHandler implements CatalogHandler
private final long catalogId;
private final String groupId;
private final String useId;
private final IdDecoder decodeId;
private final IdEncoder encodeId;
private final int idSize;
private final int sizeofId;
private final int encodePadding;
private final String artifactPath;
private final ConcurrentMap<Integer, CompletableFuture<CachedArtifact>> cachedArtifacts;
private final ConcurrentMap<Integer, CompletableFuture<CachedArtifactId>> cachedArtifactIds;


public ApicurioCatalogHandler(
ApicurioOptionsConfig config,
EngineContext context,
long catalogId)
{
this(config, context, catalogId, new ApicurioCache());
}

public ApicurioCatalogHandler(
ApicurioOptionsConfig config,
EngineContext context,
Expand All @@ -101,8 +108,10 @@ public ApicurioCatalogHandler(
this.maxAgeMillis = config.maxAge.toMillis();
this.groupId = config.groupId;
this.useId = config.useId;
this.decodeId = config.idEncoding.equals(LEGACY_ID_ENCODING) ? this::decodeLegacyId : this::decodeDefaultId;
this.encodeId = config.idEncoding.equals(LEGACY_ID_ENCODING) ? this::encodeLegacyId : this::encodeDefaultId;
this.idSize = config.idEncoding.equals(LEGACY_ID_ENCODING) ? SIZE_OF_INT : SIZE_OF_LONG;
this.sizeofId = config.idEncoding.equals(LEGACY_ID_ENCODING) ? SIZE_OF_INT : SIZE_OF_LONG;
this.encodePadding = BitUtil.SIZE_OF_BYTE + sizeofId;
this.artifactPath = useId.equals(CONTENT_ID) ? ARTIFACT_BY_CONTENT_ID_PATH : ARTIFACT_BY_GLOBAL_ID_PATH;
this.event = new ApicurioEventContext(context);
this.catalogId = catalogId;
Expand Down Expand Up @@ -325,7 +334,7 @@ public int resolve(
int schemaId = NO_SCHEMA_ID;
if (data.getByte(index) == MAGIC_BYTE)
{
schemaId = encodeId.encode(data, index + SIZE_OF_BYTE);
schemaId = decodeId.decode(data, index + SIZE_OF_BYTE);
}
return schemaId;
}
Expand All @@ -346,8 +355,8 @@ public int decode(
if (data.getByte(index) == MAGIC_BYTE)
{
progress += SIZE_OF_BYTE;
schemaId = encodeId.encode(data, index + progress);
progress += idSize;
schemaId = decodeId.decode(data, index + progress);
progress += sizeofId;
}

if (schemaId > NO_SCHEMA_ID)
Expand All @@ -368,17 +377,16 @@ public int encode(
ValueConsumer next,
Encoder encoder)
{
ApicurioPrefixFW prefix = prefixRW.rewrap().schemaId(schemaId).build();
next.accept(prefix.buffer(), prefix.offset(), prefix.sizeof());
int prefixLen = encodeId.encode(schemaId, next);
int valLength = encoder.accept(traceId, bindingId, schemaId, data, index, length, next);
return valLength > 0 ? prefix.sizeof() + valLength : -1;
return valLength > 0 ? prefixLen + valLength : -1;
}

@Override
public int encodePadding(
int length)
{
return MAX_PADDING_LENGTH;
return encodePadding;
}

private URI toURI(
Expand Down Expand Up @@ -415,13 +423,29 @@ private int resolveId(
}

private int encodeDefaultId(
int schemaId, ValueConsumer next)
{
ApicurioDefaultIdFW prefix = defaultIdRW.rewrap().schemaId(schemaId).build();
next.accept(prefix.buffer(), prefix.offset(), prefix.sizeof());
return prefix.sizeof();
}

private int encodeLegacyId(
int schemaId, ValueConsumer next)
{
ApicurioLegacyIdFW prefix = legacyIdRW.rewrap().schemaId(schemaId).build();
next.accept(prefix.buffer(), prefix.offset(), prefix.sizeof());
return prefix.sizeof();
}

private int decodeDefaultId(
DirectBuffer data,
int index)
{
return (int) data.getLong(index, ByteOrder.BIG_ENDIAN);
}

private int encodeLegacyId(
private int decodeLegacyId(
DirectBuffer data,
int index)
{
Expand All @@ -431,6 +455,12 @@ private int encodeLegacyId(
@FunctionalInterface
private interface IdEncoder
{
int encode(DirectBuffer data, int index);
int encode(int schemaId, ValueConsumer next);
}

@FunctionalInterface
private interface IdDecoder
{
int decode(DirectBuffer data, int index);
}
}
8 changes: 7 additions & 1 deletion runtime/catalog-apicurio/src/main/zilla/internal.idl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ scope internal
{
option byteorder network;

struct ApicurioPrefix
struct ApicurioDefaultId
{
uint8 magic = 0;
int64 schemaId;
}

struct ApicurioLegacyId
{
uint8 magic = 0;
int32 schemaId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.apicurio.internal;

import static org.agrona.BitUtil.SIZE_OF_BYTE;
import static org.agrona.BitUtil.SIZE_OF_INT;
import static org.agrona.BitUtil.SIZE_OF_LONG;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import java.time.Duration;

import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Test;

import io.aklivity.zilla.runtime.catalog.apicurio.config.ApicurioOptionsConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;

public class ApicurioCatalogHandlerTest
{
private static final int SIZE_OF_DEFAULT_PREFIX = SIZE_OF_BYTE + SIZE_OF_LONG;
private static final int SIZE_OF_LEGACY_PREFIX = SIZE_OF_BYTE + SIZE_OF_INT;

private EngineContext context = mock(EngineContext.class);

@Test
public void shouldVerifyDefaultEncodePadding()
{
ApicurioOptionsConfig config = ApicurioOptionsConfig.builder()
.url("http://localhost:8080")
.groupId("groupId")
.maxAge(Duration.ofSeconds(1))
.build();

ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L);

int actual = catalog.encodePadding(0);

assertEquals(SIZE_OF_DEFAULT_PREFIX, actual);
}

@Test
public void shouldVerifyLegacyEncodePadding()
{
ApicurioOptionsConfig config = ApicurioOptionsConfig.builder()
.url("http://localhost:8080")
.groupId("groupId")
.maxAge(Duration.ofSeconds(1))
.idEncoding("legacy")
.build();

ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L);

int actual = catalog.encodePadding(0);

assertEquals(SIZE_OF_LEGACY_PREFIX, actual);
}

@Test
public void shouldEncodeDefaultSchemaId()
{
ApicurioOptionsConfig config = ApicurioOptionsConfig.builder()
.url("http://localhost:8080")
.groupId("groupId")
.maxAge(Duration.ofSeconds(1))
.build();

ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L);

DirectBuffer data = new UnsafeBuffer();

byte[] bytes = {
0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73,
0x69, 0x74, 0x69, 0x76, 0x65 };

data.wrap(bytes, 0, bytes.length);

int actual = catalog.encode(0L, 0L, 1, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Encoder.IDENTITY);

assertEquals(SIZE_OF_DEFAULT_PREFIX + bytes.length, actual);
}

@Test
public void shouldEncodeLegacySchemaId()
{
ApicurioOptionsConfig config = ApicurioOptionsConfig.builder()
.url("http://localhost:8080")
.groupId("groupId")
.maxAge(Duration.ofSeconds(1))
.idEncoding("legacy")
.build();

ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L);

DirectBuffer data = new UnsafeBuffer();

byte[] bytes = {
0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73,
0x69, 0x74, 0x69, 0x76, 0x65 };

data.wrap(bytes, 0, bytes.length);

int actual = catalog.encode(0L, 0L, 1, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Encoder.IDENTITY);

assertEquals(SIZE_OF_LEGACY_PREFIX + bytes.length, actual);
}

@Test
public void shouldDecodeDefaultSchemaId()
{
ApicurioOptionsConfig config = ApicurioOptionsConfig.builder()
.url("http://localhost:8080")
.groupId("groupId")
.maxAge(Duration.ofSeconds(1))
.build();

ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L);

DirectBuffer data = new UnsafeBuffer();

byte[] bytes = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09,
0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73,
0x69, 0x74, 0x69, 0x76, 0x65 };

data.wrap(bytes, 0, bytes.length);

int actual = catalog.decode(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY);

assertEquals(data.capacity() - SIZE_OF_DEFAULT_PREFIX, actual);
}

@Test
public void shouldDecodeLegacySchemaId()
{
ApicurioOptionsConfig config = ApicurioOptionsConfig.builder()
.url("http://localhost:8080")
.groupId("groupId")
.maxAge(Duration.ofSeconds(1))
.idEncoding("legacy")
.build();

ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L);

DirectBuffer data = new UnsafeBuffer();

byte[] bytes = {
0x00, 0x00, 0x00, 0x00, 0x09,
0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73,
0x69, 0x74, 0x69, 0x76, 0x65 };

data.wrap(bytes, 0, bytes.length);

int actual = catalog.decode(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY);

assertEquals(data.capacity() - SIZE_OF_LEGACY_PREFIX, actual);
}

@Test
public void shouldResolveDefaultSchemaId()
{
ApicurioOptionsConfig config = ApicurioOptionsConfig.builder()
.url("http://localhost:8080")
.groupId("groupId")
.maxAge(Duration.ofSeconds(1))
.build();

ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L);

MutableDirectBuffer data = new UnsafeBuffer(new byte[SIZE_OF_DEFAULT_PREFIX + 13]);

data.putByte(0, (byte) 0);

byte[] bytes = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09,
0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73,
0x69, 0x74, 0x69, 0x76, 0x65 };

data.wrap(bytes, 0, bytes.length);

int actual = catalog.resolve(data, 0, data.capacity());

assertEquals(9, actual);
}

@Test
public void shouldResolveLegacySchemaId()
{
ApicurioOptionsConfig config = ApicurioOptionsConfig.builder()
.url("http://localhost:8080")
.groupId("groupId")
.maxAge(Duration.ofSeconds(1))
.idEncoding("legacy")
.build();

ApicurioCatalogHandler catalog = new ApicurioCatalogHandler(config, context, 0L);

MutableDirectBuffer data = new UnsafeBuffer(new byte[SIZE_OF_LEGACY_PREFIX + 13]);

data.putByte(0, (byte) 0);

byte[] bytes = {
0x00, 0x00, 0x00, 0x00, 0x09,
0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f, 0x73,
0x69, 0x74, 0x69, 0x76, 0x65 };

data.wrap(bytes, 0, bytes.length);

int actual = catalog.resolve(data, 0, data.capacity());

assertEquals(9, actual);
}
}
Loading