From 661af76ed69e1b764eafe4e2baa21632051c49c0 Mon Sep 17 00:00:00 2001 From: Azalea <22280294+hykilpikonna@users.noreply.github.com> Date: Mon, 13 Jan 2025 05:21:23 -0500 Subject: [PATCH] [+] TCP --- .../samnyan/aqua/sega/maimai2/MaimaiFutari.kt | 95 ++++++++++++++----- 1 file changed, 71 insertions(+), 24 deletions(-) diff --git a/src/main/java/icu/samnyan/aqua/sega/maimai2/MaimaiFutari.kt b/src/main/java/icu/samnyan/aqua/sega/maimai2/MaimaiFutari.kt index 0eecf81e..59c3aa2f 100644 --- a/src/main/java/icu/samnyan/aqua/sega/maimai2/MaimaiFutari.kt +++ b/src/main/java/icu/samnyan/aqua/sega/maimai2/MaimaiFutari.kt @@ -7,6 +7,7 @@ import java.io.InputStreamReader import java.io.OutputStreamWriter import java.net.ServerSocket import java.net.Socket +import java.net.SocketTimeoutException import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantLock import kotlin.collections.set @@ -14,15 +15,18 @@ import kotlin.concurrent.withLock const val PROTO_VERSION = 1 +const val MAX_STREAMS = 10 +const val SO_TIMEOUT = 10000 private object Command { // Control plane const val CTL_START = 1 const val CTL_BIND = 2 const val CTL_HEARTBEAT = 3 - const val CTL_TCP_ACCEPT = 4 // Accept a new multiplexed TCP stream - const val CTL_TCP_CONNECT = 5 - const val CTL_TCP_CLOSE = 6 + const val CTL_TCP_CONNECT = 4 // Accept a new multiplexed TCP stream + const val CTL_TCP_ACCEPT = 5 + const val CTL_TCP_ACCEPT_ACK = 6 + const val CTL_TCP_CLOSE = 7 // Data plane const val DATA_SEND = 21 @@ -47,22 +51,25 @@ data class Message( val data: Any? = null ) -fun ctlMsg(cmd: Int, data: Any? = null) = Message(cmd, 0, data = data) +fun ctlMsg(cmd: Int, data: Any? = null) = Message(cmd, data = data) data class ActiveClient( val clientKey: String, val socket: Socket, val reader: BufferedReader, - val writer: BufferedWriter + val writer: BufferedWriter, + // + val tcpStreams: MutableMap = mutableMapOf(), + val pendingStreams: MutableSet = mutableSetOf(), ) { val log = logger() val stubIp = keychipToStubIp(clientKey) - val mutex = ReentrantLock() + val writeMutex = ReentrantLock() var lastHeartbeat = millis() fun send(msg: Message) { - mutex.withLock { + writeMutex.withLock { writer.write(msg.toJson()) writer.newLine() writer.flush() @@ -70,21 +77,57 @@ data class ActiveClient( } } -fun ActiveClient.handle(message: Message) { - when (message.cmd) { - Command.CTL_HEARTBEAT -> lastHeartbeat = millis() - Command.DATA_SEND -> { - // Find target by dst IP address - val target = message.dst?.let { clients[it] } ?: return log.warn("Target not found: ${message.dst}") +fun ActiveClient.handle(msg: Message) { + // Find target by dst IP address or TCP stream ID + val target = (msg.dst ?: msg.sid?.let { tcpStreams[it] } )?.let { clients[it] } - // Send to target TODO: SID - target.send(message.copy(sid = 0)) - } + when (msg.cmd) { + Command.CTL_HEARTBEAT -> lastHeartbeat = millis() Command.DATA_BROADCAST -> { // Broadcast to all clients. This is only used in UDP so SID is always 0 - assert(message.proto == Proto.UDP) - clients.values.filter { it.clientKey != clientKey } - .forEach { it.send(message.copy(sid = 0, src = stubIp)) } + if (msg.proto != Proto.UDP) return log.warn("TCP Broadcast received, something is wrong.") + clients.values.filter { it.clientKey != clientKey }.forEach { it.send(msg.copy(src = stubIp)) } + } + Command.DATA_SEND, Command.CTL_TCP_ACCEPT_ACK -> { + target ?: return log.warn("Send: Target not found: ${msg.dst}") + + if (msg.proto == Proto.TCP && msg.sid !in tcpStreams) + return log.warn("Stream ID not found: ${msg.sid}") + + target.send(msg.copy(src = stubIp, dst = target.stubIp)) + + // 1020844165 + // 2245580860 + } + Command.CTL_TCP_CONNECT -> { + target ?: return log.warn("Connect: Target not found: ${msg.dst}") + msg.sid ?: return log.warn("Connect: Stream ID not found") + + if (msg.sid in tcpStreams || msg.sid in pendingStreams) + return log.warn("Stream ID already in use: ${msg.sid}") + + // Add the stream to the pending list + pendingStreams.add(msg.sid) + if (pendingStreams.size > MAX_STREAMS) { + log.warn("Too many pending streams, closing connection") + return socket.close() + } + + target.send(msg.copy(src = stubIp, dst = target.stubIp)) + } + Command.CTL_TCP_ACCEPT -> { + target ?: return log.warn("Accept: Target not found: ${msg.dst}") + msg.sid ?: return log.warn("Accept: Stream ID not found") + + if (msg.sid !in target.pendingStreams) + return log.warn("Stream ID not found in pending list: ${msg.sid}") + + // Add the stream to the active list + target.pendingStreams.remove(msg.sid) + target.tcpStreams[msg.sid] = stubIp + tcpStreams[msg.sid] = target.stubIp + + target.send(msg.copy(src = stubIp, dst = target.stubIp)) } } } @@ -105,7 +148,10 @@ class MaimaiFutari(private val port: Int = 20101) { log.info("Server started on port $port") while (true) { - val clientSocket = serverSocket.accept() + val clientSocket = serverSocket.accept().apply { + soTimeout = SO_TIMEOUT + log.info("[+] Client connected: $remoteSocketAddress") + } thread { handleClient(clientSocket) } } } @@ -126,9 +172,10 @@ class MaimaiFutari(private val port: Int = 20101) { Command.CTL_START -> { val id = message.data as String val client = ActiveClient(id, socket, reader, writer) + clients[client.stubIp]?.socket?.close() clients[client.stubIp] = client handler = clients[client.stubIp] - log.info("[+] Client registered: $id") + log.info("[+] Client registered: ${socket.remoteSocketAddress} -> $id") // Send back the version handler?.send(ctlMsg(Command.CTL_START, mapOf("version" to PROTO_VERSION))) @@ -141,13 +188,13 @@ class MaimaiFutari(private val port: Int = 20101) { } } } catch (e: Exception) { - if (e.message == "Connection reset") { - log.info("[-] Client disconnected: ${handler?.clientKey}") - } else log.error("Error in client handler", e) + if (e.message != "Connection reset" && e !is SocketTimeoutException) + log.error("Error in client handler", e) } finally { // Remove client handler?.stubIp?.let { clients.remove(it) } socket.close() + log.info("[-] Client disconnected: ${handler?.clientKey}") } } }