Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.Executors;

import static com.alibaba.nacos.api.common.Constants.APP_CONN_PREFIX;
import static com.alibaba.nacos.api.common.Constants.ENCODE;
Expand Down Expand Up @@ -150,7 +150,7 @@ public class ClientWorker implements Closeable {
private boolean enableRemoteSyncConfig = false;

private static final int MIN_THREAD_NUM = 2;

private static final int THREAD_MULTIPLE = 1;

private boolean enableClientMetrics = true;
Expand Down Expand Up @@ -538,19 +538,33 @@ public ClientWorker(final ConfigFilterChainManager configFilterChainManager,
agent = new ConfigRpcTransportClient(properties, serverListManager);

configFuzzyWatchGroupKeyHolder = new ConfigFuzzyWatchGroupKeyHolder(agent, uuid);
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(initWorkerThreadCount(properties),
new NameThreadFactory("com.alibaba.nacos.client.Worker"));
agent.setExecutor(executorService);

ThreadPoolExecutor executor = instantiateClientExecutor(properties);
agent.setExecutor(executor);

agent.start();
configFuzzyWatchGroupKeyHolder.start();

}

void initAppLabels(Properties properties) {
this.appLabels = ConnLabelsUtils.addPrefixForEachKey(defaultLabelsCollectorManager.getLabels(properties),
APP_CONN_PREFIX);
}


private ThreadPoolExecutor instantiateClientExecutor(final NacosClientProperties properties) {
int workerThreadCount = initWorkerThreadCount(properties);

return new ThreadPoolExecutor(workerThreadCount, workerThreadCount * 2,
60 * 5, TimeUnit.SECONDS,
// when corePoolSize is not enough, task will not wait in queue, because SynchronousQueue 0 capacity
// will create new thread to execute task util maximumPoolSize is reached
new SynchronousQueue<>(),
new NameThreadFactory("com.alibaba.nacos.client.executor"),
// CallerRunsPolicy ensures that tasks are not lost
new ThreadPoolExecutor.CallerRunsPolicy()
);
}

private int initWorkerThreadCount(NacosClientProperties properties) {
int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
if (properties == null) {
Expand Down Expand Up @@ -639,7 +653,9 @@ public boolean isHealthServer() {
public class ConfigRpcTransportClient extends ConfigTransportClient {

Map<String, ExecutorService> multiTaskExecutor = new HashMap<>();


private ExecutorService listenExecutor;

private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);

private final Object bellItem = new Object();
Expand Down Expand Up @@ -699,6 +715,10 @@ public void shutdown() throws NacosException {
executor.shutdown();
}
});
if (listenExecutor != null && !listenExecutor.isShutdown()) {
LOGGER.info("Shutdown listen config executor {}", listenExecutor);
listenExecutor.shutdown();
}
}

}
Expand Down Expand Up @@ -842,12 +862,13 @@ public Class<? extends Event> subscribeType() {

@Override
public void startInternal() {
ScheduledExecutorService executor = getExecutor();
executor.schedule(() -> {
while (!executor.isShutdown() && !executor.isTerminated()) {
listenExecutor =
Executors.newSingleThreadExecutor(new NameThreadFactory("com.alibaba.nacos.client.listen-executor"));
listenExecutor.submit(() -> {
while (!listenExecutor.isShutdown() && !listenExecutor.isTerminated()) {
try {
listenExecutebell.poll(5L, TimeUnit.SECONDS);
if (executor.isShutdown() || executor.isTerminated()) {
if (listenExecutor.isShutdown() || listenExecutor.isTerminated()) {
continue;
}
executeConfigListen();
Expand All @@ -861,8 +882,7 @@ public void startInternal() {
notifyListenConfig();
}
}
}, 0L, TimeUnit.MILLISECONDS);

});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.client.config.common.GroupKey;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
Expand All @@ -49,7 +50,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -87,7 +88,9 @@ public class ConfigFuzzyWatchGroupKeyHolder extends SmartSubscriber implements C
private final AtomicLong fuzzyListenLastAllSyncTime = new AtomicLong(System.currentTimeMillis());

private static final long FUZZY_LISTEN_ALL_SYNC_INTERNAL = 3 * 60 * 1000;


private ExecutorService fuzzyWatcherExecutor;

private String taskId = "0";

/**
Expand All @@ -106,12 +109,14 @@ public ConfigFuzzyWatchGroupKeyHolder(ClientWorker.ConfigRpcTransportClient agen
* start.
*/
public void start() {
ScheduledExecutorService agentExecutor = agent.getExecutor();
agentExecutor.submit(() -> {
while (!agentExecutor.isShutdown() && !agentExecutor.isTerminated()) {
fuzzyWatcherExecutor = Executors.newSingleThreadScheduledExecutor(
new NameThreadFactory("com.alibaba.nacos.client.fuzzy-watcher-executor")
);
fuzzyWatcherExecutor.submit(() -> {
while (!fuzzyWatcherExecutor.isShutdown() && !fuzzyWatcherExecutor.isTerminated()) {
try {
fuzzyListenExecuteBell.poll(5L, TimeUnit.SECONDS);
if (agentExecutor.isShutdown() || agentExecutor.isTerminated()) {
if (fuzzyWatcherExecutor.isShutdown() || fuzzyWatcherExecutor.isTerminated()) {
continue;
}
executeConfigFuzzyListen();
Expand All @@ -129,12 +134,15 @@ public void start() {
}

/**
* Deregistering it from the NotifyCenter.
* Deregistering it from the NotifyCenter and shutting down the executor.
*/
@Override
public void shutdown() {
// deregister subscriber which registered in constructor
NotifyCenter.deregisterSubscriber(this);
if (fuzzyWatcherExecutor != null && !fuzzyWatcherExecutor.isShutdown()) {
fuzzyWatcherExecutor.shutdown();
}
}

/**
Expand Down Expand Up @@ -384,9 +392,7 @@ private void doExecuteConfigFuzzyListen(List<ConfigFuzzyWatchContext> contextLis
for (ConfigFuzzyWatchContext context : contextLists) {
ExecutorService executorService = agent.getExecutor();
// Submit task for execution
Future<?> future = executorService.submit(() -> {
executeFuzzyWatchRequest(context, rpcClient);
});
Future<?> future = executorService.submit(() -> executeFuzzyWatchRequest(context, rpcClient));
listenFutures.add(future);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.alibaba.nacos.client.security.SecurityProxy;
import com.alibaba.nacos.client.utils.AppNameUtils;
import com.alibaba.nacos.client.utils.ClientBasicParamUtil;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
Expand All @@ -32,7 +33,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -52,7 +55,7 @@ public abstract class ConfigTransportClient {

String tenant;

private ScheduledExecutorService executor;
private ThreadPoolExecutor executor;

final ConfigServerListManager serverListManager;

Expand All @@ -61,11 +64,19 @@ public abstract class ConfigTransportClient {
private int maxRetry = 3;

private final long securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5);


private ScheduledExecutorService loginScheduledExecutor;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loginScheduledExecutor 是否可以去掉,使用executor即可?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loginScheduledExecutor 是否可以去掉,使用executor即可?

O(∩_∩)O,我说说我的想法,看看你的意见:

  1. 我想保留 loginScheduledExecutor 的原因是 loginScheduledExecutor 执行的是 ScheduledExecutorService#scheduleWithFixedDelay 方法,它是一个本地定期执行的定时登录任务,所以我想把它放在一个专用的只有 1 个线程的 ScheduledExecutorService 线程池中去执行,就像上边将 while(true) 的忙任务分离出来一样
  2. ConfigTransportClient#executor 我把它定义的类型是 ThreadPoolExecutor,它没有专有的命名,我觉得定位更像是一个在 Client 中较为通用的线程池,就像它现在已经被用作去处理模糊监听的任务了,之后如果有线程池需要的话,可以通过 ConfigTransportClient#getExecutor 方法获取并复用。如果将 loginScheduledExecutor 去掉的话,那么需要把 ConfigTransportClient#executor 定义成 ScheduledExecutorService 类型,那么它就更像是一个能够执行定时任务的线程池了,但是可能本地启动定时任务的场景好像不是很多

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loginScheduledExecutor 是否可以去掉,使用executor即可?

O(∩_∩)O,我说说我的想法,看看你的意见:

  1. 我想保留 loginScheduledExecutor 的原因是 loginScheduledExecutor 执行的是 ScheduledExecutorService#scheduleWithFixedDelay 方法,它是一个本地定期执行的定时登录任务,所以我想把它放在一个专用的只有 1 个线程的 ScheduledExecutorService 线程池中去执行,就像上边将 while(true) 的忙任务分离出来一样
  2. ConfigTransportClient#executor 我把它定义的类型是 ThreadPoolExecutor,它没有专有的命名,我觉得定位更像是一个在 Client 中较为通用的线程池,就像它现在已经被用作去处理模糊监听的任务了,之后如果有线程池需要的话,可以通过 ConfigTransportClient#getExecutor 方法获取并复用。如果将 loginScheduledExecutor 去掉的话,那么需要把 ConfigTransportClient#executor 定义成 ScheduledExecutorService 类型,那么它就更像是一个能够执行定时任务的线程池了,但是可能本地启动定时任务的场景好像不是很多

可以,按照你的想法,会清晰明确一些。


protected SecurityProxy securityProxy;


/**
* Shut down to ensure resource release.
*/
public void shutdown() throws NacosException {
securityProxy.shutdown();
if (loginScheduledExecutor != null && !loginScheduledExecutor.isShutdown()) {
loginScheduledExecutor.shutdown();
}
}

public ConfigTransportClient(NacosClientProperties properties, ConfigServerListManager serverListManager) {
Expand Down Expand Up @@ -123,11 +134,11 @@ private void initMaxRetry(Properties properties) {
maxRetry = ConvertUtils.toInt(String.valueOf(properties.get(PropertyKeyConst.MAX_RETRY)), Constants.MAX_RETRY);
}

public void setExecutor(ScheduledExecutorService executor) {
public void setExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
}

public ScheduledExecutorService getExecutor() {
public ThreadPoolExecutor getExecutor() {
return this.executor;
}

Expand All @@ -136,7 +147,9 @@ public ScheduledExecutorService getExecutor() {
*/
public void start() throws NacosException {
securityProxy.login(this.properties);
this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,
this.loginScheduledExecutor =
Executors.newSingleThreadScheduledExecutor(new NameThreadFactory("com.alibaba.nacos.client.login-executor"));
this.loginScheduledExecutor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,
this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
startInternal();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -240,7 +240,7 @@ public void onEvent(ConfigFuzzyWatchChangeEvent event) {

RpcClient rpcClient = Mockito.mock(RpcClient.class);
when(rpcTransportClient.ensureRpcClient(eq("0"))).thenReturn(rpcClient);
ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class);
ThreadPoolExecutor scheduledExecutorService = Mockito.mock(ThreadPoolExecutor.class);
when(rpcTransportClient.getExecutor()).thenReturn(scheduledExecutorService);
when(scheduledExecutorService.submit(any(Runnable.class))).thenReturn(Mockito.mock(Future.class));
configFuzzyWatchGroupKeyHolder.executeConfigFuzzyListen();
Expand Down