diff --git a/NTFConfig.go b/NTFConfig.go index 30f7c9c..41ba422 100644 --- a/NTFConfig.go +++ b/NTFConfig.go @@ -5,13 +5,19 @@ import ( "io/ioutil" ) +// NTFConfig keeps track of setup properties as well as runtime state. +// FIXME separate compile-time and run-time properties into separate files type NTFConfig struct { - HubAddr string - BotAPIKey string - GroupChatID int64 + HubAddr string + HubDescription string + BotAPIKey string + GroupChatID int64 // Map of telegram user IDs to NMDC nicks KnownUsers map[int64]string + + // Map of telegram users known to be in the group chat ==> telegram displayname + GroupChatMembers map[int64]string } func LoadConfig(configFile string) (NTFConfig, error) { diff --git a/NTFServer.go b/NTFServer.go index 33896fc..92519fd 100644 --- a/NTFServer.go +++ b/NTFServer.go @@ -1,10 +1,11 @@ package main import ( - "context" "errors" "fmt" "log" + "strings" + "time" "code.ivysaur.me/libnmdc" telegram "github.com/go-telegram-bot-api/telegram-bot-api" @@ -12,24 +13,29 @@ import ( // NTFServer methods all run on the same thread, so no mutexes are needed for field access type NTFServer struct { - bot *telegram.BotAPI - upstream chan upstreamMessage - chatName string - inviteLink string - configFile string - config NTFConfig + bot *telegram.BotAPI + hubMessages chan upstreamMessage + chatName string + inviteLink string + configFile string + config NTFConfig + conns map[string]*libnmdc.HubConnection // hubnick -> hubconn } type upstreamMessage struct { telegramUserId int64 + hubNick string evt libnmdc.HubEvent } func NewNTFServer(configFile string) (*NTFServer, error) { ret := NTFServer{ - configFile: configFile, + configFile: configFile, + hubMessages: make(chan upstreamMessage, 0), } + // Config + cfg, err := LoadConfig(configFile) if err != nil { return nil, err @@ -37,6 +43,8 @@ func NewNTFServer(configFile string) (*NTFServer, error) { ret.config = cfg + // Bot connection + if len(cfg.BotAPIKey) == 0 { return nil, errors.New("No bot API key supplied (register with BotFather first)") } @@ -52,6 +60,8 @@ func NewNTFServer(configFile string) (*NTFServer, error) { log.Printf("Connected to telegram as '%s'", bot.Self.UserName) + // Groupchat properties + if ret.IsSetupMode() { log.Println("Group chat ID unknown, running in setup mode only - find the groupchat ID then re-run") @@ -74,20 +84,92 @@ func NewNTFServer(configFile string) (*NTFServer, error) { } - // Spawn upstream connections for all known users - ret.upstream = make(chan upstreamMessage, 0) - for k, v := range ret.config.KnownUsers { - ret.LaunchUpstreamWorker(k, v) + // Spawn upstream connections for all pre-existing known users + for telegramUserId, telegramDisplayName := range ret.config.GroupChatMembers { + hubNick, ok := ret.config.KnownUsers[telegramUserId] + if !ok { + log.Fatalf("Chat member '%d' (%s) is missing a hub mapping!!!", telegramUserId, telegramDisplayName) // fatal - inconsistent DB + } + + err := ret.LaunchUpstreamWorker(telegramUserId, hubNick) + if err != nil { + return nil, fmt.Errorf("Reconnecting upstream for '%s': %s", hubNick, err.Error()) // fatal + } } return &ret, nil } -func (this *NTFServer) LaunchUpstreamWorker(telegramUserId int64, hubUsername string) { - ctx := context.Background() +// registerUser lodges a user mapping in the configuration file. +// This allows them to join the group chat (unbanning them if necessary). +// An actual NMDC connection will occur once the user joins for the first time. +func (this *NTFServer) registerUser(telegramUserId int64, hubUsername string) error { + if existingHubNick, ok := this.config.KnownUsers[telegramUserId]; ok { + return fmt.Errorf("Telegram account is already registered with hub nick '%s'", existingHubNick) + } - // Open NMDC connection - go upstreamWorker() + for _, v := range this.config.KnownUsers { + if v == hubUsername { + return fmt.Errorf("Requested hub nick '%s' is already used by another member", hubUsername) + } + } + + this.config.KnownUsers[telegramUserId] = hubUsername + err := this.config.Save(this.configFile) + if err != nil { + return err + } + + // Unban from groupchat, if necessary + // Ignore errors because the user might not have been banned + _, err = this.bot.UnbanChatMember(telegram.ChatMemberConfig{ + ChatID: this.config.GroupChatID, + UserID: int(telegramUserId), + }) + if err != nil { + log.Printf("Couldn't unban user '%s' from groupchat because: %s (assuming OK, continuing)", hubUsername, err.Error()) + } + + return nil +} + +// LaunchUpstreamWorker opens an NMDC connection. +func (this *NTFServer) LaunchUpstreamWorker(telegramUserId int64, hubUsername string) error { + + if _, exists := this.conns[hubUsername]; exists { + return fmt.Errorf("Duplicate hub connection for user '%s', abandoning", hubUsername) + } + + // We want to use the async NMDC connection, but at the same time, provide + // extra information in the channel. Wrap the upstream NMDC channel with one + // of our own + upstreamChan := make(chan libnmdc.HubEvent, 0) + go func() { + for msg := range upstreamChan { + this.hubMessages <- upstreamMessage{ + telegramUserId: telegramUserId, + hubNick: hubUsername, + evt: msg, + } + } + }() + + conn := libnmdc.ConnectAsync( + &libnmdc.HubConnectionOptions{ + Address: libnmdc.HubAddress(this.config.HubAddr), + Self: &libnmdc.UserInfo{ + Nick: hubUsername, + ClientTag: AppName, + ClientVersion: AppVersion, + }, + }, + upstreamChan, + ) + + // Stash conn so that we can refer to it / close it later + this.conns[hubUsername] = conn + + return nil } func (this *NTFServer) IsSetupMode() bool { @@ -103,11 +185,27 @@ func (this *NTFServer) Run() error { return err } - for update := range updateChan { - this.HandleMessage(update) + for { + select { + case update, ok := <-updateChan: + if !ok { + log.Fatalf("Telegram update channel closed unexpectedly") + } + this.HandleMessage(update) + + case hubMsg, ok := <-this.hubMessages: + if !ok { + log.Fatalf("Upstream update channel closed unexpectedly") + } + this.HandleHubMessage(hubMsg) + } } - return nil // Update channel was closed + return nil // UNREACHABLE +} + +func (this *NTFServer) HandleHubMessage(msg upstreamMessage) { + log.Printf("Hub: %#v", msg) } func (this *NTFServer) HandleMessage(update telegram.Update) { @@ -139,6 +237,33 @@ func (this *NTFServer) HandleMessage(update telegram.Update) { log.Printf("[%s] %s", update.Message.From.UserName, update.Message.Text) } +func (this *NTFServer) HandleTelegramUserParted(telegramUserId int64, update telegram.Update) error { + delete(this.config.GroupChatMembers, telegramUserId) + return this.config.Save(this.configFile) +} + +func (this *NTFServer) HandleTelegramUserJoined(telegramUserId int64, update telegram.Update) error { + // If known, spawn the upstream connection; if unknown, kick them + hubNick, ok := this.config.KnownUsers[telegramUserId] + if !ok { + _, err := this.bot.KickChatMember(telegram.KickChatMemberConfig{ + ChatMemberConfig: telegram.ChatMemberConfig{ + ChatID: update.Message.Chat.ID, + UserID: int(telegramUserId), + }, + UntilDate: time.Now().Add(24 * time.Hour).Unix(), + }) + if err != nil { + log.Printf("Couldn't kick unexpected chat member from group: %s", err.Error()) + } + + return nil + } + + // Spawn the upstream connection for this user + return this.LaunchUpstreamWorker(telegramUserId, hubNick) +} + func (this *NTFServer) HandleGroupMessage(update telegram.Update) error { // Joins: ???? @@ -146,21 +271,24 @@ func (this *NTFServer) HandleGroupMessage(update telegram.Update) error { // Users joining // Ensure that they have a valid user mapping // Create upstream NMDC connection for them + for _, joinedUser := range *update.Message.NewChatMembers { + err := this.HandleTelegramUserJoined(int64(joinedUser.ID), update) + if err != nil { + log.Printf("Handling user join: %s", err.Error()) + } + } + return nil } if update.Message.LeftChatMember != nil { // User parted // Close upstream NMDC connection for them + return this.HandleTelegramUserParted(int64(update.Message.LeftChatMember.ID), update) } - // Parts: - /* - &tgbotapi.Message{ - MessageID:9, From:(*tgbotapi.User)(0xc420304000), Date:1527989178, Chat:(*tgbotapi.Chat)(0xc4201dc120), [...] - NewChatMembers:(*[]tgbotapi.User)(nil), - LeftChatMember:(*tgbotapi.User)(0xc420304050), - } - */ + // Actual chat message + // Find the responsible user and send it to the upstream hub using their account + // TODO return nil } @@ -168,11 +296,68 @@ func (this *NTFServer) HandleGroupMessage(update telegram.Update) error { func (this *NTFServer) HandleDirectMessage(update telegram.Update) error { // Registration workflow - // Find out the current status for this chat ID... etc. - msg := telegram.NewMessage(update.Message.Chat.ID, "Hi user, join the group chat at "+this.inviteLink) - msg.ReplyToMessageID = update.Message.MessageID - this.bot.Send(msg) + // Find out the current status for this chat ID + userID := int64(update.Message.From.ID) + hubNick, isKnown := this.config.KnownUsers[userID] + + respond := func(str string) error { + msg := telegram.NewMessage(update.Message.Chat.ID, str) + msg.ReplyToMessageID = update.Message.MessageID + _, err := this.bot.Send(msg) + return err + } + + // Handle the incoming request + + msg := update.Message.Text + if strings.HasPrefix(msg, "/pm ") { + if !isKnown { + return respond("Can't send a native PM until you've joined.") + } + + // TODO + return respond("Not yet implemented.") + + } else if strings.HasPrefix(msg, "/join ") { + if isKnown { + return respond("You've already joined as '" + hubNick + "'.") + } + + requestedHubNick := msg[6:] + err := this.registerUser(userID, requestedHubNick) + if err != nil { + log.Printf("Failed to join user: %s", err.Error()) + return respond(fmt.Sprintf("Couldn't allow join because: %s", err.Error())) + } + + return respond(fmt.Sprintf("Hi '%s'! You can join %s at %s", requestedHubNick, this.config.HubDescription, this.inviteLink)) + + } else if strings.HasPrefix(msg, "/quit") { + + // TODO + return respond("Not yet implemented.") + + } else { // e.g. /setup or /help + // Help + helpMsg := `I am a bot that connects Telegram with ` + this.config.HubDescription + ".\n" + if isKnown { + helpMsg += "You are currently connected as: '" + hubNick + "'\n" + } else { + helpMsg += "You aren't connected yet.\n" + } + + helpMsg += ` + +Available commands: +/setup - Welcome message +/join [hubnick] - Join ` + this.config.HubDescription + ` as user 'hubnick' +/pm [recipient] [message] - Send a native PM (if connected) +/quit - Leave the group chat +` + + return respond(helpMsg) + } return nil } diff --git a/TODO.txt b/TODO.txt index 6d4cbe4..86fb5f3 100644 --- a/TODO.txt +++ b/TODO.txt @@ -1,11 +1,11 @@ -[ ] Menu for direct messages - [ ] Commands for new user registration +[X] Menu for direct messages + [X] Commands for new user registration [ ] If registered/online -- responding to PMs (always require prefix) -[ ] Open upstream connections - [ ] when new users register - [ ] open existing connections at app startup +[X] Open upstream connections + [X] when registered users join the groupchat + [X] open existing connections at app startup [ ] Handle upstream messages [ ] Coalesce matches from multiple upstream connections? @@ -15,7 +15,7 @@ [ ] Ability to close nmdc connections [ ] Close nmdc connection && deregister user (with bot message) when user leaves groupchat -[ ] Block unknown users from entering telegram channel +[X] Block unknown users from entering telegram channel [ ] Attachment support [ ] Convert telegram files/images/videos to raw URLs for NMDC diff --git a/upstreamWorker.go b/upstreamWorker.go deleted file mode 100644 index 6e27e64..0000000 --- a/upstreamWorker.go +++ /dev/null @@ -1,44 +0,0 @@ -package main - -import ( - "context" - - "code.ivysaur.me/libnmdc" -) - -// upstreamWorker handles an NMDC connection. -// It blocks on the current thread, caller can background it with 'go'. -func upstreamWorker(ctx context.Context, hubAddress, username string, responseChan chan<- libnmdc.HubEvent) { - - interiorChan := make(chan libnmdc.HubEvent, 0) - - conn := libnmdc.ConnectAsync( - &libnmdc.HubConnectionOptions{ - Address: libnmdc.HubAddress(hubAddress), - Self: &libnmdc.UserInfo{ - Nick: username, - ClientTag: AppName, - ClientVersion: AppVersion, - }, - }, - interiorChan, - ) - - for { - select { - case <-ctx.Done(): - conn.Disconnect() - ctx = nil // prevent hitting this case again repeatedly - closed channel reads immediately, nil channel blocks forever - - case evt := <-interiorChan: - responseChan <- evt // forward - - if evt.StateChange == libnmdc.CONNECTIONSTATE_DISCONNECTED && ctx.Err() != nil { - // We're done here - return - } - - } - - } -}