libnmdc/HubConnection.go

324 lines
8.4 KiB
Go
Raw Normal View History

2016-05-04 07:03:36 +00:00
package libnmdc
import (
"crypto/tls"
"net"
"strings"
"sync"
2016-05-04 07:03:36 +00:00
"time"
)
type HubConnection struct {
// Supplied parameters
Hco *HubConnectionOptions
// Current remote status
HubName string
State ConnectionState
users map[string]UserInfo
userLock sync.RWMutex
2016-05-04 07:03:36 +00:00
// Streamed events
processEvent func(HubEvent)
OnEvent chan HubEvent
// Private state
conn net.Conn // this is an interface
connValid bool
sentOurHello bool
autoReconnect bool
2016-05-04 07:03:36 +00:00
}
func (this *HubConnection) Users(cb func(*map[string]UserInfo) error) error {
this.userLock.Lock()
defer this.userLock.Unlock()
return cb(&this.users)
}
2016-05-04 07:03:36 +00:00
func (this *HubConnection) SayPublic(message string) {
this.SayRaw("<" + this.Hco.Self.Nick + "> " + NMDCEscape(message) + "|")
}
func (this *HubConnection) SayPrivate(recipient string, message string) {
this.SayRaw("$To: " + recipient + " From: " + this.Hco.Self.Nick + " $<" + this.Hco.Self.Nick + "> " + NMDCEscape(message) + "|")
}
func (this *HubConnection) SayInfo() {
this.SayRaw(this.Hco.Self.toMyINFO() + "|")
}
func (this *HubConnection) UserExists(nick string) bool {
this.userLock.RLock()
defer this.userLock.RUnlock()
_, already_existed := this.users[nick]
return already_existed
}
func (this *HubConnection) UserCount() int {
this.userLock.RLock()
defer this.userLock.RUnlock()
return len(this.users)
}
2016-05-04 07:03:36 +00:00
func (this *HubConnection) userJoined_NameOnly(nick string) {
if !this.UserExists(nick) {
this.userLock.Lock()
defer this.userLock.Unlock()
this.users[nick] = *NewUserInfo(nick)
2016-05-04 07:03:36 +00:00
this.processEvent(HubEvent{EventType: EVENT_USER_JOINED, Nick: nick})
}
}
func (this *HubConnection) userJoined_Full(uinf *UserInfo) {
if !this.UserExists(uinf.Nick) {
this.userLock.Lock()
defer this.userLock.Unlock()
this.users[uinf.Nick] = *uinf
2016-05-04 07:03:36 +00:00
this.processEvent(HubEvent{EventType: EVENT_USER_JOINED, Nick: uinf.Nick})
}
}
// Note that protocol messages are transmitted on the caller thread, not from
// any internal libnmdc thread.
func (this *HubConnection) SayRaw(protocolCommand string) error {
if this.connValid {
_, err := this.conn.Write([]byte(protocolCommand))
return err
} else {
return ErrNotConnected
}
}
func (this *HubConnection) processProtocolMessage(message string) {
// Zero-length protocol message
// ````````````````````````````
if len(message) == 0 {
return
}
// Public chat
// ```````````
if rx_publicChat.MatchString(message) {
pubchat_parts := rx_publicChat.FindStringSubmatch(message)
this.processEvent(HubEvent{EventType: EVENT_PUBLIC, Nick: pubchat_parts[1], Message: NMDCUnescape(pubchat_parts[2])})
return
}
// System messages
// ```````````````
if message[0] != '$' {
this.processEvent(HubEvent{EventType: EVENT_SYSTEM_MESSAGE_FROM_HUB, Nick: this.HubName, Message: NMDCUnescape(message)})
return
}
// Protocol messages
// `````````````````
commandParts := strings.SplitN(message, " ", 2)
switch commandParts[0] {
case "$Lock":
this.SayRaw("$Supports NoGetINFO UserCommand UserIP2|" +
"$Key " + NMDCUnlock([]byte(commandParts[1])) + "|" +
"$ValidateNick " + NMDCEscape(this.Hco.Self.Nick) + "|")
this.sentOurHello = false
case "$Hello":
if commandParts[1] == this.Hco.Self.Nick && !this.sentOurHello {
this.SayRaw("$Version 1,0091|")
this.SayRaw("$GetNickList|")
this.SayInfo()
this.sentOurHello = true
} else {
this.userJoined_NameOnly(commandParts[1])
}
case "$HubName":
this.HubName = commandParts[1]
this.processEvent(HubEvent{EventType: EVENT_HUBNAME_CHANGED, Nick: commandParts[1]})
case "$ValidateDenide": // sic
if len(this.Hco.NickPassword) > 0 {
this.processEvent(HubEvent{EventType: EVENT_SYSTEM_MESSAGE_FROM_CONN, Message: "Incorrect password."})
} else {
this.processEvent(HubEvent{EventType: EVENT_SYSTEM_MESSAGE_FROM_CONN, Message: "Nick already in use."})
}
case "$HubIsFull":
this.processEvent(HubEvent{EventType: EVENT_SYSTEM_MESSAGE_FROM_CONN, Message: "Hub is full."})
case "$BadPass":
this.processEvent(HubEvent{EventType: EVENT_SYSTEM_MESSAGE_FROM_CONN, Message: "Incorrect password."})
case "$GetPass":
this.SayRaw("$MyPass " + NMDCEscape(this.Hco.NickPassword) + "|")
case "$Quit":
func() {
this.userLock.Lock()
defer this.userLock.Unlock()
delete(this.users, commandParts[1])
}()
2016-05-04 07:03:36 +00:00
this.processEvent(HubEvent{EventType: EVENT_USER_PART, Nick: commandParts[1]})
case "$MyINFO":
u := UserInfo{}
err := u.fromMyINFO(commandParts[1])
if err == nil {
this.userJoined_Full(&u)
} else {
this.processEvent(HubEvent{EventType: EVENT_DEBUG_MESSAGE, Message: err.Error()})
}
case "$NickList":
nicklist := strings.Split(commandParts[1], "$$")
for _, nick := range nicklist {
if len(nick) > 0 {
this.userJoined_NameOnly(nick)
}
}
case "$To:":
valid := false
if rx_incomingTo.MatchString(commandParts[1]) {
txparts := rx_incomingTo.FindStringSubmatch(commandParts[1])
if txparts[1] == this.Hco.Self.Nick && txparts[2] == txparts[3] {
this.processEvent(HubEvent{EventType: EVENT_PRIVATE, Nick: txparts[2], Message: txparts[4]})
valid = true
}
}
if !valid {
this.processEvent(HubEvent{EventType: EVENT_DEBUG_MESSAGE, Message: "Malformed private message '" + commandParts[1] + "'"})
}
case "$UserIP":
// Final message in PtokaX connection handshake - trigger connection callback.
// This might not be the case for other hubsofts, though
if this.State != CONNECTIONSTATE_CONNECTED {
this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_CONNECTED})
this.State = CONNECTIONSTATE_CONNECTED
}
case "$ForceMove":
this.Hco.Address = HubAddress(commandParts[1])
this.conn.Close() // we'll reconnect onto the new address
// IGNORABLE COMMANDS
case "$Supports":
case "$UserCommand": // TODO $UserCommand 1 1 Group chat\New group chat$<%[mynick]> !groupchat_new&#124;|
case "$UserList":
case "$OpList":
case "$HubTopic":
case "$Search":
case "$ConnectToMe":
default:
this.processEvent(HubEvent{EventType: EVENT_DEBUG_MESSAGE, Message: "Unhandled protocol command '" + commandParts[0] + "'"})
}
}
func (this *HubConnection) Disconnect() {
this.autoReconnect = false
if this.conn != nil {
this.conn.Close()
}
// A CONNECTIONSTATE_DISCONNECTED message will be emitted by the worker.
}
2016-05-04 07:03:36 +00:00
func (this *HubConnection) worker() {
var fullBuffer string
var err error = nil
var nbytes int = 0
for {
// If we're not connected, attempt reconnect
if this.conn == nil {
fullBuffer = "" // clear
if this.Hco.Address.IsSecure() {
this.conn, err = tls.Dial("tcp", this.Hco.Address.GetHostOnly(), &tls.Config{
InsecureSkipVerify: this.Hco.SkipVerifyTLS,
})
} else {
this.conn, err = net.Dial("tcp", this.Hco.Address.GetHostOnly())
}
if err != nil {
this.State = CONNECTIONSTATE_DISCONNECTED
this.connValid = false
} else {
this.State = CONNECTIONSTATE_CONNECTING
this.connValid = true
this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_CONNECTING})
}
}
// Read from socket into our local buffer (blocking)
if this.connValid {
readBuff := make([]byte, 1024)
this.conn.SetReadDeadline(time.Now().Add(SEND_KEEPALIVE_EVERY))
nbytes, err = this.conn.Read(readBuff)
if CheckIsNetTimeout(err) {
// No data before read deadline
err = nil
// Send KA packet
_, err = this.conn.Write([]byte("|"))
}
if nbytes > 0 {
fullBuffer += string(readBuff[0:nbytes])
}
}
// Attempt to parse a message block
for len(fullBuffer) > 0 {
for len(fullBuffer) > 0 && fullBuffer[0] == '|' {
fullBuffer = fullBuffer[1:]
}
protocolMessage := rx_protocolMessage.FindString(fullBuffer)
if len(protocolMessage) > 0 {
this.processProtocolMessage(protocolMessage[:len(protocolMessage)-1])
fullBuffer = fullBuffer[len(protocolMessage):]
} else {
break
}
}
// Maybe we disconnected
// Perform this check *last*, to ensure we've had a final shot at
// clearing out any queued messages
if err != nil {
this.State = CONNECTIONSTATE_DISCONNECTED
this.conn = nil
this.connValid = false
this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_DISCONNECTED, Message: err.Error()})
if this.autoReconnect {
time.Sleep(AUTO_RECONNECT_AFTER) // Wait before reconnect
continue
} else {
return // leave the worker for good
}
2016-05-04 07:03:36 +00:00
}
}
}