Compare commits

...

2 Commits

Author SHA1 Message Date
Ben Fleming
907b0de2ae rsmq: initial support 2026-02-16 17:34:28 +13:00
Ben Fleming
c594fde0a7 table: handle string in binary column type 2026-02-16 17:34:28 +13:00
3 changed files with 411 additions and 1 deletions

View File

@@ -38,6 +38,7 @@ type ConnectionConfig struct {
Pebble *pebbleConnection `yicon:":/assets/vendor_cockroach.png" json:",omitempty"`
Pogreb *pogrebConn `yicon:":/assets/vendor_pogreb.png" json:",omitempty"`
Redis *redisConnectionOptions `yicon:":/assets/vendor_redis.png" json:",omitempty"`
RSMQ *rsmqConnectionOptions `yicon:":/assets/vendor_redis.png" json:",omitempty"`
RoseDB *roseDBConn `ylabel:"RoseDB" yicon:":/assets/vendor_rosedb.png" json:",omitempty"`
SQLite *sqliteConnection `ylabel:"SQLite" yicon:":/assets/vendor_sqlite.png" json:",omitempty"`
SSHAgent *sshAgentConn `yicon:":/assets/vendor_ssh.png" json:",omitempty"`

405
db_rsmq.go Normal file
View File

@@ -0,0 +1,405 @@
package main
import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"time"
qt "github.com/mappu/miqt/qt6"
redis "github.com/redis/go-redis/v9"
)
type rsmqConnectionOptions struct {
Redis *redisConnectionOptions
Namespace string
}
func (config *rsmqConnectionOptions) String() string {
return config.Redis.String()
}
func (config *rsmqConnectionOptions) Connect(ctx context.Context) (loadedDatabase, error) {
rd, err := config.Redis.Connect(ctx)
if err != nil {
return nil, err
}
redisdb, ok := rd.(*redisLoadedDatabase)
if !ok {
return nil, fmt.Errorf("Expected redisLoadedDatabase, got %T", rd)
}
ld := &rsmqLoadedDatabase{
redisLoadedDatabase: redisdb,
ns: config.Namespace,
}
return ld, nil
}
var _ DBConnector = &rsmqConnectionOptions{} // interface assertion
type rsmqLoadedDatabase struct {
*redisLoadedDatabase
ns string
}
func (ld *rsmqLoadedDatabase) DriverName() string {
return ld.redisLoadedDatabase.DriverName()
}
func (ld *rsmqLoadedDatabase) Properties(bucketPath []string) (string, error) {
ctx := context.Background()
if len(bucketPath) != 2 {
return ld.redisLoadedDatabase.Properties(bucketPath)
} else if len(bucketPath) == 2 {
err := ld.db.Do(ctx, "SELECT", bucketPath[0]).Err()
if err != nil {
return "", fmt.Errorf("Switching to database %q: %w", bucketPath[0], err)
}
stats, err := ld.rsmqQueueStats(ctx, bucketPath[1])
if err != nil {
return "", fmt.Errorf("Getting stats for queue %q: %w", bucketPath[1], err)
}
propstr := fmt.Sprintf(
"# Redis Simple Message Queue\nrsmq_queue_name:%s\nrsmq_visibility_timeout:%d\nrsmq_delay:%d\nrsmq_max_size:%d\nrsmq_total_received_messages:%d\nrsmq_total_sent_messages:%d\nrsmq_created:%s\nrsmq_modified:%s\nrsmq_messages:%d\nrsmq_hidden_messages:%d\n",
stats.Name,
stats.Vt,
stats.Delay,
stats.MaxSize,
stats.TotalRecv,
stats.TotalSent,
stats.Created.Format(time.DateTime),
stats.Modified.Format(time.DateTime),
stats.Msgs,
stats.HiddenMsgs,
)
return propstr, nil
} else {
return "", fmt.Errorf("Unexpected nav position %q", bucketPath)
}
}
func (ld *rsmqLoadedDatabase) RenderForNav(f *tableState, bucketPath []string) error {
ctx := context.Background()
if len(bucketPath) == 0 || len(bucketPath) == 1 {
// Leave data tab disabled (default behaviour)
return nil
} else if len(bucketPath) == 2 {
// One selected database
// Figure out its content
err := ld.db.Do(ctx, "SELECT", bucketPath[0]).Err()
if err != nil {
return fmt.Errorf("Switching to database %q: %w", bucketPath[0], err)
}
// List messages in the selected queue
messages, err := ld.rsmqListMessages(ctx, bucketPath[1])
if err != nil {
return fmt.Errorf("Listing messages in queue %q: %w", bucketPath[1], err)
}
// Redis always uses Key string, Type string, Value []byte as the columns
f.SetupColumns(
[]TableColumn{&stringColumn{}, &stringColumn{}, &int64Column{}, &stringColumn{}},
[]string{"ID", "Sent At", "Receive Count", "Body"},
)
for _, msg := range messages {
rpos := f.AddRow()
f.SetCell(rpos, 0, msg.ID)
f.SetCell(rpos, 1, msg.SentAt.Format(time.DateTime))
f.SetCell(rpos, 2, int64(msg.Rc))
f.SetCell(rpos, 3, msg.Body)
}
// Valid
f.tbl.HorizontalHeader().SetStretchLastSection(true)
f.Ready()
return nil
} else {
return fmt.Errorf("Unexpected nav position %q", bucketPath)
}
}
func (ld *rsmqLoadedDatabase) NavChildren(bucketPath []string) ([]string, error) {
if len(bucketPath) == 0 {
// List databases
return ld.redisLoadedDatabase.NavChildren(bucketPath)
} else if len(bucketPath) == 1 {
// List queues in the selected database
ctx := context.Background()
err := ld.db.Do(ctx, "SELECT", bucketPath[0]).Err()
if err != nil {
return nil, fmt.Errorf("Switching to database %q: %w", bucketPath[0], err)
}
queues, err := ld.db.SMembers(ctx, fmt.Sprintf("%sQUEUES", ld.ns)).Result()
if err != nil {
return nil, fmt.Errorf("Listing keys in database %q: %w", bucketPath[0], err)
}
return queues, nil // No further children
} else if len(bucketPath) == 2 {
return []string{}, nil
} else {
return nil, fmt.Errorf("Unexpected nav position %q", bucketPath)
}
}
func (ld *rsmqLoadedDatabase) NavContext(bucketPath []string) ([]contextAction, error) {
if len(bucketPath) == 2 {
return []contextAction{
{ "Send Message", ld.SendMessage },
}, nil
}
return nil, nil // No special actions are supported
}
func (ld *rsmqLoadedDatabase) SendMessage(sender *qt.QTreeWidgetItem, bucketPath []string) error {
msg := qt.QInputDialog_GetText(sender.TreeWidget().QWidget, APPNAME, "Queue a new message:")
if msg == "" {
return nil // cancel
}
ctx := context.Background()
return ld.rsmqSendMessage(ctx, bucketPath[1], msg)
}
func (ld *rsmqLoadedDatabase) Close() {
ld.redisLoadedDatabase.Close()
}
var _ loadedDatabase = &rsmqLoadedDatabase{} // interface assertion
// Helper Functions
type rsmqMessage struct {
ID string
Body string
Rc int
Fr time.Time
SentAt time.Time
VisibleAt time.Time
}
func (ld *rsmqLoadedDatabase) rsmqListMessages(ctx context.Context, queue string) ([]rsmqMessage, error) {
key := ld.ns + queue
// Get all members from ZSet with scores
zres, err := ld.db.ZRangeWithScores(ctx, key, 0, -1).Result()
if err != nil {
return nil, err
}
if len(zres) == 0 {
return []rsmqMessage{}, nil
}
msgs := make([]rsmqMessage, len(zres))
hashKey := key + ":Q"
fields := make([]string, 0, len(zres)*3)
for _, z := range zres {
id := z.Member.(string)
fields = append(fields, id, id+":rc", id+":fr")
}
hmres, err := ld.db.HMGet(ctx, hashKey, fields...).Result()
if err != nil {
return nil, err
}
for i, z := range zres {
id := z.Member.(string)
// Parse ID to get Sent time
// Match RSMQ implementation: parseInt(id.slice(0, 10), 36)
sent := time.Time{}
parseLen := 10
if len(id) < 10 {
parseLen = len(id)
}
if parseLen > 0 {
tsMs, _ := strconv.ParseInt(id[:parseLen], 36, 64)
sent = time.UnixMicro(tsMs)
}
body := ""
if val := hmres[i*3]; val != nil {
body = val.(string)
}
rc := 0
if val := hmres[i*3+1]; val != nil {
if s, ok := val.(string); ok {
rc, _ = strconv.Atoi(s)
}
}
fr := time.Time{}
if val := hmres[i*3+2]; val != nil {
if s, ok := val.(string); ok {
frMs, _ := strconv.ParseInt(s, 10, 64)
fr = time.UnixMilli(frMs)
}
}
msgs[i] = rsmqMessage{
ID: id,
Body: body,
Rc: rc,
Fr: fr,
SentAt: sent,
VisibleAt: time.UnixMilli(int64(z.Score)),
}
}
return msgs, nil
}
func (ld *rsmqLoadedDatabase) rsmqSendMessage(ctx context.Context, queue string, message string) error {
stats, err := ld.rsmqQueueStats(ctx, queue)
if err != nil {
return err
}
if len(message) > stats.MaxSize {
return fmt.Errorf("message too long")
}
id := ld.rsmqMessageID()
now := time.Now().UnixMilli()
score := now + int64(stats.Delay*1000)
keyQ := ld.ns + queue + ":Q"
keyZ := ld.ns + queue
pipe := ld.db.TxPipeline()
pipe.ZAdd(ctx, keyZ, redis.Z{Score: float64(score), Member: id})
pipe.HMSet(ctx, keyQ, map[string]interface{}{
id: message,
id + ":rc": 0,
id + ":fr": 0,
id + ":sent": now,
})
pipe.HIncrBy(ctx, keyQ, "totalsent", 1)
_, err = pipe.Exec(ctx)
return err
}
const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
func (ld *rsmqLoadedDatabase) rsmqMessageID() string {
// RSMQ uses microsecond precision for the ID timestamp part
// Logic: Number(seconds + microseconds).toString(36)
// Go's UnixMicro returns the number of microseconds elapsed since January 1, 1970 UTC.
// This is equivalent to seconds*1e6 + microseconds, which matches the JS logic
// assuming the JS logic intends to create a full microsecond timestamp.
ts := strconv.FormatInt(time.Now().UnixMicro(), 36)
if len(ts) < 10 {
ts = strings.Repeat("0", 10-len(ts)) + ts
}
b := make([]byte, 22)
for i := range b {
b[i] = charset[rand.Intn(len(charset))]
}
return ts + string(b)
}
//
type rsmqQueueStats struct {
Name string
Vt int
Delay int
MaxSize int
TotalRecv uint64
TotalSent uint64
Created time.Time
Modified time.Time
Msgs int64
HiddenMsgs int64
}
func (ld *rsmqLoadedDatabase) rsmqQueueStats(ctx context.Context, queue string) (*rsmqQueueStats, error) {
key := ld.ns + queue
// Get Attributes from Hash
// Fields: vt, delay, maxsize, totalrecv, totalsent, created, modified
res, err := ld.db.HMGet(ctx, key+":Q", "vt", "delay", "maxsize", "totalrecv", "totalsent", "created", "modified").Result()
if err != nil {
return nil, err
}
// If all nil, queue might not exist
if len(res) == 0 || res[0] == nil {
return nil, fmt.Errorf("queue doesn't exist")
}
toInt := func(v interface{}) int {
if s, ok := v.(string); ok {
i, _ := strconv.Atoi(s)
return i
}
return 0
}
toUint64 := func(v interface{}) uint64 {
if s, ok := v.(string); ok {
i, _ := strconv.ParseUint(s, 10, 64)
return i
}
return 0
}
toInt64 := func(v interface{}) int64 {
if s, ok := v.(string); ok {
i, _ := strconv.ParseInt(s, 10, 64)
return i
}
return 0
}
stats := &rsmqQueueStats{
Name: queue,
Vt: toInt(res[0]),
Delay: toInt(res[1]),
MaxSize: toInt(res[2]),
TotalRecv: toUint64(res[3]),
TotalSent: toUint64(res[4]),
Created: time.Unix(toInt64(res[5]), 0),
Modified: time.Unix(toInt64(res[6]), 0),
}
// Get Msgs Count (ZCard)
stats.Msgs, _ = ld.db.ZCard(ctx, key).Result()
// Get Hidden Msgs Count (ZCount where score > now)
nowMs := time.Now().UnixNano() / 1e6
stats.HiddenMsgs, _ = ld.db.ZCount(ctx, key, strconv.FormatInt(nowMs, 10), "+inf").Result()
return stats, nil
}

View File

@@ -24,8 +24,12 @@ func (bc *binColumn) SetRowCount(newlen int) {
func (bc *binColumn) SetCell(aRow int, data any) {
str, ok := data.([]byte)
if !ok {
if s2, ok := data.(string); ok {
str = []byte(s2)
} else {
panic(fmt.Sprintf("bad data type (got %T)", data))
}
}
// TODO Use slice_dup() here because many iterator loops reuse the backing []byte
// slice. However it would be more efficient if slice_dup() was explicitly called