Implement async (overlapped) I/O with io_uring

This commit is contained in:
2025-10-06 23:51:09 -06:00
parent f366e77956
commit df36de18bf
21 changed files with 909 additions and 143 deletions

View File

@@ -7,9 +7,12 @@
#include "internal.h"
#include "strutil.h"
#include <algorithm>
#include <chrono>
#include <cstring>
#include <limits>
#include <mutex>
#include <optional>
#include <pthread.h>
#include <string>
#include <sys/wait.h>
@@ -34,6 +37,93 @@ void makeWideNameFromAnsi(LPCSTR ansiName, std::vector<uint16_t> &outWide) {
outWide = stringToWideString(ansiName);
}
struct WaitBlock {
explicit WaitBlock(bool waitAllIn, DWORD count) : waitAll(waitAllIn != FALSE), satisfied(count, false) {}
static void notify(void *context, WaitableObject *obj, DWORD index, bool abandoned) {
auto *self = static_cast<WaitBlock *>(context);
if (self) {
self->handleSignal(obj, index, abandoned, true);
}
}
void noteInitial(WaitableObject *obj, DWORD index, bool abandoned) { handleSignal(obj, index, abandoned, false); }
bool isCompleted(DWORD &outResult) {
std::lock_guard lk(mutex);
if (!completed) {
return false;
}
outResult = result;
return true;
}
bool waitUntil(const std::optional<std::chrono::steady_clock::time_point> &deadline, DWORD &outResult) {
std::unique_lock lk(mutex);
if (!completed) {
if (deadline) {
if (!cv.wait_until(lk, *deadline, [&] { return completed; })) {
return false;
}
} else {
cv.wait(lk, [&] { return completed; });
}
}
outResult = result;
return true;
}
void handleSignal(WaitableObject *obj, DWORD index, bool abandoned, bool fromWaiter) {
if (!obj) {
return;
}
bool notify = false;
{
std::lock_guard lk(mutex);
if (index >= satisfied.size()) {
return;
}
if (satisfied[index]) {
// Already satisfied; nothing to do aside from cleanup below.
} else if (!completed) {
satisfied[index] = true;
if (waitAll) {
if (abandoned) {
result = WAIT_ABANDONED + index;
completed = true;
notify = true;
} else if (std::all_of(satisfied.begin(), satisfied.end(), [](bool v) { return v; })) {
result = WAIT_OBJECT_0;
completed = true;
notify = true;
}
} else {
result = abandoned ? (WAIT_ABANDONED + index) : (WAIT_OBJECT_0 + index);
completed = true;
notify = true;
}
}
}
// Always unregister once we've observed a signal for this waiter.
if (fromWaiter) {
obj->unregisterWaiter(this);
} else if (!waitAll || satisfied[index]) {
// Initial state satisfaction can drop registration immediately.
obj->unregisterWaiter(this);
}
if (notify) {
cv.notify_all();
}
}
const bool waitAll;
std::vector<bool> satisfied;
bool completed = false;
DWORD result = WAIT_TIMEOUT;
std::mutex mutex;
std::condition_variable cv;
};
} // namespace
namespace kernel32 {
@@ -61,7 +151,7 @@ HANDLE WIN_FUNC CreateMutexW(LPSECURITY_ATTRIBUTES lpMutexAttributes, BOOL bInit
mu->owner = pthread_self();
mu->ownerValid = true;
mu->recursionCount = 1;
mu->signaled.store(false, std::memory_order_release);
mu->signaled = false;
}
return mu;
});
@@ -102,12 +192,13 @@ BOOL WIN_FUNC ReleaseMutex(HANDLE hMutex) {
}
if (--mu->recursionCount == 0) {
mu->ownerValid = false;
mu->signaled.store(true, std::memory_order_release);
mu->signaled = true;
notify = true;
}
}
if (notify) {
mu->cv.notify_one();
mu->notifyWaiters(false);
}
return TRUE;
}
@@ -125,7 +216,7 @@ HANDLE WIN_FUNC CreateEventW(LPSECURITY_ATTRIBUTES lpEventAttributes, BOOL bManu
}
auto [ev, created] = wibo::g_namespace.getOrCreate(name, [&]() {
auto e = new EventObject(bManualReset);
e->signaled.store(bInitialState, std::memory_order_relaxed);
e->signaled = bInitialState;
return e;
});
if (!ev) {
@@ -200,6 +291,7 @@ BOOL WIN_FUNC ReleaseSemaphore(HANDLE hSemaphore, LONG lReleaseCount, PLONG lpPr
}
LONG prev = 0;
bool shouldNotifyWaitBlocks = false;
{
std::lock_guard lk(sem->m);
if (lpPreviousCount) {
@@ -210,11 +302,15 @@ BOOL WIN_FUNC ReleaseSemaphore(HANDLE hSemaphore, LONG lReleaseCount, PLONG lpPr
return FALSE;
}
sem->count += lReleaseCount;
sem->signaled.store(sem->count > 0, std::memory_order_release);
sem->signaled = sem->count > 0;
shouldNotifyWaitBlocks = sem->count > 0;
}
for (LONG i = 0; i < lReleaseCount; ++i) {
sem->cv.notify_one();
}
if (shouldNotifyWaitBlocks) {
sem->notifyWaiters(false);
}
if (lpPreviousCount) {
*lpPreviousCount = prev;
@@ -279,12 +375,12 @@ DWORD WIN_FUNC WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) {
case ObjectType::Event: {
auto ev = std::move(obj).downcast<EventObject>();
std::unique_lock lk(ev->m);
bool ok = doWait(lk, ev->cv, [&] { return ev->signaled.load(std::memory_order_acquire); });
bool ok = doWait(lk, ev->cv, [&] { return ev->signaled; });
if (!ok) {
return WAIT_TIMEOUT;
}
if (!ev->manualReset) {
ev->signaled.store(false, std::memory_order_release);
ev->signaled = false;
}
return WAIT_OBJECT_0;
}
@@ -297,7 +393,7 @@ DWORD WIN_FUNC WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) {
}
--sem->count;
if (sem->count == 0) {
sem->signaled.store(false, std::memory_order_release);
sem->signaled = false;
}
return WAIT_OBJECT_0;
}
@@ -322,7 +418,7 @@ DWORD WIN_FUNC WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) {
mu->owner = self;
mu->ownerValid = true;
mu->recursionCount = 1;
mu->signaled.store(false, std::memory_order_release);
mu->signaled = false;
return ret;
}
case ObjectType::Thread: {
@@ -333,7 +429,7 @@ DWORD WIN_FUNC WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) {
// Windows actually allows you to wait on your own thread, but why bother?
return WAIT_TIMEOUT;
}
bool ok = doWait(lk, th->cv, [&] { return th->signaled.load(std::memory_order_acquire); });
bool ok = doWait(lk, th->cv, [&] { return th->signaled; });
return ok ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
}
case ObjectType::Process: {
@@ -343,7 +439,7 @@ DWORD WIN_FUNC WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) {
// Windows actually allows you to wait on your own process, but why bother?
return WAIT_TIMEOUT;
}
bool ok = doWait(lk, po->cv, [&] { return po->signaled.load(std::memory_order_acquire); });
bool ok = doWait(lk, po->cv, [&] { return po->signaled; });
return ok ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
}
default:
@@ -352,6 +448,101 @@ DWORD WIN_FUNC WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) {
}
}
DWORD WIN_FUNC WaitForMultipleObjects(DWORD nCount, const HANDLE *lpHandles, BOOL bWaitAll, DWORD dwMilliseconds) {
HOST_CONTEXT_GUARD();
DEBUG_LOG("WaitForMultipleObjects(%u, %p, %d, %u)\n", nCount, lpHandles, static_cast<int>(bWaitAll),
dwMilliseconds);
if (nCount == 0 || nCount > MAXIMUM_WAIT_OBJECTS || !lpHandles) {
wibo::lastError = ERROR_INVALID_PARAMETER;
return WAIT_FAILED;
}
std::vector<Pin<WaitableObject>> objects(nCount);
for (DWORD i = 0; i < nCount; ++i) {
HandleMeta meta{};
auto obj = wibo::handles().getAs<WaitableObject>(lpHandles[i], &meta);
if (!obj) {
wibo::lastError = ERROR_INVALID_HANDLE;
return WAIT_FAILED;
}
objects[i] = std::move(obj);
}
WaitBlock block(bWaitAll, nCount);
for (DWORD i = 0; i < objects.size(); ++i) {
objects[i]->registerWaiter(&block, i, &WaitBlock::notify);
}
for (DWORD i = 0; i < objects.size(); ++i) {
auto *obj = objects[i].get();
bool isSignaled = obj->signaled;
bool isAbandoned = false;
if (auto *mu = detail::castTo<MutexObject>(obj)) {
isAbandoned = mu->abandoned;
}
if (isSignaled) {
block.noteInitial(obj, i, isAbandoned);
}
}
DWORD waitResult = WAIT_TIMEOUT;
if (!block.isCompleted(waitResult)) {
if (dwMilliseconds == 0) {
waitResult = WAIT_TIMEOUT;
} else {
std::optional<std::chrono::steady_clock::time_point> deadline;
if (dwMilliseconds != INFINITE) {
deadline =
std::chrono::steady_clock::now() + std::chrono::milliseconds(static_cast<uint64_t>(dwMilliseconds));
}
DWORD signaledResult = WAIT_TIMEOUT;
bool completed = block.waitUntil(deadline, signaledResult);
if (completed) {
waitResult = signaledResult;
} else {
waitResult = WAIT_TIMEOUT;
}
}
}
for (const auto &object : objects) {
object->unregisterWaiter(&block);
}
if (waitResult == WAIT_TIMEOUT) {
return WAIT_TIMEOUT;
}
if (waitResult == WAIT_FAILED) {
return WAIT_FAILED;
}
auto consume = [&](DWORD index) {
if (index < nCount) {
WaitForSingleObject(lpHandles[index], 0);
}
};
if (bWaitAll) {
if (waitResult == WAIT_OBJECT_0) {
for (DWORD i = 0; i < nCount; ++i) {
consume(i);
}
} else if (waitResult >= WAIT_ABANDONED && waitResult < WAIT_ABANDONED + nCount) {
consume(waitResult - WAIT_ABANDONED);
}
} else {
if (waitResult >= WAIT_OBJECT_0 && waitResult < WAIT_OBJECT_0 + nCount) {
consume(waitResult - WAIT_OBJECT_0);
} else if (waitResult >= WAIT_ABANDONED && waitResult < WAIT_ABANDONED + nCount) {
consume(waitResult - WAIT_ABANDONED);
}
}
return waitResult;
}
void WIN_FUNC InitializeCriticalSection(LPCRITICAL_SECTION lpCriticalSection) {
HOST_CONTEXT_GUARD();
VERBOSE_LOG("STUB: InitializeCriticalSection(%p)\n", lpCriticalSection);