@@ -21,8 +21,10 @@
import static io.aklivity.zilla.runtime.engine.concurrent.Signaler.NO_CANCEL_ID;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.time.Instant.now;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongFunction;
@@ -86,11 +88,15 @@
public final class KafkaCacheClientProduceFactory implements BindingHandler
{
    private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(), 0, 0);
    private static final KafkaKeyFW EMPTY_KEY =
        new OctetsFW().wrap(new UnsafeBuffer(ByteBuffer.wrap(new byte[] { 0x00 })), 0, 1)
            .get(new KafkaKeyFW()::wrap);
    private static final Consumer<OctetsFW.Builder> EMPTY_EXTENSION = ex -> {};
    private static final Array32FW<KafkaHeaderFW> EMPTY_TRAILERS =
        new Array32FW.Builder<>(new KafkaHeaderFW.Builder(), new KafkaHeaderFW())
            .wrap(new UnsafeBuffer(new byte[8]), 0, 8)
            .build();
    private static final int PRODUCE_FLUSH_SEQUENCE = -1;

    private static final int ERROR_NOT_LEADER_FOR_PARTITION = 6;
    private static final int ERROR_RECORD_LIST_TOO_LARGE = 18;
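EMPTY_KEY above pre-decodes a KafkaKeyFW flyweight from a single 0x00 byte, EMPTY_TRAILERS pre-builds an empty KafkaHeaderFW array, and PRODUCE_FLUSH_SEQUENCE is a sentinel sequence number that marks cache entries created for FLUSH frames rather than DATA frames. A minimal sketch of the empty-key trick, reusing the construction from the diff (the zigzag-varint reading of the 0x00 byte and the length() accessor are assumptions about the generated flyweight API):

    // hedged sketch: KafkaKeyFW begins with a varint32 length, so a single
    // 0x00 byte (zigzag-encoded zero) decodes as a zero-length key
    KafkaKeyFW emptyKey = new OctetsFW()
        .wrap(new UnsafeBuffer(ByteBuffer.wrap(new byte[] { 0x00 })), 0, 1)
        .get(new KafkaKeyFW()::wrap);
    assert emptyKey.length() == 0;   // length() is an assumed accessor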
@@ -762,6 +768,47 @@ private void onClientInitialData(
            creditor.credit(traceId, partitionIndex, reserved);
        }

        private void onClientInitialFlush(
            KafkaCacheClientProduceStream stream,
            FlushFW flush)
        {
            final long traceId = flush.traceId();
            final int reserved = flush.reserved();

            // ensure a writable head segment exists before appending the flush marker
            stream.segment = partition.newHeadIfNecessary(partitionOffset, EMPTY_KEY, 0, 0);

            int error = NO_ERROR;
            if (stream.segment != null)
            {
                final long nextOffset = partition.nextOffset(defaultOffset);
                assert partitionOffset >= 0 && partitionOffset >= nextOffset
                    : String.format("%d >= 0 && %d >= %d", partitionOffset, partitionOffset, nextOffset);

                // append a zero-length entry under EMPTY_KEY whose sequence is the
                // PRODUCE_FLUSH_SEQUENCE sentinel, recording the flush in the cache
                final long keyHash = partition.computeKeyHash(EMPTY_KEY);
                partition.writeProduceEntryStart(partitionOffset, stream.segment, stream.entryMark, stream.position,
                    now().toEpochMilli(), stream.initialId, PRODUCE_FLUSH_SEQUENCE,
                    KafkaAckMode.LEADER_ONLY, EMPTY_KEY, keyHash, 0, EMPTY_TRAILERS, trailersSizeMax);
                stream.partitionOffset = partitionOffset;
                partitionOffset++;

                Array32FW<KafkaHeaderFW> trailers = EMPTY_TRAILERS;

                partition.writeProduceEntryFin(stream.segment, stream.entryMark, stream.position, stream.initialSeq, trailers);
                flushClientFanInitialIfNecessary(traceId);
            }
            else
            {
                error = ERROR_RECORD_LIST_TOO_LARGE;
            }

            if (error != NO_ERROR)
            {
                stream.cleanupClient(traceId, error);
                onClientFanMemberClosed(traceId, stream);
            }
            creditor.credit(traceId, partitionIndex, reserved);
        }

        private void flushClientFanInitialIfNecessary(
            long traceId)
        {
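The new fan-side handler mirrors onClientInitialData: it claims the next partitionOffset and appends a cache entry, but the entry carries the PRODUCE_FLUSH_SEQUENCE sentinel, the empty key, and a zero-length value, so downstream readers can recognize it as a flush boundary rather than a record. A hedged sketch of that recognition (KafkaCacheEntryFW and its sequence() accessor are assumptions, not confirmed by this diff):

    // hedged sketch with assumed names: a cache reader distinguishing
    // flush markers from real produce records by the sentinel sequence
    boolean isFlushMarker(
        KafkaCacheEntryFW entry)
    {
        return entry.sequence() == PRODUCE_FLUSH_SEQUENCE;
    }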
@@ -1314,12 +1361,25 @@ private void onClientInitialFlush(
            assert acknowledge <= sequence;
            assert sequence >= initialSeq;

-            initialSeq = sequence + reserved;
+            if (reserved > 0)
+            {
+                initialSeq = sequence + reserved;

-            assert initialAck <= initialSeq;
+                assert initialAck <= initialSeq;

-            final int noAck = (int) (initialSeq - initialAck);
-            doClientInitialWindow(traceId, noAck, noAck + initialBudgetMax);
+                if (initialSeq > initialAck + initialMax)
+                {
+                    doClientInitialResetIfNecessary(traceId, EMPTY_OCTETS);
+                    doClientReplyAbortIfNecessary(traceId);
+                    fan.onClientFanMemberClosed(traceId, this);
+                }
+                else
+                {
+                    fan.onClientInitialFlush(this, flush);
+                }
+                final int noAck = (int) (initialSeq - initialAck);
+                doClientInitialWindow(traceId, noAck, initialBudgetMax);
+            }
        }

        private void onClientInitialEnd(
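On the stream side, FLUSH frames now take part in the normal sliding-window flow control: only a flush that actually reserved budget advances initialSeq, a flush that would overrun the window (initialSeq > initialAck + initialMax) resets and aborts the stream, and otherwise the flush is forwarded to the fan before the reserved bytes are credited back. The arithmetic is standard; a minimal self-contained sketch:

    // minimal sketch of the window bookkeeping used above: sequence counts
    // bytes sent, acknowledge counts bytes released, maximum bounds the gap
    static boolean exceedsWindow(long sequence, long acknowledge, int maximum)
    {
        return sequence > acknowledge + maximum;
    }

    static int unacknowledged(long sequence, long acknowledge)
    {
        return (int) (sequence - acknowledge); // the noAck value credited back via WINDOW
    }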
@@ -1325,6 +1325,8 @@ private void onMergedInitialFlush(
            FlushFW flush)
        {
            final long traceId = flush.traceId();
            final long sequence = flush.sequence();
            final long acknowledge = flush.acknowledge();
            final OctetsFW extension = flush.extension();
            final int reserved = flush.reserved();
            final ExtensionFW flushEx = extension.get(extensionRO::tryWrap);
@@ -1379,6 +1381,10 @@

                final KafkaUnmergedProduceStream producer = findProducePartitionLeader(nextPartitionId);
                assert producer != null;

                initialSeq = sequence + reserved;
                assert initialAck <= initialSeq;

                producer.doProduceInitialFlush(traceId, reserved, kafkaMergedFlushEx);
            }
        }
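The merged-stream change is the other half of the same accounting fix: a merged produce FLUSH now advances initialSeq by its reserved bytes before being fanned out to the partition leader's produce stream. Skipping that step would leak the reserved budget, each flush permanently shrinking the sender's effective window; a minimal model of the leak, assuming the usual sequence/acknowledge/maximum window fields:

    // hedged sketch: window budget accounting for frames carrying reserved bytes
    final class WindowSketch
    {
        long sequence;     // bytes (plus padding) consumed so far
        long acknowledge;  // bytes the receiver has released
        int maximum;       // window granted by the receiver

        long available()
        {
            return acknowledge + maximum - sequence;
        }

        void onFrame(int reserved)
        {
            sequence += reserved; // omit this for FLUSH and `reserved` is never credited back
        }
    }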
@@ -25,7 +25,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
@@ -126,7 +125,6 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
    private static final int SIGNAL_CONNECT_WILL_STREAM = 2;
    private static final int SIGNAL_EXPIRE_SESSION = 3;
    private static final int SIZE_OF_UUID = 38;
-    private static final AtomicInteger CONTEXT_COUNTER = new AtomicInteger(0);
    private static final int RETAIN_AVAILABLE_MASK = 1 << MqttServerCapabilities.RETAIN.value();
    private static final int WILDCARD_AVAILABLE_MASK = 1 << MqttServerCapabilities.WILDCARD.value();
    private static final int SUBSCRIPTION_IDS_AVAILABLE_MASK = 1 << MqttServerCapabilities.SUBSCRIPTION_IDS.value();
@@ -209,6 +207,7 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory

    private String serverRef;
    private int reconnectAttempt;
    private int nextContextId;

    public MqttKafkaSessionFactory(
        MqttKafkaConfiguration config,
@@ -1220,7 +1219,7 @@ else if (type.equals(EXPIRY_SIGNAL_NAME_OCTETS) && sessionExpiryIds.containsKey(
            case MqttSessionSignalFW.KIND_EXPIRY:
                final MqttExpirySignalFW expirySignal = sessionSignal.expiry();
                long expireAt = expirySignal.expireAt();
-                final String16FW expiryClientId = expirySignal.clientId();
+                final String16FW expiryClientId = new String16FW(expirySignal.clientId().asString());

                if (expireAt == MqttTime.UNKNOWN.value())
                {
@@ -1231,7 +1230,7 @@ else if (type.equals(EXPIRY_SIGNAL_NAME_OCTETS) && sessionExpiryIds.containsKey(
                    expireAt = supplyTime.getAsLong() + expirySignal.delay();
                }

-                final int contextId = CONTEXT_COUNTER.incrementAndGet();
+                final int contextId = nextContextId++;
                expiryClientIds.put(contextId, expiryClientId);

                final long signalId =
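Two independent fixes land in MqttKafkaSessionFactory. First, context ids move from a static AtomicInteger shared across every factory instance to a plain per-factory nextContextId counter, which the change implies is sufficient because the ids only key this factory's own maps. Second, expirySignal.clientId() returns a String16FW flyweight, a view over a decode buffer that later frames will reuse, so the value stored in expiryClientIds is deep-copied first. A minimal sketch of the aliasing pitfall (map name shortened, otherwise following the diff):

    // hedged sketch of the flyweight-aliasing bug the copy avoids
    String16FW view = expirySignal.clientId();    // view over a shared, reused buffer
    clientIds.put(contextId, view);               // unsafe: contents change when the buffer is reused
    clientIds.put(contextId, new String16FW(view.asString())); // safe: owns its own copy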
@@ -112,8 +112,8 @@ write advise zilla:flush ${kafka:flushEx()
    .merged()
    .fetch()
    .partition(-1, -1)
-    .key("key9")
    .capabilities("PRODUCE_ONLY")
+    .key("key9")
    .build()
    .build()}

@@ -84,6 +84,23 @@ read ${mqtt:sessionSignal()

read notify RECEIVED_WILL_DELIVER_AT_SIGNAL

read zilla:data.ext ${kafka:matchDataEx()
    .typeId(zilla:id("kafka"))
    .merged()
    .deferred(0)
    .partition(-1, -1)
    .key("client-1#expiry-signal")
    .header("type", "expiry-signal")
    .build()
    .build()}
read ${mqtt:sessionSignal()
    .expiry()
    .instanceId("zilla-1")
    .clientId("client-2")
    .delay(1000)
    .expireAt(2000)
    .build()
    .build()}

write zilla:data.ext ${kafka:dataEx()
    .typeId(zilla:id("kafka"))
@@ -96,6 +113,17 @@ write zilla:data.ext ${kafka:dataEx()
    .build()}
write flush

write zilla:data.ext ${kafka:dataEx()
    .typeId(zilla:id("kafka"))
    .merged()
    .deferred(0)
    .partition(-1, -1)
    .key("client-2")
    .hashKey("client-2")
    .build()
    .build()}
write flush


connect "zilla://streams/kafka0"
    option zilla:window 8192
@@ -92,6 +92,25 @@ write ${mqtt:sessionSignal()
    .build()}
write flush

write zilla:data.ext ${kafka:dataEx()
    .typeId(zilla:id("kafka"))
    .merged()
    .deferred(0)
    .partition(-1, -1)
    .key("client-1#expiry-signal")
    .header("type", "expiry-signal")
    .build()
    .build()}
write ${mqtt:sessionSignal()
    .expiry()
    .instanceId("zilla-1")
    .clientId("client-2")
    .delay(1000)
    .expireAt(2000)
    .build()
    .build()}
write flush

# cleanup session state
read zilla:data.ext ${kafka:matchDataEx()
    .typeId(zilla:id("kafka"))
@@ -104,6 +123,17 @@ read zilla:data.ext ${kafka:matchDataEx()
    .build()}
read zilla:data.null

# cleanup session state
read zilla:data.ext ${kafka:matchDataEx()
    .typeId(zilla:id("kafka"))
    .merged()
    .deferred(0)
    .partition(-1, -1)
    .key("client-2")
    .hashKey("client-2")
    .build()
    .build()}
read zilla:data.null

accepted
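The script changes extend the existing will/expiry scenarios: after the client-1 signal, the kafka streams now also carry an expiry signal for client-2 (instanceId zilla-1, delay 1000, expireAt 2000) and the matching session cleanup, which appears as a record with a null payload (zilla:data.null) under the key client-2. In plain Kafka terms that cleanup is a tombstone; a hedged sketch using the standard kafka-clients API with a hypothetical topic name:

    // hedged sketch (kafka-clients API; topic name is hypothetical): compacted-topic
    // cleanup is a tombstone, i.e. a record whose value is null under the session key
    producer.send(new ProducerRecord<String, String>("mqtt-sessions", "client-2", null));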