From be91cd54c6de353e7a434b451c0dad0838371585 Mon Sep 17 00:00:00 2001 From: mappu Date: Sun, 30 Jun 2024 12:33:01 +1200 Subject: [PATCH] eventcmd: initial commit of channel-based process wrapper --- sqliteclidriver/eventcmd.go | 99 ++++++++++++++++++++++++++++++++ sqliteclidriver/eventcmd_test.go | 69 ++++++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 sqliteclidriver/eventcmd.go create mode 100644 sqliteclidriver/eventcmd_test.go diff --git a/sqliteclidriver/eventcmd.go b/sqliteclidriver/eventcmd.go new file mode 100644 index 0000000..43d8bfc --- /dev/null +++ b/sqliteclidriver/eventcmd.go @@ -0,0 +1,99 @@ +package sqliteclidriver + +import ( + "io" + "os/exec" + "sync" +) + +const ( + evtypeStdout int = iota + evtypeStderr + evtypeExit +) + +type processEvent struct { + evtype int + data []byte + err error +} + +// + +func ExecEvents(cmd *exec.Cmd) (<-chan processEvent, io.WriteCloser, error) { + + pw, err := cmd.StdinPipe() + if err != nil { + return nil, nil, err + } + + pr, err := cmd.StdoutPipe() + if err != nil { + return nil, nil, err + } + + pe, err := cmd.StderrPipe() + if err != nil { + return nil, nil, err + } + + err = cmd.Start() + if err != nil { + return nil, nil, err + } + + chEvents := make(chan processEvent, 0) + + var wg sync.WaitGroup + + go func() { + defer wg.Done() + processEventWorker(pr, evtypeStdout, chEvents) + }() + + go func() { + defer wg.Done() + processEventWorker(pe, evtypeStderr, chEvents) + }() + wg.Add(2) + + go func() { + // Only call cmd.Wait() after pipes are closed + wg.Wait() + + err = cmd.Wait() + chEvents <- processEvent{ + evtype: evtypeExit, + err: err, + } + + close(chEvents) + }() + + return chEvents, pw, nil +} + +func processEventWorker(p io.Reader, evtype int, dest chan<- processEvent) { + for { + buf := make([]byte, 1024) + n, err := p.Read(buf) + + if n > 0 { + dest <- processEvent{ + evtype: evtype, + data: buf[0:n], + } + } + + if err != nil { + dest <- processEvent{ + evtype: evtype, + err: err, + } + + // Assume all errors are permanent + // Ordering can produce either io.EOF, ErrClosedPipe, or PathError{"file already closed"} + return + } + } +} diff --git a/sqliteclidriver/eventcmd_test.go b/sqliteclidriver/eventcmd_test.go new file mode 100644 index 0000000..830c057 --- /dev/null +++ b/sqliteclidriver/eventcmd_test.go @@ -0,0 +1,69 @@ +package sqliteclidriver + +import ( + "errors" + "io" + "os/exec" + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEventCmd(t *testing.T) { + cmd := exec.Command("/bin/bash", "-c", `echo "hello world"`) + + ch, _, err := ExecEvents(cmd) + if err != nil { + t.Fatal(err) + } + + var consume []processEvent + for ev := range ch { + consume = append(consume, ev) + } + + expect := []processEvent{ + processEvent{evtype: evtypeStdout, data: []byte("hello world\n")}, + processEvent{evtype: evtypeStdout, err: io.EOF}, + processEvent{evtype: evtypeStderr, err: io.EOF}, + processEvent{evtype: evtypeExit, err: nil}, + } + + require.EqualValues(t, expect, consume) +} + +func TestEventCmdStdin(t *testing.T) { + cmd := exec.Command("/usr/bin/tr", "a-z", "A-Z") + + ch, pw, err := ExecEvents(cmd) + if err != nil { + t.Fatal(err) + } + + wg := sync.WaitGroup{} + wg.Add(1) + var consume []processEvent + go func() { + defer wg.Done() + + for ev := range ch { + if ev.err != nil && errors.Is(ev.err, io.EOF) { + continue // skip flakey ordering of two EOF statements + } + consume = append(consume, ev) + } + }() + + pw.Write([]byte("hello world")) + pw.Close() + + wg.Wait() + + expect := []processEvent{ + processEvent{evtype: evtypeStdout, data: []byte("HELLO WORLD")}, + processEvent{evtype: evtypeExit, err: nil}, + } + + require.EqualValues(t, expect, consume) +}