|
|
|
|
@@ -0,0 +1,405 @@
|
|
|
|
|
package main
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"math/rand"
|
|
|
|
|
"strconv"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
qt "github.com/mappu/miqt/qt6"
|
|
|
|
|
redis "github.com/redis/go-redis/v9"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type rsmqConnectionOptions struct {
|
|
|
|
|
Redis *redisConnectionOptions
|
|
|
|
|
Namespace string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (config *rsmqConnectionOptions) String() string {
|
|
|
|
|
return config.Redis.String()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (config *rsmqConnectionOptions) Connect(ctx context.Context) (loadedDatabase, error) {
|
|
|
|
|
rd, err := config.Redis.Connect(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
redisdb, ok := rd.(*redisLoadedDatabase)
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, fmt.Errorf("Expected redisLoadedDatabase, got %T", rd)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ld := &rsmqLoadedDatabase{
|
|
|
|
|
redisLoadedDatabase: redisdb,
|
|
|
|
|
ns: config.Namespace,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ld, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var _ DBConnector = &rsmqConnectionOptions{} // interface assertion
|
|
|
|
|
|
|
|
|
|
type rsmqLoadedDatabase struct {
|
|
|
|
|
*redisLoadedDatabase
|
|
|
|
|
|
|
|
|
|
ns string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) DriverName() string {
|
|
|
|
|
return ld.redisLoadedDatabase.DriverName()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) Properties(bucketPath []string) (string, error) {
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
|
|
|
|
|
if len(bucketPath) != 2 {
|
|
|
|
|
return ld.redisLoadedDatabase.Properties(bucketPath)
|
|
|
|
|
|
|
|
|
|
} else if len(bucketPath) == 2 {
|
|
|
|
|
err := ld.db.Do(ctx, "SELECT", bucketPath[0]).Err()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", fmt.Errorf("Switching to database %q: %w", bucketPath[0], err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
stats, err := ld.rsmqQueueStats(ctx, bucketPath[1])
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", fmt.Errorf("Getting stats for queue %q: %w", bucketPath[1], err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
propstr := fmt.Sprintf(
|
|
|
|
|
"# Redis Simple Message Queue\nrsmq_queue_name:%s\nrsmq_visibility_timeout:%d\nrsmq_delay:%d\nrsmq_max_size:%d\nrsmq_total_received_messages:%d\nrsmq_total_sent_messages:%d\nrsmq_created:%s\nrsmq_modified:%s\nrsmq_messages:%d\nrsmq_hidden_messages:%d\n",
|
|
|
|
|
stats.Name,
|
|
|
|
|
stats.Vt,
|
|
|
|
|
stats.Delay,
|
|
|
|
|
stats.MaxSize,
|
|
|
|
|
stats.TotalRecv,
|
|
|
|
|
stats.TotalSent,
|
|
|
|
|
stats.Created.Format(time.DateTime),
|
|
|
|
|
stats.Modified.Format(time.DateTime),
|
|
|
|
|
stats.Msgs,
|
|
|
|
|
stats.HiddenMsgs,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return propstr, nil
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
return "", fmt.Errorf("Unexpected nav position %q", bucketPath)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) RenderForNav(f *tableState, bucketPath []string) error {
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
|
|
|
|
|
if len(bucketPath) == 0 || len(bucketPath) == 1 {
|
|
|
|
|
// Leave data tab disabled (default behaviour)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
} else if len(bucketPath) == 2 {
|
|
|
|
|
// One selected database
|
|
|
|
|
// Figure out its content
|
|
|
|
|
err := ld.db.Do(ctx, "SELECT", bucketPath[0]).Err()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Switching to database %q: %w", bucketPath[0], err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// List messages in the selected queue
|
|
|
|
|
messages, err := ld.rsmqListMessages(ctx, bucketPath[1])
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Listing messages in queue %q: %w", bucketPath[1], err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Redis always uses Key string, Type string, Value []byte as the columns
|
|
|
|
|
|
|
|
|
|
f.SetupColumns(
|
|
|
|
|
[]TableColumn{&stringColumn{}, &stringColumn{}, &int64Column{}, &stringColumn{}},
|
|
|
|
|
[]string{"ID", "Sent At", "Receive Count", "Body"},
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
for _, msg := range messages {
|
|
|
|
|
rpos := f.AddRow()
|
|
|
|
|
f.SetCell(rpos, 0, msg.ID)
|
|
|
|
|
f.SetCell(rpos, 1, msg.SentAt.Format(time.DateTime))
|
|
|
|
|
f.SetCell(rpos, 2, int64(msg.Rc))
|
|
|
|
|
f.SetCell(rpos, 3, msg.Body)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Valid
|
|
|
|
|
f.tbl.HorizontalHeader().SetStretchLastSection(true)
|
|
|
|
|
f.Ready()
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
return fmt.Errorf("Unexpected nav position %q", bucketPath)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) NavChildren(bucketPath []string) ([]string, error) {
|
|
|
|
|
|
|
|
|
|
if len(bucketPath) == 0 {
|
|
|
|
|
// List databases
|
|
|
|
|
return ld.redisLoadedDatabase.NavChildren(bucketPath)
|
|
|
|
|
|
|
|
|
|
} else if len(bucketPath) == 1 {
|
|
|
|
|
// List queues in the selected database
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
err := ld.db.Do(ctx, "SELECT", bucketPath[0]).Err()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Switching to database %q: %w", bucketPath[0], err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queues, err := ld.db.SMembers(ctx, fmt.Sprintf("%sQUEUES", ld.ns)).Result()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Listing keys in database %q: %w", bucketPath[0], err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return queues, nil // No further children
|
|
|
|
|
|
|
|
|
|
} else if len(bucketPath) == 2 {
|
|
|
|
|
return []string{}, nil
|
|
|
|
|
} else {
|
|
|
|
|
return nil, fmt.Errorf("Unexpected nav position %q", bucketPath)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) NavContext(bucketPath []string) ([]contextAction, error) {
|
|
|
|
|
|
|
|
|
|
if len(bucketPath) == 2 {
|
|
|
|
|
return []contextAction{
|
|
|
|
|
{ "Send Message", ld.SendMessage },
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil, nil // No special actions are supported
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) SendMessage(sender *qt.QTreeWidgetItem, bucketPath []string) error {
|
|
|
|
|
msg := qt.QInputDialog_GetText(sender.TreeWidget().QWidget, APPNAME, "Queue a new message:")
|
|
|
|
|
if msg == "" {
|
|
|
|
|
return nil // cancel
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
return ld.rsmqSendMessage(ctx, bucketPath[1], msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) Close() {
|
|
|
|
|
ld.redisLoadedDatabase.Close()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var _ loadedDatabase = &rsmqLoadedDatabase{} // interface assertion
|
|
|
|
|
|
|
|
|
|
// Helper Functions
|
|
|
|
|
|
|
|
|
|
type rsmqMessage struct {
|
|
|
|
|
ID string
|
|
|
|
|
Body string
|
|
|
|
|
Rc int
|
|
|
|
|
Fr time.Time
|
|
|
|
|
SentAt time.Time
|
|
|
|
|
VisibleAt time.Time
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) rsmqListMessages(ctx context.Context, queue string) ([]rsmqMessage, error) {
|
|
|
|
|
key := ld.ns + queue
|
|
|
|
|
|
|
|
|
|
// Get all members from ZSet with scores
|
|
|
|
|
zres, err := ld.db.ZRangeWithScores(ctx, key, 0, -1).Result()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(zres) == 0 {
|
|
|
|
|
return []rsmqMessage{}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
msgs := make([]rsmqMessage, len(zres))
|
|
|
|
|
|
|
|
|
|
hashKey := key + ":Q"
|
|
|
|
|
fields := make([]string, 0, len(zres)*3)
|
|
|
|
|
for _, z := range zres {
|
|
|
|
|
id := z.Member.(string)
|
|
|
|
|
fields = append(fields, id, id+":rc", id+":fr")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
hmres, err := ld.db.HMGet(ctx, hashKey, fields...).Result()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for i, z := range zres {
|
|
|
|
|
id := z.Member.(string)
|
|
|
|
|
|
|
|
|
|
// Parse ID to get Sent time
|
|
|
|
|
// Match RSMQ implementation: parseInt(id.slice(0, 10), 36)
|
|
|
|
|
sent := time.Time{}
|
|
|
|
|
parseLen := 10
|
|
|
|
|
if len(id) < 10 {
|
|
|
|
|
parseLen = len(id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if parseLen > 0 {
|
|
|
|
|
tsMs, _ := strconv.ParseInt(id[:parseLen], 36, 64)
|
|
|
|
|
sent = time.UnixMicro(tsMs)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
body := ""
|
|
|
|
|
if val := hmres[i*3]; val != nil {
|
|
|
|
|
body = val.(string)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rc := 0
|
|
|
|
|
if val := hmres[i*3+1]; val != nil {
|
|
|
|
|
if s, ok := val.(string); ok {
|
|
|
|
|
rc, _ = strconv.Atoi(s)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fr := time.Time{}
|
|
|
|
|
if val := hmres[i*3+2]; val != nil {
|
|
|
|
|
if s, ok := val.(string); ok {
|
|
|
|
|
frMs, _ := strconv.ParseInt(s, 10, 64)
|
|
|
|
|
fr = time.UnixMilli(frMs)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
msgs[i] = rsmqMessage{
|
|
|
|
|
ID: id,
|
|
|
|
|
Body: body,
|
|
|
|
|
Rc: rc,
|
|
|
|
|
Fr: fr,
|
|
|
|
|
SentAt: sent,
|
|
|
|
|
VisibleAt: time.UnixMilli(int64(z.Score)),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return msgs, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) rsmqSendMessage(ctx context.Context, queue string, message string) error {
|
|
|
|
|
stats, err := ld.rsmqQueueStats(ctx, queue)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(message) > stats.MaxSize {
|
|
|
|
|
return fmt.Errorf("message too long")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
id := ld.rsmqMessageID()
|
|
|
|
|
now := time.Now().UnixMilli()
|
|
|
|
|
score := now + int64(stats.Delay*1000)
|
|
|
|
|
|
|
|
|
|
keyQ := ld.ns + queue + ":Q"
|
|
|
|
|
keyZ := ld.ns + queue
|
|
|
|
|
|
|
|
|
|
pipe := ld.db.TxPipeline()
|
|
|
|
|
pipe.ZAdd(ctx, keyZ, redis.Z{Score: float64(score), Member: id})
|
|
|
|
|
pipe.HMSet(ctx, keyQ, map[string]interface{}{
|
|
|
|
|
id: message,
|
|
|
|
|
id + ":rc": 0,
|
|
|
|
|
id + ":fr": 0,
|
|
|
|
|
id + ":sent": now,
|
|
|
|
|
})
|
|
|
|
|
pipe.HIncrBy(ctx, keyQ, "totalsent", 1)
|
|
|
|
|
|
|
|
|
|
_, err = pipe.Exec(ctx)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) rsmqMessageID() string {
|
|
|
|
|
// RSMQ uses microsecond precision for the ID timestamp part
|
|
|
|
|
// Logic: Number(seconds + microseconds).toString(36)
|
|
|
|
|
// Go's UnixMicro returns the number of microseconds elapsed since January 1, 1970 UTC.
|
|
|
|
|
// This is equivalent to seconds*1e6 + microseconds, which matches the JS logic
|
|
|
|
|
// assuming the JS logic intends to create a full microsecond timestamp.
|
|
|
|
|
ts := strconv.FormatInt(time.Now().UnixMicro(), 36)
|
|
|
|
|
if len(ts) < 10 {
|
|
|
|
|
ts = strings.Repeat("0", 10-len(ts)) + ts
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b := make([]byte, 22)
|
|
|
|
|
for i := range b {
|
|
|
|
|
b[i] = charset[rand.Intn(len(charset))]
|
|
|
|
|
}
|
|
|
|
|
return ts + string(b)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
type rsmqQueueStats struct {
|
|
|
|
|
Name string
|
|
|
|
|
Vt int
|
|
|
|
|
Delay int
|
|
|
|
|
MaxSize int
|
|
|
|
|
TotalRecv uint64
|
|
|
|
|
TotalSent uint64
|
|
|
|
|
Created time.Time
|
|
|
|
|
Modified time.Time
|
|
|
|
|
Msgs int64
|
|
|
|
|
HiddenMsgs int64
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ld *rsmqLoadedDatabase) rsmqQueueStats(ctx context.Context, queue string) (*rsmqQueueStats, error) {
|
|
|
|
|
key := ld.ns + queue
|
|
|
|
|
|
|
|
|
|
// Get Attributes from Hash
|
|
|
|
|
// Fields: vt, delay, maxsize, totalrecv, totalsent, created, modified
|
|
|
|
|
res, err := ld.db.HMGet(ctx, key+":Q", "vt", "delay", "maxsize", "totalrecv", "totalsent", "created", "modified").Result()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If all nil, queue might not exist
|
|
|
|
|
if len(res) == 0 || res[0] == nil {
|
|
|
|
|
return nil, fmt.Errorf("queue doesn't exist")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
toInt := func(v interface{}) int {
|
|
|
|
|
if s, ok := v.(string); ok {
|
|
|
|
|
i, _ := strconv.Atoi(s)
|
|
|
|
|
return i
|
|
|
|
|
}
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
toUint64 := func(v interface{}) uint64 {
|
|
|
|
|
if s, ok := v.(string); ok {
|
|
|
|
|
i, _ := strconv.ParseUint(s, 10, 64)
|
|
|
|
|
return i
|
|
|
|
|
}
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
toInt64 := func(v interface{}) int64 {
|
|
|
|
|
if s, ok := v.(string); ok {
|
|
|
|
|
i, _ := strconv.ParseInt(s, 10, 64)
|
|
|
|
|
return i
|
|
|
|
|
}
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
stats := &rsmqQueueStats{
|
|
|
|
|
Name: queue,
|
|
|
|
|
Vt: toInt(res[0]),
|
|
|
|
|
Delay: toInt(res[1]),
|
|
|
|
|
MaxSize: toInt(res[2]),
|
|
|
|
|
TotalRecv: toUint64(res[3]),
|
|
|
|
|
TotalSent: toUint64(res[4]),
|
|
|
|
|
Created: time.Unix(toInt64(res[5]), 0),
|
|
|
|
|
Modified: time.Unix(toInt64(res[6]), 0),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get Msgs Count (ZCard)
|
|
|
|
|
stats.Msgs, _ = ld.db.ZCard(ctx, key).Result()
|
|
|
|
|
|
|
|
|
|
// Get Hidden Msgs Count (ZCount where score > now)
|
|
|
|
|
nowMs := time.Now().UnixNano() / 1e6
|
|
|
|
|
stats.HiddenMsgs, _ = ld.db.ZCount(ctx, key, strconv.FormatInt(nowMs, 10), "+inf").Result()
|
|
|
|
|
return stats, nil
|
|
|
|
|
}
|