coalesce redundant public chat messages (background timer)

This commit is contained in:
mappu 2018-06-03 18:49:14 +12:00
parent 5c23f23d10
commit 57724ad67c
2 changed files with 65 additions and 7 deletions

View File

@ -6,12 +6,18 @@ import (
"html" "html"
"log" "log"
"strings" "strings"
"sync"
"time" "time"
"code.ivysaur.me/libnmdc" "code.ivysaur.me/libnmdc"
telegram "github.com/go-telegram-bot-api/telegram-bot-api" telegram "github.com/go-telegram-bot-api/telegram-bot-api"
) )
const (
coalesceWorkerRescanEvery time.Duration = 10 * time.Second
coalesceWorkerExpireAfter time.Duration = 20 * time.Second
)
// NTFServer methods all run on the same thread, so no mutexes are needed for field access // NTFServer methods all run on the same thread, so no mutexes are needed for field access
type NTFServer struct { type NTFServer struct {
bot *telegram.BotAPI bot *telegram.BotAPI
@ -21,6 +27,14 @@ type NTFServer struct {
configFile string configFile string
config NTFConfig config NTFConfig
conns map[string]*libnmdc.HubConnection // hubnick -> hubconn conns map[string]*libnmdc.HubConnection // hubnick -> hubconn
// Except the coalesce buffer, that requires a background worker.
coalesceBufferMut sync.Mutex
coalesceBuffer map[string]time.Time
}
func coalesceKey(hubNick, message string) string {
return hubNick + "\x00" + message
} }
type upstreamMessage struct { type upstreamMessage struct {
@ -31,9 +45,10 @@ type upstreamMessage struct {
func NewNTFServer(configFile string) (*NTFServer, error) { func NewNTFServer(configFile string) (*NTFServer, error) {
ret := NTFServer{ ret := NTFServer{
configFile: configFile, configFile: configFile,
hubMessages: make(chan upstreamMessage, 0), hubMessages: make(chan upstreamMessage, 0),
conns: make(map[string]*libnmdc.HubConnection), conns: make(map[string]*libnmdc.HubConnection),
coalesceBuffer: make(map[string]time.Time),
} }
// Config // Config
@ -45,6 +60,9 @@ func NewNTFServer(configFile string) (*NTFServer, error) {
ret.config = cfg ret.config = cfg
// Coalesce background worker
go ret.coalesceWorker()
// Bot connection // Bot connection
if len(cfg.BotAPIKey) == 0 { if len(cfg.BotAPIKey) == 0 {
@ -212,6 +230,38 @@ func (this *NTFServer) Run() error {
return nil // UNREACHABLE return nil // UNREACHABLE
} }
func (this *NTFServer) coalesceWorker() {
for {
time.Sleep(coalesceWorkerRescanEvery)
deadLine := time.Now().Add(-coalesceWorkerExpireAfter)
this.coalesceBufferMut.Lock()
for k, v := range this.coalesceBuffer {
if v.Before(deadLine) {
delete(this.coalesceBuffer, k)
}
}
this.coalesceBufferMut.Unlock()
}
}
// Coalesce returns true if the message sticks to an existing one.
// It adds it into the coalesce buffer anyway.
func (this *NTFServer) Coalesce(hubNick, message string) bool {
ckey := coalesceKey(hubNick, message)
this.coalesceBufferMut.Lock()
if _, ok := this.coalesceBuffer[ckey]; ok {
this.coalesceBufferMut.Unlock()
return true // stuck to existing
} else {
this.coalesceBuffer[ckey] = time.Now()
this.coalesceBufferMut.Unlock()
return false // first we've seen it
}
}
func (this *NTFServer) HandleHubMessage(msg upstreamMessage) { func (this *NTFServer) HandleHubMessage(msg upstreamMessage) {
switch msg.evt.EventType { switch msg.evt.EventType {
@ -229,7 +279,12 @@ func (this *NTFServer) HandleHubMessage(msg upstreamMessage) {
return // ignore return // ignore
} }
// FIXME coalesce from multiple connections // Coalesce from multiple connections
if this.Coalesce(msg.evt.Nick, msg.evt.Message) {
return // ignore - we heard this message already recently
}
// Display the message
htmlMsg := "<b>&lt;" + html.EscapeString(msg.evt.Nick) + "&gt;</b> " + html.EscapeString(msg.evt.Message) htmlMsg := "<b>&lt;" + html.EscapeString(msg.evt.Nick) + "&gt;</b> " + html.EscapeString(msg.evt.Message)
err := this.GroupChatSayHTML(htmlMsg) err := this.GroupChatSayHTML(htmlMsg)
if err != nil { if err != nil {
@ -360,8 +415,7 @@ func (this *NTFServer) HandleGroupMessage(update telegram.Update) error {
if len(update.Message.Text) > 0 { if len(update.Message.Text) > 0 {
// Actual chat message // Actual chat message
// Find the responsible user and send it to the upstream hub, using their account // Find the responsible user to send it to the upstream hub, using their account
// FIXME also add it to the coalesce buffer so that we don't replay it from someone else's NMDC connection
userID := int64(update.Message.From.ID) userID := int64(update.Message.From.ID)
hubNick, ok := this.config.KnownUsers[userID] hubNick, ok := this.config.KnownUsers[userID]
@ -374,6 +428,10 @@ func (this *NTFServer) HandleGroupMessage(update telegram.Update) error {
return fmt.Errorf("Couldn't send public message for user '%d' (%s) unexpectedly missing upstream connection!", userID, hubNick) return fmt.Errorf("Couldn't send public message for user '%d' (%s) unexpectedly missing upstream connection!", userID, hubNick)
} }
// Also add it to the coalesce buffer so that we don't replay it from someone else's NMDC connection
this.Coalesce(hubNick, update.Message.Text)
// Submit to NMDC
conn.SayPublic(update.Message.Text) conn.SayPublic(update.Message.Text)
} }

View File

@ -10,7 +10,7 @@
[ ] Handle upstream messages [ ] Handle upstream messages
[X] System messages [X] System messages
[X] Public messages [X] Public messages
[ ] Coalesce matches from multiple upstream connections (timer?) [X] Coalesce matches from multiple upstream connections (timer?)
[X] Exclude messages from hub-security (custom nick) [X] Exclude messages from hub-security (custom nick)
[X] Allow sending public messages to DC from inside the group chat [X] Allow sending public messages to DC from inside the group chat
[ ] Nick translation [ ] Nick translation