Передача ресурсов клиенту

This commit is contained in:
2025-08-27 00:26:11 +06:00
parent 57d6e816fc
commit cfbbfa286a
14 changed files with 313 additions and 190 deletions

View File

@@ -136,23 +136,34 @@ struct DefNode_t {};
/* Интерфейс обработчика сессии с сервером */
class IServerSession {
struct ArrayHasher {
std::size_t operator()(const Hash_t& a) const {
std::size_t h = 0;
for (auto e : a)
h ^= std::hash<int>{}(e) + 0x9e3779b9 + (h << 6) + (h >> 2);
// struct ArrayHasher {
// std::size_t operator()(const Hash_t& a) const {
// std::size_t h = 0;
// for (auto e : a)
// h ^= std::hash<int>{}(e) + 0x9e3779b9 + (h << 6) + (h >> 2);
return h;
}
};
// return h;
// }
// };
public:
struct AssetEntry {
EnumAssets Type;
ResourceId Id;
std::string Domain, Key;
Resource Res;
};
static constexpr uint64_t TIME_BEFORE_UNLOAD_RESOURCE = 180;
struct {
std::unordered_map<Hash_t, BinaryResource, ArrayHasher> Resources;
// Оперируемые ресурсы
std::unordered_map<Hash_t, AssetEntry> Assets;
// Недавно использованные ресурсы, пока хранятся здесь в течении TIME_BEFORE_UNLOAD_RESOURCE секунд
std::unordered_map<Hash_t, std::pair<AssetEntry, uint64_t>> NotInUseAssets;
} Binary;
struct {
std::unordered_map<DefVoxelId, DefVoxel_t> DefVoxel;
std::unordered_map<DefVoxelId, DefVoxel_t> DefVoxel;
std::unordered_map<DefNodeId, DefNode_t> DefNode;
std::unordered_map<DefWorldId, DefWorldInfo> DefWorld;
std::unordered_map<DefPortalId, DefPortalInfo> DefPortal;

View File

@@ -1,10 +1,12 @@
#include "AssetsManager.hpp"
#include "Common/Abstract.hpp"
#include "sqlite3.h"
#include <chrono>
#include <cstddef>
#include <filesystem>
#include <fstream>
#include <optional>
#include <thread>
#include <utility>
@@ -114,7 +116,7 @@ AssetsManager::AssetsManager(boost::asio::io_context &ioc, const fs::path &cache
}
sql = R"(
SELECT data inline_cache where sha256=?;
SELECT data FROM inline_cache where sha256=?;
)";
if(sqlite3_prepare_v2(DB, sql, -1, &STMT_INLINE_GET, nullptr) != SQLITE_OK) {
@@ -157,16 +159,21 @@ AssetsManager::~AssetsManager() {
STMT_DISK_INSERT, STMT_DISK_UPDATE_TIME, STMT_DISK_REMOVE, STMT_DISK_CONTAINS,
STMT_DISK_SUM, STMT_DISK_COUNT, STMT_INLINE_INSERT, STMT_INLINE_GET,
STMT_INLINE_UPDATE_TIME, STMT_INLINE_SUM, STMT_INLINE_COUNT
})
}) {
if(stmt)
sqlite3_finalize(stmt);
}
if(DB)
sqlite3_close(DB);
OffThread.join();
LOG.info() << "Хранилище кеша закрыто";
}
coro<> AssetsManager::asyncDestructor() {
assert(NeedShutdown); // Должен быть вызван нормальный shutdown
NeedShutdown = true;
co_await IAsyncDestructible::asyncDestructor();
}
@@ -175,7 +182,7 @@ void AssetsManager::readWriteThread(AsyncUseControl::Lock lock) {
std::vector<fs::path> assets;
size_t maxCacheDatabaseSize, maxLifeTime;
while(!NeedShutdown && !WriteQueue.get_read().empty()) {
while(!NeedShutdown || !WriteQueue.get_read().empty()) {
// Получить новые данные
if(Changes.get_read().AssetsChange) {
auto lock = Changes.lock();
@@ -377,9 +384,11 @@ void AssetsManager::readWriteThread(AsyncUseControl::Lock lock) {
continue;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
} catch(const std::exception& exc) {
LOG.warn() << "Ошибка в работе потока: " << exc.what();
LOG.warn() << "Ошибка в работе потока:\n" << exc.what();
IssuedAnError = true;
}
}

View File

@@ -146,17 +146,6 @@ public:
lock->FullRecheck = true;
}
// Уведомление о завершении работы
void prepareShutdown() {
NeedShutdown = true;
}
// После этого вызова уже нельзя будет обращатся ко внешним ресурсам
void shutdown() {
assert(NeedShutdown);
OffThread.join();
}
bool hasError() {
return IssuedAnError;
}
@@ -208,7 +197,7 @@ private:
bool NeedShutdown = false, IssuedAnError = false;
std::thread OffThread;
virtual coro<> asyncDestructor();
AssetsManager(boost::asio::io_context &ioc, const fs::path &cachePath,
size_t maxCacheDatabaseSize, size_t maxLifeTime);

View File

@@ -2,6 +2,7 @@
#include "Client/Abstract.hpp"
#include "Common/Abstract.hpp"
#include "Common/Net.hpp"
#include "TOSAsync.hpp"
#include "TOSLib.hpp"
#include "glm/ext/quaternion_geometric.hpp"
#include <GLFW/glfw3.h>
@@ -18,6 +19,28 @@
namespace LV::Client {
ServerSession::ServerSession(asio::io_context &ioc, std::unique_ptr<Net::AsyncSocket>&& socket)
: IAsyncDestructible(ioc), Socket(std::move(socket)), NetInputPackets(1024)
{
assert(Socket.get());
try {
AM = AssetsManager::Create(ioc, "Cache");
asio::co_spawn(ioc, run(AUC.use()), asio::detached);
// TODO: добавить оптимизацию для подключения клиента к внутреннему серверу
} catch(const std::exception &exc) {
MAKE_ERROR("Ошибка инициализации обработчика объекта подключения к серверу:\n" << exc.what());
}
}
coro<> ServerSession::asyncDestructor() {
co_await IAsyncDestructible::asyncDestructor();
}
ParsedPacket::~ParsedPacket() = default;
struct PP_Content_ChunkVoxels : public ParsedPacket {
@@ -88,20 +111,9 @@ struct PP_Definition_FreeNode : public ParsedPacket {
{}
};
struct PP_Resource_InitResSend : public ParsedPacket {
Hash_t Hash;
BinaryResource Resource;
PP_Resource_InitResSend(Hash_t hash, BinaryResource res)
: ParsedPacket(ToClient::L1::Resource, (uint8_t) ToClient::L2Resource::InitResSend), Hash(hash), Resource(res)
{}
};
using namespace TOS;
ServerSession::~ServerSession() {
WorkDeadline.cancel();
UseLock.wait_no_use();
}
coro<> ServerSession::asyncAuthorizeWithServer(tcp::socket &socket, const std::string username, const std::string token, int a_ar_r, std::function<void(const std::string&)> onProgress) {
@@ -338,13 +350,6 @@ void ServerSession::atFreeDrawTime(GlobalTime gTime, float dTime) {
ParsedPacket *pack;
while(NetInputPackets.pop(pack)) {
if(pack->Level1 == ToClient::L1::Definition) {
ToClient::L2Resource l2 = ToClient::L2Resource(pack->Level2);
if(l2 == ToClient::L2Resource::InitResSend) {
PP_Resource_InitResSend &p = *dynamic_cast<PP_Resource_InitResSend*>(pack);
}
} else if(pack->Level1 == ToClient::L1::Definition) {
ToClient::L2Definition l2 = ToClient::L2Definition(pack->Level2);
if(l2 == ToClient::L2Definition::Voxel) {
@@ -471,21 +476,17 @@ void ServerSession::atFreeDrawTime(GlobalTime gTime, float dTime) {
}
}
coro<> ServerSession::run() {
auto useLock = UseLock.lock();
void ServerSession::setRenderSession(IRenderSession* session) {
RS = session;
}
coro<> ServerSession::run(AsyncUseControl::Lock) {
try {
while(!IsGoingShutdown && IsConnected) {
co_await readPacket(*Socket);
}
} catch(const std::exception &exc) {
// if(const auto *errc = dynamic_cast<const boost::system::system_error*>(&exc);
// errc && errc->code() == boost::asio::error::operation_aborted)
// {
// co_return;
// }
TOS::Logger("ServerSession").warn() << exc.what();
LOG.error() << "Ошибка обработки сокета:\n" << exc.what();
}
IsConnected = false;
@@ -551,19 +552,33 @@ coro<> ServerSession::rP_Resource(Net::AsyncSocket &sock) {
case ToClient::L2Resource::Bind:
{
uint32_t count = co_await sock.read<uint32_t>();
AsyncContext.ThisTickEntry.AssetsBinds.reserve(AsyncContext.ThisTickEntry.AssetsBinds.size()+count);
for(size_t iter = 0; iter < count; iter++) {
uint8_t type = co_await sock.read<uint8_t>();
uint32_t id = co_await sock.read<uint32_t>();
std::string domain, key;
domain = co_await sock.read<std::string>();
key = co_await sock.read<std::string>();
Hash_t hash;
co_await sock.read((std::byte*) hash.data(), hash.size());
AsyncContext.ThisTickEntry.AssetsBinds.emplace_back(
(EnumAssets) type, (ResourceId) id, std::move(domain),
std::move(key), hash
);
}
}
case ToClient::L2Resource::Lost:
{
uint32_t count = co_await sock.read<uint32_t>();
AsyncContext.ThisTickEntry.AssetsLost.reserve(AsyncContext.ThisTickEntry.AssetsLost.size()+count);
for(size_t iter = 0; iter < count; iter++) {
uint8_t type = co_await sock.read<uint8_t>();
// uint8_t type = co_await sock.read<uint8_t>();
uint32_t id = co_await sock.read<uint32_t>();
AsyncContext.ThisTickEntry.AssetsLost.push_back(id);
}
}
case ToClient::L2Resource::InitResSend:
@@ -571,26 +586,41 @@ coro<> ServerSession::rP_Resource(Net::AsyncSocket &sock) {
uint32_t size = co_await sock.read<uint32_t>();
Hash_t hash;
co_await sock.read((std::byte*) hash.data(), hash.size());
ResourceId id = co_await sock.read<uint32_t>();
EnumAssets type = (EnumAssets) co_await sock.read<uint8_t>();
std::string domain = co_await sock.read<std::string>();
std::string key = co_await sock.read<std::string>();
uint32_t chunkSize = co_await sock.read<uint32_t>();
assert(chunkSize < std::pow(2, 26));
std::u8string data(size, '\0');
co_await sock.read((std::byte*) data.data(), data.size());
PP_Resource_InitResSend *packet = new PP_Resource_InitResSend(
hash,
std::make_shared<std::u8string>(std::move(data))
);
while(!NetInputPackets.push(packet));
AsyncContext.AssetsLoading[hash] = AssetLoading{
type, id, std::move(domain), std::move(key),
std::u8string(size, '\0'), 0
};
co_return;
}
case ToClient::L2Resource::ChunkSend:
{
Hash_t hash;
co_await sock.read((std::byte*) hash.data(), hash.size());
uint32_t size = co_await sock.read<uint32_t>();
AssetLoading& al = AsyncContext.AssetsLoading.at(hash);
if(al.Data.size()-al.Offset < size)
MAKE_ERROR("Несоответствие ожидаемого размера ресурса");
co_await sock.read((std::byte*) al.Data.data() + al.Offset, size);
al.Offset += size;
if(al.Offset == al.Data.size()) {
// Ресурс полностью загружен
AsyncContext.LoadedAssets.lock()->emplace_back(
al.Type, al.Id, std::move(al.Domain), std::move(al.Key), std::move(al.Data)
);
AsyncContext.AssetsLoading.erase(AsyncContext.AssetsLoading.find(hash));
}
co_return;
}
default:
protocolError();
}

View File

@@ -6,12 +6,15 @@
#include "Common/Lockable.hpp"
#include "Common/Net.hpp"
#include "Common/Packets.hpp"
#include "TOSAsync.hpp"
#include <TOSLib.hpp>
#include <boost/asio/io_context.hpp>
#include <filesystem>
#include <memory>
#include <boost/lockfree/spsc_queue.hpp>
#include <Client/ResourceCache.hpp>
#include <Client/AssetsManager.hpp>
#include <queue>
#include <unordered_map>
namespace LV::Client {
@@ -26,18 +29,100 @@ struct ParsedPacket {
virtual ~ParsedPacket();
};
class ServerSession : public AsyncObject, public IServerSession, public ISurfaceEventListener {
class ServerSession : public IAsyncDestructible, public IServerSession, public ISurfaceEventListener {
public:
using Ptr = std::shared_ptr<ServerSession>;
public:
static Ptr Create(asio::io_context &ioc, std::unique_ptr<Net::AsyncSocket> &&socket) {
return createShared(ioc, new ServerSession(ioc, std::move(socket)));
}
virtual ~ServerSession();
// Авторизоваться или (зарегистрироваться и авторизоваться) или зарегистрироваться
static coro<> asyncAuthorizeWithServer(tcp::socket &socket, const std::string username, const std::string token, int a_ar_r, std::function<void(const std::string&)> onProgress = nullptr);
// Начать игровой протокол в авторизированном сокете
static coro<std::unique_ptr<Net::AsyncSocket>> asyncInitGameProtocol(asio::io_context &ioc, tcp::socket &&socket, std::function<void(const std::string&)> onProgress = nullptr);
void shutdown(EnumDisconnect type);
bool isConnected() {
return Socket->isAlive() && IsConnected;
}
// ISurfaceEventListener
virtual void onResize(uint32_t width, uint32_t height) override;
virtual void onChangeFocusState(bool isFocused) override;
virtual void onCursorPosChange(int32_t width, int32_t height) override;
virtual void onCursorMove(float xMove, float yMove) override;
virtual void onCursorBtn(EnumCursorBtn btn, bool state) override;
virtual void onKeyboardBtn(int btn, int state) override;
virtual void onJoystick() override;
// IServerSession
virtual void atFreeDrawTime(GlobalTime gTime, float dTime) override;
void setRenderSession(IRenderSession* session);
private:
TOS::Logger LOG = "ServerSession";
std::unique_ptr<Net::AsyncSocket> Socket;
IRenderSession *RS = nullptr;
// Обработчик кеша ресурсов сервера
CacheHandler::Ptr CHDB;
AssetsManager::Ptr AM;
struct AssetLoading {
EnumAssets Type;
ResourceId Id;
std::string Domain, Key;
std::u8string Data;
size_t Offset;
};
struct AssetBindEntry {
EnumAssets Type;
ResourceId Id;
std::string Domain, Key;
Hash_t Hash;
};
struct TickData {
std::vector<WorldId_t> LostWorld;
// std::vector<std::pair<WorldId_t, DefWorld>>
// Потерянные из видимости ресурсы
std::vector<ResourceId> AssetsLost;
// Новые привязки ресурсов
std::vector<AssetBindEntry> AssetsBinds;
};
struct {
// Сюда обращается ветка, обрабатывающая сокет; run()
// Получение ресурсов с сервера
std::unordered_map<Hash_t, AssetLoading> AssetsLoading;
// Получение привязок
// Накопление данных за такт сервера
TickData ThisTickEntry;
// Обменный пункт
// Привязки ресурсов
TOS::SpinlockObject<std::vector<AssetEntry>> AssetsBindings;
// Полученные ресурсы с сервера
TOS::SpinlockObject<std::vector<AssetEntry>> LoadedAssets;
// Пакеты обновлений игрового мира
TOS::SpinlockObject<std::queue<TickData>> TickSequence;
} AsyncContext;
DestroyLock UseLock;
bool IsConnected = true, IsGoingShutdown = false;
TOS::Logger LOG = "ServerSession";
boost::lockfree::spsc_queue<ParsedPacket*> NetInputPackets;
// PYR - поворот камеры по осям xyz в радианах, PYR_Offset для сглаживание поворота
@@ -59,70 +144,20 @@ class ServerSession : public AsyncObject, public IServerSession, public ISurface
GlobalTime LastSendPYR_POS;
public:
// Нужен сокет, на котором только что был согласован игровой протокол (asyncInitGameProtocol)
ServerSession(asio::io_context &ioc, std::unique_ptr<Net::AsyncSocket> &&socket, IRenderSession *rs = nullptr)
: AsyncObject(ioc), Socket(std::move(socket)), RS(rs), NetInputPackets(1024)
{
assert(Socket.get());
try {
fs::create_directories("Cache");
CHDB = CacheHandlerBasic::Create(ioc, "Cache");
// Отправка информации о загруженном кеше
// TODO: добавить оптимизацию для подключения клиента к внутреннему серверу
auto [data, count] = CHDB->getAll();
Net::Packet packet;
packet << uint32_t(count);
packet.write((const std::byte*) data.data(), data.size());
Socket->pushPacket(std::move(packet));
} catch(const std::exception &exc) {
MAKE_ERROR("Ошибка инициализации обработчика кеша ресурсов сервера:\n" << exc.what());
}
co_spawn(run());
}
virtual ~ServerSession();
// Авторизоваться или (зарегистрироваться и авторизоваться) или зарегистрироваться
static coro<> asyncAuthorizeWithServer(tcp::socket &socket, const std::string username, const std::string token, int a_ar_r, std::function<void(const std::string&)> onProgress = nullptr);
// Начать игровой протокол в авторизированном сокете
static coro<std::unique_ptr<Net::AsyncSocket>> asyncInitGameProtocol(asio::io_context &ioc, tcp::socket &&socket, std::function<void(const std::string&)> onProgress = nullptr);
void shutdown(EnumDisconnect type);
bool isConnected() {
return Socket->isAlive() && IsConnected;
}
void waitShutdown() {
UseLock.wait_no_use();
}
// ISurfaceEventListener
virtual void onResize(uint32_t width, uint32_t height) override;
virtual void onChangeFocusState(bool isFocused) override;
virtual void onCursorPosChange(int32_t width, int32_t height) override;
virtual void onCursorMove(float xMove, float yMove) override;
virtual void onCursorBtn(EnumCursorBtn btn, bool state) override;
virtual void onKeyboardBtn(int btn, int state) override;
virtual void onJoystick() override;
virtual void atFreeDrawTime(GlobalTime gTime, float dTime) override;
private:
coro<> run();
// Приём данных с сокета
coro<> run(AsyncUseControl::Lock);
void protocolError();
coro<> readPacket(Net::AsyncSocket &sock);
coro<> rP_System(Net::AsyncSocket &sock);
coro<> rP_Resource(Net::AsyncSocket &sock);
coro<> rP_Definition(Net::AsyncSocket &sock);
coro<> rP_Content(Net::AsyncSocket &sock);
// Нужен сокет, на котором только что был согласован игровой протокол (asyncInitGameProtocol)
ServerSession(asio::io_context &ioc, std::unique_ptr<Net::AsyncSocket> &&socket);
virtual coro<> asyncDestructor() override;
};
}

View File

@@ -2215,7 +2215,8 @@ void Vulkan::gui_MainMenu() {
std::unique_ptr<Net::AsyncSocket> sock = std::move(ConnectionProgress.Socket);
Game.RSession = std::make_unique<VulkanRenderSession>();
*this << Game.RSession;
Game.Session = std::make_unique<ServerSession>(IOC, std::move(sock), Game.RSession.get());
Game.Session = ServerSession::Create(IOC, std::move(sock));
Game.Session->setRenderSession(Game.RSession.get());
Game.RSession->setServerSession(Game.Session.get());
Game.ImGuiInterfaces.push_back(&Vulkan::gui_ConnectedToServer);
}

View File

@@ -250,7 +250,7 @@ public:
DestroyLock UseLock;
std::thread MainThread;
std::shared_ptr<VulkanRenderSession> RSession;
std::unique_ptr<ServerSession> Session;
ServerSession::Ptr Session;
std::list<void (Vulkan::*)()> ImGuiInterfaces;
std::unique_ptr<ServerObj> Server;