This commit is contained in:
mappu 2018-06-03 16:39:45 +12:00
parent 574274c43d
commit 6a7a7a51f6
4 changed files with 231 additions and 84 deletions

View File

@ -5,13 +5,19 @@ import (
"io/ioutil" "io/ioutil"
) )
// NTFConfig keeps track of setup properties as well as runtime state.
// FIXME separate compile-time and run-time properties into separate files
type NTFConfig struct { type NTFConfig struct {
HubAddr string HubAddr string
HubDescription string
BotAPIKey string BotAPIKey string
GroupChatID int64 GroupChatID int64
// Map of telegram user IDs to NMDC nicks // Map of telegram user IDs to NMDC nicks
KnownUsers map[int64]string KnownUsers map[int64]string
// Map of telegram users known to be in the group chat ==> telegram displayname
GroupChatMembers map[int64]string
} }
func LoadConfig(configFile string) (NTFConfig, error) { func LoadConfig(configFile string) (NTFConfig, error) {

View File

@ -1,10 +1,11 @@
package main package main
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"log" "log"
"strings"
"time"
"code.ivysaur.me/libnmdc" "code.ivysaur.me/libnmdc"
telegram "github.com/go-telegram-bot-api/telegram-bot-api" telegram "github.com/go-telegram-bot-api/telegram-bot-api"
@ -13,23 +14,28 @@ import (
// NTFServer methods all run on the same thread, so no mutexes are needed for field access // NTFServer methods all run on the same thread, so no mutexes are needed for field access
type NTFServer struct { type NTFServer struct {
bot *telegram.BotAPI bot *telegram.BotAPI
upstream chan upstreamMessage hubMessages chan upstreamMessage
chatName string chatName string
inviteLink string inviteLink string
configFile string configFile string
config NTFConfig config NTFConfig
conns map[string]*libnmdc.HubConnection // hubnick -> hubconn
} }
type upstreamMessage struct { type upstreamMessage struct {
telegramUserId int64 telegramUserId int64
hubNick string
evt libnmdc.HubEvent evt libnmdc.HubEvent
} }
func NewNTFServer(configFile string) (*NTFServer, error) { func NewNTFServer(configFile string) (*NTFServer, error) {
ret := NTFServer{ ret := NTFServer{
configFile: configFile, configFile: configFile,
hubMessages: make(chan upstreamMessage, 0),
} }
// Config
cfg, err := LoadConfig(configFile) cfg, err := LoadConfig(configFile)
if err != nil { if err != nil {
return nil, err return nil, err
@ -37,6 +43,8 @@ func NewNTFServer(configFile string) (*NTFServer, error) {
ret.config = cfg ret.config = cfg
// Bot connection
if len(cfg.BotAPIKey) == 0 { if len(cfg.BotAPIKey) == 0 {
return nil, errors.New("No bot API key supplied (register with BotFather first)") return nil, errors.New("No bot API key supplied (register with BotFather first)")
} }
@ -52,6 +60,8 @@ func NewNTFServer(configFile string) (*NTFServer, error) {
log.Printf("Connected to telegram as '%s'", bot.Self.UserName) log.Printf("Connected to telegram as '%s'", bot.Self.UserName)
// Groupchat properties
if ret.IsSetupMode() { if ret.IsSetupMode() {
log.Println("Group chat ID unknown, running in setup mode only - find the groupchat ID then re-run") log.Println("Group chat ID unknown, running in setup mode only - find the groupchat ID then re-run")
@ -74,20 +84,92 @@ func NewNTFServer(configFile string) (*NTFServer, error) {
} }
// Spawn upstream connections for all known users // Spawn upstream connections for all pre-existing known users
ret.upstream = make(chan upstreamMessage, 0) for telegramUserId, telegramDisplayName := range ret.config.GroupChatMembers {
for k, v := range ret.config.KnownUsers { hubNick, ok := ret.config.KnownUsers[telegramUserId]
ret.LaunchUpstreamWorker(k, v) if !ok {
log.Fatalf("Chat member '%d' (%s) is missing a hub mapping!!!", telegramUserId, telegramDisplayName) // fatal - inconsistent DB
}
err := ret.LaunchUpstreamWorker(telegramUserId, hubNick)
if err != nil {
return nil, fmt.Errorf("Reconnecting upstream for '%s': %s", hubNick, err.Error()) // fatal
}
} }
return &ret, nil return &ret, nil
} }
func (this *NTFServer) LaunchUpstreamWorker(telegramUserId int64, hubUsername string) { // registerUser lodges a user mapping in the configuration file.
ctx := context.Background() // This allows them to join the group chat (unbanning them if necessary).
// An actual NMDC connection will occur once the user joins for the first time.
func (this *NTFServer) registerUser(telegramUserId int64, hubUsername string) error {
if existingHubNick, ok := this.config.KnownUsers[telegramUserId]; ok {
return fmt.Errorf("Telegram account is already registered with hub nick '%s'", existingHubNick)
}
// Open NMDC connection for _, v := range this.config.KnownUsers {
go upstreamWorker() if v == hubUsername {
return fmt.Errorf("Requested hub nick '%s' is already used by another member", hubUsername)
}
}
this.config.KnownUsers[telegramUserId] = hubUsername
err := this.config.Save(this.configFile)
if err != nil {
return err
}
// Unban from groupchat, if necessary
// Ignore errors because the user might not have been banned
_, err = this.bot.UnbanChatMember(telegram.ChatMemberConfig{
ChatID: this.config.GroupChatID,
UserID: int(telegramUserId),
})
if err != nil {
log.Printf("Couldn't unban user '%s' from groupchat because: %s (assuming OK, continuing)", hubUsername, err.Error())
}
return nil
}
// LaunchUpstreamWorker opens an NMDC connection.
func (this *NTFServer) LaunchUpstreamWorker(telegramUserId int64, hubUsername string) error {
if _, exists := this.conns[hubUsername]; exists {
return fmt.Errorf("Duplicate hub connection for user '%s', abandoning", hubUsername)
}
// We want to use the async NMDC connection, but at the same time, provide
// extra information in the channel. Wrap the upstream NMDC channel with one
// of our own
upstreamChan := make(chan libnmdc.HubEvent, 0)
go func() {
for msg := range upstreamChan {
this.hubMessages <- upstreamMessage{
telegramUserId: telegramUserId,
hubNick: hubUsername,
evt: msg,
}
}
}()
conn := libnmdc.ConnectAsync(
&libnmdc.HubConnectionOptions{
Address: libnmdc.HubAddress(this.config.HubAddr),
Self: &libnmdc.UserInfo{
Nick: hubUsername,
ClientTag: AppName,
ClientVersion: AppVersion,
},
},
upstreamChan,
)
// Stash conn so that we can refer to it / close it later
this.conns[hubUsername] = conn
return nil
} }
func (this *NTFServer) IsSetupMode() bool { func (this *NTFServer) IsSetupMode() bool {
@ -103,11 +185,27 @@ func (this *NTFServer) Run() error {
return err return err
} }
for update := range updateChan { for {
select {
case update, ok := <-updateChan:
if !ok {
log.Fatalf("Telegram update channel closed unexpectedly")
}
this.HandleMessage(update) this.HandleMessage(update)
case hubMsg, ok := <-this.hubMessages:
if !ok {
log.Fatalf("Upstream update channel closed unexpectedly")
}
this.HandleHubMessage(hubMsg)
}
} }
return nil // Update channel was closed return nil // UNREACHABLE
}
func (this *NTFServer) HandleHubMessage(msg upstreamMessage) {
log.Printf("Hub: %#v", msg)
} }
func (this *NTFServer) HandleMessage(update telegram.Update) { func (this *NTFServer) HandleMessage(update telegram.Update) {
@ -139,6 +237,33 @@ func (this *NTFServer) HandleMessage(update telegram.Update) {
log.Printf("[%s] %s", update.Message.From.UserName, update.Message.Text) log.Printf("[%s] %s", update.Message.From.UserName, update.Message.Text)
} }
func (this *NTFServer) HandleTelegramUserParted(telegramUserId int64, update telegram.Update) error {
delete(this.config.GroupChatMembers, telegramUserId)
return this.config.Save(this.configFile)
}
func (this *NTFServer) HandleTelegramUserJoined(telegramUserId int64, update telegram.Update) error {
// If known, spawn the upstream connection; if unknown, kick them
hubNick, ok := this.config.KnownUsers[telegramUserId]
if !ok {
_, err := this.bot.KickChatMember(telegram.KickChatMemberConfig{
ChatMemberConfig: telegram.ChatMemberConfig{
ChatID: update.Message.Chat.ID,
UserID: int(telegramUserId),
},
UntilDate: time.Now().Add(24 * time.Hour).Unix(),
})
if err != nil {
log.Printf("Couldn't kick unexpected chat member from group: %s", err.Error())
}
return nil
}
// Spawn the upstream connection for this user
return this.LaunchUpstreamWorker(telegramUserId, hubNick)
}
func (this *NTFServer) HandleGroupMessage(update telegram.Update) error { func (this *NTFServer) HandleGroupMessage(update telegram.Update) error {
// Joins: ???? // Joins: ????
@ -146,21 +271,24 @@ func (this *NTFServer) HandleGroupMessage(update telegram.Update) error {
// Users joining // Users joining
// Ensure that they have a valid user mapping // Ensure that they have a valid user mapping
// Create upstream NMDC connection for them // Create upstream NMDC connection for them
for _, joinedUser := range *update.Message.NewChatMembers {
err := this.HandleTelegramUserJoined(int64(joinedUser.ID), update)
if err != nil {
log.Printf("Handling user join: %s", err.Error())
}
}
return nil
} }
if update.Message.LeftChatMember != nil { if update.Message.LeftChatMember != nil {
// User parted // User parted
// Close upstream NMDC connection for them // Close upstream NMDC connection for them
return this.HandleTelegramUserParted(int64(update.Message.LeftChatMember.ID), update)
} }
// Parts: // Actual chat message
/* // Find the responsible user and send it to the upstream hub using their account
&tgbotapi.Message{ // TODO
MessageID:9, From:(*tgbotapi.User)(0xc420304000), Date:1527989178, Chat:(*tgbotapi.Chat)(0xc4201dc120), [...]
NewChatMembers:(*[]tgbotapi.User)(nil),
LeftChatMember:(*tgbotapi.User)(0xc420304050),
}
*/
return nil return nil
} }
@ -168,11 +296,68 @@ func (this *NTFServer) HandleGroupMessage(update telegram.Update) error {
func (this *NTFServer) HandleDirectMessage(update telegram.Update) error { func (this *NTFServer) HandleDirectMessage(update telegram.Update) error {
// Registration workflow // Registration workflow
// Find out the current status for this chat ID... etc.
msg := telegram.NewMessage(update.Message.Chat.ID, "Hi user, join the group chat at "+this.inviteLink) // Find out the current status for this chat ID
userID := int64(update.Message.From.ID)
hubNick, isKnown := this.config.KnownUsers[userID]
respond := func(str string) error {
msg := telegram.NewMessage(update.Message.Chat.ID, str)
msg.ReplyToMessageID = update.Message.MessageID msg.ReplyToMessageID = update.Message.MessageID
this.bot.Send(msg) _, err := this.bot.Send(msg)
return err
}
// Handle the incoming request
msg := update.Message.Text
if strings.HasPrefix(msg, "/pm ") {
if !isKnown {
return respond("Can't send a native PM until you've joined.")
}
// TODO
return respond("Not yet implemented.")
} else if strings.HasPrefix(msg, "/join ") {
if isKnown {
return respond("You've already joined as '" + hubNick + "'.")
}
requestedHubNick := msg[6:]
err := this.registerUser(userID, requestedHubNick)
if err != nil {
log.Printf("Failed to join user: %s", err.Error())
return respond(fmt.Sprintf("Couldn't allow join because: %s", err.Error()))
}
return respond(fmt.Sprintf("Hi '%s'! You can join %s at %s", requestedHubNick, this.config.HubDescription, this.inviteLink))
} else if strings.HasPrefix(msg, "/quit") {
// TODO
return respond("Not yet implemented.")
} else { // e.g. /setup or /help
// Help
helpMsg := `I am a bot that connects Telegram with ` + this.config.HubDescription + ".\n"
if isKnown {
helpMsg += "You are currently connected as: '" + hubNick + "'\n"
} else {
helpMsg += "You aren't connected yet.\n"
}
helpMsg += `
Available commands:
/setup - Welcome message
/join [hubnick] - Join ` + this.config.HubDescription + ` as user 'hubnick'
/pm [recipient] [message] - Send a native PM (if connected)
/quit - Leave the group chat
`
return respond(helpMsg)
}
return nil return nil
} }

View File

@ -1,11 +1,11 @@
[ ] Menu for direct messages [X] Menu for direct messages
[ ] Commands for new user registration [X] Commands for new user registration
[ ] If registered/online -- responding to PMs (always require prefix) [ ] If registered/online -- responding to PMs (always require prefix)
[ ] Open upstream connections [X] Open upstream connections
[ ] when new users register [X] when registered users join the groupchat
[ ] open existing connections at app startup [X] open existing connections at app startup
[ ] Handle upstream messages [ ] Handle upstream messages
[ ] Coalesce matches from multiple upstream connections? [ ] Coalesce matches from multiple upstream connections?
@ -15,7 +15,7 @@
[ ] Ability to close nmdc connections [ ] Ability to close nmdc connections
[ ] Close nmdc connection && deregister user (with bot message) when user leaves groupchat [ ] Close nmdc connection && deregister user (with bot message) when user leaves groupchat
[ ] Block unknown users from entering telegram channel [X] Block unknown users from entering telegram channel
[ ] Attachment support [ ] Attachment support
[ ] Convert telegram files/images/videos to raw URLs for NMDC [ ] Convert telegram files/images/videos to raw URLs for NMDC

View File

@ -1,44 +0,0 @@
package main
import (
"context"
"code.ivysaur.me/libnmdc"
)
// upstreamWorker handles an NMDC connection.
// It blocks on the current thread, caller can background it with 'go'.
func upstreamWorker(ctx context.Context, hubAddress, username string, responseChan chan<- libnmdc.HubEvent) {
interiorChan := make(chan libnmdc.HubEvent, 0)
conn := libnmdc.ConnectAsync(
&libnmdc.HubConnectionOptions{
Address: libnmdc.HubAddress(hubAddress),
Self: &libnmdc.UserInfo{
Nick: username,
ClientTag: AppName,
ClientVersion: AppVersion,
},
},
interiorChan,
)
for {
select {
case <-ctx.Done():
conn.Disconnect()
ctx = nil // prevent hitting this case again repeatedly - closed channel reads immediately, nil channel blocks forever
case evt := <-interiorChan:
responseChan <- evt // forward
if evt.StateChange == libnmdc.CONNECTIONSTATE_DISCONNECTED && ctx.Err() != nil {
// We're done here
return
}
}
}
}