dawn.node: Make run-cts --print-stdout print the run's stdout/err

Add a new muxWriter and use this for stdout writing to ensure
that stdout printing does not tear.

Bug: dawn:1123

Change-Id: Iec1ba16e5524c11bfc00ba38b3e4de3c06627fb1
Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/85366
Reviewed-by: Corentin Wallez <cwallez@chromium.org>
Kokoro: Kokoro <noreply+kokoro@google.com>
Commit-Queue: Ben Clayton <bclayton@google.com>
This commit is contained in:
Corentin Wallez 2022-04-12 12:17:23 +00:00 committed by Dawn LUCI CQ
parent 8c5cb62438
commit 48585f3c15
1 changed files with 200 additions and 95 deletions

View File

@ -30,7 +30,6 @@ import (
"os/exec"
"os/signal"
"path/filepath"
"regexp"
"runtime"
"sort"
"strconv"
@ -66,7 +65,6 @@ Usage:
var (
colors bool
stdout io.Writer
mainCtx context.Context
)
@ -130,7 +128,7 @@ func run() error {
}
var dawnNode, cts, node, npx, resultsPath, expectationsPath, logFilename, backend string
var verbose, isolated, build bool
var printStdout, verbose, isolated, build bool
var numRunners int
var flags dawnNodeFlags
flag.StringVar(&dawnNode, "dawn-node", "", "path to dawn.node module")
@ -139,6 +137,7 @@ func run() error {
flag.StringVar(&npx, "npx", "", "path to npx executable")
flag.StringVar(&resultsPath, "output", "", "path to write test results file")
flag.StringVar(&expectationsPath, "expect", "", "path to expectations file")
flag.BoolVar(&printStdout, "print-stdout", false, "print the stdout and stderr from each test runner server")
flag.BoolVar(&verbose, "verbose", false, "print extra information while testing")
flag.BoolVar(&build, "build", true, "attempt to build the CTS before running")
flag.BoolVar(&isolated, "isolate", false, "run each test in an isolated process")
@ -150,11 +149,14 @@ func run() error {
" set to 'vulkan' if VK_ICD_FILENAMES environment variable is set, 'default' otherwise")
flag.Parse()
// Create a thread-safe, color supporting stdout wrapper.
var stdout io.WriteCloser
if colors {
stdout = colorable.NewColorableStdout()
stdout = newMuxWriter(colorable.NewColorableStdout())
} else {
stdout = colorable.NewNonColorable(os.Stdout)
stdout = newMuxWriter(colorable.NewNonColorable(os.Stdout))
}
defer stdout.Close() // Required to flush the mux chan
// Check mandatory arguments
if dawnNode == "" || cts == "" {
@ -201,7 +203,7 @@ func run() error {
// Forward the backend to use, if specified.
if backend != "default" {
fmt.Println("Forcing backend to", backend)
fmt.Fprintln(stdout, "Forcing backend to", backend)
flags = append(flags, fmt.Sprint("dawn-backend=", backend))
}
@ -218,17 +220,19 @@ func run() error {
}
r := runner{
numRunners: numRunners,
verbose: verbose,
node: node,
npx: npx,
dawnNode: dawnNode,
cts: cts,
flags: flags,
results: testcaseStatuses{},
numRunners: numRunners,
printStdout: printStdout,
verbose: verbose,
node: node,
npx: npx,
dawnNode: dawnNode,
cts: cts,
flags: flags,
results: testcaseStatuses{},
evalScript: func(main string) string {
return fmt.Sprintf(`require('./src/common/tools/setup-ts-in-node.js');require('./src/common/runtime/%v.ts');`, main)
},
stdout: stdout,
}
if logFilename != "" {
@ -243,7 +247,7 @@ func run() error {
cache := cache{}
cachePath := dawnNode + ".runcts.cache"
if err := cache.load(cachePath); err != nil && verbose {
fmt.Println("failed to load cache from", cachePath, err)
fmt.Fprintln(stdout, "failed to load cache from", cachePath, err)
}
defer cache.save(cachePath)
@ -257,7 +261,7 @@ func run() error {
!isDir(filepath.Join(r.cts, "out-node"))
if build {
if verbose {
fmt.Println("CTS needs rebuild:", ctsNeedsRebuild)
fmt.Fprintln(stdout, "CTS needs rebuild:", ctsNeedsRebuild)
}
if npx != "" {
@ -272,7 +276,7 @@ func run() error {
return fmt.Sprintf(`require('./out-node/common/runtime/%v.js');`, main)
}
} else {
fmt.Println("npx not found on PATH. Using runtime TypeScript transpilation (slow)")
fmt.Fprintln(stdout, "npx not found on PATH. Using runtime TypeScript transpilation (slow)")
}
}
@ -292,20 +296,20 @@ func run() error {
}
if isolated {
fmt.Println("Running in parallel isolated...")
fmt.Printf("Testing %d test cases...\n", len(r.testcases))
fmt.Fprintln(stdout, "Running in parallel isolated...")
fmt.Fprintf(stdout, "Testing %d test cases...\n", len(r.testcases))
if err := r.runParallelIsolated(); err != nil {
return err
}
} else {
fmt.Println("Running in parallel with server...")
fmt.Printf("Testing %d test cases...\n", len(r.testcases))
fmt.Fprintln(stdout, "Running in parallel with server...")
fmt.Fprintf(stdout, "Testing %d test cases...\n", len(r.testcases))
if err := r.runParallelWithServer(); err != nil {
return err
}
}
} else {
fmt.Println("Running serially...")
fmt.Fprintln(stdout, "Running serially...")
if err := r.runSerially(query); err != nil {
return err
}
@ -375,6 +379,7 @@ func (c *cache) save(path string) error {
type runner struct {
numRunners int
printStdout bool
verbose bool
node, npx, dawnNode, cts string
flags dawnNodeFlags
@ -383,6 +388,7 @@ type runner struct {
expectations testcaseStatuses
results testcaseStatuses
log logger
stdout io.WriteCloser
}
// scanSourceTimestamps scans all the .js and .ts files in all subdirectories of
@ -390,9 +396,9 @@ type runner struct {
func (r *runner) scanSourceTimestamps(verbose bool) (time.Time, error) {
if verbose {
start := time.Now()
fmt.Println("Scanning .js / .ts files for changes...")
fmt.Fprintln(r.stdout, "Scanning .js / .ts files for changes...")
defer func() {
fmt.Println("completed in", time.Since(start))
fmt.Fprintln(r.stdout, "completed in", time.Since(start))
}()
}
@ -420,9 +426,9 @@ func (r *runner) scanSourceTimestamps(verbose bool) (time.Time, error) {
func (r *runner) buildCTS(verbose bool) error {
if verbose {
start := time.Now()
fmt.Println("Building CTS...")
fmt.Fprintln(r.stdout, "Building CTS...")
defer func() {
fmt.Println("completed in", time.Since(start))
fmt.Fprintln(r.stdout, "completed in", time.Since(start))
}()
}
@ -440,9 +446,9 @@ func (r *runner) buildCTS(verbose bool) error {
func (r *runner) gatherTestCases(query string, verbose bool) error {
if verbose {
start := time.Now()
fmt.Println("Gathering test cases...")
fmt.Fprintln(r.stdout, "Gathering test cases...")
defer func() {
fmt.Println("completed in", time.Since(start))
fmt.Fprintln(r.stdout, "completed in", time.Since(start))
}()
}
@ -468,30 +474,80 @@ func (r *runner) gatherTestCases(query string, verbose bool) error {
return nil
}
// portListener implements io.Writer, monitoring written messages until a port
// is printed between '[[' ']]'. Once the port has been found, the parsed
// port number is written to the 'port' chan, and all subsequent writes are
// forwarded to writer.
type portListener struct {
writer io.Writer
buffer strings.Builder
port chan int
}
func newPortListener() portListener {
return portListener{strings.Builder{}, make(chan int)}
func newPortListener(w io.Writer) portListener {
return portListener{w, strings.Builder{}, make(chan int)}
}
var portRE = regexp.MustCompile(`\[\[(\d+)\]\]`)
func (p *portListener) Write(data []byte) (n int, err error) {
if p.port != nil {
p.buffer.Write(data)
match := portRE.FindStringSubmatch(p.buffer.String())
if len(match) == 2 {
port, err := strconv.Atoi(match[1])
if err != nil {
return 0, err
}
p.port <- port
close(p.port)
p.port = nil
str := p.buffer.String()
idx := strings.Index(str, "[[")
if idx < 0 {
// Still waiting for the opening '[['
return len(data), nil
}
str = str[idx+2:] // skip past '[['
idx = strings.Index(str, "]]")
if idx < 0 {
// Still waiting for the closing ']]'
return len(data), nil
}
port, err := strconv.Atoi(str[:idx])
if err != nil {
return 0, err
}
// Port found. Write it to the chan, and close the chan.
p.port <- port
close(p.port)
p.port = nil
str = strings.TrimRight(str[idx+2:], " \n")
if len(str) == 0 {
return len(data), nil
}
// Write out trailing text after the ']]'
return p.writer.Write([]byte(str))
}
// Port has been found. Just forward the rest of the data to p.writer
return p.writer.Write(data)
}
// prefixWriter is an io.Writer that prefixes each write with a prefix string
type prefixWriter struct {
prefix string
writer io.Writer
midLine bool
}
func (p *prefixWriter) Write(data []byte) (int, error) {
lines := strings.Split(string(data), "\n")
buf := strings.Builder{}
for i, line := range lines {
if line == "" && i == len(lines)-1 {
break
}
buf.WriteString(p.prefix)
buf.WriteString(line)
buf.WriteString("\n")
}
if _, err := p.writer.Write([]byte(buf.String())); err != nil {
return 0, err
}
return len(data), nil
}
@ -514,10 +570,11 @@ func (r *runner) runParallelWithServer() error {
// Spin up the test runner goroutines
wg := &sync.WaitGroup{}
for i := 0; i < r.numRunners; i++ {
id := i
wg.Add(1)
go func() {
defer wg.Done()
if err := r.runServer(caseIndices, results); err != nil {
if err := r.runServer(id, caseIndices, results); err != nil {
results <- result{
status: fail,
error: fmt.Errorf("Test server error: %w", err),
@ -530,18 +587,14 @@ func (r *runner) runParallelWithServer() error {
return nil
}
type redirectingWriter struct {
io.Writer
}
// runServer starts a test runner server instance, takes case indices from
// caseIndices, and requests the server run the test with the given index.
// The result of the test run is written to the results chan.
// Once the caseIndices chan has been closed, the server is stopped and
// runServer returns.
func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error {
func (r *runner) runServer(id int, caseIndices <-chan int, results chan<- result) error {
var port int
var rw redirectingWriter
testCaseLog := &bytes.Buffer{}
stopServer := func() {}
startServer := func() error {
@ -562,13 +615,20 @@ func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error
ctx := mainCtx
cmd := exec.CommandContext(ctx, r.node, args...)
serverLog := &bytes.Buffer{}
writer := io.Writer(testCaseLog)
if r.printStdout {
pw := &prefixWriter{
prefix: fmt.Sprintf("[%d] ", id),
writer: r.stdout,
}
writer = io.MultiWriter(pw, writer)
}
pl := newPortListener()
pl := newPortListener(writer)
cmd.Dir = r.cts
cmd.Stdout = io.MultiWriter(&rw, serverLog, &pl)
cmd.Stderr = io.MultiWriter(&rw, serverLog)
cmd.Stdout = &pl
cmd.Stderr = &pl
err := cmd.Start()
if err != nil {
@ -577,9 +637,10 @@ func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error
select {
case port = <-pl.port:
return nil // success
case <-time.After(time.Second * 10):
return fmt.Errorf("timeout waiting for server port:\n%v", serverLog.String())
case <-ctx.Done():
return fmt.Errorf("timeout waiting for server port:\n%v", pl.buffer.String())
case <-ctx.Done(): // cancelled
return ctx.Err()
}
@ -594,9 +655,7 @@ func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error
}
for idx := range caseIndices {
// Redirect the server log per test case
caseServerLog := &bytes.Buffer{}
rw.Writer = caseServerLog
testCaseLog.Reset() // Clear the log for this test case
if port == 0 {
if err := startServer(); err != nil {
@ -631,16 +690,16 @@ func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error
switch resp.Status {
case "pass":
res.status = pass
res.message = resp.Message + caseServerLog.String()
res.message = resp.Message + testCaseLog.String()
case "warn":
res.status = warn
res.message = resp.Message + caseServerLog.String()
res.message = resp.Message + testCaseLog.String()
case "fail":
res.status = fail
res.message = resp.Message + caseServerLog.String()
res.message = resp.Message + testCaseLog.String()
case "skip":
res.status = skip
res.message = resp.Message + caseServerLog.String()
res.message = resp.Message + testCaseLog.String()
default:
res.status = fail
res.error = fmt.Errorf("unknown status: '%v'", resp.Status)
@ -717,7 +776,7 @@ func (r *runner) streamResults(wg *sync.WaitGroup, results chan result) {
// Helper function for printing a progress bar.
lastStatusUpdate, animFrame := time.Now(), 0
updateProgress := func() {
printANSIProgressBar(animFrame, numTests, numByExpectedStatus)
fmt.Fprint(r.stdout, ansiProgressBar(animFrame, numTests, numByExpectedStatus))
animFrame++
lastStatusUpdate = time.Now()
}
@ -744,24 +803,32 @@ func (r *runner) streamResults(wg *sync.WaitGroup, results chan result) {
if r.verbose ||
res.error != nil ||
(exStatus.status != pass && exStatus.status != skip && !exStatus.expected) {
fmt.Printf("%v - %v: %v", name, res.status, res.message)
buf := &bytes.Buffer{}
fmt.Fprint(buf, statusColor[res.status])
if res.message != "" {
fmt.Fprintf(buf, "%v - %v:\n%v", name, res.status, res.message)
} else {
fmt.Fprintf(buf, "%v - %v", name, res.status)
}
if expected != "" {
fmt.Printf(" [%v -> %v]", expected, res.status)
fmt.Fprintf(buf, " [%v -> %v]", expected, res.status)
}
fmt.Println()
fmt.Fprintln(buf)
if res.error != nil {
fmt.Println(res.error)
fmt.Fprintln(buf, res.error)
}
fmt.Fprint(buf, ansiReset)
fmt.Fprint(r.stdout, buf.String())
updateProgress()
}
if time.Since(lastStatusUpdate) > progressUpdateRate {
updateProgress()
}
}
printANSIProgressBar(animFrame, numTests, numByExpectedStatus)
fmt.Fprint(r.stdout, ansiProgressBar(animFrame, numTests, numByExpectedStatus))
// All done. Print final stats.
fmt.Printf("\nCompleted in %v\n", timeTaken)
fmt.Fprintf(r.stdout, "\nCompleted in %v\n", timeTaken)
var numExpectedByStatus map[status]int
if r.expectations != nil {
@ -788,22 +855,22 @@ func (r *runner) streamResults(wg *sync.WaitGroup, results chan result) {
continue
}
fmt.Print(bold, statusColor[s])
fmt.Print(alignRight(strings.ToUpper(string(s))+": ", 10))
fmt.Print(ansiReset)
fmt.Fprint(r.stdout, bold, statusColor[s])
fmt.Fprint(r.stdout, alignRight(strings.ToUpper(string(s))+": ", 10))
fmt.Fprint(r.stdout, ansiReset)
if numByStatus > 0 {
fmt.Print(bold)
fmt.Fprint(r.stdout, bold)
}
fmt.Print(alignLeft(numByStatus, 10))
fmt.Print(ansiReset)
fmt.Print(alignRight("("+percentage(numByStatus, numTests)+")", 6))
fmt.Fprint(r.stdout, alignLeft(numByStatus, 10))
fmt.Fprint(r.stdout, ansiReset)
fmt.Fprint(r.stdout, alignRight("("+percentage(numByStatus, numTests)+")", 6))
if diffFromExpected != 0 {
fmt.Print(bold, " [")
fmt.Printf("%+d", diffFromExpected)
fmt.Print(ansiReset, "]")
fmt.Fprint(r.stdout, bold, " [")
fmt.Fprintf(r.stdout, "%+d", diffFromExpected)
fmt.Fprint(r.stdout, ansiReset, "]")
}
fmt.Println()
fmt.Fprintln(r.stdout)
}
}
@ -817,10 +884,10 @@ func (r *runner) runSerially(query string) error {
timeTaken := time.Since(start)
if r.verbose {
fmt.Println(result)
fmt.Fprintln(r.stdout, result)
}
fmt.Println("Status:", result.status)
fmt.Println("Completed in", timeTaken)
fmt.Fprintln(r.stdout, "Status:", result.status)
fmt.Fprintln(r.stdout, "Completed in", timeTaken)
return nil
}
@ -968,10 +1035,10 @@ func alignRight(val interface{}, width int) string {
return strings.Repeat(" ", padding) + s
}
// printANSIProgressBar prints a colored progress bar, providing realtime
// information about the status of the CTS run.
// ansiProgressBar returns a string with an ANSI-colored progress bar, providing
// realtime information about the status of the CTS run.
// Note: We'll want to skip this if !isatty or if we're running on windows.
func printANSIProgressBar(animFrame int, numTests int, numByExpectedStatus map[expectedStatus]int) {
func ansiProgressBar(animFrame int, numTests int, numByExpectedStatus map[expectedStatus]int) string {
const barWidth = 50
animSymbols := []rune{'⣾', '⣽', '⣻', '⢿', '⡿', '⣟', '⣯', '⣷'}
@ -979,7 +1046,8 @@ func printANSIProgressBar(animFrame int, numTests int, numByExpectedStatus map[e
numBlocksPrinted := 0
fmt.Fprint(stdout, string(animSymbols[animFrame%len(animSymbols)]), " [")
buf := &strings.Builder{}
fmt.Fprint(buf, string(animSymbols[animFrame%len(animSymbols)]), " [")
animFrame++
numFinished := 0
@ -995,20 +1063,20 @@ func printANSIProgressBar(animFrame int, numTests int, numByExpectedStatus map[e
numFinished += num
statusFrac := float64(num) / float64(numTests)
fNumBlocks := barWidth * statusFrac
fmt.Fprint(stdout, color)
fmt.Fprint(buf, color)
numBlocks := int(math.Ceil(fNumBlocks))
if expected {
if numBlocks > 1 {
fmt.Print(strings.Repeat(string("░"), numBlocks))
fmt.Fprint(buf, strings.Repeat(string("░"), numBlocks))
}
} else {
if numBlocks > 1 {
fmt.Print(strings.Repeat(string("▉"), numBlocks))
fmt.Fprint(buf, strings.Repeat(string("▉"), numBlocks))
}
if numBlocks > 0 {
frac := fNumBlocks - math.Floor(fNumBlocks)
symbol := blockSymbols[int(math.Round(frac*float64(len(blockSymbols)-1)))]
fmt.Print(string(symbol))
fmt.Fprint(buf, string(symbol))
}
}
numBlocksPrinted += numBlocks
@ -1016,18 +1084,20 @@ func printANSIProgressBar(animFrame int, numTests int, numByExpectedStatus map[e
}
if barWidth > numBlocksPrinted {
fmt.Print(strings.Repeat(string(" "), barWidth-numBlocksPrinted))
fmt.Fprint(buf, strings.Repeat(string(" "), barWidth-numBlocksPrinted))
}
fmt.Fprint(stdout, ansiReset)
fmt.Print("] ", percentage(numFinished, numTests))
fmt.Fprint(buf, ansiReset)
fmt.Fprint(buf, "] ", percentage(numFinished, numTests))
if colors {
// move cursor to start of line so the bar is overridden
fmt.Fprint(stdout, positionLeft)
fmt.Fprint(buf, positionLeft)
} else {
// cannot move cursor, so newline
fmt.Println()
fmt.Fprintln(buf)
}
return buf.String()
}
// testcaseStatus is a pair of testcase name and result status
@ -1164,3 +1234,38 @@ func thisDir() string {
}
return filepath.Dir(file)
}
type muxWriter struct {
data chan []byte
err chan error
}
// muxWriter returns a thread-safe io.WriteCloser, that writes to w
func newMuxWriter(w io.Writer) *muxWriter {
m := muxWriter{
data: make(chan []byte, 256),
err: make(chan error, 1),
}
go func() {
defer close(m.err)
for data := range m.data {
_, err := w.Write(data)
if err != nil {
m.err <- err
return
}
}
m.err <- nil
}()
return &m
}
func (w *muxWriter) Write(data []byte) (n int, err error) {
w.data <- append([]byte{}, data...)
return len(data), nil
}
func (w *muxWriter) Close() error {
close(w.data)
return <-w.err
}