Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,33 @@ public int encodePadding(
return MAX_PADDING_LENGTH;
}

@Override
public boolean validate(
long traceId,
long bindingId,
DirectBuffer data,
int index,
int length,
ValueConsumer next,
Validator validator)
{
int schemaId = NO_SCHEMA_ID;
int progress = 0;
boolean status = false;
if (data.getByte(index) == MAGIC_BYTE)
{
progress += BitUtil.SIZE_OF_BYTE;
schemaId = data.getInt(index + progress, ByteOrder.BIG_ENDIAN);
progress += BitUtil.SIZE_OF_INT;
}

if (schemaId != NO_SCHEMA_ID)
{
status = validator.accept(traceId, bindingId, schemaId, data, index + progress, length - progress, next);
}
return status;
}

private String sendHttpRequest(
String path)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.time.Duration;
Expand Down Expand Up @@ -110,4 +111,19 @@ public void shouldResolveSchemaIdFromData()

assertEquals(9, schemaId);
}

@Test
public void shouldResolveSchemaIdAndValidate()
{

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, catalogConfig, context);

DirectBuffer data = new UnsafeBuffer();

byte[] bytes = {0x00, 0x00, 0x00, 0x00, 0x09, 0x06, 0x69, 0x64,
0x30, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65};
data.wrap(bytes, 0, bytes.length);

assertTrue(catalog.validate(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Validator.IDENTITY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ int accept(
ValueConsumer next);
}

@FunctionalInterface
interface Validator
{
Validator IDENTITY = (traceId, bindingId, schemaId, data, index, length, next) -> true;

boolean accept(
long traceId,
long bindingId,
int schemaId,
DirectBuffer data,
int index,
int length,
ValueConsumer next);
}

default int register(
String subject,
String schema)
Expand Down Expand Up @@ -102,6 +117,18 @@ default int decode(
return decoder.accept(traceId, bindingId, NO_SCHEMA_ID, data, index, length, next);
}

default boolean validate(
long traceId,
long bindingId,
DirectBuffer data,
int index,
int length,
ValueConsumer next,
Validator validator)
{
return validator.accept(traceId, bindingId, NO_SCHEMA_ID, data, index, length, next);
}

default int encode(
long traceId,
long bindingId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package io.aklivity.zilla.runtime.engine.test.internal.catalog;

import java.nio.ByteOrder;

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;

import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
Expand Down Expand Up @@ -102,6 +105,21 @@ public int encodePadding(
return prefix != null ? prefix.capacity() : 0;
}

@Override
public boolean validate(
long traceId,
long bindingId,
DirectBuffer data,
int index,
int length,
ValueConsumer next,
Validator validator)
{
int schemaId = data.getInt(index, ByteOrder.BIG_ENDIAN);
return validator.accept(traceId, bindingId, schemaId, data,
index + BitUtil.SIZE_OF_INT, length - BitUtil.SIZE_OF_INT, next);
}

@Override
public String location()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2021-2024 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.engine.test.internal.catalog;

import static org.junit.Assert.assertTrue;

import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Test;

import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;

public class ValidatorTest
{
@Test
public void shouldCreateAndVerifyIdentityValidator()
{
CatalogHandler.Validator validator = CatalogHandler.Validator.IDENTITY;

assertTrue(validator.accept(0L, 0L, 1, new UnsafeBuffer(), 1, 1, ValueConsumer.NOP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package io.aklivity.zilla.runtime.model.json.internal;

import java.io.InputStream;

import jakarta.json.spi.JsonProvider;
import jakarta.json.stream.JsonParser;
import jakarta.json.stream.JsonParsingException;
Expand Down Expand Up @@ -68,23 +70,19 @@ public boolean validate(

if ((flags & FLAGS_FIN) != 0x00)
{
in.wrap(buffer, 0, progress);

int schemaId = catalog != null && catalog.id > 0
? catalog.id
: handler.resolve(subject, catalog.version);

JsonProvider provider = supplyProvider(schemaId);
if (catalog != null && "encoded".equals(catalog.strategy))
{
status = handler.validate(traceId, bindingId, buffer, 0, progress, next, this::validatePayload);
}
else
{
in.wrap(buffer, 0, progress);

status &= provider != null;
int schemaId = catalog != null && catalog.id > 0
? catalog.id
: handler.resolve(subject, catalog.version);

if (status)
{
parser = provider.createParser(in);
while (parser.hasNext())
{
parser.next();
}
status = validatePayload(schemaId, in);
}
}
}
Expand All @@ -96,4 +94,37 @@ public boolean validate(

return status;
}

private boolean validatePayload(
long traceId,
long bindingId,
int schemaId,
DirectBuffer data,
int index,
int length,
ValueConsumer next)
{
in.wrap(data, index, length);
return validatePayload(schemaId, in);
}

private boolean validatePayload(
int schemaId,
InputStream in)
{
boolean status = true;
JsonProvider provider = supplyProvider(schemaId);

status &= provider != null;

if (status)
{
parser = provider.createParser(in);
while (parser.hasNext())
{
parser.next();
}
}
return status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.Clock;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -282,4 +283,82 @@ public void shouldVerifyValidJsonArray()

assertTrue(validator.validate(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
public void shouldVerifyValidCompleteJsonObjectWithEncoded()
{
TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new)
.namespace("test")
.name("test0")
.type("test")
.options(TestCatalogOptionsConfig::builder)
.id(9)
.schema(OBJECT_SCHEMA)
.build()
.build();

JsonModelConfig model = JsonModelConfig.builder()
.catalog()
.name("test0")
.schema()
.strategy("encoded")
.build()
.build()
.build();

when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options));
JsonValidatorHandler validator = new JsonValidatorHandler(model, context);

byte[] encoded = {0x00, 0x00, 0x00, 0x09};
byte[] event = """
{
"id": "123",
"status": "OK"
}""".getBytes();

int length = encoded.length + event.length;

ExpandableDirectByteBuffer data = new ExpandableDirectByteBuffer(length);
data.putBytes(0, encoded, 0, encoded.length);
data.putBytes(encoded.length, event, 0, event.length);

assertTrue(validator.validate(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
public void shouldVerifyInvalidCompleteJsonObjectWithEncoded()
{
TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new)
.namespace("test")
.name("test0")
.type("test")
.options(TestCatalogOptionsConfig::builder)
.id(9)
.schema(OBJECT_SCHEMA)
.build()
.build();

JsonModelConfig model = JsonModelConfig.builder()
.catalog()
.name("test0")
.schema()
.strategy("encoded")
.build()
.build()
.build();

when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options));
JsonValidatorHandler validator = new JsonValidatorHandler(model, context);

byte[] event = """
{
"id": "123",
"status": "OK"
}""".getBytes();

DirectBuffer data = new UnsafeBuffer();
data.wrap(event, 0, event.length);

assertFalse(validator.validate(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@
"strategy":
{
"type": "string",
"enum": [ "topic" ]
"enum": [ "topic", "encoded" ]
},
"version":
{
Expand Down
Loading