213 lines
5.3 KiB
Go
213 lines
5.3 KiB
Go
package libnmdc
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"fmt"
|
|
"net"
|
|
"regexp"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type HubConnection struct {
|
|
// Supplied parameters
|
|
Hco *HubConnectionOptions
|
|
|
|
// Current remote status
|
|
HubName string
|
|
State ConnectionState
|
|
users map[string]UserInfo
|
|
userLock sync.RWMutex
|
|
|
|
proto Protocol
|
|
|
|
// Streamed events
|
|
processEvent func(HubEvent)
|
|
OnEvent chan HubEvent
|
|
|
|
// Private state
|
|
conn net.Conn // this is an interface
|
|
connValid bool
|
|
autoReconnect bool
|
|
lastDataRecieved time.Time
|
|
}
|
|
|
|
// Thread-safe user accessor.
|
|
func (this *HubConnection) Users(cb func(*map[string]UserInfo) error) error {
|
|
this.userLock.Lock()
|
|
defer this.userLock.Unlock()
|
|
|
|
return cb(&this.users)
|
|
}
|
|
|
|
func (this *HubConnection) SayPublic(message string) {
|
|
this.proto.SayPublic(message)
|
|
}
|
|
|
|
func (this *HubConnection) SayPrivate(recipient string, message string) {
|
|
this.proto.SayPrivate(recipient, message)
|
|
}
|
|
|
|
func (this *HubConnection) UserExists(nick string) bool {
|
|
this.userLock.RLock()
|
|
defer this.userLock.RUnlock()
|
|
|
|
_, already_existed := this.users[nick]
|
|
return already_existed
|
|
}
|
|
|
|
func (this *HubConnection) UserCount() int {
|
|
this.userLock.RLock()
|
|
defer this.userLock.RUnlock()
|
|
|
|
return len(this.users)
|
|
}
|
|
|
|
func (this *HubConnection) userJoined_NameOnly(nick string) {
|
|
if !this.UserExists(nick) {
|
|
|
|
this.userLock.Lock()
|
|
this.users[nick] = *NewUserInfo(nick)
|
|
this.userLock.Unlock() // Don't lock over a processEvent boundary
|
|
|
|
this.processEvent(HubEvent{EventType: EVENT_USER_JOINED, Nick: nick})
|
|
}
|
|
}
|
|
|
|
func (this *HubConnection) userJoined_Full(uinf *UserInfo) {
|
|
// n.b. also called when we get a replacement MyINFO for someone
|
|
this.userLock.Lock()
|
|
_, userExisted := this.users[uinf.Nick] // don't use UserExists as it would deadlock the mutex
|
|
this.users[uinf.Nick] = *uinf
|
|
this.userLock.Unlock() // Don't lock over a processEvent boundary
|
|
|
|
if !userExisted {
|
|
this.processEvent(HubEvent{EventType: EVENT_USER_JOINED, Nick: uinf.Nick})
|
|
} else {
|
|
this.processEvent(HubEvent{EventType: EVENT_USER_UPDATED_INFO, Nick: uinf.Nick})
|
|
}
|
|
}
|
|
|
|
// SayRaw sends raw bytes over the TCP socket. Callers should add the protocol
|
|
// terminating character `|` themselves.
|
|
// Note that protocol messages are transmitted on the caller thread, not from
|
|
// any internal libnmdc thread.
|
|
func (this *HubConnection) SayRaw(protocolCommand string) error {
|
|
if !this.connValid {
|
|
return ErrNotConnected
|
|
}
|
|
|
|
_, err := this.conn.Write([]byte(protocolCommand))
|
|
return err
|
|
}
|
|
|
|
func (this *HubConnection) SayKeepalive() error {
|
|
if !this.connValid {
|
|
return ErrNotConnected
|
|
}
|
|
|
|
return this.SayRaw(this.proto.ProtoMessageSeparator())
|
|
}
|
|
|
|
func (this *HubConnection) Disconnect() {
|
|
this.autoReconnect = false
|
|
if this.conn != nil {
|
|
this.conn.Close()
|
|
}
|
|
// A CONNECTIONSTATE_DISCONNECTED message will be emitted by the worker.
|
|
}
|
|
|
|
func (this *HubConnection) worker() {
|
|
var fullBuffer string
|
|
var err error = nil
|
|
var nbytes int = 0
|
|
|
|
for {
|
|
|
|
// If we're not connected, attempt reconnect
|
|
if this.conn == nil {
|
|
|
|
fullBuffer = "" // clear
|
|
|
|
if this.Hco.Address.IsSecure() {
|
|
this.conn, err = tls.Dial("tcp", this.Hco.Address.GetHostOnly(), &tls.Config{
|
|
InsecureSkipVerify: this.Hco.SkipVerifyTLS,
|
|
})
|
|
} else {
|
|
this.conn, err = net.Dial("tcp", this.Hco.Address.GetHostOnly())
|
|
}
|
|
|
|
if err != nil {
|
|
this.State = CONNECTIONSTATE_DISCONNECTED
|
|
this.connValid = false
|
|
|
|
} else {
|
|
this.State = CONNECTIONSTATE_CONNECTING
|
|
this.connValid = true
|
|
this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_CONNECTING})
|
|
|
|
}
|
|
}
|
|
|
|
// Read from socket into our local buffer (blocking)
|
|
if this.connValid {
|
|
readBuff := make([]byte, 1024)
|
|
this.conn.SetReadDeadline(time.Now().Add(SEND_KEEPALIVE_EVERY))
|
|
nbytes, err = this.conn.Read(readBuff)
|
|
|
|
if checkIsNetTimeout(err) {
|
|
// No data before read deadline
|
|
err = nil
|
|
|
|
// Send KA packet
|
|
err = this.SayKeepalive()
|
|
}
|
|
|
|
if nbytes > 0 {
|
|
this.lastDataRecieved = time.Now()
|
|
fullBuffer += string(readBuff[0:nbytes])
|
|
}
|
|
}
|
|
|
|
rxSeparator := regexp.QuoteMeta(this.proto.ProtoMessageSeparator())
|
|
rxProtocolMessage := regexp.MustCompile(`(?ms)\A[^` + rxSeparator + `]*` + rxSeparator)
|
|
|
|
// Attempt to parse a message block
|
|
for len(fullBuffer) > 0 {
|
|
for len(fullBuffer) > 0 && fullBuffer[0] == '|' {
|
|
fullBuffer = fullBuffer[1:]
|
|
}
|
|
protocolMessage := rxProtocolMessage.FindString(fullBuffer)
|
|
if len(protocolMessage) > 0 {
|
|
this.proto.ProcessCommand(protocolMessage[:len(protocolMessage)-1])
|
|
fullBuffer = fullBuffer[len(protocolMessage):]
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
if err == nil && time.Now().Sub(this.lastDataRecieved) > RECONNECT_IF_NO_DATA_RECIEVED_IN {
|
|
err = fmt.Errorf("No packets recieved since %s, connection presumed lost", this.lastDataRecieved.Format(time.RFC3339))
|
|
}
|
|
|
|
// Maybe we disconnected
|
|
// Perform this check *last*, to ensure we've had a final shot at
|
|
// clearing out any queued messages
|
|
if err != nil {
|
|
this.State = CONNECTIONSTATE_DISCONNECTED
|
|
this.conn = nil
|
|
this.connValid = false
|
|
this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_DISCONNECTED, Message: err.Error()})
|
|
|
|
if this.autoReconnect {
|
|
time.Sleep(AUTO_RECONNECT_AFTER) // Wait before reconnect
|
|
continue
|
|
} else {
|
|
return // leave the worker for good
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
}
|