From e190c79d003bfa61fc20f528ec63893365a7b1ef Mon Sep 17 00:00:00 2001 From: DrSocalkwe3n Date: Tue, 11 Mar 2025 14:55:43 +0600 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=B5=D1=80=D0=B5=D1=81=D0=BC=D0=BE?= =?UTF-8?q?=D1=82=D1=80=20=D0=B0=D1=81=D0=B8=D0=BD=D1=85=D1=80=D0=BE=D0=BD?= =?UTF-8?q?=D0=BD=D0=BE=D1=81=D1=82=D0=B5=D0=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Src/Client/ServerSession.hpp | 7 +- Src/Client/Vulkan/Vulkan.cpp | 4 +- Src/Client/Vulkan/Vulkan.hpp | 3 +- Src/Common/Async.hpp | 160 +--------- Src/Common/Net.cpp | 33 ++- Src/Common/Net.hpp | 24 +- Src/Common/async_mutex.hpp | 422 +++++++++++++++++++++++++++ Src/Server/BinaryResourceManager.cpp | 4 +- Src/Server/BinaryResourceManager.hpp | 4 +- Src/Server/GameServer.cpp | 6 +- Src/Server/GameServer.hpp | 5 +- Src/main.cpp | 4 +- 12 files changed, 482 insertions(+), 194 deletions(-) create mode 100644 Src/Common/async_mutex.hpp diff --git a/Src/Client/ServerSession.hpp b/Src/Client/ServerSession.hpp index f9247a7..b94f189 100644 --- a/Src/Client/ServerSession.hpp +++ b/Src/Client/ServerSession.hpp @@ -24,7 +24,8 @@ struct ParsedPacket { virtual ~ParsedPacket(); }; -class ServerSession : public AsyncObject, public IServerSession, public ISurfaceEventListener { +class ServerSession : public IServerSession, public ISurfaceEventListener { + asio::io_context &IOC; std::unique_ptr Socket; IRenderSession *RS = nullptr; DestroyLock UseLock; @@ -56,10 +57,10 @@ class ServerSession : public AsyncObject, public IServerSession, public ISurface public: // Нужен сокет, на котором только что был согласован игровой протокол (asyncInitGameProtocol) ServerSession(asio::io_context &ioc, std::unique_ptr &&socket, IRenderSession *rs = nullptr) - : AsyncObject(ioc), Socket(std::move(socket)), RS(rs), NetInputPackets(1024) + : IOC(ioc), Socket(std::move(socket)), RS(rs), NetInputPackets(1024) { assert(Socket.get()); - co_spawn(run()); + asio::co_spawn(IOC, run(), asio::detached); } virtual ~ServerSession(); diff --git a/Src/Client/Vulkan/Vulkan.cpp b/Src/Client/Vulkan/Vulkan.cpp index bc57515..fca8b2f 100644 --- a/Src/Client/Vulkan/Vulkan.cpp +++ b/Src/Client/Vulkan/Vulkan.cpp @@ -73,7 +73,7 @@ ByteBuffer loadPNG(std::istream &&read, int &width, int &height, bool &hasAlpha, } Vulkan::Vulkan(asio::io_context &ioc) - : AsyncObject(ioc), GuardLock(ioc.get_executor()) + : IOC(ioc), GuardLock(ioc.get_executor()) { Screen.Width = 1920/2; Screen.Height = 1080/2; @@ -2075,7 +2075,7 @@ void Vulkan::gui_MainMenu() { ConnectionProgress.InProgress = true; ConnectionProgress.Cancel = false; ConnectionProgress.Progress.clear(); - co_spawn(ConnectionProgress.connect(IOC)); + asio::co_spawn(IOC, ConnectionProgress.connect(IOC), asio::detached); } if(!Game.Server) { diff --git a/Src/Client/Vulkan/Vulkan.hpp b/Src/Client/Vulkan/Vulkan.hpp index 00151fc..f37fe28 100644 --- a/Src/Client/Vulkan/Vulkan.hpp +++ b/Src/Client/Vulkan/Vulkan.hpp @@ -71,9 +71,10 @@ class Buffer; Vulkan.reInit(); */ -class Vulkan : public AsyncObject { +class Vulkan { private: Logger LOG = "Vulkan"; + asio::io_context &IOC; struct vkInstanceLayer { std::string LayerName = "nullptr", Description = "nullptr"; diff --git a/Src/Common/Async.hpp b/Src/Common/Async.hpp index 0e96959..5e9e6df 100644 --- a/Src/Common/Async.hpp +++ b/Src/Common/Async.hpp @@ -1,14 +1,16 @@ #pragma once +#include "TOSLib.hpp" +#include "boost/asio/async_result.hpp" +#include "boost/asio/deadline_timer.hpp" +#include "boost/date_time/posix_time/posix_time_duration.hpp" +#include #include -#include -#include -#include -#include -#include +#include #include #include #include +#include namespace LV { @@ -20,152 +22,4 @@ using tcp = asio::ip::tcp; template using coro = asio::awaitable; - -class AsyncObject { -protected: - asio::io_context &IOC; - asio::deadline_timer WorkDeadline; - -public: - AsyncObject(asio::io_context &ioc) - : IOC(ioc), WorkDeadline(ioc, boost::posix_time::pos_infin) - { - } - - inline asio::io_context& EXEC() - { - return IOC; - } - -protected: - template - void co_spawn(Coroutine &&coroutine) { - asio::co_spawn(IOC, WorkDeadline.async_wait(asio::use_awaitable) || std::move(coroutine), asio::detached); - } -}; - - -template -class AsyncAtomic : public AsyncObject { -protected: - asio::deadline_timer Deadline; - ValueType Value; - boost::mutex Mtx; - -public: - AsyncAtomic(asio::io_context &ioc, ValueType &&value) - : AsyncObject(ioc), Deadline(ioc), Value(std::move(value)) - { - } - - AsyncAtomic& operator=(ValueType &&value) { - boost::unique_lock lock(Mtx); - Value = std::move(value); - Deadline.expires_from_now(boost::posix_time::pos_infin); - return *this; - } - - operator ValueType() const { - return Value; - } - - ValueType operator*() const { - return Value; - } - - AsyncAtomic& operator++() { - boost::unique_lock lock(Mtx); - Value--; - Deadline.expires_from_now(boost::posix_time::pos_infin); - return *this; - } - - AsyncAtomic& operator--() { - boost::unique_lock lock(Mtx); - Value--; - Deadline.expires_from_now(boost::posix_time::pos_infin); - return *this; - } - - AsyncAtomic& operator+=(ValueType value) { - boost::unique_lock lock(Mtx); - Value += value; - Deadline.expires_from_now(boost::posix_time::pos_infin); - return *this; - } - - AsyncAtomic& operator-=(ValueType value) { - boost::unique_lock lock(Mtx); - Value -= value; - Deadline.expires_from_now(boost::posix_time::pos_infin); - return *this; - } - - void wait(ValueType oldValue) { - while(true) { - if(oldValue != Value) - return; - - boost::unique_lock lock(Mtx); - - if(oldValue != Value) - return; - - std::atomic_bool flag = false; - Deadline.async_wait([&](boost::system::error_code errc) { flag.store(true); flag.notify_all(); }); - lock.unlock(); - flag.wait(false); - } - } - - void await(ValueType needValue) { - while(true) { - if(needValue == Value) - return; - - boost::unique_lock lock(Mtx); - - if(needValue == Value) - return; - - std::atomic_bool flag = false; - Deadline.async_wait([&](boost::system::error_code errc) { flag.store(true); flag.notify_all(); }); - lock.unlock(); - flag.wait(false); - } - } - - coro<> async_wait(ValueType oldValue) { - while(true) { - if(oldValue != Value) - co_return; - - boost::unique_lock lock(Mtx); - - if(oldValue != Value) - co_return; - - auto coroutine = Deadline.async_wait(); - lock.unlock(); - try { co_await std::move(coroutine); } catch(...) {} - } - } - - coro<> async_await(ValueType needValue) { - while(true) { - if(needValue == Value) - co_return; - - boost::unique_lock lock(Mtx); - - if(needValue == Value) - co_return; - - auto coroutine = Deadline.async_wait(); - lock.unlock(); - try { co_await std::move(coroutine); } catch(...) {} - } - } -}; - } \ No newline at end of file diff --git a/Src/Common/Net.cpp b/Src/Common/Net.cpp index 772d3be..664fde8 100644 --- a/Src/Common/Net.cpp +++ b/Src/Common/Net.cpp @@ -17,7 +17,7 @@ bool SocketServer::isStopped() { coro SocketServer::run(std::function(tcp::socket)> onConnect) { while(true) { // TODO: ловить ошибки на async_accept try { - co_spawn(onConnect(co_await Acceptor.async_accept())); + asio::co_spawn(IOC, onConnect(co_await Acceptor.async_accept()), asio::detached); } catch(const std::exception &exc) { if(const boost::system::system_error *errc = dynamic_cast(&exc); errc && (errc->code() == asio::error::operation_aborted || errc->code() == asio::error::bad_descriptor)) @@ -31,13 +31,8 @@ AsyncSocket::~AsyncSocket() { if(SendPackets.Context) SendPackets.Context->NeedShutdown = true; - { - boost::lock_guard lock(SendPackets.Mtx); - - SendPackets.SenderGuard.cancel(); - WorkDeadline.cancel(); - } - + boost::unique_lock lock(SendPackets.Mtx); + if(Socket.is_open()) try { Socket.close(); } catch(...) {} } @@ -55,6 +50,8 @@ void AsyncSocket::pushPackets(std::vector *simplePackets, std::vectorclear(); @@ -86,8 +83,7 @@ void AsyncSocket::pushPackets(std::vector *simplePackets, std::vector AsyncSocket::runSender(std::shared_ptr context) { while(!context->NeedShutdown) { { boost::unique_lock lock(SendPackets.Mtx); - if(SendPackets.SimpleBuffer.empty() && SendPackets.SmartBuffer.empty()) { - SendPackets.WaitForSemaphore = true; - auto coroutine = SendPackets.Semaphore.async_wait(); - lock.unlock(); - try { co_await std::move(coroutine); } catch(...) {} + if(context->NeedShutdown) { + break; + } + + if(SendPackets.SimpleBuffer.empty() && SendPackets.SmartBuffer.empty()) { + auto coroutine = SendPackets.Semaphore.async_wait(); + + if(SendPackets.SimpleBuffer.empty() && SendPackets.SmartBuffer.empty()) { + lock.unlock(); + try { co_await std::move(coroutine); } catch(...) {} + } + continue; } else { for(int cycle = 0; cycle < 2; cycle++, NextBuffer++) { diff --git a/Src/Common/Net.hpp b/Src/Common/Net.hpp index 4578398..694d890 100644 --- a/Src/Common/Net.hpp +++ b/Src/Common/Net.hpp @@ -3,28 +3,28 @@ #include "MemoryPool.hpp" #include "Async.hpp" #include "TOSLib.hpp" +#include "boost/asio/io_context.hpp" -#include #include #include #include #include #include -#include namespace LV::Net { -class SocketServer : public AsyncObject { +class SocketServer { protected: + asio::io_context &IOC; tcp::acceptor Acceptor; public: SocketServer(asio::io_context &ioc, std::function(tcp::socket)> &&onConnect, uint16_t port = 0) - : AsyncObject(ioc), Acceptor(ioc, tcp::endpoint(tcp::v4(), port)) + : IOC(ioc), Acceptor(ioc, tcp::endpoint(tcp::v4(), port)) { assert(onConnect); - co_spawn(run(std::move(onConnect))); + asio::co_spawn(IOC, run(std::move(onConnect)), asio::detached); } bool isStopped(); @@ -178,7 +178,8 @@ protected: std::function()> OnSend; }; - class AsyncSocket : public AsyncObject { + class AsyncSocket { + asio::io_context &IOC; NetPool::Array<32> RecvBuffer, SendBuffer; size_t RecvPos = 0, RecvSize = 0, SendSize = 0; bool ReadShutdowned = false; @@ -196,22 +197,21 @@ protected: struct SendPacketsObj { boost::mutex Mtx; - bool WaitForSemaphore = false; - asio::deadline_timer Semaphore, SenderGuard; + asio::deadline_timer Semaphore; boost::circular_buffer_space_optimized SimpleBuffer; boost::circular_buffer_space_optimized SmartBuffer; size_t SizeInQueue = 0; std::shared_ptr Context; SendPacketsObj(asio::io_context &ioc) - : Semaphore(ioc, boost::posix_time::pos_infin), SenderGuard(ioc, boost::posix_time::pos_infin) + : Semaphore(ioc, boost::posix_time::pos_infin) {} } SendPackets; public: AsyncSocket(asio::io_context &ioc, tcp::socket &&socket) - : AsyncObject(ioc), Socket(std::move(socket)), SendPackets(ioc) - { + : IOC(ioc), Socket(std::move(socket)), SendPackets(ioc) + { SendPackets.SimpleBuffer.set_capacity(512); SendPackets.SmartBuffer.set_capacity(SendPackets.SimpleBuffer.capacity()/4); SendPackets.Context = std::make_shared(); @@ -221,7 +221,7 @@ protected: boost::asio::ip::tcp::no_delay optionNoDelay(true); // Отключает попытки объёденить данные в крупные пакеты Socket.set_option(optionNoDelay); - co_spawn(runSender(SendPackets.Context)); + asio::co_spawn(IOC, runSender(SendPackets.Context), asio::detached); } ~AsyncSocket(); diff --git a/Src/Common/async_mutex.hpp b/Src/Common/async_mutex.hpp new file mode 100644 index 0000000..efafb6b --- /dev/null +++ b/Src/Common/async_mutex.hpp @@ -0,0 +1,422 @@ +// SPDX-FileCopyrightText: 2023 Daniel Vrátil +// SPDX-FileCopyrightText: 2023 Martin Beran +// +// SPDX-License-Identifier: BSL-1.0 + +#pragma once + +#include +#include +#include +#include +#include +#include + +#define ASIO_NS boost::asio + +namespace avast::asio { + +class async_mutex_lock; +class async_mutex; + +/** \internal **/ +namespace detail { + +/** + * \brief Represents a suspended coroutine that is awaiting lock acquisition. + **/ +struct locked_waiter { + /** + * \brief Constructs a new locked_waiter. + * \param next_waiter Pointer to the waiter to prepend this locked_waiter to. + **/ + explicit locked_waiter(locked_waiter *next_waiter): next(next_waiter) {} + locked_waiter(locked_waiter &&) = delete; + locked_waiter(const locked_waiter &) = delete; + locked_waiter &operator=(locked_waiter &&) = delete; + locked_waiter &operator=(const locked_waiter &) = delete; + /** + * \brief Destructor. + **/ + virtual ~locked_waiter() = default; + + /** + * \brief Completes the pending asynchronous operation. + * + * Resumes the currently suspended coroutine with the acquired lock. + **/ + virtual void completion() = 0; + + /** + * The waiters are held in a linked list. This is a pointer to the next member of the list. + **/ + locked_waiter *next = nullptr; +}; + +/** + * \brief Locked waiter that used `async_mutex::async_lock()` to acquire the lock. + **/ +template +struct async_locked_waiter final: public locked_waiter { + /** + * \brief Constructs a new async_locked_waiter. + * \param mutex A mutex that the waiter is trying to acquire a lock for. + * \param next_waiter Pointer to the head of the waiters linked list to prepend this waiter to. + * \param token The complention token to call when the asynchronous operation is completed. + **/ + async_locked_waiter([[maybe_unused]] async_mutex *mutex, locked_waiter *next_waiter, Token &&token): + locked_waiter(next_waiter), m_token(std::move(token)) {} + + void completion() override { + auto executor = ASIO_NS::get_associated_executor(m_token); + ASIO_NS::post(std::move(executor), [token = std::move(m_token)]() mutable { token(); }); + } + +private: + Token m_token; //!< The completion token to invoke when the lock is acquired. +}; + +/** + * \brief Locked waiter that used `async_mutex::async_scoped_lock()` to acquire the lock. + **/ +template +struct scoped_async_locked_waiter final: public locked_waiter { + /** + * \brief Constructs a new scoped_async_locked_waiter. + * \param mutex A mutex that the waiter is trying to acquire a lock for. + * \param next_waiter Pointer to the head of the waiters linked list to prepend this waiter to. + * \param token The complention token to call when the asynchronous operation is completed. + **/ + scoped_async_locked_waiter(async_mutex *mutex, locked_waiter *next_waiter, Token &&token): + locked_waiter(next_waiter), m_mutex(mutex), m_token(std::move(token)) {} + + void completion() override; + +private: + async_mutex *m_mutex; //!< The mutex whose lock is being awaited. + Token m_token; //!< The completion token to invoke when the lock is acquired. +}; + +/** + * \brief An initiator for asio::async_initiate(). + **/ +template