package main import ( "errors" "fmt" "html" "log" "net/url" "sort" "strings" "sync" "time" "code.ivysaur.me/libnmdc" telegram "github.com/go-telegram-bot-api/telegram-bot-api" ) // NTFServer methods all run on the same thread, so no mutexes are needed for field access type NTFServer struct { bot *telegram.BotAPI hubMessages chan upstreamMessage botName string chatName string inviteLink string configFile string config NTFConfig conns map[string]*libnmdc.HubConnection // hubnick -> hubconn verbose bool callOnMainThread chan func() contentedMaxBytes int64 // Except the coalesce buffer, that requires a background worker. coalesceBufferMut sync.Mutex coalesceBuffer map[string]time.Time } type upstreamMessage struct { telegramUserId int64 hubNick string evt libnmdc.HubEvent } func NewNTFServer(configFile string, verbose bool) (*NTFServer, error) { ret := NTFServer{ configFile: configFile, hubMessages: make(chan upstreamMessage, 0), callOnMainThread: make(chan func(), 0), conns: make(map[string]*libnmdc.HubConnection), coalesceBuffer: make(map[string]time.Time), verbose: verbose, } // Config cfg, err := LoadConfig(configFile) if err != nil { return nil, err } ret.config = cfg // Validate contented URL (if present) if len(ret.config.ContentedURL) > 0 { _, err := url.Parse(ret.config.ContentedURL) if err != nil { log.Printf("Ignoring malformed URL to contented server '%s': %s", ret.config.ContentedURL, err.Error()) ret.config.ContentedURL = "" // clear } // Valid. Enforce trailing slash if !strings.HasSuffix(ret.config.ContentedURL, `/`) { ret.config.ContentedURL += `/` } // Only set this if contented URL is valid - zero otherwise ret.contentedMaxBytes = int64(ret.config.ContentedMaxMB) * 1024 * 1024 } // Coalesce background worker go ret.coalesceWorker() // Bot connection if len(cfg.BotAPIKey) == 0 { return nil, errors.New("No bot API key supplied (register with BotFather first)") } bot, err := telegram.NewBotAPI(cfg.BotAPIKey) if err != nil { return nil, fmt.Errorf("Connecting to Telegram: %s", err.Error()) } ret.bot = bot if ret.verbose { bot.Debug = true } log.Printf("Connected to telegram as '%s'", bot.Self.UserName) ret.botName = bot.Self.UserName // Groupchat properties if ret.IsSetupMode() { log.Println("Group chat ID unknown, running in setup mode only - find the groupchat ID then re-run") } else { chatInfo, err := bot.GetChat(telegram.ChatConfig{ChatID: ret.config.GroupChatID}) if err != nil { return nil, fmt.Errorf("Couldn't get supergroup properties: %s", err.Error()) } inviteLink, err := bot.GetInviteLink(telegram.ChatConfig{ChatID: ret.config.GroupChatID}) if err != nil { return nil, fmt.Errorf("Couldn't get supergroup invite link: %s", err.Error()) } log.Printf("Group chat: %s", chatInfo.Title) log.Printf("Invite link: %s", inviteLink) ret.chatName = chatInfo.Title ret.inviteLink = inviteLink } // Spawn upstream connections for all pre-existing known users launchedAny := false for telegramUserId, telegramDisplayName := range ret.config.GroupChatMembers { hubNick, ok := ret.config.KnownUsers[telegramUserId] if !ok { log.Fatalf("Chat member '%d' (%s) is missing a hub mapping!!!", telegramUserId, telegramDisplayName) // fatal - inconsistent DB } err := ret.LaunchUpstreamWorker(telegramUserId, hubNick) if err != nil { log.Fatalf("Couldn't reconnect upstream for '%s': %s", hubNick, err.Error()) // fatal - inconsistent DB is the only possible cause } // Slight stagger time.Sleep(50 * time.Millisecond) launchedAny = true } // We have some bad edge cases if a telegram message comes in while the upstream // connection is offline. Wait a bit after startup, to minimise failure cases if launchedAny { time.Sleep(5 * time.Second) } return &ret, nil } // LaunchUpstreamWorker opens an NMDC connection. func (this *NTFServer) LaunchUpstreamWorker(telegramUserId int64, hubUsername string) error { if _, exists := this.conns[hubUsername]; exists { return fmt.Errorf("Duplicate hub connection for user '%s', abandoning", hubUsername) } log.Printf("Connecting to hub '%s' as user '%s'...", this.config.HubAddr, hubUsername) // We want to use the async NMDC connection, but at the same time, provide // extra information in the channel. Wrap the upstream NMDC channel with one // of our own upstreamChan := make(chan libnmdc.HubEvent, 0) go func() { for msg := range upstreamChan { this.hubMessages <- upstreamMessage{ telegramUserId: telegramUserId, hubNick: hubUsername, evt: msg, } } }() hubUser := libnmdc.NewUserInfo(hubUsername) hubUser.ClientTag = AppName hubUser.ClientVersion = AppVersion conn := libnmdc.ConnectAsync( &libnmdc.HubConnectionOptions{ Address: libnmdc.HubAddress(this.config.HubAddr), Self: hubUser, }, upstreamChan, ) // Stash conn so that we can refer to it / close it later this.conns[hubUsername] = conn return nil } func (this *NTFServer) IsSetupMode() bool { return this.config.GroupChatID == 0 } func (this *NTFServer) Run() error { updateProps := telegram.NewUpdate(0) updateProps.Timeout = 60 // seconds updateChan, err := this.bot.GetUpdatesChan(updateProps) if err != nil { return err } for { select { case update, ok := <-updateChan: if !ok { log.Fatalf("Telegram update channel closed unexpectedly") } this.HandleMessage(update) case hubMsg, ok := <-this.hubMessages: if !ok { log.Fatalf("Upstream update channel closed unexpectedly") } this.HandleHubMessage(hubMsg) case mtfn, ok := <-this.callOnMainThread: if !ok { log.Fatalf("Synchronisation channel closed unexpectedly") } mtfn() } } return nil // UNREACHABLE } func (this *NTFServer) HandleHubMessage(msg upstreamMessage) { if this.verbose { log.Printf("Hub: %#v", msg) } switch msg.evt.EventType { case libnmdc.EVENT_SYSTEM_MESSAGE_FROM_CONN, libnmdc.EVENT_SYSTEM_MESSAGE_FROM_HUB: // This includes "want to register your nick?" and chat-catchup messages // But it also includes /me messages // First, remove system characters from the front (star, space) systemMessage := strings.TrimLeft(msg.evt.Message, `* `) // Heuristic detect action messages if the first space-separated word is a known online user isActionMessage := false if conn, ok := this.conns[msg.hubNick]; ok { firstWord := strings.SplitN(systemMessage, ` `, 2)[0] _ = conn.Users(func(umap *map[string]libnmdc.UserInfo) error { for knownHubNick, _ := range *umap { if knownHubNick == firstWord { isActionMessage = true return nil } } return nil }) } if isActionMessage { // Treat it as a full post // Coalesce from multiple connections if this.Coalesce("*", systemMessage) { return // ignore - we heard this message already recently } // Display the message htmlMsg := "* " + html.EscapeString(systemMessage) + "" err := this.GroupChatSayHTML(htmlMsg) if err != nil { log.Printf("Delivering action message to group chat: %s", err.Error()) } } else { // Not an action message, just clerical notices // Don't mirror them into telegram log.Printf("Hub(%s): * %s", msg.hubNick, msg.evt.Message) } case libnmdc.EVENT_CONNECTION_STATE_CHANGED: log.Printf("Hub(%s): * Connection %s", msg.hubNick, msg.evt.StateChange.String()) case libnmdc.EVENT_PRIVATE: err := this.DirectMessageTelegramUser(msg.telegramUserId, fmt.Sprintf("PM from user '%s': %s", msg.evt.Nick, msg.evt.Message)) if err != nil { log.Printf("Delivering PM to telegram user: %s", err.Error()) } case libnmdc.EVENT_PUBLIC: for _, ignoreNick := range this.config.HubIgnoreNicks { if ignoreNick == msg.evt.Nick { return // ignore } } // Coalesce from multiple connections if this.Coalesce(msg.evt.Nick, msg.evt.Message) { return // ignore - we heard this message already recently } // Display the message htmlMsg := "<" + html.EscapeString(msg.evt.Nick) + "> " + html.EscapeString(msg.evt.Message) err := this.GroupChatSayHTML(htmlMsg) if err != nil { log.Printf("Delivering public message to group chat: %s", err.Error()) } case libnmdc.EVENT_BAD_LOGIN_FAILURE: this.DirectMessageTelegramUser(msg.telegramUserId, "The hub disconnected in a permanent way (login failure?). Consider re-registering with a different nick.") this.kickAndDrop(msg.telegramUserId) case libnmdc.EVENT_USER_JOINED, libnmdc.EVENT_USER_PART, libnmdc.EVENT_USER_UPDATED_INFO, libnmdc.EVENT_USERCOMMAND, libnmdc.EVENT_DEBUG_MESSAGE, libnmdc.EVENT_HUBNAME_CHANGED: // ignore default: log.Printf("Hub(%s): Unhandled(%d): %s", msg.hubNick, msg.evt.EventType, msg.evt.Message) } } func (this *NTFServer) HandleMessage(update telegram.Update) { if update.Message == nil { return } if this.IsSetupMode() { log.Printf("Message from '%s': '%s', chat ID '%d'\n", update.Message.From.UserName, update.Message.Text, update.Message.Chat.ID) } else if update.Message.Chat.ID == this.config.GroupChatID { err := this.HandleGroupMessage(update) if err != nil { log.Printf("Handling group message: %s", err.Error()) } } else if update.Message.Chat.IsPrivate() { err := this.HandleDirectMessage(update) if err != nil { log.Printf("Handling private message: %s", err.Error()) } } else { log.Printf("Message from unknown chat %d (not our supergroup, not a PM, ...)", update.Message.Chat.ID) } if this.verbose { fmt.Printf("%#v\n", update.Message) log.Printf("[%s] %s", update.Message.From.UserName, update.Message.Text) } } // kickAndDrop deregisters an account and kicks them from the group chat func (this *NTFServer) kickAndDrop(telegramUserId int64) { // Hubnick if hubNick, ok := this.config.KnownUsers[telegramUserId]; ok { // Close upstream connection (if any) if conn, ok := this.conns[hubNick]; ok { delete(this.conns, hubNick) log.Printf("Disconnecting '%s' from hub", hubNick) conn.Disconnect() } // Deregister from known users delete(this.config.KnownUsers, telegramUserId) err := this.config.Save(this.configFile) if err != nil { log.Printf("Couldn't save changes when deregistering user: %s", err.Error()) } } // Kick from telegram groupchat (if logged in) _, err := this.bot.KickChatMember(telegram.KickChatMemberConfig{ ChatMemberConfig: telegram.ChatMemberConfig{ ChatID: this.config.GroupChatID, UserID: int(telegramUserId), }, UntilDate: time.Now().Add(24 * time.Hour).Unix(), }) if err != nil { log.Printf("Couldn't kick user from telegram group: %s", err.Error()) } } // HandleTelegramUserParted processes a user leaving the groupchat, without deregistering their account func (this *NTFServer) HandleTelegramUserParted(telegramUserId int64, update telegram.Update) error { delete(this.config.GroupChatMembers, telegramUserId) return this.config.Save(this.configFile) } func (this *NTFServer) HandleTelegramUserJoined(telegramUserId int64, telegramDisplayName string, update telegram.Update) error { // If known, spawn the upstream connection; if unknown, kick them hubNick, ok := this.config.KnownUsers[telegramUserId] if !ok { log.Printf("Unexpected user '%s' (%d) joined the group chat, kicking...", telegramDisplayName, telegramUserId) this.kickAndDrop(telegramUserId) } // Remember which users are currently joined this.config.GroupChatMembers[telegramUserId] = telegramDisplayName err := this.config.Save(this.configFile) if err != nil { return err } // Spawn the upstream connection for this user return this.LaunchUpstreamWorker(telegramUserId, hubNick) } func (this *NTFServer) HandleGroupMessage(update telegram.Update) error { // Joins: ???? if update.Message.NewChatMembers != nil && len(*update.Message.NewChatMembers) > 0 { // Users joining // Ensure that they have a valid user mapping // Create upstream NMDC connection for them for _, joinedUser := range *update.Message.NewChatMembers { err := this.HandleTelegramUserJoined(int64(joinedUser.ID), joinedUser.String(), update) if err != nil { log.Printf("Handling user join: %s", err.Error()) } } return nil } if update.Message.LeftChatMember != nil { // User parted // Close upstream NMDC connection for them log.Printf("Telegram user '%s' (%d) leaving group chat", update.Message.LeftChatMember.String(), update.Message.LeftChatMember.ID) return this.HandleTelegramUserParted(int64(update.Message.LeftChatMember.ID), update) } // userID := int64(update.Message.From.ID) if this.contentedMaxBytes > 0 { // File upload types // Audio if update.Message.Audio != nil { go func() { conUrl, err := this.ContentedUploadSync(update.Message.Audio.FileID, int64(update.Message.Audio.FileSize)) // no thumbnail fallback available this.uploadAsyncComplete(userID, "audio", conUrl, err, fmt.Sprintf("Audio '%s' duration %ds: %s", update.Message.Audio.Title, update.Message.Audio.Duration, update.Message.Caption)) }() } // Document if update.Message.Document != nil { go func() { conUrl, err := this.ContentedUploadFallbackSync(update.Message.Document.FileID, int64(update.Message.Document.FileSize), update.Message.Document.Thumbnail) this.uploadAsyncComplete(userID, "document", conUrl, err, update.Message.Caption) }() } // Photo if update.Message.Photo != nil { go func() { conUrl, err := this.ContentedUploadBestSync(*update.Message.Photo) this.uploadAsyncComplete(userID, "photo", conUrl, err, update.Message.Caption) }() } // Sticker if update.Message.Sticker != nil { go func() { conUrl, err := this.ContentedUploadFallbackSync(update.Message.Sticker.FileID, int64(update.Message.Sticker.FileSize), update.Message.Sticker.Thumbnail) this.uploadAsyncComplete(userID, "sticker", conUrl, err, update.Message.Sticker.Emoji+" "+update.Message.Caption) }() } // Video if update.Message.Video != nil { go func() { conUrl, err := this.ContentedUploadFallbackSync(update.Message.Video.FileID, int64(update.Message.Video.FileSize), update.Message.Video.Thumbnail) this.uploadAsyncComplete(userID, "video", conUrl, err, update.Message.Caption) }() } // VideoNote if update.Message.VideoNote != nil { go func() { conUrl, err := this.ContentedUploadFallbackSync(update.Message.VideoNote.FileID, int64(update.Message.VideoNote.FileSize), update.Message.VideoNote.Thumbnail) this.uploadAsyncComplete(userID, "videonote", conUrl, err, update.Message.Caption) }() } // Voice if update.Message.Voice != nil { go func() { conUrl, err := this.ContentedUploadSync(update.Message.Voice.FileID, int64(update.Message.Voice.FileSize)) // no thumbnail fallback available this.uploadAsyncComplete(userID, "voiceclip", conUrl, err, fmt.Sprintf("Voice clip (duration %ds): %s", update.Message.Voice.Duration, update.Message.Caption)) }() } } if update.Message.Contact != nil { return this.HubSay(userID, fmt.Sprintf("Contact %s %s %s %s", update.Message.Contact.FirstName, update.Message.Contact.LastName, update.Message.Contact.PhoneNumber, update.Message.Caption)) } if update.Message.Location != nil { return this.HubSay(userID, fmt.Sprintf( "Latitude %f Longitude %f %s", update.Message.Location.Latitude, update.Message.Location.Longitude, update.Message.Caption)) } if update.Message.Venue != nil { return this.HubSay(userID, fmt.Sprintf( "Venue %s - Address %s (Lat %f Long %f) %s", update.Message.Venue.Title, update.Message.Venue.Address, update.Message.Venue.Location.Latitude, update.Message.Venue.Location.Longitude, update.Message.Caption)) } if len(update.Message.Text) > 0 { // Intercept some bot commands if update.Message.Text == "/userlist" || update.Message.Text == "/userlist@"+this.botName { // Display native userlist inside the groupchat // Find the responsible user's upstream connection hubNick, ok := this.config.KnownUsers[userID] if !ok { return fmt.Errorf("Couldn't send public message for user '%d' unexpectedly missing hub nick!", userID) } conn, ok := this.conns[hubNick] if !ok { return fmt.Errorf("Couldn't send public message for user '%d' (%s) unexpectedly missing upstream connection!", userID, hubNick) } usernames, err := this.getUserlistText(conn) if err != nil { this.GroupChatSayHTML("Can't get userlist for you (internal error)") } this.GroupChatSayHTML(fmt.Sprintf("Online users: %s", html.EscapeString(usernames))) } else { // Actual chat message sendMsg := update.Message.Text // Was it a reply to another message? If so, format it as " OrigMessage // update text" // That includes collapsing the original message onto a single line if update.Message.ReplyToMessage != nil && len(update.Message.ReplyToMessage.Text) > 0 && update.Message.ReplyToMessage.From != nil { origSenderId := int64(update.Message.ReplyToMessage.From.ID) flatten := func(s string) string { return strings.Replace(s, "\n", " ", -1) } if origSenderNick, ok := this.config.KnownUsers[origSenderId]; ok { // Quoted messages from another hub user sendMsg = fmt.Sprintf(`"<%s> %s" // %s`, origSenderNick, flatten(update.Message.ReplyToMessage.Text), sendMsg) } else if origSenderId == int64(this.bot.Self.ID) { // Quoted messages from the hublink bot sendMsg = fmt.Sprintf(`"%s" // %s`, flatten(update.Message.ReplyToMessage.Text), sendMsg) // the username is already present in the plaintext } else { // No idea what was quoted. But it really was quoting something // Paste it as-is but with some info in the logs for future cleanup log.Printf("Quoting message '%#v' with unknown source", update.Message.ReplyToMessage) sendMsg = fmt.Sprintf(`"%s" // %s`, flatten(update.Message.ReplyToMessage.Text), sendMsg) } } return this.HubSay(userID, sendMsg) } } // TODO probably a file/image upload??? // TODO support "editing messages" by re-sending them with a ** suffix return nil } func (this *NTFServer) HubSay(userID int64, sendMsg string) error { // Find the responsible user's upstream connection hubNick, ok := this.config.KnownUsers[userID] if !ok { return fmt.Errorf("Couldn't send public message for user '%d' unexpectedly missing hub nick!", userID) } conn, ok := this.conns[hubNick] if !ok { return fmt.Errorf("Couldn't send public message for user '%d' (%s) unexpectedly missing upstream connection!", userID, hubNick) } // Also add it to the coalesce buffer so that we don't replay it from someone else's NMDC connection this.Coalesce(hubNick, sendMsg) // Submit to NMDC err := conn.SayPublic(sendMsg) if err != nil { log.Printf("Failed to deliver message '%s': %s", sendMsg, err.Error()) this.GroupChatSayHTML(fmt.Sprintf("Couldn't sync message '%s' because: %s", html.EscapeString(sendMsg), html.EscapeString(err.Error()))) } return nil } func (this *NTFServer) ReplyTelegramUser(userID int64, str string, replyToMessageID int) error { chatId, ok := this.config.DirectMessageChats[userID] if !ok { return fmt.Errorf("Can't send telegram message to user '%s': no DM chat known", userID) } msg := telegram.NewMessage(chatId, str) if replyToMessageID != 0 { msg.ReplyToMessageID = replyToMessageID } _, err := this.bot.Send(msg) return err } func (this *NTFServer) DirectMessageTelegramUser(userID int64, str string) error { return this.ReplyTelegramUser(userID, str, 0) } func (this *NTFServer) GroupChatSayHTML(str string) error { msg := telegram.NewMessage(this.config.GroupChatID, str) msg.ParseMode = telegram.ModeHTML _, err := this.bot.Send(msg) return err } func (this *NTFServer) getUserlistText(conn *libnmdc.HubConnection) (string, error) { usernames := make([]string, 0) err := conn.Users(func(umap *map[string]libnmdc.UserInfo) error { for k, _ := range *umap { usernames = append(usernames, k) } return nil }) if err != nil { log.Printf("Error retrieving userlist: %s", err.Error()) return "", err } sort.Strings(usernames) return strings.Join(usernames, " "), nil }