From 1710eb974da2a465fb3595b6323b7fd13cd922f1 Mon Sep 17 00:00:00 2001 From: DrSocalkwe3n Date: Tue, 11 Mar 2025 19:37:36 +0600 Subject: [PATCH] TOSAsync --- Src/Client/ServerSession.cpp | 1 - Src/Common/Net.cpp | 8 ++ Src/Common/Net.hpp | 1 + Src/Server/ContentEventController.cpp | 2 +- Src/Server/ContentEventController.hpp | 5 +- Src/Server/GameServer.cpp | 7 +- Src/Server/GameServer.hpp | 2 +- Src/Server/RemoteClient.cpp | 21 ++-- Src/Server/RemoteClient.hpp | 32 +++-- Src/TOSAsync.hpp | 163 +++++++++++++++++++------- 10 files changed, 178 insertions(+), 64 deletions(-) diff --git a/Src/Client/ServerSession.cpp b/Src/Client/ServerSession.cpp index 8d86896..5b3a4c0 100644 --- a/Src/Client/ServerSession.cpp +++ b/Src/Client/ServerSession.cpp @@ -41,7 +41,6 @@ struct PP_Content_ChunkRemove : public ParsedPacket { using namespace TOS; ServerSession::~ServerSession() { - WorkDeadline.cancel(); UseLock.wait_no_use(); } diff --git a/Src/Common/Net.cpp b/Src/Common/Net.cpp index 664fde8..b90edff 100644 --- a/Src/Common/Net.cpp +++ b/Src/Common/Net.cpp @@ -120,10 +120,18 @@ coro<> AsyncSocket::read(std::byte *data, uint32_t size) { void AsyncSocket::closeRead() { if(Socket.is_open() && !ReadShutdowned) { ReadShutdowned = true; + // TODO: try { Socket.shutdown(boost::asio::socket_base::shutdown_receive); } catch(...) {} } } +void AsyncSocket::close() { + if(Socket.is_open()) { + Socket.close(); + ReadShutdowned = true; + } +} + coro<> AsyncSocket::waitForSend() { asio::deadline_timer waiter(IOC); diff --git a/Src/Common/Net.hpp b/Src/Common/Net.hpp index 694d890..77381b7 100644 --- a/Src/Common/Net.hpp +++ b/Src/Common/Net.hpp @@ -239,6 +239,7 @@ protected: coro<> read(std::byte *data, uint32_t size); void closeRead(); + void close(); template or std::is_same_v, int> = 0> coro read() { diff --git a/Src/Server/ContentEventController.cpp b/Src/Server/ContentEventController.cpp index 35d29af..ef0eae4 100644 --- a/Src/Server/ContentEventController.cpp +++ b/Src/Server/ContentEventController.cpp @@ -7,7 +7,7 @@ namespace LV::Server { -ContentEventController::ContentEventController(std::unique_ptr &&remote) +ContentEventController::ContentEventController(RemoteClient_ptr &&remote) : Remote(std::move(remote)) { } diff --git a/Src/Server/ContentEventController.hpp b/Src/Server/ContentEventController.hpp index 01dadbb..aeeb0c4 100644 --- a/Src/Server/ContentEventController.hpp +++ b/Src/Server/ContentEventController.hpp @@ -14,6 +14,7 @@ namespace LV::Server { class RemoteClient; +using RemoteClient_ptr = std::unique_ptr>; class GameServer; class World; @@ -162,7 +163,7 @@ private: public: // Управляется сервером - std::unique_ptr Remote; + RemoteClient_ptr Remote; // Регионы сюда заглядывают // Каждый такт значения изменений обновляются GameServer'ом // Объявленная в чанках территория точно отслеживается (активная зона) @@ -173,7 +174,7 @@ public: // std::unordered_map> SubscribedRegions; public: - ContentEventController(std::unique_ptr &&remote); + ContentEventController(RemoteClient_ptr &&remote); // Измеряется в чанках в радиусе (активная зона) uint16_t getViewRangeActive() const; diff --git a/Src/Server/GameServer.cpp b/Src/Server/GameServer.cpp index 34d3cd3..139609a 100644 --- a/Src/Server/GameServer.cpp +++ b/Src/Server/GameServer.cpp @@ -247,7 +247,7 @@ coro<> GameServer::pushSocketGameProtocol(tcp::socket socket, const std::string co_await Net::AsyncSocket::write(socket, 0); External.NewConnectedPlayers.lock_write() - ->push_back(std::make_unique(IOC, std::move(socket), username)); + ->push_back(RemoteClient::Create(IOC, std::move(socket), username)); } } } @@ -333,7 +333,7 @@ void GameServer::run() { // Отключить вновь подключившихся auto lock = External.NewConnectedPlayers.lock_write(); - for(std::unique_ptr &client : *lock) { + for(RemoteClient_ptr &client : *lock) { client->shutdown(EnumDisconnect::ByInterface, ShutdownReason); } @@ -449,8 +449,7 @@ void GameServer::stepPlayers() { if(!External.NewConnectedPlayers.no_lock_readable().empty()) { auto lock = External.NewConnectedPlayers.lock_write(); - for(std::unique_ptr &client : *lock) { - asio::co_spawn(IOC, client->run(), asio::detached); + for(RemoteClient_ptr &client : *lock) { Game.CECs.push_back(std::make_unique(std::move(client))); } diff --git a/Src/Server/GameServer.hpp b/Src/Server/GameServer.hpp index db629f0..8141296 100644 --- a/Src/Server/GameServer.hpp +++ b/Src/Server/GameServer.hpp @@ -42,7 +42,7 @@ class GameServer { struct { Lockable> ConnectedPlayersSet; - Lockable>> NewConnectedPlayers; + Lockable> NewConnectedPlayers; } External; diff --git a/Src/Server/RemoteClient.cpp b/Src/Server/RemoteClient.cpp index 0d61d74..3606a88 100644 --- a/Src/Server/RemoteClient.cpp +++ b/Src/Server/RemoteClient.cpp @@ -3,28 +3,33 @@ #include "Common/Net.hpp" #include "Server/Abstract.hpp" #include +#include #include #include #include #include #include "World.hpp" +#include "boost/asio/steady_timer.hpp" #include namespace LV::Server { -RemoteClient::~RemoteClient() { +coro<> RemoteClient::asyncDestructor() { shutdown(EnumDisconnect::ByInterface, "~RemoteClient()"); - if(Socket.isAlive()) { - Socket.closeRead(); - } - - UseLock.wait_no_use(); + asio::steady_timer deadline(IOC); + deadline.expires_after(std::chrono::seconds(1)); + + Socket.closeRead(); + co_await (deadline.async_wait(asio::use_awaitable) || RunCoro.async_wait()); + Socket.close(); + + co_await deadline.async_wait(); } -coro<> RemoteClient::run() { - auto useLock = UseLock.lock(); +RemoteClient::~RemoteClient() = default; +coro<> RemoteClient::run() { try { while(!IsGoingShutdown && IsConnected) { co_await readPacket(Socket); diff --git a/Src/Server/RemoteClient.hpp b/Src/Server/RemoteClient.hpp index 9ff45a6..ce89889 100644 --- a/Src/Server/RemoteClient.hpp +++ b/Src/Server/RemoteClient.hpp @@ -3,12 +3,18 @@ #include #include #include +#include #include "Abstract.hpp" #include "Common/Packets.hpp" #include "Server/ContentEventController.hpp" +#include "TOSAsync.hpp" +#include "boost/asio/detached.hpp" +#include "boost/asio/io_context.hpp" +#include "boost/asio/use_awaitable.hpp" #include #include #include +#include #include #include #include @@ -173,16 +179,16 @@ using EntityKey = std::tuple; - +class RemoteClient; +using RemoteClient_ptr = std::unique_ptr>; /* Обработчик сокета клиента. Подписывает клиента на отслеживание необходимых ресурсов на основе передаваемых клиенту данных */ -class RemoteClient { +class RemoteClient : public TOS::IAsyncDestructible { TOS::Logger LOG; - DestroyLock UseLock; Net::AsyncSocket Socket; bool IsConnected = true, IsGoingShutdown = false; @@ -264,20 +270,32 @@ class RemoteClient { ResourceRequest NextRequest; std::vector SimplePackets; + TOS::WaitableCoro RunCoro; + public: const std::string Username; Pos::Object CameraPos = {0, 0, 0}; ToServer::PacketQuat CameraQuat = {0}; -public: +private: + RemoteClient(asio::io_context &ioc, tcp::socket socket, const std::string username) - : LOG("RemoteClient " + username), Socket(ioc, std::move(socket)), Username(username) + : IAsyncDestructible(ioc), LOG("RemoteClient " + username), Socket(ioc, std::move(socket)), + Username(username), RunCoro(ioc) { + RunCoro.co_spawn(run()); } - ~RemoteClient(); - + virtual coro<> asyncDestructor() override; coro<> run(); + +public: + static RemoteClient_ptr Create(asio::io_context &ioc, tcp::socket socket, const std::string username) { + return createUnique<>(ioc, new RemoteClient(ioc, std::move(socket), username)); + } + + virtual ~RemoteClient(); + void shutdown(EnumDisconnect type, const std::string reason); bool isConnected() { return IsConnected; } diff --git a/Src/TOSAsync.hpp b/Src/TOSAsync.hpp index 1530374..32e2df3 100644 --- a/Src/TOSAsync.hpp +++ b/Src/TOSAsync.hpp @@ -1,77 +1,136 @@ #pragma once -#include "boost/asio/awaitable.hpp" -#include "boost/asio/co_spawn.hpp" +#include "TOSLib.hpp" +#include "boost/asio/associated_cancellation_slot.hpp" +#include "boost/asio/associated_executor.hpp" #include "boost/asio/deadline_timer.hpp" -#include "boost/asio/detached.hpp" #include "boost/asio/io_context.hpp" -#include "boost/asio/use_awaitable.hpp" +#include "boost/system/detail/error_code.hpp" +#include +#include #include +#include #include #include namespace TOS { using namespace boost::asio::experimental::awaitable_operators; +namespace asio = boost::asio; template using coro = boost::asio::awaitable; -class AsyncSemaphore -{ - boost::asio::deadline_timer Deadline; - std::atomic Lock = 0; +// class AsyncSemaphore +// { +// boost::asio::deadline_timer Deadline; +// std::atomic Lock = 0; + +// public: +// AsyncSemaphore(boost::asio::io_context& ioc) +// : Deadline(ioc, boost::posix_time::ptime(boost::posix_time::pos_infin)) +// {} + +// coro<> async_wait() { +// try { +// co_await Deadline.async_wait(boost::asio::use_awaitable); +// } catch(boost::system::system_error code) { +// if(code.code() != boost::system::errc::operation_canceled) +// throw; +// } + +// co_await asio::this_coro::throw_if_cancelled(); +// } + +// coro<> async_wait(std::function predicate) { +// while(!predicate()) +// co_await async_wait(); +// } + +// void notify_one() { +// Deadline.cancel_one(); +// } + +// void notify_all() { +// Deadline.cancel(); +// } +// }; + +/* + Многие могут уведомлять одного + Ждёт события. После доставки уведомления ждёт повторно +*/ +class MultipleToOne_AsyncSymaphore { + asio::deadline_timer Timer; public: - AsyncSemaphore( - boost::asio::io_context& ioc) - : Deadline(ioc, boost::posix_time::ptime(boost::posix_time::pos_infin)) + MultipleToOne_AsyncSymaphore(asio::io_context &ioc) + : Timer(ioc, boost::posix_time::ptime(boost::posix_time::pos_infin)) {} - boost::asio::awaitable async_wait() { - try { - co_await Deadline.async_wait(boost::asio::use_awaitable); - } catch(boost::system::system_error code) { - if(code.code() != boost::system::errc::operation_canceled) - throw; - } - - co_await boost::asio::this_coro::throw_if_cancelled(); + void notify() { + Timer.cancel(); } - boost::asio::awaitable async_wait(std::function predicate) { - while(!predicate()) - co_await async_wait(); + void wait() { + Timer.wait(); + Timer.expires_at(boost::posix_time::ptime(boost::posix_time::pos_infin)); + } + + coro<> async_wait() { + try { co_await Timer.async_wait(); } catch(...) {} } - void notify_one() { - Deadline.cancel_one(); +}; + +class WaitableCoro { + asio::io_context &IOC; + std::shared_ptr Symaphore; + std::exception_ptr LastException; + +public: + WaitableCoro(asio::io_context &ioc) + : IOC(ioc) + {} + + template + void co_spawn(Token token) { + Symaphore = std::make_shared(IOC); + asio::co_spawn(IOC, [token = std::move(token), symaphore = Symaphore]() -> coro<> { + co_await std::move(token); + symaphore->notify(); + }, asio::detached); } - void notify_all() { - Deadline.cancel(); + void wait() { + Symaphore->wait(); + } + + coro<> async_wait() { + return Symaphore->async_wait(); } }; +/* + Используется, чтобы вместо уничтожения объекта в умной ссылке, вызвать корутину с co_await asyncDestructor() +*/ class IAsyncDestructible : public std::enable_shared_from_this { protected: - boost::asio::any_io_executor IOC; - boost::asio::deadline_timer DestructLine; + asio::io_context &IOC; - virtual coro<> asyncDestructor() { DestructLine.cancel(); co_return; } + virtual coro<> asyncDestructor() { co_return; } public: - IAsyncDestructible(boost::asio::any_io_executor ioc) - : IOC(ioc), DestructLine(ioc, boost::posix_time::ptime(boost::posix_time::pos_infin)) + IAsyncDestructible(asio::io_context &ioc) + : IOC(ioc) {} virtual ~IAsyncDestructible() {} - coro> cancelable(coro<> &&c) { return std::move(c) || DestructLine.async_wait(boost::asio::use_awaitable); } - +protected: template> - static std::shared_ptr createShared(boost::asio::any_io_executor ioc, T *ptr) + static std::shared_ptr createShared(asio::io_context &ioc, T *ptr) { - return std::shared_ptr(ptr, [ioc = std::move(ioc)](IAsyncDestructible *ptr) { + return std::shared_ptr(ptr, [&ioc = ioc](T *ptr) { boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> { try { co_await ptr->asyncDestructor(); } catch(...) { } delete ptr; @@ -80,16 +139,40 @@ public: }); } - template> - static std::shared_ptr makeShared(boost::asio::any_io_executor ioc, Args&& ... args) + template> + static coro> createShared(T *ptr) { - std::shared_ptr(new T(ioc, std::forward(args)..., [ioc = std::move(ioc)](IAsyncDestructible *ptr) { + co_return std::shared_ptr(ptr, [ioc = asio::get_associated_executor(co_await asio::this_coro::executor)](T *ptr) { boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> { try { co_await ptr->asyncDestructor(); } catch(...) { } delete ptr; co_return; } (ptr), boost::asio::detached); - })); + }); + } + + template> + static std::unique_ptr> createUnique(asio::io_context &ioc, T *ptr) + { + return std::unique_ptr>(ptr, [&ioc = ioc](T *ptr) { + boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> { + try { co_await ptr->asyncDestructor(); } catch(...) { } + delete ptr; + co_return; + } (ptr), boost::asio::detached); + }); + } + + template> + static coro>> createUnique(T *ptr) + { + co_return std::unique_ptr>(ptr, [ioc = asio::get_associated_executor(co_await asio::this_coro::executor)](T *ptr) { + boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> { + try { co_await ptr->asyncDestructor(); } catch(...) { } + delete ptr; + co_return; + } (ptr), boost::asio::detached); + }); } };