nmdc-telegramfrontend/NTFServer.go

758 lines
23 KiB
Go

package main
import (
"errors"
"fmt"
"html"
"log"
"regexp"
"sort"
"strings"
"sync"
"time"
"code.ivysaur.me/libnmdc"
telegram "github.com/go-telegram-bot-api/telegram-bot-api"
)
const (
coalesceWorkerRescanEvery time.Duration = 10 * time.Second
coalesceWorkerExpireAfter time.Duration = 20 * time.Second
)
// NTFServer methods all run on the same thread, so no mutexes are needed for field access
type NTFServer struct {
bot *telegram.BotAPI
hubMessages chan upstreamMessage
botName string
chatName string
inviteLink string
configFile string
config NTFConfig
conns map[string]*libnmdc.HubConnection // hubnick -> hubconn
verbose bool
// Except the coalesce buffer, that requires a background worker.
coalesceBufferMut sync.Mutex
coalesceBuffer map[string]time.Time
}
func coalesceKey(hubNick, message string) string {
return hubNick + "\x00" + message
}
type upstreamMessage struct {
telegramUserId int64
hubNick string
evt libnmdc.HubEvent
}
func NewNTFServer(configFile string, verbose bool) (*NTFServer, error) {
ret := NTFServer{
configFile: configFile,
hubMessages: make(chan upstreamMessage, 0),
conns: make(map[string]*libnmdc.HubConnection),
coalesceBuffer: make(map[string]time.Time),
verbose: verbose,
}
// Config
cfg, err := LoadConfig(configFile)
if err != nil {
return nil, err
}
ret.config = cfg
// Coalesce background worker
go ret.coalesceWorker()
// Bot connection
if len(cfg.BotAPIKey) == 0 {
return nil, errors.New("No bot API key supplied (register with BotFather first)")
}
bot, err := telegram.NewBotAPI(cfg.BotAPIKey)
if err != nil {
return nil, fmt.Errorf("Connecting to Telegram: %s", err.Error())
}
ret.bot = bot
if ret.verbose {
bot.Debug = true
}
log.Printf("Connected to telegram as '%s'", bot.Self.UserName)
ret.botName = bot.Self.UserName
// Groupchat properties
if ret.IsSetupMode() {
log.Println("Group chat ID unknown, running in setup mode only - find the groupchat ID then re-run")
} else {
chatInfo, err := bot.GetChat(telegram.ChatConfig{ChatID: ret.config.GroupChatID})
if err != nil {
return nil, fmt.Errorf("Couldn't get supergroup properties: %s", err.Error())
}
inviteLink, err := bot.GetInviteLink(telegram.ChatConfig{ChatID: ret.config.GroupChatID})
if err != nil {
return nil, fmt.Errorf("Couldn't get supergroup invite link: %s", err.Error())
}
log.Printf("Group chat: %s", chatInfo.Title)
log.Printf("Invite link: %s", inviteLink)
ret.chatName = chatInfo.Title
ret.inviteLink = inviteLink
}
// Spawn upstream connections for all pre-existing known users
launchedAny := false
for telegramUserId, telegramDisplayName := range ret.config.GroupChatMembers {
hubNick, ok := ret.config.KnownUsers[telegramUserId]
if !ok {
log.Fatalf("Chat member '%d' (%s) is missing a hub mapping!!!", telegramUserId, telegramDisplayName) // fatal - inconsistent DB
}
err := ret.LaunchUpstreamWorker(telegramUserId, hubNick)
if err != nil {
log.Fatalf("Couldn't reconnect upstream for '%s': %s", hubNick, err.Error()) // fatal - inconsistent DB is the only possible cause
}
launchedAny = true
}
// We have some bad edge cases if a telegram message comes in while the upstream
// connection is offline. Wait a bit after startup, to minimise failure cases
if launchedAny {
time.Sleep(5 * time.Second)
}
return &ret, nil
}
// registerUser lodges a user mapping in the configuration file.
// This allows them to join the group chat (unbanning them if necessary).
// An actual NMDC connection will occur once the user joins for the first time.
func (this *NTFServer) registerUser(telegramUserId int64, hubUsername string) error {
if existingHubNick, ok := this.config.KnownUsers[telegramUserId]; ok {
if existingHubNick == hubUsername {
return nil
}
return fmt.Errorf("Telegram account is already registered with hub nick '%s'", existingHubNick)
}
for _, v := range this.config.KnownUsers {
if v == hubUsername {
return fmt.Errorf("Requested hub nick '%s' is already used by another member", hubUsername)
}
}
this.config.KnownUsers[telegramUserId] = hubUsername
err := this.config.Save(this.configFile)
if err != nil {
return err
}
// Unban from groupchat, if necessary
// Ignore errors because the user might not have been banned
_, err = this.bot.UnbanChatMember(telegram.ChatMemberConfig{
ChatID: this.config.GroupChatID,
UserID: int(telegramUserId),
})
if err != nil {
log.Printf("Couldn't unban user '%s' from groupchat because: %s (assuming OK, continuing)", hubUsername, err.Error())
}
return nil
}
// LaunchUpstreamWorker opens an NMDC connection.
func (this *NTFServer) LaunchUpstreamWorker(telegramUserId int64, hubUsername string) error {
if _, exists := this.conns[hubUsername]; exists {
return fmt.Errorf("Duplicate hub connection for user '%s', abandoning", hubUsername)
}
log.Printf("Connecting to hub '%s' as user '%s'...", this.config.HubAddr, hubUsername)
// We want to use the async NMDC connection, but at the same time, provide
// extra information in the channel. Wrap the upstream NMDC channel with one
// of our own
upstreamChan := make(chan libnmdc.HubEvent, 0)
go func() {
for msg := range upstreamChan {
this.hubMessages <- upstreamMessage{
telegramUserId: telegramUserId,
hubNick: hubUsername,
evt: msg,
}
}
}()
hubUser := libnmdc.NewUserInfo(hubUsername)
hubUser.ClientTag = AppName
hubUser.ClientVersion = AppVersion
conn := libnmdc.ConnectAsync(
&libnmdc.HubConnectionOptions{
Address: libnmdc.HubAddress(this.config.HubAddr),
Self: hubUser,
},
upstreamChan,
)
// Stash conn so that we can refer to it / close it later
this.conns[hubUsername] = conn
return nil
}
func (this *NTFServer) IsSetupMode() bool {
return this.config.GroupChatID == 0
}
func (this *NTFServer) Run() error {
updateProps := telegram.NewUpdate(0)
updateProps.Timeout = 60 // seconds
updateChan, err := this.bot.GetUpdatesChan(updateProps)
if err != nil {
return err
}
for {
select {
case update, ok := <-updateChan:
if !ok {
log.Fatalf("Telegram update channel closed unexpectedly")
}
this.HandleMessage(update)
case hubMsg, ok := <-this.hubMessages:
if !ok {
log.Fatalf("Upstream update channel closed unexpectedly")
}
this.HandleHubMessage(hubMsg)
}
}
return nil // UNREACHABLE
}
func (this *NTFServer) coalesceWorker() {
for {
time.Sleep(coalesceWorkerRescanEvery)
deadLine := time.Now().Add(-coalesceWorkerExpireAfter)
this.coalesceBufferMut.Lock()
for k, v := range this.coalesceBuffer {
if v.Before(deadLine) {
delete(this.coalesceBuffer, k)
}
}
this.coalesceBufferMut.Unlock()
}
}
// Coalesce returns true if the message sticks to an existing one.
// It adds it into the coalesce buffer anyway.
func (this *NTFServer) Coalesce(hubNick, message string) bool {
ckey := coalesceKey(hubNick, message)
this.coalesceBufferMut.Lock()
if _, ok := this.coalesceBuffer[ckey]; ok {
this.coalesceBufferMut.Unlock()
return true // stuck to existing
} else {
this.coalesceBuffer[ckey] = time.Now()
this.coalesceBufferMut.Unlock()
return false // first we've seen it
}
}
func (this *NTFServer) HandleHubMessage(msg upstreamMessage) {
if this.verbose {
log.Printf("Hub: %#v", msg)
}
switch msg.evt.EventType {
case libnmdc.EVENT_SYSTEM_MESSAGE_FROM_CONN, libnmdc.EVENT_SYSTEM_MESSAGE_FROM_HUB:
// This includes "want to register your nick?" and chat-catchup messages
// But it also includes /me messages
// First, remove system characters from the front (star, space)
systemMessage := strings.TrimLeft(msg.evt.Message, `* `)
// Heuristic detect action messages if the first space-separated word is a known online user
isActionMessage := false
if conn, ok := this.conns[msg.hubNick]; ok {
firstWord := strings.SplitN(systemMessage, ` `, 2)[0]
_ = conn.Users(func(umap *map[string]libnmdc.UserInfo) error {
for knownHubNick, _ := range *umap {
if knownHubNick == firstWord {
isActionMessage = true
return nil
}
}
return nil
})
}
if isActionMessage {
// Treat it as a full post
// Coalesce from multiple connections
if this.Coalesce("*", systemMessage) {
return // ignore - we heard this message already recently
}
// Display the message
htmlMsg := "<i>* " + html.EscapeString(systemMessage) + "</i>"
err := this.GroupChatSayHTML(htmlMsg)
if err != nil {
log.Printf("Delivering action message to group chat: %s", err.Error())
}
} else {
// Not an action message, just clerical notices
// Don't mirror them into telegram
log.Printf("Hub(%s): * %s", msg.hubNick, msg.evt.Message)
}
case libnmdc.EVENT_CONNECTION_STATE_CHANGED:
log.Printf("Hub(%s): * Connection %s", msg.evt.StateChange.String())
case libnmdc.EVENT_PRIVATE:
err := this.DirectMessageTelegramUser(msg.telegramUserId, fmt.Sprintf("PM from user '%s': %s", msg.evt.Nick, msg.evt.Message))
if err != nil {
log.Printf("Delivering PM to telegram user: %s", err.Error())
}
case libnmdc.EVENT_PUBLIC:
for _, ignoreNick := range this.config.HubIgnoreNicks {
if ignoreNick == msg.evt.Nick {
return // ignore
}
}
// Coalesce from multiple connections
if this.Coalesce(msg.evt.Nick, msg.evt.Message) {
return // ignore - we heard this message already recently
}
// Display the message
htmlMsg := "<b>&lt;" + html.EscapeString(msg.evt.Nick) + "&gt;</b> " + html.EscapeString(msg.evt.Message)
err := this.GroupChatSayHTML(htmlMsg)
if err != nil {
log.Printf("Delivering public message to group chat: %s", err.Error())
}
case libnmdc.EVENT_BAD_LOGIN_FAILURE:
this.DirectMessageTelegramUser(msg.telegramUserId, "The hub disconnected in a permanent way (login failure?). Consider re-registering with a different nick.")
this.kickAndDrop(msg.telegramUserId)
case libnmdc.EVENT_USER_JOINED, libnmdc.EVENT_USER_PART, libnmdc.EVENT_USER_UPDATED_INFO, libnmdc.EVENT_USERCOMMAND, libnmdc.EVENT_DEBUG_MESSAGE, libnmdc.EVENT_HUBNAME_CHANGED:
// ignore
default:
log.Printf("Hub(%s): Unhandled(%d): %s", msg.hubNick, msg.evt.EventType, msg.evt.Message)
}
}
func (this *NTFServer) HandleMessage(update telegram.Update) {
if update.Message == nil {
return
}
if this.IsSetupMode() {
log.Printf("Message from '%s': '%s', chat ID '%d'\n", update.Message.From.UserName, update.Message.Text, update.Message.Chat.ID)
} else if update.Message.Chat.ID == this.config.GroupChatID {
err := this.HandleGroupMessage(update)
if err != nil {
log.Printf("Handling group message: %s", err.Error())
}
} else if update.Message.Chat.IsPrivate() {
err := this.HandleDirectMessage(update)
if err != nil {
log.Printf("Handling private message: %s", err.Error())
}
} else {
log.Printf("Message from unknown chat %d (not our supergroup, not a PM, ...)", update.Message.Chat.ID)
}
if this.verbose {
fmt.Printf("%#v\n", update.Message)
log.Printf("[%s] %s", update.Message.From.UserName, update.Message.Text)
}
}
// kickAndDrop deregisters an account and kicks them from the group chat
func (this *NTFServer) kickAndDrop(telegramUserId int64) {
// Hubnick
if hubNick, ok := this.config.KnownUsers[telegramUserId]; ok {
// Close upstream connection (if any)
if conn, ok := this.conns[hubNick]; ok {
delete(this.conns, hubNick)
log.Printf("Disconnecting '%s' from hub", hubNick)
conn.Disconnect()
}
// Deregister from known users
delete(this.config.KnownUsers, telegramUserId)
err := this.config.Save(this.configFile)
if err != nil {
log.Printf("Couldn't save changes when deregistering user: %s", err.Error())
}
}
// Kick from telegram groupchat (if logged in)
_, err := this.bot.KickChatMember(telegram.KickChatMemberConfig{
ChatMemberConfig: telegram.ChatMemberConfig{
ChatID: this.config.GroupChatID,
UserID: int(telegramUserId),
},
UntilDate: time.Now().Add(24 * time.Hour).Unix(),
})
if err != nil {
log.Printf("Couldn't kick user from telegram group: %s", err.Error())
}
}
// HandleTelegramUserParted processes a user leaving the groupchat, without deregistering their account
func (this *NTFServer) HandleTelegramUserParted(telegramUserId int64, update telegram.Update) error {
delete(this.config.GroupChatMembers, telegramUserId)
return this.config.Save(this.configFile)
}
func (this *NTFServer) HandleTelegramUserJoined(telegramUserId int64, telegramDisplayName string, update telegram.Update) error {
// If known, spawn the upstream connection; if unknown, kick them
hubNick, ok := this.config.KnownUsers[telegramUserId]
if !ok {
log.Printf("Unexpected user '%s' (%d) joined the group chat, kicking...", telegramDisplayName, telegramUserId)
this.kickAndDrop(telegramUserId)
}
// Remember which users are currently joined
this.config.GroupChatMembers[telegramUserId] = telegramDisplayName
err := this.config.Save(this.configFile)
if err != nil {
return err
}
// Spawn the upstream connection for this user
return this.LaunchUpstreamWorker(telegramUserId, hubNick)
}
func (this *NTFServer) HandleGroupMessage(update telegram.Update) error {
// Joins: ????
if update.Message.NewChatMembers != nil && len(*update.Message.NewChatMembers) > 0 {
// Users joining
// Ensure that they have a valid user mapping
// Create upstream NMDC connection for them
for _, joinedUser := range *update.Message.NewChatMembers {
err := this.HandleTelegramUserJoined(int64(joinedUser.ID), joinedUser.String(), update)
if err != nil {
log.Printf("Handling user join: %s", err.Error())
}
}
return nil
}
if update.Message.LeftChatMember != nil {
// User parted
// Close upstream NMDC connection for them
log.Printf("Telegram user '%s' (%d) leaving group chat", update.Message.LeftChatMember.String(), update.Message.LeftChatMember.ID)
return this.HandleTelegramUserParted(int64(update.Message.LeftChatMember.ID), update)
}
if len(update.Message.Text) > 0 {
// Find the responsible user's upstream connection
userID := int64(update.Message.From.ID)
hubNick, ok := this.config.KnownUsers[userID]
if !ok {
return fmt.Errorf("Couldn't send public message for user '%d' unexpectedly missing hub nick!", userID)
}
conn, ok := this.conns[hubNick]
if !ok {
return fmt.Errorf("Couldn't send public message for user '%d' (%s) unexpectedly missing upstream connection!", userID, hubNick)
}
// Intercept some bot commands
if update.Message.Text == "/userlist" || update.Message.Text == "/userlist@"+this.botName {
// Display native userlist inside the groupchat
usernames, err := this.getUserlistText(conn)
if err != nil {
this.GroupChatSayHTML("<i>Can't get userlist for you (internal error)</i>")
}
this.GroupChatSayHTML(fmt.Sprintf("<i>Online users:</i> %s", html.EscapeString(usernames)))
} else {
// Actual chat message
sendMsg := update.Message.Text
// Was it a reply to another message? If so, format it as "<OrigUser> OrigMessage // update text"
// That includes collapsing the original message onto a single line
if update.Message.ReplyToMessage != nil && len(update.Message.ReplyToMessage.Text) > 0 && update.Message.ReplyToMessage.From != nil {
origSenderId := int64(update.Message.ReplyToMessage.From.ID)
flatten := func(s string) string { return strings.Replace(s, "\n", " ", -1) }
if origSenderNick, ok := this.config.KnownUsers[origSenderId]; ok {
// Quoted messages from another hub user
sendMsg = fmt.Sprintf(`"<%s> %s" // %s`, origSenderNick, flatten(update.Message.ReplyToMessage.Text), sendMsg)
} else if origSenderId == int64(this.bot.Self.ID) {
// Quoted messages from the hublink bot
sendMsg = fmt.Sprintf(`"%s" // %s`, flatten(update.Message.ReplyToMessage.Text), sendMsg) // the username is already present in the plaintext
} else {
// No idea what was quoted. But it really was quoting something
// Paste it as-is but with some info in the logs for future cleanup
log.Printf("Quoting message '%#v' with unknown source", update.Message.ReplyToMessage)
sendMsg = fmt.Sprintf(`"%s" // %s`, flatten(update.Message.ReplyToMessage.Text), sendMsg)
}
}
// Also add it to the coalesce buffer so that we don't replay it from someone else's NMDC connection
this.Coalesce(hubNick, sendMsg)
// Submit to NMDC
err := conn.SayPublic(sendMsg)
if err != nil {
log.Printf("Failed to deliver message '%s': %s", sendMsg, err.Error())
this.GroupChatSayHTML(fmt.Sprintf("<i>Couldn't sync message '%s' because: %s</i>", html.EscapeString(sendMsg), html.EscapeString(err.Error())))
}
}
}
// TODO probably a file/image upload???
// TODO support "editing messages" by re-sending them with a ** suffix
return nil
}
func (this *NTFServer) ReplyTelegramUser(userID int64, str string, replyToMessageID int) error {
chatId, ok := this.config.DirectMessageChats[userID]
if !ok {
return fmt.Errorf("Can't send telegram message to user '%s': no DM chat known", userID)
}
msg := telegram.NewMessage(chatId, str)
if replyToMessageID != 0 {
msg.ReplyToMessageID = replyToMessageID
}
_, err := this.bot.Send(msg)
return err
}
func (this *NTFServer) DirectMessageTelegramUser(userID int64, str string) error {
return this.ReplyTelegramUser(userID, str, 0)
}
func (this *NTFServer) GroupChatSayHTML(str string) error {
msg := telegram.NewMessage(this.config.GroupChatID, str)
msg.ParseMode = telegram.ModeHTML
_, err := this.bot.Send(msg)
return err
}
func (this *NTFServer) getUserlistText(conn *libnmdc.HubConnection) (string, error) {
usernames := make([]string, 0)
err := conn.Users(func(umap *map[string]libnmdc.UserInfo) error {
for k, _ := range *umap {
usernames = append(usernames, k)
}
return nil
})
if err != nil {
log.Printf("Error retrieving userlist: %s", err.Error())
return "", err
}
sort.Strings(usernames)
return strings.Join(usernames, " "), nil
}
func (this *NTFServer) HandleDirectMessage(update telegram.Update) error {
// Registration workflow
userID := int64(update.Message.From.ID)
// Stash the telegram user ID against this direct-message chat ID so that
// we can always reply later on
chatID := update.Message.Chat.ID
if oldChatID, ok := this.config.DirectMessageChats[userID]; !ok || oldChatID != chatID {
this.config.DirectMessageChats[userID] = chatID
err := this.config.Save(this.configFile)
if err != nil {
log.Printf("Couldn't save chat ID %d for user %d", chatID, userID)
}
}
respond := func(str string) error {
return this.ReplyTelegramUser(userID, str, update.Message.MessageID)
}
// Find out the current status for this chat ID
hubNick, isKnown := this.config.KnownUsers[userID]
_, isInGroupChat := this.config.GroupChatMembers[userID]
// Handle the incoming request
msg := update.Message.Text
if strings.HasPrefix(msg, "/pm ") {
if !(isKnown && isInGroupChat) {
return respond("Can't send a native PM until you've joined.")
}
conn, ok := this.conns[hubNick]
if !ok {
return respond("Can't send a native PM (No upstream hub connection)")
}
parts := strings.SplitN(msg, " ", 3)
if len(parts) != 3 {
return respond("Expected format /pm [recipient] [message] - try again...")
}
if !conn.UserExists(parts[1]) {
return respond(fmt.Sprintf("Can't PM offline user '%s'", parts[1]))
}
err := conn.SayPrivate(parts[1], parts[2])
if err != nil {
log.Printf("Sending PM %s -> %s failed because: %s", hubNick, parts[1], err.Error())
return respond(fmt.Sprintf("Sending PM failed because: %s", err.Error()))
}
} else if strings.HasPrefix(msg, "/join ") {
requestedHubNick := msg[6:]
// Some users take the [] in the message literally. A-Za-z0-9 are the only supported characters
requestedHubNick = regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(requestedHubNick, "")
// Minimum nick lengths
if len(requestedHubNick) < this.config.HubNickMinChars {
return respond(fmt.Sprintf("Upstream nickname '%s' should be at least %d characters long", requestedHubNick, this.config.HubNickMinChars))
}
err := this.registerUser(userID, requestedHubNick)
if err != nil {
log.Printf("Failed to register user: %s", err.Error())
return respond(fmt.Sprintf("Couldn't allow registration because: %s", err.Error()))
}
return respond(fmt.Sprintf("Hi '%s'! You are now registered, and can join %s at %s", requestedHubNick, this.config.HubDescription, this.inviteLink))
} else if strings.HasPrefix(msg, "/rejoin") {
if isKnown && !isInGroupChat {
return respond(fmt.Sprintf("Welcome back '%s'! You can join %s at %s", hubNick, this.config.HubDescription, this.inviteLink))
} else {
return respond("You are either already joined (try /quit first), or not yet registered (try /join first).")
}
} else if strings.HasPrefix(msg, "/userlist") {
conn, ok := this.conns[hubNick]
if !ok {
return respond("Can't get userlist for you (No upstream hub connection)")
}
usernames, err := this.getUserlistText(conn)
if err != nil {
return respond("Can't get userlist for you (internal error)")
}
return respond("Online users: " + usernames)
} else if strings.HasPrefix(msg, "/whois ") {
target := msg[7:]
conn, ok := this.conns[hubNick]
if !ok {
return respond("Can't get user details for you (No upstream hub connection)")
}
var userinfo libnmdc.UserInfo
var exists bool
err := conn.Users(func(umap *map[string]libnmdc.UserInfo) error {
userinfo, exists = (*umap)[target]
return nil
})
if err != nil {
log.Printf("Error retrieving userlist: %s", err.Error())
return respond("Can't get user details for you (internal error)")
}
if !exists {
return respond(fmt.Sprintf("There is no online user '%s'.", target))
}
return respond(fmt.Sprintf("Description: %s\nShare size: %d\nClient: %s %s", userinfo.Description, userinfo.ShareSize, userinfo.ClientTag, userinfo.ClientVersion))
} else if strings.HasPrefix(msg, "/quit") {
this.kickAndDrop(userID)
return respond("Disconnected. You can register again by typing /help .")
} else { // e.g. /start or /help
// Help
helpMsg := `I am a bot that connects Telegram with ` + this.config.HubDescription + ".\n"
if isKnown {
helpMsg += "You are currently registered as: '" + hubNick + "'\n"
} else {
helpMsg += "You aren't connected yet.\n"
}
if isInGroupChat {
helpMsg += "You are currently in the groupchat.\n"
} else {
helpMsg += "You haven't joined the groupchat.\n"
}
helpMsg += `
Available commands:
/setup - Welcome message
/join [hubnick] - Register as 'hubnick' and join ` + this.config.HubDescription + `
/rejoin - Re-invite if you are already registered
/pm [recipient] [message] - Send a native PM (if connected)
/userlist - List native online users
/whois [hubnick] - Check if native user is online, and read their description
/quit - Unregister your nick and leave the group chat
`
return respond(helpMsg)
}
return nil
}