// Implementation of hecl::ClientProcess: a pool of worker threads that drain a
// shared queue of transactions (file buffering, cooking, arbitrary lambdas).
//
// NOTE(review): this file was recovered from text in which every `<...>` span
// had been stripped. Template arguments and the two platform header names
// below were restored from how the values are used in this file; the ones that
// must match declarations in hecl/ClientProcess.hpp are flagged individually.
// Confirm them against the header before merging.

#include "hecl/ClientProcess.hpp"
#include "hecl/Database.hpp"
#include "athena/FileReader.hpp"
#include "hecl/Blender/BlenderConnection.hpp"

#include <cstdio>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <thread>

// NOTE(review): header names restored; these are the headers that declare the
// calls made in GetCPUCount() (GetSystemInfo / sysconf).
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#else
#include <unistd.h>
#endif

#define HECL_MULTIPROCESSOR 1

namespace hecl {
static logvisor::Module Log("hecl::ClientProcess");

// Worker::proc() stores its own `this` here, so the element type is Worker.
ThreadLocalPtr<ClientProcess::Worker> ClientProcess::ThreadWorker;

/// Number of processors reported by the OS, never less than 1.
///
/// sysconf() returns -1 when the query fails; passing that on would turn into
/// a huge unsigned value in `m_workers.reserve()` and leave the pool without
/// any worker, so non-positive results are clamped to a single worker.
static int GetCPUCount() {
#ifdef _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    const long count = static_cast<long>(sysinfo.dwNumberOfProcessors);
#else
    const long count = sysconf(_SC_NPROCESSORS_ONLN);
#endif
    return count > 0 ? static_cast<int>(count) : 1;
}

/// Reads up to m_maxLen bytes of the file at m_path, starting at m_offset,
/// into m_targetBuf and marks the transaction complete.
void ClientProcess::BufferTransaction::run(BlenderToken& btok) {
    athena::io::FileReader r(m_path.getAbsolutePath(), 32 * 1024, false);
    if (r.hasError()) {
        // NOTE(review): m_complete stays false on this path; presumably a
        // Fatal report does not return — confirm.
        Log.report(logvisor::Fatal, _S("unable to background-buffer '%s'"), m_path.getAbsolutePath().c_str());
        return;
    }
    if (m_offset)
        r.seek(m_offset, athena::Begin);
    r.readBytesToBuf(m_targetBuf, m_maxLen);
    m_complete = true;
}

/// Cooks m_path through the owning ClientProcess and records the result of
/// syncCook() in m_returnResult.
void ClientProcess::CookTransaction::run(BlenderToken& btok) {
    m_dataSpec->setThreadProject();
    m_returnResult = m_parent.syncCook(m_path, m_dataSpec, btok);
    m_complete = true;
}

/// Invokes the stored callable with the worker's Blender token.
void ClientProcess::LambdaTransaction::run(BlenderToken& btok) {
    m_func(btok);
    m_complete = true;
}

/// Binds the worker to its pool and starts its thread.
///
/// The thread is launched from the constructor body, i.e. only after every
/// member of the Worker has been initialized.
ClientProcess::Worker::Worker(ClientProcess& proc, int idx) : m_proc(proc), m_idx(idx) {
    m_thr = std::thread(std::bind(&Worker::proc, this));
}

/// Thread entry point: repeatedly drains the pending queue until the pool's
/// m_running flag is cleared, then shuts down this worker's Blender token.
void ClientProcess::Worker::proc() {
    ClientProcess::ThreadWorker.reset(this);

    char thrName[64];
    snprintf(thrName, sizeof(thrName), "HECL Client Worker %d", m_idx);
    logvisor::RegisterThreadName(thrName);

    while (m_proc.m_running) {
        // NOTE(review): std::mutex restored from usage with the condition
        // variables; confirm the type of m_mutex.
        std::unique_lock<std::mutex> lk(m_proc.m_mutex);

        // First pass only: wake the constructor, which waits on m_initCv
        // after creating each worker.
        if (!m_didInit) {
            m_proc.m_initCv.notify_one();
            m_didInit = true;
        }

        while (m_proc.m_running && m_proc.m_pendingQueue.size()) {
            auto trans = std::move(m_proc.m_pendingQueue.front());
            ++m_proc.m_inProgress;
            m_proc.m_pendingQueue.pop_front();

            // The transaction itself runs without the pool lock held.
            lk.unlock();
            trans->run(m_blendTok);
            lk.lock();

            m_proc.m_completedQueue.push_back(std::move(trans));
            --m_proc.m_inProgress;
        }

        // Let waitUntilComplete() re-evaluate isBusy().
        m_proc.m_waitCv.notify_one();
        if (!m_proc.m_running)
            break;
        m_proc.m_cv.wait(lk);
    }

    m_blendTok.shutdown();
}

/// Creates the worker pool: one worker per reported CPU when
/// HECL_MULTIPROCESSOR is defined, otherwise a single worker.
ClientProcess::ClientProcess(int verbosityLevel, bool fast, bool force)
    : m_verbosity(verbosityLevel), m_fast(fast), m_force(force) {
#ifdef HECL_MULTIPROCESSOR
    const int cpuCount = GetCPUCount();
#else
    constexpr int cpuCount = 1;
#endif
    // Reserved up front so emplace_back never relocates a Worker whose thread
    // already holds a pointer to it.
    m_workers.reserve(cpuCount);
    for (int i = 0; i < cpuCount; ++i) {
        std::unique_lock<std::mutex> lk(m_mutex);
        m_workers.emplace_back(*this, m_workers.size());
        // Blocks until the new worker signals m_initCv from proc().
        m_initCv.wait(lk);
    }
}

/// Queues a transaction that reads part of a file into `target` and wakes one
/// worker.
///
/// @param path    File to read.
/// @param target  Destination buffer.
/// @param maxLen  Number of bytes requested from the reader.
/// @param offset  Byte offset to seek to before reading (0 skips the seek).
/// @return Shared handle to the queued transaction.
///
/// NOTE(review): the return type's template argument was stripped; the
/// `const` must match the declaration in ClientProcess.hpp — confirm.
std::shared_ptr<const ClientProcess::BufferTransaction>
ClientProcess::addBufferTransaction(const ProjectPath& path, void* target, size_t maxLen, size_t offset) {
    std::unique_lock<std::mutex> lk(m_mutex);
    auto ret = std::make_shared<BufferTransaction>(*this, path, target, maxLen, offset);
    m_pendingQueue.emplace_back(ret);
    m_cv.notify_one();
    return ret;
}

/// Queues a transaction that cooks `path` with `spec` and wakes one worker.
///
/// @return Shared handle to the queued transaction.
///
/// NOTE(review): return type restored; `const` must match the header.
std::shared_ptr<const ClientProcess::CookTransaction>
ClientProcess::addCookTransaction(const hecl::ProjectPath& path, Database::IDataSpec* spec) {
    std::unique_lock<std::mutex> lk(m_mutex);
    auto ret = std::make_shared<CookTransaction>(*this, path, spec);
    m_pendingQueue.emplace_back(ret);
    m_cv.notify_one();
    return ret;
}

/// Queues a transaction that calls `func` on a worker thread and wakes one
/// worker.
///
/// @return Shared handle to the queued transaction.
///
/// NOTE(review): the std::function signature was stripped; it is restored from
/// LambdaTransaction::run(), which calls `m_func(btok)` and discards the
/// result. Return type `const` must match the header — confirm both.
std::shared_ptr<const ClientProcess::LambdaTransaction>
ClientProcess::addLambdaTransaction(std::function<void(BlenderToken&)>&& func) {
    std::unique_lock<std::mutex> lk(m_mutex);
    auto ret = std::make_shared<LambdaTransaction>(*this, std::move(func));
    m_pendingQueue.emplace_back(ret);
    m_cv.notify_one();
    return ret;
}

/// Cooks `path` on the calling thread if its cooked output is missing or out
/// of date (or unconditionally when m_force is set).
///
/// @return true when `spec` can cook the path and yields a data spec entry,
///         whether or not a cook was actually needed; false otherwise.
bool ClientProcess::syncCook(const hecl::ProjectPath& path, Database::IDataSpec* spec, BlenderToken& btok) {
    if (!spec->canCook(path, btok))
        return false;

    const Database::DataSpecEntry* specEnt = spec->overrideDataSpec(path, spec->getDataSpecEntry(), btok);
    if (!specEnt)
        return false;

    hecl::ProjectPath cooked = path.getCookedPath(*specEnt);
    if (m_fast)
        cooked = cooked.getWithExtension(_S(".fast"));
    cooked.makeDirChain(false);

    if (m_force || cooked.getPathType() == ProjectPath::Type::None || path.getModtime() > cooked.getModtime()) {
        if (path.getAuxInfo().empty())
            LogModule.report(logvisor::Info, _S("Cooking %s"), path.getRelativePath().c_str());
        else
            LogModule.report(logvisor::Info, _S("Cooking %s|%s"), path.getRelativePath().c_str(),
                             path.getAuxInfo().c_str());
        // The last argument is a callback that does nothing.
        spec->doCook(path, cooked, false, btok, [](const SystemChar*) {});
    }
    return true;
}

/// Exchanges the caller's list with the list of completed transactions while
/// holding the pool lock.
///
/// NOTE(review): element type restored; `Transaction` is assumed to be the
/// common base declared in ClientProcess.hpp — confirm.
void ClientProcess::swapCompletedQueue(std::list<std::shared_ptr<Transaction>>& queue) {
    std::unique_lock<std::mutex> lk(m_mutex);
    queue.swap(m_completedQueue);
}

/// Blocks until isBusy() reports false; workers signal m_waitCv each time
/// they finish draining the pending queue.
void ClientProcess::waitUntilComplete() {
    std::unique_lock<std::mutex> lk(m_mutex);
    while (isBusy())
        m_waitCv.wait(lk);
}

/// Stops the pool: discards pending transactions, clears m_running, wakes
/// every worker and joins their threads. Does nothing if already stopped.
void ClientProcess::shutdown() {
    if (!m_running)
        return;

    std::unique_lock<std::mutex> lk(m_mutex);
    m_pendingQueue.clear();
    m_running = false;
    m_cv.notify_all();
    // Workers need the lock to leave their wait, so release it before joining.
    lk.unlock();

    for (Worker& worker : m_workers)
        if (worker.m_thr.joinable())
            worker.m_thr.join();
}

}