working protocol autodetection

--HG--
branch : adc
This commit is contained in:
mappu 2017-11-22 20:02:11 +13:00
parent 131ce0a63b
commit 23e107f758
7 changed files with 123 additions and 52 deletions

View File

@ -4,7 +4,7 @@ type AdcProtocol struct {
hc *HubConnection
}
func NewAdcProtocol(hc *HubConnection) *AdcProtocol {
func NewAdcProtocol(hc *HubConnection) Protocol {
proto := AdcProtocol{}
proto.hc = hc
@ -13,8 +13,6 @@ func NewAdcProtocol(hc *HubConnection) *AdcProtocol {
return &proto
}
var _ Protocol = &AdcProtocol{} // assert interface implementation
func (this *AdcProtocol) ProcessCommand(msg string) {
this.hc.processEvent(HubEvent{EventType: EVENT_DEBUG_MESSAGE, Message: msg})
}

80
AutodetectProtocol.go Normal file
View File

@ -0,0 +1,80 @@
package libnmdc
import (
"sync"
"time"
)
type AutodetectProtocol struct {
hc *HubConnection
realProtoMut sync.Mutex
realProto Protocol
}
func NewAutodetectProtocol(hc *HubConnection) Protocol {
proto := AutodetectProtocol{
hc: hc,
realProto: nil,
}
go proto.timeout()
return &proto
}
func (this *AutodetectProtocol) timeout() {
time.Sleep(AUTODETECT_ADC_NMDC_TIMEOUT)
this.realProtoMut.Lock()
defer this.realProtoMut.Unlock()
if this.realProto == nil {
this.realProto = NewAdcProtocol(this.hc)
this.hc.processEvent(HubEvent{EventType: EVENT_DEBUG_MESSAGE, Message: "Detected ADC protocol"})
}
}
func (this *AutodetectProtocol) ProcessCommand(msg string) {
this.realProtoMut.Lock()
defer this.realProtoMut.Unlock()
if this.realProto == nil {
// We actually got some data using $ as the separator?
// Upgrade to a full NMDC protocol
this.realProto = NewNmdcProtocol(this.hc)
this.hc.processEvent(HubEvent{EventType: EVENT_DEBUG_MESSAGE, Message: "Detected NMDC protocol"})
}
this.realProto.ProcessCommand(msg)
}
func (this *AutodetectProtocol) SayPublic(msg string) {
this.realProtoMut.Lock()
defer this.realProtoMut.Unlock()
if this.realProto == nil {
this.realProto = NewNmdcProtocol(this.hc)
}
this.realProto.SayPublic(msg)
}
func (this *AutodetectProtocol) SayPrivate(user, message string) {
this.realProtoMut.Lock()
defer this.realProtoMut.Unlock()
if this.realProto == nil {
this.realProto = NewNmdcProtocol(this.hc)
}
this.realProto.SayPrivate(user, message)
}
func (this *AutodetectProtocol) ProtoMessageSeparator() string {
this.realProtoMut.Lock()
defer this.realProtoMut.Unlock()
if this.realProto == nil {
return "|"
}
return this.realProto.ProtoMessageSeparator()
}

View File

@ -11,7 +11,7 @@ func (this *HubAddress) parse() url.URL {
parsed, err := url.Parse(strings.ToLower(string(*this)))
if err != nil || len(parsed.Host) == 0 {
parsed = &url.URL{
Scheme: "nmdc",
Scheme: "",
Host: string(*this),
}
}

View File

@ -146,14 +146,7 @@ func (this *HubConnection) worker() {
this.State = CONNECTIONSTATE_CONNECTING
this.connValid = true
this.processEvent(HubEvent{EventType: EVENT_CONNECTION_STATE_CHANGED, StateChange: CONNECTIONSTATE_CONNECTING})
if this.Hco.Address.GetProtocol() == HubProtocolNmdc {
this.proto = NewNmdcProtocol(this)
} else if this.Hco.Address.GetProtocol() == HubProtocolAdc {
this.proto = NewAdcProtocol(this)
} else {
this.proto = nil
}
this.proto = this.Hco.Address.GetProtocol().Create(this)
}
}
@ -162,15 +155,7 @@ func (this *HubConnection) worker() {
if this.connValid {
readBuff := make([]byte, 1024)
if this.proto == nil {
// Haven't determined if this is ADC or NMDC
// If we get data in this interval, it's NMDC; otherwise it's ADC
this.conn.SetReadDeadline(time.Now().Add(AUTODETECT_ADC_NMDC_TIMEOUT))
} else {
// Normal
this.conn.SetReadDeadline(time.Now().Add(SEND_KEEPALIVE_EVERY))
}
this.conn.SetReadDeadline(time.Now().Add(SEND_KEEPALIVE_EVERY))
nbytes, err = this.conn.Read(readBuff)
@ -188,35 +173,33 @@ func (this *HubConnection) worker() {
}
}
if this.proto == nil {
this.proto = NewNmdcProtocol(this)
}
if nbytes > 0 {
this.lastDataRecieved = time.Now()
fullBuffer += string(readBuff[0:nbytes])
}
}
rxSeparator := regexp.QuoteMeta(this.proto.ProtoMessageSeparator())
rxProtocolMessage := regexp.MustCompile(`(?ms)\A[^` + rxSeparator + `]*` + rxSeparator)
if this.proto != nil {
rxSeparator := regexp.QuoteMeta(this.proto.ProtoMessageSeparator())
rxProtocolMessage := regexp.MustCompile(`(?ms)\A[^` + rxSeparator + `]*` + rxSeparator)
// Attempt to parse a message block
for len(fullBuffer) > 0 {
for len(fullBuffer) > 0 && fullBuffer[0] == '|' {
fullBuffer = fullBuffer[1:]
// Attempt to parse a message block
for len(fullBuffer) > 0 {
for len(fullBuffer) > 0 && fullBuffer[0] == '|' {
fullBuffer = fullBuffer[1:]
}
protocolMessage := rxProtocolMessage.FindString(fullBuffer)
if len(protocolMessage) > 0 {
this.proto.ProcessCommand(protocolMessage[:len(protocolMessage)-1])
fullBuffer = fullBuffer[len(protocolMessage):]
} else {
break
}
}
protocolMessage := rxProtocolMessage.FindString(fullBuffer)
if len(protocolMessage) > 0 {
this.proto.ProcessCommand(protocolMessage[:len(protocolMessage)-1])
fullBuffer = fullBuffer[len(protocolMessage):]
} else {
break
}
}
if err == nil && time.Now().Sub(this.lastDataRecieved) > RECONNECT_IF_NO_DATA_RECIEVED_IN {
err = fmt.Errorf("No packets recieved since %s, connection presumed lost", this.lastDataRecieved.Format(time.RFC3339))
if err == nil && time.Now().Sub(this.lastDataRecieved) > RECONNECT_IF_NO_DATA_RECIEVED_IN {
err = fmt.Errorf("No packets recieved since %s, connection presumed lost", this.lastDataRecieved.Format(time.RFC3339))
}
}
// Maybe we disconnected

View File

@ -20,9 +20,7 @@ type NmdcProtocol struct {
rxMyInfoNoTag *regexp.Regexp
}
var _ Protocol = &NmdcProtocol{} // Assert that we implement the interface
func NewNmdcProtocol(hc *HubConnection) *NmdcProtocol {
func NewNmdcProtocol(hc *HubConnection) Protocol {
proto := NmdcProtocol{}
proto.hc = hc

View File

@ -9,3 +9,23 @@ type Protocol interface {
ProtoMessageSeparator() string
}
type HubProtocol int
const (
HubProtocolAutodetect HubProtocol = 0
HubProtocolNmdc HubProtocol = 1
HubProtocolAdc HubProtocol = 2
)
func (hp HubProtocol) Create(hc *HubConnection) Protocol {
if hp == HubProtocolNmdc {
return NewNmdcProtocol(hc)
} else if hp == HubProtocolAdc {
return NewAdcProtocol(hc)
} else {
return NewAutodetectProtocol(hc)
}
}

View File

@ -18,14 +18,6 @@ const (
var ErrNotConnected error = errors.New("Not connected")
type HubProtocol int
const (
HubProtocolAutodetect = 0
HubProtocolNmdc = 1
HubProtocolAdc = 2
)
func maybeParse(str string, dest *uint64, default_val uint64) {
sz, err := strconv.ParseUint(str, 10, 64)
if err == nil {