package main import ( "errors" "fmt" "html" "log" "sort" "strings" "sync" "time" "code.ivysaur.me/libnmdc" telegram "github.com/go-telegram-bot-api/telegram-bot-api" ) const ( coalesceWorkerRescanEvery time.Duration = 10 * time.Second coalesceWorkerExpireAfter time.Duration = 20 * time.Second ) // NTFServer methods all run on the same thread, so no mutexes are needed for field access type NTFServer struct { bot *telegram.BotAPI hubMessages chan upstreamMessage chatName string inviteLink string configFile string config NTFConfig conns map[string]*libnmdc.HubConnection // hubnick -> hubconn verbose bool // Except the coalesce buffer, that requires a background worker. coalesceBufferMut sync.Mutex coalesceBuffer map[string]time.Time } func coalesceKey(hubNick, message string) string { return hubNick + "\x00" + message } type upstreamMessage struct { telegramUserId int64 hubNick string evt libnmdc.HubEvent } func NewNTFServer(configFile string, verbose bool) (*NTFServer, error) { ret := NTFServer{ configFile: configFile, hubMessages: make(chan upstreamMessage, 0), conns: make(map[string]*libnmdc.HubConnection), coalesceBuffer: make(map[string]time.Time), verbose: verbose, } // Config cfg, err := LoadConfig(configFile) if err != nil { return nil, err } ret.config = cfg // Coalesce background worker go ret.coalesceWorker() // Bot connection if len(cfg.BotAPIKey) == 0 { return nil, errors.New("No bot API key supplied (register with BotFather first)") } bot, err := telegram.NewBotAPI(cfg.BotAPIKey) if err != nil { return nil, fmt.Errorf("Connecting to Telegram: %s", err.Error()) } ret.bot = bot if ret.verbose { bot.Debug = true } log.Printf("Connected to telegram as '%s'", bot.Self.UserName) // Groupchat properties if ret.IsSetupMode() { log.Println("Group chat ID unknown, running in setup mode only - find the groupchat ID then re-run") } else { chatInfo, err := bot.GetChat(telegram.ChatConfig{ChatID: ret.config.GroupChatID}) if err != nil { return nil, fmt.Errorf("Couldn't get supergroup properties: %s", err.Error()) } inviteLink, err := bot.GetInviteLink(telegram.ChatConfig{ChatID: ret.config.GroupChatID}) if err != nil { return nil, fmt.Errorf("Couldn't get supergroup invite link: %s", err.Error()) } log.Printf("Group chat: %s", chatInfo.Title) log.Printf("Invite link: %s", inviteLink) ret.chatName = chatInfo.Title ret.inviteLink = inviteLink } // Spawn upstream connections for all pre-existing known users for telegramUserId, telegramDisplayName := range ret.config.GroupChatMembers { hubNick, ok := ret.config.KnownUsers[telegramUserId] if !ok { log.Fatalf("Chat member '%d' (%s) is missing a hub mapping!!!", telegramUserId, telegramDisplayName) // fatal - inconsistent DB } err := ret.LaunchUpstreamWorker(telegramUserId, hubNick) if err != nil { return nil, fmt.Errorf("Reconnecting upstream for '%s': %s", hubNick, err.Error()) // fatal } } return &ret, nil } // registerUser lodges a user mapping in the configuration file. // This allows them to join the group chat (unbanning them if necessary). // An actual NMDC connection will occur once the user joins for the first time. func (this *NTFServer) registerUser(telegramUserId int64, hubUsername string) error { if existingHubNick, ok := this.config.KnownUsers[telegramUserId]; ok { if existingHubNick == hubUsername { return nil } return fmt.Errorf("Telegram account is already registered with hub nick '%s'", existingHubNick) } for _, v := range this.config.KnownUsers { if v == hubUsername { return fmt.Errorf("Requested hub nick '%s' is already used by another member", hubUsername) } } this.config.KnownUsers[telegramUserId] = hubUsername err := this.config.Save(this.configFile) if err != nil { return err } // Unban from groupchat, if necessary // Ignore errors because the user might not have been banned _, err = this.bot.UnbanChatMember(telegram.ChatMemberConfig{ ChatID: this.config.GroupChatID, UserID: int(telegramUserId), }) if err != nil { log.Printf("Couldn't unban user '%s' from groupchat because: %s (assuming OK, continuing)", hubUsername, err.Error()) } return nil } // LaunchUpstreamWorker opens an NMDC connection. func (this *NTFServer) LaunchUpstreamWorker(telegramUserId int64, hubUsername string) error { if _, exists := this.conns[hubUsername]; exists { return fmt.Errorf("Duplicate hub connection for user '%s', abandoning", hubUsername) } log.Printf("Connecting to hub '%s' as user '%s'...", this.config.HubAddr, hubUsername) // We want to use the async NMDC connection, but at the same time, provide // extra information in the channel. Wrap the upstream NMDC channel with one // of our own upstreamChan := make(chan libnmdc.HubEvent, 0) go func() { for msg := range upstreamChan { this.hubMessages <- upstreamMessage{ telegramUserId: telegramUserId, hubNick: hubUsername, evt: msg, } } }() hubUser := libnmdc.NewUserInfo(hubUsername) hubUser.ClientTag = AppName hubUser.ClientVersion = AppVersion conn := libnmdc.ConnectAsync( &libnmdc.HubConnectionOptions{ Address: libnmdc.HubAddress(this.config.HubAddr), Self: hubUser, }, upstreamChan, ) // Stash conn so that we can refer to it / close it later this.conns[hubUsername] = conn return nil } func (this *NTFServer) IsSetupMode() bool { return this.config.GroupChatID == 0 } func (this *NTFServer) Run() error { updateProps := telegram.NewUpdate(0) updateProps.Timeout = 60 // seconds updateChan, err := this.bot.GetUpdatesChan(updateProps) if err != nil { return err } for { select { case update, ok := <-updateChan: if !ok { log.Fatalf("Telegram update channel closed unexpectedly") } this.HandleMessage(update) case hubMsg, ok := <-this.hubMessages: if !ok { log.Fatalf("Upstream update channel closed unexpectedly") } this.HandleHubMessage(hubMsg) } } return nil // UNREACHABLE } func (this *NTFServer) coalesceWorker() { for { time.Sleep(coalesceWorkerRescanEvery) deadLine := time.Now().Add(-coalesceWorkerExpireAfter) this.coalesceBufferMut.Lock() for k, v := range this.coalesceBuffer { if v.Before(deadLine) { delete(this.coalesceBuffer, k) } } this.coalesceBufferMut.Unlock() } } // Coalesce returns true if the message sticks to an existing one. // It adds it into the coalesce buffer anyway. func (this *NTFServer) Coalesce(hubNick, message string) bool { ckey := coalesceKey(hubNick, message) this.coalesceBufferMut.Lock() if _, ok := this.coalesceBuffer[ckey]; ok { this.coalesceBufferMut.Unlock() return true // stuck to existing } else { this.coalesceBuffer[ckey] = time.Now() this.coalesceBufferMut.Unlock() return false // first we've seen it } } func (this *NTFServer) HandleHubMessage(msg upstreamMessage) { if this.verbose { log.Printf("Hub: %#v", msg) } switch msg.evt.EventType { case libnmdc.EVENT_SYSTEM_MESSAGE_FROM_CONN, libnmdc.EVENT_CONNECTION_STATE_CHANGED: log.Printf("Hub(%s): * %s", msg.hubNick, msg.evt.Message) case libnmdc.EVENT_PRIVATE: err := this.DirectMessageTelegramUser(msg.telegramUserId, fmt.Sprintf("PM from user '%s': %s", msg.evt.Nick, msg.evt.Message)) if err != nil { log.Printf("Delivering PM to telegram user: %s", err.Error()) } case libnmdc.EVENT_PUBLIC: if msg.evt.Nick == this.config.HubSecNick { return // ignore } // Coalesce from multiple connections if this.Coalesce(msg.evt.Nick, msg.evt.Message) { return // ignore - we heard this message already recently } // Display the message htmlMsg := "<" + html.EscapeString(msg.evt.Nick) + "> " + html.EscapeString(msg.evt.Message) err := this.GroupChatSayHTML(htmlMsg) if err != nil { log.Printf("Delivering public message to group chat: %s", err.Error()) } case libnmdc.EVENT_USER_JOINED, libnmdc.EVENT_USER_PART, libnmdc.EVENT_USER_UPDATED_INFO, libnmdc.EVENT_USERCOMMAND, libnmdc.EVENT_DEBUG_MESSAGE, libnmdc.EVENT_HUBNAME_CHANGED: // ignore default: log.Printf("Hub(%s): Unhandled(%d): %s", msg.hubNick, msg.evt.EventType, msg.evt.Message) } } func (this *NTFServer) HandleMessage(update telegram.Update) { if update.Message == nil { return } if this.IsSetupMode() { log.Printf("Message from '%s': '%s', chat ID '%d'\n", update.Message.From.UserName, update.Message.Text, update.Message.Chat.ID) } else if update.Message.Chat.ID == this.config.GroupChatID { err := this.HandleGroupMessage(update) if err != nil { log.Printf("Handling group message: %s", err.Error()) } } else if update.Message.Chat.IsPrivate() { err := this.HandleDirectMessage(update) if err != nil { log.Printf("Handling private message: %s", err.Error()) } } else { log.Printf("Message from unknown chat %d (not our supergroup, not a PM, ...)", update.Message.Chat.ID) } if this.verbose { fmt.Printf("%#v\n", update.Message) log.Printf("[%s] %s", update.Message.From.UserName, update.Message.Text) } } // kickAndDrop deregisters an account and kicks them from the group chat func (this *NTFServer) kickAndDrop(telegramUserId int64) { // Hubnick if hubNick, ok := this.config.KnownUsers[telegramUserId]; ok { // Close upstream connection (if any) if conn, ok := this.conns[hubNick]; ok { delete(this.conns, hubNick) log.Printf("Disconnecting '%s' from hub", hubNick) conn.Disconnect() } // Deregister from known users delete(this.config.KnownUsers, telegramUserId) err := this.config.Save(this.configFile) if err != nil { log.Printf("Couldn't save changes when deregistering user: %s", err.Error()) } } // Kick from telegram groupchat (if logged in) _, err := this.bot.KickChatMember(telegram.KickChatMemberConfig{ ChatMemberConfig: telegram.ChatMemberConfig{ ChatID: this.config.GroupChatID, UserID: int(telegramUserId), }, UntilDate: time.Now().Add(24 * time.Hour).Unix(), }) if err != nil { log.Printf("Couldn't kick user from telegram group: %s", err.Error()) } } // HandleTelegramUserParted processes a user leaving the groupchat, without deregistering their account func (this *NTFServer) HandleTelegramUserParted(telegramUserId int64, update telegram.Update) error { delete(this.config.GroupChatMembers, telegramUserId) return this.config.Save(this.configFile) } func (this *NTFServer) HandleTelegramUserJoined(telegramUserId int64, telegramDisplayName string, update telegram.Update) error { // If known, spawn the upstream connection; if unknown, kick them hubNick, ok := this.config.KnownUsers[telegramUserId] if !ok { log.Printf("Unexpected user '%s' (%d) joined the group chat, kicking...", telegramDisplayName, telegramUserId) this.kickAndDrop(telegramUserId) } // Remember which users are currently joined this.config.GroupChatMembers[telegramUserId] = telegramDisplayName err := this.config.Save(this.configFile) if err != nil { return err } // Spawn the upstream connection for this user return this.LaunchUpstreamWorker(telegramUserId, hubNick) } func (this *NTFServer) HandleGroupMessage(update telegram.Update) error { // Joins: ???? if update.Message.NewChatMembers != nil && len(*update.Message.NewChatMembers) > 0 { // Users joining // Ensure that they have a valid user mapping // Create upstream NMDC connection for them for _, joinedUser := range *update.Message.NewChatMembers { err := this.HandleTelegramUserJoined(int64(joinedUser.ID), joinedUser.String(), update) if err != nil { log.Printf("Handling user join: %s", err.Error()) } } return nil } if update.Message.LeftChatMember != nil { // User parted // Close upstream NMDC connection for them log.Printf("Telegram user '%s' (%d) leaving group chat", update.Message.LeftChatMember.String(), update.Message.LeftChatMember.ID) return this.HandleTelegramUserParted(int64(update.Message.LeftChatMember.ID), update) } if len(update.Message.Text) > 0 { // Actual chat message // Find the responsible user to send it to the upstream hub, using their account userID := int64(update.Message.From.ID) hubNick, ok := this.config.KnownUsers[userID] if !ok { return fmt.Errorf("Couldn't send public message for user '%d' unexpectedly missing hub nick!", userID) } conn, ok := this.conns[hubNick] if !ok { return fmt.Errorf("Couldn't send public message for user '%d' (%s) unexpectedly missing upstream connection!", userID, hubNick) } // Also add it to the coalesce buffer so that we don't replay it from someone else's NMDC connection this.Coalesce(hubNick, update.Message.Text) // Submit to NMDC conn.SayPublic(update.Message.Text) } // TODO probably a file/image upload??? // TODO support "editing messages" by re-sending them with a ** suffix return nil } func (this *NTFServer) ReplyTelegramUser(userID int64, str string, replyToMessageID int) error { chatId, ok := this.config.DirectMessageChats[userID] if !ok { return fmt.Errorf("Can't send telegram message to user '%s': no DM chat known", userID) } msg := telegram.NewMessage(chatId, str) if replyToMessageID != 0 { msg.ReplyToMessageID = replyToMessageID } _, err := this.bot.Send(msg) return err } func (this *NTFServer) DirectMessageTelegramUser(userID int64, str string) error { return this.ReplyTelegramUser(userID, str, 0) } func (this *NTFServer) GroupChatSayHTML(str string) error { msg := telegram.NewMessage(this.config.GroupChatID, str) msg.ParseMode = telegram.ModeHTML _, err := this.bot.Send(msg) return err } func (this *NTFServer) HandleDirectMessage(update telegram.Update) error { // Registration workflow userID := int64(update.Message.From.ID) // Stash the telegram user ID against this direct-message chat ID so that // we can always reply later on chatID := update.Message.Chat.ID if oldChatID, ok := this.config.DirectMessageChats[userID]; !ok || oldChatID != chatID { this.config.DirectMessageChats[userID] = chatID err := this.config.Save(this.configFile) if err != nil { log.Printf("Couldn't save chat ID %d for user %d", chatID, userID) } } respond := func(str string) error { return this.ReplyTelegramUser(userID, str, update.Message.MessageID) } // Find out the current status for this chat ID hubNick, isKnown := this.config.KnownUsers[userID] _, isInGroupChat := this.config.GroupChatMembers[userID] // Handle the incoming request msg := update.Message.Text if strings.HasPrefix(msg, "/pm ") { if !(isKnown && isInGroupChat) { return respond("Can't send a native PM until you've joined.") } conn, ok := this.conns[hubNick] if !ok { return respond("Can't send a native PM (No upstream hub connection)") } parts := strings.SplitN(msg, " ", 3) if len(parts) != 3 { return respond("Expected format /pm [recipient] [message] - try again...") } if !conn.UserExists(parts[1]) { return respond(fmt.Sprintf("Can't PM offline user '%s'", parts[1])) } conn.SayPrivate(parts[1], parts[2]) } else if strings.HasPrefix(msg, "/join ") { requestedHubNick := msg[6:] err := this.registerUser(userID, requestedHubNick) if err != nil { log.Printf("Failed to register user: %s", err.Error()) return respond(fmt.Sprintf("Couldn't allow registration because: %s", err.Error())) } return respond(fmt.Sprintf("Hi '%s'! You are now registered, and can join %s at %s", requestedHubNick, this.config.HubDescription, this.inviteLink)) } else if strings.HasPrefix(msg, "/rejoin") { if isKnown && !isInGroupChat { return respond(fmt.Sprintf("Welcome back '%s'! You can join %s at %s", hubNick, this.config.HubDescription, this.inviteLink)) } else { return respond("You are either already joined (try /quit first), or not yet registered (try /join first).") } } else if strings.HasPrefix(msg, "/userlist") { conn, ok := this.conns[hubNick] if !ok { return respond("Can't get userlist for you (No upstream hub connection)") } usernames := make([]string, 0) err := conn.Users(func(umap *map[string]libnmdc.UserInfo) error { for k, _ := range *umap { usernames = append(usernames, k) } return nil }) if err != nil { log.Printf("Error retrieving userlist: %s", err.Error()) return respond("Can't get userlist for you (internal error)") } sort.Strings(usernames) return respond("Online users: " + strings.Join(usernames, " ")) } else if strings.HasPrefix(msg, "/whois ") { target := msg[7:] conn, ok := this.conns[hubNick] if !ok { return respond("Can't get user details for you (No upstream hub connection)") } var userinfo libnmdc.UserInfo var exists bool err := conn.Users(func(umap *map[string]libnmdc.UserInfo) error { userinfo, exists = (*umap)[target] return nil }) if err != nil { log.Printf("Error retrieving userlist: %s", err.Error()) return respond("Can't get user details for you (internal error)") } if !exists { return respond(fmt.Sprintf("There is no online user '%s'.", target)) } return respond(fmt.Sprintf("Description: %s\nShare size: %d\nClient: %s %s", userinfo.Description, userinfo.ShareSize, userinfo.ClientTag, userinfo.ClientVersion)) } else if strings.HasPrefix(msg, "/quit") { this.kickAndDrop(userID) return respond("Disconnected. You can register again by typing /help .") } else { // e.g. /start or /help // Help helpMsg := `I am a bot that connects Telegram with ` + this.config.HubDescription + ".\n" if isKnown { helpMsg += "You are currently registered as: '" + hubNick + "'\n" } else { helpMsg += "You aren't connected yet.\n" } if isInGroupChat { helpMsg += "You are currently in the groupchat.\n" } else { helpMsg += "You haven't joined the groupchat.\n" } helpMsg += ` Available commands: /setup - Welcome message /join [hubnick] - Register as 'hubnick' and join ` + this.config.HubDescription + ` /rejoin - Re-invite if you are already registered /pm [recipient] [message] - Send a native PM (if connected) /userlist - List native online users /whois [hubnick] - Check if native user is online, and read their description /quit - Unregister your nick and leave the group chat ` return respond(helpMsg) } return nil }