forked from code.ivysaur.me/qbolt
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
907b0de2ae | ||
|
|
c594fde0a7 |
@@ -38,6 +38,7 @@ type ConnectionConfig struct {
|
||||
Pebble *pebbleConnection `yicon:":/assets/vendor_cockroach.png" json:",omitempty"`
|
||||
Pogreb *pogrebConn `yicon:":/assets/vendor_pogreb.png" json:",omitempty"`
|
||||
Redis *redisConnectionOptions `yicon:":/assets/vendor_redis.png" json:",omitempty"`
|
||||
RSMQ *rsmqConnectionOptions `yicon:":/assets/vendor_redis.png" json:",omitempty"`
|
||||
RoseDB *roseDBConn `ylabel:"RoseDB" yicon:":/assets/vendor_rosedb.png" json:",omitempty"`
|
||||
SQLite *sqliteConnection `ylabel:"SQLite" yicon:":/assets/vendor_sqlite.png" json:",omitempty"`
|
||||
SSHAgent *sshAgentConn `yicon:":/assets/vendor_ssh.png" json:",omitempty"`
|
||||
|
||||
405
db_rsmq.go
Normal file
405
db_rsmq.go
Normal file
@@ -0,0 +1,405 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
qt "github.com/mappu/miqt/qt6"
|
||||
redis "github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type rsmqConnectionOptions struct {
|
||||
Redis *redisConnectionOptions
|
||||
Namespace string
|
||||
}
|
||||
|
||||
func (config *rsmqConnectionOptions) String() string {
|
||||
return config.Redis.String()
|
||||
}
|
||||
|
||||
func (config *rsmqConnectionOptions) Connect(ctx context.Context) (loadedDatabase, error) {
|
||||
rd, err := config.Redis.Connect(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
redisdb, ok := rd.(*redisLoadedDatabase)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Expected redisLoadedDatabase, got %T", rd)
|
||||
}
|
||||
|
||||
ld := &rsmqLoadedDatabase{
|
||||
redisLoadedDatabase: redisdb,
|
||||
ns: config.Namespace,
|
||||
}
|
||||
|
||||
return ld, nil
|
||||
}
|
||||
|
||||
var _ DBConnector = &rsmqConnectionOptions{} // interface assertion
|
||||
|
||||
type rsmqLoadedDatabase struct {
|
||||
*redisLoadedDatabase
|
||||
|
||||
ns string
|
||||
}
|
||||
|
||||
func (ld *rsmqLoadedDatabase) DriverName() string {
|
||||
return ld.redisLoadedDatabase.DriverName()
|
||||
}
|
||||
|
||||
func (ld *rsmqLoadedDatabase) Properties(bucketPath []string) (string, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
if len(bucketPath) != 2 {
|
||||
return ld.redisLoadedDatabase.Properties(bucketPath)
|
||||
|
||||
} else if len(bucketPath) == 2 {
|
||||
err := ld.db.Do(ctx, "SELECT", bucketPath[0]).Err()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Switching to database %q: %w", bucketPath[0], err)
|
||||
}
|
||||
|
||||
stats, err := ld.rsmqQueueStats(ctx, bucketPath[1])
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Getting stats for queue %q: %w", bucketPath[1], err)
|
||||
}
|
||||
|
||||
propstr := fmt.Sprintf(
|
||||
"# Redis Simple Message Queue\nrsmq_queue_name:%s\nrsmq_visibility_timeout:%d\nrsmq_delay:%d\nrsmq_max_size:%d\nrsmq_total_received_messages:%d\nrsmq_total_sent_messages:%d\nrsmq_created:%s\nrsmq_modified:%s\nrsmq_messages:%d\nrsmq_hidden_messages:%d\n",
|
||||
stats.Name,
|
||||
stats.Vt,
|
||||
stats.Delay,
|
||||
stats.MaxSize,
|
||||
stats.TotalRecv,
|
||||
stats.TotalSent,
|
||||
stats.Created.Format(time.DateTime),
|
||||
stats.Modified.Format(time.DateTime),
|
||||
stats.Msgs,
|
||||
stats.HiddenMsgs,
|
||||
)
|
||||
|
||||
return propstr, nil
|
||||
|
||||
} else {
|
||||
return "", fmt.Errorf("Unexpected nav position %q", bucketPath)
|
||||
}
|
||||
}
|
||||
|
||||
func (ld *rsmqLoadedDatabase) RenderForNav(f *tableState, bucketPath []string) error {
|
||||
ctx := context.Background()
|
||||
|
||||
if len(bucketPath) == 0 || len(bucketPath) == 1 {
|
||||
// Leave data tab disabled (default behaviour)
|
||||
return nil
|
||||
|
||||
} else if len(bucketPath) == 2 {
|
||||
// One selected database
|
||||
// Figure out its content
|
||||
err := ld.db.Do(ctx, "SELECT", bucketPath[0]).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Switching to database %q: %w", bucketPath[0], err)
|
||||
}
|
||||
|
||||
// List messages in the selected queue
|
||||
messages, err := ld.rsmqListMessages(ctx, bucketPath[1])
|
||||
if err != nil {
|
||||
return fmt.Errorf("Listing messages in queue %q: %w", bucketPath[1], err)
|
||||
}
|
||||
|
||||
// Redis always uses Key string, Type string, Value []byte as the columns
|
||||
|
||||
f.SetupColumns(
|
||||
[]TableColumn{&stringColumn{}, &stringColumn{}, &int64Column{}, &stringColumn{}},
|
||||
[]string{"ID", "Sent At", "Receive Count", "Body"},
|
||||
)
|
||||
|
||||
for _, msg := range messages {
|
||||
rpos := f.AddRow()
|
||||
f.SetCell(rpos, 0, msg.ID)
|
||||
f.SetCell(rpos, 1, msg.SentAt.Format(time.DateTime))
|
||||
f.SetCell(rpos, 2, int64(msg.Rc))
|
||||
f.SetCell(rpos, 3, msg.Body)
|
||||
}
|
||||
|
||||
// Valid
|
||||
f.tbl.HorizontalHeader().SetStretchLastSection(true)
|
||||
f.Ready()
|
||||
return nil
|
||||
|
||||
} else {
|
||||
return fmt.Errorf("Unexpected nav position %q", bucketPath)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (ld *rsmqLoadedDatabase) NavChildren(bucketPath []string) ([]string, error) {
|
||||
|
||||
if len(bucketPath) == 0 {
|
||||
// List databases
|
||||
return ld.redisLoadedDatabase.NavChildren(bucketPath)
|
||||
|
||||
} else if len(bucketPath) == 1 {
|
||||
// List queues in the selected database
|
||||
ctx := context.Background()
|
||||
err := ld.db.Do(ctx, "SELECT", bucketPath[0]).Err()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Switching to database %q: %w", bucketPath[0], err)
|
||||
}
|
||||
|
||||
queues, err := ld.db.SMembers(ctx, fmt.Sprintf("%sQUEUES", ld.ns)).Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Listing keys in database %q: %w", bucketPath[0], err)
|
||||
}
|
||||
|
||||
return queues, nil // No further children
|
||||
|
||||
} else if len(bucketPath) == 2 {
|
||||
return []string{}, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("Unexpected nav position %q", bucketPath)
|
||||
}
|
||||
}
|
||||
|
||||
func (ld *rsmqLoadedDatabase) NavContext(bucketPath []string) ([]contextAction, error) {
|
||||
|
||||
if len(bucketPath) == 2 {
|
||||
return []contextAction{
|
||||
{ "Send Message", ld.SendMessage },
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, nil // No special actions are supported
|
||||
}
|
||||
|
||||
func (ld *rsmqLoadedDatabase) SendMessage(sender *qt.QTreeWidgetItem, bucketPath []string) error {
|
||||
msg := qt.QInputDialog_GetText(sender.TreeWidget().QWidget, APPNAME, "Queue a new message:")
|
||||
if msg == "" {
|
||||
return nil // cancel
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
return ld.rsmqSendMessage(ctx, bucketPath[1], msg)
|
||||
}
|
||||
|
||||
func (ld *rsmqLoadedDatabase) Close() {
|
||||
ld.redisLoadedDatabase.Close()
|
||||
}
|
||||
|
||||
var _ loadedDatabase = &rsmqLoadedDatabase{} // interface assertion
|
||||
|
||||
// Helper Functions
|
||||
|
||||
type rsmqMessage struct {
|
||||
ID string
|
||||
Body string
|
||||
Rc int
|
||||
Fr time.Time
|
||||
SentAt time.Time
|
||||
VisibleAt time.Time
|
||||
}
|
||||
|
||||
func (ld *rsmqLoadedDatabase) rsmqListMessages(ctx context.Context, queue string) ([]rsmqMessage, error) {
|
||||
key := ld.ns + queue
|
||||
|
||||
// Get all members from ZSet with scores
|
||||
zres, err := ld.db.ZRangeWithScores(ctx, key, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(zres) == 0 {
|
||||
return []rsmqMessage{}, nil
|
||||
}
|
||||
|
||||
msgs := make([]rsmqMessage, len(zres))
|
||||
|
||||
hashKey := key + ":Q"
|
||||
fields := make([]string, 0, len(zres)*3)
|
||||
for _, z := range zres {
|
||||
id := z.Member.(string)
|
||||
fields = append(fields, id, id+":rc", id+":fr")
|
||||
}
|
||||
|
||||
hmres, err := ld.db.HMGet(ctx, hashKey, fields...).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, z := range zres {
|
||||
id := z.Member.(string)
|
||||
|
||||
// Parse ID to get Sent time
|
||||
// Match RSMQ implementation: parseInt(id.slice(0, 10), 36)
|
||||
sent := time.Time{}
|
||||
parseLen := 10
|
||||
if len(id) < 10 {
|
||||
parseLen = len(id)
|
||||
}
|
||||
|
||||
if parseLen > 0 {
|
||||
tsMs, _ := strconv.ParseInt(id[:parseLen], 36, 64)
|
||||
sent = time.UnixMicro(tsMs)
|
||||
}
|
||||
|
||||
body := ""
|
||||
if val := hmres[i*3]; val != nil {
|
||||
body = val.(string)
|
||||
}
|
||||
|
||||
rc := 0
|
||||
if val := hmres[i*3+1]; val != nil {
|
||||
if s, ok := val.(string); ok {
|
||||
rc, _ = strconv.Atoi(s)
|
||||
}
|
||||
}
|
||||
|
||||
fr := time.Time{}
|
||||
if val := hmres[i*3+2]; val != nil {
|
||||
if s, ok := val.(string); ok {
|
||||
frMs, _ := strconv.ParseInt(s, 10, 64)
|
||||
fr = time.UnixMilli(frMs)
|
||||
}
|
||||
}
|
||||
|
||||
msgs[i] = rsmqMessage{
|
||||
ID: id,
|
||||
Body: body,
|
||||
Rc: rc,
|
||||
Fr: fr,
|
||||
SentAt: sent,
|
||||
VisibleAt: time.UnixMilli(int64(z.Score)),
|
||||
}
|
||||
}
|
||||
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
func (ld *rsmqLoadedDatabase) rsmqSendMessage(ctx context.Context, queue string, message string) error {
|
||||
stats, err := ld.rsmqQueueStats(ctx, queue)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(message) > stats.MaxSize {
|
||||
return fmt.Errorf("message too long")
|
||||
}
|
||||
|
||||
id := ld.rsmqMessageID()
|
||||
now := time.Now().UnixMilli()
|
||||
score := now + int64(stats.Delay*1000)
|
||||
|
||||
keyQ := ld.ns + queue + ":Q"
|
||||
keyZ := ld.ns + queue
|
||||
|
||||
pipe := ld.db.TxPipeline()
|
||||
pipe.ZAdd(ctx, keyZ, redis.Z{Score: float64(score), Member: id})
|
||||
pipe.HMSet(ctx, keyQ, map[string]interface{}{
|
||||
id: message,
|
||||
id + ":rc": 0,
|
||||
id + ":fr": 0,
|
||||
id + ":sent": now,
|
||||
})
|
||||
pipe.HIncrBy(ctx, keyQ, "totalsent", 1)
|
||||
|
||||
_, err = pipe.Exec(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
|
||||
|
||||
func (ld *rsmqLoadedDatabase) rsmqMessageID() string {
|
||||
// RSMQ uses microsecond precision for the ID timestamp part
|
||||
// Logic: Number(seconds + microseconds).toString(36)
|
||||
// Go's UnixMicro returns the number of microseconds elapsed since January 1, 1970 UTC.
|
||||
// This is equivalent to seconds*1e6 + microseconds, which matches the JS logic
|
||||
// assuming the JS logic intends to create a full microsecond timestamp.
|
||||
ts := strconv.FormatInt(time.Now().UnixMicro(), 36)
|
||||
if len(ts) < 10 {
|
||||
ts = strings.Repeat("0", 10-len(ts)) + ts
|
||||
}
|
||||
|
||||
b := make([]byte, 22)
|
||||
for i := range b {
|
||||
b[i] = charset[rand.Intn(len(charset))]
|
||||
}
|
||||
return ts + string(b)
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
type rsmqQueueStats struct {
|
||||
Name string
|
||||
Vt int
|
||||
Delay int
|
||||
MaxSize int
|
||||
TotalRecv uint64
|
||||
TotalSent uint64
|
||||
Created time.Time
|
||||
Modified time.Time
|
||||
Msgs int64
|
||||
HiddenMsgs int64
|
||||
}
|
||||
|
||||
func (ld *rsmqLoadedDatabase) rsmqQueueStats(ctx context.Context, queue string) (*rsmqQueueStats, error) {
|
||||
key := ld.ns + queue
|
||||
|
||||
// Get Attributes from Hash
|
||||
// Fields: vt, delay, maxsize, totalrecv, totalsent, created, modified
|
||||
res, err := ld.db.HMGet(ctx, key+":Q", "vt", "delay", "maxsize", "totalrecv", "totalsent", "created", "modified").Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If all nil, queue might not exist
|
||||
if len(res) == 0 || res[0] == nil {
|
||||
return nil, fmt.Errorf("queue doesn't exist")
|
||||
}
|
||||
|
||||
toInt := func(v interface{}) int {
|
||||
if s, ok := v.(string); ok {
|
||||
i, _ := strconv.Atoi(s)
|
||||
return i
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
toUint64 := func(v interface{}) uint64 {
|
||||
if s, ok := v.(string); ok {
|
||||
i, _ := strconv.ParseUint(s, 10, 64)
|
||||
return i
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
toInt64 := func(v interface{}) int64 {
|
||||
if s, ok := v.(string); ok {
|
||||
i, _ := strconv.ParseInt(s, 10, 64)
|
||||
return i
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
stats := &rsmqQueueStats{
|
||||
Name: queue,
|
||||
Vt: toInt(res[0]),
|
||||
Delay: toInt(res[1]),
|
||||
MaxSize: toInt(res[2]),
|
||||
TotalRecv: toUint64(res[3]),
|
||||
TotalSent: toUint64(res[4]),
|
||||
Created: time.Unix(toInt64(res[5]), 0),
|
||||
Modified: time.Unix(toInt64(res[6]), 0),
|
||||
}
|
||||
|
||||
// Get Msgs Count (ZCard)
|
||||
stats.Msgs, _ = ld.db.ZCard(ctx, key).Result()
|
||||
|
||||
// Get Hidden Msgs Count (ZCount where score > now)
|
||||
nowMs := time.Now().UnixNano() / 1e6
|
||||
stats.HiddenMsgs, _ = ld.db.ZCount(ctx, key, strconv.FormatInt(nowMs, 10), "+inf").Result()
|
||||
return stats, nil
|
||||
}
|
||||
@@ -24,7 +24,11 @@ func (bc *binColumn) SetRowCount(newlen int) {
|
||||
func (bc *binColumn) SetCell(aRow int, data any) {
|
||||
str, ok := data.([]byte)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("bad data type (got %T)", data))
|
||||
if s2, ok := data.(string); ok {
|
||||
str = []byte(s2)
|
||||
} else {
|
||||
panic(fmt.Sprintf("bad data type (got %T)", data))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO Use slice_dup() here because many iterator loops reuse the backing []byte
|
||||
|
||||
Reference in New Issue
Block a user