Compare commits

2 Commits
master ... mr_s

Author SHA1 Message Date
1710eb974d TOSAsync 2025-03-11 19:37:36 +06:00
e190c79d00 Пересмотр асинхронностей 2025-03-11 14:55:43 +06:00
18 changed files with 659 additions and 257 deletions

View File

@@ -41,7 +41,6 @@ struct PP_Content_ChunkRemove : public ParsedPacket {
using namespace TOS; using namespace TOS;
ServerSession::~ServerSession() { ServerSession::~ServerSession() {
WorkDeadline.cancel();
UseLock.wait_no_use(); UseLock.wait_no_use();
} }

View File

@@ -24,7 +24,8 @@ struct ParsedPacket {
virtual ~ParsedPacket(); virtual ~ParsedPacket();
}; };
class ServerSession : public AsyncObject, public IServerSession, public ISurfaceEventListener { class ServerSession : public IServerSession, public ISurfaceEventListener {
asio::io_context &IOC;
std::unique_ptr<Net::AsyncSocket> Socket; std::unique_ptr<Net::AsyncSocket> Socket;
IRenderSession *RS = nullptr; IRenderSession *RS = nullptr;
DestroyLock UseLock; DestroyLock UseLock;
@@ -56,10 +57,10 @@ class ServerSession : public AsyncObject, public IServerSession, public ISurface
public: public:
// Нужен сокет, на котором только что был согласован игровой протокол (asyncInitGameProtocol) // Нужен сокет, на котором только что был согласован игровой протокол (asyncInitGameProtocol)
ServerSession(asio::io_context &ioc, std::unique_ptr<Net::AsyncSocket> &&socket, IRenderSession *rs = nullptr) ServerSession(asio::io_context &ioc, std::unique_ptr<Net::AsyncSocket> &&socket, IRenderSession *rs = nullptr)
: AsyncObject(ioc), Socket(std::move(socket)), RS(rs), NetInputPackets(1024) : IOC(ioc), Socket(std::move(socket)), RS(rs), NetInputPackets(1024)
{ {
assert(Socket.get()); assert(Socket.get());
co_spawn(run()); asio::co_spawn(IOC, run(), asio::detached);
} }
virtual ~ServerSession(); virtual ~ServerSession();

View File

@@ -73,7 +73,7 @@ ByteBuffer loadPNG(std::istream &&read, int &width, int &height, bool &hasAlpha,
} }
Vulkan::Vulkan(asio::io_context &ioc) Vulkan::Vulkan(asio::io_context &ioc)
: AsyncObject(ioc), GuardLock(ioc.get_executor()) : IOC(ioc), GuardLock(ioc.get_executor())
{ {
Screen.Width = 1920/2; Screen.Width = 1920/2;
Screen.Height = 1080/2; Screen.Height = 1080/2;
@@ -2075,7 +2075,7 @@ void Vulkan::gui_MainMenu() {
ConnectionProgress.InProgress = true; ConnectionProgress.InProgress = true;
ConnectionProgress.Cancel = false; ConnectionProgress.Cancel = false;
ConnectionProgress.Progress.clear(); ConnectionProgress.Progress.clear();
co_spawn(ConnectionProgress.connect(IOC)); asio::co_spawn(IOC, ConnectionProgress.connect(IOC), asio::detached);
} }
if(!Game.Server) { if(!Game.Server) {

View File

@@ -71,9 +71,10 @@ class Buffer;
Vulkan.reInit(); Vulkan.reInit();
*/ */
class Vulkan : public AsyncObject { class Vulkan {
private: private:
Logger LOG = "Vulkan"; Logger LOG = "Vulkan";
asio::io_context &IOC;
struct vkInstanceLayer { struct vkInstanceLayer {
std::string LayerName = "nullptr", Description = "nullptr"; std::string LayerName = "nullptr", Description = "nullptr";

View File

@@ -1,14 +1,16 @@
#pragma once #pragma once
#include "TOSLib.hpp"
#include "boost/asio/async_result.hpp"
#include "boost/asio/deadline_timer.hpp"
#include "boost/date_time/posix_time/posix_time_duration.hpp"
#include <atomic>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/asio/any_io_executor.hpp> #include <boost/asio/impl/execution_context.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/date_time/posix_time/ptime.hpp> #include <boost/date_time/posix_time/ptime.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp> #include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/lockfree/queue.hpp>
namespace LV { namespace LV {
@@ -20,152 +22,4 @@ using tcp = asio::ip::tcp;
template<typename T = void> template<typename T = void>
using coro = asio::awaitable<T>; using coro = asio::awaitable<T>;
class AsyncObject {
protected:
asio::io_context &IOC;
asio::deadline_timer WorkDeadline;
public:
AsyncObject(asio::io_context &ioc)
: IOC(ioc), WorkDeadline(ioc, boost::posix_time::pos_infin)
{
}
inline asio::io_context& EXEC()
{
return IOC;
}
protected:
template<typename Coroutine>
void co_spawn(Coroutine &&coroutine) {
asio::co_spawn(IOC, WorkDeadline.async_wait(asio::use_awaitable) || std::move(coroutine), asio::detached);
}
};
template<typename ValueType>
class AsyncAtomic : public AsyncObject {
protected:
asio::deadline_timer Deadline;
ValueType Value;
boost::mutex Mtx;
public:
AsyncAtomic(asio::io_context &ioc, ValueType &&value)
: AsyncObject(ioc), Deadline(ioc), Value(std::move(value))
{
}
AsyncAtomic& operator=(ValueType &&value) {
boost::unique_lock lock(Mtx);
Value = std::move(value);
Deadline.expires_from_now(boost::posix_time::pos_infin);
return *this;
}
operator ValueType() const {
return Value;
}
ValueType operator*() const {
return Value;
}
AsyncAtomic& operator++() {
boost::unique_lock lock(Mtx);
Value--;
Deadline.expires_from_now(boost::posix_time::pos_infin);
return *this;
}
AsyncAtomic& operator--() {
boost::unique_lock lock(Mtx);
Value--;
Deadline.expires_from_now(boost::posix_time::pos_infin);
return *this;
}
AsyncAtomic& operator+=(ValueType value) {
boost::unique_lock lock(Mtx);
Value += value;
Deadline.expires_from_now(boost::posix_time::pos_infin);
return *this;
}
AsyncAtomic& operator-=(ValueType value) {
boost::unique_lock lock(Mtx);
Value -= value;
Deadline.expires_from_now(boost::posix_time::pos_infin);
return *this;
}
void wait(ValueType oldValue) {
while(true) {
if(oldValue != Value)
return;
boost::unique_lock lock(Mtx);
if(oldValue != Value)
return;
std::atomic_bool flag = false;
Deadline.async_wait([&](boost::system::error_code errc) { flag.store(true); flag.notify_all(); });
lock.unlock();
flag.wait(false);
}
}
void await(ValueType needValue) {
while(true) {
if(needValue == Value)
return;
boost::unique_lock lock(Mtx);
if(needValue == Value)
return;
std::atomic_bool flag = false;
Deadline.async_wait([&](boost::system::error_code errc) { flag.store(true); flag.notify_all(); });
lock.unlock();
flag.wait(false);
}
}
coro<> async_wait(ValueType oldValue) {
while(true) {
if(oldValue != Value)
co_return;
boost::unique_lock lock(Mtx);
if(oldValue != Value)
co_return;
auto coroutine = Deadline.async_wait();
lock.unlock();
try { co_await std::move(coroutine); } catch(...) {}
}
}
coro<> async_await(ValueType needValue) {
while(true) {
if(needValue == Value)
co_return;
boost::unique_lock lock(Mtx);
if(needValue == Value)
co_return;
auto coroutine = Deadline.async_wait();
lock.unlock();
try { co_await std::move(coroutine); } catch(...) {}
}
}
};
} }

View File

@@ -17,7 +17,7 @@ bool SocketServer::isStopped() {
coro<void> SocketServer::run(std::function<coro<>(tcp::socket)> onConnect) { coro<void> SocketServer::run(std::function<coro<>(tcp::socket)> onConnect) {
while(true) { // TODO: ловить ошибки на async_accept while(true) { // TODO: ловить ошибки на async_accept
try { try {
co_spawn(onConnect(co_await Acceptor.async_accept())); asio::co_spawn(IOC, onConnect(co_await Acceptor.async_accept()), asio::detached);
} catch(const std::exception &exc) { } catch(const std::exception &exc) {
if(const boost::system::system_error *errc = dynamic_cast<const boost::system::system_error*>(&exc); if(const boost::system::system_error *errc = dynamic_cast<const boost::system::system_error*>(&exc);
errc && (errc->code() == asio::error::operation_aborted || errc->code() == asio::error::bad_descriptor)) errc && (errc->code() == asio::error::operation_aborted || errc->code() == asio::error::bad_descriptor))
@@ -31,13 +31,8 @@ AsyncSocket::~AsyncSocket() {
if(SendPackets.Context) if(SendPackets.Context)
SendPackets.Context->NeedShutdown = true; SendPackets.Context->NeedShutdown = true;
{ boost::unique_lock lock(SendPackets.Mtx);
boost::lock_guard lock(SendPackets.Mtx);
SendPackets.SenderGuard.cancel();
WorkDeadline.cancel();
}
if(Socket.is_open()) if(Socket.is_open())
try { Socket.close(); } catch(...) {} try { Socket.close(); } catch(...) {}
} }
@@ -55,6 +50,8 @@ void AsyncSocket::pushPackets(std::vector<Packet> *simplePackets, std::vector<Sm
// TODO: std::cout << "Передоз пакетами, сокет закрыт" << std::endl; // TODO: std::cout << "Передоз пакетами, сокет закрыт" << std::endl;
} }
bool wasPackets = SendPackets.SimpleBuffer.size() || SendPackets.SmartBuffer.size();
if(!Socket.is_open()) { if(!Socket.is_open()) {
if(simplePackets) if(simplePackets)
simplePackets->clear(); simplePackets->clear();
@@ -86,8 +83,7 @@ void AsyncSocket::pushPackets(std::vector<Packet> *simplePackets, std::vector<Sm
SendPackets.SizeInQueue += addedSize; SendPackets.SizeInQueue += addedSize;
if(SendPackets.WaitForSemaphore) { if(!wasPackets) {
SendPackets.WaitForSemaphore = false;
SendPackets.Semaphore.cancel(); SendPackets.Semaphore.cancel();
SendPackets.Semaphore.expires_at(boost::posix_time::pos_infin); SendPackets.Semaphore.expires_at(boost::posix_time::pos_infin);
} }
@@ -124,10 +120,18 @@ coro<> AsyncSocket::read(std::byte *data, uint32_t size) {
void AsyncSocket::closeRead() { void AsyncSocket::closeRead() {
if(Socket.is_open() && !ReadShutdowned) { if(Socket.is_open() && !ReadShutdowned) {
ReadShutdowned = true; ReadShutdowned = true;
// TODO:
try { Socket.shutdown(boost::asio::socket_base::shutdown_receive); } catch(...) {} try { Socket.shutdown(boost::asio::socket_base::shutdown_receive); } catch(...) {}
} }
} }
void AsyncSocket::close() {
if(Socket.is_open()) {
Socket.close();
ReadShutdowned = true;
}
}
coro<> AsyncSocket::waitForSend() { coro<> AsyncSocket::waitForSend() {
asio::deadline_timer waiter(IOC); asio::deadline_timer waiter(IOC);
@@ -147,12 +151,19 @@ coro<> AsyncSocket::runSender(std::shared_ptr<AsyncContext> context) {
while(!context->NeedShutdown) { while(!context->NeedShutdown) {
{ {
boost::unique_lock lock(SendPackets.Mtx); boost::unique_lock lock(SendPackets.Mtx);
if(SendPackets.SimpleBuffer.empty() && SendPackets.SmartBuffer.empty()) {
SendPackets.WaitForSemaphore = true;
auto coroutine = SendPackets.Semaphore.async_wait();
lock.unlock();
try { co_await std::move(coroutine); } catch(...) {} if(context->NeedShutdown) {
break;
}
if(SendPackets.SimpleBuffer.empty() && SendPackets.SmartBuffer.empty()) {
auto coroutine = SendPackets.Semaphore.async_wait();
if(SendPackets.SimpleBuffer.empty() && SendPackets.SmartBuffer.empty()) {
lock.unlock();
try { co_await std::move(coroutine); } catch(...) {}
}
continue; continue;
} else { } else {
for(int cycle = 0; cycle < 2; cycle++, NextBuffer++) { for(int cycle = 0; cycle < 2; cycle++, NextBuffer++) {

View File

@@ -3,28 +3,28 @@
#include "MemoryPool.hpp" #include "MemoryPool.hpp"
#include "Async.hpp" #include "Async.hpp"
#include "TOSLib.hpp" #include "TOSLib.hpp"
#include "boost/asio/io_context.hpp"
#include <atomic>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/write.hpp> #include <boost/asio/write.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/circular_buffer.hpp> #include <boost/circular_buffer.hpp>
#include <condition_variable>
namespace LV::Net { namespace LV::Net {
class SocketServer : public AsyncObject { class SocketServer {
protected: protected:
asio::io_context &IOC;
tcp::acceptor Acceptor; tcp::acceptor Acceptor;
public: public:
SocketServer(asio::io_context &ioc, std::function<coro<>(tcp::socket)> &&onConnect, uint16_t port = 0) SocketServer(asio::io_context &ioc, std::function<coro<>(tcp::socket)> &&onConnect, uint16_t port = 0)
: AsyncObject(ioc), Acceptor(ioc, tcp::endpoint(tcp::v4(), port)) : IOC(ioc), Acceptor(ioc, tcp::endpoint(tcp::v4(), port))
{ {
assert(onConnect); assert(onConnect);
co_spawn(run(std::move(onConnect))); asio::co_spawn(IOC, run(std::move(onConnect)), asio::detached);
} }
bool isStopped(); bool isStopped();
@@ -178,7 +178,8 @@ protected:
std::function<std::optional<SmartPacket>()> OnSend; std::function<std::optional<SmartPacket>()> OnSend;
}; };
class AsyncSocket : public AsyncObject { class AsyncSocket {
asio::io_context &IOC;
NetPool::Array<32> RecvBuffer, SendBuffer; NetPool::Array<32> RecvBuffer, SendBuffer;
size_t RecvPos = 0, RecvSize = 0, SendSize = 0; size_t RecvPos = 0, RecvSize = 0, SendSize = 0;
bool ReadShutdowned = false; bool ReadShutdowned = false;
@@ -196,22 +197,21 @@ protected:
struct SendPacketsObj { struct SendPacketsObj {
boost::mutex Mtx; boost::mutex Mtx;
bool WaitForSemaphore = false; asio::deadline_timer Semaphore;
asio::deadline_timer Semaphore, SenderGuard;
boost::circular_buffer_space_optimized<Packet> SimpleBuffer; boost::circular_buffer_space_optimized<Packet> SimpleBuffer;
boost::circular_buffer_space_optimized<SmartPacket> SmartBuffer; boost::circular_buffer_space_optimized<SmartPacket> SmartBuffer;
size_t SizeInQueue = 0; size_t SizeInQueue = 0;
std::shared_ptr<AsyncContext> Context; std::shared_ptr<AsyncContext> Context;
SendPacketsObj(asio::io_context &ioc) SendPacketsObj(asio::io_context &ioc)
: Semaphore(ioc, boost::posix_time::pos_infin), SenderGuard(ioc, boost::posix_time::pos_infin) : Semaphore(ioc, boost::posix_time::pos_infin)
{} {}
} SendPackets; } SendPackets;
public: public:
AsyncSocket(asio::io_context &ioc, tcp::socket &&socket) AsyncSocket(asio::io_context &ioc, tcp::socket &&socket)
: AsyncObject(ioc), Socket(std::move(socket)), SendPackets(ioc) : IOC(ioc), Socket(std::move(socket)), SendPackets(ioc)
{ {
SendPackets.SimpleBuffer.set_capacity(512); SendPackets.SimpleBuffer.set_capacity(512);
SendPackets.SmartBuffer.set_capacity(SendPackets.SimpleBuffer.capacity()/4); SendPackets.SmartBuffer.set_capacity(SendPackets.SimpleBuffer.capacity()/4);
SendPackets.Context = std::make_shared<AsyncContext>(); SendPackets.Context = std::make_shared<AsyncContext>();
@@ -221,7 +221,7 @@ protected:
boost::asio::ip::tcp::no_delay optionNoDelay(true); // Отключает попытки объёденить данные в крупные пакеты boost::asio::ip::tcp::no_delay optionNoDelay(true); // Отключает попытки объёденить данные в крупные пакеты
Socket.set_option(optionNoDelay); Socket.set_option(optionNoDelay);
co_spawn(runSender(SendPackets.Context)); asio::co_spawn(IOC, runSender(SendPackets.Context), asio::detached);
} }
~AsyncSocket(); ~AsyncSocket();
@@ -239,6 +239,7 @@ protected:
coro<> read(std::byte *data, uint32_t size); coro<> read(std::byte *data, uint32_t size);
void closeRead(); void closeRead();
void close();
template<typename T, std::enable_if_t<std::is_integral_v<T> or std::is_same_v<T, std::string>, int> = 0> template<typename T, std::enable_if_t<std::is_integral_v<T> or std::is_same_v<T, std::string>, int> = 0>
coro<T> read() { coro<T> read() {

422
Src/Common/async_mutex.hpp Normal file
View File

@@ -0,0 +1,422 @@
// SPDX-FileCopyrightText: 2023 Daniel Vrátil <daniel.vratil@gendigital.com>
// SPDX-FileCopyrightText: 2023 Martin Beran <martin.beran@gendigital.com>
//
// SPDX-License-Identifier: BSL-1.0
#pragma once
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <atomic>
#define ASIO_NS boost::asio
namespace avast::asio {
class async_mutex_lock;
class async_mutex;
/** \internal **/
namespace detail {
/**
* \brief Represents a suspended coroutine that is awaiting lock acquisition.
**/
struct locked_waiter {
/**
* \brief Constructs a new locked_waiter.
* \param next_waiter Pointer to the waiter to prepend this locked_waiter to.
**/
explicit locked_waiter(locked_waiter *next_waiter): next(next_waiter) {}
locked_waiter(locked_waiter &&) = delete;
locked_waiter(const locked_waiter &) = delete;
locked_waiter &operator=(locked_waiter &&) = delete;
locked_waiter &operator=(const locked_waiter &) = delete;
/**
* \brief Destructor.
**/
virtual ~locked_waiter() = default;
/**
* \brief Completes the pending asynchronous operation.
*
* Resumes the currently suspended coroutine with the acquired lock.
**/
virtual void completion() = 0;
/**
* The waiters are held in a linked list. This is a pointer to the next member of the list.
**/
locked_waiter *next = nullptr;
};
/**
* \brief Locked waiter that used `async_mutex::async_lock()` to acquire the lock.
**/
template <typename Token>
struct async_locked_waiter final: public locked_waiter {
/**
* \brief Constructs a new async_locked_waiter.
* \param mutex A mutex that the waiter is trying to acquire a lock for.
* \param next_waiter Pointer to the head of the waiters linked list to prepend this waiter to.
* \param token The complention token to call when the asynchronous operation is completed.
**/
async_locked_waiter([[maybe_unused]] async_mutex *mutex, locked_waiter *next_waiter, Token &&token):
locked_waiter(next_waiter), m_token(std::move(token)) {}
void completion() override {
auto executor = ASIO_NS::get_associated_executor(m_token);
ASIO_NS::post(std::move(executor), [token = std::move(m_token)]() mutable { token(); });
}
private:
Token m_token; //!< The completion token to invoke when the lock is acquired.
};
/**
* \brief Locked waiter that used `async_mutex::async_scoped_lock()` to acquire the lock.
**/
template <typename Token>
struct scoped_async_locked_waiter final: public locked_waiter {
/**
* \brief Constructs a new scoped_async_locked_waiter.
* \param mutex A mutex that the waiter is trying to acquire a lock for.
* \param next_waiter Pointer to the head of the waiters linked list to prepend this waiter to.
* \param token The complention token to call when the asynchronous operation is completed.
**/
scoped_async_locked_waiter(async_mutex *mutex, locked_waiter *next_waiter, Token &&token):
locked_waiter(next_waiter), m_mutex(mutex), m_token(std::move(token)) {}
void completion() override;
private:
async_mutex *m_mutex; //!< The mutex whose lock is being awaited.
Token m_token; //!< The completion token to invoke when the lock is acquired.
};
/**
* \brief An initiator for asio::async_initiate().
**/
template <template <typename Token> typename Waiter>
class async_lock_initiator_base {
public:
/**
* Constructs a new initiator for an operation on the given mutex.
*
* \param mutex A mutex on which the asynchronous lock operation is being initiated.
**/
explicit async_lock_initiator_base(async_mutex *mutex): m_mutex(mutex) {}
/**
* \brief Invoked by boost asio when the asynchronous operation is initiated.
*
* \param handler A completion handler (a callable) to be called when the asynchronous operation
* has completed (in our case, the lock has been acquired).
* \tparam Handler A callable with signature void(T) where T is the type of the object that will be
* returned as a result of `co_await`ing the operation. In our case that's either
* `void` for `async_lock()` or `async_mutex_lock` for `async_scoped_lock()`.
**/
template <typename Handler>
void operator()(Handler &&handler);
protected:
async_mutex *m_mutex; //!< The mutex whose lock is being awaited.
};
/**
* \brief Initiator for the async_lock() operation.
**/
using initiate_async_lock = async_lock_initiator_base<async_locked_waiter>;
/**
* \brief Initiator for the async_scoped_lock() operation.
**/
using initiate_scoped_async_lock = async_lock_initiator_base<scoped_async_locked_waiter>;
} // namespace detail
/** \endinternal **/
/**
* \brief A basic mutex that can acquire lock asynchronously using asio coroutines.
**/
class async_mutex {
public:
/**
* \brief Constructs a new unlocked mutex.
**/
async_mutex() noexcept = default;
async_mutex(const async_mutex &) = delete;
async_mutex(async_mutex &&) = delete;
async_mutex &operator=(const async_mutex &) = delete;
async_mutex &operator=(async_mutex &&) = delete;
/**
* \brief Destroys the mutex.
*
* \warning Destroying a mutex in locked state is undefined.
**/
~async_mutex() {
[[maybe_unused]] const auto state = m_state.load(std::memory_order_relaxed);
assert(state == not_locked || state == locked_no_waiters);
assert(m_waiters == nullptr);
}
/**
* \brief Attempts to acquire lock without blocking.
*
* \return Returns `true` when the lock has been acquired, `false` when the
* lock is already held by someone else.
* **/
[[nodiscard]] bool try_lock() noexcept {
auto old_state = not_locked;
return m_state.compare_exchange_strong(old_state, locked_no_waiters, std::memory_order_acquire,
std::memory_order_relaxed);
}
/**
* \brief Asynchronously acquires as lock.
*
* When the returned awaitable is `co_await`ed it initiates the process
* of acquiring a lock. The awaiter is suspended. Once the lock is acquired
* (which can be immediately if nothing else holds the lock currently) the
* awaiter is resumed and is now holding the lock.
*
* It's awaiter's responsibility to release the lock by calling `unlock()`.
*
* \param token A completion token (`asio::use_awaitable`).
* \tparam LockToken Type of the complention token.
* \return An awaitable which will initiate the async operation when `co_await`ed.
* The result of `co_await`ing the awaitable is void.
**/
#ifdef DOXYGEN
template <typename LockToken>
ASIO_NS::awaitable<> async_lock(LockToken &&token);
#else
template <ASIO_NS::completion_token_for<void()> LockToken>
[[nodiscard]] auto async_lock(LockToken &&token) {
return ASIO_NS::async_initiate<LockToken, void()>(detail::initiate_async_lock(this), token);
}
#endif
/**
* \brief Asynchronously acquires a lock and returns a scoped lock helper.
*
* Behaves exactly as `async_lock()`, except that the result of `co_await`ing the
* returned awaitable is a scoped lock object, which will automatically release the
* lock when destroyed.
*
* \param token A completion token (`asio::use_awaitable`).
* \tparam LockToken Type of the completion token.
* \returns An awaitable which will initiate the async operation when `co_await`ed.
* The result of `co_await`ing the awaitable is `async_mutex_lock` holding
* the acquired lock.
**/
#ifdef DOXYGEN
template <typename LockToken>
ASIO_NS::awaitable<async_mutex_lock> async_scoped_lock(LockToken &&token);
#else
template <ASIO_NS::completion_token_for<void(async_mutex_lock)> LockToken>
[[nodiscard]] auto async_scoped_lock(LockToken &&token) {
return ASIO_NS::async_initiate<LockToken, void(async_mutex_lock)>(detail::initiate_scoped_async_lock(this),
token);
}
#endif
/**
* \brief Releases the lock.
*
* \warning Unlocking and already unlocked mutex is undefined.
**/
void unlock() {
assert(m_state.load(std::memory_order_relaxed) != not_locked);
auto *waiters_head = m_waiters;
if (waiters_head == nullptr) {
auto old_state = locked_no_waiters;
// If old state was locked_no_waiters then transitions to not_locked and returns true,
// otherwise do nothing and returns false.
const bool released_lock = m_state.compare_exchange_strong(old_state, not_locked, std::memory_order_release,
std::memory_order_relaxed);
if (released_lock) {
return;
}
// At least one new waiter. Acquire the list of new waiters atomically
old_state = m_state.exchange(locked_no_waiters, std::memory_order_acquire);
assert(old_state != locked_no_waiters && old_state != not_locked);
// Transfer the list to m_waiters, reversing the list in the process
// so that the head of the list is the first waiter to be resumed
// NOLINTNEXTLINE(performance-no-int-to-ptr)
auto *next = reinterpret_cast<detail::locked_waiter *>(old_state);
do {
auto *temp = next->next;
next->next = waiters_head;
waiters_head = next;
next = temp;
} while (next != nullptr);
}
assert(waiters_head != nullptr);
m_waiters = waiters_head->next;
// Complete the async operation.
waiters_head->completion();
delete waiters_head;
}
private:
template <template <typename Token> typename Waiter>
friend class detail::async_lock_initiator_base;
/**
* \brief Indicates that the mutex is not locked.
**/
static constexpr std::uintptr_t not_locked = 1;
/**
* \brief Indicates that the mutex is locked, but no-one else is attempting to acquire the lock at the moment.
**/
static constexpr std::uintptr_t locked_no_waiters = 0;
/**
* \brief Holds the current state of the lock.
*
* The state can be `not_locked`, `locked_no_waiters` or a pointer to the head of a linked list
* of new waiters (waiters who have attempted to acquire the lock since the last call to unlock().
**/
std::atomic<std::uintptr_t> m_state = {not_locked};
/**
* \brief Linked list of known locked waiters.
**/
detail::locked_waiter *m_waiters = nullptr;
};
/**
* \brief A RAII-style lock for async_mutex which automatically unlocks the mutex when destroyed.
**/
class async_mutex_lock {
public:
using mutex_type = async_mutex;
/**
* Constructs a new async_mutex_lock without any associated mutex.
**/
explicit async_mutex_lock() noexcept = default;
/**
* Constructs a new async_mutex_lock, taking ownership of the \c mutex.
*
* \param mutex Locked mutex to be unlocked when this objectis destroyed.
*
* \warning The \c mutex must be in a locked state.
**/
explicit async_mutex_lock(mutex_type &mutex, std::adopt_lock_t) noexcept: m_mutex(&mutex) {}
/**
* \brief Initializes the lock with contents of other. Leaves other with no associated mutex.
* \param other The moved-from object.
**/
async_mutex_lock(async_mutex_lock &&other) noexcept { swap(other); }
/**
* \brief Move assignment operator.
* Replaces the current mutex with those of \c other using move semantics.
* If \c *this already has an associated mutex, the mutex is unlocked.
*
* \param other The moved-from object.
* \returns *this.
*/
async_mutex_lock &operator=(async_mutex_lock &&other) noexcept {
if (m_mutex != nullptr) {
m_mutex->unlock();
}
m_mutex = std::exchange(other.m_mutex, nullptr);
return *this;
}
/**
* \brief Copy constructor (deleted).
**/
async_mutex_lock(const async_mutex_lock &) = delete;
/**
* \brief Copy assignment operator (deleted).
**/
async_mutex_lock &operator=(const async_mutex_lock &) = delete;
~async_mutex_lock() {
if (m_mutex != nullptr) {
m_mutex->unlock();
}
}
bool owns_lock() const noexcept { return m_mutex != nullptr; }
mutex_type *mutex() const noexcept { return m_mutex; }
/**
* \brief Swaps state with \c other.
* \param other the lock to swap state with.
**/
void swap(async_mutex_lock &other) noexcept { std::swap(m_mutex, other.m_mutex); }
private:
mutex_type *m_mutex = nullptr; //!< The locked mutex being held by the scoped mutex lock.
};
/** \internal **/
namespace detail {
template <typename Token>
void scoped_async_locked_waiter<Token>::completion() {
auto executor = ASIO_NS::get_associated_executor(m_token);
ASIO_NS::post(std::move(executor), [token = std::move(m_token), mutex = m_mutex]() mutable {
token(async_mutex_lock{*mutex, std::adopt_lock});
});
}
template <template <typename Token> typename Waiter>
template <typename Handler>
void async_lock_initiator_base<Waiter>::operator()(Handler &&handler) {
auto old_state = m_mutex->m_state.load(std::memory_order_acquire);
std::unique_ptr<Waiter<Handler>> waiter;
while (true) {
if (old_state == async_mutex::not_locked) {
if (m_mutex->m_state.compare_exchange_weak(old_state, async_mutex::locked_no_waiters,
std::memory_order_acquire, std::memory_order_relaxed))
{
// Lock acquired, resume the awaiter stright away
if (waiter) {
waiter->next = nullptr;
waiter->completion();
} else {
Waiter(m_mutex, nullptr, std::forward<Handler>(handler)).completion();
}
return;
}
} else {
if (!waiter) {
// NOLINTNEXTLINE(performance-no-int-to-ptr)
auto *next_waiter = reinterpret_cast<locked_waiter *>(old_state);
waiter.reset(new Waiter(m_mutex, next_waiter, std::forward<Handler>(handler)));
} else {
// NOLINTNEXTLINE(performance-no-int-to-ptr)
waiter->next = reinterpret_cast<locked_waiter *>(old_state);
}
if (m_mutex->m_state.compare_exchange_weak(old_state, reinterpret_cast<std::uintptr_t>(waiter.get()),
std::memory_order_release, std::memory_order_relaxed))
{
waiter.release();
return;
}
}
}
}
} // namespace detail
/** \endinternal **/
} // namespace avast::asio

View File

@@ -13,7 +13,7 @@ namespace LV::Server {
BinaryResourceManager::BinaryResourceManager(asio::io_context &ioc, BinaryResourceManager::BinaryResourceManager(asio::io_context &ioc,
std::shared_ptr<ResourceFile> zeroResource) std::shared_ptr<ResourceFile> zeroResource)
: AsyncObject(ioc), ZeroResource(std::move(zeroResource)) : IOC(ioc), ZeroResource(std::move(zeroResource))
{ {
} }
@@ -88,7 +88,7 @@ ResourceId_t BinaryResourceManager::getResource_Assets(std::string path) {
UpdatedResources.lock_write()->push_back(resId); UpdatedResources.lock_write()->push_back(resId);
} else { } else {
res->IsLoading = true; res->IsLoading = true;
co_spawn(checkResource_Assets(resId, iter->second / inDomainPath, res)); asio::co_spawn(IOC, checkResource_Assets(resId, iter->second / inDomainPath, res), asio::detached);
} }
} }

View File

@@ -10,13 +10,15 @@
#include <vector> #include <vector>
#include <Common/Async.hpp> #include <Common/Async.hpp>
#include "Abstract.hpp" #include "Abstract.hpp"
#include "boost/asio/io_context.hpp"
namespace LV::Server { namespace LV::Server {
namespace fs = std::filesystem; namespace fs = std::filesystem;
class BinaryResourceManager : public AsyncObject { class BinaryResourceManager {
asio::io_context &IOC;
public: public:
private: private:

View File

@@ -7,7 +7,7 @@
namespace LV::Server { namespace LV::Server {
ContentEventController::ContentEventController(std::unique_ptr<RemoteClient> &&remote) ContentEventController::ContentEventController(RemoteClient_ptr &&remote)
: Remote(std::move(remote)) : Remote(std::move(remote))
{ {
} }

View File

@@ -14,6 +14,7 @@
namespace LV::Server { namespace LV::Server {
class RemoteClient; class RemoteClient;
using RemoteClient_ptr = std::unique_ptr<RemoteClient, std::function<void(RemoteClient*)>>;
class GameServer; class GameServer;
class World; class World;
@@ -162,7 +163,7 @@ private:
public: public:
// Управляется сервером // Управляется сервером
std::unique_ptr<RemoteClient> Remote; RemoteClient_ptr Remote;
// Регионы сюда заглядывают // Регионы сюда заглядывают
// Каждый такт значения изменений обновляются GameServer'ом // Каждый такт значения изменений обновляются GameServer'ом
// Объявленная в чанках территория точно отслеживается (активная зона) // Объявленная в чанках территория точно отслеживается (активная зона)
@@ -173,7 +174,7 @@ public:
// std::unordered_map<WorldId_t, std::vector<Pos::GlobalRegion>> SubscribedRegions; // std::unordered_map<WorldId_t, std::vector<Pos::GlobalRegion>> SubscribedRegions;
public: public:
ContentEventController(std::unique_ptr<RemoteClient> &&remote); ContentEventController(RemoteClient_ptr &&remote);
// Измеряется в чанках в радиусе (активная зона) // Измеряется в чанках в радиусе (активная зона)
uint16_t getViewRangeActive() const; uint16_t getViewRangeActive() const;

View File

@@ -20,8 +20,10 @@ namespace LV::Server {
GameServer::~GameServer() { GameServer::~GameServer() {
shutdown("on ~GameServer"); shutdown("on ~GameServer");
RunThread.join(); RunThread.join();
WorkDeadline.cancel();
UseLock.wait_no_use(); UseLock.wait_no_use();
LOG.info() << "Сервер уничтожен"; LOG.info() << "Сервер уничтожен";
} }
@@ -245,7 +247,7 @@ coro<> GameServer::pushSocketGameProtocol(tcp::socket socket, const std::string
co_await Net::AsyncSocket::write<uint8_t>(socket, 0); co_await Net::AsyncSocket::write<uint8_t>(socket, 0);
External.NewConnectedPlayers.lock_write() External.NewConnectedPlayers.lock_write()
->push_back(std::make_unique<RemoteClient>(IOC, std::move(socket), username)); ->push_back(RemoteClient::Create(IOC, std::move(socket), username));
} }
} }
} }
@@ -331,7 +333,7 @@ void GameServer::run() {
// Отключить вновь подключившихся // Отключить вновь подключившихся
auto lock = External.NewConnectedPlayers.lock_write(); auto lock = External.NewConnectedPlayers.lock_write();
for(std::unique_ptr<RemoteClient> &client : *lock) { for(RemoteClient_ptr &client : *lock) {
client->shutdown(EnumDisconnect::ByInterface, ShutdownReason); client->shutdown(EnumDisconnect::ByInterface, ShutdownReason);
} }
@@ -447,8 +449,7 @@ void GameServer::stepPlayers() {
if(!External.NewConnectedPlayers.no_lock_readable().empty()) { if(!External.NewConnectedPlayers.no_lock_readable().empty()) {
auto lock = External.NewConnectedPlayers.lock_write(); auto lock = External.NewConnectedPlayers.lock_write();
for(std::unique_ptr<RemoteClient> &client : *lock) { for(RemoteClient_ptr &client : *lock) {
co_spawn(client->run());
Game.CECs.push_back(std::make_unique<ContentEventController>(std::move(client))); Game.CECs.push_back(std::make_unique<ContentEventController>(std::move(client)));
} }

View File

@@ -25,7 +25,8 @@ namespace LV::Server {
namespace fs = std::filesystem; namespace fs = std::filesystem;
class GameServer : public AsyncObject { class GameServer {
asio::io_context &IOC;
TOS::Logger LOG = "GameServer"; TOS::Logger LOG = "GameServer";
DestroyLock UseLock; DestroyLock UseLock;
std::thread RunThread; std::thread RunThread;
@@ -41,7 +42,7 @@ class GameServer : public AsyncObject {
struct { struct {
Lockable<std::set<std::string>> ConnectedPlayersSet; Lockable<std::set<std::string>> ConnectedPlayersSet;
Lockable<std::list<std::unique_ptr<RemoteClient>>> NewConnectedPlayers; Lockable<std::list<RemoteClient_ptr>> NewConnectedPlayers;
} External; } External;
@@ -112,7 +113,7 @@ class GameServer : public AsyncObject {
public: public:
GameServer(asio::io_context &ioc, fs::path worldPath) GameServer(asio::io_context &ioc, fs::path worldPath)
: AsyncObject(ioc), : IOC(ioc),
Content(ioc, nullptr, nullptr, nullptr) Content(ioc, nullptr, nullptr, nullptr)
{ {
init(worldPath); init(worldPath);

View File

@@ -3,28 +3,33 @@
#include "Common/Net.hpp" #include "Common/Net.hpp"
#include "Server/Abstract.hpp" #include "Server/Abstract.hpp"
#include <boost/asio/error.hpp> #include <boost/asio/error.hpp>
#include <boost/chrono/duration.hpp>
#include <boost/system/system_error.hpp> #include <boost/system/system_error.hpp>
#include <exception> #include <exception>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include "World.hpp" #include "World.hpp"
#include "boost/asio/steady_timer.hpp"
#include <Common/Packets.hpp> #include <Common/Packets.hpp>
namespace LV::Server { namespace LV::Server {
RemoteClient::~RemoteClient() { coro<> RemoteClient::asyncDestructor() {
shutdown(EnumDisconnect::ByInterface, "~RemoteClient()"); shutdown(EnumDisconnect::ByInterface, "~RemoteClient()");
if(Socket.isAlive()) { asio::steady_timer deadline(IOC);
Socket.closeRead(); deadline.expires_after(std::chrono::seconds(1));
}
Socket.closeRead();
UseLock.wait_no_use(); co_await (deadline.async_wait(asio::use_awaitable) || RunCoro.async_wait());
Socket.close();
co_await deadline.async_wait();
} }
coro<> RemoteClient::run() { RemoteClient::~RemoteClient() = default;
auto useLock = UseLock.lock();
coro<> RemoteClient::run() {
try { try {
while(!IsGoingShutdown && IsConnected) { while(!IsGoingShutdown && IsConnected) {
co_await readPacket(Socket); co_await readPacket(Socket);

View File

@@ -3,12 +3,18 @@
#include <TOSLib.hpp> #include <TOSLib.hpp>
#include <Common/Lockable.hpp> #include <Common/Lockable.hpp>
#include <Common/Net.hpp> #include <Common/Net.hpp>
#include <Common/Async.hpp>
#include "Abstract.hpp" #include "Abstract.hpp"
#include "Common/Packets.hpp" #include "Common/Packets.hpp"
#include "Server/ContentEventController.hpp" #include "Server/ContentEventController.hpp"
#include "TOSAsync.hpp"
#include "boost/asio/detached.hpp"
#include "boost/asio/io_context.hpp"
#include "boost/asio/use_awaitable.hpp"
#include <Common/Abstract.hpp> #include <Common/Abstract.hpp>
#include <bitset> #include <bitset>
#include <initializer_list> #include <initializer_list>
#include <memory>
#include <set> #include <set>
#include <type_traits> #include <type_traits>
#include <unordered_map> #include <unordered_map>
@@ -173,16 +179,16 @@ using EntityKey = std::tuple<WorldId_c, Pos::GlobalRegion>;
class RemoteClient;
using RemoteClient_ptr = std::unique_ptr<RemoteClient, std::function<void(RemoteClient*)>>;
/* /*
Обработчик сокета клиента. Обработчик сокета клиента.
Подписывает клиента на отслеживание необходимых ресурсов Подписывает клиента на отслеживание необходимых ресурсов
на основе передаваемых клиенту данных на основе передаваемых клиенту данных
*/ */
class RemoteClient { class RemoteClient : public TOS::IAsyncDestructible {
TOS::Logger LOG; TOS::Logger LOG;
DestroyLock UseLock;
Net::AsyncSocket Socket; Net::AsyncSocket Socket;
bool IsConnected = true, IsGoingShutdown = false; bool IsConnected = true, IsGoingShutdown = false;
@@ -264,20 +270,32 @@ class RemoteClient {
ResourceRequest NextRequest; ResourceRequest NextRequest;
std::vector<Net::Packet> SimplePackets; std::vector<Net::Packet> SimplePackets;
TOS::WaitableCoro RunCoro;
public: public:
const std::string Username; const std::string Username;
Pos::Object CameraPos = {0, 0, 0}; Pos::Object CameraPos = {0, 0, 0};
ToServer::PacketQuat CameraQuat = {0}; ToServer::PacketQuat CameraQuat = {0};
public: private:
RemoteClient(asio::io_context &ioc, tcp::socket socket, const std::string username) RemoteClient(asio::io_context &ioc, tcp::socket socket, const std::string username)
: LOG("RemoteClient " + username), Socket(ioc, std::move(socket)), Username(username) : IAsyncDestructible(ioc), LOG("RemoteClient " + username), Socket(ioc, std::move(socket)),
Username(username), RunCoro(ioc)
{ {
RunCoro.co_spawn(run());
} }
~RemoteClient(); virtual coro<> asyncDestructor() override;
coro<> run(); coro<> run();
public:
static RemoteClient_ptr Create(asio::io_context &ioc, tcp::socket socket, const std::string username) {
return createUnique<>(ioc, new RemoteClient(ioc, std::move(socket), username));
}
virtual ~RemoteClient();
void shutdown(EnumDisconnect type, const std::string reason); void shutdown(EnumDisconnect type, const std::string reason);
bool isConnected() { return IsConnected; } bool isConnected() { return IsConnected; }

View File

@@ -1,77 +1,136 @@
#pragma once #pragma once
#include "boost/asio/awaitable.hpp" #include "TOSLib.hpp"
#include "boost/asio/co_spawn.hpp" #include "boost/asio/associated_cancellation_slot.hpp"
#include "boost/asio/associated_executor.hpp"
#include "boost/asio/deadline_timer.hpp" #include "boost/asio/deadline_timer.hpp"
#include "boost/asio/detached.hpp"
#include "boost/asio/io_context.hpp" #include "boost/asio/io_context.hpp"
#include "boost/asio/use_awaitable.hpp" #include "boost/system/detail/error_code.hpp"
#include <boost/asio.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp> #include <boost/asio/experimental/awaitable_operators.hpp>
#include <exception>
#include <memory> #include <memory>
#include <type_traits> #include <type_traits>
namespace TOS { namespace TOS {
using namespace boost::asio::experimental::awaitable_operators; using namespace boost::asio::experimental::awaitable_operators;
namespace asio = boost::asio;
template<typename T = void> template<typename T = void>
using coro = boost::asio::awaitable<T>; using coro = boost::asio::awaitable<T>;
class AsyncSemaphore // class AsyncSemaphore
{ // {
boost::asio::deadline_timer Deadline; // boost::asio::deadline_timer Deadline;
std::atomic<uint8_t> Lock = 0; // std::atomic<uint8_t> Lock = 0;
// public:
// AsyncSemaphore(boost::asio::io_context& ioc)
// : Deadline(ioc, boost::posix_time::ptime(boost::posix_time::pos_infin))
// {}
// coro<> async_wait() {
// try {
// co_await Deadline.async_wait(boost::asio::use_awaitable);
// } catch(boost::system::system_error code) {
// if(code.code() != boost::system::errc::operation_canceled)
// throw;
// }
// co_await asio::this_coro::throw_if_cancelled();
// }
// coro<> async_wait(std::function<bool()> predicate) {
// while(!predicate())
// co_await async_wait();
// }
// void notify_one() {
// Deadline.cancel_one();
// }
// void notify_all() {
// Deadline.cancel();
// }
// };
/*
Многие могут уведомлять одного
Ждёт события. После доставки уведомления ждёт повторно
*/
class MultipleToOne_AsyncSymaphore {
asio::deadline_timer Timer;
public: public:
AsyncSemaphore( MultipleToOne_AsyncSymaphore(asio::io_context &ioc)
boost::asio::io_context& ioc) : Timer(ioc, boost::posix_time::ptime(boost::posix_time::pos_infin))
: Deadline(ioc, boost::posix_time::ptime(boost::posix_time::pos_infin))
{} {}
boost::asio::awaitable<void> async_wait() { void notify() {
try { Timer.cancel();
co_await Deadline.async_wait(boost::asio::use_awaitable);
} catch(boost::system::system_error code) {
if(code.code() != boost::system::errc::operation_canceled)
throw;
}
co_await boost::asio::this_coro::throw_if_cancelled();
} }
boost::asio::awaitable<void> async_wait(std::function<bool()> predicate) { void wait() {
while(!predicate()) Timer.wait();
co_await async_wait(); Timer.expires_at(boost::posix_time::ptime(boost::posix_time::pos_infin));
}
coro<> async_wait() {
try { co_await Timer.async_wait(); } catch(...) {}
} }
void notify_one() { };
Deadline.cancel_one();
class WaitableCoro {
asio::io_context &IOC;
std::shared_ptr<MultipleToOne_AsyncSymaphore> Symaphore;
std::exception_ptr LastException;
public:
WaitableCoro(asio::io_context &ioc)
: IOC(ioc)
{}
template<typename Token>
void co_spawn(Token token) {
Symaphore = std::make_shared<MultipleToOne_AsyncSymaphore>(IOC);
asio::co_spawn(IOC, [token = std::move(token), symaphore = Symaphore]() -> coro<> {
co_await std::move(token);
symaphore->notify();
}, asio::detached);
} }
void notify_all() { void wait() {
Deadline.cancel(); Symaphore->wait();
}
coro<> async_wait() {
return Symaphore->async_wait();
} }
}; };
/*
Используется, чтобы вместо уничтожения объекта в умной ссылке, вызвать корутину с co_await asyncDestructor()
*/
class IAsyncDestructible : public std::enable_shared_from_this<IAsyncDestructible> { class IAsyncDestructible : public std::enable_shared_from_this<IAsyncDestructible> {
protected: protected:
boost::asio::any_io_executor IOC; asio::io_context &IOC;
boost::asio::deadline_timer DestructLine;
virtual coro<> asyncDestructor() { DestructLine.cancel(); co_return; } virtual coro<> asyncDestructor() { co_return; }
public: public:
IAsyncDestructible(boost::asio::any_io_executor ioc) IAsyncDestructible(asio::io_context &ioc)
: IOC(ioc), DestructLine(ioc, boost::posix_time::ptime(boost::posix_time::pos_infin)) : IOC(ioc)
{} {}
virtual ~IAsyncDestructible() {} virtual ~IAsyncDestructible() {}
coro<std::variant<std::monostate, std::monostate>> cancelable(coro<> &&c) { return std::move(c) || DestructLine.async_wait(boost::asio::use_awaitable); } protected:
template<typename T, typename = typename std::is_same<IAsyncDestructible, T>> template<typename T, typename = typename std::is_same<IAsyncDestructible, T>>
static std::shared_ptr<T> createShared(boost::asio::any_io_executor ioc, T *ptr) static std::shared_ptr<T> createShared(asio::io_context &ioc, T *ptr)
{ {
return std::shared_ptr<T>(ptr, [ioc = std::move(ioc)](IAsyncDestructible *ptr) { return std::shared_ptr<T>(ptr, [&ioc = ioc](T *ptr) {
boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> { boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> {
try { co_await ptr->asyncDestructor(); } catch(...) { } try { co_await ptr->asyncDestructor(); } catch(...) { }
delete ptr; delete ptr;
@@ -80,16 +139,40 @@ public:
}); });
} }
template<typename T, typename ...Args, typename = typename std::is_same<IAsyncDestructible, T>> template<typename T, typename = typename std::is_same<IAsyncDestructible, T>>
static std::shared_ptr<T> makeShared(boost::asio::any_io_executor ioc, Args&& ... args) static coro<std::shared_ptr<T>> createShared(T *ptr)
{ {
std::shared_ptr<T>(new T(ioc, std::forward<Args>(args)..., [ioc = std::move(ioc)](IAsyncDestructible *ptr) { co_return std::shared_ptr<T>(ptr, [ioc = asio::get_associated_executor(co_await asio::this_coro::executor)](T *ptr) {
boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> { boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> {
try { co_await ptr->asyncDestructor(); } catch(...) { } try { co_await ptr->asyncDestructor(); } catch(...) { }
delete ptr; delete ptr;
co_return; co_return;
} (ptr), boost::asio::detached); } (ptr), boost::asio::detached);
})); });
}
template<typename T, typename = typename std::is_same<IAsyncDestructible, T>>
static std::unique_ptr<T, std::function<void(T*)>> createUnique(asio::io_context &ioc, T *ptr)
{
return std::unique_ptr<T, std::function<void(T*)>>(ptr, [&ioc = ioc](T *ptr) {
boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> {
try { co_await ptr->asyncDestructor(); } catch(...) { }
delete ptr;
co_return;
} (ptr), boost::asio::detached);
});
}
template<typename T, typename = typename std::is_same<IAsyncDestructible, T>>
static coro<std::unique_ptr<T, std::function<void(T*)>>> createUnique(T *ptr)
{
co_return std::unique_ptr<T, std::function<void(T*)>>(ptr, [ioc = asio::get_associated_executor(co_await asio::this_coro::executor)](T *ptr) {
boost::asio::co_spawn(ioc, [](IAsyncDestructible *ptr) -> coro<> {
try { co_await ptr->asyncDestructor(); } catch(...) { }
delete ptr;
co_return;
} (ptr), boost::asio::detached);
});
} }
}; };

View File

@@ -1,6 +1,8 @@
#include <iostream> #include <iostream>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <Client/Vulkan/Vulkan.hpp> #include <Client/Vulkan/Vulkan.hpp>
#include <Common/Async.hpp>
#include <Common/async_mutex.hpp>
namespace LV { namespace LV {
@@ -11,7 +13,7 @@ int main() {
// LuaVox // LuaVox
asio::io_context ioc; asio::io_context ioc;
LV::Client::VK::Vulkan vkInst(ioc);
ioc.run(); ioc.run();
return 0; return 0;