From b061b1acd040c7c9bef2717a531d22df592f50d6 Mon Sep 17 00:00:00 2001 From: Ilia Zavidnyi Date: Sun, 28 Apr 2024 15:24:42 +0200 Subject: [PATCH 01/10] Chat application benchmark using Kotlin, Ktor, and Kotlin coroutines --- README.md | 10 +- benchmarks/kotlin-ktor/README.md | 104 +++++++++ .../kotlin/ktor/KtorRenaissanceBenchmark.kt | 120 ++++++++++ .../renaissance/kotlin/ktor/client/Client.kt | 76 ++++++ .../kotlin/ktor/client/ClientManager.kt | 57 +++++ .../kotlin/ktor/client/ClientTask.kt | 8 + .../ktor/client/DirectMessageClientTask.kt | 28 +++ .../JoinGroupAndSendMessageClientTask.kt | 14 ++ .../client/SendMessageAndAwaitClientTask.kt | 30 +++ .../ktor/client/generateRandomMessage.kt | 10 + .../kotlin/ktor/client/waitForMessage.kt | 12 + .../renaissance/kotlin/ktor/common/Chat.kt | 14 ++ .../kotlin/ktor/common/DirectMessageChat.kt | 12 + .../renaissance/kotlin/ktor/common/Message.kt | 7 + .../renaissance/kotlin/ktor/common/User.kt | 11 + .../common/command/AddUserToChatCommand.kt | 9 + .../kotlin/ktor/common/command/Command.kt | 3 + .../ktor/common/command/CommandReply.kt | 3 + .../ktor/common/command/CreateChatCommand.kt | 14 ++ .../command/CreateDirectMessageChatCommand.kt | 14 ++ .../ktor/common/command/JoinChatCommand.kt | 9 + .../ktor/common/command/RenameUserCommand.kt | 9 + .../sendSerialisedCommandReplyNative.kt | 9 + .../common/sendSerializedCommandNative.kt | 11 + .../kotlin/ktor/common/serialisationFormat.kt | 22 ++ .../kotlin/ktor/server/ChatApplication.kt | 137 +++++++++++ .../kotlin/ktor/server/ChatServer.kt | 216 ++++++++++++++++++ .../src/main/resources/logback.xml | 11 + 28 files changed, 977 insertions(+), 3 deletions(-) create mode 100644 benchmarks/kotlin-ktor/README.md create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/Client.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientManager.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientTask.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/DirectMessageClientTask.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/JoinGroupAndSendMessageClientTask.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/SendMessageAndAwaitClientTask.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/generateRandomMessage.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/waitForMessage.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/Chat.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/DirectMessageChat.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/Message.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/User.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/AddUserToChatCommand.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/Command.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CommandReply.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CreateChatCommand.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CreateDirectMessageChatCommand.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/JoinChatCommand.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/RenameUserCommand.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/sendSerialisedCommandReplyNative.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/sendSerializedCommandNative.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/serialisationFormat.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/server/ChatApplication.kt create mode 100644 benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/server/ChatServer.kt create mode 100644 benchmarks/kotlin-ktor/src/main/resources/logback.xml diff --git a/README.md b/README.md index e460b080..1a7d88f5 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ To run a Renaissance benchmark, you need to have a JRE version 11 (or later) installed and execute the following `java` command: ``` -$ java -jar 'renaissance-gpl-0.15.0.jar' +$ java -jar 'renaissance-gpl-0.16.0.jar' ``` In the above command, `` is the list of benchmarks that you want to run. @@ -175,6 +175,10 @@ The following is the complete list of benchmarks, separated into groups. \ Default repetitions: 12; APACHE2 license, MIT distribution; Supported JVM: 11 and later +- `kotlin-ktor` - Simple Ktor chat application with multiple clients, performing various tasks. + \ + Default repetitions: 20; MIT license, MIT distribution; Supported JVM: 11 and later + The suite also contains a group of benchmarks intended solely for testing @@ -268,7 +272,7 @@ arguments to that plugin (or policy). The following is a complete list of command-line options. ``` -Renaissance Benchmark Suite, version 0.15.0 +Renaissance Benchmark Suite, version 0.16.0 Usage: renaissance [options] [benchmark-specification] -h, --help Prints this usage text. @@ -315,7 +319,7 @@ $ tools/sbt/bin/sbt renaissanceJmhPackage To run the benchmarks using JMH, you can execute the following `java` command: ``` -$ java -jar 'renaissance-jmh/target/renaissance-jmh-0.15.0.jar' +$ java -jar 'renaissance-jmh/target/renaissance-jmh-0.16.0.jar' ``` diff --git a/benchmarks/kotlin-ktor/README.md b/benchmarks/kotlin-ktor/README.md new file mode 100644 index 00000000..f7fc61d0 --- /dev/null +++ b/benchmarks/kotlin-ktor/README.md @@ -0,0 +1,104 @@ +# Kotlin-Ktor benchmark + +The benchmark is designed to simulate a chat application where multiple clients send requests simultaneously. +It uses the [Ktor](https://ktor.io/) framework. + +Namely, the benchmark runs multiple clients at the same time. +Clients are set up with one or both of the following tasks: + +- Join a chat and send a randomized message to it. +- Create a private chat with another user (if not already created) and send a randomized message to them. + +They repeat those task the required number of times. +After that number of successfully completed tasks is validated. + +It's also possible to implement validation via end-to-end comparison of the artifacts is produced. +However, in our case it would be tricky to implement, since every run is randomized both in content and in the order of +operations, so a semantic comparison of logs would be required. +Gladly, it's not needed for us, since each task auto-validates itself, so it will successfully terminate only if +everything went as expected. +Hence, counting the number of successfully completed tasks is enough to validate the run. + +## Running the benchmark + +To run benchmark, execute the following command: + +```bash +java -jar kotlin-ktor +``` + +Here are the arguments that can be passed to the benchmark: + +- `client_count`: Number of clients that are simultaneously sending the requests. + Increasing number of clients (with proportional decrease in number of repetitions) doesn't have much effect on the + overall runtime. Default value is the number of available CPUs. +- `iterations_count`: Number of times clients should repeat their designated operations. Default value is 2000. +- `chat_count`: How many public chats should be setup for user interactions. This simulates interactions in a few + large chats. Increasing number of chats will reduce runtime, since it reduces contention between clients. Default + value is 10. +- `group_message_fraction`: Fraction of clients which execute joinChatAndSendMessage. Default value + is 1.0. +- `private_message_fraction`: Fraction of clients that should send private messages. Default value + is 0.5. +- `random_seed`: Random seed to use for client tasks setup for reproducibility. Default value is 32. + +## Concurrency model + +The common way for concurrent programming in Kotlin is to use coroutines. +The high-level idea is that you of coroutines is that multiple blocking operations may run on the same thread pool. +For more details, please see [the official coroutines guide](https://kotlinlang.org/docs/coroutines-overview.html) +This is exactly what Ktor uses. + +Unfortunately, the Ktor framework does not provide a way neither to limit the number of threads used for internal +operations nor to have separate thread pools for the client and server operations. +Ktor, mostly uses the `Dispatchers.IO`and `Dispatchers.Default`, which operate on intersecting thread pools. +The `Dispatchers.Default` has a thread pool of size equal to number of CPU cores, but it's at least 2. +The `Dispatchers.IO` is a bit more complicated. +By default, it has a thead pool of size 64 (which could be controlled via a system +property`kotlinx.coroutines.io.parallelism`). +However, it's possible to obtain a separate view of the `Dispatchers.IO` by using +`Dispatchers.IO.limitedParallelism(n)`, which may create up to `n` additional threads. +In theory the usage of the `Dispatchers.IO` should be reserved for IO-bound operations and the `Dispatchers.Default` +should be used for CPU-bound operations. +However, in practice, these rules are rather semantic and cannot and are not enforced by coroutines. +So, Ktor may switch between these two dispatchers as it sees fit, with potential changes in the future versions. + +With this mind, we start the server using `start` method. +To make sure that Ktor internal operations don't interfere much with us starting client jobs, +we use a separate thread pool to start clients, though internally Ktor will switch to another thread pool. + +Considering there are enough available threads, both server and the client are capable of running mostly concurrently +with a few synchronization points. +Namely, if multiple clients are trying to perform the same operation on the same object +(modifying the message history, chat pool, etc.), it will be processed sequentially. +While operations such as sending a message to the server or routing the message on the server side and sending replies +are parallel (up to contention for the system resources). + +It's possible to influence the amount of contention present by adjusting the parameters of the bencmark. +Increasing the number of chats will decrease contention for the modification of the chat history (the opposite is true +as well). +Increasing group/private message fraction will increase contention for the chat history and the chat pool, respectively. +Increasing the number of clients will increase contention in all synchronization points. + +## Core classes overview + +- `KtorRenaissanceServer` is the root class which initialises, runs and tears down the server and the clients. + See [Running the benchmark](#running-the-benchmark) for more details on the availiable arguments for client and server + setup. +- `ChatApplication` is the class which sets up the Ktor application for the `ChatServer`. Namely it installs the + required plugins, such as `WebSockets`, and sets up routes from websockets calls to the server operations. +- `ChatServer` is the class which handles the server side logic for the chat application. It is responsible for + maintaining the state of the chat application, such as the list of users, the list of chats, and the messages sent in + the chats. It also handles the logic for the different operations that the clients can perform, such as joining a + chat, sending a message, and sending a private message. Important detail is that the server will broadcast the message + which is sent to some chat. So, if the user sends a message to the chat, they will receive the message back and that's + how they'll now that the 'transaction' was successful. +- `ClientManager` sets up clients according to the arguments received from the `KtorRenaissanceBenchmark`, runs them, + and tears them down. +- `Client` is essentially represents a collection of tasks which are run sequentially for the required number of times. +- `ClientTask` is the interface which represents a task that a client can perform. So far the are 2 types of client + tasks: + - `GroupMessageTask` which represents the task of joining a chat and sending a message to it. + - `PrivateMessageTask` which represents the task of sending a private message to another user. +- `Command` instances and `Message` are the data classes with which client and server communicate with each other. Both + classes are being send as JSON objects. \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt new file mode 100644 index 00000000..7e57324e --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt @@ -0,0 +1,120 @@ +package org.renaissance.kotlin.ktor + +import io.ktor.server.engine.* +import kotlinx.coroutines.* +import org.renaissance.Benchmark +import org.renaissance.Benchmark.* +import org.renaissance.BenchmarkContext +import org.renaissance.BenchmarkResult +import org.renaissance.BenchmarkResult.Validators +import org.renaissance.License +import org.renaissance.kotlin.ktor.client.ClientManager +import org.renaissance.kotlin.ktor.server.ChatApplication +import kotlin.math.min + +@Group("web") +@Name("kotlin-ktor") +@Summary("Simple Ktor chat application with multiple clients, performing various tasks.") +@Licenses(License.MIT) +@Parameter( + name = "port", + defaultValue = "8080", + summary = "Port to run the server on." +) +@Parameter( + name = "client_count", + defaultValue = "\$cpu.count", + summary = "Number of clients that are simultaneously sending the requests" +) +@Parameter( + name = "iterations_count", + defaultValue = "2000", + summary = "Number of times clients should repeat their designated operations" +) +@Parameter( + name = "chat_count", + defaultValue = "10", + summary = "How many public chats should be setup for user interactions." + + "Reducing/increasing number of chats will increase/decrease runtime" +) +@Parameter( + name = "group_message_fraction", + defaultValue = "1.0", + summary = "Clients which execute joinChatAndSendMessage" +) +@Parameter( + name = "private_message_fraction", + defaultValue = "0.5", + summary = "How many public chats should be setup for user interactions" +) +@Parameter( + name = "random_seed", + defaultValue = "32", + summary = "Random seed to use for client tasks setup." +) +class KtorRenaissanceBenchmark() : Benchmark { + private var port: Int = 0 + private var clientCount: Int = 0 + private var numberOfRepetitions: Int = 0 + private var fractionOfClientsSendingPrivateMessages: Double = 0.0 + private var fractionOfClientsSendingGroupMessages: Double = 0.0 + private lateinit var clientPool: ExecutorCoroutineDispatcher + private lateinit var server: ApplicationEngine + private lateinit var application: ChatApplication + private lateinit var clientManager: ClientManager + + @OptIn(DelicateCoroutinesApi::class) + override fun setUpBeforeAll(context: BenchmarkContext?) { + port = context!!.parameter("port").toPositiveInteger() + clientCount = context.parameter("client_count").toPositiveInteger() + numberOfRepetitions = context.parameter("iterations_count").toPositiveInteger() + val numberOfChats = context.parameter("chat_count").toPositiveInteger() + fractionOfClientsSendingGroupMessages = context.parameter("group_message_fraction").toDouble() + fractionOfClientsSendingPrivateMessages = + context.parameter("private_message_fraction").toDouble() + val randomSeed = context.parameter("random_seed").toPositiveInteger() + + application = ChatApplication(numberOfChats) + server = embeddedServer(io.ktor.server.cio.CIO, host = "127.0.0.1", port = port) { + application.apply { + main() + } + } + server.start() + + clientPool = newFixedThreadPoolContext(min(clientCount, Runtime.getRuntime().availableProcessors()), "clientPool") + clientManager = ClientManager( + port, + clientCount, + numberOfRepetitions, + fractionOfClientsSendingGroupMessages, + fractionOfClientsSendingPrivateMessages, + randomSeed, + CoroutineScope(clientPool) + ) + + super.setUpBeforeAll(context) + } + + override fun setUpBeforeEach(context: BenchmarkContext?) { + application.setup() + clientManager.setupClients(application.getAvailableChatIds()) + } + + override fun run(context: BenchmarkContext?): BenchmarkResult { + val numberOfSuccessfulTasks = runBlocking { + clientManager.runClients() + } + + return Validators.simple( + "Number of successful client tasks", + (((clientCount * fractionOfClientsSendingGroupMessages) + (clientCount * fractionOfClientsSendingPrivateMessages)) * numberOfRepetitions).toLong(), + numberOfSuccessfulTasks.toLong() + ) + } + + override fun tearDownAfterAll(context: BenchmarkContext?) { + server.stop() + clientPool.close() + } +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/Client.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/Client.kt new file mode 100644 index 00000000..5b05c63a --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/Client.kt @@ -0,0 +1,76 @@ +package org.renaissance.kotlin.ktor.client + +import io.ktor.client.* +import io.ktor.client.engine.cio.* +import io.ktor.client.plugins.websocket.* +import io.ktor.http.* +import io.ktor.serialization.kotlinx.* +import org.renaissance.kotlin.ktor.common.serializationFormat + + +/** + * Client is an abstraction for the list of tasks to perform **sequentially**. + * + * For example: + * 1. Send a message to some User + * 2. Send picture to some chat + * 3. Join another chat + * 4. Leave the chat + */ +internal class Client private constructor( + private val httpClient: HttpClient, + private val port: Int, + private val userId: String, + private val operationsRepetitions: Int, + private val tasksToRun: List, +) { + /** + * Runs the client by establishing a WebSocket connection and performing the specified tasks. + * + * @return The number of successful tasks. Should be equal to `operationsRepetitions * tasksToRun.length` + */ + suspend fun run(): Int { + var successfulTasks = 0 + httpClient.webSocket(method = HttpMethod.Get, host = "127.0.0.1", port = port, path = "/ws/${userId}") { + tasksToRun.forEach { it.setup(this) } + + for (i in 0.. = mutableListOf() + + fun addTaskToRun(task: ClientTask): Builder { + tasksToRun.add(task) + return this + } + + fun build(): Client { + return Client(httpClient, port, userId, operationsRepetitions, tasksToRun) + } + } +} + +fun createDefaultClient(): HttpClient = HttpClient(CIO) { + engine { + + } + install(WebSockets) { + contentConverter = KotlinxWebsocketSerializationConverter(serializationFormat) + } +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientManager.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientManager.kt new file mode 100644 index 00000000..46b56216 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientManager.kt @@ -0,0 +1,57 @@ +package org.renaissance.kotlin.ktor.client + +import io.ktor.client.* +import io.ktor.util.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlin.random.Random + +/** + * A **probabilistic** client manager, which set's up the appropriate amount of clients, performing a specified number and kind of tasks + */ +class ClientManager( + private val port: Int, + numberOfClients: Int, + private val numberOfRequestsPerClient: Int, + private val fractionOfClientsSendingGroupMessages: Double, + private val fractionOfClientsSendingPrivateMessages: Double, + randomSeed: Int, + private val coroutineScope: CoroutineScope +) { + private val random = Random(randomSeed) + private val userIds: Set = (0.. = mutableListOf() + private val clients: MutableList = mutableListOf() + + fun setupClients(availableChatIds: List) { + clients.clear() + predefinedHttpClients.forEach { it.close() } + predefinedHttpClients.clear() + predefinedHttpClients.addAll(userIds.indices.map { createDefaultClient() }) + + val clientBuilders = userIds.zip(predefinedHttpClients).map { (userId, httpClient) -> + Client.Builder(port, userId, numberOfRequestsPerClient, httpClient) + }.toMutableList() + + clientBuilders.take((userIds.size * fractionOfClientsSendingGroupMessages).toInt()).forEach { builder -> + builder.addTaskToRun(JoinGroupAndSendMessageClientTask(availableChatIds.random(random), random)) + } + clientBuilders.shuffle(random) + + clientBuilders.take((userIds.size * fractionOfClientsSendingPrivateMessages).toInt()).forEach { builder -> + builder.addTaskToRun(DirectMessageClientTask(userIds.shuffled(random).first { it != builder.userId }, random)) + } + clientBuilders.shuffle(random) + + clients.addAll(clientBuilders.map { it.build() }) + } + + suspend fun runClients(): Int { + return clients.map { coroutineScope.async(start = CoroutineStart.LAZY) { it.run() } } + .map { it.also { it.start() } } + .awaitAll() + .sum() + } +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientTask.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientTask.kt new file mode 100644 index 00000000..f1390d28 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientTask.kt @@ -0,0 +1,8 @@ +package org.renaissance.kotlin.ktor.client + +import io.ktor.client.plugins.websocket.* + +interface ClientTask { + suspend fun setup(session: DefaultClientWebSocketSession) {} + suspend fun run(session: DefaultClientWebSocketSession) {} +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/DirectMessageClientTask.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/DirectMessageClientTask.kt new file mode 100644 index 00000000..531a514c --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/DirectMessageClientTask.kt @@ -0,0 +1,28 @@ +package org.renaissance.kotlin.ktor.client + +import io.ktor.client.plugins.websocket.* +import io.ktor.serialization.* +import org.renaissance.kotlin.ktor.common.command.CommandReply +import org.renaissance.kotlin.ktor.common.command.CreateDirectMessageChatCommand +import org.renaissance.kotlin.ktor.common.command.CreateDirectMessageChatCommandReply +import org.renaissance.kotlin.ktor.common.sendSerialisedCommandNative +import kotlin.random.Random + +class DirectMessageClientTask(private val userId: String, random: Random) : SendMessageAndAwaitClientTask(random) { + private lateinit var privateChatId: String + + override suspend fun run(session: DefaultClientWebSocketSession) { + session.sendSerialisedCommandNative(CreateDirectMessageChatCommand(userId)) + session.waitForMessage { + val reply = kotlin.runCatching { session.converter!!.deserialize(it) }.getOrNull() + if (reply is CreateDirectMessageChatCommandReply) { + privateChatId = reply.chatId + true + } else { + false + } + } + + session.sendMessageToChatAndAwait(privateChatId) + } +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/JoinGroupAndSendMessageClientTask.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/JoinGroupAndSendMessageClientTask.kt new file mode 100644 index 00000000..8adc3553 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/JoinGroupAndSendMessageClientTask.kt @@ -0,0 +1,14 @@ +package org.renaissance.kotlin.ktor.client + +import io.ktor.client.plugins.websocket.* +import org.renaissance.kotlin.ktor.common.command.JoinChatCommand +import org.renaissance.kotlin.ktor.common.sendSerialisedCommandNative +import kotlin.random.Random + + +class JoinGroupAndSendMessageClientTask(private val chatId: String, random: Random): SendMessageAndAwaitClientTask(random) { + override suspend fun run(session: DefaultClientWebSocketSession) { + session.sendSerialisedCommandNative(JoinChatCommand(chatId)) + session.sendMessageToChatAndAwait(chatId) + } +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/SendMessageAndAwaitClientTask.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/SendMessageAndAwaitClientTask.kt new file mode 100644 index 00000000..694cc213 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/SendMessageAndAwaitClientTask.kt @@ -0,0 +1,30 @@ +package org.renaissance.kotlin.ktor.client + +import io.ktor.client.plugins.websocket.* +import io.ktor.util.* +import io.ktor.websocket.* +import org.renaissance.kotlin.ktor.common.Message +import org.renaissance.kotlin.ktor.common.command.RenameUserCommand +import org.renaissance.kotlin.ktor.common.sendSerialisedCommandNative +import java.util.concurrent.atomic.AtomicInteger +import kotlin.random.Random + +abstract class SendMessageAndAwaitClientTask( + private val random: Random, + private val clientId: String = generateNonce() +) : ClientTask { + private val messageId = AtomicInteger(0) + + protected suspend fun DefaultClientWebSocketSession.sendMessageToChatAndAwait(chatId: String) { + val expectedMsg = "${messageId.getAndIncrement()}_${random.getRandomString(random.nextInt(1, MAX_MESSAGE_LENGTH))}" + sendSerialisedCommandNative(RenameUserCommand("client$clientId")) + sendSerialized(Message(chatId, expectedMsg)) + val expectedMessage = "[client$clientId] $expectedMsg" + waitForMessage { + val receivedText = (it as? Frame.Text)?.readText() ?: return@waitForMessage false + receivedText == expectedMessage + } + } +} + +const val MAX_MESSAGE_LENGTH = 356 \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/generateRandomMessage.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/generateRandomMessage.kt new file mode 100644 index 00000000..40b61df1 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/generateRandomMessage.kt @@ -0,0 +1,10 @@ +package org.renaissance.kotlin.ktor.client + +import kotlin.random.Random + +private val allowedChars = ('A'..'Z') + ('a'..'z') + ('0'..'9') +internal fun Random.getRandomString(length: Int) : String { + return (1..length) + .map { allowedChars.random(this) } + .joinToString("") +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/waitForMessage.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/waitForMessage.kt new file mode 100644 index 00000000..3e7a5a8d --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/waitForMessage.kt @@ -0,0 +1,12 @@ +package org.renaissance.kotlin.ktor.client + +import io.ktor.client.plugins.websocket.* +import io.ktor.websocket.* + +internal suspend fun DefaultClientWebSocketSession.waitForMessage(match: suspend (Frame) -> Boolean) { + for (frame in incoming) { + if (match(frame)) { + break + } + } +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/Chat.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/Chat.kt new file mode 100644 index 00000000..f2bdc361 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/Chat.kt @@ -0,0 +1,14 @@ +package org.renaissance.kotlin.ktor.common + +import io.ktor.util.collections.* +import java.util.concurrent.ConcurrentLinkedDeque + +open class Chat(val id: String) { + val users = ConcurrentSet() + var description: String? = null + /** + * A list of the latest messages sent to the server, so new members can have a bit context of what + * other people was talking about before joining. + */ + val lastMessages = ConcurrentLinkedDeque() +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/DirectMessageChat.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/DirectMessageChat.kt new file mode 100644 index 00000000..f5028ac9 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/DirectMessageChat.kt @@ -0,0 +1,12 @@ +package org.renaissance.kotlin.ktor.common + +class DirectMessageChat(val userId1: String, val userId2: String, chatId: String): Chat(chatId) { + companion object { + fun hashForChat(userId1: String, userId2: String): String = + if (userId1 < userId2) { + "$userId1-$userId2" + } else { + "$userId2-$userId1" + } + } +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/Message.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/Message.kt new file mode 100644 index 00000000..34edbcef --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/Message.kt @@ -0,0 +1,7 @@ +package org.renaissance.kotlin.ktor.common + +import kotlinx.serialization.Serializable + +@Suppress("PLUGIN_IS_NOT_ENABLED") // IDE cannot detect serialization compiler plugin, that is enabled via sbt +@Serializable +data class Message(val chatId: String, val content: String) diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/User.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/User.kt new file mode 100644 index 00000000..2e5d69b8 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/User.kt @@ -0,0 +1,11 @@ +package org.renaissance.kotlin.ktor.common + +/** + * Represents a user in the chat system. + * + * @property userId The unique identifier for the user. + * @property userName The display name of the user. + */ +data class User(val userId: String) { + var userName: String = userId +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/AddUserToChatCommand.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/AddUserToChatCommand.kt new file mode 100644 index 00000000..0c4b59ba --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/AddUserToChatCommand.kt @@ -0,0 +1,9 @@ +package org.renaissance.kotlin.ktor.common.command + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Suppress("PLUGIN_IS_NOT_ENABLED") +@Serializable +@SerialName("add_user_to_chat") +data class AddUserToChatCommand(val userId: String, val chatId: String) : Command {} diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/Command.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/Command.kt new file mode 100644 index 00000000..3628dd35 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/Command.kt @@ -0,0 +1,3 @@ +package org.renaissance.kotlin.ktor.common.command + +sealed interface Command \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CommandReply.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CommandReply.kt new file mode 100644 index 00000000..fe061c49 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CommandReply.kt @@ -0,0 +1,3 @@ +package org.renaissance.kotlin.ktor.common.command + +sealed interface CommandReply \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CreateChatCommand.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CreateChatCommand.kt new file mode 100644 index 00000000..9559bc09 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CreateChatCommand.kt @@ -0,0 +1,14 @@ +package org.renaissance.kotlin.ktor.common.command + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Suppress("PLUGIN_IS_NOT_ENABLED") +@Serializable +@SerialName("create_chat") +data object CreateChatCommand : Command {} + +@Suppress("PLUGIN_IS_NOT_ENABLED") +@Serializable +@SerialName("create_chat_reply") +data class CreateChatCommandReply(val chatId: String) : CommandReply \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CreateDirectMessageChatCommand.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CreateDirectMessageChatCommand.kt new file mode 100644 index 00000000..84f3d0dc --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/CreateDirectMessageChatCommand.kt @@ -0,0 +1,14 @@ +package org.renaissance.kotlin.ktor.common.command + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Suppress("PLUGIN_IS_NOT_ENABLED") +@Serializable +@SerialName("create_direct_message_chat") +data class CreateDirectMessageChatCommand(val inviteeUserId: String) : Command + +@Suppress("PLUGIN_IS_NOT_ENABLED") +@Serializable +@SerialName("create_direct_message_chat_reply") +data class CreateDirectMessageChatCommandReply(val chatId: String) : CommandReply \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/JoinChatCommand.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/JoinChatCommand.kt new file mode 100644 index 00000000..067f507a --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/JoinChatCommand.kt @@ -0,0 +1,9 @@ +package org.renaissance.kotlin.ktor.common.command + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Suppress("PLUGIN_IS_NOT_ENABLED") +@Serializable +@SerialName("join_chat") +data class JoinChatCommand(val chatId: String): Command {} diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/RenameUserCommand.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/RenameUserCommand.kt new file mode 100644 index 00000000..165428c2 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/command/RenameUserCommand.kt @@ -0,0 +1,9 @@ +package org.renaissance.kotlin.ktor.common.command + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Suppress("PLUGIN_IS_NOT_ENABLED") +@Serializable +@SerialName("rename") +data class RenameUserCommand(val newName: String) : Command diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/sendSerialisedCommandReplyNative.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/sendSerialisedCommandReplyNative.kt new file mode 100644 index 00000000..1f32a465 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/sendSerialisedCommandReplyNative.kt @@ -0,0 +1,9 @@ +package org.renaissance.kotlin.ktor.common + +import io.ktor.websocket.* +import kotlinx.serialization.PolymorphicSerializer +import org.renaissance.kotlin.ktor.common.command.CommandReply + +internal suspend inline fun WebSocketSession.sendSerialisedCommandReplyNative(cmd: T) { + send(serializationFormat.encodeToString(PolymorphicSerializer(CommandReply::class), cmd)) +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/sendSerializedCommandNative.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/sendSerializedCommandNative.kt new file mode 100644 index 00000000..e0976146 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/sendSerializedCommandNative.kt @@ -0,0 +1,11 @@ +package org.renaissance.kotlin.ktor.common + +import io.ktor.websocket.* +import kotlinx.serialization.PolymorphicSerializer +import org.renaissance.kotlin.ktor.common.command.Command + +// we are using embedded version of a serialization plugin with limited reflection capabilities +// so we have to explicitly specify type serializer +internal suspend inline fun WebSocketSession.sendSerialisedCommandNative(cmd: T) { + send(serializationFormat.encodeToString(PolymorphicSerializer(Command::class), cmd)) +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/serialisationFormat.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/serialisationFormat.kt new file mode 100644 index 00000000..ea9ff136 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/common/serialisationFormat.kt @@ -0,0 +1,22 @@ +package org.renaissance.kotlin.ktor.common + +import kotlinx.serialization.json.Json +import kotlinx.serialization.modules.SerializersModule +import kotlinx.serialization.modules.polymorphic +import kotlinx.serialization.modules.subclass +import org.renaissance.kotlin.ktor.common.command.* + +internal val serializationFormat = Json { + serializersModule = SerializersModule { + polymorphic(Command::class) { + subclass(CreateChatCommand::class) + subclass(CreateDirectMessageChatCommand::class) + subclass(JoinChatCommand::class) + subclass(AddUserToChatCommand::class) + subclass(RenameUserCommand::class) + } + polymorphic(CommandReply::class) { + subclass(CreateDirectMessageChatCommandReply::class) + } + } +} diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/server/ChatApplication.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/server/ChatApplication.kt new file mode 100644 index 00000000..189e3fa1 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/server/ChatApplication.kt @@ -0,0 +1,137 @@ +package org.renaissance.kotlin.ktor.server + +import io.ktor.serialization.* +import io.ktor.serialization.kotlinx.* +import io.ktor.server.application.* +import io.ktor.server.plugins.callloging.* +import io.ktor.server.plugins.defaultheaders.* +import io.ktor.server.routing.* +import io.ktor.server.websocket.* +import io.ktor.websocket.* +import kotlinx.coroutines.channels.consumeEach +import org.renaissance.kotlin.ktor.common.Message +import org.renaissance.kotlin.ktor.common.command.* +import org.renaissance.kotlin.ktor.common.sendSerialisedCommandReplyNative +import org.renaissance.kotlin.ktor.common.serializationFormat +import org.slf4j.event.Level + +/** + * In this case, we have a class holding our application state so it is not global and can be tested easier. + */ +class ChatApplication(numberOfChatsToSetup: Int) { + /** + * This class handles the logic of a [ChatServer]. + * With the standard handlers [ChatServer.registerUser] or [ChatServer.disconnectUserSocket] and operations like + * sending messages to everyone or to specific people connected to the server. + */ + private val server = ChatServer(numberOfChatsToSetup) + + fun getAvailableChatIds(): List { + return server.chats.keys().toList() + } + + fun setup() { + server.setupChats() + } + + /** + * This is the main method of application in this class. + */ + fun Application.main() { + /** + * First, we install the plugins we need. + * They are bound to the whole application + * since this method has an implicit [Application] receiver that supports the [install] method. + */ + // This adds Date and Server headers to each response, and would allow you to configure + // additional headers served to each response. + install(DefaultHeaders) + // This uses the logger to log every call (request/response) + install(CallLogging) { + level = Level.TRACE + } + // This installs the WebSockets plugin to be able to establish a bidirectional configuration + // between the server and the client + install(WebSockets) { + contentConverter = KotlinxWebsocketSerializationConverter(serializationFormat) + } + + /** + * Now we are going to define routes to handle specific methods + URLs for this application. + */ + routing { + + // Defines a websocket `/ws` route that allows a protocol upgrade to convert a HTTP request/response request + // into a bidirectional packetized connection. + webSocket("/ws/{userId}") { // this: WebSocketSession -> + + // First of all we get the user id + val userId = call.parameters["userId"] + + // We check that we actually have a userId. We should always have one, + // since we have defined an interceptor before to set one. + if (userId == null) { + close(CloseReason(CloseReason.Codes.VIOLATED_POLICY, "No userId")) + return@webSocket + } + + // We notify that a member joined by calling the server handler [memberJoin]. + // This allows associating the session ID to a specific WebSocket connection. + server.registerUser(userId, this) + + try { + // We start receiving messages (frames). + // Since this is a coroutine, it is suspended until receiving frames. + // Once the connection is closed, this consumeEach will finish and the code will continue. + incoming.consumeEach { frame -> + // Frames can be [Text], [Binary], [Ping], [Pong], [Close]. + // We are only interested in textual messages, so we filter it. + if (frame is Frame.Text) { + // Now it is time to process the text sent from the user. + // At this point, we have context about this connection, + // the session, the text and the server. + // So we have everything we need. + receivedMessage(userId, frame) + } + } + } finally { + // Either if there was an error, or if the connection was closed gracefully, + // we notified the server that the member had left. + server.disconnectUserSocket(userId, this) + } + } + } + } + + /** + * We received a message. Let's process it. + */ + private suspend fun DefaultWebSocketServerSession.receivedMessage(userId: String, frame: Frame.Text) { + val deserializedCommand = kotlin.runCatching { converter!!.deserialize(frame) }.getOrNull() + when (deserializedCommand) { + is RenameUserCommand -> { + server.renameUser(userId, deserializedCommand.newName) + } + + is JoinChatCommand -> { + server.joinChat(deserializedCommand.chatId, userId) + } + + is CreateChatCommand -> { + val createdChatId = server.createChat(userId) + sendSerialisedCommandReplyNative(CreateChatCommandReply(createdChatId)) + } + + is CreateDirectMessageChatCommand -> { + val createdChat = server.createDirectMessageChat( + userId, + deserializedCommand.inviteeUserId, + ) + sendSerialisedCommandReplyNative(CreateDirectMessageChatCommandReply(createdChat)) + } + + // Handle a normal message. + else -> server.sendMessage(userId, converter!!.deserialize(frame)) + } + } +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/server/ChatServer.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/server/ChatServer.kt new file mode 100644 index 00000000..73e7abc6 --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/server/ChatServer.kt @@ -0,0 +1,216 @@ +package org.renaissance.kotlin.ktor.server + +import io.ktor.util.* +import io.ktor.websocket.* +import kotlinx.coroutines.channels.ClosedSendChannelException +import org.renaissance.kotlin.ktor.common.Chat +import org.renaissance.kotlin.ktor.common.DirectMessageChat +import org.renaissance.kotlin.ktor.common.Message +import org.renaissance.kotlin.ktor.common.User +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.atomic.AtomicInteger + +const val MAX_MESSAGE_HISTORY_LENGTH = 100 +const val MAX_USERNAME_LENGTH = 50 + +/** + * This class is in charge of the chat server logic. + * It contains handlers for events and commands to send messages to specific users on the server. + */ +class ChatServer(private val numberOfChatsToSetup: Int) { + /** + * The atomic counter used to get unique usernames based on the maximum users the server had. + */ + private val usersCounter = AtomicInteger() + + /** + * A concurrent map associating session IDs to usernames. + */ + private val users = ConcurrentHashMap() + private val directChatIdToCommonChatId = ConcurrentHashMap() + val chats = ConcurrentHashMap() + + /** + * Associates a session ID to a set of websockets. + * Since a browser is able to open several tabs and windows with the same cookies and thus the same session. + * There might be several opened sockets for the same client. + */ + private val userSockets = ConcurrentHashMap>() + + fun setupChats() { + chats.clear() + directChatIdToCommonChatId.clear() + for (i in 0.. + it.send(message) + } + } + } + } + + /** + * Handles a [userId] identified by its session ID renaming [newName] a specific name. + */ + suspend fun renameUser(userId: String, newName: String) { + if (newName.isBlank()) + return sendTo(userId, "server::help", "/user [newName]") + + if (newName.length > MAX_USERNAME_LENGTH) + return sendTo(userId, "server::help", "new name is too long: 50 characters limit") + + // Re-sets the member name. + val user = users[userId] ?: return + val oldName = user.userName + synchronized(user) { + // if userName was updated before we got the lock + if (user.userName != oldName) return + user.userName = newName + } + // Notifies everyone in the server about this change. + userSockets[userId]?.forEach { + it.send("You've been successfully renamed from $oldName to $newName") + } + } + + /** + * Handles that a [userId] with a specific [socket] left the server. + */ + suspend fun disconnectUserSocket(userId: String, socket: WebSocketSession) { + // Removes the socket connection for this member + val connections = userSockets[userId] ?: return + connections.remove(socket) + if (connections.isEmpty()) { + // If there are no more connections for this member, we remove it from the server. + userSockets.remove(userId) ?: return + val user = users.remove(userId) ?: return + + usersCounter.decrementAndGet() + + // If the member was in a chat, we notify the rest of the members about this event. + user.let { userInChat -> + chats.values.forEach { chat -> + if (chat.users.remove(userInChat)) { + chat.broadcast("Member left: ${userInChat.userName}") + } + } + } + } + } + + /** + * Handles sending to a [recipientUserId] from a [senderUserId] a [message]. + * + * Both [recipientUserId] and [senderUserId] are identified by its session-id. + */ + suspend fun sendTo(recipientUserId: String, senderUserId: String, message: String) { + userSockets[recipientUserId]?.send(Frame.Text("[$senderUserId] $message")) + } + + /** + * Handles a [message] sent from a [userId] by notifying the rest of the users. + */ + suspend fun sendMessage(userId: String, message: Message) { + // Pre-format the message to be send, to prevent doing it for all the users or connected sockets. + val userName = users[userId]?.userName ?: userId + val formatted = "[$userName] ${message.content}" + + // Sends this pre-formatted message to all the members in the server. + val chat = chats[message.chatId] ?: return + + chat.broadcast(formatted) + + val lastMessages = chat.lastMessages + // Appends the message to the list of [lastMessages] and caps that collection to 100 items to prevent + // growing too much. + lastMessages.addLast(formatted) + + if (lastMessages.size > MAX_MESSAGE_HISTORY_LENGTH) { + lastMessages.removeFirst() + } + } + + /** + * Sends a [message] to all the members in the server, including all the connections per member. + */ + private suspend fun Chat.broadcast(message: String) { + users.asSequence().map { userSockets[it.userId] }.filterNotNull().forEach { socket -> + socket.send(Frame.Text(message)) + } + } + + /** + * Sends a [sendMessage] to a list of [this] [WebSocketSession]. + */ + private suspend fun List.send(frame: Frame) { + forEach { + try { + it.send(frame.copy()) + } catch (t: Throwable) { + try { + it.close(CloseReason(CloseReason.Codes.PROTOCOL_ERROR, "")) + } catch (ignore: ClosedSendChannelException) { + // at some point it will get closed + } + } + } + } + + fun createChat(creatorUserId: String): String { + val chat = Chat(generateNonce()) + chats[chat.id] = chat + + chat.users.add(users[creatorUserId]!!) + return chat.id + } + + fun createDirectMessageChat(creatorUserId: String, inviteeUserId: String): String { + val chat = chats.computeIfAbsent(DirectMessageChat.hashForChat(creatorUserId, inviteeUserId)) { + DirectMessageChat( + creatorUserId, + inviteeUserId, + it + ).apply { + users.add(this@ChatServer.users[creatorUserId]!!) + users.add(this@ChatServer.users.computeIfAbsent(inviteeUserId) { + User(inviteeUserId).apply { + userName = "user${usersCounter.incrementAndGet()}" + } + }) + } + } + return chat.id + } +} \ No newline at end of file diff --git a/benchmarks/kotlin-ktor/src/main/resources/logback.xml b/benchmarks/kotlin-ktor/src/main/resources/logback.xml new file mode 100644 index 00000000..344a441b --- /dev/null +++ b/benchmarks/kotlin-ktor/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file From 6f7aa150d4a3532cec02b791bf73ca84a0c91e15 Mon Sep 17 00:00:00 2001 From: Ilia Zavidnyi Date: Sun, 28 Apr 2024 15:27:49 +0200 Subject: [PATCH 02/10] Sbt setup for the `kotlin-ktor` benchmark. Kotlin plugin for SBT was added. All dependencies specified --- build.sbt | 30 +++++++++++++++++++++++++++++- project/plugins.sbt | 1 + 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 1a025309..286e9436 100644 --- a/build.sbt +++ b/build.sbt @@ -166,6 +166,8 @@ val jacksonVersion = "2.15.2" val jerseyVersion = "2.40" val jnaVersion = "5.13.0" val nettyVersion = "4.1.99.Final" +val ktorVersion = "2.3.8" +val logbackVersion = "1.4.11" val scalaCollectionCompatVersion = "2.11.0" val scalaParallelCollectionsVersion = "1.0.4" val slf4jVersion = "2.0.9" @@ -491,6 +493,31 @@ lazy val twitterFinagleBenchmarks = (project in file("benchmarks/twitter-finagle ) .dependsOn(renaissanceCore % "provided") +lazy val kotlinKtorBenchmarks = (project in file("benchmarks/kotlin-ktor")) + .enablePlugins(KotlinPlugin) + .settings( + name := "kotlin-ktor", + commonSettingsNoScala, + kotlinLib("stdlib"), + kotlinPlugin("serialization-compiler-plugin-embeddable"), + kotlinVersion := "1.9.20", + kotlincJvmTarget := "11", + libraryDependencies ++= Seq( + "io.ktor" % "ktor-server-core-jvm" % ktorVersion, + "io.ktor" % "ktor-server-cio-jvm" % ktorVersion, + "io.ktor" % "ktor-server-tests-jvm" % ktorVersion, + "io.ktor" % "ktor-server-call-logging" % ktorVersion, + "io.ktor" % "ktor-serialization-kotlinx-json-jvm" % ktorVersion, + "io.ktor" % "ktor-server-auth-jvm" % ktorVersion, + "org.jetbrains.kotlinx" % "kotlinx-serialization-json" % "1.6.2", + "ch.qos.logback" % "logback-classic" % logbackVersion + ), + dependencyOverrides ++= Seq( + "org.jetbrains.kotlinx" % "kotlinx-coroutines-core" % "1.8.0-RC" + ) + ) + .dependsOn(renaissanceCore % "provided") + // // Project collections. // @@ -514,7 +541,8 @@ val renaissanceBenchmarks: Seq[Project] = Seq( scalaSatBenchmarks, scalaStdlibBenchmarks, scalaStmBenchmarks, - twitterFinagleBenchmarks + twitterFinagleBenchmarks, + kotlinKtorBenchmarks ) /** diff --git a/project/plugins.sbt b/project/plugins.sbt index 614ead4a..247a982f 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,3 @@ addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.4") addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.2") +addSbtPlugin("org.bitlap" % "sbt-kotlin-plugin" % "4.0.0") From f9b3cc1f982e79bb84a787629bdcc0503af142a9 Mon Sep 17 00:00:00 2001 From: Ilia Zavidnyi Date: Sun, 28 Apr 2024 15:28:48 +0200 Subject: [PATCH 03/10] Disabling Kotlin plugin for the sbt for `twitter-finagle` benchmark due to the conflicts --- build.sbt | 1 + 1 file changed, 1 insertion(+) diff --git a/build.sbt b/build.sbt index 286e9436..31918d1c 100644 --- a/build.sbt +++ b/build.sbt @@ -464,6 +464,7 @@ lazy val scalaStmBenchmarks = (project in file("benchmarks/scala-stm")) val finagleVersion = "22.12.0" lazy val twitterFinagleBenchmarks = (project in file("benchmarks/twitter-finagle")) + .disablePlugins(KotlinPlugin) .settings( name := "twitter-finagle", commonSettingsScala213, From 59c109efd89931fadf359961e7fe785998dde07d Mon Sep 17 00:00:00 2001 From: Ilia Zavidnyi Date: Mon, 29 Apr 2024 11:22:49 +0200 Subject: [PATCH 04/10] Readme without version update --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 1a7d88f5..bdb0743a 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ To run a Renaissance benchmark, you need to have a JRE version 11 (or later) installed and execute the following `java` command: ``` -$ java -jar 'renaissance-gpl-0.16.0.jar' +$ java -jar 'renaissance-gpl-0.15.0.jar' ``` In the above command, `` is the list of benchmarks that you want to run. @@ -272,7 +272,7 @@ arguments to that plugin (or policy). The following is a complete list of command-line options. ``` -Renaissance Benchmark Suite, version 0.16.0 +Renaissance Benchmark Suite, version 0.15.0 Usage: renaissance [options] [benchmark-specification] -h, --help Prints this usage text. @@ -319,7 +319,7 @@ $ tools/sbt/bin/sbt renaissanceJmhPackage To run the benchmarks using JMH, you can execute the following `java` command: ``` -$ java -jar 'renaissance-jmh/target/renaissance-jmh-0.16.0.jar' +$ java -jar 'renaissance-jmh/target/renaissance-jmh-0.15.0.jar' ``` From e03548e4c26ce9a769b7aadae8dd73fcd4d60b32 Mon Sep 17 00:00:00 2001 From: Ilia Zavidnyi Date: Mon, 29 Apr 2024 11:42:41 +0200 Subject: [PATCH 05/10] Clarified benchmark readme, fixed typos and punctuation --- benchmarks/kotlin-ktor/README.md | 48 +++++++++++++++++--------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/benchmarks/kotlin-ktor/README.md b/benchmarks/kotlin-ktor/README.md index f7fc61d0..db09e3c9 100644 --- a/benchmarks/kotlin-ktor/README.md +++ b/benchmarks/kotlin-ktor/README.md @@ -9,10 +9,12 @@ Clients are set up with one or both of the following tasks: - Join a chat and send a randomized message to it. - Create a private chat with another user (if not already created) and send a randomized message to them. -They repeat those task the required number of times. -After that number of successfully completed tasks is validated. +Clients repeat those tasks required number of times specified via `iterations_count` parameter. +For the validation, each successful task completion is counted and compared to the expected number at the end of the +run. -It's also possible to implement validation via end-to-end comparison of the artifacts is produced. +It's also possible to implement validation via end-to-end comparison of the artifacts produced by the run. +An example of such an artifact could be a log file of all sent/received messages or of completed tasks. However, in our case it would be tricky to implement, since every run is randomized both in content and in the order of operations, so a semantic comparison of logs would be required. Gladly, it's not needed for us, since each task auto-validates itself, so it will successfully terminate only if @@ -27,15 +29,16 @@ To run benchmark, execute the following command: java -jar kotlin-ktor ``` -Here are the arguments that can be passed to the benchmark: +Here are the parameters that can be passed to the benchmark: - `client_count`: Number of clients that are simultaneously sending the requests. - Increasing number of clients (with proportional decrease in number of repetitions) doesn't have much effect on the + Increasing number of clients (with a proportional decrease in the number of repetitions) has little effect on the overall runtime. Default value is the number of available CPUs. -- `iterations_count`: Number of times clients should repeat their designated operations. Default value is 2000. -- `chat_count`: How many public chats should be setup for user interactions. This simulates interactions in a few - large chats. Increasing number of chats will reduce runtime, since it reduces contention between clients. Default - value is 10. +- `iterations_count`: Number of times clients should repeat their designated tasks. Default value is 2000. +- `chat_count`: How many public chats should be set up for user interactions. + This simulates interactions in a few large chats. + Increasing the number of chats will reduce runtime, since it reduces contention between clients. + Default value is 10. - `group_message_fraction`: Fraction of clients which execute joinChatAndSendMessage. Default value is 1.0. - `private_message_fraction`: Fraction of clients that should send private messages. Default value @@ -45,7 +48,7 @@ Here are the arguments that can be passed to the benchmark: ## Concurrency model The common way for concurrent programming in Kotlin is to use coroutines. -The high-level idea is that you of coroutines is that multiple blocking operations may run on the same thread pool. +The high-level idea of coroutines is that multiple blocking operations may run on the same thread pool. For more details, please see [the official coroutines guide](https://kotlinlang.org/docs/coroutines-overview.html) This is exactly what Ktor uses. @@ -54,8 +57,8 @@ operations nor to have separate thread pools for the client and server operation Ktor, mostly uses the `Dispatchers.IO`and `Dispatchers.Default`, which operate on intersecting thread pools. The `Dispatchers.Default` has a thread pool of size equal to number of CPU cores, but it's at least 2. The `Dispatchers.IO` is a bit more complicated. -By default, it has a thead pool of size 64 (which could be controlled via a system -property`kotlinx.coroutines.io.parallelism`). +By default, it has a thread pool of size 64 (which could be controlled via a system +property `kotlinx.coroutines.io.parallelism`). However, it's possible to obtain a separate view of the `Dispatchers.IO` by using `Dispatchers.IO.limitedParallelism(n)`, which may create up to `n` additional threads. In theory the usage of the `Dispatchers.IO` should be reserved for IO-bound operations and the `Dispatchers.Default` @@ -74,7 +77,7 @@ Namely, if multiple clients are trying to perform the same operation on the same While operations such as sending a message to the server or routing the message on the server side and sending replies are parallel (up to contention for the system resources). -It's possible to influence the amount of contention present by adjusting the parameters of the bencmark. +It's possible to influence the amount of contention present by adjusting the parameters of the benchmark. Increasing the number of chats will decrease contention for the modification of the chat history (the opposite is true as well). Increasing group/private message fraction will increase contention for the chat history and the chat pool, respectively. @@ -82,23 +85,24 @@ Increasing the number of clients will increase contention in all synchronization ## Core classes overview -- `KtorRenaissanceServer` is the root class which initialises, runs and tears down the server and the clients. - See [Running the benchmark](#running-the-benchmark) for more details on the availiable arguments for client and server +- `KtorRenaissanceBenchmark` is the root class which initialises, runs and tears down the server and the clients. + See [Running the benchmark](#running-the-benchmark) for more details on the available parameters for client and server setup. -- `ChatApplication` is the class which sets up the Ktor application for the `ChatServer`. Namely it installs the - required plugins, such as `WebSockets`, and sets up routes from websockets calls to the server operations. +- `ChatApplication` is the class which sets up the Ktor application for the `ChatServer`. + Namely, it installs the required plugins, such as `WebSockets`, and sets up routes from websockets calls to the server operations. - `ChatServer` is the class which handles the server side logic for the chat application. It is responsible for maintaining the state of the chat application, such as the list of users, the list of chats, and the messages sent in the chats. It also handles the logic for the different operations that the clients can perform, such as joining a chat, sending a message, and sending a private message. Important detail is that the server will broadcast the message - which is sent to some chat. So, if the user sends a message to the chat, they will receive the message back and that's + which is sent to some chat. So, if the user sends a message to the chat, they will receive the message back, and + that's how they'll now that the 'transaction' was successful. -- `ClientManager` sets up clients according to the arguments received from the `KtorRenaissanceBenchmark`, runs them, +- `ClientManager` sets up clients according to the parameters received from the `KtorRenaissanceBenchmark`, runs them, and tears them down. - `Client` is essentially represents a collection of tasks which are run sequentially for the required number of times. -- `ClientTask` is the interface which represents a task that a client can perform. So far the are 2 types of client - tasks: +- `ClientTask` is the interface which represents a task that a client can perform. + So far, there are two types of client tasks: - `GroupMessageTask` which represents the task of joining a chat and sending a message to it. - `PrivateMessageTask` which represents the task of sending a private message to another user. - `Command` instances and `Message` are the data classes with which client and server communicate with each other. Both - classes are being send as JSON objects. \ No newline at end of file + classes are being sent as JSON objects. \ No newline at end of file From a803a3a6911d0c6c4117bc504c3b5f48cbd32fe8 Mon Sep 17 00:00:00 2001 From: Ilia Zavidnyi Date: Mon, 29 Apr 2024 11:44:35 +0200 Subject: [PATCH 06/10] Fixed typo --- .../kotlin/org/renaissance/kotlin/ktor/client/ClientManager.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientManager.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientManager.kt index 46b56216..50671719 100644 --- a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientManager.kt +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/ClientManager.kt @@ -9,7 +9,7 @@ import kotlinx.coroutines.awaitAll import kotlin.random.Random /** - * A **probabilistic** client manager, which set's up the appropriate amount of clients, performing a specified number and kind of tasks + * A **probabilistic** client manager, which sets up the appropriate number of clients, performing a specified number and kind of tasks */ class ClientManager( private val port: Int, From 60009785729a20d3f71b5bf5b7b15db5d9cd4efa Mon Sep 17 00:00:00 2001 From: Ilia Zavidnyi Date: Mon, 29 Apr 2024 11:50:02 +0200 Subject: [PATCH 07/10] Switch to using localhost to avoid problems with IPv6 environments --- .../org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt | 2 +- .../main/kotlin/org/renaissance/kotlin/ktor/client/Client.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt index 7e57324e..944c0cc1 100644 --- a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt @@ -75,7 +75,7 @@ class KtorRenaissanceBenchmark() : Benchmark { val randomSeed = context.parameter("random_seed").toPositiveInteger() application = ChatApplication(numberOfChats) - server = embeddedServer(io.ktor.server.cio.CIO, host = "127.0.0.1", port = port) { + server = embeddedServer(io.ktor.server.cio.CIO, host = "localhost", port = port) { application.apply { main() } diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/Client.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/Client.kt index 5b05c63a..4207dfe5 100644 --- a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/Client.kt +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/client/Client.kt @@ -31,7 +31,7 @@ internal class Client private constructor( */ suspend fun run(): Int { var successfulTasks = 0 - httpClient.webSocket(method = HttpMethod.Get, host = "127.0.0.1", port = port, path = "/ws/${userId}") { + httpClient.webSocket(method = HttpMethod.Get, host = "localhost", port = port, path = "/ws/${userId}") { tasksToRun.forEach { it.setup(this) } for (i in 0.. Date: Mon, 29 Apr 2024 11:50:10 +0200 Subject: [PATCH 08/10] Switch to using uncommon port --- .../org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt index 944c0cc1..9683c8fa 100644 --- a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt @@ -18,7 +18,7 @@ import kotlin.math.min @Licenses(License.MIT) @Parameter( name = "port", - defaultValue = "8080", + defaultValue = "9496", summary = "Port to run the server on." ) @Parameter( From fcab781ef0b63ddae3f465440b6cc91a103066cb Mon Sep 17 00:00:00 2001 From: Ilia Zavidnyi Date: Wed, 22 May 2024 14:06:57 +0200 Subject: [PATCH 09/10] fix jmh package build by disabling KotlinPlugin --- build.sbt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build.sbt b/build.sbt index 31918d1c..7593cab4 100644 --- a/build.sbt +++ b/build.sbt @@ -1003,6 +1003,7 @@ def mapJarContentsToAssemblyTask(classpath: Classpath) = * compiled wrappers are used by the [[renaissanceJmh]] project below. */ lazy val renaissanceJmhWrappers = (project in file("renaissance-jmh/wrappers")) + .disablePlugins(KotlinPlugin) .settings( name := "renaissance-jmh-wrappers", commonSettingsNoScala, @@ -1022,6 +1023,7 @@ lazy val renaissanceJmhWrappers = (project in file("renaissance-jmh/wrappers")) * its dependencies), along with the benchmark dependencies as JAR files. */ lazy val renaissanceJmh = (project in file("renaissance-jmh")) + .disablePlugins(KotlinPlugin) .settings( name := "renaissance-jmh", commonSettingsNoScala, From 3c9605bae525eb88f7581a56fe2e47dfb46bbfcc Mon Sep 17 00:00:00 2001 From: Ilia Zavidnyi Date: Thu, 23 May 2024 09:29:25 +0200 Subject: [PATCH 10/10] Added `test` and `jmh` run configurations for the kotlin-ktor --- .../renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt index 9683c8fa..de5a9c2d 100644 --- a/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt +++ b/benchmarks/kotlin-ktor/src/main/kotlin/org/renaissance/kotlin/ktor/KtorRenaissanceBenchmark.kt @@ -52,6 +52,13 @@ import kotlin.math.min defaultValue = "32", summary = "Random seed to use for client tasks setup." ) +@Configuration( + name = "test", + settings = [ + "iterations_count = 100", + ] +) +@Configuration(name = "jmh") class KtorRenaissanceBenchmark() : Benchmark { private var port: Int = 0 private var clientCount: Int = 0