Skip to content

Commit babe211

Browse files
author
Ajay Kumar Movva
committed
Admission Controller Module Rest and Transport Interceptor Commit
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
1 parent 1342578 commit babe211

14 files changed

Lines changed: 390 additions & 7 deletions

File tree

modules/throttling/build.gradle

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
apply plugin: 'opensearch.java-rest-test'
2+
3+
opensearchplugin {
4+
description 'Plugin intercepting requests and throttle based on resource consumption'
5+
classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin'
6+
extendedPlugins = ['transport-netty4']
7+
}
8+
9+
dependencies {
10+
api project(path: ':modules:reindex')
11+
implementation project(path: ':modules:transport-netty4')
12+
compileOnly project(':modules:transport-netty4')
13+
}
14+
15+
testClusters.all {
16+
module ':modules:reindex'
17+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.throttling;
10+
11+
import io.netty.channel.ChannelHandler;
12+
import org.opensearch.common.util.concurrent.ThreadContext;
13+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
14+
import org.opensearch.plugins.NetworkPlugin;
15+
import org.opensearch.plugins.Plugin;
16+
import org.opensearch.throttling.admissioncontroller.AdmissionControllerRestHandler;
17+
import org.opensearch.throttling.admissioncontroller.AdmissionControllerTransportInterceptor;
18+
import org.opensearch.transport.Netty4HandlerExtension;
19+
import org.opensearch.transport.TransportInterceptor;
20+
21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin, Netty4HandlerExtension {
27+
28+
private static final Map<String, ChannelHandler> HANDLERS = new HashMap<String, ChannelHandler>();
29+
30+
@Override
31+
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
32+
List<TransportInterceptor> interceptors = new ArrayList<>(0);
33+
interceptors.add(new AdmissionControllerTransportInterceptor(null));
34+
return interceptors;
35+
}
36+
37+
@Override
38+
public Map<String, ChannelHandler> getHandlers() {
39+
if (HANDLERS.isEmpty()) {
40+
HANDLERS.put("opensearch-throttling-plugin:AdmissionControlRestHandler", new AdmissionControllerRestHandler());
41+
}
42+
return HANDLERS;
43+
}
44+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.throttling.admissioncontroller;
10+
11+
import io.netty.channel.ChannelDuplexHandler;
12+
import io.netty.channel.ChannelHandler;
13+
import io.netty.channel.ChannelHandlerContext;
14+
import io.netty.channel.ChannelPromise;
15+
import io.netty.handler.codec.http.FullHttpRequest;
16+
import io.netty.handler.codec.http.HttpHeaderNames;
17+
import org.apache.logging.log4j.LogManager;
18+
import org.apache.logging.log4j.Logger;
19+
20+
@ChannelHandler.Sharable
21+
public class AdmissionControllerRestHandler extends ChannelDuplexHandler {
22+
private static final Logger LOGGER = LogManager.getLogger(AdmissionControllerRestHandler.class);
23+
24+
@Override
25+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
26+
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
27+
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
28+
String uri = getUri(fullHttpRequest);
29+
applyAdmissionControl(uri);
30+
ctx.fireChannelRead(msg);
31+
}
32+
33+
private String getUri(FullHttpRequest fullHttpRequest) {
34+
return fullHttpRequest.uri();
35+
}
36+
37+
private void applyAdmissionControl(String requestURI) {
38+
// apply admission controller
39+
// LOGGER.info("Apply Admission Controller Triggered URI: " + requestURI);
40+
}
41+
42+
private void releaseAdmissionControl(ChannelHandlerContext ctx) {
43+
// release the acquired objects
44+
// LOGGER.info("Released Admission Controller Handler");
45+
}
46+
47+
private long getContentLength(FullHttpRequest fullHttpRequest) {
48+
String contentLengthHeader = fullHttpRequest.headers().get(HttpHeaderNames.CONTENT_LENGTH);
49+
return contentLengthHeader == null ? 0 : Long.parseLong(contentLengthHeader);
50+
}
51+
52+
@Override
53+
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
54+
releaseAdmissionControl(ctx);
55+
super.close(ctx, promise);
56+
}
57+
58+
@Override
59+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
60+
releaseAdmissionControl(ctx);
61+
super.write(ctx, msg, promise);
62+
}
63+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Licensed to Elasticsearch under one or more contributor
11+
* license agreements. See the NOTICE file distributed with
12+
* this work for additional information regarding copyright
13+
* ownership. Elasticsearch licenses this file to you under
14+
* the Apache License, Version 2.0 (the "License"); you may
15+
* not use this file except in compliance with the License.
16+
* You may obtain a copy of the License at
17+
*
18+
* http://www.apache.org/licenses/LICENSE-2.0
19+
*
20+
* Unless required by applicable law or agreed to in writing,
21+
* software distributed under the License is distributed on an
22+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23+
* KIND, either express or implied. See the License for the
24+
* specific language governing permissions and limitations
25+
* under the License.
26+
*/
27+
28+
/*
29+
* Modifications Copyright OpenSearch Contributors. See
30+
* GitHub history for details.
31+
*/
32+
33+
grant {
34+
// needed to generate runtime classes
35+
permission java.lang.RuntimePermission "createClassLoader";
36+
37+
// needed to find the classloader to load allowlisted classes from
38+
permission java.lang.RuntimePermission "getClassLoader";
39+
};
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#
2+
# SPDX-License-Identifier: Apache-2.0
3+
#
4+
# The OpenSearch Contributors require contributions made to
5+
# this file be licensed under the Apache-2.0 license or a
6+
# compatible open source license.
7+
#
8+
org.opensearch.throttling.OpenSearchThrottlingModulePlugin

modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,17 @@
9393
import org.opensearch.threadpool.ThreadPool;
9494
import org.opensearch.transport.NettyAllocator;
9595
import org.opensearch.transport.NettyByteBufSizer;
96+
import org.opensearch.transport.NettySettings;
9697
import org.opensearch.transport.SharedGroupFactory;
9798
import org.opensearch.transport.netty4.Netty4Utils;
9899

99100
import java.net.InetSocketAddress;
100101
import java.net.SocketOption;
102+
import java.util.List;
103+
import java.util.Map;
104+
import java.util.Objects;
101105
import java.util.concurrent.TimeUnit;
106+
import java.util.stream.Collectors;
102107

103108
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE;
104109
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
@@ -182,6 +187,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
182187

183188
private volatile ServerBootstrap serverBootstrap;
184189
private volatile SharedGroupFactory.SharedGroup sharedGroup;
190+
private List<ChannelHandler> channelHandlers;
185191

186192
public Netty4HttpServerTransport(
187193
Settings settings,
@@ -223,6 +229,25 @@ public Netty4HttpServerTransport(
223229
);
224230
}
225231

232+
public Netty4HttpServerTransport(
233+
Settings settings,
234+
NetworkService networkService,
235+
BigArrays bigArrays,
236+
ThreadPool threadPool,
237+
NamedXContentRegistry xContentRegistry,
238+
Dispatcher dispatcher,
239+
ClusterSettings clusterSettings,
240+
SharedGroupFactory sharedGroupFactory,
241+
Map<String, ChannelHandler> channelHandlers
242+
) {
243+
this(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, sharedGroupFactory);
244+
this.channelHandlers = NettySettings.HANDLER_ORDERING.get(settings)
245+
.stream()
246+
.map(channelHandlers::get)
247+
.filter(Objects::nonNull)
248+
.collect(Collectors.toList());
249+
}
250+
226251
public Settings settings() {
227252
return this.settings;
228253
}
@@ -418,7 +443,6 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E
418443
final ChannelPipeline pipeline = ctx.pipeline();
419444
pipeline.addAfter(ctx.name(), "handler", getRequestHandler());
420445
pipeline.replace(this, "decoder_compress", new HttpContentDecompressor());
421-
422446
pipeline.addAfter("decoder_compress", "aggregator", aggregator);
423447
if (handlingSettings.isCompression()) {
424448
pipeline.addAfter(
@@ -430,7 +454,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E
430454
pipeline.addBefore("handler", "request_creator", requestCreator);
431455
pipeline.addBefore("handler", "response_creator", responseCreator);
432456
pipeline.addBefore("handler", "pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
433-
457+
transport.channelHandlers.forEach(
458+
handler -> ch.pipeline().addBefore("request_creator", handler.getClass().getSimpleName(), handler)
459+
);
434460
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
435461
}
436462
});
@@ -497,7 +523,6 @@ protected void initChannel(Channel childChannel) throws Exception {
497523
childChannel.pipeline()
498524
.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
499525
}
500-
501526
childChannel.pipeline()
502527
.addLast("aggregator", aggregator)
503528
.addLast("request_creator", requestCreator)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport;
10+
11+
import io.netty.channel.ChannelHandler;
12+
13+
import java.util.Collections;
14+
import java.util.Map;
15+
16+
public interface Netty4HandlerExtension {
17+
default Map<String, ChannelHandler> getHandlers() {
18+
return Collections.emptyMap();
19+
}
20+
}

modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.transport;
3434

35+
import io.netty.channel.ChannelHandler;
3536
import org.opensearch.Version;
3637
import org.opensearch.common.SetOnce;
3738
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
@@ -46,23 +47,43 @@
4647
import org.opensearch.http.HttpServerTransport;
4748
import org.opensearch.http.netty4.Netty4HttpServerTransport;
4849
import org.opensearch.core.indices.breaker.CircuitBreakerService;
50+
import org.opensearch.plugins.ExtensiblePlugin;
4951
import org.opensearch.plugins.NetworkPlugin;
5052
import org.opensearch.plugins.Plugin;
5153
import org.opensearch.threadpool.ThreadPool;
5254
import org.opensearch.transport.netty4.Netty4Transport;
5355

54-
import java.util.Arrays;
55-
import java.util.Collections;
56+
import java.util.ArrayList;
57+
import java.util.HashSet;
5658
import java.util.List;
59+
import java.util.Set;
60+
import java.util.Arrays;
5761
import java.util.Map;
62+
import java.util.Collections;
63+
import java.util.HashMap;
5864
import java.util.function.Supplier;
5965

60-
public class Netty4ModulePlugin extends Plugin implements NetworkPlugin {
66+
public class Netty4ModulePlugin extends Plugin implements NetworkPlugin, ExtensiblePlugin {
6167

6268
public static final String NETTY_TRANSPORT_NAME = "netty4";
6369
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4";
6470

6571
private final SetOnce<SharedGroupFactory> groupFactory = new SetOnce<>();
72+
private static final List<Netty4HandlerExtension> EXTENSIONS = new ArrayList<>();
73+
74+
@Override
75+
public void loadExtensions(ExtensionLoader loader) {
76+
Set<String> uniqueNames = new HashSet<>();
77+
for (Netty4HandlerExtension extension : loader.loadExtensions(Netty4HandlerExtension.class)) {
78+
String name = extension.getClass().getName();
79+
if (uniqueNames.contains(name)) {
80+
continue;
81+
}
82+
Netty4ModulePlugin.EXTENSIONS.add(extension);
83+
uniqueNames.add(name);
84+
}
85+
assert 1 == Netty4ModulePlugin.EXTENSIONS.size() : "More than 1 extensions are not supported";
86+
}
6687

6788
@Override
6889
public List<Setting<?>> getSettings() {
@@ -124,6 +145,8 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
124145
HttpServerTransport.Dispatcher dispatcher,
125146
ClusterSettings clusterSettings
126147
) {
148+
Map<String, ChannelHandler> channelHandlers = new HashMap<>();
149+
Netty4ModulePlugin.EXTENSIONS.forEach(extension -> channelHandlers.putAll(extension.getHandlers()));
127150
return Collections.singletonMap(
128151
NETTY_HTTP_TRANSPORT_NAME,
129152
() -> new Netty4HttpServerTransport(
@@ -134,7 +157,8 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
134157
xContentRegistry,
135158
dispatcher,
136159
clusterSettings,
137-
getSharedGroupFactory(settings)
160+
getSharedGroupFactory(settings),
161+
channelHandlers
138162
)
139163
);
140164
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport;
10+
11+
import org.opensearch.common.settings.Setting;
12+
13+
import java.util.Arrays;
14+
import java.util.List;
15+
import java.util.function.Function;
16+
17+
public final class NettySettings {
18+
private NettySettings() {}
19+
20+
// TODO: Evaluate which one is better, Moving this to yml(AMI Config) or keeping it here.
21+
public static final Setting<List<String>> HANDLER_ORDERING = Setting.listSetting(
22+
"opensearch.netty.plugin.handler.ordering",
23+
Arrays.asList("opensearch-throttling-plugin:AdmissionControlRestHandler"),
24+
Function.identity(),
25+
Setting.Property.NodeScope
26+
);
27+
}

0 commit comments

Comments
 (0)