579 lines
17 KiB
Go
579 lines
17 KiB
Go
package main
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"html"
|
|
"log"
|
|
"net/url"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"code.ivysaur.me/libnmdc"
|
|
telegram "github.com/go-telegram-bot-api/telegram-bot-api"
|
|
)
|
|
|
|
// NTFServer methods all run on the same thread, so no mutexes are needed for field access
|
|
type NTFServer struct {
|
|
bot *telegram.BotAPI
|
|
hubMessages chan upstreamMessage
|
|
botName string
|
|
chatName string
|
|
inviteLink string
|
|
configFile string
|
|
config NTFConfig
|
|
conns map[string]*libnmdc.HubConnection // hubnick -> hubconn
|
|
verbose bool
|
|
|
|
callOnMainThread chan func()
|
|
|
|
contentedMaxBytes int64
|
|
|
|
// Except the coalesce buffer, that requires a background worker.
|
|
coalesceBufferMut sync.Mutex
|
|
coalesceBuffer map[string]time.Time
|
|
}
|
|
|
|
type upstreamMessage struct {
|
|
telegramUserId int64
|
|
hubNick string
|
|
evt libnmdc.HubEvent
|
|
}
|
|
|
|
func NewNTFServer(configFile string, verbose bool) (*NTFServer, error) {
|
|
ret := NTFServer{
|
|
configFile: configFile,
|
|
hubMessages: make(chan upstreamMessage, 0),
|
|
callOnMainThread: make(chan func(), 0),
|
|
conns: make(map[string]*libnmdc.HubConnection),
|
|
coalesceBuffer: make(map[string]time.Time),
|
|
verbose: verbose,
|
|
}
|
|
|
|
// Config
|
|
|
|
cfg, err := LoadConfig(configFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ret.config = cfg
|
|
|
|
// Validate contented URL (if present)
|
|
if len(ret.config.ContentedURL) > 0 {
|
|
_, err := url.Parse(ret.config.ContentedURL)
|
|
if err != nil {
|
|
log.Printf("Ignoring malformed URL to contented server '%s': %s", ret.config.ContentedURL, err.Error())
|
|
ret.config.ContentedURL = "" // clear
|
|
}
|
|
|
|
// Valid. Enforce trailing slash
|
|
if !strings.HasSuffix(ret.config.ContentedURL, `/`) {
|
|
ret.config.ContentedURL += `/`
|
|
}
|
|
|
|
// Only set this if contented URL is valid - zero otherwise
|
|
ret.contentedMaxBytes = int64(ret.config.ContentedMaxMB) * 1024 * 1024
|
|
}
|
|
|
|
// Coalesce background worker
|
|
go ret.coalesceWorker()
|
|
|
|
// Bot connection
|
|
|
|
if len(cfg.BotAPIKey) == 0 {
|
|
return nil, errors.New("No bot API key supplied (register with BotFather first)")
|
|
}
|
|
|
|
bot, err := telegram.NewBotAPI(cfg.BotAPIKey)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Connecting to Telegram: %s", err.Error())
|
|
}
|
|
|
|
ret.bot = bot
|
|
|
|
if ret.verbose {
|
|
bot.Debug = true
|
|
}
|
|
|
|
log.Printf("Connected to telegram as '%s'", bot.Self.UserName)
|
|
|
|
ret.botName = bot.Self.UserName
|
|
|
|
// Groupchat properties
|
|
|
|
if ret.IsSetupMode() {
|
|
log.Println("Group chat ID unknown, running in setup mode only - find the groupchat ID then re-run")
|
|
|
|
} else {
|
|
chatInfo, err := bot.GetChat(telegram.ChatConfig{ChatID: ret.config.GroupChatID})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Couldn't get supergroup properties: %s", err.Error())
|
|
}
|
|
|
|
inviteLink, err := bot.GetInviteLink(telegram.ChatConfig{ChatID: ret.config.GroupChatID})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Couldn't get supergroup invite link: %s", err.Error())
|
|
}
|
|
|
|
log.Printf("Group chat: %s", chatInfo.Title)
|
|
log.Printf("Invite link: %s", inviteLink)
|
|
|
|
ret.chatName = chatInfo.Title
|
|
ret.inviteLink = inviteLink
|
|
|
|
}
|
|
|
|
// Spawn upstream connections for all pre-existing known users
|
|
launchedAny := false
|
|
for telegramUserId, telegramDisplayName := range ret.config.GroupChatMembers {
|
|
hubNick, ok := ret.config.KnownUsers[telegramUserId]
|
|
if !ok {
|
|
log.Fatalf("Chat member '%d' (%s) is missing a hub mapping!!!", telegramUserId, telegramDisplayName) // fatal - inconsistent DB
|
|
}
|
|
|
|
err := ret.LaunchUpstreamWorker(telegramUserId, hubNick)
|
|
if err != nil {
|
|
log.Fatalf("Couldn't reconnect upstream for '%s': %s", hubNick, err.Error()) // fatal - inconsistent DB is the only possible cause
|
|
}
|
|
|
|
launchedAny = true
|
|
}
|
|
|
|
// We have some bad edge cases if a telegram message comes in while the upstream
|
|
// connection is offline. Wait a bit after startup, to minimise failure cases
|
|
if launchedAny {
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
|
|
return &ret, nil
|
|
}
|
|
|
|
// LaunchUpstreamWorker opens an NMDC connection.
|
|
func (this *NTFServer) LaunchUpstreamWorker(telegramUserId int64, hubUsername string) error {
|
|
|
|
if _, exists := this.conns[hubUsername]; exists {
|
|
return fmt.Errorf("Duplicate hub connection for user '%s', abandoning", hubUsername)
|
|
}
|
|
|
|
log.Printf("Connecting to hub '%s' as user '%s'...", this.config.HubAddr, hubUsername)
|
|
|
|
// We want to use the async NMDC connection, but at the same time, provide
|
|
// extra information in the channel. Wrap the upstream NMDC channel with one
|
|
// of our own
|
|
upstreamChan := make(chan libnmdc.HubEvent, 0)
|
|
go func() {
|
|
for msg := range upstreamChan {
|
|
this.hubMessages <- upstreamMessage{
|
|
telegramUserId: telegramUserId,
|
|
hubNick: hubUsername,
|
|
evt: msg,
|
|
}
|
|
}
|
|
}()
|
|
|
|
hubUser := libnmdc.NewUserInfo(hubUsername)
|
|
hubUser.ClientTag = AppName
|
|
hubUser.ClientVersion = AppVersion
|
|
|
|
conn := libnmdc.ConnectAsync(
|
|
&libnmdc.HubConnectionOptions{
|
|
Address: libnmdc.HubAddress(this.config.HubAddr),
|
|
Self: hubUser,
|
|
},
|
|
upstreamChan,
|
|
)
|
|
|
|
// Stash conn so that we can refer to it / close it later
|
|
this.conns[hubUsername] = conn
|
|
|
|
return nil
|
|
}
|
|
|
|
func (this *NTFServer) IsSetupMode() bool {
|
|
return this.config.GroupChatID == 0
|
|
}
|
|
|
|
func (this *NTFServer) Run() error {
|
|
updateProps := telegram.NewUpdate(0)
|
|
updateProps.Timeout = 60 // seconds
|
|
|
|
updateChan, err := this.bot.GetUpdatesChan(updateProps)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case update, ok := <-updateChan:
|
|
if !ok {
|
|
log.Fatalf("Telegram update channel closed unexpectedly")
|
|
}
|
|
this.HandleMessage(update)
|
|
|
|
case hubMsg, ok := <-this.hubMessages:
|
|
if !ok {
|
|
log.Fatalf("Upstream update channel closed unexpectedly")
|
|
}
|
|
this.HandleHubMessage(hubMsg)
|
|
|
|
case mtfn, ok := <-this.callOnMainThread:
|
|
if !ok {
|
|
log.Fatalf("Synchronisation channel closed unexpectedly")
|
|
}
|
|
mtfn()
|
|
}
|
|
}
|
|
|
|
return nil // UNREACHABLE
|
|
}
|
|
|
|
func (this *NTFServer) HandleHubMessage(msg upstreamMessage) {
|
|
if this.verbose {
|
|
log.Printf("Hub: %#v", msg)
|
|
}
|
|
|
|
switch msg.evt.EventType {
|
|
|
|
case libnmdc.EVENT_SYSTEM_MESSAGE_FROM_CONN, libnmdc.EVENT_SYSTEM_MESSAGE_FROM_HUB:
|
|
// This includes "want to register your nick?" and chat-catchup messages
|
|
// But it also includes /me messages
|
|
|
|
// First, remove system characters from the front (star, space)
|
|
systemMessage := strings.TrimLeft(msg.evt.Message, `* `)
|
|
|
|
// Heuristic detect action messages if the first space-separated word is a known online user
|
|
isActionMessage := false
|
|
|
|
if conn, ok := this.conns[msg.hubNick]; ok {
|
|
firstWord := strings.SplitN(systemMessage, ` `, 2)[0]
|
|
_ = conn.Users(func(umap *map[string]libnmdc.UserInfo) error {
|
|
for knownHubNick, _ := range *umap {
|
|
if knownHubNick == firstWord {
|
|
isActionMessage = true
|
|
return nil
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
if isActionMessage {
|
|
// Treat it as a full post
|
|
|
|
// Coalesce from multiple connections
|
|
if this.Coalesce("*", systemMessage) {
|
|
return // ignore - we heard this message already recently
|
|
}
|
|
|
|
// Display the message
|
|
htmlMsg := "<i>* " + html.EscapeString(systemMessage) + "</i>"
|
|
err := this.GroupChatSayHTML(htmlMsg)
|
|
if err != nil {
|
|
log.Printf("Delivering action message to group chat: %s", err.Error())
|
|
}
|
|
|
|
} else {
|
|
// Not an action message, just clerical notices
|
|
// Don't mirror them into telegram
|
|
log.Printf("Hub(%s): * %s", msg.hubNick, msg.evt.Message)
|
|
}
|
|
|
|
case libnmdc.EVENT_CONNECTION_STATE_CHANGED:
|
|
log.Printf("Hub(%s): * Connection %s", msg.evt.StateChange.String())
|
|
|
|
case libnmdc.EVENT_PRIVATE:
|
|
err := this.DirectMessageTelegramUser(msg.telegramUserId, fmt.Sprintf("PM from user '%s': %s", msg.evt.Nick, msg.evt.Message))
|
|
if err != nil {
|
|
log.Printf("Delivering PM to telegram user: %s", err.Error())
|
|
}
|
|
|
|
case libnmdc.EVENT_PUBLIC:
|
|
for _, ignoreNick := range this.config.HubIgnoreNicks {
|
|
if ignoreNick == msg.evt.Nick {
|
|
return // ignore
|
|
}
|
|
}
|
|
|
|
// Coalesce from multiple connections
|
|
if this.Coalesce(msg.evt.Nick, msg.evt.Message) {
|
|
return // ignore - we heard this message already recently
|
|
}
|
|
|
|
// Display the message
|
|
htmlMsg := "<b><" + html.EscapeString(msg.evt.Nick) + "></b> " + html.EscapeString(msg.evt.Message)
|
|
err := this.GroupChatSayHTML(htmlMsg)
|
|
if err != nil {
|
|
log.Printf("Delivering public message to group chat: %s", err.Error())
|
|
}
|
|
|
|
case libnmdc.EVENT_BAD_LOGIN_FAILURE:
|
|
this.DirectMessageTelegramUser(msg.telegramUserId, "The hub disconnected in a permanent way (login failure?). Consider re-registering with a different nick.")
|
|
this.kickAndDrop(msg.telegramUserId)
|
|
|
|
case libnmdc.EVENT_USER_JOINED, libnmdc.EVENT_USER_PART, libnmdc.EVENT_USER_UPDATED_INFO, libnmdc.EVENT_USERCOMMAND, libnmdc.EVENT_DEBUG_MESSAGE, libnmdc.EVENT_HUBNAME_CHANGED:
|
|
// ignore
|
|
|
|
default:
|
|
log.Printf("Hub(%s): Unhandled(%d): %s", msg.hubNick, msg.evt.EventType, msg.evt.Message)
|
|
|
|
}
|
|
}
|
|
|
|
func (this *NTFServer) HandleMessage(update telegram.Update) {
|
|
if update.Message == nil {
|
|
return
|
|
}
|
|
|
|
if this.IsSetupMode() {
|
|
log.Printf("Message from '%s': '%s', chat ID '%d'\n", update.Message.From.UserName, update.Message.Text, update.Message.Chat.ID)
|
|
|
|
} else if update.Message.Chat.ID == this.config.GroupChatID {
|
|
err := this.HandleGroupMessage(update)
|
|
if err != nil {
|
|
log.Printf("Handling group message: %s", err.Error())
|
|
}
|
|
|
|
} else if update.Message.Chat.IsPrivate() {
|
|
err := this.HandleDirectMessage(update)
|
|
if err != nil {
|
|
log.Printf("Handling private message: %s", err.Error())
|
|
}
|
|
|
|
} else {
|
|
log.Printf("Message from unknown chat %d (not our supergroup, not a PM, ...)", update.Message.Chat.ID)
|
|
}
|
|
|
|
if this.verbose {
|
|
fmt.Printf("%#v\n", update.Message)
|
|
|
|
log.Printf("[%s] %s", update.Message.From.UserName, update.Message.Text)
|
|
}
|
|
}
|
|
|
|
// kickAndDrop deregisters an account and kicks them from the group chat
|
|
func (this *NTFServer) kickAndDrop(telegramUserId int64) {
|
|
|
|
// Hubnick
|
|
if hubNick, ok := this.config.KnownUsers[telegramUserId]; ok {
|
|
|
|
// Close upstream connection (if any)
|
|
if conn, ok := this.conns[hubNick]; ok {
|
|
delete(this.conns, hubNick)
|
|
|
|
log.Printf("Disconnecting '%s' from hub", hubNick)
|
|
conn.Disconnect()
|
|
}
|
|
|
|
// Deregister from known users
|
|
delete(this.config.KnownUsers, telegramUserId)
|
|
err := this.config.Save(this.configFile)
|
|
if err != nil {
|
|
log.Printf("Couldn't save changes when deregistering user: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// Kick from telegram groupchat (if logged in)
|
|
_, err := this.bot.KickChatMember(telegram.KickChatMemberConfig{
|
|
ChatMemberConfig: telegram.ChatMemberConfig{
|
|
ChatID: this.config.GroupChatID,
|
|
UserID: int(telegramUserId),
|
|
},
|
|
UntilDate: time.Now().Add(24 * time.Hour).Unix(),
|
|
})
|
|
if err != nil {
|
|
log.Printf("Couldn't kick user from telegram group: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// HandleTelegramUserParted processes a user leaving the groupchat, without deregistering their account
|
|
func (this *NTFServer) HandleTelegramUserParted(telegramUserId int64, update telegram.Update) error {
|
|
delete(this.config.GroupChatMembers, telegramUserId)
|
|
return this.config.Save(this.configFile)
|
|
}
|
|
|
|
func (this *NTFServer) HandleTelegramUserJoined(telegramUserId int64, telegramDisplayName string, update telegram.Update) error {
|
|
// If known, spawn the upstream connection; if unknown, kick them
|
|
hubNick, ok := this.config.KnownUsers[telegramUserId]
|
|
if !ok {
|
|
log.Printf("Unexpected user '%s' (%d) joined the group chat, kicking...", telegramDisplayName, telegramUserId)
|
|
this.kickAndDrop(telegramUserId)
|
|
}
|
|
|
|
// Remember which users are currently joined
|
|
this.config.GroupChatMembers[telegramUserId] = telegramDisplayName
|
|
err := this.config.Save(this.configFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Spawn the upstream connection for this user
|
|
return this.LaunchUpstreamWorker(telegramUserId, hubNick)
|
|
}
|
|
|
|
func (this *NTFServer) HandleGroupMessage(update telegram.Update) error {
|
|
|
|
// Joins: ????
|
|
if update.Message.NewChatMembers != nil && len(*update.Message.NewChatMembers) > 0 {
|
|
// Users joining
|
|
// Ensure that they have a valid user mapping
|
|
// Create upstream NMDC connection for them
|
|
for _, joinedUser := range *update.Message.NewChatMembers {
|
|
err := this.HandleTelegramUserJoined(int64(joinedUser.ID), joinedUser.String(), update)
|
|
if err != nil {
|
|
log.Printf("Handling user join: %s", err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if update.Message.LeftChatMember != nil {
|
|
// User parted
|
|
// Close upstream NMDC connection for them
|
|
log.Printf("Telegram user '%s' (%d) leaving group chat", update.Message.LeftChatMember.String(), update.Message.LeftChatMember.ID)
|
|
return this.HandleTelegramUserParted(int64(update.Message.LeftChatMember.ID), update)
|
|
}
|
|
|
|
//
|
|
|
|
userID := int64(update.Message.From.ID)
|
|
|
|
if update.Message.Sticker != nil && this.contentedMaxBytes > 0 {
|
|
// Sticker
|
|
go this.uploadStickerWorker(userID, update.Message.Sticker)
|
|
}
|
|
|
|
if len(update.Message.Text) > 0 {
|
|
|
|
// Intercept some bot commands
|
|
|
|
if update.Message.Text == "/userlist" || update.Message.Text == "/userlist@"+this.botName {
|
|
// Display native userlist inside the groupchat
|
|
|
|
// Find the responsible user's upstream connection
|
|
hubNick, ok := this.config.KnownUsers[userID]
|
|
if !ok {
|
|
return fmt.Errorf("Couldn't send public message for user '%d' unexpectedly missing hub nick!", userID)
|
|
}
|
|
|
|
conn, ok := this.conns[hubNick]
|
|
if !ok {
|
|
return fmt.Errorf("Couldn't send public message for user '%d' (%s) unexpectedly missing upstream connection!", userID, hubNick)
|
|
}
|
|
|
|
usernames, err := this.getUserlistText(conn)
|
|
if err != nil {
|
|
this.GroupChatSayHTML("<i>Can't get userlist for you (internal error)</i>")
|
|
}
|
|
|
|
this.GroupChatSayHTML(fmt.Sprintf("<i>Online users:</i> %s", html.EscapeString(usernames)))
|
|
|
|
} else {
|
|
|
|
// Actual chat message
|
|
sendMsg := update.Message.Text
|
|
|
|
// Was it a reply to another message? If so, format it as "<OrigUser> OrigMessage // update text"
|
|
// That includes collapsing the original message onto a single line
|
|
if update.Message.ReplyToMessage != nil && len(update.Message.ReplyToMessage.Text) > 0 && update.Message.ReplyToMessage.From != nil {
|
|
origSenderId := int64(update.Message.ReplyToMessage.From.ID)
|
|
|
|
flatten := func(s string) string { return strings.Replace(s, "\n", " ", -1) }
|
|
|
|
if origSenderNick, ok := this.config.KnownUsers[origSenderId]; ok {
|
|
// Quoted messages from another hub user
|
|
sendMsg = fmt.Sprintf(`"<%s> %s" // %s`, origSenderNick, flatten(update.Message.ReplyToMessage.Text), sendMsg)
|
|
|
|
} else if origSenderId == int64(this.bot.Self.ID) {
|
|
// Quoted messages from the hublink bot
|
|
sendMsg = fmt.Sprintf(`"%s" // %s`, flatten(update.Message.ReplyToMessage.Text), sendMsg) // the username is already present in the plaintext
|
|
|
|
} else {
|
|
// No idea what was quoted. But it really was quoting something
|
|
// Paste it as-is but with some info in the logs for future cleanup
|
|
log.Printf("Quoting message '%#v' with unknown source", update.Message.ReplyToMessage)
|
|
sendMsg = fmt.Sprintf(`"%s" // %s`, flatten(update.Message.ReplyToMessage.Text), sendMsg)
|
|
}
|
|
|
|
}
|
|
|
|
return this.HubSay(userID, sendMsg)
|
|
}
|
|
}
|
|
|
|
// TODO probably a file/image upload???
|
|
// TODO support "editing messages" by re-sending them with a ** suffix
|
|
return nil
|
|
}
|
|
|
|
func (this *NTFServer) HubSay(userID int64, sendMsg string) error {
|
|
|
|
// Find the responsible user's upstream connection
|
|
hubNick, ok := this.config.KnownUsers[userID]
|
|
if !ok {
|
|
return fmt.Errorf("Couldn't send public message for user '%d' unexpectedly missing hub nick!", userID)
|
|
}
|
|
|
|
conn, ok := this.conns[hubNick]
|
|
if !ok {
|
|
return fmt.Errorf("Couldn't send public message for user '%d' (%s) unexpectedly missing upstream connection!", userID, hubNick)
|
|
}
|
|
|
|
// Also add it to the coalesce buffer so that we don't replay it from someone else's NMDC connection
|
|
this.Coalesce(hubNick, sendMsg)
|
|
|
|
// Submit to NMDC
|
|
err := conn.SayPublic(sendMsg)
|
|
if err != nil {
|
|
log.Printf("Failed to deliver message '%s': %s", sendMsg, err.Error())
|
|
this.GroupChatSayHTML(fmt.Sprintf("<i>Couldn't sync message '%s' because: %s</i>", html.EscapeString(sendMsg), html.EscapeString(err.Error())))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (this *NTFServer) ReplyTelegramUser(userID int64, str string, replyToMessageID int) error {
|
|
chatId, ok := this.config.DirectMessageChats[userID]
|
|
if !ok {
|
|
return fmt.Errorf("Can't send telegram message to user '%s': no DM chat known", userID)
|
|
}
|
|
|
|
msg := telegram.NewMessage(chatId, str)
|
|
if replyToMessageID != 0 {
|
|
msg.ReplyToMessageID = replyToMessageID
|
|
}
|
|
_, err := this.bot.Send(msg)
|
|
return err
|
|
}
|
|
|
|
func (this *NTFServer) DirectMessageTelegramUser(userID int64, str string) error {
|
|
return this.ReplyTelegramUser(userID, str, 0)
|
|
}
|
|
|
|
func (this *NTFServer) GroupChatSayHTML(str string) error {
|
|
msg := telegram.NewMessage(this.config.GroupChatID, str)
|
|
msg.ParseMode = telegram.ModeHTML
|
|
_, err := this.bot.Send(msg)
|
|
return err
|
|
}
|
|
|
|
func (this *NTFServer) getUserlistText(conn *libnmdc.HubConnection) (string, error) {
|
|
usernames := make([]string, 0)
|
|
err := conn.Users(func(umap *map[string]libnmdc.UserInfo) error {
|
|
for k, _ := range *umap {
|
|
usernames = append(usernames, k)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
log.Printf("Error retrieving userlist: %s", err.Error())
|
|
return "", err
|
|
}
|
|
|
|
sort.Strings(usernames)
|
|
return strings.Join(usernames, " "), nil
|
|
}
|