eventcmd: initial commit of channel-based process wrapper

This commit is contained in:
mappu 2024-06-30 12:33:01 +12:00
parent b141aaaa6c
commit be91cd54c6
2 changed files with 168 additions and 0 deletions

View File

@ -0,0 +1,99 @@
package sqliteclidriver
import (
"io"
"os/exec"
"sync"
)
const (
evtypeStdout int = iota
evtypeStderr
evtypeExit
)
type processEvent struct {
evtype int
data []byte
err error
}
//
func ExecEvents(cmd *exec.Cmd) (<-chan processEvent, io.WriteCloser, error) {
pw, err := cmd.StdinPipe()
if err != nil {
return nil, nil, err
}
pr, err := cmd.StdoutPipe()
if err != nil {
return nil, nil, err
}
pe, err := cmd.StderrPipe()
if err != nil {
return nil, nil, err
}
err = cmd.Start()
if err != nil {
return nil, nil, err
}
chEvents := make(chan processEvent, 0)
var wg sync.WaitGroup
go func() {
defer wg.Done()
processEventWorker(pr, evtypeStdout, chEvents)
}()
go func() {
defer wg.Done()
processEventWorker(pe, evtypeStderr, chEvents)
}()
wg.Add(2)
go func() {
// Only call cmd.Wait() after pipes are closed
wg.Wait()
err = cmd.Wait()
chEvents <- processEvent{
evtype: evtypeExit,
err: err,
}
close(chEvents)
}()
return chEvents, pw, nil
}
func processEventWorker(p io.Reader, evtype int, dest chan<- processEvent) {
for {
buf := make([]byte, 1024)
n, err := p.Read(buf)
if n > 0 {
dest <- processEvent{
evtype: evtype,
data: buf[0:n],
}
}
if err != nil {
dest <- processEvent{
evtype: evtype,
err: err,
}
// Assume all errors are permanent
// Ordering can produce either io.EOF, ErrClosedPipe, or PathError{"file already closed"}
return
}
}
}

View File

@ -0,0 +1,69 @@
package sqliteclidriver
import (
"errors"
"io"
"os/exec"
"sync"
"testing"
"github.com/stretchr/testify/require"
)
func TestEventCmd(t *testing.T) {
cmd := exec.Command("/bin/bash", "-c", `echo "hello world"`)
ch, _, err := ExecEvents(cmd)
if err != nil {
t.Fatal(err)
}
var consume []processEvent
for ev := range ch {
consume = append(consume, ev)
}
expect := []processEvent{
processEvent{evtype: evtypeStdout, data: []byte("hello world\n")},
processEvent{evtype: evtypeStdout, err: io.EOF},
processEvent{evtype: evtypeStderr, err: io.EOF},
processEvent{evtype: evtypeExit, err: nil},
}
require.EqualValues(t, expect, consume)
}
func TestEventCmdStdin(t *testing.T) {
cmd := exec.Command("/usr/bin/tr", "a-z", "A-Z")
ch, pw, err := ExecEvents(cmd)
if err != nil {
t.Fatal(err)
}
wg := sync.WaitGroup{}
wg.Add(1)
var consume []processEvent
go func() {
defer wg.Done()
for ev := range ch {
if ev.err != nil && errors.Is(ev.err, io.EOF) {
continue // skip flakey ordering of two EOF statements
}
consume = append(consume, ev)
}
}()
pw.Write([]byte("hello world"))
pw.Close()
wg.Wait()
expect := []processEvent{
processEvent{evtype: evtypeStdout, data: []byte("HELLO WORLD")},
processEvent{evtype: evtypeExit, err: nil},
}
require.EqualValues(t, expect, consume)
}