#pragma once #include "MemoryPool.hpp" #include "Async.hpp" #include #include #include #include #include #include namespace LV::Net { class Server : public AsyncObject { protected: std::atomic_bool IsAlive = false, NeedClose = false; tcp::acceptor Acceptor; asio::deadline_timer Lock; std::function(tcp::socket)> OnConnect; public: Server(asio::io_context &ioc, std::function(tcp::socket)> &&onConnect, uint16_t port = 0) : AsyncObject(ioc), Acceptor(ioc, tcp::endpoint(tcp::v4(), port)), Lock(ioc, boost::posix_time::pos_infin), OnConnect(std::move(onConnect)) { assert(OnConnect); co_spawn(run()); IsAlive.store(true); } ~Server(); bool isStopped(); uint16_t getPort() { return Acceptor.local_endpoint().port(); } void stop(); void wait(); coro async_wait(); protected: coro run(); }; #if defined(__BYTE_ORDER) && __BYTE_ORDER == __LITTLE_ENDIAN template , int> = 0> static inline T swapEndian(const T &u) { return u; } #else template , int> = 0> static inline T swapEndian(const T &u) { if constexpr (sizeof(T) == 1) { return u; } else if constexpr (sizeof(T) == 2) { return __builtin_bswap16(u); } else if constexpr (sizeof(T) == 4) { return __builtin_bswap32(u); } else if constexpr (sizeof(T) == 8) { return __builtin_bswap64(u); } else { static_assert(sizeof(T) <= 8, "Неподдерживаемый размер для перестановки порядка байтов (Swap Endian)"); return u; } } #endif // Запись в сторону сокета производится пакетами // Считывание потоком using NetPool = BoostPool<12, 14>; class Packet { static constexpr size_t MAX_PACKET_SIZE = 1 << 16; uint16_t Size = 0; std::vector Pages; public: Packet() = default; Packet(const Packet&) = default; Packet(Packet&&) = default; Packet& operator=(const Packet&) = default; Packet& operator=(Packet&&) = default; inline Packet& write(const std::byte *data, uint16_t size) { assert(Size+size < MAX_PACKET_SIZE); while(size) { if(Pages.size()*NetPool::PageSize == Size) Pages.emplace_back(); uint16_t needWrite = std::min(Pages.size()*NetPool::PageSize-Size, size); std::byte *ptr = &Pages.back().front() + (Size % NetPool::PageSize); std::copy(data, data+needWrite, ptr); Size += needWrite; size -= needWrite; } return *this; } template, int> = 0> inline Packet& write(T u) { u = swapEndian(u); write((const std::byte*) &u, sizeof(u)); return *this; } inline Packet& write(std::string_view str) { assert(Size+str.size()+2 < MAX_PACKET_SIZE); write((uint16_t) str.size()); write((const std::byte*) str.data(), str.size()); return *this; } inline Packet& write(const std::string &str) { return write(std::string_view(str)); } inline uint16_t size() const { return Size; } inline const std::vector& getPages() const { return Pages; } template or std::is_convertible_v, int> = 0> inline Packet& operator<<(const T &value) { if constexpr (std::is_convertible_v) return write((std::string_view) value); else return write(value); } void clear() { clearFast(); Pages.clear(); } void clearFast() { Size = 0; } Packet& complite(std::vector &out) { out.resize(Size); for(size_t pos = 0; pos < Size; pos += NetPool::PageSize) { const char *data = (const char*) Pages[pos / NetPool::PageSize].data(); std::copy(data, data+std::min(Size-pos, NetPool::PageSize), (char*) &out[pos]); } return *this; } std::vector complite() { std::vector out; complite(out); return out; } coro<> sendAndFastClear(tcp::socket &socket) { for(size_t pos = 0; pos < Size; pos += NetPool::PageSize) { const char *data = (const char*) Pages[pos / NetPool::PageSize].data(); size_t size = std::min(Size-pos, NetPool::PageSize); co_await asio::async_write(socket, asio::const_buffer(data, size)); } clearFast(); } }; class SmartPacket : public Packet { public: std::function IsStillRelevant; std::function()> OnSend; }; class AsyncSocket : public AsyncObject { NetPool::Array<32> RecvBuffer, SendBuffer; size_t RecvPos = 0, RecvSize = 0, SendSize = 0; tcp::socket Socket; static constexpr uint32_t MAX_SIMPLE_PACKETS = 8192, MAX_SMART_PACKETS = MAX_SIMPLE_PACKETS/4, MAX_PACKETS_SIZE_IN_WAIT = 1 << 24; struct AsyncContext { volatile bool NeedShutdown = false, RunSendShutdowned = false; std::string Error; }; struct SendPacketsObj { boost::mutex Mtx; bool WaitForSemaphore = false; asio::deadline_timer Semaphore, SenderGuard; boost::circular_buffer_space_optimized SimpleBuffer; boost::circular_buffer_space_optimized SmartBuffer; size_t SizeInQueue = 0; std::shared_ptr Context; SendPacketsObj(asio::io_context &ioc) : Semaphore(ioc, boost::posix_time::pos_infin), SenderGuard(ioc, boost::posix_time::pos_infin) {} } SendPackets; public: AsyncSocket(asio::io_context &ioc, tcp::socket &&socket) : AsyncObject(ioc), Socket(std::move(socket)), SendPackets(ioc) { SendPackets.SimpleBuffer.set_capacity(512); SendPackets.SmartBuffer.set_capacity(SendPackets.SimpleBuffer.capacity()/4); SendPackets.Context = std::make_shared(); boost::asio::socket_base::linger optionLinger(true, 4); // После закрытия сокета оставшиеся данные будут доставлены Socket.set_option(optionLinger); boost::asio::ip::tcp::no_delay optionNoDelay(true); // Отключает попытки объёденить данные в крупные пакеты Socket.set_option(optionNoDelay); asio::co_spawn(ioc, runSender(SendPackets.Context), asio::detached); } ~AsyncSocket(); void pushPackets(std::vector *simplePackets, std::vector *smartPackets = nullptr); std::string getError() const; bool isAlive() const; coro<> read(std::byte *data, uint32_t size); template or std::is_same_v, int> = 0> coro read() { if constexpr(std::is_integral_v) { T value; co_await read((std::byte*) &value, sizeof(value)); co_return swapEndian(value); } else { uint16_t size = co_await read(); T value(size, ' '); co_await read((std::byte*) value.data(), size); co_return value;} } coro<> waitForSend(); static inline coro<> read(tcp::socket &socket, std::byte *data, uint32_t size) { co_await asio::async_read(socket, asio::mutable_buffer(data, size)); } template or std::is_same_v, int> = 0> static inline coro read(tcp::socket &socket) { if constexpr(std::is_integral_v) { T value; co_await read(socket, (std::byte*) &value, sizeof(value)); co_return swapEndian(value); } else { uint16_t size = co_await read(socket); T value(size, ' '); co_await read(socket, (std::byte*) value.data(), size); co_return value;} } static inline coro<> write(tcp::socket &socket, const std::byte *data, uint16_t size) { co_await asio::async_write(socket, asio::const_buffer(data, size)); } template, int> = 0> static inline coro<> write(tcp::socket &socket, T u) { u = swapEndian(u); co_await write(socket, (const std::byte*) &u, sizeof(u)); } static inline coro<> write(tcp::socket &socket, std::string_view str) { co_await write(socket, (uint16_t) str.size()); co_await write(socket, (const std::byte*) str.data(), str.size()); } static inline coro<> write(tcp::socket &socket, const std::string &str) { return write(socket, std::string_view(str)); } static inline coro<> write(tcp::socket &socket, const char *str) { return write(socket, std::string_view(str)); } private: coro<> runSender(std::shared_ptr context); }; coro asyncConnectTo(const std::string address, std::function onProgress = nullptr); }