mirror of https://github.com/hykilpikonna/AquaDX
[+] TCP
parent
18d95a1ccd
commit
661af76ed6
|
@ -7,6 +7,7 @@ import java.io.InputStreamReader
|
||||||
import java.io.OutputStreamWriter
|
import java.io.OutputStreamWriter
|
||||||
import java.net.ServerSocket
|
import java.net.ServerSocket
|
||||||
import java.net.Socket
|
import java.net.Socket
|
||||||
|
import java.net.SocketTimeoutException
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
import java.util.concurrent.locks.ReentrantLock
|
||||||
import kotlin.collections.set
|
import kotlin.collections.set
|
||||||
|
@ -14,15 +15,18 @@ import kotlin.concurrent.withLock
|
||||||
|
|
||||||
|
|
||||||
const val PROTO_VERSION = 1
|
const val PROTO_VERSION = 1
|
||||||
|
const val MAX_STREAMS = 10
|
||||||
|
const val SO_TIMEOUT = 10000
|
||||||
|
|
||||||
private object Command {
|
private object Command {
|
||||||
// Control plane
|
// Control plane
|
||||||
const val CTL_START = 1
|
const val CTL_START = 1
|
||||||
const val CTL_BIND = 2
|
const val CTL_BIND = 2
|
||||||
const val CTL_HEARTBEAT = 3
|
const val CTL_HEARTBEAT = 3
|
||||||
const val CTL_TCP_ACCEPT = 4 // Accept a new multiplexed TCP stream
|
const val CTL_TCP_CONNECT = 4 // Accept a new multiplexed TCP stream
|
||||||
const val CTL_TCP_CONNECT = 5
|
const val CTL_TCP_ACCEPT = 5
|
||||||
const val CTL_TCP_CLOSE = 6
|
const val CTL_TCP_ACCEPT_ACK = 6
|
||||||
|
const val CTL_TCP_CLOSE = 7
|
||||||
|
|
||||||
// Data plane
|
// Data plane
|
||||||
const val DATA_SEND = 21
|
const val DATA_SEND = 21
|
||||||
|
@ -47,22 +51,25 @@ data class Message(
|
||||||
|
|
||||||
val data: Any? = null
|
val data: Any? = null
|
||||||
)
|
)
|
||||||
fun ctlMsg(cmd: Int, data: Any? = null) = Message(cmd, 0, data = data)
|
fun ctlMsg(cmd: Int, data: Any? = null) = Message(cmd, data = data)
|
||||||
|
|
||||||
data class ActiveClient(
|
data class ActiveClient(
|
||||||
val clientKey: String,
|
val clientKey: String,
|
||||||
val socket: Socket,
|
val socket: Socket,
|
||||||
val reader: BufferedReader,
|
val reader: BufferedReader,
|
||||||
val writer: BufferedWriter
|
val writer: BufferedWriter,
|
||||||
|
// <Stream ID, Destination client stub IP>
|
||||||
|
val tcpStreams: MutableMap<Int, UInt> = mutableMapOf(),
|
||||||
|
val pendingStreams: MutableSet<Int> = mutableSetOf(),
|
||||||
) {
|
) {
|
||||||
val log = logger()
|
val log = logger()
|
||||||
val stubIp = keychipToStubIp(clientKey)
|
val stubIp = keychipToStubIp(clientKey)
|
||||||
val mutex = ReentrantLock()
|
val writeMutex = ReentrantLock()
|
||||||
|
|
||||||
var lastHeartbeat = millis()
|
var lastHeartbeat = millis()
|
||||||
|
|
||||||
fun send(msg: Message) {
|
fun send(msg: Message) {
|
||||||
mutex.withLock {
|
writeMutex.withLock {
|
||||||
writer.write(msg.toJson())
|
writer.write(msg.toJson())
|
||||||
writer.newLine()
|
writer.newLine()
|
||||||
writer.flush()
|
writer.flush()
|
||||||
|
@ -70,21 +77,57 @@ data class ActiveClient(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun ActiveClient.handle(message: Message) {
|
fun ActiveClient.handle(msg: Message) {
|
||||||
when (message.cmd) {
|
// Find target by dst IP address or TCP stream ID
|
||||||
Command.CTL_HEARTBEAT -> lastHeartbeat = millis()
|
val target = (msg.dst ?: msg.sid?.let { tcpStreams[it] } )?.let { clients[it] }
|
||||||
Command.DATA_SEND -> {
|
|
||||||
// Find target by dst IP address
|
|
||||||
val target = message.dst?.let { clients[it] } ?: return log.warn("Target not found: ${message.dst}")
|
|
||||||
|
|
||||||
// Send to target TODO: SID
|
when (msg.cmd) {
|
||||||
target.send(message.copy(sid = 0))
|
Command.CTL_HEARTBEAT -> lastHeartbeat = millis()
|
||||||
}
|
|
||||||
Command.DATA_BROADCAST -> {
|
Command.DATA_BROADCAST -> {
|
||||||
// Broadcast to all clients. This is only used in UDP so SID is always 0
|
// Broadcast to all clients. This is only used in UDP so SID is always 0
|
||||||
assert(message.proto == Proto.UDP)
|
if (msg.proto != Proto.UDP) return log.warn("TCP Broadcast received, something is wrong.")
|
||||||
clients.values.filter { it.clientKey != clientKey }
|
clients.values.filter { it.clientKey != clientKey }.forEach { it.send(msg.copy(src = stubIp)) }
|
||||||
.forEach { it.send(message.copy(sid = 0, src = stubIp)) }
|
}
|
||||||
|
Command.DATA_SEND, Command.CTL_TCP_ACCEPT_ACK -> {
|
||||||
|
target ?: return log.warn("Send: Target not found: ${msg.dst}")
|
||||||
|
|
||||||
|
if (msg.proto == Proto.TCP && msg.sid !in tcpStreams)
|
||||||
|
return log.warn("Stream ID not found: ${msg.sid}")
|
||||||
|
|
||||||
|
target.send(msg.copy(src = stubIp, dst = target.stubIp))
|
||||||
|
|
||||||
|
// 1020844165
|
||||||
|
// 2245580860
|
||||||
|
}
|
||||||
|
Command.CTL_TCP_CONNECT -> {
|
||||||
|
target ?: return log.warn("Connect: Target not found: ${msg.dst}")
|
||||||
|
msg.sid ?: return log.warn("Connect: Stream ID not found")
|
||||||
|
|
||||||
|
if (msg.sid in tcpStreams || msg.sid in pendingStreams)
|
||||||
|
return log.warn("Stream ID already in use: ${msg.sid}")
|
||||||
|
|
||||||
|
// Add the stream to the pending list
|
||||||
|
pendingStreams.add(msg.sid)
|
||||||
|
if (pendingStreams.size > MAX_STREAMS) {
|
||||||
|
log.warn("Too many pending streams, closing connection")
|
||||||
|
return socket.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
target.send(msg.copy(src = stubIp, dst = target.stubIp))
|
||||||
|
}
|
||||||
|
Command.CTL_TCP_ACCEPT -> {
|
||||||
|
target ?: return log.warn("Accept: Target not found: ${msg.dst}")
|
||||||
|
msg.sid ?: return log.warn("Accept: Stream ID not found")
|
||||||
|
|
||||||
|
if (msg.sid !in target.pendingStreams)
|
||||||
|
return log.warn("Stream ID not found in pending list: ${msg.sid}")
|
||||||
|
|
||||||
|
// Add the stream to the active list
|
||||||
|
target.pendingStreams.remove(msg.sid)
|
||||||
|
target.tcpStreams[msg.sid] = stubIp
|
||||||
|
tcpStreams[msg.sid] = target.stubIp
|
||||||
|
|
||||||
|
target.send(msg.copy(src = stubIp, dst = target.stubIp))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -105,7 +148,10 @@ class MaimaiFutari(private val port: Int = 20101) {
|
||||||
log.info("Server started on port $port")
|
log.info("Server started on port $port")
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
val clientSocket = serverSocket.accept()
|
val clientSocket = serverSocket.accept().apply {
|
||||||
|
soTimeout = SO_TIMEOUT
|
||||||
|
log.info("[+] Client connected: $remoteSocketAddress")
|
||||||
|
}
|
||||||
thread { handleClient(clientSocket) }
|
thread { handleClient(clientSocket) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -126,9 +172,10 @@ class MaimaiFutari(private val port: Int = 20101) {
|
||||||
Command.CTL_START -> {
|
Command.CTL_START -> {
|
||||||
val id = message.data as String
|
val id = message.data as String
|
||||||
val client = ActiveClient(id, socket, reader, writer)
|
val client = ActiveClient(id, socket, reader, writer)
|
||||||
|
clients[client.stubIp]?.socket?.close()
|
||||||
clients[client.stubIp] = client
|
clients[client.stubIp] = client
|
||||||
handler = clients[client.stubIp]
|
handler = clients[client.stubIp]
|
||||||
log.info("[+] Client registered: $id")
|
log.info("[+] Client registered: ${socket.remoteSocketAddress} -> $id")
|
||||||
|
|
||||||
// Send back the version
|
// Send back the version
|
||||||
handler?.send(ctlMsg(Command.CTL_START, mapOf("version" to PROTO_VERSION)))
|
handler?.send(ctlMsg(Command.CTL_START, mapOf("version" to PROTO_VERSION)))
|
||||||
|
@ -141,13 +188,13 @@ class MaimaiFutari(private val port: Int = 20101) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
if (e.message == "Connection reset") {
|
if (e.message != "Connection reset" && e !is SocketTimeoutException)
|
||||||
log.info("[-] Client disconnected: ${handler?.clientKey}")
|
log.error("Error in client handler", e)
|
||||||
} else log.error("Error in client handler", e)
|
|
||||||
} finally {
|
} finally {
|
||||||
// Remove client
|
// Remove client
|
||||||
handler?.stubIp?.let { clients.remove(it) }
|
handler?.stubIp?.let { clients.remove(it) }
|
||||||
socket.close()
|
socket.close()
|
||||||
|
log.info("[-] Client disconnected: ${handler?.clientKey}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue