Skip to content

Commit ab25f04

Browse files
authored
support multiple replacer with dynamic pattern for http-kafka & sse-kafka (#1590)
1 parent 379f7b8 commit ab25f04

File tree

5 files changed

+51
-158
lines changed

5 files changed

+51
-158
lines changed

runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithResolver.java

Lines changed: 32 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -178,19 +178,9 @@ public HttpKafkaWithFetchResult resolveFetch(
178178
if (filter.key.isPresent())
179179
{
180180
String key0 = filter.key.get();
181-
Matcher keyMatcher = paramsMatcher.reset(key0);
182-
if (keyMatcher.matches())
183-
{
184-
key0 = keyMatcher.replaceAll(replacer);
185-
}
186-
187-
keyMatcher = identityMatcher.reset(key0);
188-
if (keyMatcher.matches())
189-
{
190-
key0 = keyMatcher.replaceAll(r -> identityReplacer.apply(authorization, r));
191-
}
192-
193-
key0 = resolveAttribute(authorization, keyMatcher, key0);
181+
key0 = findAndReplace(key0, paramsMatcher, replacer);
182+
key0 = findAndReplace(key0, identityMatcher, r -> identityReplacer.apply(authorization, r));
183+
key0 = findAndReplace(key0, attributeMatcher, r -> attributeReplacer.apply(authorization, r));
194184

195185
key = new String16FW(key0).value();
196186
}
@@ -206,19 +196,9 @@ public HttpKafkaWithFetchResult resolveFetch(
206196
DirectBuffer name = new String16FW(name0).value();
207197

208198
String value0 = header0.value;
209-
Matcher valueMatcher = paramsMatcher.reset(value0);
210-
if (valueMatcher.matches())
211-
{
212-
value0 = valueMatcher.replaceAll(replacer);
213-
}
214-
215-
valueMatcher = identityMatcher.reset(value0);
216-
if (valueMatcher.matches())
217-
{
218-
value0 = valueMatcher.replaceAll(r -> identityReplacer.apply(authorization, r));
219-
}
220-
221-
value0 = resolveAttribute(authorization, valueMatcher, value0);
199+
value0 = findAndReplace(value0, paramsMatcher, replacer);
200+
value0 = findAndReplace(value0, identityMatcher, r -> identityReplacer.apply(authorization, r));
201+
value0 = findAndReplace(value0, attributeMatcher, r -> attributeReplacer.apply(authorization, r));
222202

223203
DirectBuffer value = new String16FW(value0).value();
224204

@@ -305,12 +285,7 @@ public HttpKafkaWithProduceResult resolveProduce(
305285
String name0 = header.name;
306286
String8FW name = new String8FW(name0);
307287

308-
String value0 = header.value;
309-
Matcher valueMatcher = paramsMatcher.reset(value0);
310-
if (valueMatcher.find())
311-
{
312-
value0 = valueMatcher.replaceAll(replacer);
313-
}
288+
String value0 = findAndReplace(header.value, paramsMatcher, replacer);
314289

315290
String value = value0;
316291
Supplier<String16FW> valueRef = () -> new String16FW(value);
@@ -320,11 +295,7 @@ public HttpKafkaWithProduceResult resolveProduce(
320295
valueRef = () ->
321296
{
322297
String value1 = value;
323-
Matcher value1Matcher = correlationIdMatcher.reset(value1);
324-
if (value1Matcher.find())
325-
{
326-
value1 = value1Matcher.replaceAll(hash.correlationId().asString());
327-
}
298+
value1 = findAndReplace(value1, correlationIdMatcher, r -> hash.correlationId().asString());
328299

329300
return new String16FW(value1);
330301
};
@@ -343,19 +314,9 @@ public HttpKafkaWithProduceResult resolveProduce(
343314
if (produce.key.isPresent())
344315
{
345316
String key0 = produce.key.get();
346-
Matcher keyMatcher = paramsMatcher.reset(key0);
347-
if (keyMatcher.matches())
348-
{
349-
key0 = keyMatcher.replaceAll(replacer);
350-
}
351-
352-
keyMatcher = identityMatcher.reset(key0);
353-
if (keyMatcher.matches())
354-
{
355-
key0 = keyMatcher.replaceAll(r -> identityReplacer.apply(authorization, r));
356-
}
357-
358-
key0 = resolveAttribute(authorization, keyMatcher, key0);
317+
key0 = findAndReplace(key0, paramsMatcher, replacer);
318+
key0 = findAndReplace(key0, identityMatcher, r -> identityReplacer.apply(authorization, r));
319+
key0 = findAndReplace(key0, attributeMatcher, r -> attributeReplacer.apply(authorization, r));
359320

360321
String key = key0;
361322
keyRef = () -> new String16FW(key).value();
@@ -365,11 +326,7 @@ public HttpKafkaWithProduceResult resolveProduce(
365326
keyRef = () ->
366327
{
367328
String key1 = key;
368-
Matcher key1Matcher = idempotencyKeyMatcher.reset(key1);
369-
if (key1Matcher.find())
370-
{
371-
key1 = key1Matcher.replaceAll(idempotencyKey.asString());
372-
}
329+
key1 = findAndReplace(key1, idempotencyKeyMatcher, r -> idempotencyKey.asString());
373330
return new String16FW(key1).value();
374331
};
375332
}
@@ -386,19 +343,9 @@ public HttpKafkaWithProduceResult resolveProduce(
386343
DirectBuffer name = new String16FW(name0).value();
387344

388345
String value0 = override.value;
389-
Matcher valueMatcher = paramsMatcher.reset(value0);
390-
if (valueMatcher.matches())
391-
{
392-
value0 = valueMatcher.replaceAll(replacer);
393-
}
394-
395-
valueMatcher = identityMatcher.reset(value0);
396-
if (valueMatcher.matches())
397-
{
398-
value0 = valueMatcher.replaceAll(r -> identityReplacer.apply(authorization, r));
399-
}
400-
401-
value0 = resolveAttribute(authorization, valueMatcher, value0);
346+
value0 = findAndReplace(value0, paramsMatcher, replacer);
347+
value0 = findAndReplace(value0, identityMatcher, r -> identityReplacer.apply(authorization, r));
348+
value0 = findAndReplace(value0, attributeMatcher, r -> attributeReplacer.apply(authorization, r));
402349

403350
String value = value0;
404351
Supplier<DirectBuffer> valueRef = () -> new String16FW(value).value();
@@ -407,11 +354,7 @@ public HttpKafkaWithProduceResult resolveProduce(
407354
valueRef = () ->
408355
{
409356
String value1 = value;
410-
Matcher value1Matcher = correlationIdMatcher.reset(value1);
411-
if (value1Matcher.find())
412-
{
413-
value1 = value1Matcher.replaceAll(hash.correlationId().asString());
414-
}
357+
value1 = findAndReplace(value1, idempotencyKeyMatcher, r -> idempotencyKey.asString());
415358
return new String16FW(value1).value();
416359
};
417360
}
@@ -424,11 +367,7 @@ public HttpKafkaWithProduceResult resolveProduce(
424367
if (produce.replyTo.isPresent())
425368
{
426369
String replyTo0 = produce.replyTo.get();
427-
Matcher replyToMatcher = paramsMatcher.reset(replyTo0);
428-
if (replyToMatcher.matches())
429-
{
430-
replyTo0 = replyToMatcher.replaceAll(replacer);
431-
}
370+
replyTo0 = findAndReplace(replyTo0, paramsMatcher, replacer);
432371
replyTo = new String16FW(replyTo0);
433372
}
434373

@@ -449,41 +388,28 @@ public HttpKafkaWithProduceResult resolveProduce(
449388
produce.correlationId, idempotencyKey, async, hash, timeout);
450389
}
451390

452-
private String resolveAttribute(
453-
long authorization,
454-
Matcher matcher,
455-
String value)
456-
{
457-
matcher = attributeMatcher.reset(value);
458-
if (matcher.matches())
459-
{
460-
value = matcher.replaceAll(r -> attributeReplacer.apply(authorization, r));
461-
}
462-
return value;
463-
}
464-
465391
private String16FW resolveTopic(
466392
long authorization,
467393
String topic)
468394
{
469-
Matcher topicMatcher = paramsMatcher.reset(topic);
470-
if (topicMatcher.matches())
471-
{
472-
topic = topicMatcher.replaceAll(replacer);
473-
}
395+
topic = findAndReplace(topic, paramsMatcher, replacer);
396+
topic = findAndReplace(topic, identityMatcher, r -> identityReplacer.apply(authorization, r));
397+
topic = findAndReplace(topic, attributeMatcher, r -> attributeReplacer.apply(authorization, r));
474398

475-
topicMatcher = identityMatcher.reset(topic);
476-
if (topicMatcher.find())
477-
{
478-
topic = topicMatcher.replaceAll(r -> identityReplacer.apply(authorization, r));
479-
}
399+
return new String16FW(topic);
400+
}
480401

481-
topicMatcher = attributeMatcher.reset(topic);
482-
if (topicMatcher.find())
402+
private static String findAndReplace(
403+
String value,
404+
Matcher matcher,
405+
Function<MatchResult, String> replacer)
406+
{
407+
matcher.reset(value);
408+
while (matcher.find())
483409
{
484-
topic = topicMatcher.replaceAll(r -> attributeReplacer.apply(authorization, r));
410+
value = matcher.replaceAll(replacer);
411+
matcher.reset(value);
485412
}
486-
487-
return new String16FW(topic);
413+
return value;
488414
}
489415
}

runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/config/SseKafkaWithResolver.java

Lines changed: 16 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -94,19 +94,9 @@ public SseKafkaWithResult resolve(
9494
if (filter.key.isPresent())
9595
{
9696
String key0 = filter.key.get();
97-
Matcher keyMatcher = paramsMatcher.reset(key0);
98-
if (keyMatcher.matches())
99-
{
100-
key0 = keyMatcher.replaceAll(paramsReplacer);
101-
}
102-
103-
keyMatcher = identityMatcher.reset(key0);
104-
if (keyMatcher.matches())
105-
{
106-
key0 = keyMatcher.replaceAll(r -> identityReplacer.apply(authorization, r));
107-
}
108-
109-
key0 = resolveAttribute(authorization, keyMatcher, key0);
97+
key0 = findAndReplace(key0, paramsMatcher, paramsReplacer);
98+
key0 = findAndReplace(key0, identityMatcher, r -> identityReplacer.apply(authorization, r));
99+
key0 = findAndReplace(key0, attributeMatcher, r -> attributeReplacer.apply(authorization, r));
110100

111101
key = new String16FW(key0).value();
112102
}
@@ -122,19 +112,9 @@ public SseKafkaWithResult resolve(
122112
DirectBuffer name = new String16FW(name0).value();
123113

124114
String value0 = header0.value;
125-
Matcher valueMatcher = paramsMatcher.reset(value0);
126-
if (valueMatcher.matches())
127-
{
128-
value0 = valueMatcher.replaceAll(paramsReplacer);
129-
}
130-
131-
valueMatcher = identityMatcher.reset(value0);
132-
if (valueMatcher.matches())
133-
{
134-
value0 = valueMatcher.replaceAll(r -> identityReplacer.apply(authorization, r));
135-
}
136-
137-
value0 = resolveAttribute(authorization, valueMatcher, value0);
115+
value0 = findAndReplace(value0, paramsMatcher, paramsReplacer);
116+
value0 = findAndReplace(value0, identityMatcher, r -> identityReplacer.apply(authorization, r));
117+
value0 = findAndReplace(value0, attributeMatcher, r -> attributeReplacer.apply(authorization, r));
138118

139119
DirectBuffer value = new String16FW(value0).value();
140120

@@ -155,36 +135,23 @@ private String16FW resolveTopic(
155135
long authorization,
156136
String topic)
157137
{
158-
Matcher matcher = paramsMatcher.reset(topic);
159-
if (matcher.matches())
160-
{
161-
topic = matcher.replaceAll(paramsReplacer);
162-
}
163-
164-
matcher = identityMatcher.reset(topic);
165-
if (matcher.find())
166-
{
167-
topic = matcher.replaceAll(r -> identityReplacer.apply(authorization, r));
168-
}
169-
170-
matcher = attributeMatcher.reset(topic);
171-
if (matcher.find())
172-
{
173-
topic = matcher.replaceAll(r -> attributeReplacer.apply(authorization, r));
174-
}
138+
topic = findAndReplace(topic, paramsMatcher, paramsReplacer);
139+
topic = findAndReplace(topic, identityMatcher, r -> identityReplacer.apply(authorization, r));
140+
topic = findAndReplace(topic, attributeMatcher, r -> attributeReplacer.apply(authorization, r));
175141

176142
return new String16FW(topic);
177143
}
178144

179-
private String resolveAttribute(
180-
long authorization,
145+
private static String findAndReplace(
146+
String value,
181147
Matcher matcher,
182-
String value)
148+
Function<MatchResult, String> replacer)
183149
{
184-
matcher = attributeMatcher.reset(value);
185-
if (matcher.matches())
150+
matcher.reset(value);
151+
while (matcher.find())
186152
{
187-
value = matcher.replaceAll(r -> attributeReplacer.apply(authorization, r));
153+
value = matcher.replaceAll(replacer);
154+
matcher.reset(value);
188155
}
189156
return value;
190157
}

specs/binding-http-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/http/kafka/config/proxy.with.topic.dynamic.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ bindings:
5252
path: /my-items/{id}
5353
with:
5454
capability: produce
55-
topic: ${guarded['test0'].identity}-items
55+
topic: ${guarded['test0'].identity}/${guarded['test0'].attributes.topic}
5656
acks: leader_only
5757
key: ${params.id}
5858
- guarded:

specs/binding-http-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/http/kafka/streams/kafka/put.my.item/client.rpt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ write zilla:begin.ext ${kafka:beginEx()
2121
.typeId(zilla:id("kafka"))
2222
.merged()
2323
.capabilities("PRODUCE_ONLY")
24-
.topic("alice-items")
24+
.topic("alice/items")
2525
.partition(-1, -2)
2626
.ackMode("LEADER_ONLY")
2727
.build()

specs/binding-http-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/http/kafka/streams/kafka/put.my.item/server.rpt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ read zilla:begin.ext ${kafka:beginEx()
2323
.typeId(zilla:id("kafka"))
2424
.merged()
2525
.capabilities("PRODUCE_ONLY")
26-
.topic("alice-items")
26+
.topic("alice/items")
2727
.partition(-1, -2)
2828
.ackMode("LEADER_ONLY")
2929
.build()

0 commit comments

Comments
 (0)