Send test heartbeats from the CTS runner

Heartbeats are rate-limited to 1 every 500ms.
They are sent at the following checkpoints:
 - Before a log is recorded in TestCaseRecorder
 - Just before an async WebGPU operation
 - Immediately following completion of a async WebGPU op
 - After a then/catch handler waiting on an async WebGPU op

Bug: chromium:1340602
Change-Id: I4d5c864dabd3f4215dac2e78d7658df0f6a7b0b7
Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/99360
Commit-Queue: Austin Eng <enga@chromium.org>
Reviewed-by: Kai Ninomiya <kainino@chromium.org>
Kokoro: Kokoro <noreply+kokoro@google.com>
This commit is contained in:
Austin Eng 2022-08-18 22:49:10 +00:00 committed by Dawn LUCI CQ
parent 6a314e49e1
commit caa9baed54
1 changed files with 131 additions and 14 deletions

View File

@ -25,6 +25,65 @@ const LOGS_MAX_BYTES = 72000;
var socket; var socket;
// Returns a wrapper around `fn` which gets called at most once every `intervalMs`.
// If the wrapper is called when `fn` was called too recently, `fn` is scheduled to
// be called later in the future after the interval passes.
// Returns [ wrappedFn, {start, stop}] where wrappedFn is the rate-limited function,
// and start/stop control whether or not the function is enabled. If it is stopped, calls
// to the fn will no-op. If it is started, calls will be rate-limited, starting from
// the time `start` is called.
function rateLimited(fn, intervalMs) {
let last = undefined;
let timer = undefined;
const wrappedFn = (...args) => {
if (timer !== undefined || last === undefined) {
// If there is already a fn call scheduled, or the function is
// not enabled, return.
return;
}
// Get the current time as a number.
const now = +new Date();
const diff = now - last;
if (diff >= intervalMs) {
// Clear the timer, if there was one. This could happen if a timer
// is scheduled, but it never runs due to long-running synchronous
// code.
if (timer) {
clearTimeout(timer);
timer = undefined;
}
// Call the function.
last = now;
fn(...args);
} else if (timer === undefined) {
// Otherwise, we have called `fn` too recently.
// Schedule a future call.
timer = setTimeout(() => {
// Clear the timer to indicate nothing is scheduled.
timer = undefined;
last = +new Date();
fn(...args);
}, intervalMs - diff + 1);
}
};
return [
wrappedFn,
{
start: () => {
last = +new Date();
},
stop: () => {
last = undefined;
if (timer) {
clearTimeout(timer);
timer = undefined;
}
},
}
];
}
function byteSize(s) { function byteSize(s) {
return new Blob([s]).size; return new Blob([s]).size;
} }
@ -39,6 +98,57 @@ async function runCtsTestViaSocket(event) {
runCtsTest(input['q'], input['w']); runCtsTest(input['q'], input['w']);
} }
// Make a rate-limited version `sendMessageTestHeartbeat` that executes
// at most once every 500 ms.
const [sendHeartbeat, {
start: beginHeartbeatScope,
stop: endHeartbeatScope
}] = rateLimited(sendMessageTestHeartbeat, 500);
function wrapPromiseWithHeartbeat(prototype, key) {
const old = prototype[key];
prototype[key] = function (...args) {
return new Promise((resolve, reject) => {
// Send the heartbeat both before and after resolve/reject
// so that the heartbeat is sent ahead of any potentially
// long-running synchronous code awaiting the Promise.
old.call(this, ...args)
.then(val => { sendHeartbeat(); resolve(val) })
.catch(err => { sendHeartbeat(); reject(err) })
.finally(sendHeartbeat);
});
}
}
wrapPromiseWithHeartbeat(GPU.prototype, 'requestAdapter');
wrapPromiseWithHeartbeat(GPUAdapter.prototype, 'requestAdapterInfo');
wrapPromiseWithHeartbeat(GPUAdapter.prototype, 'requestDevice');
wrapPromiseWithHeartbeat(GPUDevice.prototype, 'createRenderPipelineAsync');
wrapPromiseWithHeartbeat(GPUDevice.prototype, 'createComputePipelineAsync');
wrapPromiseWithHeartbeat(GPUDevice.prototype, 'popErrorScope');
wrapPromiseWithHeartbeat(GPUQueue.prototype, 'onSubmittedWorkDone');
wrapPromiseWithHeartbeat(GPUBuffer.prototype, 'mapAsync');
wrapPromiseWithHeartbeat(GPUShaderModule.prototype, 'compilationInfo');
// Make a wrapper around TestCaseRecorder that sends a heartbeat before any
// recording operations.
function makeRecorderWithHeartbeat(rec) {
return new Proxy(rec, {
// Create a wrapper around all methods of the TestCaseRecorder.
get(target, prop, receiver) {
const orig = Reflect.get(target, prop, receiver);
if (typeof orig !== 'function') {
// Return the original property if it is not a function.
return orig;
}
return (...args) => {
sendHeartbeat();
return orig.call(receiver, ...args)
}
}
});
}
async function runCtsTest(query, use_worker) { async function runCtsTest(query, use_worker) {
const workerEnabled = use_worker; const workerEnabled = use_worker;
const worker = workerEnabled ? new TestWorker(false) : undefined; const worker = workerEnabled ? new TestWorker(false) : undefined;
@ -57,18 +167,18 @@ async function runCtsTest(query, use_worker) {
const wpt_fn = async () => { const wpt_fn = async () => {
sendMessageTestStarted(); sendMessageTestStarted();
const [rec, res] = log.record(name); const [rec, res] = log.record(name);
const recWithHeartbeat = makeRecorderWithHeartbeat(rec);
beginHeartbeatScope();
if (worker) { if (worker) {
await worker.run(rec, name, expectations); await worker.run(recWithHeartbeat, name, expectations);
} else { } else {
await testcase.run(rec, expectations); await testcase.run(recWithHeartbeat, expectations);
} }
endHeartbeatScope();
sendMessageTestStatus(res.status, res.timems); sendMessageTestStatus(res.status, res.timems);
sendMessageTestLog(res.logs);
let fullLogs = (res.logs ?? []).map(prettyPrintLog);
fullLogs = fullLogs.join('\n\n\n');
let logPieces = splitLogsForPayload(fullLogs);
sendMessageTestLog(logPieces);
sendMessageTestFinished(); sendMessageTestFinished();
}; };
await wpt_fn(); await wpt_fn();
@ -100,7 +210,11 @@ function splitLogsForPayload(fullLogs) {
} }
function sendMessageTestStarted() { function sendMessageTestStarted() {
socket.send(JSON.stringify({'type': 'TEST_STARTED'})); socket.send('{"type":"TEST_STARTED"}');
}
function sendMessageTestHeartbeat() {
socket.send('{"type":"TEST_HEARTBEAT"}');
} }
function sendMessageTestStatus(status, jsDurationMs) { function sendMessageTestStatus(status, jsDurationMs) {
@ -109,15 +223,18 @@ function sendMessageTestStatus(status, jsDurationMs) {
'js_duration_ms': jsDurationMs})); 'js_duration_ms': jsDurationMs}));
} }
function sendMessageTestLog(logPieces) { function sendMessageTestLog(logs) {
logPieces.forEach((piece) => { splitLogsForPayload((logs ?? []).map(prettyPrintLog).join('\n\n'))
socket.send(JSON.stringify({'type': 'TEST_LOG', .forEach((piece) => {
'log': piece})); socket.send(JSON.stringify({
'type': 'TEST_LOG',
'log': piece
}));
}); });
} }
function sendMessageTestFinished() { function sendMessageTestFinished() {
socket.send(JSON.stringify({'type': 'TEST_FINISHED'})); socket.send('{"type":"TEST_FINISHED"}');
} }
window.runCtsTest = runCtsTest; window.runCtsTest = runCtsTest;