diff --git a/Src/Client/ResourceCache.cpp b/Src/Client/ResourceCache.cpp index b8763cd..a815a42 100644 --- a/Src/Client/ResourceCache.cpp +++ b/Src/Client/ResourceCache.cpp @@ -1,4 +1,5 @@ #include "ResourceCache.hpp" +#include "sqlite3.h" #include @@ -156,9 +157,9 @@ std::pair CacheDatabase::getAllHash() { } void CacheDatabase::updateTimeFor(HASH hash) { - sqlite3_bind_blob(STMT_UPDATE_TIME, 0, (const void*) hash.data(), 32, SQLITE_STATIC); - sqlite3_bind_int(STMT_UPDATE_TIME, 1, time(nullptr)); - if(sqlite3_step(STMT_UPDATE_TIME) != SQLITE_OK) { + sqlite3_bind_blob(STMT_UPDATE_TIME, 1, (const void*) hash.data(), 32, SQLITE_STATIC); + sqlite3_bind_int(STMT_UPDATE_TIME, 2, time(nullptr)); + if(sqlite3_step(STMT_UPDATE_TIME) != SQLITE_DONE) { sqlite3_reset(STMT_UPDATE_TIME); MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_UPDATE_TIME: " << sqlite3_errmsg(DB)); } @@ -169,10 +170,10 @@ void CacheDatabase::updateTimeFor(HASH hash) { void CacheDatabase::insert(HASH hash, size_t size) { assert(size < (size_t(1) << 31)-1 && size > 0); - sqlite3_bind_blob(STMT_INSERT, 0, (const void*) hash.data(), 32, SQLITE_STATIC); - sqlite3_bind_int(STMT_INSERT, 1, (int) size); + sqlite3_bind_blob(STMT_INSERT, 1, (const void*) hash.data(), 32, SQLITE_STATIC); sqlite3_bind_int(STMT_INSERT, 2, time(nullptr)); - if(sqlite3_step(STMT_INSERT) != SQLITE_OK) { + sqlite3_bind_int(STMT_INSERT, 3, (int) size); + if(sqlite3_step(STMT_INSERT) != SQLITE_DONE) { sqlite3_reset(STMT_INSERT); MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_INSERT: " << sqlite3_errmsg(DB)); } @@ -184,7 +185,7 @@ std::vector CacheDatabase::findExcessHashes(size_t bytesToF std::vector out; size_t removed = 0; - sqlite3_bind_int(STMT_OLD, 0, timeBefore); + sqlite3_bind_int(STMT_OLD, 1, timeBefore); while(true) { int errc = sqlite3_step(STMT_OLD); if(errc == SQLITE_DONE) @@ -208,7 +209,7 @@ std::vector CacheDatabase::findExcessHashes(size_t bytesToF if(removed > bytesToFree) return out; - sqlite3_bind_int(STMT_TO_FREE, 0, (int) bytesToFree); + sqlite3_bind_int(STMT_TO_FREE, 1, (int) bytesToFree); while(true) { int errc = sqlite3_step(STMT_TO_FREE); @@ -232,8 +233,8 @@ std::vector CacheDatabase::findExcessHashes(size_t bytesToF } void CacheDatabase::remove(HASH hash) { - sqlite3_bind_blob(STMT_REMOVE, 0, (const void*) hash.data(), 32, SQLITE_STATIC); - if(sqlite3_step(STMT_REMOVE) != SQLITE_OK) { + sqlite3_bind_blob(STMT_REMOVE, 1, (const void*) hash.data(), 32, SQLITE_STATIC); + if(sqlite3_step(STMT_REMOVE) != SQLITE_DONE) { sqlite3_reset(STMT_REMOVE); MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_REMOVE: " << sqlite3_errmsg(DB)); } @@ -246,13 +247,13 @@ std::string CacheDatabase::hashToString(HASH hash) { text.reserve(64); for(int iter = 0; iter < 32; iter++) { - int val = hash[31-iter] & 0xf; + int val = (hash[31-iter] >> 4) & 0xf; if(val > 9) text += 'a'+val-10; else text += '0'+val; - val = (hash[31-iter] >> 4) & 0xf; + val = hash[31-iter] & 0xf; if(val > 9) text += 'a'+val-10; else @@ -284,8 +285,10 @@ CacheDatabase::HASH CacheDatabase::stringToHash(const std::string_view view) { return hash; } -CacheHandler::CacheHandler(boost::asio::io_context &ioc, const fs::path &cachePath) - : IAsyncDestructible(ioc), Path(cachePath), DB(Path) +CacheHandler::CacheHandler(boost::asio::io_context &ioc, const fs::path &cachePath, + size_t maxCacheDirectorySize, size_t maxLifeTime) + : IAsyncDestructible(ioc), Path(cachePath), DB(Path), + MaxCacheDirectorySize(maxCacheDirectorySize), MaxLifeTime(maxLifeTime) { } @@ -295,13 +298,17 @@ std::pair CacheHandler::getAll() { return DB.getAllHash(); } +size_t CacheHandler::getCacheSize() { + return DB.getCacheSize(); +} + coro<> CacheHandlerBasic::asyncDestructor() { NeedShutdown = true; co_await CacheHandler::asyncDestructor(); } void CacheHandlerBasic::readThread(AsyncUseControl::Lock lock) { - LOG.info() << "readThread started"; + LOG.info() << "Поток чтения запущен"; while(!NeedShutdown) { if(ReadQueue.get_read().empty()) @@ -318,54 +325,22 @@ void CacheHandlerBasic::readThread(AsyncUseControl::Lock lock) { std::string name = CacheDatabase::hashToString(hash); fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4); - std::shared_ptr data = std::make_shared(); - - try { - std::ifstream fd(path, std::ios::binary | std::ios::ate); - if (!fd.is_open()) - MAKE_ERROR("!is_open(): " << fd.exceptions()); - - if (fd.fail()) - MAKE_ERROR("fail(): " << fd.exceptions()); - - std::ifstream::pos_type size = fd.tellg(); - fd.seekg(0, std::ios::beg); - data->resize(size); - fd.read(data->data(), size); - - if (!fd.good()) - MAKE_ERROR("!good(): " << fd.exceptions()); - } catch(const std::exception &exc) { - LOG.error() << "Не удалось считать ресурс " << path.c_str() << ": " << exc.what(); - } - - ReadedQueue.lock()->emplace_back(hash, std::move(data)); - continue; - } - - wait: - TOS::Time::sleep3(20); - } - - LOG.info() << "readThread ended"; - lock.unlock(); -} - -void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) { - LOG.info() << "readThread started"; - - while(!NeedShutdown) { - if(!ReadQueue.get_read().empty()) { - auto lock = ReadQueue.lock(); - if(!lock->empty()) { - CacheDatabase::HASH hash = lock->front(); - lock->pop(); - lock.unlock(); - - std::string name = CacheDatabase::hashToString(hash); - fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4); - - std::shared_ptr data = std::make_shared(); + std::shared_ptr data; + + { + auto lock_wc = WriteCache.lock(); + auto iter = lock_wc->begin(); + while(iter != lock_wc->end()) { + if(iter->first == hash) { + // Копируем + data = std::make_shared(*iter->second); + break; + } + } + } + + if(!data) { + data = std::make_shared(); try { std::ifstream fd(path, std::ios::binary | std::ios::ate); @@ -382,11 +357,75 @@ void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) { if (!fd.good()) MAKE_ERROR("!good(): " << fd.exceptions()); - - DB.updateTimeFor(hash); } catch(const std::exception &exc) { LOG.error() << "Не удалось считать ресурс " << path.c_str() << ": " << exc.what(); } + } + + ReadedQueue.lock()->emplace_back(hash, std::move(data)); + continue; + } + + wait: + TOS::Time::sleep3(20); + } + + LOG.info() << "Поток чтения остановлен"; + lock.unlock(); +} + +void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) { + LOG.info() << "Поток чтения/записи запущен"; + + while(!NeedShutdown || !WriteQueue.get_read().empty()) { + if(!ReadQueue.get_read().empty()) { + auto lock = ReadQueue.lock(); + if(!lock->empty()) { + CacheDatabase::HASH hash = lock->front(); + lock->pop(); + lock.unlock(); + + std::string name = CacheDatabase::hashToString(hash); + fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4); + + std::shared_ptr data; + + { + auto lock_wc = WriteCache.lock(); + auto iter = lock_wc->begin(); + while(iter != lock_wc->end()) { + if(iter->first == hash) { + // Копируем + data = std::make_shared(*iter->second); + break; + } + } + } + + if(!data) { + data = std::make_shared(); + + try { + std::ifstream fd(path, std::ios::binary | std::ios::ate); + if (!fd.is_open()) + MAKE_ERROR("!is_open(): " << fd.exceptions()); + + if (fd.fail()) + MAKE_ERROR("fail(): " << fd.exceptions()); + + std::ifstream::pos_type size = fd.tellg(); + fd.seekg(0, std::ios::beg); + data->resize(size); + fd.read(data->data(), size); + + if (!fd.good()) + MAKE_ERROR("!good(): " << fd.exceptions()); + + DB.updateTimeFor(hash); + } catch(const std::exception &exc) { + LOG.error() << "Не удалось считать ресурс " << path.c_str() << ": " << exc.what(); + } + } ReadedQueue.lock()->emplace_back(hash, std::move(data)); continue; @@ -414,7 +453,7 @@ void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) { LOG.warn() << "Удаление устаревшего кеша в количестве " << hashes.size() << "..."; for(CacheDatabase::HASH hash : hashes) { - std::string name = CacheDatabase::hashToString(task.Hash); + std::string name = CacheDatabase::hashToString(hash); fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4); DB.remove(hash); fs::remove(path); @@ -436,18 +475,30 @@ void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) { } catch(const std::exception &exc) { LOG.error() << "Не удалось сохранить ресурс " << path.c_str() << ": " << exc.what(); } + + auto lock = WriteCache.lock(); + auto iter = lock->begin(); + while(iter != lock->end()) { + if(iter->first == task.Hash) + break; + iter++; + } + + assert(iter != lock->end()); + lock->erase(iter); } } TOS::Time::sleep3(20); } - LOG.info() << "readWriteThread ended"; + LOG.info() << "Поток чтения/записи остановлен"; lock.unlock(); } -CacheHandlerBasic::CacheHandlerBasic(boost::asio::io_context &ioc, const fs::path &cachePath) - : CacheHandler(ioc, cachePath), +CacheHandlerBasic::CacheHandlerBasic(boost::asio::io_context &ioc, const fs::path &cachePath, + size_t maxCacheDirectorySize, size_t maxLifeTime) + : CacheHandler(ioc, cachePath, maxCacheDirectorySize, maxLifeTime), ReadThread(&CacheHandlerBasic::readThread, this, AUC.use()), ReadWriteThread(&CacheHandlerBasic::readWriteThread, this, AUC.use()) { @@ -488,4 +539,37 @@ std::vector> CacheHandlerBasic::pull return out; } +void CacheHandlerBasic::updateParams(size_t maxLifeTime, size_t maxCacheDirectorySize) { + MaxLifeTime = maxLifeTime; + + if(MaxCacheDirectorySize != maxCacheDirectorySize) { + MaxCacheDirectorySize = maxCacheDirectorySize; + + size_t size = DB.getCacheSize(); + if(size > maxCacheDirectorySize) { + size_t needToFree = size-maxCacheDirectorySize+64*1024*1024; + try { + LOG.info() << "Начата вычистка кеша на сумму " << needToFree/1024/1024 << " Мб"; + std::vector hashes = DB.findExcessHashes(needToFree, time(nullptr)-MaxLifeTime); + LOG.warn() << "Удаление кеша в количестве " << hashes.size() << "..."; + + for(CacheDatabase::HASH hash : hashes) { + std::string name = CacheDatabase::hashToString(hash); + fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4); + DB.remove(hash); + fs::remove(path); + + fs::path up1 = path.parent_path(); + LOG.info() << "В директории " << up1.c_str() << " не осталось файлов, удаляем..."; + size_t count = std::distance(fs::directory_iterator(up1), fs::directory_iterator()); + if(count == 0) + fs::remove(up1); + } + } catch(const std::exception &exc) { + LOG.error() << "Не удалось очистить кеш до новой границы: " << exc.what(); + } + } + } +} + } \ No newline at end of file diff --git a/Src/Client/ResourceCache.hpp b/Src/Client/ResourceCache.hpp index 8c53d9d..466a72f 100644 --- a/Src/Client/ResourceCache.hpp +++ b/Src/Client/ResourceCache.hpp @@ -1,3 +1,5 @@ +#pragma once + #include #include #include @@ -90,9 +92,15 @@ class CacheHandler : public IAsyncDestructible { protected: const fs::path Path; CacheDatabase DB; + size_t MaxCacheDirectorySize; + size_t MaxLifeTime; + +public: + using Ptr = std::shared_ptr; protected: - CacheHandler(boost::asio::io_context &ioc, const fs::path &cachePath); + CacheHandler(boost::asio::io_context &ioc, const fs::path &cachePath, + size_t maxCacheDirectorySize, size_t maxLifeTime); public: virtual ~CacheHandler(); @@ -108,6 +116,12 @@ public: // Получить список доступных ресурсов std::pair getAll(); + + // Размер всего хранимого кеша + size_t getCacheSize(); + + // Обновить параметры хранилища + virtual void updateParams(size_t maxLifeTime, size_t maxCacheDirectorySize) = 0; }; class CacheHandlerBasic : public CacheHandler { @@ -129,8 +143,6 @@ class CacheHandlerBasic : public CacheHandler { // Список полностью считанных файлов SpinlockObject> ReadedQueue; bool NeedShutdown = false; - size_t MaxCacheDirectorySize = 8*1024*1024*1024ULL; - size_t MaxLifeTime = 7*24*60*60; public: using Ptr = std::shared_ptr; @@ -143,18 +155,21 @@ private: void readWriteThread(AsyncUseControl::Lock lock); protected: - CacheHandlerBasic(boost::asio::io_context &ioc, const fs::path& cachePath); + CacheHandlerBasic(boost::asio::io_context &ioc, const fs::path& cachePath, + size_t maxCacheDirectorySize, size_t maxLifeTime); public: virtual ~CacheHandlerBasic(); - static std::shared_ptr Create(asio::io_context &ioc, const fs::path& cachePath) { - return createShared(ioc, new CacheHandlerBasic(ioc, cachePath)); + static std::shared_ptr Create(asio::io_context &ioc, const fs::path& cachePath, + size_t maxCacheDirectorySize = 8*1024*1024*1024ULL, size_t maxLifeTime = 7*24*60*60) { + return createShared(ioc, new CacheHandlerBasic(ioc, cachePath, maxCacheDirectorySize, maxLifeTime)); } virtual void pushWrite(std::string &&data, CacheDatabase::HASH hash) override; virtual void pushRead(CacheDatabase::HASH hash) override; virtual std::vector> pullReads() override; + virtual void updateParams(size_t maxLifeTime, size_t maxCacheDirectorySize) override; }; #ifdef LUAVOX_HAVE_LIBURING diff --git a/Src/Client/ServerSession.cpp b/Src/Client/ServerSession.cpp index 8d86896..2fdff1e 100644 --- a/Src/Client/ServerSession.cpp +++ b/Src/Client/ServerSession.cpp @@ -138,6 +138,7 @@ coro> ServerSession::asyncInitGameProtocol(asi } } + co_return std::make_unique(ioc, std::move(socket)); } diff --git a/Src/Client/ServerSession.hpp b/Src/Client/ServerSession.hpp index f9247a7..d9b4a5b 100644 --- a/Src/Client/ServerSession.hpp +++ b/Src/Client/ServerSession.hpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace LV::Client { @@ -27,6 +28,10 @@ struct ParsedPacket { class ServerSession : public AsyncObject, public IServerSession, public ISurfaceEventListener { std::unique_ptr Socket; IRenderSession *RS = nullptr; + + // Обработчик кеша ресурсов сервера + CacheHandler::Ptr CHDB; + DestroyLock UseLock; bool IsConnected = true, IsGoingShutdown = false; @@ -59,6 +64,21 @@ public: : AsyncObject(ioc), Socket(std::move(socket)), RS(rs), NetInputPackets(1024) { assert(Socket.get()); + + try { + CHDB = CacheHandlerBasic::Create(ioc, "Cache"); + + // Отправка информации о загруженном кеше + // TODO: добавить оптимизацию для подключения клиента к внутреннему серверу + auto [data, count] = CHDB->getAll(); + Net::Packet packet; + packet << uint32_t(count); + packet.write((const std::byte*) data.data(), data.size()); + Socket->pushPacket(std::move(packet)); + } catch(const std::exception &exc) { + MAKE_ERROR("Ошибка инициализации обработчика кеша ресурсов сервера:\n" << exc.what()); + } + co_spawn(run()); } diff --git a/Src/Client/Vulkan/Vulkan.cpp b/Src/Client/Vulkan/Vulkan.cpp index bc57515..80f4d26 100644 --- a/Src/Client/Vulkan/Vulkan.cpp +++ b/Src/Client/Vulkan/Vulkan.cpp @@ -1179,7 +1179,11 @@ void Vulkan::checkLibrary() uint32_t count = -1; VkResult res; - vkAssert(!vkEnumeratePhysicalDevices(localInstance.getInstance(), &count, nullptr)); + VkResult errc = vkEnumeratePhysicalDevices(localInstance.getInstance(), &count, nullptr); + if(errc != VK_SUCCESS && errc != VK_INCOMPLETE) { + error << "vkEnumeratePhysicalDevices не смог обработать запрос (ошибка драйвера?)\n"; + goto onError; + } if(!count) { diff --git a/Src/Common/Packets.hpp b/Src/Common/Packets.hpp index f70aae1..8023cb5 100644 --- a/Src/Common/Packets.hpp +++ b/Src/Common/Packets.hpp @@ -53,7 +53,9 @@ struct PacketQuat { /* uint8_t+uint8_t 0 - Системное - 0 - Новая позиция камеры WorldId_c+ObjectPos+PacketQuat + 0 - + 1 - + 2 - Новая позиция камеры WorldId_c+ObjectPos+PacketQuat */ diff --git a/Src/Server/GameServer.cpp b/Src/Server/GameServer.cpp index ea11699..cb91e5a 100644 --- a/Src/Server/GameServer.cpp +++ b/Src/Server/GameServer.cpp @@ -243,9 +243,18 @@ coro<> GameServer::pushSocketGameProtocol(tcp::socket socket, const std::string lock.unlock(); co_await Net::AsyncSocket::write(socket, 0); + // Считываем ресурсы хранимые в кеше клиента + uint32_t count = co_await Net::AsyncSocket::read(socket); + if(count > 262144) + MAKE_ERROR("Не поддерживаемое количество ресурсов в кеше у клиента"); + + std::vector clientCache; + clientCache.resize(count); + co_await Net::AsyncSocket::read(socket, (std::byte*) clientCache.data(), count*32); + std::sort(clientCache.begin(), clientCache.end()); External.NewConnectedPlayers.lock_write() - ->push_back(std::make_unique(IOC, std::move(socket), username)); + ->push_back(std::make_unique(IOC, std::move(socket), username, std::move(clientCache))); } } } diff --git a/Src/Server/RemoteClient.hpp b/Src/Server/RemoteClient.hpp index 9ff45a6..a7e9239 100644 --- a/Src/Server/RemoteClient.hpp +++ b/Src/Server/RemoteClient.hpp @@ -15,6 +15,7 @@ #include namespace LV::Server { +using HASH = std::array; template= sizeof(ClientKey), int> = 0> class CSChunkedMapper { @@ -185,6 +186,7 @@ class RemoteClient { DestroyLock UseLock; Net::AsyncSocket Socket; bool IsConnected = true, IsGoingShutdown = false; + std::vector ClientCache; struct ResUsesObj { // Счётчики использования базовых ресурсов высшими объектами @@ -270,8 +272,8 @@ public: ToServer::PacketQuat CameraQuat = {0}; public: - RemoteClient(asio::io_context &ioc, tcp::socket socket, const std::string username) - : LOG("RemoteClient " + username), Socket(ioc, std::move(socket)), Username(username) + RemoteClient(asio::io_context &ioc, tcp::socket socket, const std::string username, std::vector &&client_cache) + : LOG("RemoteClient " + username), Socket(ioc, std::move(socket)), Username(username), ClientCache(std::move(client_cache)) { } diff --git a/Src/TOSAsync.hpp b/Src/TOSAsync.hpp index 2940a8d..7e92baf 100644 --- a/Src/TOSAsync.hpp +++ b/Src/TOSAsync.hpp @@ -154,7 +154,7 @@ public: assert(AUC); if(--AUC->Uses == 0 && AUC->OnNoUse) { - AUC->OnNoUse(); + asio::post(AUC->IOC, std::move(AUC->OnNoUse)); } AUC = nullptr; @@ -162,10 +162,17 @@ public: }; private: + asio::io_context &IOC; std::move_only_function OnNoUse; std::atomic_int Uses = 0; public: + AsyncUseControl(asio::io_context &ioc) + : IOC(ioc) + { + + } + template> auto wait(Token&& token = asio::default_completion_token_t()) { auto initiation = [this](auto&& token) { @@ -211,7 +218,7 @@ protected: public: IAsyncDestructible(asio::io_context &ioc) - : IOC(ioc) + : IOC(ioc), AUC(ioc) {} virtual ~IAsyncDestructible() {} diff --git a/Src/main.cpp b/Src/main.cpp index 38832a0..1b9714d 100644 --- a/Src/main.cpp +++ b/Src/main.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -20,10 +21,8 @@ int main() { // LuaVox asio::io_context ioc; - { - LV::Client::CacheHandlerBasic::Ptr handler = LV::Client::CacheHandlerBasic::Create(ioc, "cache"); - } - //LV::Client::VK::Vulkan vkInst(ioc); + LV::Client::VK::Vulkan vkInst(ioc); + ioc.run(); return 0;