ClientCache

This commit is contained in:
2025-06-27 12:20:46 +06:00
parent ecc77544b2
commit 4743583831
10 changed files with 229 additions and 86 deletions

View File

@@ -1,4 +1,5 @@
#include "ResourceCache.hpp"
#include "sqlite3.h"
#include <fstream>
@@ -156,9 +157,9 @@ std::pair<std::string, size_t> CacheDatabase::getAllHash() {
}
void CacheDatabase::updateTimeFor(HASH hash) {
sqlite3_bind_blob(STMT_UPDATE_TIME, 0, (const void*) hash.data(), 32, SQLITE_STATIC);
sqlite3_bind_int(STMT_UPDATE_TIME, 1, time(nullptr));
if(sqlite3_step(STMT_UPDATE_TIME) != SQLITE_OK) {
sqlite3_bind_blob(STMT_UPDATE_TIME, 1, (const void*) hash.data(), 32, SQLITE_STATIC);
sqlite3_bind_int(STMT_UPDATE_TIME, 2, time(nullptr));
if(sqlite3_step(STMT_UPDATE_TIME) != SQLITE_DONE) {
sqlite3_reset(STMT_UPDATE_TIME);
MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_UPDATE_TIME: " << sqlite3_errmsg(DB));
}
@@ -169,10 +170,10 @@ void CacheDatabase::updateTimeFor(HASH hash) {
void CacheDatabase::insert(HASH hash, size_t size) {
assert(size < (size_t(1) << 31)-1 && size > 0);
sqlite3_bind_blob(STMT_INSERT, 0, (const void*) hash.data(), 32, SQLITE_STATIC);
sqlite3_bind_int(STMT_INSERT, 1, (int) size);
sqlite3_bind_blob(STMT_INSERT, 1, (const void*) hash.data(), 32, SQLITE_STATIC);
sqlite3_bind_int(STMT_INSERT, 2, time(nullptr));
if(sqlite3_step(STMT_INSERT) != SQLITE_OK) {
sqlite3_bind_int(STMT_INSERT, 3, (int) size);
if(sqlite3_step(STMT_INSERT) != SQLITE_DONE) {
sqlite3_reset(STMT_INSERT);
MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_INSERT: " << sqlite3_errmsg(DB));
}
@@ -184,7 +185,7 @@ std::vector<CacheDatabase::HASH> CacheDatabase::findExcessHashes(size_t bytesToF
std::vector<HASH> out;
size_t removed = 0;
sqlite3_bind_int(STMT_OLD, 0, timeBefore);
sqlite3_bind_int(STMT_OLD, 1, timeBefore);
while(true) {
int errc = sqlite3_step(STMT_OLD);
if(errc == SQLITE_DONE)
@@ -208,7 +209,7 @@ std::vector<CacheDatabase::HASH> CacheDatabase::findExcessHashes(size_t bytesToF
if(removed > bytesToFree)
return out;
sqlite3_bind_int(STMT_TO_FREE, 0, (int) bytesToFree);
sqlite3_bind_int(STMT_TO_FREE, 1, (int) bytesToFree);
while(true) {
int errc = sqlite3_step(STMT_TO_FREE);
@@ -232,8 +233,8 @@ std::vector<CacheDatabase::HASH> CacheDatabase::findExcessHashes(size_t bytesToF
}
void CacheDatabase::remove(HASH hash) {
sqlite3_bind_blob(STMT_REMOVE, 0, (const void*) hash.data(), 32, SQLITE_STATIC);
if(sqlite3_step(STMT_REMOVE) != SQLITE_OK) {
sqlite3_bind_blob(STMT_REMOVE, 1, (const void*) hash.data(), 32, SQLITE_STATIC);
if(sqlite3_step(STMT_REMOVE) != SQLITE_DONE) {
sqlite3_reset(STMT_REMOVE);
MAKE_ERROR("Не удалось выполнить подготовленный запрос STMT_REMOVE: " << sqlite3_errmsg(DB));
}
@@ -246,13 +247,13 @@ std::string CacheDatabase::hashToString(HASH hash) {
text.reserve(64);
for(int iter = 0; iter < 32; iter++) {
int val = hash[31-iter] & 0xf;
int val = (hash[31-iter] >> 4) & 0xf;
if(val > 9)
text += 'a'+val-10;
else
text += '0'+val;
val = (hash[31-iter] >> 4) & 0xf;
val = hash[31-iter] & 0xf;
if(val > 9)
text += 'a'+val-10;
else
@@ -284,8 +285,10 @@ CacheDatabase::HASH CacheDatabase::stringToHash(const std::string_view view) {
return hash;
}
CacheHandler::CacheHandler(boost::asio::io_context &ioc, const fs::path &cachePath)
: IAsyncDestructible(ioc), Path(cachePath), DB(Path)
CacheHandler::CacheHandler(boost::asio::io_context &ioc, const fs::path &cachePath,
size_t maxCacheDirectorySize, size_t maxLifeTime)
: IAsyncDestructible(ioc), Path(cachePath), DB(Path),
MaxCacheDirectorySize(maxCacheDirectorySize), MaxLifeTime(maxLifeTime)
{
}
@@ -295,13 +298,17 @@ std::pair<std::string, size_t> CacheHandler::getAll() {
return DB.getAllHash();
}
size_t CacheHandler::getCacheSize() {
return DB.getCacheSize();
}
coro<> CacheHandlerBasic::asyncDestructor() {
NeedShutdown = true;
co_await CacheHandler::asyncDestructor();
}
void CacheHandlerBasic::readThread(AsyncUseControl::Lock lock) {
LOG.info() << "readThread started";
LOG.info() << "Поток чтения запущен";
while(!NeedShutdown) {
if(ReadQueue.get_read().empty())
@@ -318,54 +325,22 @@ void CacheHandlerBasic::readThread(AsyncUseControl::Lock lock) {
std::string name = CacheDatabase::hashToString(hash);
fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4);
std::shared_ptr<std::string> data = std::make_shared<std::string>();
try {
std::ifstream fd(path, std::ios::binary | std::ios::ate);
if (!fd.is_open())
MAKE_ERROR("!is_open(): " << fd.exceptions());
if (fd.fail())
MAKE_ERROR("fail(): " << fd.exceptions());
std::ifstream::pos_type size = fd.tellg();
fd.seekg(0, std::ios::beg);
data->resize(size);
fd.read(data->data(), size);
if (!fd.good())
MAKE_ERROR("!good(): " << fd.exceptions());
} catch(const std::exception &exc) {
LOG.error() << "Не удалось считать ресурс " << path.c_str() << ": " << exc.what();
}
ReadedQueue.lock()->emplace_back(hash, std::move(data));
continue;
}
wait:
TOS::Time::sleep3(20);
}
LOG.info() << "readThread ended";
lock.unlock();
}
void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) {
LOG.info() << "readThread started";
while(!NeedShutdown) {
if(!ReadQueue.get_read().empty()) {
auto lock = ReadQueue.lock();
if(!lock->empty()) {
CacheDatabase::HASH hash = lock->front();
lock->pop();
lock.unlock();
std::string name = CacheDatabase::hashToString(hash);
fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4);
std::shared_ptr<std::string> data = std::make_shared<std::string>();
std::shared_ptr<std::string> data;
{
auto lock_wc = WriteCache.lock();
auto iter = lock_wc->begin();
while(iter != lock_wc->end()) {
if(iter->first == hash) {
// Копируем
data = std::make_shared<std::string>(*iter->second);
break;
}
}
}
if(!data) {
data = std::make_shared<std::string>();
try {
std::ifstream fd(path, std::ios::binary | std::ios::ate);
@@ -382,11 +357,75 @@ void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) {
if (!fd.good())
MAKE_ERROR("!good(): " << fd.exceptions());
DB.updateTimeFor(hash);
} catch(const std::exception &exc) {
LOG.error() << "Не удалось считать ресурс " << path.c_str() << ": " << exc.what();
}
}
ReadedQueue.lock()->emplace_back(hash, std::move(data));
continue;
}
wait:
TOS::Time::sleep3(20);
}
LOG.info() << "Поток чтения остановлен";
lock.unlock();
}
void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) {
LOG.info() << "Поток чтения/записи запущен";
while(!NeedShutdown || !WriteQueue.get_read().empty()) {
if(!ReadQueue.get_read().empty()) {
auto lock = ReadQueue.lock();
if(!lock->empty()) {
CacheDatabase::HASH hash = lock->front();
lock->pop();
lock.unlock();
std::string name = CacheDatabase::hashToString(hash);
fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4);
std::shared_ptr<std::string> data;
{
auto lock_wc = WriteCache.lock();
auto iter = lock_wc->begin();
while(iter != lock_wc->end()) {
if(iter->first == hash) {
// Копируем
data = std::make_shared<std::string>(*iter->second);
break;
}
}
}
if(!data) {
data = std::make_shared<std::string>();
try {
std::ifstream fd(path, std::ios::binary | std::ios::ate);
if (!fd.is_open())
MAKE_ERROR("!is_open(): " << fd.exceptions());
if (fd.fail())
MAKE_ERROR("fail(): " << fd.exceptions());
std::ifstream::pos_type size = fd.tellg();
fd.seekg(0, std::ios::beg);
data->resize(size);
fd.read(data->data(), size);
if (!fd.good())
MAKE_ERROR("!good(): " << fd.exceptions());
DB.updateTimeFor(hash);
} catch(const std::exception &exc) {
LOG.error() << "Не удалось считать ресурс " << path.c_str() << ": " << exc.what();
}
}
ReadedQueue.lock()->emplace_back(hash, std::move(data));
continue;
@@ -414,7 +453,7 @@ void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) {
LOG.warn() << "Удаление устаревшего кеша в количестве " << hashes.size() << "...";
for(CacheDatabase::HASH hash : hashes) {
std::string name = CacheDatabase::hashToString(task.Hash);
std::string name = CacheDatabase::hashToString(hash);
fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4);
DB.remove(hash);
fs::remove(path);
@@ -436,18 +475,30 @@ void CacheHandlerBasic::readWriteThread(AsyncUseControl::Lock lock) {
} catch(const std::exception &exc) {
LOG.error() << "Не удалось сохранить ресурс " << path.c_str() << ": " << exc.what();
}
auto lock = WriteCache.lock();
auto iter = lock->begin();
while(iter != lock->end()) {
if(iter->first == task.Hash)
break;
iter++;
}
assert(iter != lock->end());
lock->erase(iter);
}
}
TOS::Time::sleep3(20);
}
LOG.info() << "readWriteThread ended";
LOG.info() << "Поток чтения/записи остановлен";
lock.unlock();
}
CacheHandlerBasic::CacheHandlerBasic(boost::asio::io_context &ioc, const fs::path &cachePath)
: CacheHandler(ioc, cachePath),
CacheHandlerBasic::CacheHandlerBasic(boost::asio::io_context &ioc, const fs::path &cachePath,
size_t maxCacheDirectorySize, size_t maxLifeTime)
: CacheHandler(ioc, cachePath, maxCacheDirectorySize, maxLifeTime),
ReadThread(&CacheHandlerBasic::readThread, this, AUC.use()),
ReadWriteThread(&CacheHandlerBasic::readWriteThread, this, AUC.use())
{
@@ -488,4 +539,37 @@ std::vector<std::pair<CacheDatabase::HASH, std::string>> CacheHandlerBasic::pull
return out;
}
void CacheHandlerBasic::updateParams(size_t maxLifeTime, size_t maxCacheDirectorySize) {
MaxLifeTime = maxLifeTime;
if(MaxCacheDirectorySize != maxCacheDirectorySize) {
MaxCacheDirectorySize = maxCacheDirectorySize;
size_t size = DB.getCacheSize();
if(size > maxCacheDirectorySize) {
size_t needToFree = size-maxCacheDirectorySize+64*1024*1024;
try {
LOG.info() << "Начата вычистка кеша на сумму " << needToFree/1024/1024 << " Мб";
std::vector<CacheDatabase::HASH> hashes = DB.findExcessHashes(needToFree, time(nullptr)-MaxLifeTime);
LOG.warn() << "Удаление кеша в количестве " << hashes.size() << "...";
for(CacheDatabase::HASH hash : hashes) {
std::string name = CacheDatabase::hashToString(hash);
fs::path path = Path / name.substr(0, 2) / name.substr(2, 2) / name.substr(4);
DB.remove(hash);
fs::remove(path);
fs::path up1 = path.parent_path();
LOG.info() << "В директории " << up1.c_str() << " не осталось файлов, удаляем...";
size_t count = std::distance(fs::directory_iterator(up1), fs::directory_iterator());
if(count == 0)
fs::remove(up1);
}
} catch(const std::exception &exc) {
LOG.error() << "Не удалось очистить кеш до новой границы: " << exc.what();
}
}
}
}
}