Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
public class SchemaRegistryCatalogHandler implements CatalogHandler
{
private static final String SUBJECT_VERSION_PATH = "/subjects/{0}/versions/{1}";
private static final String SUBJECT_PATH = "/subjects/{0}/versions";
private static final String SCHEMA_PATH = "/schemas/ids/{0}";

private static final int MAX_PADDING_LENGTH = 5;
Expand Down Expand Up @@ -80,6 +81,22 @@ public SchemaRegistryCatalogHandler(
this.cachedSchemaIds = catalog.cache.schemaIds;
}

@Override
public int register(
String subject,
String schema)
{
int versionId = NO_VERSION_ID;

String response = sendPostHttpRequest(MessageFormat.format(SUBJECT_PATH, subject), schema);
if (response != null)
{
versionId = request.resolveResponse(response);
}

return versionId;
}

@Override
public String resolve(
int schemaId)
Expand Down Expand Up @@ -348,6 +365,30 @@ private String sendHttpRequest(
return responseBody;
}

private String sendPostHttpRequest(
String path,
String body)
{
HttpRequest httpRequest = HttpRequest
.newBuilder(toURI(baseUrl, path))
.header("content-type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
// TODO: introduce interrupt/timeout for request to schema registry

String responseBody;
try
{
HttpResponse<String> httpResponse = client.send(httpRequest, HttpResponse.BodyHandlers.ofString());
responseBody = httpResponse.statusCode() == 200 ? httpResponse.body() : null;
}
catch (Exception ex)
{
responseBody = null;
}
return responseBody;
}

@Override
public String location()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,15 @@ public void shouldResolveSchemaIdFromCacheAndRetry() throws Exception
{
k3po.finish();
}

@Test
@Configuration("register/zilla.yaml")
@Specification({
"${net}/handshake/client",
"${app}/handshake/server",
"${remote}/register.schema" })
public void shouldRegisterSchema() throws Exception
{
k3po.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public interface CatalogHandler
{
int NO_SCHEMA_ID = 0;
int NO_VERSION_ID = 0;

@FunctionalInterface
interface Decoder
Expand Down Expand Up @@ -61,6 +62,13 @@ int accept(
ValueConsumer next);
}

default int register(
String subject,
String schema)
{
return NO_VERSION_ID;
}

Comment on lines +65 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent.

String resolve(
int schemaId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
import io.aklivity.zilla.runtime.engine.guard.GuardHandler;
import io.aklivity.zilla.runtime.engine.model.ConverterHandler;
Expand Down Expand Up @@ -85,8 +84,9 @@ final class TestBindingFactory implements BindingHandler
private final TestEventContext event;

private ConverterHandler valueType;
private List<CatalogHandler> catalogs;
private String schema;
private SchemaConfig catalog;
private List<CatalogHandler> catalogs;
private List<CatalogAssertion> catalogAssertions;
private GuardHandler guard;
private String credentials;
Expand All @@ -106,15 +106,9 @@ final class TestBindingFactory implements BindingHandler
public void attach(
BindingConfig binding)
{
RouteConfig exit = binding.routes.stream()
binding.routes.stream()
.filter(r -> r.when.isEmpty())
.findFirst()
.orElse(null);

if (exit != null)
{
router.put(binding.id, exit.id);
}
.findFirst().ifPresent(exit -> router.put(binding.id, exit.id));

TestBindingOptionsConfig options = (TestBindingOptionsConfig) binding.options;
if (options != null)
Expand All @@ -124,9 +118,11 @@ public void attach(
this.valueType = context.supplyWriteConverter(options.value);
}

this.schema = options.schema;

if (options.cataloged != null)
{
this.catalog = options.cataloged.size() != 0 ? options.cataloged.get(0).schemas.get(0) : null;
this.catalog = !options.cataloged.isEmpty() ? options.cataloged.get(0).schemas.get(0) : null;
this.catalogs = new LinkedList<>();
for (CatalogedConfig catalog : options.cataloged)
{
Expand Down Expand Up @@ -347,7 +343,11 @@ else if (assertion.schema != null && !assertion.schema.equals(schema))
}
else
{
if (catalog.subject != null && catalog.version != null)
if (catalog.subject != null && schema != null)
{
handler.register(catalog.subject, schema);
}
else if (catalog.subject != null && catalog.version != null)
{
handler.resolve(catalog.subject, catalog.version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public final class TestBindingOptionsConfig extends OptionsConfig
{
public final ModelConfig value;
public final String mode;
public final String schema;
public final TestAuthorizationConfig authorization;
public final List<CatalogedConfig> cataloged;
public final List<Event> events;
Expand All @@ -46,6 +47,7 @@ public static <T> TestBindingOptionsConfigBuilder<T> builder(
TestBindingOptionsConfig(
ModelConfig value,
String mode,
String schema,
TestAuthorizationConfig authorization,
List<CatalogedConfig> cataloged,
List<Event> events,
Expand All @@ -55,6 +57,7 @@ public static <T> TestBindingOptionsConfigBuilder<T> builder(
super(value != null ? List.of(value) : List.of(), List.of());
this.value = value;
this.mode = mode;
this.schema = schema;
this.authorization = authorization;
this.cataloged = cataloged;
this.events = events;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ public OptionsConfig adaptFromJson(
testOptions.mode(object.getString(MODE_NAME));
}

if (object.containsKey(SCHEMA_NAME))
{
testOptions.schema(object.getString(SCHEMA_NAME));
}

if (object.containsKey(CATALOG_NAME))
{
JsonObject catalogsJson = object.getJsonObject(CATALOG_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public final class TestBindingOptionsConfigBuilder<T> extends ConfigBuilder<T, T

private ModelConfig value;
private String mode;
private String schema;
private TestAuthorizationConfig authorization;
private List<CatalogedConfig> catalogs;
private List<TestBindingOptionsConfig.Event> events;
Expand Down Expand Up @@ -64,6 +65,13 @@ public TestBindingOptionsConfigBuilder<T> mode(
return this;
}

public TestBindingOptionsConfigBuilder<T> schema(
String schema)
{
this.schema = schema;
return this;
}

public TestBindingOptionsConfigBuilder<T> catalog(
List<CatalogedConfig> catalogs)
{
Expand Down Expand Up @@ -113,7 +121,7 @@ public TestBindingOptionsConfigBuilder<T> vaultAssertion(
@Override
public T build()
{
return mapper.apply(new TestBindingOptionsConfig(value, mode, authorization, catalogs, events,
return mapper.apply(new TestBindingOptionsConfig(value, mode, schema, authorization, catalogs, events,
catalogAssertions, vaultAssertion));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

---
name: test
catalogs:
catalog0:
type: schema-registry
options:
url: http://localhost:8081
bindings:
net0:
type: test
kind: server
options:
schema: |
{
"schema":
{
"type": "record",
"name": "test",
"fields":
[
{
"type": "string",
"name": "field1"
},
{
"type": "com.acme.Referenced",
"name": "int"
}
]
},
"schemaType":"AVRO"
}
catalog:
catalog0:
- subject: items-snapshots-value
exit: app0
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,25 @@ read http:version "HTTP/1.1"
read http:header "Host" "localhost:8081"
read http:header "content-type" "application/json"

read '{'
'"schema":"'
'{'
'\\"type\\": \\"record\\",'
'\\"name\\": \\"test\\",'
'\\"fields\\":'
'['
'{'
'\\"type\\": \\"string\\",'
'\\"name\\": \\"field1\\"'
'},'
'{'
'\\"type\\": \\"com.acme.Referenced\\",'
'\\"name\\": \\"int\\"'
'}'
']'
'}",'
'"schemaType":"AVRO"'
'}'
read "{\n"
" \"schema\":\n"
" {\n"
" \"type\": \"record\",\n"
" \"name\": \"test\",\n"
" \"fields\":\n"
" [\n"
" {\n"
" \"type\": \"string\",\n"
" \"name\": \"field1\"\n"
" },\n"
" {\n"
" \"type\": \"com.acme.Referenced\",\n"
" \"name\": \"int\"\n"
" }\n"
" ]\n"
" },\n"
" \"schemaType\":\"AVRO\"\n"
"}\n"

read closed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
"type": "object",
"properties":
{
"schema":
{
"type": "string"
},
"value":
{
"$ref": "#/$defs/converter"
Expand Down