Use blocking I/O for transfer thread

This commit is contained in:
Jack Andersen 2017-01-07 11:19:25 -10:00
parent e161ef15fa
commit 42f1b86627
5 changed files with 100 additions and 144 deletions

View File

@ -132,13 +132,6 @@ class Endpoint
CMD_WRITE = 0x15
};
enum class EWaitResp
{
NoWait = 0,
WaitCmd,
WaitIdle
};
static const u64 BITS_PER_SECOND = 115200;
static const u64 BYTES_PER_SECOND = BITS_PER_SECOND / 8;
@ -147,16 +140,13 @@ class Endpoint
std::thread m_transferThread;
std::mutex m_syncLock;
std::condition_variable m_syncCv;
std::condition_variable m_issueCv;
std::experimental::optional<KawasedoChallenge> m_joyBoot;
FGBACallback m_callback;
EWaitResp m_waitingResp = EWaitResp::NoWait;
size_t m_dataReceivedBytes = 0;
u8 m_buffer[5];
u64 m_timeSent = 0;
u8* m_readDstPtr = nullptr;
u8* m_statusPtr = nullptr;
u64 m_lastGCTick = 0;
u64 m_timeCmdSent = 0;
u8 m_lastCmd = 0;
u8 m_chan;
bool m_booted = false;
@ -167,8 +157,8 @@ class Endpoint
void clockSync();
void send(const u8* buffer);
size_t receive(u8* buffer);
size_t runBuffer(u8* buffer, u64& remTicks, EWaitResp resp);
bool idleGetStatus(u64& remTicks);
size_t runBuffer(u8* buffer, std::unique_lock<std::mutex>& lk);
bool idleGetStatus(std::unique_lock<std::mutex>& lk);
void transferProc();
void transferWakeup(ThreadLocalEndpoint& endpoint, u8 status);

View File

@ -367,7 +367,7 @@ public:
operator bool() const { return isOpen(); }
int GetInternalSocket() const { return m_socket; }
SocketTp GetInternalSocket() const { return m_socket; }
};
}

View File

@ -51,8 +51,18 @@ void WaitGCTicks(u64 ticks)
tv.tv_usec = (ticks % GetGCTicksPerSec()) * 1000000 / GetGCTicksPerSec();
select(0, NULL, NULL, NULL, &tv);
#else
Sleep(ticks * 1000 / GetGCTicksPerSec() +
(ticks % GetGCTicksPerSec()) * 1000 / GetGCTicksPerSec());
if (ticks < GetGCTicksPerSec() / 60)
{
/* NT is useless for scheduling sub-millisecond intervals */
u64 start = GetGCTicks();
do { Sleep(0); } while (GetGCTicks() - start < ticks);
}
else
{
/* Use normal Sleep() for durations longer than ~16ms */
Sleep(ticks * 1000 / GetGCTicksPerSec() +
(ticks % GetGCTicksPerSec()) * 1000 / GetGCTicksPerSec());
}
#endif
}

View File

@ -447,8 +447,6 @@ void Endpoint::send(const u8* buffer)
buffer[1], buffer[2], buffer[3], buffer[4], sentBytes);
}
#endif
m_timeCmdSent = GetGCTicks();
}
size_t Endpoint::receive(u8* buffer)
@ -460,27 +458,8 @@ size_t Endpoint::receive(u8* buffer)
}
size_t recvBytes = 0;
u64 transferTime = getTransferTime(m_lastCmd);
bool block = (GetGCTicks() - m_timeCmdSent) > transferTime;
if (m_lastCmd == CMD_STATUS && !m_booted)
block = false;
if (block)
{
fd_set fds;
FD_ZERO(&fds);
FD_SET(m_dataSocket.GetInternalSocket(), &fds);
struct timeval tv = {};
tv.tv_sec = 1;
select(m_dataSocket.GetInternalSocket() + 1, &fds, NULL, NULL, &tv);
}
net::Socket::EResult result = m_dataSocket.recv(buffer, 5, recvBytes);
if (result == net::Socket::EResult::Busy)
{
recvBytes = 0;
}
else if (result == net::Socket::EResult::Error)
if (result == net::Socket::EResult::Error)
{
m_running = false;
return 5;
@ -510,42 +489,25 @@ size_t Endpoint::receive(u8* buffer)
return recvBytes;
}
size_t Endpoint::runBuffer(u8* buffer, u64& remTicks, EWaitResp resp)
size_t Endpoint::runBuffer(u8* buffer, std::unique_lock<std::mutex>& lk)
{
if (m_waitingResp == EWaitResp::NoWait)
{
m_dataReceivedBytes = 0;
clockSync();
send(buffer);
m_timeSent = GetGCTicks();
m_waitingResp = resp;
}
u8 tmpBuffer[5];
if (m_waitingResp != EWaitResp::NoWait && m_dataReceivedBytes == 0)
{
m_dataReceivedBytes = receive(buffer);
}
memmove(tmpBuffer, buffer, 5);
lk.unlock();
clockSync();
send(tmpBuffer);
size_t receivedBytes = receive(tmpBuffer);
lk.lock();
memmove(buffer, tmpBuffer, 5);
u64 ticksSinceSend = GetGCTicks() - m_timeSent;
u64 targetTransferTime = getTransferTime(m_lastCmd);
if (targetTransferTime > ticksSinceSend)
{
remTicks = targetTransferTime - ticksSinceSend;
return 0;
}
else
{
remTicks = 0;
if (m_dataReceivedBytes != 0)
m_waitingResp = EWaitResp::NoWait;
return m_dataReceivedBytes;
}
return receivedBytes;
}
bool Endpoint::idleGetStatus(u64& remTicks)
bool Endpoint::idleGetStatus(std::unique_lock<std::mutex>& lk)
{
u8 buffer[] = { CMD_STATUS, 0, 0, 0, 0 };
return runBuffer(buffer, remTicks, EWaitResp::WaitIdle);
return runBuffer(buffer, lk);
}
void Endpoint::transferProc()
@ -554,84 +516,63 @@ void Endpoint::transferProc()
printf("Starting JoyBus transfer thread for channel %d\n", m_chan);
#endif
/* This lock is relinquished on I/O cycles or when waiting for next request */
std::unique_lock<std::mutex> lk(m_syncLock);
while (m_running)
{
u64 remTicks;
if ((m_cmdIssued && m_waitingResp != EWaitResp::WaitIdle) ||
m_waitingResp == EWaitResp::WaitCmd)
if (m_cmdIssued)
{
/* This inner loop performs first-response on commands invoked from callback
* on this thread; avoiding excessive locking */
do
/* Synchronous command write/read cycle */
runBuffer(m_buffer, lk);
m_cmdIssued = false;
EJoyReturn xferStatus = m_running ? GBA_READY : GBA_NOT_READY;
/* Handle message response */
switch (m_lastCmd) {
case CMD_RESET:
case CMD_STATUS:
if (m_statusPtr)
*m_statusPtr = m_buffer[2];
break;
case CMD_WRITE:
if (m_statusPtr)
*m_statusPtr = m_buffer[0];
break;
case CMD_READ:
if (m_statusPtr)
*m_statusPtr = m_buffer[4];
if (m_readDstPtr)
memmove(m_readDstPtr, m_buffer, 4);
break;
default:
break;
}
m_statusPtr = nullptr;
m_readDstPtr = nullptr;
if (m_callback)
{
m_cmdIssued = false;
if (runBuffer(m_buffer, remTicks, EWaitResp::WaitCmd) || !m_running)
{
EJoyReturn xferStatus;
if (m_running)
{
xferStatus = GBA_READY;
}
else
{
xferStatus = GBA_NOT_READY;
remTicks = 0;
}
/* Handle message response */
switch (m_lastCmd) {
case CMD_RESET:
case CMD_STATUS:
if (m_statusPtr)
*m_statusPtr = m_buffer[2];
break;
case CMD_WRITE:
if (m_statusPtr)
*m_statusPtr = m_buffer[0];
break;
case CMD_READ:
if (m_statusPtr)
*m_statusPtr = m_buffer[4];
if (m_readDstPtr)
memmove(m_readDstPtr, m_buffer, 4);
break;
default:
break;
}
m_statusPtr = nullptr;
m_readDstPtr = nullptr;
if (m_callback)
{
FGBACallback cb = std::move(m_callback);
m_callback = {};
ThreadLocalEndpoint ep(*this);
cb(ep, xferStatus);
}
if (!m_running)
break;
}
} while (m_cmdIssued);
FGBACallback cb = std::move(m_callback);
m_callback = {};
ThreadLocalEndpoint ep(*this);
cb(ep, xferStatus);
}
}
else if (!m_booted)
{
/* Poll bus with status messages when inactive */
if (idleGetStatus(remTicks))
remTicks = GetGCTicksPerSec() * 4 / 60;
if (idleGetStatus(lk))
{
lk.unlock();
WaitGCTicks(GetGCTicksPerSec() * 4 / 60);
lk.lock();
}
}
else
{
/* Wait for the duration of a write otherwise */
remTicks = getTransferTime(CMD_WRITE);
}
if (m_running && remTicks)
{
lk.unlock();
WaitGCTicks(remTicks);
lk.lock();
/* Wait for next user request */
m_issueCv.wait(lk);
}
}
@ -652,12 +593,14 @@ void Endpoint::transferWakeup(ThreadLocalEndpoint& endpoint, u8 status)
void Endpoint::stop()
{
m_running = false;
m_issueCv.notify_one();
if (m_transferThread.joinable())
m_transferThread.join();
}
EJoyReturn Endpoint::GBAGetProcessStatus(u8* percentp)
{
std::unique_lock<std::mutex> lk(m_syncLock);
if (m_joyBoot)
{
*percentp = m_joyBoot->percentComplete();
@ -665,7 +608,7 @@ EJoyReturn Endpoint::GBAGetProcessStatus(u8* percentp)
return GBA_BUSY;
}
if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd)
if (m_cmdIssued)
return GBA_BUSY;
return GBA_READY;
@ -674,7 +617,7 @@ EJoyReturn Endpoint::GBAGetProcessStatus(u8* percentp)
EJoyReturn Endpoint::GBAGetStatusAsync(u8* status, FGBACallback&& callback)
{
std::unique_lock<std::mutex> lk(m_syncLock);
if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd)
if (m_cmdIssued)
return GBA_NOT_READY;
m_cmdIssued = true;
@ -682,13 +625,15 @@ EJoyReturn Endpoint::GBAGetStatusAsync(u8* status, FGBACallback&& callback)
m_buffer[0] = CMD_STATUS;
m_callback = std::move(callback);
m_issueCv.notify_one();
return GBA_READY;
}
EJoyReturn Endpoint::GBAGetStatus(u8* status)
{
std::unique_lock<std::mutex> lk(m_syncLock);
if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd)
if (m_cmdIssued)
return GBA_NOT_READY;
m_cmdIssued = true;
@ -696,6 +641,7 @@ EJoyReturn Endpoint::GBAGetStatus(u8* status)
m_buffer[0] = CMD_STATUS;
m_callback = bindSync();
m_issueCv.notify_one();
m_syncCv.wait(lk);
return GBA_READY;
@ -704,7 +650,7 @@ EJoyReturn Endpoint::GBAGetStatus(u8* status)
EJoyReturn Endpoint::GBAResetAsync(u8* status, FGBACallback&& callback)
{
std::unique_lock<std::mutex> lk(m_syncLock);
if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd)
if (m_cmdIssued)
return GBA_NOT_READY;
m_cmdIssued = true;
@ -712,13 +658,15 @@ EJoyReturn Endpoint::GBAResetAsync(u8* status, FGBACallback&& callback)
m_buffer[0] = CMD_RESET;
m_callback = std::move(callback);
m_issueCv.notify_one();
return GBA_READY;
}
EJoyReturn Endpoint::GBAReset(u8* status)
{
std::unique_lock<std::mutex> lk(m_syncLock);
if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd)
if (m_cmdIssued)
return GBA_NOT_READY;
m_cmdIssued = true;
@ -726,6 +674,7 @@ EJoyReturn Endpoint::GBAReset(u8* status)
m_buffer[0] = CMD_RESET;
m_callback = bindSync();
m_issueCv.notify_one();
m_syncCv.wait(lk);
return GBA_READY;
@ -734,7 +683,7 @@ EJoyReturn Endpoint::GBAReset(u8* status)
EJoyReturn Endpoint::GBAReadAsync(u8* dst, u8* status, FGBACallback&& callback)
{
std::unique_lock<std::mutex> lk(m_syncLock);
if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd)
if (m_cmdIssued)
return GBA_NOT_READY;
m_cmdIssued = true;
@ -743,13 +692,15 @@ EJoyReturn Endpoint::GBAReadAsync(u8* dst, u8* status, FGBACallback&& callback)
m_buffer[0] = CMD_READ;
m_callback = std::move(callback);
m_issueCv.notify_one();
return GBA_READY;
}
EJoyReturn Endpoint::GBARead(u8* dst, u8* status)
{
std::unique_lock<std::mutex> lk(m_syncLock);
if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd)
if (m_cmdIssued)
return GBA_NOT_READY;
m_cmdIssued = true;
@ -758,6 +709,7 @@ EJoyReturn Endpoint::GBARead(u8* dst, u8* status)
m_buffer[0] = CMD_READ;
m_callback = bindSync();
m_issueCv.notify_one();
m_syncCv.wait(lk);
return GBA_READY;
@ -766,7 +718,7 @@ EJoyReturn Endpoint::GBARead(u8* dst, u8* status)
EJoyReturn Endpoint::GBAWriteAsync(const u8* src, u8* status, FGBACallback&& callback)
{
std::unique_lock<std::mutex> lk(m_syncLock);
if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd)
if (m_cmdIssued)
return GBA_NOT_READY;
m_cmdIssued = true;
@ -776,13 +728,15 @@ EJoyReturn Endpoint::GBAWriteAsync(const u8* src, u8* status, FGBACallback&& cal
m_buffer[i+1] = src[i];
m_callback = std::move(callback);
m_issueCv.notify_one();
return GBA_READY;
}
EJoyReturn Endpoint::GBAWrite(const u8* src, u8* status)
{
std::unique_lock<std::mutex> lk(m_syncLock);
if (m_cmdIssued || m_waitingResp == EWaitResp::WaitCmd)
if (m_cmdIssued)
return GBA_NOT_READY;
m_cmdIssued = true;
@ -792,6 +746,7 @@ EJoyReturn Endpoint::GBAWrite(const u8* src, u8* status)
m_buffer[i+1] = src[i];
m_callback = bindSync();
m_issueCv.notify_one();
m_syncCv.wait(lk);
return GBA_READY;

View File

@ -53,8 +53,9 @@ void Listener::listenerProc()
}
}
net::Socket acceptData = {false};
net::Socket acceptClock = {false};
/* We use blocking I/O since we have a dedicated transfer thread */
net::Socket acceptData = {true};
net::Socket acceptClock = {true};
std::string hostname;
u8 chan = 1;
while (m_running && chan < 4)