dawn_node: Use the new CTS server runner

This allows tests to be streamed to N node processes, without incurring the cost of re-scanning the test lists for each case.
It also means that there is an increased chance of state leakage. `--isolate` is a new flag that uses the old behavior of running each test in a separate process.

Depends on: https://github.com/gpuweb/cts/pull/789

Change-Id: Ifc92d1cc07a9de3b2751bed0971f3424d0c247ae
Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/66920
Commit-Queue: Ben Clayton <bclayton@google.com>
Reviewed-by: Antonio Maiorano <amaiorano@google.com>
This commit is contained in:
Ben Clayton 2021-10-19 21:27:23 +00:00 committed by Dawn LUCI CQ
parent d2c9cd369d
commit 74635bc6e9
1 changed files with 208 additions and 19 deletions

View File

@ -24,10 +24,13 @@ import (
"fmt"
"io"
"math"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"
@ -70,7 +73,7 @@ func run() error {
}
var dawnNode, cts, node, npx, logFilename string
var verbose, build bool
var verbose, isolated, build bool
var numRunners int
flag.StringVar(&dawnNode, "dawn-node", "", "path to dawn.node module")
flag.StringVar(&cts, "cts", "", "root directory of WebGPU CTS")
@ -78,8 +81,9 @@ func run() error {
flag.StringVar(&npx, "npx", "", "path to npx executable")
flag.BoolVar(&verbose, "verbose", false, "print extra information while testing")
flag.BoolVar(&build, "build", true, "attempt to build the CTS before running")
flag.BoolVar(&isolated, "isolate", false, "run each test in an isolated process")
flag.BoolVar(&colors, "colors", colors, "enable / disable colors")
flag.IntVar(&numRunners, "j", runtime.NumCPU(), "number of concurrent runners. 0 runs serially")
flag.IntVar(&numRunners, "j", runtime.NumCPU()/2, "number of concurrent runners. 0 runs serially")
flag.StringVar(&logFilename, "log", "", "path to log file of tests run and result")
flag.Parse()
@ -143,8 +147,9 @@ func run() error {
npx: npx,
dawnNode: dawnNode,
cts: cts,
evalScript: `require('./src/common/tools/setup-ts-in-node.js');
require('./src/common/runtime/cmdline.ts');`,
evalScript: func(main string) string {
return fmt.Sprintf(`require('./src/common/tools/setup-ts-in-node.js');require('./src/common/runtime/%v.ts');`, main)
},
}
if logFilename != "" {
@ -183,7 +188,9 @@ func run() error {
cache.BuildTimestamp = mostRecentSourceChange
}
// Use the prebuilt CTS (instead of using the `setup-ts-in-node` transpiler)
r.evalScript = `require('./out-node/common/runtime/cmdline.js');`
r.evalScript = func(main string) string {
return fmt.Sprintf(`require('./out-node/common/runtime/%v.js');`, main)
}
} else {
fmt.Println("npx not found on PATH. Using runtime TypeScript transpilation (slow)")
}
@ -196,7 +203,10 @@ func run() error {
}
fmt.Printf("Testing %d test cases...\n", len(r.testcases))
return r.runParallel()
if isolated {
return r.runParallelIsolated()
}
return r.runParallelWithServer()
}
fmt.Println("Running serially...")
@ -260,7 +270,7 @@ type runner struct {
numRunners int
verbose bool
node, npx, dawnNode, cts string
evalScript string
evalScript func(string) string
testcases []string
log logger
}
@ -327,7 +337,7 @@ func (r *runner) gatherTestCases(query string, verbose bool) error {
}
args := append([]string{
"-e", r.evalScript,
"-e", r.evalScript("cmdline"),
"--", // Start of arguments
// src/common/runtime/helper/sys.ts expects 'node file.js <args>'
// and slices away the first two arguments. When running with '-e', args
@ -348,10 +358,37 @@ func (r *runner) gatherTestCases(query string, verbose bool) error {
return nil
}
// runParallel() calls the CTS test runner to run each testcase in a separate
// process.
// Up to r.numRunners tests will be run concurrently.
func (r *runner) runParallel() error {
type portListener struct {
buffer strings.Builder
port chan int
}
func newPortListener() portListener {
return portListener{strings.Builder{}, make(chan int)}
}
var portRE = regexp.MustCompile(`\[\[(\d+)\]\]`)
func (p *portListener) Write(data []byte) (n int, err error) {
if p.port != nil {
p.buffer.Write(data)
match := portRE.FindStringSubmatch(p.buffer.String())
if len(match) == 2 {
port, err := strconv.Atoi(match[1])
if err != nil {
return 0, err
}
p.port <- port
close(p.port)
p.port = nil
}
}
return len(data), nil
}
// runParallelWithServer() starts r.numRunners instances of the CTS server test
// runner, and issues test run requests to those servers, concurrently.
func (r *runner) runParallelWithServer() error {
// Create a chan of test indices.
// This will be read by the test runner goroutines.
caseIndices := make(chan int, len(r.testcases))
@ -365,8 +402,145 @@ func (r *runner) runParallel() error {
results := make(chan result, len(r.testcases))
// Spin up the test runner goroutines
start := time.Now()
wg := sync.WaitGroup{}
wg := &sync.WaitGroup{}
for i := 0; i < r.numRunners; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if err := r.runServer(caseIndices, results); err != nil {
results <- result{
status: fail,
error: fmt.Errorf("Test server error: %w", err),
}
}
}()
}
r.streamResults(wg, results)
return nil
}
// runServer starts a test runner server instance, takes case indices from
// caseIndices, and requests the server run the test with the given index.
// The result of the test run is written to the results chan.
// Once the caseIndices chan has been closed, the server is stopped and
// runServer returns.
func (r *runner) runServer(caseIndices <-chan int, results chan<- result) error {
var port int
stopServer := func() {}
startServer := func() error {
cmd := exec.Command(r.node,
"-e", r.evalScript("server"), // Evaluate 'eval'.
"--",
// src/common/runtime/helper/sys.ts expects 'node file.js <args>'
// and slices away the first two arguments. When running with '-e', args
// start at 1, so just inject a dummy argument.
"dummy-arg",
// Actual arguments begin here
"--gpu-provider", r.dawnNode)
serverLog := &bytes.Buffer{}
pl := newPortListener()
cmd.Dir = r.cts
cmd.Stdout = io.MultiWriter(serverLog, &pl)
cmd.Stderr = serverLog
err := cmd.Start()
if err != nil {
return fmt.Errorf("failed to start test runner server:", err)
}
select {
case port = <-pl.port:
case <-time.After(time.Second * 10):
return fmt.Errorf("timeout waiting for server port:\n%v", serverLog.String())
}
return nil
}
stopServer = func() {
if port > 0 {
go http.Post(fmt.Sprintf("http://localhost:%v/terminate", port), "", &bytes.Buffer{})
time.Sleep(time.Millisecond * 100)
port = 0
}
}
for idx := range caseIndices {
if port == 0 {
if err := startServer(); err != nil {
return err
}
}
res := result{index: idx, testcase: r.testcases[idx]}
type Response struct {
Status string
Message string
}
postResp, err := http.Post(fmt.Sprintf("http://localhost:%v/run?%v", port, r.testcases[idx]), "", &bytes.Buffer{})
if err != nil {
res.error = fmt.Errorf("server POST failure. Restarting server...")
res.status = fail
results <- res
stopServer()
continue
}
var resp Response
if err := json.NewDecoder(postResp.Body).Decode(&resp); err != nil {
res.error = fmt.Errorf("server response decode failure")
res.status = fail
results <- res
continue
}
switch resp.Status {
case "pass":
res.status = pass
res.message = resp.Message
case "warn":
res.status = warn
res.message = resp.Message
case "fail":
res.status = fail
res.message = resp.Message
case "skip":
res.status = skip
res.message = resp.Message
default:
err = fmt.Errorf("unknown status: '%v'", resp.Status)
}
results <- res
}
stopServer()
return nil
}
// runParallelIsolated() calls the CTS command-line test runner to run each
// testcase in a separate process. This reduces possibility of state leakage
// between tests.
// Up to r.numRunners tests will be run concurrently.
func (r *runner) runParallelIsolated() error {
// Create a chan of test indices.
// This will be read by the test runner goroutines.
caseIndices := make(chan int, len(r.testcases))
for i := range r.testcases {
caseIndices <- i
}
close(caseIndices)
// Create a chan for the test results.
// This will be written to by the test runner goroutines.
results := make(chan result, len(r.testcases))
// Spin up the test runner goroutines
wg := &sync.WaitGroup{}
for i := 0; i < r.numRunners; i++ {
wg.Add(1)
go func() {
@ -379,8 +553,19 @@ func (r *runner) runParallel() error {
}()
}
r.streamResults(wg, results)
return nil
}
// streamResults reads from the chan 'results', printing the results in test-id
// sequential order. Once the WaitGroup 'wg' is complete, streamResults() will
// automatically close the 'results' chan.
// Once all the results have been printed, a summary will be printed and the
// function will return.
func (r *runner) streamResults(wg *sync.WaitGroup, results chan result) {
// Create another goroutine to close the results chan when all the runner
// goroutines have finished.
start := time.Now()
var timeTaken time.Duration
go func() {
wg.Wait()
@ -413,8 +598,11 @@ func (r *runner) runParallel() error {
numByStatus[res.status] = numByStatus[res.status] + 1
name := res.testcase
if r.verbose || (res.status != pass && res.status != skip) {
if r.verbose || res.error != nil || (res.status != pass && res.status != skip) {
fmt.Printf("%v - %v: %v\n", name, res.status, res.message)
if res.error != nil {
fmt.Println(res.error)
}
updateProgress()
}
if time.Since(lastStatusUpdate) > progressUpdateRate {
@ -438,7 +626,6 @@ timeout: %v (%v)
numByStatus[skip], percentage(numByStatus[skip], numTests),
numByStatus[timeout], percentage(numByStatus[timeout], numTests),
)
return nil
}
// runSerially() calls the CTS test runner to run the test query in a single
@ -458,6 +645,7 @@ type status string
const (
pass status = "pass"
warn status = "warn"
fail status = "fail"
skip status = "skip"
timeout status = "timeout"
@ -478,9 +666,8 @@ func (r *runner) runTestcase(query string, printToStout bool) result {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
eval := r.evalScript
args := append([]string{
"-e", eval, // Evaluate 'eval'.
"-e", r.evalScript("cmdline"), // Evaluate 'eval'.
"--",
// src/common/runtime/helper/sys.ts expects 'node file.js <args>'
// and slices away the first two arguments. When running with '-e', args
@ -510,6 +697,8 @@ func (r *runner) runTestcase(query string, printToStout bool) result {
return result{testcase: query, status: timeout, message: msg}
case strings.Contains(msg, "[fail]"):
return result{testcase: query, status: fail, message: msg}
case strings.Contains(msg, "[warn]"):
return result{testcase: query, status: warn, message: msg}
case strings.Contains(msg, "[skip]"):
return result{testcase: query, status: skip, message: msg}
case strings.Contains(msg, "[pass]"), err == nil:
@ -588,7 +777,7 @@ func printANSIProgressBar(animFrame int, numTests int, numByStatus map[status]in
for _, ty := range []struct {
status status
color string
}{{pass, green}, {skip, blue}, {timeout, yellow}, {fail, red}} {
}{{pass, green}, {warn, yellow}, {skip, blue}, {timeout, yellow}, {fail, red}} {
num := numByStatus[ty.status]
numFinished += num
statusFrac := float64(num) / float64(numTests)