From 241f66f0fa37b7f71befd4c710f086d87cef7663 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E8=91=89=20Scarlet?= <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 8 Dec 2025 22:49:22 +0800 Subject: [PATCH 1/3] extension --- build.gradle.kts | 2 + .../netty/http/coroutines/NettyCoroutine.kt | 41 +++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 src/main/kotlin/net/ccbluex/netty/http/coroutines/NettyCoroutine.kt diff --git a/build.gradle.kts b/build.gradle.kts index 9265f0a..7601bb7 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -31,8 +31,10 @@ dependencies { api(libs.bundles.netty) api(libs.gson) api(libs.tika.core) + api(libs.coroutines.core) testImplementation(kotlin("test")) + testImplementation(libs.coroutines.test) testImplementation("com.squareup.retrofit2:retrofit:2.9.0") testImplementation("com.squareup.retrofit2:converter-gson:2.9.0") } diff --git a/src/main/kotlin/net/ccbluex/netty/http/coroutines/NettyCoroutine.kt b/src/main/kotlin/net/ccbluex/netty/http/coroutines/NettyCoroutine.kt new file mode 100644 index 0000000..4e266fe --- /dev/null +++ b/src/main/kotlin/net/ccbluex/netty/http/coroutines/NettyCoroutine.kt @@ -0,0 +1,41 @@ +package net.ccbluex.netty.http.coroutines + +import io.netty.util.concurrent.Future +import io.netty.util.concurrent.GenericFutureListener +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.suspendCancellableCoroutine + +/** + * Suspend until this Netty Future completes. + * + * Returns the Future result. Throws on failure or cancellation. + */ +suspend fun > F.suspend(): V { + if (isDone) return unwrapDone().getOrThrow() + + return suspendCancellableCoroutine { cont -> + addListener(futureContinuationListener(cont)) + + cont.invokeOnCancellation { + this.cancel(false) + } + } +} + +private fun > futureContinuationListener( + cont: CancellableContinuation +): GenericFutureListener = GenericFutureListener { future -> + if (cont.isActive) { + cont.resumeWith(future.unwrapDone()) + } +} + +private fun > F.unwrapDone(): Result = + when { + isSuccess -> Result.success(this.now) + isCancelled -> Result.failure(CancellationException("Netty Future was cancelled")) + else -> Result.failure( + this.cause() ?: IllegalStateException("Future failed without cause") + ) + } From c54aa69c8d24d66605c87a072f9ec61a029d8dcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E8=91=89=20Scarlet?= <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 8 Dec 2025 23:05:56 +0800 Subject: [PATCH 2/3] suspend handler --- build.gradle.kts | 2 +- .../net/ccbluex/netty/http/HttpConductor.kt | 2 +- .../ccbluex/netty/http/HttpServerHandler.kt | 54 +++++++++++++++---- .../netty/http/model/RequestHandler.kt | 2 +- .../ccbluex/netty/http/rest/FileServant.kt | 2 +- .../net/ccbluex/netty/http/rest/Node.kt | 2 +- .../net/ccbluex/netty/http/rest/Route.kt | 2 +- .../net/ccbluex/netty/http/rest/ZipServant.kt | 2 +- .../ccbluex/netty/http/util/IterableUtils.kt | 9 ++++ src/test/kotlin/ZipServantTest.kt | 31 +++++------ 10 files changed, 76 insertions(+), 32 deletions(-) create mode 100644 src/main/kotlin/net/ccbluex/netty/http/util/IterableUtils.kt diff --git a/build.gradle.kts b/build.gradle.kts index 7601bb7..c8091df 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -11,7 +11,7 @@ val authorName = "ccbluex" val projectUrl = "https://github.com/ccbluex/netty-httpserver" group = "net.ccbluex" -version = "2.4.2" +version = "2.4.3-alpha.1" repositories { mavenCentral() diff --git a/src/main/kotlin/net/ccbluex/netty/http/HttpConductor.kt b/src/main/kotlin/net/ccbluex/netty/http/HttpConductor.kt index 51101be..0d50095 100644 --- a/src/main/kotlin/net/ccbluex/netty/http/HttpConductor.kt +++ b/src/main/kotlin/net/ccbluex/netty/http/HttpConductor.kt @@ -34,7 +34,7 @@ import net.ccbluex.netty.http.util.httpNoContent * @param context The request context to process. * @return The response to the request. */ -internal fun HttpServer.processRequestContext(context: RequestContext) = runCatching { +internal suspend fun HttpServer.processRequestContext(context: RequestContext) = runCatching { val content = context.contentBuffer.toByteArray() val method = context.httpMethod diff --git a/src/main/kotlin/net/ccbluex/netty/http/HttpServerHandler.kt b/src/main/kotlin/net/ccbluex/netty/http/HttpServerHandler.kt index 688c6dd..39f6e19 100644 --- a/src/main/kotlin/net/ccbluex/netty/http/HttpServerHandler.kt +++ b/src/main/kotlin/net/ccbluex/netty/http/HttpServerHandler.kt @@ -26,9 +26,16 @@ import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch import net.ccbluex.netty.http.HttpServer.Companion.logger import net.ccbluex.netty.http.middleware.Middleware import net.ccbluex.netty.http.model.RequestContext +import net.ccbluex.netty.http.util.forEachIsInstance import net.ccbluex.netty.http.websocket.WebSocketHandler import java.net.URLDecoder @@ -40,6 +47,7 @@ import java.net.URLDecoder internal class HttpServerHandler(private val server: HttpServer) : ChannelInboundHandlerAdapter() { private val localRequestContext = ThreadLocal() + private lateinit var channelScope: CoroutineScope /** * Extension property to get the WebSocket URL from an HttpRequest. @@ -47,6 +55,29 @@ internal class HttpServerHandler(private val server: HttpServer) : ChannelInboun private val HttpRequest.webSocketUrl: String get() = "ws://${headers().get("Host")}${uri()}" + /** + * Adds the [CoroutineScope] of current [io.netty.channel.Channel]. + */ + override fun handlerAdded(ctx: ChannelHandlerContext) { + super.handlerAdded(ctx) + + val exceptionHandler = CoroutineExceptionHandler { _, throwable -> + val ctxName = ctx.name() + val channelId = ctx.channel().id().asLongText() + logger.error( + "Uncaught coroutine error in [ctx: $ctxName, channel: $channelId]", + throwable + ) + } + + channelScope = CoroutineScope( + ctx.channel().eventLoop().asCoroutineDispatcher() + + CoroutineName("${ctx.name()}#${ctx.channel().id().asShortText()}") + + exceptionHandler + ) + ctx.channel().closeFuture().addListener { channelScope.cancel() } + } + /** * Reads the incoming messages and processes HTTP requests. * @@ -68,11 +99,11 @@ internal class HttpServerHandler(private val server: HttpServer) : ChannelInboun if (connection.equals("Upgrade", ignoreCase = true) && upgrade.equals("WebSocket", ignoreCase = true)) { - server.middlewares.filterIsInstance().forEach { middleware -> + server.middlewares.forEachIsInstance { middleware -> val response = middleware.invoke(ctx, msg) if (response != null) { ctx.writeAndFlush(response) - return + return super.channelRead(ctx, msg) } } @@ -99,7 +130,7 @@ internal class HttpServerHandler(private val server: HttpServer) : ChannelInboun URLDecoder.decode(msg.uri(), Charsets.UTF_8), msg.headers(), ) - + localRequestContext.set(requestContext) } } @@ -107,7 +138,7 @@ internal class HttpServerHandler(private val server: HttpServer) : ChannelInboun is HttpContent -> { val requestContext = localRequestContext.get() ?: run { logger.warn("Received HttpContent without HttpRequest") - return + return super.channelRead(ctx, msg) } // Append content to the buffer @@ -117,18 +148,21 @@ internal class HttpServerHandler(private val server: HttpServer) : ChannelInboun if (msg is LastHttpContent) { localRequestContext.remove() - server.middlewares.filterIsInstance().forEach { middleware -> + server.middlewares.forEachIsInstance { middleware -> val response = middleware.invoke(requestContext) if (response != null) { ctx.writeAndFlush(response) - return + return super.channelRead(ctx, msg) } } - var response = server.processRequestContext(requestContext) - server.middlewares.filterIsInstance().forEach { middleware -> - response = middleware.invoke(requestContext, response) + + channelScope.launch { + var response = server.processRequestContext(requestContext) + server.middlewares.forEachIsInstance { middleware -> + response = middleware.invoke(requestContext, response) + } + ctx.writeAndFlush(response) } - ctx.writeAndFlush(response) } } diff --git a/src/main/kotlin/net/ccbluex/netty/http/model/RequestHandler.kt b/src/main/kotlin/net/ccbluex/netty/http/model/RequestHandler.kt index 070c27f..d0100cd 100644 --- a/src/main/kotlin/net/ccbluex/netty/http/model/RequestHandler.kt +++ b/src/main/kotlin/net/ccbluex/netty/http/model/RequestHandler.kt @@ -3,5 +3,5 @@ package net.ccbluex.netty.http.model import io.netty.handler.codec.http.FullHttpResponse fun interface RequestHandler { - fun handle(request: RequestObject): FullHttpResponse + suspend fun handle(request: RequestObject): FullHttpResponse } diff --git a/src/main/kotlin/net/ccbluex/netty/http/rest/FileServant.kt b/src/main/kotlin/net/ccbluex/netty/http/rest/FileServant.kt index 46f9238..79e471e 100644 --- a/src/main/kotlin/net/ccbluex/netty/http/rest/FileServant.kt +++ b/src/main/kotlin/net/ccbluex/netty/http/rest/FileServant.kt @@ -37,7 +37,7 @@ class FileServant(part: String, private val baseFolder: File) : Node(part) { override val isExecutable = true - override fun handle(request: RequestObject): FullHttpResponse { + override suspend fun handle(request: RequestObject): FullHttpResponse { val path = request.remainingPath val sanitizedPath = path.replace("..", "") val file = baseFolder.resolve(sanitizedPath) diff --git a/src/main/kotlin/net/ccbluex/netty/http/rest/Node.kt b/src/main/kotlin/net/ccbluex/netty/http/rest/Node.kt index d2415b9..64a00c0 100644 --- a/src/main/kotlin/net/ccbluex/netty/http/rest/Node.kt +++ b/src/main/kotlin/net/ccbluex/netty/http/rest/Node.kt @@ -135,7 +135,7 @@ open class Node(val part: String) : RequestHandler { * @param request The request object. * @return The HTTP response. */ - override fun handle(request: RequestObject): FullHttpResponse = throw NotImplementedError() + override suspend fun handle(request: RequestObject): FullHttpResponse = throw NotImplementedError() /** * Checks if the node matches a part of the path and HTTP method. diff --git a/src/main/kotlin/net/ccbluex/netty/http/rest/Route.kt b/src/main/kotlin/net/ccbluex/netty/http/rest/Route.kt index c39b3ad..01d1a86 100644 --- a/src/main/kotlin/net/ccbluex/netty/http/rest/Route.kt +++ b/src/main/kotlin/net/ccbluex/netty/http/rest/Route.kt @@ -33,7 +33,7 @@ import net.ccbluex.netty.http.model.RequestObject open class Route(name: String, private val method: HttpMethod, val handler: RequestHandler) : Node(name) { override val isExecutable = true - override fun handle(request: RequestObject) = handler.handle(request) + override suspend fun handle(request: RequestObject) = handler.handle(request) override fun matchesMethod(method: HttpMethod) = this.method == method && super.matchesMethod(method) diff --git a/src/main/kotlin/net/ccbluex/netty/http/rest/ZipServant.kt b/src/main/kotlin/net/ccbluex/netty/http/rest/ZipServant.kt index ad685ec..8352782 100644 --- a/src/main/kotlin/net/ccbluex/netty/http/rest/ZipServant.kt +++ b/src/main/kotlin/net/ccbluex/netty/http/rest/ZipServant.kt @@ -114,7 +114,7 @@ class ZipServant(part: String, zipInputStream: InputStream) : Node(part) { return files } - override fun handle(request: RequestObject): FullHttpResponse { + override suspend fun handle(request: RequestObject): FullHttpResponse { val path = request.remainingPath.removePrefix("/") val cleanPath = path.substringBefore("?") val sanitizedPath = cleanPath.replace("..", "") diff --git a/src/main/kotlin/net/ccbluex/netty/http/util/IterableUtils.kt b/src/main/kotlin/net/ccbluex/netty/http/util/IterableUtils.kt new file mode 100644 index 0000000..dca0c9b --- /dev/null +++ b/src/main/kotlin/net/ccbluex/netty/http/util/IterableUtils.kt @@ -0,0 +1,9 @@ +package net.ccbluex.netty.http.util + +inline fun Iterable<*>.forEachIsInstance(action: (E) -> Unit) { + for (it in this) { + if (it is E) { + action(it) + } + } +} diff --git a/src/test/kotlin/ZipServantTest.kt b/src/test/kotlin/ZipServantTest.kt index a207ae8..6ea27d2 100644 --- a/src/test/kotlin/ZipServantTest.kt +++ b/src/test/kotlin/ZipServantTest.kt @@ -22,6 +22,7 @@ import io.netty.handler.codec.http.EmptyHttpHeaders import io.netty.handler.codec.http.HttpHeaders import io.netty.handler.codec.http.HttpMethod import io.netty.handler.codec.http.HttpResponseStatus +import kotlinx.coroutines.test.runTest import net.ccbluex.netty.http.model.RequestObject import net.ccbluex.netty.http.rest.ZipServant import org.junit.jupiter.api.Test @@ -109,7 +110,7 @@ class ZipServantTest { } @Test - fun `should serve index html for root path`() { + fun `should serve index html for root path`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -123,7 +124,7 @@ class ZipServantTest { } @Test - fun `should serve index html for slash path`() { + fun `should serve index html for slash path`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -134,7 +135,7 @@ class ZipServantTest { } @Test - fun `should serve specific files with correct content types`() { + fun `should serve specific files with correct content types`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -162,7 +163,7 @@ class ZipServantTest { } @Test - fun `should handle files with dot-slash prefix`() { + fun `should handle files with dot-slash prefix`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -175,7 +176,7 @@ class ZipServantTest { } @Test - fun `should return 404 for non-existent files`() { + fun `should return 404 for non-existent files`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -185,7 +186,7 @@ class ZipServantTest { } @Test - fun `should sanitize path traversal attempts`() { + fun `should sanitize path traversal attempts`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -195,7 +196,7 @@ class ZipServantTest { } @Test - fun `should handle paths without leading slash`() { + fun `should handle paths without leading slash`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -206,7 +207,7 @@ class ZipServantTest { } @Test - fun `should serve index html for SPA routes with hash fragments`() { + fun `should serve index html for SPA routes with hash fragments`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -219,7 +220,7 @@ class ZipServantTest { } @Test - fun `should handle unknown file extensions with default content type`() { + fun `should handle unknown file extensions with default content type`() = runTest { val baos = ByteArrayOutputStream() ZipOutputStream(baos).use { zos -> zos.putNextEntry(ZipEntry("test.unknown")) @@ -235,7 +236,7 @@ class ZipServantTest { } @Test - fun `should handle various content types correctly`() { + fun `should handle various content types correctly`() = runTest { val baos = ByteArrayOutputStream() ZipOutputStream(baos).use { zos -> // Test various file types - using Tika's expected content types @@ -281,7 +282,7 @@ class ZipServantTest { } @Test - fun `should handle empty zip file gracefully`() { + fun `should handle empty zip file gracefully`() = runTest { val baos = ByteArrayOutputStream() ZipOutputStream(baos).use { /* empty zip */ } @@ -292,7 +293,7 @@ class ZipServantTest { } @Test - fun `should serve index html for directory paths with trailing slash`() { + fun `should serve index html for directory paths with trailing slash`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -312,7 +313,7 @@ class ZipServantTest { } @Test - fun `should serve index html for SPA routes with fragments`() { + fun `should serve index html for SPA routes with fragments`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -339,7 +340,7 @@ class ZipServantTest { } @Test - fun `should serve index html for implicit directory access`() { + fun `should serve index html for implicit directory access`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) @@ -359,7 +360,7 @@ class ZipServantTest { } @Test - fun `should return 404 for directory without index html`() { + fun `should return 404 for directory without index html`() = runTest { val zipData = createTestZip() val zipServant = ZipServant("static", zipData.inputStream()) From 28f3345b97f52b30f07fa39952e69f4d45911bbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E8=91=89=20Scarlet?= <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 8 Dec 2025 23:16:33 +0800 Subject: [PATCH 3/3] padding --- src/main/kotlin/net/ccbluex/netty/http/rest/FileServant.kt | 2 +- src/main/kotlin/net/ccbluex/netty/http/rest/ZipServant.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/net/ccbluex/netty/http/rest/FileServant.kt b/src/main/kotlin/net/ccbluex/netty/http/rest/FileServant.kt index 79e471e..6b01af9 100644 --- a/src/main/kotlin/net/ccbluex/netty/http/rest/FileServant.kt +++ b/src/main/kotlin/net/ccbluex/netty/http/rest/FileServant.kt @@ -37,7 +37,7 @@ class FileServant(part: String, private val baseFolder: File) : Node(part) { override val isExecutable = true - override suspend fun handle(request: RequestObject): FullHttpResponse { + override suspend fun handle(request: RequestObject): FullHttpResponse { val path = request.remainingPath val sanitizedPath = path.replace("..", "") val file = baseFolder.resolve(sanitizedPath) diff --git a/src/main/kotlin/net/ccbluex/netty/http/rest/ZipServant.kt b/src/main/kotlin/net/ccbluex/netty/http/rest/ZipServant.kt index 8352782..155b884 100644 --- a/src/main/kotlin/net/ccbluex/netty/http/rest/ZipServant.kt +++ b/src/main/kotlin/net/ccbluex/netty/http/rest/ZipServant.kt @@ -114,7 +114,7 @@ class ZipServant(part: String, zipInputStream: InputStream) : Node(part) { return files } - override suspend fun handle(request: RequestObject): FullHttpResponse { + override suspend fun handle(request: RequestObject): FullHttpResponse { val path = request.remainingPath.removePrefix("/") val cleanPath = path.substringBefore("?") val sanitizedPath = cleanPath.replace("..", "")