package libnmdc import ( "crypto/tls" "fmt" "net" "regexp" "sync" "time" ) type HubConnection struct { // Supplied parameters Hco *HubConnectionOptions // Current remote status HubName string State ConnectionState usersMut sync.RWMutex users map[string]UserInfo userSIDs map[string]string proto Protocol // Event callback processEvent func(HubEvent) // Private state conn net.Conn // this is an interface connValid bool autoReconnect bool lastDataRecieved time.Time } // Thread-safe user accessor. func (this *HubConnection) Users(cb func(*map[string]UserInfo) error) error { this.usersMut.Lock() defer this.usersMut.Unlock() return cb(&this.users) } func (this *HubConnection) SayPublic(message string) { this.proto.SayPublic(message) } func (this *HubConnection) SayPrivate(recipient string, message string) { this.proto.SayPrivate(recipient, message) } func (this *HubConnection) UserExists(nick string) bool { this.usersMut.RLock() defer this.usersMut.RUnlock() _, already_existed := this.users[nick] return already_existed } func (this *HubConnection) UserCount() int { this.usersMut.RLock() defer this.usersMut.RUnlock() return len(this.users) } func (this *HubConnection) userJoined_NameOnly(nick string) { if !this.UserExists(nick) { this.usersMut.Lock() this.users[nick] = *NewUserInfo(nick) this.usersMut.Unlock() // Don't lock over a processEvent boundary this.processEvent(HubEvent{EventType: EVENT_USER_JOINED, Nick: nick}) } } func (this *HubConnection) userJoined_Full(uinf *UserInfo) { // n.b. also called when we get a replacement MyINFO for someone this.usersMut.Lock() _, userExisted := this.users[uinf.Nick] // don't use UserExists as it would deadlock the mutex this.users[uinf.Nick] = *uinf this.usersMut.Unlock() // Don't lock over a processEvent boundary if !userExisted { this.processEvent(HubEvent{EventType: EVENT_USER_JOINED, Nick: uinf.Nick}) } else { this.processEvent(HubEvent{EventType: EVENT_USER_UPDATED_INFO, Nick: uinf.Nick}) } } // SayRaw sends raw bytes over the TCP socket. Callers should add the protocol // terminating character themselves (e.g. `|` for NMDC). // Note that protocol messages are transmitted on the caller thread, not from // any internal libnmdc thread. func (this *HubConnection) SayRaw(protocolCommand string) error { if !this.connValid { return ErrNotConnected } _, err := this.conn.Write([]byte(protocolCommand)) return err } func (this *HubConnection) SayKeepalive() error { if !this.connValid { return ErrNotConnected } return this.SayRaw(this.proto.ProtoMessageSeparator()) } func (this *HubConnection) Disconnect() { this.autoReconnect = false if this.conn != nil { this.conn.Close() } // A CONNECTIONSTATE_DISCONNECTED message will be emitted by the worker. } func (this *HubConnection) worker() { var fullBuffer string var err error = nil var nbytes int = 0 for { // If we're not connected, attempt reconnect if this.conn == nil { fullBuffer = "" // clear if this.Hco.Address.IsSecure() { this.conn, err = tls.Dial("tcp", this.Hco.Address.GetHostOnly(), &tls.Config{ InsecureSkipVerify: this.Hco.SkipVerifyTLS, }) } else { this.conn, err = net.Dial("tcp", this.Hco.Address.GetHostOnly()) } if err != nil { this.State = CONNECTIONSTATE_DISCONNECTED this.connValid = false this.proto = nil } else { this.State = CONNECTIONSTATE_CONNECTING this.connValid = true this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_CONNECTING}) this.proto = this.Hco.Address.GetProtocol().Create(this) } } // Read from socket into our local buffer (blocking) if this.connValid { readBuff := make([]byte, 1024) this.conn.SetReadDeadline(time.Now().Add(SEND_KEEPALIVE_EVERY)) nbytes, err = this.conn.Read(readBuff) if checkIsNetTimeout(err) { // No data before read deadline err = nil if this.proto == nil { // Autodetect: switch to ADC this.proto = NewAdcProtocol(this) } else { // Normal // Send KA packet err = this.SayKeepalive() } } if nbytes > 0 { this.lastDataRecieved = time.Now() fullBuffer += string(readBuff[0:nbytes]) } } if this.proto != nil { rxSeparator := regexp.QuoteMeta(this.proto.ProtoMessageSeparator()) rxProtocolMessage := regexp.MustCompile(`(?ms)\A[^` + rxSeparator + `]*` + rxSeparator) // Attempt to parse a message block for len(fullBuffer) > 0 { // FIXME nmdc for len(fullBuffer) > 0 && fullBuffer[0] == '|' { fullBuffer = fullBuffer[1:] } protocolMessage := rxProtocolMessage.FindString(fullBuffer) if len(protocolMessage) > 0 { this.proto.ProcessCommand(protocolMessage[:len(protocolMessage)-1]) fullBuffer = fullBuffer[len(protocolMessage):] } else { break } } if err == nil && time.Now().Sub(this.lastDataRecieved) > RECONNECT_IF_NO_DATA_RECIEVED_IN { err = fmt.Errorf("No packets recieved since %s, connection presumed lost", this.lastDataRecieved.Format(time.RFC3339)) } } // Maybe we disconnected // Perform this check *last*, to ensure we've had a final shot at // clearing out any queued messages if err != nil { this.State = CONNECTIONSTATE_DISCONNECTED this.conn = nil this.connValid = false this.proto = nil this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_DISCONNECTED, Message: err.Error()}) if this.autoReconnect { time.Sleep(AUTO_RECONNECT_AFTER) // Wait before reconnect continue } else { return // leave the worker for good } } } }