Переработка менеджера ресурсов на стороне клиентов

This commit is contained in:
2026-01-07 01:58:15 +06:00
parent c13ad06ba9
commit 523f9725c0
7 changed files with 443 additions and 939 deletions

View File

@@ -61,7 +61,7 @@ const char* toClientPacketName(ToClient type) {
}
ServerSession::ServerSession(asio::io_context &ioc, std::unique_ptr<Net::AsyncSocket>&& socket)
: IAsyncDestructible(ioc), Socket(std::move(socket)) //, NetInputPackets(1024)
: IAsyncDestructible(ioc), Socket(std::move(socket)), AM(ioc, "Cache")
{
assert(Socket.get());
@@ -71,12 +71,7 @@ ServerSession::ServerSession(asio::io_context &ioc, std::unique_ptr<Net::AsyncSo
Profiles.DefNode[3] = {3};
Profiles.DefNode[4] = {4};
std::fill(NextServerId.begin(), NextServerId.end(), 0);
for(auto& vec : ServerIdToDK)
vec.emplace_back();
try {
AM = AssetsManager::Create(ioc, "Cache");
asio::co_spawn(ioc, run(AUC.use()), asio::detached);
// TODO: добавить оптимизацию для подключения клиента к внутреннему серверу
} catch(const std::exception &exc) {
@@ -312,287 +307,28 @@ void ServerSession::onJoystick() {
}
void ServerSession::update(GlobalTime gTime, float dTime) {
// Если были получены ресурсы, отправим их на запись в кеш
if(!AsyncContext.LoadedAssets.get_read().empty()) {
std::vector<AssetEntry> assets = std::move(*AsyncContext.LoadedAssets.lock());
std::vector<Resource> resources;
resources.reserve(assets.size());
for(AssetEntry& entry : assets) {
entry.Hash = entry.Res.hash();
if(const AssetsManager::BindInfo* bind = AM->getBind(entry.Type, entry.Id))
entry.Dependencies = AM->rebindHeader(entry.Type, bind->Header);
else
entry.Dependencies.clear();
resources.push_back(entry.Res);
AsyncContext.LoadedResources.emplace_back(std::move(entry));
// // Проверяем используется ли сейчас ресурс
// auto iter = MyAssets.ExistBinds[(int) entry.Type].find(entry.Id);
// if(iter == MyAssets.ExistBinds[(int) entry.Type].end()) {
// // Не используется
// MyAssets.NotInUse[(int) entry.Type][entry.Domain + ':' + entry.Key] = {entry, TIME_BEFORE_UNLOAD_RESOURCE+time(nullptr)};
// } else {
// // Используется
// Assets.InUse[(int) entry.Type][entry.Id] = entry;
// changedResources[entry.Type].insert({entry.Id, entry});
// }
}
AM->pushResources(std::move(resources));
}
// Получить ресурсы с AssetsManager
// Если AssetsManager запрашивает ресурсы с сервера
{
static std::atomic<uint32_t> debugAssetReadLogCount = 0;
std::vector<std::pair<AssetsManager::ResourceKey, std::optional<Resource>>> resources = AM->pullReads();
std::vector<Hash_t> needRequest;
std::vector<Hash_t> needRequest = AM.pullNeededResources();
Net::Packet pack;
std::vector<Net::Packet> packets;
for(auto& [key, res] : resources) {
bool cacheHit = false;
Hash_t actualHash = {};
if(res) {
actualHash = res->hash();
cacheHit = actualHash == key.Hash;
}
if(cacheHit) {
auto& waitingByDomain = AsyncContext.ResourceWait[(int) key.Type];
auto iterDomain = waitingByDomain.find(key.Domain);
if(iterDomain != waitingByDomain.end()) {
auto& entries = iterDomain->second;
entries.erase(std::remove_if(entries.begin(), entries.end(),
[&](const std::pair<std::string, Hash_t>& entry) {
return entry.first == key.Key && entry.second == key.Hash;
}),
entries.end());
if(entries.empty())
waitingByDomain.erase(iterDomain);
}
}
auto check = [&]() {
if(pack.size() > 64000)
packets.emplace_back(std::move(pack));
};
if(key.Domain == "test"
&& (key.Type == EnumAssets::Nodestate
|| key.Type == EnumAssets::Model
|| key.Type == EnumAssets::Texture))
{
uint32_t idx = debugAssetReadLogCount.fetch_add(1);
if(idx < 128) {
if(res) {
LOG.debug() << "Cache hit type=" << assetTypeName(key.Type)
<< " id=" << key.Id
<< " key=" << key.Domain << ':' << key.Key
<< " size=" << res->size();
} else {
LOG.debug() << "Cache miss type=" << assetTypeName(key.Type)
<< " id=" << key.Id
<< " key=" << key.Domain << ':' << key.Key
<< " hash=" << int(key.Hash[0]) << '.'
<< int(key.Hash[1]) << '.'
<< int(key.Hash[2]) << '.'
<< int(key.Hash[3]);
}
}
}
if(!res) {
// Проверить не был ли уже отправлен запрос на получение этого хеша
auto iter = std::lower_bound(AsyncContext.AlreadyLoading.begin(), AsyncContext.AlreadyLoading.end(), key.Hash);
if(iter == AsyncContext.AlreadyLoading.end() || *iter != key.Hash) {
AsyncContext.AlreadyLoading.insert(iter, key.Hash);
needRequest.push_back(key.Hash);
}
} else {
if(actualHash != key.Hash) {
auto iter = std::lower_bound(AsyncContext.AlreadyLoading.begin(), AsyncContext.AlreadyLoading.end(), key.Hash);
if(iter == AsyncContext.AlreadyLoading.end() || *iter != key.Hash) {
AsyncContext.AlreadyLoading.insert(iter, key.Hash);
needRequest.push_back(key.Hash);
}
}
std::vector<uint8_t> deps;
if(const AssetsManager::BindInfo* bind = AM->getBind(key.Type, key.Id))
deps = AM->rebindHeader(key.Type, bind->Header);
AssetEntry entry {
.Type = key.Type,
.Id = key.Id,
.Domain = key.Domain,
.Key = key.Key,
.Res = std::move(*res),
.Hash = actualHash,
.Dependencies = std::move(deps)
};
AsyncContext.LoadedResources.emplace_back(std::move(entry));
}
pack << (uint8_t) ToServer::L1::System << (uint8_t) ToServer::L2System::ResourceRequest;
pack << (uint16_t) needRequest.size();
for(const Hash_t& hash : needRequest) {
pack.write((const std::byte*) hash.data(), 32);
check();
}
if(!needRequest.empty()) {
assert(needRequest.size() < (1 << 16));
if(pack.size())
packets.emplace_back(std::move(pack));
uint32_t idx = debugAssetReadLogCount.fetch_add(1);
if(idx < 128) {
LOG.debug() << "Send ResourceRequest count=" << needRequest.size();
}
for(const auto& hash : needRequest) {
LOG.debug() << "Client request hash="
<< int(hash[0]) << '.'
<< int(hash[1]) << '.'
<< int(hash[2]) << '.'
<< int(hash[3]);
}
Net::Packet p;
p << (uint8_t) ToServer::L1::System << (uint8_t) ToServer::L2System::ResourceRequest;
p << (uint16_t) needRequest.size();
for(const Hash_t& hash : needRequest)
p.write((const std::byte*) hash.data(), 32);
Socket->pushPacket(std::move(p));
}
}
// Разбираемся с полученными меж тактами привязками ресурсов
if(!AsyncContext.AssetsBinds.get_read().empty()) {
AssetsBindsChange abc;
// Нужно объеденить изменения в один AssetsBindsChange (abc)
{
std::vector<AssetsBindsChange> list = std::move(*AsyncContext.AssetsBinds.lock());
for(AssetsBindsChange entry : list) {
for(int type = 0; type < (int) EnumAssets::MAX_ENUM; type++)
std::sort(entry.Lost[type].begin(), entry.Lost[type].end());
// Если до этого была объявлена привязка, а теперь она потеряна, то просто сокращаем значения.
// Иначе дописываем в lost
for(ssize_t iter = abc.Binds.size()-1; iter >= 0; iter--) {
const AssetBindEntry& abe = abc.Binds[iter];
auto& lost = entry.Lost[(int) abe.Type];
auto iterator = std::lower_bound(lost.begin(), lost.end(), abe.Id);
if(iterator != lost.end() && *iterator == abe.Id) {
// Привязка будет удалена
lost.erase(iterator);
abc.Binds.erase(abc.Binds.begin()+iter);
}
}
for(int type = 0; type < (int) EnumAssets::MAX_ENUM; type++) {
abc.Lost[type].append_range(entry.Lost[type]);
entry.Lost[type].clear();
std::sort(abc.Lost[type].begin(), abc.Lost[type].end());
}
for(AssetBindEntry& abe : entry.Binds) {
auto iterator = std::lower_bound(entry.Lost[(int) abe.Type].begin(), entry.Lost[(int) abe.Type].end(), abe.Id);
if(iterator != entry.Lost[(int) abe.Type].end() && *iterator == abe.Id) {
// Получили новую привязку, которая была удалена в предыдущем такте
entry.Lost[(int) abe.Type].erase(iterator);
} else {
// Данная привязка не удалялась, может она была изменена?
bool hasChanged = false;
for(AssetBindEntry& abe2 : abc.Binds) {
if(abe2.Type == abe.Type && abe2.Id == abe.Id) {
// Привязка была изменена
abe2 = std::move(abe);
hasChanged = true;
break;
}
}
if(!hasChanged)
// Изменения не было, это просто новая привязка
abc.Binds.emplace_back(std::move(abe));
}
}
entry.Binds.clear();
}
}
// Запрос к дисковому кешу новых ресурсов
std::vector<AssetsManager::ResourceKey> needToLoad;
for(const AssetBindEntry& bind : abc.Binds) {
bool needQuery = true;
// Проверить in memory кеш по домену+ключу
{
std::string dk = bind.Domain + ':' + bind.Key;
auto &niubdk = MyAssets.NotInUse[(int) bind.Type];
auto iter = niubdk.find(dk);
if(iter != niubdk.end()) {
// Есть ресурс
needQuery = iter->second.first.Hash != bind.Hash;
}
}
// Проверить если такой запрос уже был отправлен в AssetsManager и ожидает ответа
if(needQuery) {
auto& list = AsyncContext.ResourceWait[(int) bind.Type];
auto iterDomain = list.find(bind.Domain);
if(iterDomain != list.end()) {
for(const auto& [key, hash] : iterDomain->second) {
if(key == bind.Key && hash == bind.Hash) {
needQuery = false;
break;
}
}
}
}
// Под рукой нет ресурса, отправим на проверку в AssetsManager
if(needQuery) {
AsyncContext.ResourceWait[(int) bind.Type][bind.Domain].emplace_back(bind.Key, bind.Hash);
needToLoad.push_back(AssetsManager::ResourceKey{
.Hash = bind.Hash,
.Type = bind.Type,
.Domain = bind.Domain,
.Key = bind.Key,
.Id = bind.Id
});
}
}
// Отправляем запрос на получение ресурсов
if(!needToLoad.empty()) {
static std::atomic<uint32_t> debugReadRequestLogCount = 0;
AssetsManager::ResourceKey firstDebug;
bool hasDebug = false;
for(const auto& entry : needToLoad) {
if(entry.Domain == "test"
&& (entry.Type == EnumAssets::Nodestate
|| entry.Type == EnumAssets::Model
|| entry.Type == EnumAssets::Texture))
{
firstDebug = entry;
hasDebug = true;
break;
}
}
if(hasDebug && debugReadRequestLogCount.fetch_add(1) < 64) {
LOG.debug() << "Queue asset read count=" << needToLoad.size()
<< " type=" << assetTypeName(firstDebug.Type)
<< " id=" << firstDebug.Id
<< " key=" << firstDebug.Domain << ':' << firstDebug.Key
<< " hash=" << int(firstDebug.Hash[0]) << '.'
<< int(firstDebug.Hash[1]) << '.'
<< int(firstDebug.Hash[2]) << '.'
<< int(firstDebug.Hash[3]);
}
for(const auto& entry : needToLoad) {
LOG.debug() << "Client wants type=" << assetTypeName(entry.Type)
<< " id=" << entry.Id
<< " key=" << entry.Domain << ':' << entry.Key
<< " hash=" << int(entry.Hash[0]) << '.'
<< int(entry.Hash[1]) << '.'
<< int(entry.Hash[2]) << '.'
<< int(entry.Hash[3]);
}
AM->pushReads(std::move(needToLoad));
}
AsyncContext.Binds.push_back(std::move(abc));
Socket->pushPackets(&packets);
}
if(!AsyncContext.TickSequence.get_read().empty()) {
@@ -624,6 +360,20 @@ void ServerSession::update(GlobalTime gTime, float dTime) {
{
for(TickData& data : ticks) {
// Привязки Id -> Domain+Key
for(const auto& binds : data.BindsDK) {
AM.pushAssetsBindDK(binds.Domains, binds.Keys);
}
// Привязки Id -> Hash+Header
for(auto& binds : data.BindsHH) {
AM.pushAssetsBindHH(std::move(binds.HashAndHeaders));
}
// Полученные ресурсы с сервера
if(!data.ReceivedAssets.empty())
AM.pushNewResources(std::move(data.ReceivedAssets));
{
for(auto& [id, profile] : data.Profile_Voxel_AddOrChange) {
auto iter = std::lower_bound(profile_Voxel_Lost.begin(), profile_Voxel_Lost.end(), id);
@@ -908,128 +658,6 @@ void ServerSession::update(GlobalTime gTime, float dTime) {
RS->pushStageTickSync();
// Применяем изменения по ресурсам, профилям и контенту
// Разбираемся с изменениями в привязках ресурсов
{
AssetsBindsChange abc;
for(AssetsBindsChange entry : AsyncContext.Binds) {
for(int type = 0; type < (int) EnumAssets::MAX_ENUM; type++)
std::sort(entry.Lost[type].begin(), entry.Lost[type].end());
for(ssize_t iter = abc.Binds.size()-1; iter >= 0; iter--) {
const AssetBindEntry& abe = abc.Binds[iter];
auto& lost = entry.Lost[(int) abe.Type];
auto iterator = std::lower_bound(lost.begin(), lost.end(), abe.Id);
if(iterator != lost.end() && *iterator == abe.Id) {
lost.erase(iterator);
abc.Binds.erase(abc.Binds.begin()+iter);
}
}
for(int type = 0; type < (int) EnumAssets::MAX_ENUM; type++) {
abc.Lost[type].append_range(entry.Lost[type]);
entry.Lost[type].clear();
std::sort(abc.Lost[type].begin(), abc.Lost[type].end());
}
for(AssetBindEntry& abe : entry.Binds) {
auto iterator = std::lower_bound(entry.Lost[(int) abe.Type].begin(), entry.Lost[(int) abe.Type].end(), abe.Id);
if(iterator != entry.Lost[(int) abe.Type].end() && *iterator == abe.Id) {
// Получили новую привязку, которая была удалена в предыдущем такте
entry.Lost[(int) abe.Type].erase(iterator);
} else {
// Данная привязка не удалялась, может она была изменена?
bool hasChanged = false;
for(AssetBindEntry& abe2 : abc.Binds) {
if(abe2.Type == abe.Type && abe2.Id == abe.Id) {
// Привязка была изменена
abe2 = std::move(abe);
hasChanged = true;
break;
}
}
if(!hasChanged)
// Изменения не было, это просто новая привязка
abc.Binds.emplace_back(std::move(abe));
}
}
entry.Binds.clear();
}
AsyncContext.Binds.clear();
for(AssetBindEntry& entry : abc.Binds) {
std::vector<uint8_t> deps;
if(!entry.Header.empty())
deps = AM->rebindHeader(entry.Type, entry.Header);
MyAssets.ExistBinds[(int) entry.Type].insert(entry.Id);
result.Assets_ChangeOrAdd[entry.Type].push_back(entry.Id);
auto iterLoaded = IServerSession::Assets[entry.Type].find(entry.Id);
if(iterLoaded != IServerSession::Assets[entry.Type].end())
iterLoaded->second.Dependencies = deps;
// Если ресурс был в кеше, то достаётся от туда
auto iter = MyAssets.NotInUse[(int) entry.Type].find(entry.Domain+':'+entry.Key);
if(iter != MyAssets.NotInUse[(int) entry.Type].end()) {
iter->second.first.Dependencies = deps;
IServerSession::Assets[entry.Type][entry.Id] = std::get<0>(iter->second);
result.Assets_ChangeOrAdd[entry.Type].push_back(entry.Id);
MyAssets.NotInUse[(int) entry.Type].erase(iter);
}
}
for(int type = 0; type < (int) EnumAssets::MAX_ENUM; type++) {
for(ResourceId id : abc.Lost[type]) {
MyAssets.ExistBinds[type].erase(id);
// Потерянные ресурсы уходят в кеш
auto iter = IServerSession::Assets[(EnumAssets) type].find(id);
if(iter != IServerSession::Assets[(EnumAssets) type].end()) {
MyAssets.NotInUse[(int) iter->second.Type][iter->second.Domain+':'+iter->second.Key] = {iter->second, TIME_BEFORE_UNLOAD_RESOURCE+time(nullptr)};
IServerSession::Assets[(EnumAssets) type].erase(iter);
result.Assets_Lost[iter->second.Type].push_back(iter->second.Id);
}
}
result.Assets_Lost[(EnumAssets) type] = std::move(abc.Lost[type]);
}
}
// Получаем ресурсы
{
for(AssetEntry& entry : AsyncContext.LoadedResources) {
if(MyAssets.ExistBinds[(int) entry.Type].contains(entry.Id)) {
// Ресурс ещё нужен
IServerSession::Assets[entry.Type][entry.Id] = entry;
result.Assets_ChangeOrAdd[entry.Type].push_back(entry.Id);
} else {
// Ресурс уже не нужен, отправляем в кеш
MyAssets.NotInUse[(int) entry.Type][entry.Domain+':'+entry.Key] = {entry, TIME_BEFORE_UNLOAD_RESOURCE+time(nullptr)};
}
}
AsyncContext.LoadedResources.clear();
}
// Чистим кеш ресурсов
{
uint64_t now = time(nullptr);
for(int type = 0; type < (int) EnumAssets::MAX_ENUM; type++) {
std::vector<std::string> toDelete;
for(auto& [key, value] : MyAssets.NotInUse[type]) {
if(std::get<1>(value) < now)
toDelete.push_back(key);
}
for(std::string& key : toDelete)
MyAssets.NotInUse[type].erase(MyAssets.NotInUse[type].find(key));
}
}
// Определения
{
for(auto& [resId, def] : profile_Node_AddOrChange) {
@@ -1140,8 +768,8 @@ void ServerSession::update(GlobalTime gTime, float dTime) {
// Оповещение модуля рендера об изменениях ресурсов
std::unordered_map<EnumAssets, std::unordered_map<ResourceId, AssetEntry>> changedResources;
std::unordered_map<EnumAssets, std::unordered_set<ResourceId>> lostResources;
// std::unordered_map<EnumAssets, std::unordered_map<ResourceId, AssetEntry>> changedResources;
// std::unordered_map<EnumAssets, std::unordered_set<ResourceId>> lostResources;
// Обработка полученных ресурсов
@@ -1215,21 +843,8 @@ void ServerSession::setRenderSession(IRenderSession* session) {
}
void ServerSession::resetResourceSyncState() {
AM->clearServerBindings();
AsyncContext.AssetsLoading.clear();
AsyncContext.AlreadyLoading.clear();
for(int type = 0; type < (int) EnumAssets::MAX_ENUM; type++)
AsyncContext.ResourceWait[type].clear();
for(auto& vec : ServerIdToDK)
vec.clear();
std::fill(NextServerId.begin(), NextServerId.end(), 1);
for(auto& vec : ServerIdToDK)
vec.emplace_back();
AsyncContext.Binds.clear();
AsyncContext.LoadedResources.clear();
AsyncContext.ThisTickEntry = {};
AsyncContext.LoadedAssets.lock()->clear();
AsyncContext.AssetsBinds.lock()->clear();
AsyncContext.TickSequence.lock()->clear();
}
@@ -1323,212 +938,68 @@ coro<> ServerSession::rP_Disconnect(Net::AsyncSocket &sock) {
}
coro<> ServerSession::rP_AssetsBindDK(Net::AsyncSocket &sock) {
UpdateAssetsBindsDK update;
std::string compressed = co_await sock.read<std::string>();
std::u8string in((const char8_t*) compressed.data(), compressed.size());
std::u8string data = unCompressLinear(in);
Net::LinearReader lr(data);
uint16_t domainsCount = lr.read<uint16_t>();
std::vector<std::string> domains;
domains.reserve(domainsCount);
update.Domains.reserve(domainsCount);
for(uint16_t i = 0; i < domainsCount; ++i)
domains.push_back(lr.read<std::string>());
update.Domains.push_back(lr.read<std::string>());
for(size_t type = 0; type < static_cast<size_t>(EnumAssets::MAX_ENUM); ++type) {
update.Keys[type].resize(update.Domains.size());
uint32_t count = lr.read<uint32_t>();
for(uint32_t i = 0; i < count; ++i) {
uint16_t domainId = lr.read<uint16_t>();
std::string key = lr.read<std::string>();
if(domainId >= domains.size())
if(domainId >= update.Domains.size())
continue;
ResourceId serverId = NextServerId[type]++;
auto& table = ServerIdToDK[type];
if(table.size() <= serverId)
table.resize(serverId + 1);
table[serverId] = {domains[domainId], std::move(key)};
update.Keys.at(type).at(domainId).push_back(key);
}
}
co_return;
AsyncContext.ThisTickEntry.BindsDK.emplace_back(std::move(update));
}
coro<> ServerSession::rP_AssetsBindHH(Net::AsyncSocket &sock) {
static std::atomic<uint32_t> debugResourceLogCount = 0;
AssetsBindsChange abc;
UpdateAssetsBindsHH update;
for(size_t typeIndex = 0; typeIndex < static_cast<size_t>(EnumAssets::MAX_ENUM); ++typeIndex) {
uint32_t count = co_await sock.read<uint32_t>();
if(count == 0)
continue;
std::vector<AssetBindEntry> binds;
binds.reserve(count);
for(size_t iter = 0; iter < count; ++iter) {
uint32_t id = co_await sock.read<uint32_t>();
Hash_t hash;
ResourceFile::Hash_t hash;
co_await sock.read((std::byte*) hash.data(), hash.size());
std::string headerStr = co_await sock.read<std::string>();
std::vector<uint8_t> header(headerStr.begin(), headerStr.end());
auto& table = ServerIdToDK[typeIndex];
assert(id <= table.size());
if(id >= table.size()) {
LOG.warn() << "AssetsBindHH without domain/key for id=" << id;
continue;
}
const auto& [domain, key] = table[id];
assert(!domain.empty() && !key.empty());
if(domain.empty() && key.empty()) {
LOG.warn() << "AssetsBindHH missing domain/key for id=" << id;
continue;
}
EnumAssets type = static_cast<EnumAssets>(typeIndex);
AssetsManager::BindResult bindResult = AM->bindServerResource(
type, (ResourceId) id, domain, key, hash, header);
if(bindResult.ReboundFrom)
abc.Lost[typeIndex].push_back(*bindResult.ReboundFrom);
if(!bindResult.Changed && !bindResult.ReboundFrom)
continue;
binds.emplace_back(AssetBindEntry{
.Type = type,
.Id = bindResult.LocalId,
.Domain = domain,
.Key = key,
.Hash = hash,
.Header = std::move(header)
});
if(binds.back().Domain == "test"
&& (binds.back().Type == EnumAssets::Nodestate
|| binds.back().Type == EnumAssets::Model
|| binds.back().Type == EnumAssets::Texture))
{
uint32_t idx = debugResourceLogCount.fetch_add(1);
if(idx < 128) {
LOG.debug() << "Bind asset type=" << assetTypeName(binds.back().Type)
<< " id=" << binds.back().Id
<< " key=" << binds.back().Domain << ':' << binds.back().Key
<< " hash=" << int(binds.back().Hash[0]) << '.'
<< int(binds.back().Hash[1]) << '.'
<< int(binds.back().Hash[2]) << '.'
<< int(binds.back().Hash[3]);
}
}
}
if(!binds.empty()) {
abc.Binds.append_range(binds);
binds.clear();
update.HashAndHeaders[typeIndex].emplace_back(id, hash, std::u8string((const char8_t*) headerStr.data(), headerStr.size()));
}
}
bool hasLost = false;
for(const auto& list : abc.Lost) {
if(!list.empty()) {
hasLost = true;
break;
}
}
if(!abc.Binds.empty() || hasLost)
AsyncContext.AssetsBinds.lock()->push_back(std::move(abc));
else
co_return;
co_return;
AsyncContext.ThisTickEntry.BindsHH.emplace_back(std::move(update));
}
coro<> ServerSession::rP_AssetsInitSend(Net::AsyncSocket &sock) {
static std::atomic<uint32_t> debugResourceLogCount = 0;
uint32_t size = co_await sock.read<uint32_t>();
Hash_t hash;
co_await sock.read((std::byte*) hash.data(), hash.size());
std::vector<AssetLoadingEntry> matches;
for(int typeIndex = 0; typeIndex < (int) EnumAssets::MAX_ENUM; ++typeIndex) {
auto& waitingByDomain = AsyncContext.ResourceWait[typeIndex];
for(auto iterDomain = waitingByDomain.begin(); iterDomain != waitingByDomain.end(); ) {
auto& entries = iterDomain->second;
for(size_t i = 0; i < entries.size(); ) {
if(entries[i].second == hash) {
EnumAssets type = static_cast<EnumAssets>(typeIndex);
const std::string& domain = iterDomain->first;
const std::string& key = entries[i].first;
ResourceId localId = AM->getOrCreateLocalId(type, domain, key);
matches.push_back(AssetLoadingEntry{
.Type = type,
.Id = localId,
.Domain = domain,
.Key = key
});
entries.erase(entries.begin() + i);
} else {
++i;
}
}
if(entries.empty())
iterDomain = waitingByDomain.erase(iterDomain);
else
++iterDomain;
}
}
if(matches.empty()) {
LOG.warn() << "AssetsInitSend for unknown hash " << int(hash[0]) << '.'
<< int(hash[1]) << '.' << int(hash[2]) << '.' << int(hash[3]);
AsyncContext.AssetsLoading[hash] = AssetLoading{
.Entries = {},
.Data = std::u8string(size, '\0'),
.Offset = 0
};
co_return;
}
AssetLoadingEntry first = matches.front();
if(first.Domain == "test"
&& (first.Type == EnumAssets::Nodestate
|| first.Type == EnumAssets::Model
|| first.Type == EnumAssets::Texture))
{
uint32_t idx = debugResourceLogCount.fetch_add(1);
if(idx < 128) {
LOG.debug() << "AssetsInitSend type=" << assetTypeName(first.Type)
<< " id=" << first.Id
<< " key=" << first.Domain << ':' << first.Key
<< " size=" << size
<< " matches=" << matches.size();
}
}
AsyncContext.AssetsLoading[hash] = AssetLoading{
.Entries = std::move(matches),
.Data = std::u8string(size, '\0'),
.Offset = 0
};
LOG.debug() << "Server started sending type=" << assetTypeName(first.Type)
<< " id=" << first.Id
<< " key=" << first.Domain << ':'
<< first.Key
<< " hash=" << int(hash[0]) << '.'
<< int(hash[1]) << '.'
<< int(hash[2]) << '.'
<< int(hash[3])
<< " size=" << size;
co_return;
}
coro<> ServerSession::rP_AssetsNextSend(Net::AsyncSocket &sock) {
static std::atomic<uint32_t> debugResourceLogCount = 0;
Hash_t hash;
co_await sock.read((std::byte*) hash.data(), hash.size());
uint32_t size = co_await sock.read<uint32_t>();
@@ -1549,56 +1020,8 @@ coro<> ServerSession::rP_AssetsNextSend(Net::AsyncSocket &sock) {
if(al.Offset != al.Data.size())
co_return;
if(!al.Entries.empty()) {
const size_t resSize = al.Data.size();
Resource res(std::move(al.Data));
for(AssetLoadingEntry& entry : al.Entries) {
if(entry.Domain == "test"
&& (entry.Type == EnumAssets::Nodestate
|| entry.Type == EnumAssets::Model
|| entry.Type == EnumAssets::Texture))
{
uint32_t idx = debugResourceLogCount.fetch_add(1);
if(idx < 128) {
LOG.debug() << "Resource loaded type=" << assetTypeName(entry.Type)
<< " id=" << entry.Id
<< " key=" << entry.Domain << ':' << entry.Key
<< " size=" << resSize;
}
}
const EnumAssets type = entry.Type;
const ResourceId id = entry.Id;
const std::string domain = entry.Domain;
const std::string key = entry.Key;
AsyncContext.LoadedAssets.lock()->emplace_back(AssetEntry{
.Type = type,
.Id = id,
.Domain = std::move(entry.Domain),
.Key = std::move(entry.Key),
.Res = res,
.Hash = hash
});
LOG.debug() << "Client received type=" << assetTypeName(type)
<< " id=" << id
<< " key=" << domain << ':' << key
<< " hash=" << int(hash[0]) << '.'
<< int(hash[1]) << '.'
<< int(hash[2]) << '.'
<< int(hash[3])
<< " size=" << resSize;
}
}
AsyncContext.ThisTickEntry.ReceivedAssets.emplace_back(hash, std::move(al.Data));
AsyncContext.AssetsLoading.erase(AsyncContext.AssetsLoading.find(hash));
auto iter = std::lower_bound(AsyncContext.AlreadyLoading.begin(), AsyncContext.AlreadyLoading.end(), hash);
if(iter != AsyncContext.AlreadyLoading.end() && *iter == hash)
AsyncContext.AlreadyLoading.erase(iter);
co_return;
}
coro<> ServerSession::rP_DefinitionsUpdate(Net::AsyncSocket &sock) {