package sqliteclidriver import ( "io" "os/exec" "sync" ) const ( evtypeStdout int = iota evtypeStderr evtypeExit ) type processEvent struct { evtype int data []byte err error } func (pe processEvent) Error() string { if pe.err != nil { return pe.err.Error() } if pe.evtype == evtypeStderr { return string(pe.data) } return "" } func (pe processEvent) Unwrap() error { return pe.err } // func ExecEvents(cmd *exec.Cmd) (<-chan processEvent, io.WriteCloser, error) { pw, err := cmd.StdinPipe() if err != nil { return nil, nil, err } pr, err := cmd.StdoutPipe() if err != nil { return nil, nil, err } pe, err := cmd.StderrPipe() if err != nil { return nil, nil, err } err = cmd.Start() if err != nil { return nil, nil, err } chEvents := make(chan processEvent, 0) var wg sync.WaitGroup go func() { defer wg.Done() processEventWorker(pr, evtypeStdout, chEvents) }() go func() { defer wg.Done() processEventWorker(pe, evtypeStderr, chEvents) }() wg.Add(2) go func() { // Only call cmd.Wait() after pipes are closed wg.Wait() err = cmd.Wait() chEvents <- processEvent{ evtype: evtypeExit, err: err, } close(chEvents) }() return chEvents, pw, nil } func processEventWorker(p io.Reader, evtype int, dest chan<- processEvent) { for { buf := make([]byte, 1024) n, err := p.Read(buf) if n > 0 { dest <- processEvent{ evtype: evtype, data: buf[0:n], } } if err != nil { dest <- processEvent{ evtype: evtype, err: err, } // Assume all errors are permanent // Ordering can produce either io.EOF, ErrClosedPipe, or PathError{"file already closed"} return } } }