use builder to create tcp session

Signed-off-by: conblem <mail@conblem.me>
pull/8047/head
conblem 2023-05-10 15:26:35 +02:00
parent 032c7d905c
commit 330c3852a1
No known key found for this signature in database
GPG Key ID: 8D6908C57DBFCE2C
2 changed files with 141 additions and 54 deletions

View File

@ -34,7 +34,14 @@ declare global {
statusText: string
text: () => Promise<string>
}>
tcp(hostname: string, port: number, readCallback: (data: Uint8Array) => void, connectTimeoutSeconds?: number): Promise<IPNTCPSession>;
tcp(config: {
hostname: string
port: number
readCallback: (data: Uint8Array) => void,
connectTimeoutSeconds?: number
writeBufferSizeInBytes?: number
readBufferSizeInBytes?: number
}): Promise<IPNTCPSession>;
}
interface IPNSSHSession {

View File

@ -157,7 +157,7 @@ func newIPN(jsConfig js.Value) map[string]any {
return map[string]any{
"run": js.FuncOf(func(this js.Value, args []js.Value) any {
if len(args) != 1 {
if len(args) != 1 || args[0].Type() != js.TypeObject {
log.Fatal(`Usage: run({
notifyState(state: int): void,
notifyNetMap(netMap: object): void,
@ -205,27 +205,23 @@ func newIPN(jsConfig js.Value) map[string]any {
return jsIPN.fetch(url)
}),
"tcp": js.FuncOf(func(this js.Value, args []js.Value) any {
if len(args) != 3 && len(args) != 4 {
log.Printf("Usage: tcp(hostname, port, readCallback, connectTimeoutSeconds)")
if len(args) != 1 || args[0].Type() != js.TypeObject {
log.Fatal(`Usage: tcp({
hostname: string
port: number
readCallback: (data: Uint8Array) => void,
connectTimeoutSeconds?: number
writeBufferSizeInBytes?: number
readBufferSizeInBytes?: number
})`)
return nil
}
connectTimeoutSeconds := defaultTCPConnectTimeout
if len(args) == 4 && args[3].Type() == js.TypeNumber {
connectTimeoutSeconds = args[3].Float()
}
return jsIPN.tcp(
args[0].String(),
args[1].Int(),
args[2],
connectTimeoutSeconds)
return jsIPN.tcp(args[0])
}),
}
}
const defaultTCPConnectTimeout float64 = 5
type jsIPN struct {
dialer *tsdial.Dialer
srv *ipnserver.Server
@ -353,17 +349,9 @@ func (i *jsIPN) logout() {
go i.lb.Logout()
}
func (i *jsIPN) tcp(host string, port int, readCallback js.Value, connectTimeoutSeconds float64) any {
portInt := strconv.Itoa(port)
addr := net.JoinHostPort(host, portInt)
func (i *jsIPN) tcp(config js.Value) any {
return makePromise(func() (any, error) {
if readCallback.Type() != js.TypeFunction {
return nil, errors.New("Invalid readCallback received")
}
connectTimeout := time.Duration(connectTimeoutSeconds * float64(time.Second))
jsTCPSession, err := i.NewJSTCPSession(addr, readCallback, connectTimeout)
jsTCPSession, err := i.newJSTCPSession(config)
if err != nil {
return nil, err
}
@ -371,7 +359,105 @@ func (i *jsIPN) tcp(host string, port int, readCallback js.Value, connectTimeout
jsObjectWrapper := jsTCPSession.jsWrapper()
return jsObjectWrapper, nil
})
}
const defaultTCPConnectTimeoutSeconds = 5 * time.Second
const defaultTCPWriteBufferSizeBytes = 1 << 20
const defaultTCPReadBufferSizeBytes = 1 << 20
type jsTCPSessionBuilder struct {
jsIPN *jsIPN
addr string
readCallback func(js.Value)
writeBufferSize int
readBufferSize int
connectTimeout time.Duration
}
func (i *jsIPN) newJSTCPSession(config js.Value) (*jsTCPSession, error) {
builder := jsTCPSessionBuilder{
jsIPN: i,
writeBufferSize: defaultTCPWriteBufferSizeBytes,
readBufferSize: defaultTCPReadBufferSizeBytes,
connectTimeout: defaultTCPConnectTimeoutSeconds,
}
if err := builder.setAddr(config); err != nil {
return nil, err
}
if err := builder.setReadCallback(config); err != nil {
return nil, err
}
builder.setBufferSizes(config)
builder.setConnectTimeout(config)
return builder.connect()
}
func (b *jsTCPSessionBuilder) setAddr(config js.Value) error {
jsHostname := config.Get("hostname")
if jsHostname.Type() != js.TypeString {
return errors.New("Hostname must be a string")
}
jsPort := config.Get("port")
if jsPort.Type() != js.TypeNumber {
return errors.New("Port must be a number")
}
port := strconv.Itoa(jsPort.Int())
b.addr = net.JoinHostPort(jsHostname.String(), port)
return nil
}
func (b *jsTCPSessionBuilder) setReadCallback(config js.Value) error {
jsReadCallback := config.Get("readCallback");
if jsReadCallback.Type() != js.TypeFunction {
return errors.New("Invalid readCallback received")
}
b.readCallback = func(subarray js.Value) {
jsReadCallback.Invoke(subarray)
}
return nil
}
func (b *jsTCPSessionBuilder) setBufferSizes(config js.Value) {
if jsWriteBufferSize := config.Get("writeBufferSizeInBytes"); jsWriteBufferSize.Type() == js.TypeNumber {
b.writeBufferSize = jsWriteBufferSize.Int()
}
if jsReadBufferSize := config.Get("ReadBufferSizeInBytes"); jsReadBufferSize.Type() == js.TypeNumber {
b.readBufferSize = jsReadBufferSize.Int()
}
}
func (b *jsTCPSessionBuilder) setConnectTimeout(config js.Value) {
if jsConnectTimeout := config.Get("connectTimeout"); jsConnectTimeout.Type() == js.TypeNumber {
b.connectTimeout = time.Duration(jsConnectTimeout.Float() * float64(time.Second))
}
}
func (b *jsTCPSessionBuilder) connect() (*jsTCPSession, error) {
ctx, cancel := context.WithTimeout(context.Background(), b.connectTimeout)
defer cancel()
conn, err := b.jsIPN.dialer.UserDial(ctx, "tcp", b.addr)
if err != nil {
return nil, err
}
s := &jsTCPSession{
jsIPN: b.jsIPN,
conn: conn,
writeBuffer: make([]byte, b.writeBufferSize),
readBuffer: make([]byte, b.readBufferSize),
readCallback: b.readCallback,
}
go s.readLoop()
return s, nil
}
type jsTCPSession struct {
@ -379,30 +465,10 @@ type jsTCPSession struct {
conn net.Conn
writeBuffer []byte
readBuffer []byte
readCallback func(js.Value)
}
func (i *jsIPN) NewJSTCPSession(addr string, readCallback js.Value, connectTimeout time.Duration) (*jsTCPSession, error) {
ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
defer cancel()
conn, err := i.dialer.UserDial(ctx, "tcp", addr)
if err != nil {
return nil, err
}
s := &jsTCPSession{
jsIPN: i,
conn: conn,
writeBuffer: make([]byte, 1024),
readBuffer: make([]byte, 4*1024*1024),
}
go s.readLoop(readCallback)
return s, nil
}
func (s *jsTCPSession) readLoop(readCallback js.Value) {
func (s *jsTCPSession) readLoop() {
defer s.conn.Close()
dst := js.Global().Get("Uint8Array").New(len(s.readBuffer))
@ -427,23 +493,37 @@ func (s *jsTCPSession) readLoop(readCallback js.Value) {
js.CopyBytesToJS(subarray, s.readBuffer[:n])
readCallback.Invoke(subarray)
s.readCallback(subarray)
}
}
func (s *jsTCPSession) jsWrapper() any {
return map[string]any{
/**
* Closes the TCP connetion
*
* @returns Promise<void>
*/
"close": js.FuncOf(func(this js.Value, args []js.Value) any {
return makePromise(func() (any, error) {
err := s.conn.Close()
return nil, err
})
}),
/**
* Write to the TCP connection
*
* @param src ArrayBuffer
* @returns Promise<number> of bytes written
*/
"write": js.FuncOf(func(this js.Value, args []js.Value) any {
return makePromise(func() (any, error) {
dst := args[0]
if byteLength := dst.Get("byteLength"); byteLength.Type() == js.TypeNumber {
return s.write(dst, byteLength.Int())
if len(args) != 1 {
return nil, errors.New("Did not receive src argument")
}
src := args[0]
if byteLength := src.Get("byteLength"); byteLength.Type() == js.TypeNumber {
return s.write(src, byteLength.Int())
}
return nil, errors.New("Invalid type has no byteLength")
})
@ -451,11 +531,11 @@ func (s *jsTCPSession) jsWrapper() any {
}
}
func (s *jsTCPSession) write(dst js.Value, length int) (int, error) {
func (s *jsTCPSession) write(src js.Value, length int) (int, error) {
if len(s.writeBuffer) < length {
s.writeBuffer = make([]byte, length)
}
js.CopyBytesToGo(s.writeBuffer, dst)
js.CopyBytesToGo(s.writeBuffer, src)
return s.conn.Write(s.writeBuffer[:length])
}