From 57724ad67cec551adb7b0e5c452c01dbbb394754 Mon Sep 17 00:00:00 2001 From: mappu Date: Sun, 3 Jun 2018 18:49:14 +1200 Subject: [PATCH] coalesce redundant public chat messages (background timer) --- NTFServer.go | 70 +++++++++++++++++++++++++++++++++++++++++++++++----- TODO.txt | 2 +- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/NTFServer.go b/NTFServer.go index 939689f..aab315d 100644 --- a/NTFServer.go +++ b/NTFServer.go @@ -6,12 +6,18 @@ import ( "html" "log" "strings" + "sync" "time" "code.ivysaur.me/libnmdc" telegram "github.com/go-telegram-bot-api/telegram-bot-api" ) +const ( + coalesceWorkerRescanEvery time.Duration = 10 * time.Second + coalesceWorkerExpireAfter time.Duration = 20 * time.Second +) + // NTFServer methods all run on the same thread, so no mutexes are needed for field access type NTFServer struct { bot *telegram.BotAPI @@ -21,6 +27,14 @@ type NTFServer struct { configFile string config NTFConfig conns map[string]*libnmdc.HubConnection // hubnick -> hubconn + + // Except the coalesce buffer, that requires a background worker. + coalesceBufferMut sync.Mutex + coalesceBuffer map[string]time.Time +} + +func coalesceKey(hubNick, message string) string { + return hubNick + "\x00" + message } type upstreamMessage struct { @@ -31,9 +45,10 @@ type upstreamMessage struct { func NewNTFServer(configFile string) (*NTFServer, error) { ret := NTFServer{ - configFile: configFile, - hubMessages: make(chan upstreamMessage, 0), - conns: make(map[string]*libnmdc.HubConnection), + configFile: configFile, + hubMessages: make(chan upstreamMessage, 0), + conns: make(map[string]*libnmdc.HubConnection), + coalesceBuffer: make(map[string]time.Time), } // Config @@ -45,6 +60,9 @@ func NewNTFServer(configFile string) (*NTFServer, error) { ret.config = cfg + // Coalesce background worker + go ret.coalesceWorker() + // Bot connection if len(cfg.BotAPIKey) == 0 { @@ -212,6 +230,38 @@ func (this *NTFServer) Run() error { return nil // UNREACHABLE } +func (this *NTFServer) coalesceWorker() { + for { + time.Sleep(coalesceWorkerRescanEvery) + + deadLine := time.Now().Add(-coalesceWorkerExpireAfter) + + this.coalesceBufferMut.Lock() + for k, v := range this.coalesceBuffer { + if v.Before(deadLine) { + delete(this.coalesceBuffer, k) + } + } + this.coalesceBufferMut.Unlock() + } +} + +// Coalesce returns true if the message sticks to an existing one. +// It adds it into the coalesce buffer anyway. +func (this *NTFServer) Coalesce(hubNick, message string) bool { + ckey := coalesceKey(hubNick, message) + + this.coalesceBufferMut.Lock() + if _, ok := this.coalesceBuffer[ckey]; ok { + this.coalesceBufferMut.Unlock() + return true // stuck to existing + } else { + this.coalesceBuffer[ckey] = time.Now() + this.coalesceBufferMut.Unlock() + return false // first we've seen it + } +} + func (this *NTFServer) HandleHubMessage(msg upstreamMessage) { switch msg.evt.EventType { @@ -229,7 +279,12 @@ func (this *NTFServer) HandleHubMessage(msg upstreamMessage) { return // ignore } - // FIXME coalesce from multiple connections + // Coalesce from multiple connections + if this.Coalesce(msg.evt.Nick, msg.evt.Message) { + return // ignore - we heard this message already recently + } + + // Display the message htmlMsg := "<" + html.EscapeString(msg.evt.Nick) + "> " + html.EscapeString(msg.evt.Message) err := this.GroupChatSayHTML(htmlMsg) if err != nil { @@ -360,8 +415,7 @@ func (this *NTFServer) HandleGroupMessage(update telegram.Update) error { if len(update.Message.Text) > 0 { // Actual chat message - // Find the responsible user and send it to the upstream hub, using their account - // FIXME also add it to the coalesce buffer so that we don't replay it from someone else's NMDC connection + // Find the responsible user to send it to the upstream hub, using their account userID := int64(update.Message.From.ID) hubNick, ok := this.config.KnownUsers[userID] @@ -374,6 +428,10 @@ func (this *NTFServer) HandleGroupMessage(update telegram.Update) error { return fmt.Errorf("Couldn't send public message for user '%d' (%s) unexpectedly missing upstream connection!", userID, hubNick) } + // Also add it to the coalesce buffer so that we don't replay it from someone else's NMDC connection + this.Coalesce(hubNick, update.Message.Text) + + // Submit to NMDC conn.SayPublic(update.Message.Text) } diff --git a/TODO.txt b/TODO.txt index c08c515..7bafbb8 100644 --- a/TODO.txt +++ b/TODO.txt @@ -10,7 +10,7 @@ [ ] Handle upstream messages [X] System messages [X] Public messages - [ ] Coalesce matches from multiple upstream connections (timer?) + [X] Coalesce matches from multiple upstream connections (timer?) [X] Exclude messages from hub-security (custom nick) [X] Allow sending public messages to DC from inside the group chat [ ] Nick translation