Пересмотр асинхронностей

This commit is contained in:
2025-03-11 14:55:43 +06:00
parent e99ae2f6ba
commit e190c79d00
12 changed files with 482 additions and 194 deletions

View File

@@ -17,7 +17,7 @@ bool SocketServer::isStopped() {
coro<void> SocketServer::run(std::function<coro<>(tcp::socket)> onConnect) {
while(true) { // TODO: ловить ошибки на async_accept
try {
co_spawn(onConnect(co_await Acceptor.async_accept()));
asio::co_spawn(IOC, onConnect(co_await Acceptor.async_accept()), asio::detached);
} catch(const std::exception &exc) {
if(const boost::system::system_error *errc = dynamic_cast<const boost::system::system_error*>(&exc);
errc && (errc->code() == asio::error::operation_aborted || errc->code() == asio::error::bad_descriptor))
@@ -31,13 +31,8 @@ AsyncSocket::~AsyncSocket() {
if(SendPackets.Context)
SendPackets.Context->NeedShutdown = true;
{
boost::lock_guard lock(SendPackets.Mtx);
SendPackets.SenderGuard.cancel();
WorkDeadline.cancel();
}
boost::unique_lock lock(SendPackets.Mtx);
if(Socket.is_open())
try { Socket.close(); } catch(...) {}
}
@@ -55,6 +50,8 @@ void AsyncSocket::pushPackets(std::vector<Packet> *simplePackets, std::vector<Sm
// TODO: std::cout << "Передоз пакетами, сокет закрыт" << std::endl;
}
bool wasPackets = SendPackets.SimpleBuffer.size() || SendPackets.SmartBuffer.size();
if(!Socket.is_open()) {
if(simplePackets)
simplePackets->clear();
@@ -86,8 +83,7 @@ void AsyncSocket::pushPackets(std::vector<Packet> *simplePackets, std::vector<Sm
SendPackets.SizeInQueue += addedSize;
if(SendPackets.WaitForSemaphore) {
SendPackets.WaitForSemaphore = false;
if(!wasPackets) {
SendPackets.Semaphore.cancel();
SendPackets.Semaphore.expires_at(boost::posix_time::pos_infin);
}
@@ -147,12 +143,19 @@ coro<> AsyncSocket::runSender(std::shared_ptr<AsyncContext> context) {
while(!context->NeedShutdown) {
{
boost::unique_lock lock(SendPackets.Mtx);
if(SendPackets.SimpleBuffer.empty() && SendPackets.SmartBuffer.empty()) {
SendPackets.WaitForSemaphore = true;
auto coroutine = SendPackets.Semaphore.async_wait();
lock.unlock();
try { co_await std::move(coroutine); } catch(...) {}
if(context->NeedShutdown) {
break;
}
if(SendPackets.SimpleBuffer.empty() && SendPackets.SmartBuffer.empty()) {
auto coroutine = SendPackets.Semaphore.async_wait();
if(SendPackets.SimpleBuffer.empty() && SendPackets.SmartBuffer.empty()) {
lock.unlock();
try { co_await std::move(coroutine); } catch(...) {}
}
continue;
} else {
for(int cycle = 0; cycle < 2; cycle++, NextBuffer++) {