InternalServerCnx.java
@@ -13,9 +13,7 @@
  */
 package io.streamnative.pulsar.handlers.kop;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.Producer;
@@ -32,10 +30,6 @@ public class InternalServerCnx extends ServerCnx {
     @Getter
     KafkaRequestHandler kafkaRequestHandler;
 
-    private static final AtomicLongFieldUpdater<InternalServerCnx> KOP_MSG_PUBLISH_BUFFER_SIZE_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(InternalServerCnx.class, "kopMessagePublishBufferSize");
-    private volatile long kopMessagePublishBufferSize = 0;
-
     public InternalServerCnx(KafkaRequestHandler kafkaRequestHandler) {
         super(kafkaRequestHandler.getPulsarService());
         this.kafkaRequestHandler = kafkaRequestHandler;
@@ -63,54 +57,21 @@ public void closeProducer(Producer producer) {
 
     // called after channel active
     public void updateCtx() {
-        this.remoteAddress = kafkaRequestHandler.getRemoteAddress();
+        this.remoteAddress = kafkaRequestHandler.remoteAddress;
     }
 
     @Override
     public void enableCnxAutoRead() {
-        if (!kafkaRequestHandler.ctx.channel().config().isAutoRead()) {
-            kafkaRequestHandler.ctx.channel().config().setAutoRead(true);
-            kafkaRequestHandler.ctx.read();
-            if (log.isDebugEnabled()) {
-                log.debug("Channel {} auto read has set to true.", kafkaRequestHandler.ctx.channel());
-            }
-        }
+        // do nothing in this mock
     }
 
     @Override
     public void disableCnxAutoRead() {
-        if (kafkaRequestHandler.ctx.channel().config().isAutoRead()) {
-            kafkaRequestHandler.ctx.channel().config().setAutoRead(false);
-            if (log.isDebugEnabled()) {
-                log.debug("Channel {} auto read has set to false.", kafkaRequestHandler.ctx.channel());
-            }
-        }
-    }
-
-    public void increasePublishBuffer(long msgSize) {
-        KOP_MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, msgSize);
-        if (getBrokerService().isReachMessagePublishBufferThreshold()) {
-            disableCnxAutoRead();
-        }
-    }
-
-    public void decreasePublishBuffer(long msgSize) {
-        KOP_MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize);
+        // do nothing in this mock
     }
 
-    @Override
-    public long getMessagePublishBufferSize() {
-        return kopMessagePublishBufferSize;
-    }
-
-
     public void cancelPublishBufferLimiting() {
-        // do nothing.
+        // do nothing in this mock
     }
 
-    @VisibleForTesting
-    public void setMessagePublishBufferSize(long size) {
-        this.kopMessagePublishBufferSize = size;
-    }
-
 }
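
With the buffer accounting removed, InternalServerCnx is reduced to a thin mock: the real throttling now lives in KafkaRequestHandler (next file), which toggles Netty's autoRead flag on the Kafka channel directly. As background, a minimal, self-contained sketch of that Netty primitive follows; the class and method names are illustrative, not part of KoP.

    import io.netty.channel.ChannelHandlerContext;

    // Illustrative only: turning autoRead off stops the event loop from pulling
    // more bytes off the socket, so the client is throttled by TCP backpressure;
    // turning it back on re-enables reads and schedules one immediately.
    final class AutoReadToggle {

        static void pause(ChannelHandlerContext ctx) {
            if (ctx.channel().config().isAutoRead()) {
                ctx.channel().config().setAutoRead(false); // no further channelRead events
            }
        }

        static void resume(ChannelHandlerContext ctx) {
            if (!ctx.channel().config().isAutoRead()) {
                ctx.channel().config().setAutoRead(true);
                ctx.read(); // read now instead of waiting for the next event
            }
        }
    }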
KafkaRequestHandler.java
@@ -59,6 +59,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -157,9 +158,9 @@
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
-import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 
 /**
  * This class contains all the request handling methods.
@@ -196,6 +197,12 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
     // is found.
     private final Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap = new ConcurrentHashMap<>();
 
+    // Fields that manage publish-buffer throttling by atomically enabling/disabling channel reads.
+    private final long maxPendingBytes;
+    private final long resumeThresholdPendingBytes;
+    private final AtomicLong pendingBytes = new AtomicLong(0);
+    private volatile boolean autoReadDisabledPublishBufferLimiting = false;
+
     public KafkaRequestHandler(PulsarService pulsarService,
                                KafkaServiceConfiguration kafkaConfig,
                                GroupCoordinator groupCoordinator,
@@ -226,6 +233,8 @@ public KafkaRequestHandler(PulsarService pulsarService,
         this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat());
         this.currentConnectedGroup = new ConcurrentHashMap<>();
         this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath();
+        this.maxPendingBytes = kafkaConfig.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
+        this.resumeThresholdPendingBytes = this.maxPendingBytes / 2;
     }
 
     @Override
@@ -244,7 +253,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         log.info("channel inactive {}", ctx.channel());
 
         close();
-        isActive.set(false);
     }
 
     @Override
@@ -627,6 +635,54 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
         });
     }
 
+    private void disableCnxAutoRead() {
+        if (ctx != null && ctx.channel().config().isAutoRead()) {
+            ctx.channel().config().setAutoRead(false);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] disable auto read", ctx.channel());
+            }
+        }
+    }
+
+    private void enableCnxAutoRead() {
+        if (ctx != null && !ctx.channel().config().isAutoRead()
+                && !autoReadDisabledPublishBufferLimiting) {
+            // Resume reading from the socket if the pending bytes have not reached the threshold
+            ctx.channel().config().setAutoRead(true);
+            // triggers a channel read
+            ctx.read();
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] enable auto read", ctx.channel());
+            }
+        }
+    }
+
+    private void startSendOperationForThrottling(long msgSize) {
+        final long currentPendingBytes = pendingBytes.addAndGet(msgSize);
+        if (currentPendingBytes >= maxPendingBytes && !autoReadDisabledPublishBufferLimiting && maxPendingBytes > 0) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] disable auto read because currentPendingBytes({}) > maxPendingBytes({})",
+                        ctx.channel(), currentPendingBytes, maxPendingBytes);
+            }
+            disableCnxAutoRead();
+            autoReadDisabledPublishBufferLimiting = true;
+            pulsarService.getBrokerService().pausedConnections(1);
+        }
+    }
+
+    private void completeSendOperationForThrottling(long msgSize) {
+        final long currentPendingBytes = pendingBytes.addAndGet(-msgSize);
+        if (currentPendingBytes < resumeThresholdPendingBytes && autoReadDisabledPublishBufferLimiting) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] enable auto read because currentPendingBytes({}) < resumeThreshold({})",
+                        ctx.channel(), currentPendingBytes, resumeThresholdPendingBytes);
+            }
+            autoReadDisabledPublishBufferLimiting = false;
+            enableCnxAutoRead();
+            pulsarService.getBrokerService().resumedConnections(1);
+        }
+    }
+
     private void publishMessages(final PersistentTopic persistentTopic,
                                  final ByteBuf byteBuf,
                                  final int numMessages,
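
The two methods added above form a hysteresis loop: pendingBytes grows before each asynchronous publish and shrinks when it completes; reads pause once the total reaches maxPendingBytes and resume only after it drops below half of it, so a connection hovering near the limit does not flap between states. Below is a hypothetical, self-contained model of that logic with the Netty and broker calls reduced to boolean results; as in the PR, the check-then-act on the volatile flag is not strictly atomic, on the assumption that produce handling for a channel is effectively serialized.

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical model of startSendOperationForThrottling /
    // completeSendOperationForThrottling: pause at the limit, resume at half.
    final class PendingBytesLimiter {
        private final long maxPendingBytes;        // pause threshold
        private final long resumeThresholdBytes;   // resume threshold (half of max)
        private final AtomicLong pendingBytes = new AtomicLong(0);
        private volatile boolean paused = false;

        PendingBytesLimiter(long maxPendingBytes) {
            this.maxPendingBytes = maxPendingBytes;
            this.resumeThresholdBytes = maxPendingBytes / 2;
        }

        // Called before a publish; true means the caller should disable autoRead.
        boolean start(long msgSize) {
            if (pendingBytes.addAndGet(msgSize) >= maxPendingBytes
                    && maxPendingBytes > 0 && !paused) {
                paused = true;
                return true;
            }
            return false;
        }

        // Called when a publish completes; true means reads may resume.
        boolean complete(long msgSize) {
            if (pendingBytes.addAndGet(-msgSize) < resumeThresholdBytes && paused) {
                paused = false;
                return true;
            }
            return false;
        }
    }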
@@ -654,9 +710,10 @@ private void publishMessages(final PersistentTopic persistentTopic,
         final long beforePublish = MathUtils.nowInNano();
         persistentTopic.publishMessage(byteBuf,
                 MessagePublishContext.get(offsetFuture, persistentTopic, numMessages, System.nanoTime()));
-        byteBuf.release();
         final RecordBatch batch = records.batchIterator().next();
         offsetFuture.whenComplete((offset, e) -> {
+            completeSendOperationForThrottling(byteBuf.readableBytes());
+            byteBuf.release();
             if (e == null) {
                 if (batch.isTransactional()) {
                     transactionCoordinator.addActivePidOffset(TopicName.get(partitionName), batch.producerId(), offset);
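
Moving byteBuf.release() into the whenComplete callback fixes a lifetime bug: the buffer has to stay valid until the asynchronous publish finishes, both for the broker's write and for the byteBuf.readableBytes() call that credits the throttle. A short sketch of the pattern, with hypothetical names:

    import io.netty.buffer.ByteBuf;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.LongConsumer;

    // Illustrative only: hold the buffer until the async operation completes,
    // then settle the byte accounting and drop the reference exactly once.
    final class ReleaseOnComplete {

        static void releaseWhenDone(ByteBuf buf,
                                    CompletableFuture<Long> offsetFuture,
                                    LongConsumer onBytesCompleted) {
            final int size = buf.readableBytes(); // capture while the buffer is alive
            offsetFuture.whenComplete((offset, error) -> {
                onBytesCompleted.accept(size);    // e.g. completeSendOperationForThrottling
                buf.release();                    // releasing earlier risks use-after-free
            });
        }
    }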
@@ -682,9 +739,6 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
 
         final int numPartitions = produceRequest.partitionRecordsOrFail().size();
 
-        final long dataSizePerPartition = produceHar.getBuffer().readableBytes();
-        topicManager.getInternalServerCnx().increasePublishBuffer(dataSizePerPartition);
-
         final Map<TopicPartition, PartitionResponse> responseMap = new ConcurrentHashMap<>();
         final CompletableFuture<Void> produceFuture = new CompletableFuture<>();
         BiConsumer<TopicPartition, PartitionResponse> addPartitionResponse = (topicPartition, response) -> {
@@ -718,6 +772,7 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
                 final ByteBuf byteBuf = entryFormatter.encode(validRecords, numMessages);
                 requestStats.getProduceEncodeStats().registerSuccessfulEvent(
                         MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS);
+                startSendOperationForThrottling(byteBuf.readableBytes());
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Request {}: Produce messages for topic {} partition {}, request size: {} ",
@@ -756,7 +811,6 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
         });
 
         produceFuture.thenApply(ignored -> {
-            topicManager.getInternalServerCnx().decreasePublishBuffer(dataSizePerPartition);
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Request {}: Complete handle produce.", ctx.channel(), produceHar.toString());
             }
@@ -1704,8 +1758,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         }
         // advertised data is write in /loadbalance/brokers/advertisedAddress:webServicePort
         // here we get the broker url, need to find related webServiceUrl.
-        ZooKeeperCache zkCache = pulsarService.getLocalZkCache();
-        zkCache.getChildrenAsync(LoadManager.LOADBALANCE_BROKERS_ROOT, zkCache)
+        pulsarService.getPulsarResources()
+                .getDynamicConfigResources()
+                .getChildrenAsync(LoadManager.LOADBALANCE_BROKERS_ROOT)
                 .whenComplete((set, throwable) -> {
                     if (throwable != null) {
                         log.error("Error in getChildrenAsync(zk://loadbalance) for {}", pulsarAddress, throwable);
@@ -1730,12 +1785,11 @@
                     }
 
                     // Get a list of ServiceLookupData for each matchBroker.
-                    List<CompletableFuture<Optional<ServiceLookupData>>> list = matchBrokers.stream()
-                            .map(matchBroker ->
-                                    zkCache.getDataAsync(
-                                            String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker),
-                                            (Deserializer<ServiceLookupData>)
-                                                    pulsarService.getLoadManager().get().getLoadReportDeserializer()))
+                    final MetadataCache<LocalBrokerData> metadataCache = pulsarService.getLocalMetadataStore()
+                            .getMetadataCache(LocalBrokerData.class);
+                    List<CompletableFuture<Optional<LocalBrokerData>>> list = matchBrokers.stream()
+                            .map(matchBroker -> metadataCache.get(
+                                    String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker)))
                             .collect(Collectors.toList());
 
                     FutureUtil.waitForAll(list)
@@ -1748,7 +1802,7 @@
                         }
 
                         try {
-                            for (CompletableFuture<Optional<ServiceLookupData>> lookupData : list) {
+                            for (CompletableFuture<Optional<LocalBrokerData>> lookupData : list) {
                                 ServiceLookupData data = lookupData.get().get();
                                 if (log.isDebugEnabled()) {
                                     log.debug("Handle getProtocolDataToAdvertise for {}, pulsarUrl: {}, "
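
These hunks are part of the migration off the deprecated ZooKeeperCache: the broker list is now fetched through PulsarResources, and each broker's payload is read through a typed MetadataCache, which deserializes to LocalBrokerData internally (LocalBrokerData implements ServiceLookupData, so the surrounding code that reads it as ServiceLookupData is unchanged). A condensed sketch of the new lookup path, assuming a running PulsarService; the helper name is hypothetical.

    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;
    import org.apache.pulsar.broker.PulsarService;
    import org.apache.pulsar.broker.loadbalance.LoadManager;
    import org.apache.pulsar.metadata.api.MetadataCache;
    import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;

    final class BrokerLookupSketch {

        // Look up one broker's advertised data from the metadata store.
        static CompletableFuture<Optional<LocalBrokerData>> lookup(PulsarService pulsar, String broker) {
            MetadataCache<LocalBrokerData> cache = pulsar.getLocalMetadataStore()
                    .getMetadataCache(LocalBrokerData.class);
            // Same path layout as before: /loadbalance/brokers/<advertisedAddress>:<webServicePort>
            return cache.get(LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + broker);
        }
    }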
(third changed file; name not preserved)
@@ -31,6 +31,8 @@
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 
@@ -211,20 +213,19 @@ private CompletableFuture<Optional<String>> getProtocolDataToAdvertise(
             }
 
             // Get a list of ServiceLookupData for each matchBroker.
-            List<CompletableFuture<Optional<ServiceLookupData>>> list = matchBrokers.stream()
-                    .map(matchBroker ->
-                            zkCache.getDataAsync(
-                                    String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker),
-                                    (ZooKeeperCache.Deserializer<ServiceLookupData>)
-                                            pulsarService.getLoadManager().get().getLoadReportDeserializer()))
+            final MetadataCache<LocalBrokerData> metadataCache = pulsarService.getLocalMetadataStore()
+                    .getMetadataCache(LocalBrokerData.class);
+            List<CompletableFuture<Optional<LocalBrokerData>>> list = matchBrokers.stream()
+                    .map(matchBroker -> metadataCache.get(
+                            String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker)))
                     .collect(Collectors.toList());
 
             getKopAddress(list, pulsarAddress, kopAddressFuture, topic, hostAndPort);
         });
         return kopAddressFuture;
     }
 
-    private void getKopAddress(List<CompletableFuture<Optional<ServiceLookupData>>> list,
+    private void getKopAddress(List<CompletableFuture<Optional<LocalBrokerData>>> list,
                                InetSocketAddress pulsarAddress,
                                CompletableFuture<Optional<String>> kopAddressFuture,
                                TopicName topic,
@@ -239,7 +240,7 @@
         }
 
         try {
-            for (CompletableFuture<Optional<ServiceLookupData>> lookupData : list) {
+            for (CompletableFuture<Optional<LocalBrokerData>> lookupData : list) {
                 ServiceLookupData data = lookupData.get().get();
                 if (log.isDebugEnabled()) {
                     log.debug("Handle getProtocolDataToAdvertise for {}, pulsarUrl: {}, "