From ecc77544b2016228e468d9355333f8c31e805e11 Mon Sep 17 00:00:00 2001 From: DrSocalkwe3n Date: Thu, 26 Jun 2025 15:45:05 +0600 Subject: [PATCH] ResourceCache --- CMakeLists.txt | 31 ++- Src/Client/ResourceCache.cpp | 484 +++++++++++++++++++++++++++++++++++ Src/Client/ResourceCache.hpp | 359 ++++++++------------------ Src/TOSAsync.hpp | 298 +++++++++++++++++++-- Src/TOSLib.hpp | 127 +++++++++ Src/main.cpp | 9 +- 6 files changed, 1024 insertions(+), 284 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a782013..d370f02 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,9 +1,10 @@ cmake_minimum_required(VERSION 3.13) -option(BUILD_CLIENT "Build the client" TRUE) +option(BUILD_CLIENT "Build the client" ON) +option(USE_LIBURING "Build with liburing support" ON) -set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD 23) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) @@ -16,15 +17,15 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pg") set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -pg") # sanitizer -# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address") -# set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address") +set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address") # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment") -project (LuaVox VERSION 0.0 DESCRIPTION "LuaVox Description") +project(LuaVox VERSION 0.0 DESCRIPTION "LuaVox Description") add_library(luavox_common INTERFACE) -target_compile_features(luavox_common INTERFACE cxx_std_20) +target_compile_features(luavox_common INTERFACE cxx_std_23) if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "GNU") target_compile_options(luavox_common INTERFACE -fcoroutines) @@ -32,6 +33,21 @@ elseif ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang") target_compile_options(luavox_common INTERFACE -fcoroutine) endif() +if(USE_LIBURING) + find_package(PkgConfig REQUIRED) + pkg_check_modules(LIBURING liburing>=2.0 IMPORTED_TARGET) + + if(LIBURING_FOUND) + message(STATUS "liburing found, enabling io_uring support") + target_compile_definitions(luavox_common INTERFACE LUAVOX_HAVE_LIBURING) + target_link_libraries(luavox_common INTERFACE PkgConfig::LIBURING) + else() + message(FATAL_ERROR "liburing >= 2.0 not found but USE_LIBURING is ON") + endif() +else() + message(STATUS "liburing support is disabled") +endif() + include(FetchContent) # Boost @@ -106,9 +122,6 @@ add_library(assets STATIC resources.cpp assets.o) set_target_properties(assets PROPERTIES LINKER_LANGUAGE C) target_link_libraries(luavox_common INTERFACE assets) -# uring -target_link_libraries(luavox_common INTERFACE uring) - if(BUILD_CLIENT) add_executable(luavox_client) diff --git a/Src/Client/ResourceCache.cpp b/Src/Client/ResourceCache.cpp index fd87461..b8763cd 100644 --- a/Src/Client/ResourceCache.cpp +++ b/Src/Client/ResourceCache.cpp @@ -1,7 +1,491 @@ #include "ResourceCache.hpp" +#include namespace LV::Client { +CacheDatabase::CacheDatabase(const fs::path &cachePath) + : Path(cachePath) +{ + int errc = sqlite3_open_v2((Path / "db.sqlite3").c_str(), &DB, SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE, nullptr); + if(errc) { + MAKE_ERROR("Не удалось открыть базу данных " << (Path / "db.sqlite3").c_str() << ": " << sqlite3_errmsg(DB)); + } + + const char* sql = R"( + CREATE TABLE IF NOT EXISTS files( + sha256 BLOB(32) NOT NULL, -- + last_used INT NOT NULL, -- unix timestamp + size INT NOT NULL, -- file size + UNIQUE (sha256)); + )"; + + errc = sqlite3_exec(DB, sql, nullptr, nullptr, nullptr); + if(errc != SQLITE_OK) { + MAKE_ERROR("Не удалось подготовить таблицу базы: " << sqlite3_errmsg(DB)); + } + + sql = R"( + INSERT OR REPLACE INTO files (sha256, last_used, size) + VALUES (?, ?, ?); + )"; + + if(sqlite3_prepare_v2(DB, sql, -1, &STMT_INSERT, nullptr) != SQLITE_OK) { + MAKE_ERROR("Не удалось подготовить запрос STMT_INSERT: " << sqlite3_errmsg(DB)); + } + + sql = R"( + UPDATE files SET last_used = ? WHERE sha256 = ?; + )"; + + if(sqlite3_prepare_v2(DB, sql, -1, &STMT_UPDATE_TIME, nullptr) != SQLITE_OK) { + MAKE_ERROR("Не удалось подготовить запрос STMT_UPDATE_TIME: " << sqlite3_errmsg(DB)); + } + + sql = R"( + DELETE FROM files WHERE sha256=?; + )"; + + if(sqlite3_prepare_v2(DB, sql, -1, &STMT_REMOVE, nullptr) != SQLITE_OK) { + MAKE_ERROR("Не удалось подготовить запрос STMT_REMOVE: " << sqlite3_errmsg(DB)); + } + + sql = R"( + SELECT sha256 FROM files; + )"; + + if(sqlite3_prepare_v2(DB, sql, -1, &STMT_ALL_HASH, nullptr) != SQLITE_OK) { + MAKE_ERROR("Не удалось подготовить запрос STMT_ALL_HASH: " << sqlite3_errmsg(DB)); + } + + sql = R"( + SELECT SUM(size) FROM files; + )"; + + if(sqlite3_prepare_v2(DB, sql, -1, &STMT_SUM, nullptr) != SQLITE_OK) { + MAKE_ERROR("Не удалось подготовить запрос STMT_SUM: " << sqlite3_errmsg(DB)); + } + + sql = R"( + SELECT sha256, size FROM files WHERE last_used < ?; + )"; + + if(sqlite3_prepare_v2(DB, sql, -1, &STMT_OLD, nullptr) != SQLITE_OK) { + MAKE_ERROR("Не удалось подготовить запрос STMT_OLD: " << sqlite3_errmsg(DB)); + } + + sql = R"( + SELECT sha256 + FROM files + ORDER BY last_used ASC, size ASC + LIMIT ( + SELECT COUNT(*) FROM ( + SELECT SUM(size) OVER (ORDER BY last_used ASC, size ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total + FROM files + ORDER BY last_used ASC, size ASC + ) sub + WHERE running_total <= ? + ); + )"; + + if(sqlite3_prepare_v2(DB, sql, -1, &STMT_TO_FREE, nullptr) != SQLITE_OK) { + MAKE_ERROR("Не удалось подготовить запрос STMT_TO_FREE: " << sqlite3_errmsg(DB)); + } + + sql = R"( + SELECT COUNT(*) FROM files; + )"; + + if(sqlite3_prepare_v2(DB, sql, -1, &STMT_COUNT, nullptr) != SQLITE_OK) { + MAKE_ERROR("Не удалось подготовить запрос STMT_COUNT: " << sqlite3_errmsg(DB)); + } +} + +CacheDatabase::~CacheDatabase() { + for(sqlite3_stmt* stmt : {STMT_INSERT, STMT_UPDATE_TIME, STMT_REMOVE, STMT_ALL_HASH, STMT_SUM, STMT_OLD, STMT_TO_FREE, STMT_COUNT}) + if(stmt) + sqlite3_finalize(stmt); + + if(DB) + sqlite3_close(DB); +} + +size_t CacheDatabase::getCacheSize() { + size_t Size; + if(sqlite3_step(STMT_SUM) != SQLITE_ROW) { + sqlite3_reset(STMT_SUM); + MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_SUM: " << sqlite3_errmsg(DB)); + } + + Size = sqlite3_column_int64(STMT_SUM, 0); + sqlite3_reset(STMT_SUM); + return Size; +} + +std::pair CacheDatabase::getAllHash() { + if(sqlite3_step(STMT_COUNT) != SQLITE_ROW) { + sqlite3_reset(STMT_COUNT); + MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_COUNT: " << sqlite3_errmsg(DB)); + } + + size_t count = sqlite3_column_int(STMT_COUNT, 0); + sqlite3_reset(STMT_COUNT); + + std::string out; + out.reserve(32*count); + + int errc; + size_t readed = 0; + while(true) { + errc = sqlite3_step(STMT_ALL_HASH); + if(errc == SQLITE_DONE) + break; + else if(errc != SQLITE_ROW) { + sqlite3_reset(STMT_ALL_HASH); + MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_ALL_HASH: " << sqlite3_errmsg(DB)); + } + + const char *hash = (const char*) sqlite3_column_blob(STMT_ALL_HASH, 0); + readed++; + out += std::string_view(hash, hash+32); + } + + sqlite3_reset(STMT_ALL_HASH); + return {out, readed}; +} + +void CacheDatabase::updateTimeFor(HASH hash) { + sqlite3_bind_blob(STMT_UPDATE_TIME, 0, (const void*) hash.data(), 32, SQLITE_STATIC); + sqlite3_bind_int(STMT_UPDATE_TIME, 1, time(nullptr)); + if(sqlite3_step(STMT_UPDATE_TIME) != SQLITE_OK) { + sqlite3_reset(STMT_UPDATE_TIME); + MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_UPDATE_TIME: " << sqlite3_errmsg(DB)); + } + + sqlite3_reset(STMT_UPDATE_TIME); +} + +void CacheDatabase::insert(HASH hash, size_t size) { + assert(size < (size_t(1) << 31)-1 && size > 0); + + sqlite3_bind_blob(STMT_INSERT, 0, (const void*) hash.data(), 32, SQLITE_STATIC); + sqlite3_bind_int(STMT_INSERT, 1, (int) size); + sqlite3_bind_int(STMT_INSERT, 2, time(nullptr)); + if(sqlite3_step(STMT_INSERT) != SQLITE_OK) { + sqlite3_reset(STMT_INSERT); + MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_INSERT: " << sqlite3_errmsg(DB)); + } + + sqlite3_reset(STMT_INSERT); +} + +std::vector CacheDatabase::findExcessHashes(size_t bytesToFree, int timeBefore = time(nullptr)-604800) { + std::vector out; + size_t removed = 0; + + sqlite3_bind_int(STMT_OLD, 0, timeBefore); + while(true) { + int errc = sqlite3_step(STMT_OLD); + if(errc == SQLITE_DONE) + break; + else if(errc != SQLITE_ROW) { + sqlite3_reset(STMT_OLD); + MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_OLD: " << sqlite3_errmsg(DB)); + } + + const uint8_t *hash = (const uint8_t*) sqlite3_column_blob(STMT_OLD, 0); + removed += sqlite3_column_int(STMT_OLD, 1); + HASH obj; + for(int iter = 0; iter < 32; iter++) + obj[iter] = hash[iter]; + + out.push_back(obj); + } + + sqlite3_reset(STMT_OLD); + + if(removed > bytesToFree) + return out; + + sqlite3_bind_int(STMT_TO_FREE, 0, (int) bytesToFree); + + while(true) { + int errc = sqlite3_step(STMT_TO_FREE); + if(errc == SQLITE_DONE) + break; + else if(errc != SQLITE_ROW) { + sqlite3_reset(STMT_TO_FREE); + MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_TO_FREE: " << sqlite3_errmsg(DB)); + } + + const uint8_t *hash = (const uint8_t*) sqlite3_column_blob(STMT_TO_FREE, 0); + HASH obj; + for(int iter = 0; iter < 32; iter++) + obj[iter] = hash[iter]; + + out.push_back(obj); + } + + sqlite3_reset(STMT_TO_FREE); + return out; +} + +void CacheDatabase::remove(HASH hash) { + sqlite3_bind_blob(STMT_REMOVE, 0, (const void*) hash.data(), 32, SQLITE_STATIC); + if(sqlite3_step(STMT_REMOVE) != SQLITE_OK) { + sqlite3_reset(STMT_REMOVE); + MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_REMOVE: " << sqlite3_errmsg(DB)); + } + + sqlite3_reset(STMT_REMOVE); +} + +std::string CacheDatabase::hashToString(HASH hash) { + std::string text; + text.reserve(64); + + for(int iter = 0; iter < 32; iter++) { + int val = hash[31-iter] & 0xf; + if(val > 9) + text += 'a'+val-10; + else + text += '0'+val; + + val = (hash[31-iter] >> 4) & 0xf; + if(val > 9) + text += 'a'+val-10; + else + text += '0'+val; + } + + return text; +} + +int CacheDatabase::hexCharToInt(char c) { + if (c >= '0' && c <= '9') return c - '0'; + if (c >= 'a' && c <= 'f') return c - 'a' + 10; + throw std::invalid_argument("Invalid hexadecimal character"); +} + +CacheDatabase::HASH CacheDatabase::stringToHash(const std::string_view view) { + if (view.size() != 64) + throw std::invalid_argument("Hex string must be exactly 64 characters long"); + + HASH hash; + + for (size_t i = 0; i < 32; ++i) { + size_t offset = 62 - i * 2; + int high = hexCharToInt(view[offset]); + int low = hexCharToInt(view[offset + 1]); + hash[i] = (high << 4) | low; + } + + return hash; +} + +CacheHandler::CacheHandler(boost::asio::io_context &ioc, const fs::path &cachePath) + : IAsyncDestructible(ioc), Path(cachePath), DB(Path) +{ +} + +CacheHandler::~CacheHandler() = default; + +std::pair CacheHandler::getAll() { + return DB.getAllHash(); +} + +coro<> CacheHandlerBasic::asyncDestructor() { + NeedShutdown = true; + co_await CacheHandler::asyncDestructor(); +} + +void CacheHandlerBasic::readThread(AsyncUseControl::Lock lock) { + LOG.info() << "readThread started"; + + while(!NeedShutdown) { + if(ReadQueue.get_read().empty()) + goto wait; + else { + auto lock = ReadQueue.lock(); + if(lock->empty()) + goto wait; + + CacheDatabase::HASH hash = lock->front(); + lock->pop(); + lock.unlock(); + + std::string name = CacheDatabase::hashToString(hash); + fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4); + + std::shared_ptr data = std::make_shared(); + + try { + std::ifstream fd(path, std::ios::binary | std::ios::ate); + if (!fd.is_open()) + MAKE_ERROR("!is_open(): " << fd.exceptions()); + + if (fd.fail()) + MAKE_ERROR("fail(): " << fd.exceptions()); + + std::ifstream::pos_type size = fd.tellg(); + fd.seekg(0, std::ios::beg); + data->resize(size); + fd.read(data->data(), size); + + if (!fd.good()) + MAKE_ERROR("!good(): " << fd.exceptions()); + } catch(const std::exception &exc) { + LOG.error() << "Не удалось считать ресурс " << path.c_str() << ": " << exc.what(); + } + + ReadedQueue.lock()->emplace_back(hash, std::move(data)); + continue; + } + + wait: + TOS::Time::sleep3(20); + } + + LOG.info() << "readThread ended"; + lock.unlock(); +} + +void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) { + LOG.info() << "readThread started"; + + while(!NeedShutdown) { + if(!ReadQueue.get_read().empty()) { + auto lock = ReadQueue.lock(); + if(!lock->empty()) { + CacheDatabase::HASH hash = lock->front(); + lock->pop(); + lock.unlock(); + + std::string name = CacheDatabase::hashToString(hash); + fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4); + + std::shared_ptr data = std::make_shared(); + + try { + std::ifstream fd(path, std::ios::binary | std::ios::ate); + if (!fd.is_open()) + MAKE_ERROR("!is_open(): " << fd.exceptions()); + + if (fd.fail()) + MAKE_ERROR("fail(): " << fd.exceptions()); + + std::ifstream::pos_type size = fd.tellg(); + fd.seekg(0, std::ios::beg); + data->resize(size); + fd.read(data->data(), size); + + if (!fd.good()) + MAKE_ERROR("!good(): " << fd.exceptions()); + + DB.updateTimeFor(hash); + } catch(const std::exception &exc) { + LOG.error() << "Не удалось считать ресурс " << path.c_str() << ": " << exc.what(); + } + + ReadedQueue.lock()->emplace_back(hash, std::move(data)); + continue; + } + } + + if(!WriteQueue.get_read().empty()) { + auto lock = WriteQueue.lock(); + if(!lock->empty()) { + DataTask task = lock->front(); + lock->pop(); + lock.unlock(); + + std::string name = CacheDatabase::hashToString(task.Hash); + fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4); + + + try { + // Проверка на наличие свободного места (виртуально) + if(ssize_t free = ssize_t(MaxCacheDirectorySize)-DB.getCacheSize(); free < task.Data->size()) { + // Недостаточно места, сколько необходимо освободить с запасом + ssize_t need = task.Data->size()-free + 64*1024*1024; + std::vector hashes = DB.findExcessHashes(need, time(nullptr)-MaxLifeTime); + + LOG.warn() << "Удаление устаревшего кеша в количестве " << hashes.size() << "..."; + + for(CacheDatabase::HASH hash : hashes) { + std::string name = CacheDatabase::hashToString(task.Hash); + fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4); + DB.remove(hash); + fs::remove(path); + + fs::path up1 = path.parent_path(); + LOG.info() << "В директории " << up1.c_str() << " не осталось файлов, удаляем..."; + size_t count = std::distance(fs::directory_iterator(up1), fs::directory_iterator()); + if(count == 0) + fs::remove(up1); + } + } + + fs::create_directories(path.parent_path()); + + std::ofstream fd(path, std::ios::binary | std::ios::ate); + fd.write(task.Data->data(), task.Data->size()); + + DB.insert(task.Hash, task.Data->size()); + } catch(const std::exception &exc) { + LOG.error() << "Не удалось сохранить ресурс " << path.c_str() << ": " << exc.what(); + } + } + } + + TOS::Time::sleep3(20); + } + + LOG.info() << "readWriteThread ended"; + lock.unlock(); +} + +CacheHandlerBasic::CacheHandlerBasic(boost::asio::io_context &ioc, const fs::path &cachePath) + : CacheHandler(ioc, cachePath), + ReadThread(&CacheHandlerBasic::readThread, this, AUC.use()), + ReadWriteThread(&CacheHandlerBasic::readWriteThread, this, AUC.use()) +{ + LOG.info() << "Инициализировано хранилище кеша: " << cachePath.c_str(); +} + +CacheHandlerBasic::~CacheHandlerBasic() { + ReadThread.join(); + ReadWriteThread.join(); + LOG.info() << "ДеИнициализировано хранилище кеша: " << Path.c_str(); +} + +void CacheHandlerBasic::pushWrite(std::string &&data, CacheDatabase::HASH hash) { + std::shared_ptr dat = std::make_shared(std::move(data)); + WriteCache.lock()->push_back({hash, dat}); + WriteQueue.lock()->push({hash, dat}); +} + +void CacheHandlerBasic::pushRead(CacheDatabase::HASH hash) { + ReadQueue.lock()->push(hash); +} + +std::vector> CacheHandlerBasic::pullReads() { + std::vector data; + + { + auto lock = ReadedQueue.lock(); + data = std::move(*lock); + } + + std::vector> out; + out.reserve(data.size()); + + for(auto &value : data) { + out.emplace_back(value.Hash, std::move(*value.Data)); + } + + return out; +} + } \ No newline at end of file diff --git a/Src/Client/ResourceCache.hpp b/Src/Client/ResourceCache.hpp index 7a40497..8c53d9d 100644 --- a/Src/Client/ResourceCache.hpp +++ b/Src/Client/ResourceCache.hpp @@ -1,18 +1,22 @@ #include #include +#include +#include #include #include #include +#include #include #include namespace LV::Client { +using namespace TOS; namespace fs = std::filesystem; // NOT ThreadSafe -class ResourceCacheHandler { +class CacheDatabase { const fs::path Path; sqlite3 *DB = nullptr; @@ -21,291 +25,144 @@ class ResourceCacheHandler { *STMT_REMOVE = nullptr, *STMT_ALL_HASH = nullptr, *STMT_SUM = nullptr, + *STMT_OLD = nullptr, *STMT_TO_FREE = nullptr, *STMT_COUNT = nullptr; - size_t Size = -1; - public: - ResourceCacheHandler(const std::string_view cache_path) - : Path(cache_path) - { - int errc = sqlite3_open_v2((Path / "db.sqlite3").c_str(), &DB, SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE, nullptr); - if(errc) { - MAKE_ERROR("Не удалось открыть базу данных " << (Path / "db.sqlite3").c_str() << ": " << sqlite3_errmsg(DB)); - } + CacheDatabase(const fs::path &cachePath); + ~CacheDatabase(); - const char* sql = R"( - CREATE TABLE IF NOT EXISTS files( - sha256 BLOB(32) NOT NULL, -- - last_used INT NOT NULL, -- unix timestamp - size INT NOT NULL, -- file size - UNIQUE (sha256)); - )"; - - errc = sqlite3_exec(DB, sql, nullptr, nullptr, nullptr); - if(errc != SQLITE_OK) { - MAKE_ERROR("Не удалось подготовить таблицу базы: " << sqlite3_errmsg(DB)); - } - - sql = R"( - INSERT OR REPLACE INTO files (sha256, last_used, size) - VALUES (?, ?, ?); - )"; - - if(sqlite3_prepare_v2(DB, sql, -1, &STMT_INSERT, nullptr) != SQLITE_OK) { - MAKE_ERROR("Не удалось подготовить запрос STMT_INSERT: " << sqlite3_errmsg(DB)); - } - - sql = R"( - UPDATE files SET last_used = ? WHERE sha256 = ?; - )"; - - if(sqlite3_prepare_v2(DB, sql, -1, &STMT_UPDATE_TIME, nullptr) != SQLITE_OK) { - MAKE_ERROR("Не удалось подготовить запрос STMT_UPDATE_TIME: " << sqlite3_errmsg(DB)); - } - - sql = R"( - DELETE FROM files WHERE sha256=?; - )"; - - if(sqlite3_prepare_v2(DB, sql, -1, &STMT_REMOVE, nullptr) != SQLITE_OK) { - MAKE_ERROR("Не удалось подготовить запрос STMT_REMOVE: " << sqlite3_errmsg(DB)); - } - - sql = R"( - SELECT sha256 FROM files; - )"; - - if(sqlite3_prepare_v2(DB, sql, -1, &STMT_ALL_HASH, nullptr) != SQLITE_OK) { - MAKE_ERROR("Не удалось подготовить запрос STMT_ALL_HASH: " << sqlite3_errmsg(DB)); - } - - sql = R"( - SELECT SUM(size) FROM files; - )"; - - if(sqlite3_prepare_v2(DB, sql, -1, &STMT_SUM, nullptr) != SQLITE_OK) { - MAKE_ERROR("Не удалось подготовить запрос STMT_SUM: " << sqlite3_errmsg(DB)); - } - - sql = R"( - SELECT sha256 - FROM files - WHERE last_used < ? - ORDER BY last_used ASC, size ASC - LIMIT ( - SELECT COUNT(*) FROM ( - SELECT SUM(size) OVER (ORDER BY last_used ASC, size ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total - FROM files - WHERE last_used < ? - ORDER BY last_used ASC, size ASC - ) sub - WHERE running_total <= ? - ); - )"; - - if(sqlite3_prepare_v2(DB, sql, -1, &STMT_TO_FREE, nullptr) != SQLITE_OK) { - MAKE_ERROR("Не удалось подготовить запрос STMT_TO_FREE: " << sqlite3_errmsg(DB)); - } - - sql = R"( - SELECT COUNT(*) FROM files; - )"; - - if(sqlite3_prepare_v2(DB, sql, -1, &STMT_COUNT, nullptr) != SQLITE_OK) { - MAKE_ERROR("Не удалось подготовить запрос STMT_COUNT: " << sqlite3_errmsg(DB)); - } - } - - ~ResourceCacheHandler() { - for(sqlite3_stmt* stmt : {STMT_INSERT, STMT_UPDATE_TIME, STMT_REMOVE, STMT_ALL_HASH, STMT_SUM, STMT_TO_FREE, STMT_COUNT}) - if(stmt) - sqlite3_finalize(stmt); - - if(DB) - sqlite3_close(DB); - } - - ResourceCacheHandler(const ResourceCacheHandler&) = delete; - ResourceCacheHandler(ResourceCacheHandler&&) = delete; - ResourceCacheHandler& operator=(const ResourceCacheHandler&) = delete; - ResourceCacheHandler& operator=(ResourceCacheHandler&&) = delete; + CacheDatabase(const CacheDatabase&) = delete; + CacheDatabase(CacheDatabase&&) = delete; + CacheDatabase& operator=(const CacheDatabase&) = delete; + CacheDatabase& operator=(CacheDatabase&&) = delete; /* Выдаёт размер занимаемый всем хранимым кешем */ - size_t getCacheSize() { - if(Size == -1) { - if(sqlite3_step(STMT_SUM) != SQLITE_ROW) { - sqlite3_reset(STMT_SUM); - MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_SUM: " << sqlite3_errmsg(DB)); - } - - Size = sqlite3_column_int(STMT_SUM, 0); - sqlite3_reset(STMT_SUM); - } - - return Size; - } + size_t getCacheSize(); // TODO: добавить ограничения на количество файлов /* Создаёт линейный массив в котором подряд указаны все хэш суммы в бинарном виде и возвращает их количество */ - std::pair getAllHash() { - if(sqlite3_step(STMT_COUNT) != SQLITE_ROW) { - sqlite3_reset(STMT_COUNT); - MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_COUNT: " << sqlite3_errmsg(DB)); - } - - size_t count = sqlite3_column_int(STMT_COUNT, 0); - sqlite3_reset(STMT_COUNT); - - std::string out; - out.reserve(32*count); - - int errc; - size_t readed = 0; - while(true) { - errc = sqlite3_step(STMT_ALL_HASH); - if(errc == SQLITE_DONE) - break; - else if(errc != SQLITE_ROW) { - sqlite3_reset(STMT_ALL_HASH); - MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_ALL_HASH: " << sqlite3_errmsg(DB)); - } - - const char *hash = (const char*) sqlite3_column_blob(STMT_ALL_HASH, 0); - readed++; - out += std::string_view(hash, hash+32); - } - - sqlite3_reset(STMT_ALL_HASH); - return {out, readed}; - } + std::pair getAllHash(); using HASH = std::array; /* Обновляет время использования кеша */ - void updateTimeFor(HASH hash) { - sqlite3_bind_blob(STMT_UPDATE_TIME, 0, (const void*) hash.data(), 32, SQLITE_STATIC); - sqlite3_bind_int(STMT_UPDATE_TIME, 1, time(nullptr)); - if(sqlite3_step(STMT_UPDATE_TIME) != SQLITE_OK) { - sqlite3_reset(STMT_UPDATE_TIME); - MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_UPDATE_TIME: " << sqlite3_errmsg(DB)); - } - - sqlite3_reset(STMT_UPDATE_TIME); - } + void updateTimeFor(HASH hash); /* Добавляет запись */ - void insert(HASH hash, size_t size) { - assert(size < (size_t(1) << 31)-1 && size > 0); - - sqlite3_bind_blob(STMT_INSERT, 0, (const void*) hash.data(), 32, SQLITE_STATIC); - sqlite3_bind_int(STMT_INSERT, 1, (int) size); - sqlite3_bind_int(STMT_INSERT, 2, time(nullptr)); - if(sqlite3_step(STMT_INSERT) != SQLITE_OK) { - sqlite3_reset(STMT_INSERT); - MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_INSERT: " << sqlite3_errmsg(DB)); - } - - sqlite3_reset(STMT_INSERT); - } + void insert(HASH hash, size_t size); /* - Выдаёт хэши на удаление по размеру в сумме больше bytesToFree. В приоритете старые, потом мелкие + Выдаёт хэши на удаление по размеру в сумме больше bytesToFree. + Сначала удаляется старьё, потом по приоритету дата использования + размер */ - std::vector findExcessHashes(size_t bytesToFree, int timeBefore = time(nullptr)-604800) { - sqlite3_bind_int(STMT_TO_FREE, 0, timeBefore); - sqlite3_bind_int(STMT_TO_FREE, 1, timeBefore); - sqlite3_bind_int(STMT_TO_FREE, 2, (int) bytesToFree); - - std::vector out; - while(true) { - int errc = sqlite3_step(STMT_TO_FREE); - if(errc == SQLITE_DONE) - break; - else if(errc != SQLITE_ROW) { - sqlite3_reset(STMT_TO_FREE); - MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_TO_FREE: " << sqlite3_errmsg(DB)); - } - - const uint8_t *hash = (const uint8_t*) sqlite3_column_blob(STMT_TO_FREE, 0); - HASH obj; - for(int iter = 0; iter < 32; iter++) - obj[iter] = hash[iter]; - - out.push_back(obj); - } - - sqlite3_reset(STMT_TO_FREE); - return out; - } + std::vector findExcessHashes(size_t bytesToFree, int timeBefore); /* Удаление записи */ - void remove(HASH hash) { - sqlite3_bind_blob(STMT_REMOVE, 0, (const void*) hash.data(), 32, SQLITE_STATIC); - if(sqlite3_step(STMT_REMOVE) != SQLITE_OK) { - sqlite3_reset(STMT_REMOVE); - MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_REMOVE: " << sqlite3_errmsg(DB)); - } + void remove(HASH hash); - sqlite3_reset(STMT_REMOVE); - } - - static std::string hashToString(HASH hash) { - std::string text; - text.reserve(64); - - for(int iter = 0; iter < 32; iter++) { - int val = hash[31-iter] & 0xf; - if(val > 9) - text += 'a'+val-10; - else - text += '0'+val; - - val = (hash[31-iter] >> 4) & 0xf; - if(val > 9) - text += 'a'+val-10; - else - text += '0'+val; - } - - return text; - } - - static int hexCharToInt(char c) { - if (c >= '0' && c <= '9') return c - '0'; - if (c >= 'a' && c <= 'f') return c - 'a' + 10; - throw std::invalid_argument("Invalid hexadecimal character"); - } - - static HASH stringToHash(const std::string_view view) { - if (view.size() != 64) - throw std::invalid_argument("Hex string must be exactly 64 characters long"); - - HASH hash; - - for (size_t i = 0; i < 32; ++i) { - size_t offset = 62 - i * 2; - int high = hexCharToInt(view[offset]); - int low = hexCharToInt(view[offset + 1]); - hash[i] = (high << 4) | low; - } - - return hash; - } + static std::string hashToString(HASH hash); + static int hexCharToInt(char c); + static HASH stringToHash(const std::string_view view); }; +/* + Читает и пишет ресурсы на диск + В приоритете чтение + Кодировки только на стороне сервера, на клиенте уже готовые данные + + NOT ThreadSafe +*/ +class CacheHandler : public IAsyncDestructible { +protected: + const fs::path Path; + CacheDatabase DB; + +protected: + CacheHandler(boost::asio::io_context &ioc, const fs::path &cachePath); + +public: + virtual ~CacheHandler(); + + // Добавить задачу на запись + virtual void pushWrite(std::string &&data, CacheDatabase::HASH hash) = 0; + + // Добавить задачу на чтение + virtual void pushRead(CacheDatabase::HASH hash) = 0; + + // Получить считанные данные + virtual std::vector> pullReads() = 0; + + // Получить список доступных ресурсов + std::pair getAll(); +}; + +class CacheHandlerBasic : public CacheHandler { + Logger LOG = "CacheHandlerBasic"; + + std::thread ReadThread, ReadWriteThread; + + struct DataTask { + CacheDatabase::HASH Hash; + std::shared_ptr Data; + }; + + // Очередь задач на чтение + SpinlockObject> ReadQueue; + // Кэш данных, которые ещё не записались + SpinlockObject>>> WriteCache; + // Очередь записи данных на диск + SpinlockObject> WriteQueue; + // Список полностью считанных файлов + SpinlockObject> ReadedQueue; + bool NeedShutdown = false; + size_t MaxCacheDirectorySize = 8*1024*1024*1024ULL; + size_t MaxLifeTime = 7*24*60*60; + +public: + using Ptr = std::shared_ptr; + +private: + virtual coro<> asyncDestructor() override; + + void readThread(AsyncUseControl::Lock lock); + + void readWriteThread(AsyncUseControl::Lock lock); + +protected: + CacheHandlerBasic(boost::asio::io_context &ioc, const fs::path& cachePath); + +public: + virtual ~CacheHandlerBasic(); + + static std::shared_ptr Create(asio::io_context &ioc, const fs::path& cachePath) { + return createShared(ioc, new CacheHandlerBasic(ioc, cachePath)); + } + + virtual void pushWrite(std::string &&data, CacheDatabase::HASH hash) override; + virtual void pushRead(CacheDatabase::HASH hash) override; + virtual std::vector> pullReads() override; +}; + +#ifdef LUAVOX_HAVE_LIBURING + +class CacheHandlerUring : public CacheHandler { + +}; + +#endif } \ No newline at end of file diff --git a/Src/TOSAsync.hpp b/Src/TOSAsync.hpp index 1530374..2940a8d 100644 --- a/Src/TOSAsync.hpp +++ b/Src/TOSAsync.hpp @@ -1,20 +1,24 @@ #pragma once -#include "boost/asio/awaitable.hpp" -#include "boost/asio/co_spawn.hpp" -#include "boost/asio/deadline_timer.hpp" -#include "boost/asio/detached.hpp" -#include "boost/asio/io_context.hpp" -#include "boost/asio/use_awaitable.hpp" +#include "TOSLib.hpp" +#include +#include "boost/system/detail/error_code.hpp" +#include "boost/system/system_error.hpp" +#include #include +#include #include #include +#include + + namespace TOS { using namespace boost::asio::experimental::awaitable_operators; template using coro = boost::asio::awaitable; +namespace asio = boost::asio; class AsyncSemaphore { @@ -52,26 +56,183 @@ public: } }; -class IAsyncDestructible : public std::enable_shared_from_this { -protected: - boost::asio::any_io_executor IOC; - boost::asio::deadline_timer DestructLine; - virtual coro<> asyncDestructor() { DestructLine.cancel(); co_return; } +/* + Многие могут уведомлять одного + Ждёт события. После доставки уведомления ждёт повторно +*/ +class MultipleToOne_AsyncSymaphore { + asio::deadline_timer Timer; public: - IAsyncDestructible(boost::asio::any_io_executor ioc) - : IOC(ioc), DestructLine(ioc, boost::posix_time::ptime(boost::posix_time::pos_infin)) + MultipleToOne_AsyncSymaphore(asio::io_context &ioc) + : Timer(ioc, boost::posix_time::ptime(boost::posix_time::pos_infin)) + {} + + void notify() { + Timer.cancel(); + } + + void wait() { + try { Timer.wait(); } catch(...) {} + Timer.expires_at(boost::posix_time::ptime(boost::posix_time::pos_infin)); + } + + coro<> async_wait() { + try { co_await Timer.async_wait(); } catch(...) {} + } +}; + +class WaitableCoro { + asio::io_context &IOC; + std::shared_ptr Symaphore; + std::exception_ptr LastException; + +public: + WaitableCoro(asio::io_context &ioc) + : IOC(ioc) + {} + + void co_spawn(coro<> token) { + Symaphore = std::make_shared(IOC); + asio::co_spawn(IOC, [token = std::move(token), symaphore = Symaphore]() -> coro<> { + try { co_await std::move(const_cast&>(token)); } catch(...) {} + symaphore->notify(); + }, asio::detached); + } + + void wait() { + Symaphore->wait(); + } + + coro<> async_wait() { + return Symaphore->async_wait(); + } +}; + +class AsyncUseControl { +public: + class Lock { + AsyncUseControl *AUC; + + public: + Lock(AsyncUseControl *auc) + : AUC(auc) + {} + + Lock() + : AUC(nullptr) + {} + + ~Lock() { + if(AUC) + unlock(); + } + + Lock(const Lock&) = delete; + Lock(Lock&& obj) + : AUC(obj.AUC) + { + obj.AUC = nullptr; + } + + Lock& operator=(const Lock&) = delete; + Lock& operator=(Lock&& obj) { + if(&obj == this) + return *this; + + if(AUC) + unlock(); + + AUC = obj.AUC; + obj.AUC = nullptr; + + return *this; + } + + void unlock() { + assert(AUC); + + if(--AUC->Uses == 0 && AUC->OnNoUse) { + AUC->OnNoUse(); + } + + AUC = nullptr; + } + }; + +private: + std::move_only_function OnNoUse; + std::atomic_int Uses = 0; + +public: + template> + auto wait(Token&& token = asio::default_completion_token_t()) { + auto initiation = [this](auto&& token) { + int value; + do { + value = Uses.exchange(-1); + } while(value == -1); + + OnNoUse = std::move(token); + + if(value == 0) + OnNoUse(); + + Uses.exchange(value); + }; + + return asio::async_initiate(initiation, token); + } + + Lock use() { + int value; + do { + value = Uses.exchange(-1); + } while(value == -1); + + if(OnNoUse) + throw boost::system::system_error(asio::error::operation_aborted, "OnNoUse"); + + Uses.exchange(++value); + return Lock(this); + } +}; + +/* + Используется, чтобы вместо уничтожения объекта в умной ссылке, вызвать корутину с co_await asyncDestructor() +*/ +class IAsyncDestructible : public std::enable_shared_from_this { +protected: + asio::io_context &IOC; + AsyncUseControl AUC; + + virtual coro<> asyncDestructor() { co_await AUC.wait(); } + +public: + IAsyncDestructible(asio::io_context &ioc) + : IOC(ioc) {} virtual ~IAsyncDestructible() {} - coro> cancelable(coro<> &&c) { return std::move(c) || DestructLine.async_wait(boost::asio::use_awaitable); } +protected: + template> + static std::shared_ptr createShared(asio::io_context &ioc, T *ptr) + { + return std::shared_ptr(ptr, [&ioc = ioc](T *ptr) { + boost::asio::co_spawn(ioc, [&ioc = ioc](IAsyncDestructible *ptr) -> coro<> { + try { co_await ptr->asyncDestructor(); } catch(...) { } + delete ptr; + co_return; + } (ptr), boost::asio::detached); + }); + } template> - static std::shared_ptr createShared(boost::asio::any_io_executor ioc, T *ptr) + static coro> createShared(T *ptr) { - return std::shared_ptr(ptr, [ioc = std::move(ioc)](IAsyncDestructible *ptr) { + co_return std::shared_ptr(ptr, [ioc = asio::get_associated_executor(co_await asio::this_coro::executor)](T *ptr) { boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> { try { co_await ptr->asyncDestructor(); } catch(...) { } delete ptr; @@ -80,16 +241,113 @@ public: }); } - template> - static std::shared_ptr makeShared(boost::asio::any_io_executor ioc, Args&& ... args) + template> + static std::unique_ptr> createUnique(asio::io_context &ioc, T *ptr) { - std::shared_ptr(new T(ioc, std::forward(args)..., [ioc = std::move(ioc)](IAsyncDestructible *ptr) { + return std::unique_ptr>(ptr, [&ioc = ioc](T *ptr) { boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> { try { co_await ptr->asyncDestructor(); } catch(...) { } delete ptr; co_return; } (ptr), boost::asio::detached); - })); + }); + } + + template> + static coro>> createUnique(T *ptr) + { + co_return std::unique_ptr>(ptr, [ioc = asio::get_associated_executor(co_await asio::this_coro::executor)](T *ptr) { + boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> { + try { co_await ptr->asyncDestructor(); } catch(...) { } + delete ptr; + co_return; + } (ptr), boost::asio::detached); + }); + } +}; + +template +class AsyncMutexObject { +public: + class Lock { + public: + Lock(AsyncMutexObject* obj) + : Obj(obj) + {} + + Lock(const Lock& other) = delete; + Lock(Lock&& other) + : Obj(other.Obj) + { + other.Obj = nullptr; + } + + ~Lock() { + if(Obj) + unlock(); + } + + Lock& operator=(const Lock& other) = delete; + Lock& operator=(Lock& other) { + if(&other == this) + return *this; + + if(Obj) + unlock(); + + Obj = other.Obj; + other.Obj = nullptr; + } + + T& get() const { assert(Obj); return Obj->value; } + T* operator->() const { assert(Obj); return &Obj->value; } + T& operator*() const { assert(Obj); return Obj->value; } + + void unlock() { + assert(Obj); + + typename SpinlockObject::Lock ctx = Obj->Ctx.lock(); + if(ctx->Chain.empty()) { + ctx->InExecution = false; + } else { + auto token = std::move(ctx->Chain.front()); + ctx->Chain.pop_front(); + ctx.unlock(); + token(Lock(Obj)); + } + + Obj = nullptr; + } + + private: + AsyncMutexObject *Obj; + }; + +private: + struct Context { + std::list> Chain; + bool InExecution = false; + }; + + SpinlockObject Ctx; + T value; + +public: + template> + auto lock(Token&& token = Token()) { + auto initiation = [this](auto&& token) mutable { + typename SpinlockObject::Lock ctx = Ctx.lock(); + + if(ctx->InExecution) { + ctx->Chain.emplace_back(std::move(token)); + } else { + ctx->InExecution = true; + ctx.unlock(); + token(Lock(this)); + } + }; + + return boost::asio::async_initiate(std::move(initiation), token); } }; diff --git a/Src/TOSLib.hpp b/Src/TOSLib.hpp index c30b6ee..0bb3dff 100644 --- a/Src/TOSLib.hpp +++ b/Src/TOSLib.hpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include #include @@ -15,6 +17,131 @@ namespace TOS { +template +class MutexObject { +public: + template + explicit MutexObject(Args&&... args) + : value(std::forward(args)...) {} + + class SharedLock { + public: + SharedLock(MutexObject* obj, std::shared_lock lock) + : obj(obj), lock(std::move(lock)) {} + + const T& get() const { return obj->value; } + const T& operator*() const { return obj->value; } + const T* operator->() const { return &obj->value; } + + void unlock() { lock.unlock(); } + + operator bool() const { + return lock.owns_lock(); + } + + private: + MutexObject* obj; + std::shared_lock lock; + }; + + class ExclusiveLock { + public: + ExclusiveLock(MutexObject* obj, std::unique_lock lock) + : obj(obj), lock(std::move(lock)) {} + + T& get() const { return obj->value; } + T& operator*() const { return obj->value; } + T* operator->() const { return &obj->value; } + + void unlock() { lock.unlock(); } + + operator bool() const { + return lock.owns_lock(); + } + + private: + MutexObject* obj; + std::unique_lock lock; + }; + + SharedLock shared_lock() { + return SharedLock(this, std::shared_lock(mutex)); + } + + SharedLock shared_lock(const std::try_to_lock_t& tag) { + return SharedLock(this, std::shared_lock(mutex, tag)); + } + + SharedLock shared_lock(const std::adopt_lock_t& tag) { + return SharedLock(this, std::shared_lock(mutex, tag)); + } + + SharedLock shared_lock(const std::defer_lock_t& tag) { + return SharedLock(this, std::shared_lock(mutex, tag)); + } + + ExclusiveLock exclusive_lock() { + return ExclusiveLock(this, std::unique_lock(mutex)); + } + + ExclusiveLock exclusive_lock(const std::try_to_lock_t& tag) { + return ExclusiveLock(this, std::unique_lock(mutex, tag)); + } + + ExclusiveLock exclusive_lock(const std::adopt_lock_t& tag) { + return ExclusiveLock(this, std::unique_lock(mutex, tag)); + } + + ExclusiveLock exclusive_lock(const std::defer_lock_t& tag) { + return ExclusiveLock(this, std::unique_lock(mutex, tag)); + } + +private: + T value; + mutable std::shared_mutex mutex; +}; + +template +class SpinlockObject { +public: + template + explicit SpinlockObject(Args&&... args) + : value(std::forward(args)...) {} + + class Lock { + public: + Lock(SpinlockObject* obj, std::atomic_flag& lock) + : obj(obj), lock(lock) { + while (lock.test_and_set(std::memory_order_acquire)); + } + + ~Lock() { + if(obj) + lock.clear(std::memory_order_release); + } + + T& get() const { assert(obj); return obj->value; } + T* operator->() const { assert(obj); return &obj->value; } + T& operator*() const { assert(obj); return obj->value; } + + void unlock() { obj = nullptr; lock.clear(std::memory_order_release);} + + private: + SpinlockObject* obj; + std::atomic_flag& lock; + }; + + Lock lock() { + return Lock(this, mutex); + } + + const T& get_read() { return value; } + +private: + T value; + std::atomic_flag mutex = ATOMIC_FLAG_INIT; +}; + #if __BYTE_ORDER == __LITTLE_ENDIAN template static inline T swapEndian(const T &u) { return u; } diff --git a/Src/main.cpp b/Src/main.cpp index 7db35bd..38832a0 100644 --- a/Src/main.cpp +++ b/Src/main.cpp @@ -20,7 +20,10 @@ int main() { // LuaVox asio::io_context ioc; - LV::Client::VK::Vulkan vkInst(ioc); + { + LV::Client::CacheHandlerBasic::Ptr handler = LV::Client::CacheHandlerBasic::Create(ioc, "cache"); + } + //LV::Client::VK::Vulkan vkInst(ioc); ioc.run(); return 0; @@ -34,9 +37,7 @@ int main() { TOS::Logger::addLogFile(".*", TOS::EnumLogType::All, "log.raw"); std::cout << "Hello world!" << std::endl; - //return LV::main(); - - LV::Client::ResourceCacheHandler handler("cache"); + return LV::main(); return 0; }