Implement Win32 AsyncIO

This commit is contained in:
Jack Andersen 2018-02-06 15:36:51 -10:00
parent bdf4bd07a8
commit 3b10be80f7
4 changed files with 229 additions and 54 deletions

View File

@ -6,7 +6,7 @@
using SizeReturn = ssize_t; using SizeReturn = ssize_t;
#else #else
#include <windows.h> #include <windows.h>
using SizeReturn = SSIZE_T; using SizeReturn = DWORD;
#endif #endif
#include "Util.hpp" #include "Util.hpp"
@ -21,7 +21,10 @@ class AsyncIO
int m_fd = -1; int m_fd = -1;
std::vector<std::pair<struct aiocb, SizeReturn>> m_queue; std::vector<std::pair<struct aiocb, SizeReturn>> m_queue;
#else #else
HANDLE m_fh = INVALID_HANDLE_VALUE;
std::vector<std::pair<OVERLAPPED, SizeReturn>> m_queue;
#endif #endif
void _waitForOperation(size_t qIdx) const;
size_t m_maxBlock = 0; size_t m_maxBlock = 0;
public: public:
AsyncIO() = default; AsyncIO() = default;
@ -32,14 +35,16 @@ public:
AsyncIO(const AsyncIO* other) = delete; AsyncIO(const AsyncIO* other) = delete;
AsyncIO& operator=(const AsyncIO& other) = delete; AsyncIO& operator=(const AsyncIO& other) = delete;
void resizeQueue(size_t queueSz) { m_queue.resize(queueSz); } void resizeQueue(size_t queueSz) { m_queue.resize(queueSz); }
SizeReturn syncRead(void* buf, size_t length, off_t offset);
bool asyncRead(size_t qIdx, void* buf, size_t length, off_t offset); bool asyncRead(size_t qIdx, void* buf, size_t length, off_t offset);
SizeReturn syncWrite(const void* buf, size_t length, off_t offset);
bool asyncWrite(size_t qIdx, const void* buf, size_t length, off_t offset); bool asyncWrite(size_t qIdx, const void* buf, size_t length, off_t offset);
ECardResult pollStatus(size_t qIdx, SizeReturn* szRet = nullptr) const; ECardResult pollStatus(size_t qIdx, SizeReturn* szRet = nullptr) const;
ECardResult pollStatus() const; ECardResult pollStatus() const;
void waitForCompletion() const; void waitForCompletion() const;
#ifndef _WIN32
operator bool() const { return m_fd != -1; } operator bool() const { return m_fd != -1; }
#else
operator bool() const { return m_fh != INVALID_HANDLE_VALUE; }
#endif
}; };
} }

View File

@ -39,10 +39,17 @@ AsyncIO& AsyncIO::operator=(AsyncIO&& other)
return *this; return *this;
} }
SizeReturn AsyncIO::syncRead(void* buf, size_t length, off_t offset) void AsyncIO::_waitForOperation(size_t qIdx) const
{ {
lseek(m_fd, offset, SEEK_SET); auto& aio = const_cast<AsyncIO*>(this)->m_queue[qIdx];
return read(m_fd, buf, length); if (aio.first.aio_fildes == 0)
return;
const struct aiocb* aiop = &aio.first;
struct timespec ts = {2, 0};
while (aio_suspend(&aiop, 1, &ts) && errno == EINTR) {}
if (aio_error(&aio.first) != EINPROGRESS)
aio.second = aio_return(&aio.first);
aio.first.aio_fildes = 0;
} }
bool AsyncIO::asyncRead(size_t qIdx, void* buf, size_t length, off_t offset) bool AsyncIO::asyncRead(size_t qIdx, void* buf, size_t length, off_t offset)
@ -53,11 +60,7 @@ bool AsyncIO::asyncRead(size_t qIdx, void* buf, size_t length, off_t offset)
#ifndef NDEBUG #ifndef NDEBUG
fprintf(stderr, "WARNING: synchronous kabufuda fallback, check access polling\n"); fprintf(stderr, "WARNING: synchronous kabufuda fallback, check access polling\n");
#endif #endif
const struct aiocb* aiop = &aio; _waitForOperation(qIdx);
struct timespec ts = {2, 0};
while (aio_suspend(&aiop, 1, &ts) && errno == EINTR) {}
if (aio_error(&aio) == 0)
aio_return(&aio);
} }
memset(&aio, 0, sizeof(struct aiocb)); memset(&aio, 0, sizeof(struct aiocb));
aio.aio_fildes = m_fd; aio.aio_fildes = m_fd;
@ -68,12 +71,6 @@ bool AsyncIO::asyncRead(size_t qIdx, void* buf, size_t length, off_t offset)
return aio_read(&aio) == 0; return aio_read(&aio) == 0;
} }
SizeReturn AsyncIO::syncWrite(const void* buf, size_t length, off_t offset)
{
lseek(m_fd, offset, SEEK_SET);
return write(m_fd, buf, length);
}
bool AsyncIO::asyncWrite(size_t qIdx, const void* buf, size_t length, off_t offset) bool AsyncIO::asyncWrite(size_t qIdx, const void* buf, size_t length, off_t offset)
{ {
struct aiocb& aio = m_queue[qIdx].first; struct aiocb& aio = m_queue[qIdx].first;
@ -82,11 +79,7 @@ bool AsyncIO::asyncWrite(size_t qIdx, const void* buf, size_t length, off_t offs
#ifndef NDEBUG #ifndef NDEBUG
fprintf(stderr, "WARNING: synchronous kabufuda fallback, check access polling\n"); fprintf(stderr, "WARNING: synchronous kabufuda fallback, check access polling\n");
#endif #endif
const struct aiocb* aiop = &aio; _waitForOperation(qIdx);
struct timespec ts = {2, 0};
while (aio_suspend(&aiop, 1, &ts) && errno == EINTR) {}
if (aio_error(&aio) == 0)
aio_return(&aio);
} }
memset(&aio, 0, sizeof(struct aiocb)); memset(&aio, 0, sizeof(struct aiocb));
aio.aio_fildes = m_fd; aio.aio_fildes = m_fd;
@ -117,6 +110,10 @@ ECardResult AsyncIO::pollStatus(size_t qIdx, SizeReturn* szRet) const
case EINPROGRESS: case EINPROGRESS:
return ECardResult::BUSY; return ECardResult::BUSY;
default: default:
aio.second = aio_return(&aio.first);
aio.first.aio_fildes = 0;
if (szRet)
*szRet = aio.second;
return ECardResult::IOERROR; return ECardResult::IOERROR;
} }
} }
@ -142,6 +139,8 @@ ECardResult AsyncIO::pollStatus() const
result = ECardResult::BUSY; result = ECardResult::BUSY;
break; break;
default: default:
aio.second = aio_return(&aio.first);
aio.first.aio_fildes = 0;
if (result > ECardResult::IOERROR) if (result > ECardResult::IOERROR)
result = ECardResult::IOERROR; result = ECardResult::IOERROR;
break; break;
@ -154,30 +153,9 @@ ECardResult AsyncIO::pollStatus() const
void AsyncIO::waitForCompletion() const void AsyncIO::waitForCompletion() const
{ {
for (auto it = const_cast<AsyncIO*>(this)->m_queue.begin(); for (size_t i=0 ; i<m_maxBlock ; ++i)
it != const_cast<AsyncIO*>(this)->m_queue.begin() + m_maxBlock; _waitForOperation(i);
++it) const_cast<AsyncIO*>(this)->m_maxBlock = 0;
{
auto& aio = *it;
if (aio.first.aio_fildes == 0)
continue;
switch (aio_error(&aio.first))
{
case 0:
aio.second = aio_return(&aio.first);
aio.first.aio_fildes = 0;
break;
case EINPROGRESS:
{
const struct aiocb* aiop = &aio.first;
struct timespec ts = {2, 0};
while (aio_suspend(&aiop, 1, &ts) && errno == EINTR) {}
break;
}
default:
break;
}
}
} }
} }

View File

@ -1,4 +1,191 @@
// #include "kabufuda/AsyncIO.hpp"
// Created by Jack Andersen on 2/5/18.
//
namespace kabufuda
{
#undef min
#undef max
static void ResetOverlapped(OVERLAPPED& aio, DWORD offset = 0)
{
aio.Internal = 0;
aio.InternalHigh = 0;
aio.Offset = offset;
aio.OffsetHigh = 0;
}
AsyncIO::AsyncIO(SystemStringView filename, bool truncate)
{
#if WINDOWS_STORE
CREATEFILE2_EXTENDED_PARAMETERS parms = {};
parms.dwSize = sizeof(CREATEFILE2_EXTENDED_PARAMETERS);
parms.dwFileAttributes = FILE_ATTRIBUTE_NORMAL;
parms.dwFileFlags = FILE_FLAG_OVERLAPPED;
m_fh = CreateFile2(filename.data(), GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE,
truncate ? CREATE_ALWAYS : OPEN_ALWAYS, &parms);
#else
m_fh = CreateFileW(filename.data(), GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE,
nullptr, truncate ? CREATE_ALWAYS : OPEN_ALWAYS,
FILE_FLAG_OVERLAPPED | FILE_ATTRIBUTE_NORMAL, nullptr);
#endif
}
AsyncIO::~AsyncIO()
{
if (*this)
{
if (CancelIoEx(m_fh, nullptr))
waitForCompletion();
CloseHandle(m_fh);
}
}
AsyncIO::AsyncIO(AsyncIO&& other)
{
m_fh = other.m_fh;
other.m_fh = INVALID_HANDLE_VALUE;
m_queue = std::move(other.m_queue);
m_maxBlock = other.m_maxBlock;
}
AsyncIO& AsyncIO::operator=(AsyncIO&& other)
{
if (*this)
{
if (CancelIoEx(m_fh, nullptr))
waitForCompletion();
CloseHandle(m_fh);
}
m_fh = other.m_fh;
other.m_fh = INVALID_HANDLE_VALUE;
m_queue = std::move(other.m_queue);
m_maxBlock = other.m_maxBlock;
return *this;
}
void AsyncIO::_waitForOperation(size_t qIdx) const
{
auto& aio = const_cast<AsyncIO*>(this)->m_queue[qIdx];
if (aio.first.hEvent == 0)
return;
GetOverlappedResult(m_fh, &aio.first, &aio.second, TRUE);
CloseHandle(aio.first.hEvent);
aio.first.hEvent = 0;
}
bool AsyncIO::asyncRead(size_t qIdx, void* buf, size_t length, off_t offset)
{
OVERLAPPED& aio = m_queue[qIdx].first;
if (aio.hEvent)
{
#ifndef NDEBUG
fprintf(stderr, "WARNING: synchronous kabufuda fallback, check access polling\n");
#endif
_waitForOperation(qIdx);
}
else
{
aio.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr);
}
ResetOverlapped(aio, DWORD(offset));
m_maxBlock = std::max(m_maxBlock, qIdx + 1);
BOOL res = ReadFile(m_fh, buf, length, nullptr, &aio);
return res == TRUE || GetLastError() == ERROR_IO_PENDING;
}
bool AsyncIO::asyncWrite(size_t qIdx, const void* buf, size_t length, off_t offset)
{
OVERLAPPED& aio = m_queue[qIdx].first;
if (aio.hEvent)
{
#ifndef NDEBUG
fprintf(stderr, "WARNING: synchronous kabufuda fallback, check access polling\n");
#endif
_waitForOperation(qIdx);
}
else
{
aio.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr);
}
ResetOverlapped(aio, DWORD(offset));
m_maxBlock = std::max(m_maxBlock, qIdx + 1);
BOOL res = WriteFile(m_fh, buf, length, nullptr, &aio);
return res == TRUE || GetLastError() == ERROR_IO_PENDING;
}
ECardResult AsyncIO::pollStatus(size_t qIdx, SizeReturn* szRet) const
{
auto& aio = const_cast<AsyncIO*>(this)->m_queue[qIdx];
if (aio.first.hEvent == 0)
{
if (szRet)
*szRet = aio.second;
return ECardResult::READY;
}
if (GetOverlappedResult(m_fh, &aio.first, &aio.second, FALSE))
{
CloseHandle(aio.first.hEvent);
aio.first.hEvent = 0;
if (szRet)
*szRet = aio.second;
return ECardResult::READY;
}
else
{
if (GetLastError() == ERROR_IO_INCOMPLETE)
{
return ECardResult::BUSY;
}
else
{
_waitForOperation(qIdx);
return ECardResult::IOERROR;
}
}
}
ECardResult AsyncIO::pollStatus() const
{
ECardResult result = ECardResult::READY;
for (auto it = const_cast<AsyncIO*>(this)->m_queue.begin();
it != const_cast<AsyncIO*>(this)->m_queue.begin() + m_maxBlock;
++it)
{
auto& aio = *it;
if (aio.first.hEvent == 0)
continue;
if (GetOverlappedResult(m_fh, &aio.first, &aio.second, FALSE))
{
CloseHandle(aio.first.hEvent);
aio.first.hEvent = 0;
}
else
{
if (GetLastError() == ERROR_IO_INCOMPLETE)
{
if (result > ECardResult::BUSY)
result = ECardResult::BUSY;
}
else
{
_waitForOperation(it - m_queue.cbegin());
if (result > ECardResult::IOERROR)
result = ECardResult::IOERROR;
}
}
}
if (result == ECardResult::READY)
const_cast<AsyncIO*>(this)->m_maxBlock = 0;
return result;
}
void AsyncIO::waitForCompletion() const
{
for (size_t i=0 ; i<m_maxBlock ; ++i)
_waitForOperation(i);
const_cast<AsyncIO*>(this)->m_maxBlock = 0;
}
}

View File

@ -934,11 +934,16 @@ bool Card::open(SystemStringView filepath)
if (m_fileHandle) if (m_fileHandle)
{ {
m_fileHandle.resizeQueue(5); m_fileHandle.resizeQueue(5);
m_fileHandle.asyncRead(0, __raw, BlockSize, 0); if (!m_fileHandle.asyncRead(0, __raw, BlockSize, 0))
m_fileHandle.asyncRead(1, m_dirs[0].__raw, BlockSize, BlockSize * 1); return false;
m_fileHandle.asyncRead(2, m_dirs[1].__raw, BlockSize, BlockSize * 2); if (!m_fileHandle.asyncRead(1, m_dirs[0].__raw, BlockSize, BlockSize * 1))
m_fileHandle.asyncRead(3, m_bats[0].__raw, BlockSize, BlockSize * 3); return false;
m_fileHandle.asyncRead(4, m_bats[1].__raw, BlockSize, BlockSize * 4); if (!m_fileHandle.asyncRead(2, m_dirs[1].__raw, BlockSize, BlockSize * 2))
return false;
if (!m_fileHandle.asyncRead(3, m_bats[0].__raw, BlockSize, BlockSize * 3))
return false;
if (!m_fileHandle.asyncRead(4, m_bats[1].__raw, BlockSize, BlockSize * 4))
return false;
return true; return true;
} }
return false; return false;