extract message-coalesce-buffer to separate file

This commit is contained in:
mappu 2018-06-06 19:39:22 +12:00
parent c4340a1b1e
commit c9615cbdfb
2 changed files with 46 additions and 41 deletions

View File

@ -15,11 +15,6 @@ import (
telegram "github.com/go-telegram-bot-api/telegram-bot-api" telegram "github.com/go-telegram-bot-api/telegram-bot-api"
) )
const (
coalesceWorkerRescanEvery time.Duration = 10 * time.Second
coalesceWorkerExpireAfter time.Duration = 20 * time.Second
)
// NTFServer methods all run on the same thread, so no mutexes are needed for field access // NTFServer methods all run on the same thread, so no mutexes are needed for field access
type NTFServer struct { type NTFServer struct {
bot *telegram.BotAPI bot *telegram.BotAPI
@ -39,10 +34,6 @@ type NTFServer struct {
coalesceBuffer map[string]time.Time coalesceBuffer map[string]time.Time
} }
func coalesceKey(hubNick, message string) string {
return hubNick + "\x00" + message
}
type upstreamMessage struct { type upstreamMessage struct {
telegramUserId int64 telegramUserId int64
hubNick string hubNick string
@ -267,38 +258,6 @@ func (this *NTFServer) Run() error {
return nil // UNREACHABLE return nil // UNREACHABLE
} }
func (this *NTFServer) coalesceWorker() {
for {
time.Sleep(coalesceWorkerRescanEvery)
deadLine := time.Now().Add(-coalesceWorkerExpireAfter)
this.coalesceBufferMut.Lock()
for k, v := range this.coalesceBuffer {
if v.Before(deadLine) {
delete(this.coalesceBuffer, k)
}
}
this.coalesceBufferMut.Unlock()
}
}
// Coalesce returns true if the message sticks to an existing one.
// It adds it into the coalesce buffer anyway.
func (this *NTFServer) Coalesce(hubNick, message string) bool {
ckey := coalesceKey(hubNick, message)
this.coalesceBufferMut.Lock()
if _, ok := this.coalesceBuffer[ckey]; ok {
this.coalesceBufferMut.Unlock()
return true // stuck to existing
} else {
this.coalesceBuffer[ckey] = time.Now()
this.coalesceBufferMut.Unlock()
return false // first we've seen it
}
}
func (this *NTFServer) HandleHubMessage(msg upstreamMessage) { func (this *NTFServer) HandleHubMessage(msg upstreamMessage) {
if this.verbose { if this.verbose {
log.Printf("Hub: %#v", msg) log.Printf("Hub: %#v", msg)

46
coalesce.go Normal file
View File

@ -0,0 +1,46 @@
package main
import (
"time"
)
const (
coalesceWorkerRescanEvery time.Duration = 10 * time.Second
coalesceWorkerExpireAfter time.Duration = 20 * time.Second
)
func coalesceKey(hubNick, message string) string {
return hubNick + "\x00" + message
}
func (this *NTFServer) coalesceWorker() {
for {
time.Sleep(coalesceWorkerRescanEvery)
deadLine := time.Now().Add(-coalesceWorkerExpireAfter)
this.coalesceBufferMut.Lock()
for k, v := range this.coalesceBuffer {
if v.Before(deadLine) {
delete(this.coalesceBuffer, k)
}
}
this.coalesceBufferMut.Unlock()
}
}
// Coalesce returns true if the message sticks to an existing one.
// It adds it into the coalesce buffer anyway.
func (this *NTFServer) Coalesce(hubNick, message string) bool {
ckey := coalesceKey(hubNick, message)
this.coalesceBufferMut.Lock()
if _, ok := this.coalesceBuffer[ckey]; ok {
this.coalesceBufferMut.Unlock()
return true // stuck to existing
} else {
this.coalesceBuffer[ckey] = time.Now()
this.coalesceBufferMut.Unlock()
return false // first we've seen it
}
}