From 48585f3c15506728281f502bd226c95b8bcaa74f Mon Sep 17 00:00:00 2001 From: Corentin Wallez Date: Tue, 12 Apr 2022 12:17:23 +0000 Subject: [PATCH] dawn.node: Make run-cts --print-stdout print the run's stdout/err Add a new muxWriter and use this for stdout writing to ensure that stdout printing does not tear. Bug: dawn:1123 Change-Id: Iec1ba16e5524c11bfc00ba38b3e4de3c06627fb1 Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/85366 Reviewed-by: Corentin Wallez Kokoro: Kokoro Commit-Queue: Ben Clayton --- src/dawn/node/tools/src/cmd/run-cts/main.go | 295 +++++++++++++------- 1 file changed, 200 insertions(+), 95 deletions(-) diff --git a/src/dawn/node/tools/src/cmd/run-cts/main.go b/src/dawn/node/tools/src/cmd/run-cts/main.go index d20294b98f..28eb60dd1d 100644 --- a/src/dawn/node/tools/src/cmd/run-cts/main.go +++ b/src/dawn/node/tools/src/cmd/run-cts/main.go @@ -30,7 +30,6 @@ import ( "os/exec" "os/signal" "path/filepath" - "regexp" "runtime" "sort" "strconv" @@ -66,7 +65,6 @@ Usage: var ( colors bool - stdout io.Writer mainCtx context.Context ) @@ -130,7 +128,7 @@ func run() error { } var dawnNode, cts, node, npx, resultsPath, expectationsPath, logFilename, backend string - var verbose, isolated, build bool + var printStdout, verbose, isolated, build bool var numRunners int var flags dawnNodeFlags flag.StringVar(&dawnNode, "dawn-node", "", "path to dawn.node module") @@ -139,6 +137,7 @@ func run() error { flag.StringVar(&npx, "npx", "", "path to npx executable") flag.StringVar(&resultsPath, "output", "", "path to write test results file") flag.StringVar(&expectationsPath, "expect", "", "path to expectations file") + flag.BoolVar(&printStdout, "print-stdout", false, "print the stdout and stderr from each test runner server") flag.BoolVar(&verbose, "verbose", false, "print extra information while testing") flag.BoolVar(&build, "build", true, "attempt to build the CTS before running") flag.BoolVar(&isolated, "isolate", false, "run each test in an isolated process") @@ -150,11 +149,14 @@ func run() error { " set to 'vulkan' if VK_ICD_FILENAMES environment variable is set, 'default' otherwise") flag.Parse() + // Create a thread-safe, color supporting stdout wrapper. + var stdout io.WriteCloser if colors { - stdout = colorable.NewColorableStdout() + stdout = newMuxWriter(colorable.NewColorableStdout()) } else { - stdout = colorable.NewNonColorable(os.Stdout) + stdout = newMuxWriter(colorable.NewNonColorable(os.Stdout)) } + defer stdout.Close() // Required to flush the mux chan // Check mandatory arguments if dawnNode == "" || cts == "" { @@ -201,7 +203,7 @@ func run() error { // Forward the backend to use, if specified. if backend != "default" { - fmt.Println("Forcing backend to", backend) + fmt.Fprintln(stdout, "Forcing backend to", backend) flags = append(flags, fmt.Sprint("dawn-backend=", backend)) } @@ -218,17 +220,19 @@ func run() error { } r := runner{ - numRunners: numRunners, - verbose: verbose, - node: node, - npx: npx, - dawnNode: dawnNode, - cts: cts, - flags: flags, - results: testcaseStatuses{}, + numRunners: numRunners, + printStdout: printStdout, + verbose: verbose, + node: node, + npx: npx, + dawnNode: dawnNode, + cts: cts, + flags: flags, + results: testcaseStatuses{}, evalScript: func(main string) string { return fmt.Sprintf(`require('./src/common/tools/setup-ts-in-node.js');require('./src/common/runtime/%v.ts');`, main) }, + stdout: stdout, } if logFilename != "" { @@ -243,7 +247,7 @@ func run() error { cache := cache{} cachePath := dawnNode + ".runcts.cache" if err := cache.load(cachePath); err != nil && verbose { - fmt.Println("failed to load cache from", cachePath, err) + fmt.Fprintln(stdout, "failed to load cache from", cachePath, err) } defer cache.save(cachePath) @@ -257,7 +261,7 @@ func run() error { !isDir(filepath.Join(r.cts, "out-node")) if build { if verbose { - fmt.Println("CTS needs rebuild:", ctsNeedsRebuild) + fmt.Fprintln(stdout, "CTS needs rebuild:", ctsNeedsRebuild) } if npx != "" { @@ -272,7 +276,7 @@ func run() error { return fmt.Sprintf(`require('./out-node/common/runtime/%v.js');`, main) } } else { - fmt.Println("npx not found on PATH. Using runtime TypeScript transpilation (slow)") + fmt.Fprintln(stdout, "npx not found on PATH. Using runtime TypeScript transpilation (slow)") } } @@ -292,20 +296,20 @@ func run() error { } if isolated { - fmt.Println("Running in parallel isolated...") - fmt.Printf("Testing %d test cases...\n", len(r.testcases)) + fmt.Fprintln(stdout, "Running in parallel isolated...") + fmt.Fprintf(stdout, "Testing %d test cases...\n", len(r.testcases)) if err := r.runParallelIsolated(); err != nil { return err } } else { - fmt.Println("Running in parallel with server...") - fmt.Printf("Testing %d test cases...\n", len(r.testcases)) + fmt.Fprintln(stdout, "Running in parallel with server...") + fmt.Fprintf(stdout, "Testing %d test cases...\n", len(r.testcases)) if err := r.runParallelWithServer(); err != nil { return err } } } else { - fmt.Println("Running serially...") + fmt.Fprintln(stdout, "Running serially...") if err := r.runSerially(query); err != nil { return err } @@ -375,6 +379,7 @@ func (c *cache) save(path string) error { type runner struct { numRunners int + printStdout bool verbose bool node, npx, dawnNode, cts string flags dawnNodeFlags @@ -383,6 +388,7 @@ type runner struct { expectations testcaseStatuses results testcaseStatuses log logger + stdout io.WriteCloser } // scanSourceTimestamps scans all the .js and .ts files in all subdirectories of @@ -390,9 +396,9 @@ type runner struct { func (r *runner) scanSourceTimestamps(verbose bool) (time.Time, error) { if verbose { start := time.Now() - fmt.Println("Scanning .js / .ts files for changes...") + fmt.Fprintln(r.stdout, "Scanning .js / .ts files for changes...") defer func() { - fmt.Println("completed in", time.Since(start)) + fmt.Fprintln(r.stdout, "completed in", time.Since(start)) }() } @@ -420,9 +426,9 @@ func (r *runner) scanSourceTimestamps(verbose bool) (time.Time, error) { func (r *runner) buildCTS(verbose bool) error { if verbose { start := time.Now() - fmt.Println("Building CTS...") + fmt.Fprintln(r.stdout, "Building CTS...") defer func() { - fmt.Println("completed in", time.Since(start)) + fmt.Fprintln(r.stdout, "completed in", time.Since(start)) }() } @@ -440,9 +446,9 @@ func (r *runner) buildCTS(verbose bool) error { func (r *runner) gatherTestCases(query string, verbose bool) error { if verbose { start := time.Now() - fmt.Println("Gathering test cases...") + fmt.Fprintln(r.stdout, "Gathering test cases...") defer func() { - fmt.Println("completed in", time.Since(start)) + fmt.Fprintln(r.stdout, "completed in", time.Since(start)) }() } @@ -468,30 +474,80 @@ func (r *runner) gatherTestCases(query string, verbose bool) error { return nil } +// portListener implements io.Writer, monitoring written messages until a port +// is printed between '[[' ']]'. Once the port has been found, the parsed +// port number is written to the 'port' chan, and all subsequent writes are +// forwarded to writer. type portListener struct { + writer io.Writer buffer strings.Builder port chan int } -func newPortListener() portListener { - return portListener{strings.Builder{}, make(chan int)} +func newPortListener(w io.Writer) portListener { + return portListener{w, strings.Builder{}, make(chan int)} } -var portRE = regexp.MustCompile(`\[\[(\d+)\]\]`) - func (p *portListener) Write(data []byte) (n int, err error) { if p.port != nil { p.buffer.Write(data) - match := portRE.FindStringSubmatch(p.buffer.String()) - if len(match) == 2 { - port, err := strconv.Atoi(match[1]) - if err != nil { - return 0, err - } - p.port <- port - close(p.port) - p.port = nil + str := p.buffer.String() + + idx := strings.Index(str, "[[") + if idx < 0 { + // Still waiting for the opening '[[' + return len(data), nil } + + str = str[idx+2:] // skip past '[[' + idx = strings.Index(str, "]]") + if idx < 0 { + // Still waiting for the closing ']]' + return len(data), nil + } + + port, err := strconv.Atoi(str[:idx]) + if err != nil { + return 0, err + } + + // Port found. Write it to the chan, and close the chan. + p.port <- port + close(p.port) + p.port = nil + + str = strings.TrimRight(str[idx+2:], " \n") + if len(str) == 0 { + return len(data), nil + } + // Write out trailing text after the ']]' + return p.writer.Write([]byte(str)) + } + + // Port has been found. Just forward the rest of the data to p.writer + return p.writer.Write(data) +} + +// prefixWriter is an io.Writer that prefixes each write with a prefix string +type prefixWriter struct { + prefix string + writer io.Writer + midLine bool +} + +func (p *prefixWriter) Write(data []byte) (int, error) { + lines := strings.Split(string(data), "\n") + buf := strings.Builder{} + for i, line := range lines { + if line == "" && i == len(lines)-1 { + break + } + buf.WriteString(p.prefix) + buf.WriteString(line) + buf.WriteString("\n") + } + if _, err := p.writer.Write([]byte(buf.String())); err != nil { + return 0, err } return len(data), nil } @@ -514,10 +570,11 @@ func (r *runner) runParallelWithServer() error { // Spin up the test runner goroutines wg := &sync.WaitGroup{} for i := 0; i < r.numRunners; i++ { + id := i wg.Add(1) go func() { defer wg.Done() - if err := r.runServer(caseIndices, results); err != nil { + if err := r.runServer(id, caseIndices, results); err != nil { results <- result{ status: fail, error: fmt.Errorf("Test server error: %w", err), @@ -530,18 +587,14 @@ func (r *runner) runParallelWithServer() error { return nil } -type redirectingWriter struct { - io.Writer -} - // runServer starts a test runner server instance, takes case indices from // caseIndices, and requests the server run the test with the given index. // The result of the test run is written to the results chan. // Once the caseIndices chan has been closed, the server is stopped and // runServer returns. -func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error { +func (r *runner) runServer(id int, caseIndices <-chan int, results chan<- result) error { var port int - var rw redirectingWriter + testCaseLog := &bytes.Buffer{} stopServer := func() {} startServer := func() error { @@ -562,13 +615,20 @@ func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error ctx := mainCtx cmd := exec.CommandContext(ctx, r.node, args...) - serverLog := &bytes.Buffer{} + writer := io.Writer(testCaseLog) + if r.printStdout { + pw := &prefixWriter{ + prefix: fmt.Sprintf("[%d] ", id), + writer: r.stdout, + } + writer = io.MultiWriter(pw, writer) + } - pl := newPortListener() + pl := newPortListener(writer) cmd.Dir = r.cts - cmd.Stdout = io.MultiWriter(&rw, serverLog, &pl) - cmd.Stderr = io.MultiWriter(&rw, serverLog) + cmd.Stdout = &pl + cmd.Stderr = &pl err := cmd.Start() if err != nil { @@ -577,9 +637,10 @@ func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error select { case port = <-pl.port: + return nil // success case <-time.After(time.Second * 10): - return fmt.Errorf("timeout waiting for server port:\n%v", serverLog.String()) - case <-ctx.Done(): + return fmt.Errorf("timeout waiting for server port:\n%v", pl.buffer.String()) + case <-ctx.Done(): // cancelled return ctx.Err() } @@ -594,9 +655,7 @@ func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error } for idx := range caseIndices { - // Redirect the server log per test case - caseServerLog := &bytes.Buffer{} - rw.Writer = caseServerLog + testCaseLog.Reset() // Clear the log for this test case if port == 0 { if err := startServer(); err != nil { @@ -631,16 +690,16 @@ func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error switch resp.Status { case "pass": res.status = pass - res.message = resp.Message + caseServerLog.String() + res.message = resp.Message + testCaseLog.String() case "warn": res.status = warn - res.message = resp.Message + caseServerLog.String() + res.message = resp.Message + testCaseLog.String() case "fail": res.status = fail - res.message = resp.Message + caseServerLog.String() + res.message = resp.Message + testCaseLog.String() case "skip": res.status = skip - res.message = resp.Message + caseServerLog.String() + res.message = resp.Message + testCaseLog.String() default: res.status = fail res.error = fmt.Errorf("unknown status: '%v'", resp.Status) @@ -717,7 +776,7 @@ func (r *runner) streamResults(wg *sync.WaitGroup, results chan result) { // Helper function for printing a progress bar. lastStatusUpdate, animFrame := time.Now(), 0 updateProgress := func() { - printANSIProgressBar(animFrame, numTests, numByExpectedStatus) + fmt.Fprint(r.stdout, ansiProgressBar(animFrame, numTests, numByExpectedStatus)) animFrame++ lastStatusUpdate = time.Now() } @@ -744,24 +803,32 @@ func (r *runner) streamResults(wg *sync.WaitGroup, results chan result) { if r.verbose || res.error != nil || (exStatus.status != pass && exStatus.status != skip && !exStatus.expected) { - fmt.Printf("%v - %v: %v", name, res.status, res.message) + buf := &bytes.Buffer{} + fmt.Fprint(buf, statusColor[res.status]) + if res.message != "" { + fmt.Fprintf(buf, "%v - %v:\n%v", name, res.status, res.message) + } else { + fmt.Fprintf(buf, "%v - %v", name, res.status) + } if expected != "" { - fmt.Printf(" [%v -> %v]", expected, res.status) + fmt.Fprintf(buf, " [%v -> %v]", expected, res.status) } - fmt.Println() + fmt.Fprintln(buf) if res.error != nil { - fmt.Println(res.error) + fmt.Fprintln(buf, res.error) } + fmt.Fprint(buf, ansiReset) + fmt.Fprint(r.stdout, buf.String()) updateProgress() } if time.Since(lastStatusUpdate) > progressUpdateRate { updateProgress() } } - printANSIProgressBar(animFrame, numTests, numByExpectedStatus) + fmt.Fprint(r.stdout, ansiProgressBar(animFrame, numTests, numByExpectedStatus)) // All done. Print final stats. - fmt.Printf("\nCompleted in %v\n", timeTaken) + fmt.Fprintf(r.stdout, "\nCompleted in %v\n", timeTaken) var numExpectedByStatus map[status]int if r.expectations != nil { @@ -788,22 +855,22 @@ func (r *runner) streamResults(wg *sync.WaitGroup, results chan result) { continue } - fmt.Print(bold, statusColor[s]) - fmt.Print(alignRight(strings.ToUpper(string(s))+": ", 10)) - fmt.Print(ansiReset) + fmt.Fprint(r.stdout, bold, statusColor[s]) + fmt.Fprint(r.stdout, alignRight(strings.ToUpper(string(s))+": ", 10)) + fmt.Fprint(r.stdout, ansiReset) if numByStatus > 0 { - fmt.Print(bold) + fmt.Fprint(r.stdout, bold) } - fmt.Print(alignLeft(numByStatus, 10)) - fmt.Print(ansiReset) - fmt.Print(alignRight("("+percentage(numByStatus, numTests)+")", 6)) + fmt.Fprint(r.stdout, alignLeft(numByStatus, 10)) + fmt.Fprint(r.stdout, ansiReset) + fmt.Fprint(r.stdout, alignRight("("+percentage(numByStatus, numTests)+")", 6)) if diffFromExpected != 0 { - fmt.Print(bold, " [") - fmt.Printf("%+d", diffFromExpected) - fmt.Print(ansiReset, "]") + fmt.Fprint(r.stdout, bold, " [") + fmt.Fprintf(r.stdout, "%+d", diffFromExpected) + fmt.Fprint(r.stdout, ansiReset, "]") } - fmt.Println() + fmt.Fprintln(r.stdout) } } @@ -817,10 +884,10 @@ func (r *runner) runSerially(query string) error { timeTaken := time.Since(start) if r.verbose { - fmt.Println(result) + fmt.Fprintln(r.stdout, result) } - fmt.Println("Status:", result.status) - fmt.Println("Completed in", timeTaken) + fmt.Fprintln(r.stdout, "Status:", result.status) + fmt.Fprintln(r.stdout, "Completed in", timeTaken) return nil } @@ -968,10 +1035,10 @@ func alignRight(val interface{}, width int) string { return strings.Repeat(" ", padding) + s } -// printANSIProgressBar prints a colored progress bar, providing realtime -// information about the status of the CTS run. +// ansiProgressBar returns a string with an ANSI-colored progress bar, providing +// realtime information about the status of the CTS run. // Note: We'll want to skip this if !isatty or if we're running on windows. -func printANSIProgressBar(animFrame int, numTests int, numByExpectedStatus map[expectedStatus]int) { +func ansiProgressBar(animFrame int, numTests int, numByExpectedStatus map[expectedStatus]int) string { const barWidth = 50 animSymbols := []rune{'⣾', '⣽', '⣻', '⢿', '⡿', '⣟', '⣯', '⣷'} @@ -979,7 +1046,8 @@ func printANSIProgressBar(animFrame int, numTests int, numByExpectedStatus map[e numBlocksPrinted := 0 - fmt.Fprint(stdout, string(animSymbols[animFrame%len(animSymbols)]), " [") + buf := &strings.Builder{} + fmt.Fprint(buf, string(animSymbols[animFrame%len(animSymbols)]), " [") animFrame++ numFinished := 0 @@ -995,20 +1063,20 @@ func printANSIProgressBar(animFrame int, numTests int, numByExpectedStatus map[e numFinished += num statusFrac := float64(num) / float64(numTests) fNumBlocks := barWidth * statusFrac - fmt.Fprint(stdout, color) + fmt.Fprint(buf, color) numBlocks := int(math.Ceil(fNumBlocks)) if expected { if numBlocks > 1 { - fmt.Print(strings.Repeat(string("░"), numBlocks)) + fmt.Fprint(buf, strings.Repeat(string("░"), numBlocks)) } } else { if numBlocks > 1 { - fmt.Print(strings.Repeat(string("▉"), numBlocks)) + fmt.Fprint(buf, strings.Repeat(string("▉"), numBlocks)) } if numBlocks > 0 { frac := fNumBlocks - math.Floor(fNumBlocks) symbol := blockSymbols[int(math.Round(frac*float64(len(blockSymbols)-1)))] - fmt.Print(string(symbol)) + fmt.Fprint(buf, string(symbol)) } } numBlocksPrinted += numBlocks @@ -1016,18 +1084,20 @@ func printANSIProgressBar(animFrame int, numTests int, numByExpectedStatus map[e } if barWidth > numBlocksPrinted { - fmt.Print(strings.Repeat(string(" "), barWidth-numBlocksPrinted)) + fmt.Fprint(buf, strings.Repeat(string(" "), barWidth-numBlocksPrinted)) } - fmt.Fprint(stdout, ansiReset) - fmt.Print("] ", percentage(numFinished, numTests)) + fmt.Fprint(buf, ansiReset) + fmt.Fprint(buf, "] ", percentage(numFinished, numTests)) if colors { // move cursor to start of line so the bar is overridden - fmt.Fprint(stdout, positionLeft) + fmt.Fprint(buf, positionLeft) } else { // cannot move cursor, so newline - fmt.Println() + fmt.Fprintln(buf) } + + return buf.String() } // testcaseStatus is a pair of testcase name and result status @@ -1164,3 +1234,38 @@ func thisDir() string { } return filepath.Dir(file) } + +type muxWriter struct { + data chan []byte + err chan error +} + +// muxWriter returns a thread-safe io.WriteCloser, that writes to w +func newMuxWriter(w io.Writer) *muxWriter { + m := muxWriter{ + data: make(chan []byte, 256), + err: make(chan error, 1), + } + go func() { + defer close(m.err) + for data := range m.data { + _, err := w.Write(data) + if err != nil { + m.err <- err + return + } + } + m.err <- nil + }() + return &m +} + +func (w *muxWriter) Write(data []byte) (n int, err error) { + w.data <- append([]byte{}, data...) + return len(data), nil +} + +func (w *muxWriter) Close() error { + close(w.data) + return <-w.err +}