From 42f1b866275a6d79361a081a532b7fa7b50b7944 Mon Sep 17 00:00:00 2001 From: Jack Andersen Date: Sat, 7 Jan 2017 11:19:25 -1000 Subject: [PATCH] Use blocking I/O for transfer thread --- include/jbus/Endpoint.hpp | 16 +-- include/jbus/Socket.hpp | 2 +- lib/Common.cpp | 14 ++- lib/Endpoint.cpp | 207 +++++++++++++++----------------------- lib/Listener.cpp | 5 +- 5 files changed, 100 insertions(+), 144 deletions(-) diff --git a/include/jbus/Endpoint.hpp b/include/jbus/Endpoint.hpp index dc426a6..e1d2031 100644 --- a/include/jbus/Endpoint.hpp +++ b/include/jbus/Endpoint.hpp @@ -132,13 +132,6 @@ class Endpoint CMD_WRITE = 0x15 }; - enum class EWaitResp - { - NoWait = 0, - WaitCmd, - WaitIdle - }; - static const u64 BITS_PER_SECOND = 115200; static const u64 BYTES_PER_SECOND = BITS_PER_SECOND / 8; @@ -147,16 +140,13 @@ class Endpoint std::thread m_transferThread; std::mutex m_syncLock; std::condition_variable m_syncCv; + std::condition_variable m_issueCv; std::experimental::optional m_joyBoot; FGBACallback m_callback; - EWaitResp m_waitingResp = EWaitResp::NoWait; - size_t m_dataReceivedBytes = 0; u8 m_buffer[5]; - u64 m_timeSent = 0; u8* m_readDstPtr = nullptr; u8* m_statusPtr = nullptr; u64 m_lastGCTick = 0; - u64 m_timeCmdSent = 0; u8 m_lastCmd = 0; u8 m_chan; bool m_booted = false; @@ -167,8 +157,8 @@ class Endpoint void clockSync(); void send(const u8* buffer); size_t receive(u8* buffer); - size_t runBuffer(u8* buffer, u64& remTicks, EWaitResp resp); - bool idleGetStatus(u64& remTicks); + size_t runBuffer(u8* buffer, std::unique_lock& lk); + bool idleGetStatus(std::unique_lock& lk); void transferProc(); void transferWakeup(ThreadLocalEndpoint& endpoint, u8 status); diff --git a/include/jbus/Socket.hpp b/include/jbus/Socket.hpp index 55e3a2a..884c2bc 100644 --- a/include/jbus/Socket.hpp +++ b/include/jbus/Socket.hpp @@ -367,7 +367,7 @@ public: operator bool() const { return isOpen(); } - int GetInternalSocket() const { return m_socket; } + SocketTp GetInternalSocket() const { return m_socket; } }; } diff --git a/lib/Common.cpp b/lib/Common.cpp index 31194d5..0bbd41a 100644 --- a/lib/Common.cpp +++ b/lib/Common.cpp @@ -51,8 +51,18 @@ void WaitGCTicks(u64 ticks) tv.tv_usec = (ticks % GetGCTicksPerSec()) * 1000000 / GetGCTicksPerSec(); select(0, NULL, NULL, NULL, &tv); #else - Sleep(ticks * 1000 / GetGCTicksPerSec() + - (ticks % GetGCTicksPerSec()) * 1000 / GetGCTicksPerSec()); + if (ticks < GetGCTicksPerSec() / 60) + { + /* NT is useless for scheduling sub-millisecond intervals */ + u64 start = GetGCTicks(); + do { Sleep(0); } while (GetGCTicks() - start < ticks); + } + else + { + /* Use normal Sleep() for durations longer than ~16ms */ + Sleep(ticks * 1000 / GetGCTicksPerSec() + + (ticks % GetGCTicksPerSec()) * 1000 / GetGCTicksPerSec()); + } #endif } diff --git a/lib/Endpoint.cpp b/lib/Endpoint.cpp index 056eeff..b4fa683 100644 --- a/lib/Endpoint.cpp +++ b/lib/Endpoint.cpp @@ -447,8 +447,6 @@ void Endpoint::send(const u8* buffer) buffer[1], buffer[2], buffer[3], buffer[4], sentBytes); } #endif - - m_timeCmdSent = GetGCTicks(); } size_t Endpoint::receive(u8* buffer) @@ -460,27 +458,8 @@ size_t Endpoint::receive(u8* buffer) } size_t recvBytes = 0; - u64 transferTime = getTransferTime(m_lastCmd); - bool block = (GetGCTicks() - m_timeCmdSent) > transferTime; - if (m_lastCmd == CMD_STATUS && !m_booted) - block = false; - - if (block) - { - fd_set fds; - FD_ZERO(&fds); - FD_SET(m_dataSocket.GetInternalSocket(), &fds); - struct timeval tv = {}; - tv.tv_sec = 1; - select(m_dataSocket.GetInternalSocket() + 1, &fds, NULL, NULL, &tv); - } - net::Socket::EResult result = m_dataSocket.recv(buffer, 5, recvBytes); - if (result == net::Socket::EResult::Busy) - { - recvBytes = 0; - } - else if (result == net::Socket::EResult::Error) + if (result == net::Socket::EResult::Error) { m_running = false; return 5; @@ -510,42 +489,25 @@ size_t Endpoint::receive(u8* buffer) return recvBytes; } -size_t Endpoint::runBuffer(u8* buffer, u64& remTicks, EWaitResp resp) +size_t Endpoint::runBuffer(u8* buffer, std::unique_lock& lk) { - if (m_waitingResp == EWaitResp::NoWait) - { - m_dataReceivedBytes = 0; - clockSync(); - send(buffer); - m_timeSent = GetGCTicks(); - m_waitingResp = resp; - } + u8 tmpBuffer[5]; - if (m_waitingResp != EWaitResp::NoWait && m_dataReceivedBytes == 0) - { - m_dataReceivedBytes = receive(buffer); - } + memmove(tmpBuffer, buffer, 5); + lk.unlock(); + clockSync(); + send(tmpBuffer); + size_t receivedBytes = receive(tmpBuffer); + lk.lock(); + memmove(buffer, tmpBuffer, 5); - u64 ticksSinceSend = GetGCTicks() - m_timeSent; - u64 targetTransferTime = getTransferTime(m_lastCmd); - if (targetTransferTime > ticksSinceSend) - { - remTicks = targetTransferTime - ticksSinceSend; - return 0; - } - else - { - remTicks = 0; - if (m_dataReceivedBytes != 0) - m_waitingResp = EWaitResp::NoWait; - return m_dataReceivedBytes; - } + return receivedBytes; } -bool Endpoint::idleGetStatus(u64& remTicks) +bool Endpoint::idleGetStatus(std::unique_lock& lk) { u8 buffer[] = { CMD_STATUS, 0, 0, 0, 0 }; - return runBuffer(buffer, remTicks, EWaitResp::WaitIdle); + return runBuffer(buffer, lk); } void Endpoint::transferProc() @@ -554,84 +516,63 @@ void Endpoint::transferProc() printf("Starting JoyBus transfer thread for channel %d\n", m_chan); #endif + /* This lock is relinquished on I/O cycles or when waiting for next request */ std::unique_lock lk(m_syncLock); while (m_running) { - u64 remTicks; - if ((m_cmdIssued && m_waitingResp != EWaitResp::WaitIdle) || - m_waitingResp == EWaitResp::WaitCmd) + if (m_cmdIssued) { - /* This inner loop performs first-response on commands invoked from callback - * on this thread; avoiding excessive locking */ - do + /* Synchronous command write/read cycle */ + runBuffer(m_buffer, lk); + m_cmdIssued = false; + + EJoyReturn xferStatus = m_running ? GBA_READY : GBA_NOT_READY; + + /* Handle message response */ + switch (m_lastCmd) { + case CMD_RESET: + case CMD_STATUS: + if (m_statusPtr) + *m_statusPtr = m_buffer[2]; + break; + case CMD_WRITE: + if (m_statusPtr) + *m_statusPtr = m_buffer[0]; + break; + case CMD_READ: + if (m_statusPtr) + *m_statusPtr = m_buffer[4]; + if (m_readDstPtr) + memmove(m_readDstPtr, m_buffer, 4); + break; + default: + break; + } + + m_statusPtr = nullptr; + m_readDstPtr = nullptr; + if (m_callback) { - m_cmdIssued = false; - if (runBuffer(m_buffer, remTicks, EWaitResp::WaitCmd) || !m_running) - { - EJoyReturn xferStatus; - if (m_running) - { - xferStatus = GBA_READY; - } - else - { - xferStatus = GBA_NOT_READY; - remTicks = 0; - } - - /* Handle message response */ - switch (m_lastCmd) { - case CMD_RESET: - case CMD_STATUS: - if (m_statusPtr) - *m_statusPtr = m_buffer[2]; - break; - case CMD_WRITE: - if (m_statusPtr) - *m_statusPtr = m_buffer[0]; - break; - case CMD_READ: - if (m_statusPtr) - *m_statusPtr = m_buffer[4]; - if (m_readDstPtr) - memmove(m_readDstPtr, m_buffer, 4); - break; - default: - break; - } - - m_statusPtr = nullptr; - m_readDstPtr = nullptr; - if (m_callback) - { - FGBACallback cb = std::move(m_callback); - m_callback = {}; - ThreadLocalEndpoint ep(*this); - cb(ep, xferStatus); - } - - if (!m_running) - break; - } - } while (m_cmdIssued); + FGBACallback cb = std::move(m_callback); + m_callback = {}; + ThreadLocalEndpoint ep(*this); + cb(ep, xferStatus); + } } else if (!m_booted) { /* Poll bus with status messages when inactive */ - if (idleGetStatus(remTicks)) - remTicks = GetGCTicksPerSec() * 4 / 60; + if (idleGetStatus(lk)) + { + lk.unlock(); + WaitGCTicks(GetGCTicksPerSec() * 4 / 60); + lk.lock(); + } } else { - /* Wait for the duration of a write otherwise */ - remTicks = getTransferTime(CMD_WRITE); - } - - if (m_running && remTicks) - { - lk.unlock(); - WaitGCTicks(remTicks); - lk.lock(); + /* Wait for next user request */ + m_issueCv.wait(lk); } } @@ -652,12 +593,14 @@ void Endpoint::transferWakeup(ThreadLocalEndpoint& endpoint, u8 status) void Endpoint::stop() { m_running = false; + m_issueCv.notify_one(); if (m_transferThread.joinable()) m_transferThread.join(); } EJoyReturn Endpoint::GBAGetProcessStatus(u8* percentp) { + std::unique_lock lk(m_syncLock); if (m_joyBoot) { *percentp = m_joyBoot->percentComplete(); @@ -665,7 +608,7 @@ EJoyReturn Endpoint::GBAGetProcessStatus(u8* percentp) return GBA_BUSY; } - if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd) + if (m_cmdIssued) return GBA_BUSY; return GBA_READY; @@ -674,7 +617,7 @@ EJoyReturn Endpoint::GBAGetProcessStatus(u8* percentp) EJoyReturn Endpoint::GBAGetStatusAsync(u8* status, FGBACallback&& callback) { std::unique_lock lk(m_syncLock); - if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd) + if (m_cmdIssued) return GBA_NOT_READY; m_cmdIssued = true; @@ -682,13 +625,15 @@ EJoyReturn Endpoint::GBAGetStatusAsync(u8* status, FGBACallback&& callback) m_buffer[0] = CMD_STATUS; m_callback = std::move(callback); + m_issueCv.notify_one(); + return GBA_READY; } EJoyReturn Endpoint::GBAGetStatus(u8* status) { std::unique_lock lk(m_syncLock); - if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd) + if (m_cmdIssued) return GBA_NOT_READY; m_cmdIssued = true; @@ -696,6 +641,7 @@ EJoyReturn Endpoint::GBAGetStatus(u8* status) m_buffer[0] = CMD_STATUS; m_callback = bindSync(); + m_issueCv.notify_one(); m_syncCv.wait(lk); return GBA_READY; @@ -704,7 +650,7 @@ EJoyReturn Endpoint::GBAGetStatus(u8* status) EJoyReturn Endpoint::GBAResetAsync(u8* status, FGBACallback&& callback) { std::unique_lock lk(m_syncLock); - if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd) + if (m_cmdIssued) return GBA_NOT_READY; m_cmdIssued = true; @@ -712,13 +658,15 @@ EJoyReturn Endpoint::GBAResetAsync(u8* status, FGBACallback&& callback) m_buffer[0] = CMD_RESET; m_callback = std::move(callback); + m_issueCv.notify_one(); + return GBA_READY; } EJoyReturn Endpoint::GBAReset(u8* status) { std::unique_lock lk(m_syncLock); - if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd) + if (m_cmdIssued) return GBA_NOT_READY; m_cmdIssued = true; @@ -726,6 +674,7 @@ EJoyReturn Endpoint::GBAReset(u8* status) m_buffer[0] = CMD_RESET; m_callback = bindSync(); + m_issueCv.notify_one(); m_syncCv.wait(lk); return GBA_READY; @@ -734,7 +683,7 @@ EJoyReturn Endpoint::GBAReset(u8* status) EJoyReturn Endpoint::GBAReadAsync(u8* dst, u8* status, FGBACallback&& callback) { std::unique_lock lk(m_syncLock); - if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd) + if (m_cmdIssued) return GBA_NOT_READY; m_cmdIssued = true; @@ -743,13 +692,15 @@ EJoyReturn Endpoint::GBAReadAsync(u8* dst, u8* status, FGBACallback&& callback) m_buffer[0] = CMD_READ; m_callback = std::move(callback); + m_issueCv.notify_one(); + return GBA_READY; } EJoyReturn Endpoint::GBARead(u8* dst, u8* status) { std::unique_lock lk(m_syncLock); - if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd) + if (m_cmdIssued) return GBA_NOT_READY; m_cmdIssued = true; @@ -758,6 +709,7 @@ EJoyReturn Endpoint::GBARead(u8* dst, u8* status) m_buffer[0] = CMD_READ; m_callback = bindSync(); + m_issueCv.notify_one(); m_syncCv.wait(lk); return GBA_READY; @@ -766,7 +718,7 @@ EJoyReturn Endpoint::GBARead(u8* dst, u8* status) EJoyReturn Endpoint::GBAWriteAsync(const u8* src, u8* status, FGBACallback&& callback) { std::unique_lock lk(m_syncLock); - if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd) + if (m_cmdIssued) return GBA_NOT_READY; m_cmdIssued = true; @@ -776,13 +728,15 @@ EJoyReturn Endpoint::GBAWriteAsync(const u8* src, u8* status, FGBACallback&& cal m_buffer[i+1] = src[i]; m_callback = std::move(callback); + m_issueCv.notify_one(); + return GBA_READY; } EJoyReturn Endpoint::GBAWrite(const u8* src, u8* status) { std::unique_lock lk(m_syncLock); - if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd) + if (m_cmdIssued) return GBA_NOT_READY; m_cmdIssued = true; @@ -792,6 +746,7 @@ EJoyReturn Endpoint::GBAWrite(const u8* src, u8* status) m_buffer[i+1] = src[i]; m_callback = bindSync(); + m_issueCv.notify_one(); m_syncCv.wait(lk); return GBA_READY; diff --git a/lib/Listener.cpp b/lib/Listener.cpp index 7f9dd6b..f28a581 100644 --- a/lib/Listener.cpp +++ b/lib/Listener.cpp @@ -53,8 +53,9 @@ void Listener::listenerProc() } } - net::Socket acceptData = {false}; - net::Socket acceptClock = {false}; + /* We use blocking I/O since we have a dedicated transfer thread */ + net::Socket acceptData = {true}; + net::Socket acceptClock = {true}; std::string hostname; u8 chan = 1; while (m_running && chan < 4)