package libnmdc import ( "crypto/tls" "net" "strconv" "strings" "sync" "time" ) type HubConnection struct { // Supplied parameters Hco *HubConnectionOptions // Current remote status HubName string State ConnectionState users map[string]UserInfo userLock sync.RWMutex // Streamed events processEvent func(HubEvent) OnEvent chan HubEvent // Private state conn net.Conn // this is an interface connValid bool sentOurHello bool autoReconnect bool } func (this *HubConnection) Users(cb func(*map[string]UserInfo) error) error { this.userLock.Lock() defer this.userLock.Unlock() return cb(&this.users) } func (this *HubConnection) SayPublic(message string) { this.SayRaw("<" + this.Hco.Self.Nick + "> " + NMDCEscape(message) + "|") } func (this *HubConnection) SayPrivate(recipient string, message string) { this.SayRaw("$To: " + recipient + " From: " + this.Hco.Self.Nick + " $<" + this.Hco.Self.Nick + "> " + NMDCEscape(message) + "|") } func (this *HubConnection) SayInfo() { this.SayRaw(this.Hco.Self.toMyINFO() + "|") } func (this *HubConnection) UserExists(nick string) bool { this.userLock.RLock() defer this.userLock.RUnlock() _, already_existed := this.users[nick] return already_existed } func (this *HubConnection) UserCount() int { this.userLock.RLock() defer this.userLock.RUnlock() return len(this.users) } func (this *HubConnection) userJoined_NameOnly(nick string) { if !this.UserExists(nick) { this.userLock.Lock() defer this.userLock.Unlock() this.users[nick] = *NewUserInfo(nick) this.processEvent(HubEvent{EventType: EVENT_USER_JOINED, Nick: nick}) } } func (this *HubConnection) userJoined_Full(uinf *UserInfo) { // n.b. also called when we get a replacement MyINFO for someone this.userLock.Lock() defer this.userLock.Unlock() _, userExisted := this.users[uinf.Nick] // don't use UserExists as it would deadlock the mutex this.users[uinf.Nick] = *uinf if !userExisted { this.processEvent(HubEvent{EventType: EVENT_USER_JOINED, Nick: uinf.Nick}) } } // Note that protocol messages are transmitted on the caller thread, not from // any internal libnmdc thread. func (this *HubConnection) SayRaw(protocolCommand string) error { if this.connValid { _, err := this.conn.Write([]byte(protocolCommand)) return err } else { return ErrNotConnected } } func (this *HubConnection) processProtocolMessage(message string) { // Zero-length protocol message // ```````````````````````````` if len(message) == 0 { return } // Public chat // ``````````` if rx_publicChat.MatchString(message) { pubchat_parts := rx_publicChat.FindStringSubmatch(message) this.processEvent(HubEvent{EventType: EVENT_PUBLIC, Nick: pubchat_parts[1], Message: NMDCUnescape(pubchat_parts[2])}) return } // System messages // ``````````````` if message[0] != '$' { this.processEvent(HubEvent{EventType: EVENT_SYSTEM_MESSAGE_FROM_HUB, Nick: this.HubName, Message: NMDCUnescape(message)}) return } // Protocol messages // ````````````````` commandParts := strings.SplitN(message, " ", 2) switch commandParts[0] { case "$Lock": this.SayRaw("$Supports NoGetINFO UserCommand UserIP2|" + "$Key " + NMDCUnlock([]byte(commandParts[1])) + "|" + "$ValidateNick " + NMDCEscape(this.Hco.Self.Nick) + "|") this.sentOurHello = false case "$Hello": if commandParts[1] == this.Hco.Self.Nick && !this.sentOurHello { this.SayRaw("$Version 1,0091|") this.SayRaw("$GetNickList|") this.SayInfo() this.sentOurHello = true } else { this.userJoined_NameOnly(commandParts[1]) } case "$HubName": this.HubName = commandParts[1] this.processEvent(HubEvent{EventType: EVENT_HUBNAME_CHANGED, Nick: commandParts[1]}) case "$ValidateDenide": // sic if len(this.Hco.NickPassword) > 0 { this.processEvent(HubEvent{EventType: EVENT_SYSTEM_MESSAGE_FROM_CONN, Message: "Incorrect password."}) } else { this.processEvent(HubEvent{EventType: EVENT_SYSTEM_MESSAGE_FROM_CONN, Message: "Nick already in use."}) } case "$HubIsFull": this.processEvent(HubEvent{EventType: EVENT_SYSTEM_MESSAGE_FROM_CONN, Message: "Hub is full."}) case "$BadPass": this.processEvent(HubEvent{EventType: EVENT_SYSTEM_MESSAGE_FROM_CONN, Message: "Incorrect password."}) case "$GetPass": this.SayRaw("$MyPass " + NMDCEscape(this.Hco.NickPassword) + "|") case "$Quit": func() { this.userLock.Lock() defer this.userLock.Unlock() delete(this.users, commandParts[1]) }() this.processEvent(HubEvent{EventType: EVENT_USER_PART, Nick: commandParts[1]}) case "$MyINFO": u := UserInfo{} err := u.fromMyINFO(commandParts[1]) if err == nil { this.userJoined_Full(&u) } else { this.processEvent(HubEvent{EventType: EVENT_DEBUG_MESSAGE, Message: err.Error()}) } case "$NickList": nicklist := strings.Split(commandParts[1], "$$") for _, nick := range nicklist { if len(nick) > 0 { this.userJoined_NameOnly(nick) } } case "$OpList": oplist := strings.Split(commandParts[1], "$$") opmap := map[string]struct{}{} // Organise/sort the list, and ensure we're not meeting an operator for // the first time for _, nick := range oplist { if len(nick) > 0 { opmap[nick] = struct{}{} this.userJoined_NameOnly(nick) // assert existence; noop otherwise } } // Mark all mentioned nicks as being operators, and all unmentioned nicks // as being /not/ an operator. (second pass minimises RW mutex use) func() { this.userLock.Lock() defer this.userLock.Unlock() for nick, userinfo := range this.users { _, isop := opmap[nick] userinfo.IsOperator = isop this.users[nick] = userinfo } }() case "$To:": valid := false if rx_incomingTo.MatchString(commandParts[1]) { txparts := rx_incomingTo.FindStringSubmatch(commandParts[1]) if txparts[1] == this.Hco.Self.Nick && txparts[2] == txparts[3] { this.processEvent(HubEvent{EventType: EVENT_PRIVATE, Nick: txparts[2], Message: NMDCUnescape(txparts[4])}) valid = true } } if !valid { this.processEvent(HubEvent{EventType: EVENT_DEBUG_MESSAGE, Message: "Malformed private message '" + commandParts[1] + "'"}) } case "$UserIP": // Final message in PtokaX connection handshake - trigger connection callback. // This might not be the case for other hubsofts, though if this.State != CONNECTIONSTATE_CONNECTED { this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_CONNECTED}) this.State = CONNECTIONSTATE_CONNECTED } case "$ForceMove": this.Hco.Address = HubAddress(commandParts[1]) this.conn.Close() // we'll reconnect onto the new address case "$UserCommand": // $UserCommand 1 1 Group chat\New group chat$<%[mynick]> !groupchat_new|| if rx_userCommand.MatchString(commandParts[1]) { usc := rx_userCommand.FindStringSubmatch(commandParts[1]) typeInt, _ := strconv.Atoi(usc[1]) contextInt, _ := strconv.Atoi(usc[2]) uscStruct := UserCommand{ Type: UserCommandType(typeInt), Context: UserCommandContext(contextInt), Message: usc[3], Command: NMDCUnescape(usc[4]), } this.processEvent(HubEvent{EventType: EVENT_USERCOMMAND, UserCommand: &uscStruct}) } else { this.processEvent(HubEvent{EventType: EVENT_DEBUG_MESSAGE, Message: "Malformed usercommand '" + commandParts[1] + "'"}) } // IGNORABLE COMMANDS case "$Supports": case "$HubTopic": case "$Search": case "$ConnectToMe": default: this.processEvent(HubEvent{EventType: EVENT_DEBUG_MESSAGE, Message: "Unhandled protocol command '" + commandParts[0] + "'"}) } } func (this *HubConnection) Disconnect() { this.autoReconnect = false if this.conn != nil { this.conn.Close() } // A CONNECTIONSTATE_DISCONNECTED message will be emitted by the worker. } func (this *HubConnection) worker() { var fullBuffer string var err error = nil var nbytes int = 0 for { // If we're not connected, attempt reconnect if this.conn == nil { fullBuffer = "" // clear if this.Hco.Address.IsSecure() { this.conn, err = tls.Dial("tcp", this.Hco.Address.GetHostOnly(), &tls.Config{ InsecureSkipVerify: this.Hco.SkipVerifyTLS, }) } else { this.conn, err = net.Dial("tcp", this.Hco.Address.GetHostOnly()) } if err != nil { this.State = CONNECTIONSTATE_DISCONNECTED this.connValid = false } else { this.State = CONNECTIONSTATE_CONNECTING this.connValid = true this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_CONNECTING}) } } // Read from socket into our local buffer (blocking) if this.connValid { readBuff := make([]byte, 1024) this.conn.SetReadDeadline(time.Now().Add(SEND_KEEPALIVE_EVERY)) nbytes, err = this.conn.Read(readBuff) if CheckIsNetTimeout(err) { // No data before read deadline err = nil // Send KA packet _, err = this.conn.Write([]byte("|")) } if nbytes > 0 { fullBuffer += string(readBuff[0:nbytes]) } } // Attempt to parse a message block for len(fullBuffer) > 0 { for len(fullBuffer) > 0 && fullBuffer[0] == '|' { fullBuffer = fullBuffer[1:] } protocolMessage := rx_protocolMessage.FindString(fullBuffer) if len(protocolMessage) > 0 { this.processProtocolMessage(protocolMessage[:len(protocolMessage)-1]) fullBuffer = fullBuffer[len(protocolMessage):] } else { break } } // Maybe we disconnected // Perform this check *last*, to ensure we've had a final shot at // clearing out any queued messages if err != nil { this.State = CONNECTIONSTATE_DISCONNECTED this.conn = nil this.connValid = false this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_DISCONNECTED, Message: err.Error()}) if this.autoReconnect { time.Sleep(AUTO_RECONNECT_AFTER) // Wait before reconnect continue } else { return // leave the worker for good } } } }