#pragma once #include "boost/thread/pthread/condition_variable_fwd.hpp" #include #include #include #include #include #include #include #include #include #include #include #include /* PSQL::Connection Conn(ioc, "hostaddr= dbname= user= password="); co_await Conn.async_prepare("name", "query", asio::use_awaitable); */ namespace TOS::PSQL { enum class EnumFieldType : int { TEXT = 0, BINARY = 1, }; template class TypeEncoder { private: using no_ref_t = std::remove_const_t>; public: using encoder_t = std::conditional_t< std::is_array_v, TypeEncoder>*>, TypeEncoder>; using value_t = std::string; public: static constexpr std::size_t size(const T& t) { return sizeof(t); } static constexpr int type(const T& t) { return 0; } value_t to_text_value(const T& t) { return std::to_string(t); } const char* c_str(const value_t& t) { return t.c_str(); } }; template <> class TypeEncoder { public: using encoder_t = TypeEncoder; using value_t = const char*; public: static std::size_t size(const std::string& t) { return t.size(); } static constexpr EnumFieldType type(const std::string& t) { return EnumFieldType::TEXT; } value_t to_text_value(const std::string& t) { return t.c_str(); } const char* c_str(const std::string& t) { return t.c_str(); } }; template <> class TypeEncoder { public: using encoder_t = TypeEncoder; using value_t = const char*; public: static std::size_t size(const char* const& t) { return std::strlen(t); } static constexpr EnumFieldType type(const char* const& t) { return EnumFieldType::TEXT; } value_t to_text_value(const char* const& t) { return t; } const char* c_str(const value_t& t) { return t; } }; template class TypeEncoder>> { public: using value_t = T; public: static std::size_t size(const T& t) { return sizeof(t); } static constexpr EnumFieldType type(const T& t) { return EnumFieldType::BINARY; } value_t to_text_value(const T& t) { return boost::endian::native_to_big(t); } const char* c_str(const T& t) { return reinterpret_cast(&t); } }; template class TypeEncoder>> { public: using value_t = T; public: static std::size_t size(const T& t) { return sizeof(t); } static constexpr EnumFieldType type(const T& t) { return EnumFieldType::BINARY; } value_t to_text_value(const T& t) { using namespace boost::endian; value_t v; endian_store( reinterpret_cast(&v), t); return v; } const char* c_str(const T& t) { return reinterpret_cast(&t); } }; template class TypeDecoder { public: static constexpr std::size_t min_size = 0; static constexpr std::size_t max_size = std::numeric_limits::max(); static constexpr bool nullable = false; public: T from_binary(const char* data, std::size_t length) { return data; } }; template class TypeDecoder, void> { private: using underlying_decoder_t = TypeDecoder; public: static constexpr std::size_t min_size = underlying_decoder_t::min_size; static constexpr std::size_t max_size = underlying_decoder_t::max_size; static constexpr bool nullable = true; public: std::optional from_binary(const char* data, std::size_t length) { if (length == 0) { return {}; } else { return underlying_decoder_t{}.from_binary(data, length); } } }; template class TypeDecoder>> { public: static constexpr std::size_t min_size = sizeof(T); static constexpr std::size_t max_size = sizeof(T); static constexpr bool nullable = false; public: T from_binary(const char* data, std::size_t length) { using namespace boost::endian; return endian_load(reinterpret_cast(data)); } }; template class TypeDecoder>> { public: static constexpr std::size_t min_size = sizeof(T); static constexpr std::size_t max_size = sizeof(T); static constexpr bool nullable = false; public: T from_binary(const char* data, std::size_t length) { using namespace boost::endian; return endian_load(reinterpret_cast(data)); } }; template <> class TypeDecoder { public: static constexpr std::size_t min_size = 0; static constexpr std::size_t max_size = std::numeric_limits::max(); static constexpr bool nullable = true; public: const char* from_binary(const char* data, std::size_t length) { return data; } }; template <> class TypeDecoder { public: static constexpr std::size_t min_size = 0; static constexpr std::size_t max_size = std::numeric_limits::max(); static constexpr bool nullable = true; public: std::string from_binary(const char* data, std::size_t length) { return std::string(data, length); } }; class Field { private: const PGresult* const Result; const size_t IndexRow, IndexColumn; public: Field(const PGresult* res, size_t row, size_t column) : Result{res}, IndexRow{row}, IndexColumn{column} { } template T as() const { using decoder_t = TypeDecoder; if (!decoder_t::nullable && is_null()) throw std::length_error{"PSQL: field is null"}; const int field_length = PQgetlength(Result, IndexRow, IndexColumn); if (!(field_length == 0 && decoder_t::nullable) && field_length < decoder_t::min_size || field_length > decoder_t::max_size) throw std::length_error("PSQL: Размер поля " + std::to_string(field_length) + " не в пределах " + std::to_string(decoder_t::min_size) + "-" + std::to_string(decoder_t::max_size)); return unsafe_as(); } template T as(T&& default_value) const { if (is_null()) return std::forward(default_value); else return as(); } template T unsafe_as() const { TypeDecoder decoder{}; return decoder.from_binary(PQgetvalue(Result, IndexRow, IndexColumn), PQgetlength(Result, IndexRow, IndexColumn)); } template T unsafe_as(T&& default_value) const { if (is_null()) return std::forward(default_value); else return unsafe_as(); } bool is_null() const { return PQgetisnull(Result, IndexRow, IndexColumn); } template void to(T &obj) const { obj = as(); } template void to(T &obj, T&& default_value) const { obj = as(default_value); } }; namespace utility { template std::tuple::encoder_t::value_t...> create_value_holders(Params&&... params) { return std::make_tuple(typename TypeEncoder::encoder_t{}.to_text_value(params)...); } template std::array value_array(Params&&... params) { return {typename TypeEncoder::encoder_t{}.c_str(params)...}; } template std::array size_array(Params&&... params) { return {static_cast(typename TypeEncoder::encoder_t{}.size(params))...}; } template std::array type_array(Params&&... params) { return {typename TypeEncoder::encoder_t{}.type(params)...}; } } class Row { private: const PGresult* const Result; const size_t IndexRow; public: Row(const PGresult* result, size_t row) : Result(result), IndexRow(row) {} const Field operator[](size_t column) const { size_t columns = PQnfields(Result); if (column >= columns) throw std::out_of_range{"PSQL: Колонка " + std::to_string(column) + " >= size(" + std::to_string(columns) + ")"}; return Field(Result, IndexRow, column); } const Field at(size_t column) const { return this->operator[](column); } }; class ResultIterator : public std::random_access_iterator_tag { public: //using size_type = std::size_t; using difference_type = std::ptrdiff_t; private: const PGresult* const Result; size_t IndexRow; public: ResultIterator(const PGresult* result) : ResultIterator(result, 0) { } ResultIterator(const PGresult* result, size_t row) : Result{result}, IndexRow{row} {} const Row operator*() { return Row(Result, IndexRow); } ResultIterator& operator++(int) { IndexRow++; return *this; } ResultIterator& operator--(int) { IndexRow--; return *this; } ResultIterator operator+(size_t n) const { return ResultIterator(Result, IndexRow+n); } friend inline ResultIterator operator+(size_t n, const ResultIterator& rhs) { return ResultIterator(rhs.Result, rhs.IndexRow+n); } ResultIterator operator-(size_t n) { return ResultIterator(Result, IndexRow-n); } friend inline ResultIterator operator-(size_t n, const ResultIterator& rhs) { return ResultIterator(rhs.Result, rhs.IndexRow-n); } ResultIterator& operator+=(size_t n) { IndexRow += n; return *this; } ResultIterator& operator-=(size_t n) { IndexRow -= n; return *this; } ResultIterator operator[](size_t n) { return ResultIterator(Result, IndexRow+n); } friend inline difference_type operator-(const ResultIterator& lhs, const ResultIterator& rhs) { return lhs.IndexRow - rhs.IndexRow; } friend inline bool operator==(const ResultIterator& lhs, const ResultIterator& rhs) { return lhs.Result == rhs.Result && lhs.IndexRow == rhs.IndexRow; } friend inline bool operator!=(const ResultIterator& lhs, const ResultIterator& rhs) { return lhs.Result != rhs.Result || lhs.IndexRow != rhs.IndexRow; } friend inline auto operator<=>(const ResultIterator& lhs, const ResultIterator& rhs) { return lhs.IndexRow <=> rhs.IndexRow; } // friend inline bool operator<(const ResultIterator& lhs, const ResultIterator& rhs) { // return lhs.IndexRow < rhs.IndexRow; // } // friend inline bool operator<=(const ResultIterator& lhs, const ResultIterator& rhs) { // return lhs.IndexRow <= rhs.IndexRow; // } // friend inline bool operator>(const ResultIterator& lhs, const ResultIterator& rhs) { // return lhs.IndexRow > rhs.IndexRow; // } // friend inline bool operator>=(const ResultIterator& lhs, const ResultIterator& rhs) { // return lhs.IndexRow >= rhs.IndexRow; // } }; class Result { private: PGresult *Result_; public: using iterator = ResultIterator; using ConstIterator = iterator; enum class EnumStatus : int { EMPTY_QUERY = PGRES_EMPTY_QUERY, COMMAND_OK = PGRES_COMMAND_OK, TUPLES_OK = PGRES_TUPLES_OK, BAD_RESPONSE = PGRES_BAD_RESPONSE, FATAL_ERROR = PGRES_FATAL_ERROR, }; public: Result(PGresult* const& result) noexcept : Result_(result) {} Result(const Result& other) = delete; Result(Result&& other) noexcept : Result(other.Result_) { other.Result_ = nullptr; } Result& operator=(const Result& other) = delete; Result& operator=(Result&& other) noexcept { std::swap(Result_, other.Result_); return *this; } ~Result() { if(Result_ != nullptr) PQclear(Result_); } /** * If true, indicates that we are done and this result is empty. An empty * result is typically used to mark the end of a series of result objects * (e.g. \ref Transaction::async_exec_all). * * The result object is empty when this returns true, therefore, * the object must not be used, calling any other member function is invalid. */ bool done() const { return Result_ == nullptr; } bool ok() const { return status() == EnumStatus::TUPLES_OK || status() == EnumStatus::COMMAND_OK; } EnumStatus status() const { return static_cast(PQresultStatus(Result_)); } ConstIterator begin() const { return {Result_}; } ConstIterator cbegin() const { return begin(); } ConstIterator end() const { int rows = PQntuples(Result_); assert(rows >= 0); if (rows < 0) rows = 0; return {Result_, size_t(rows)}; } ConstIterator cend() const { return end(); } const Row operator[](size_t n) const { return *(begin() + n); } const Row at(size_t n) const { if (n >= size()) throw std::out_of_range("PSQL: row " + std::to_string(n) + " >= size(" + std::to_string(size()) +")"); return (*this)[n]; } size_t size() const { return PQntuples(Result_); } size_t affected_rows() const { // const char *s = PQcmdTuples(Result); // if (s[0] == '\0') // throw std::runtime_error{"invalid query type for affected rows"}; return std::stoull(PQcmdTuples(Result_)); } std::string error_message() const { return std::string(PQresultErrorMessage(Result_)); } }; template class SocketOperations { protected: SocketOperations() = default; SocketOperations(const SocketOperations&) = delete; SocketOperations(SocketOperations&&) = default; SocketOperations& operator=(const SocketOperations&) = delete; SocketOperations& operator=(SocketOperations&&) = default; ~SocketOperations() = default; template auto handle_exec(ResultCallableT&& handler) { auto initiation = [this](ResultCallableT&& handler) { auto wrapped_handler = [handler = std::move(handler), r = std::make_shared(nullptr)](ResultCallableT &&res) mutable { // if (!res.done()) { // *r = std::move(res); // } else { // handler(std::move(*r)); // } if (res.done()) handler(std::move(*r)); }; on_write_ready({}); wait_read_ready(std::move(wrapped_handler)); }; return boost::asio::async_initiate(initiation, handler); } template auto handle_exec_all(ResultCallableT &&handler) { auto initiation = [this](ResultCallableT &&handler) { on_write_ready({}); wait_read_ready(std::move(handler)); }; return boost::asio::async_initiate(initiation, handler); } private: template void wait_read_ready(ResultCallableT &&handler) { derived().socket().async_wait(std::decay_t::wait_read, [this, handler = std::move(handler)](auto&& ec) mutable { on_read_ready(std::move(handler), ec); }); } void wait_write_ready() { derived().socket().async_wait(std::decay_t::wait_write, std::bind(&SocketOperations::on_write_ready, this, std::placeholders::_1)); } template void on_read_ready(ResultCallableT&& handler, const boost::system::error_code &ec) { while (true) { if (PQconsumeInput(derived().connection().underlying_handle()) != 1) { // TODO: convert this to some kind of error via the callback throw std::runtime_error{ "PSQL: получение не удалось " + std::string{derived().connection().last_error_message()}}; } if (!PQisBusy(derived().connection().underlying_handle())) { PGresult *pqres = PQgetResult(derived().connection().underlying_handle()); handler(Result(pqres)); if (!pqres) { break; } } else { wait_read_ready(std::move(handler)); break; } } } void on_write_ready(const boost::system::error_code &ec) { const int ret = PQflush(derived().connection().underlying_handle()); if (ret == 1) { wait_write_ready(); } else if (ret != 0) { // TODO: ignore or convert this to some kind of error via the callback throw std::runtime_error{ "PSQL: отправка не удалась " + std::string{derived().connection().last_error_message()}}; } } Derived& derived() { return *static_cast(this); } }; template class Transaction; class Connection : public SocketOperations { private: boost::asio::ip::tcp::socket Socket; PGconn *PGConnection; friend class SocketOperations; public: template Connection(Executor &exc, const std::string &pgconninfo) : Socket(exc), PGConnection(PQconnectdb(pgconninfo.c_str())) { if (status() != CONNECTION_OK) throw std::runtime_error{"PSQL: не удалось подключиться " + std::string{PQerrorMessage(PGConnection)}}; if (PQsetnonblocking(PGConnection, 1) != 0) throw std::runtime_error{"PSQL: не удалось установить не блокирующий параметр " + std::string{PQerrorMessage(PGConnection)}}; const int sock = PQsocket(PGConnection); if (sock < 0) throw std::runtime_error("PSQL: не удалось получить действительный дескриптор сокета"); Socket.assign(boost::asio::ip::tcp::v4(), sock); } ~Connection() { if (PGConnection) PQfinish(PGConnection); } Connection(Connection const&) = delete; Connection(Connection&& rhs) noexcept : Socket(std::move(rhs.Socket)), PGConnection{std::move(rhs.PGConnection)} { rhs.PGConnection = nullptr; } Connection& operator=(Connection const&) = delete; Connection& operator=(Connection&& rhs) noexcept { std::swap(Socket, rhs.Socket); std::swap(PGConnection, rhs.PGConnection); return *this; } template auto async_prepare( const std::string &statement_name, const std::string &query, CompletionTokenT &&handler) { const auto res = PQsendPrepare(connection().underlying_handle(), statement_name.c_str(), query.c_str(), 0, nullptr); if (res != 1) { throw std::runtime_error{ "error preparing statement '" + statement_name + "': " + std::string{connection().last_error_message()}}; } return handle_exec(std::forward(handler)); } /** * Creates a read/write transaction. Make sure the created transaction * object lives until you are done with it. */ template < class Unused_RWT = void, class Unused_IsolationT = void, class TransactionHandlerT> auto async_transaction(TransactionHandlerT&& handler) { using txn_t = Transaction; auto initiation = [this](auto&& handler) { auto w = std::make_shared(*this); w->async_exec("BEGIN", [handler = std::move(handler), w](auto&& res) mutable { handler(std::move(*w)); } ); }; return boost::asio::async_initiate< TransactionHandlerT, void(txn_t)>( initiation, handler); } PGconn* underlying_handle() { return PGConnection; } const PGconn* underlying_handle() const { return PGConnection; } boost::asio::ip::tcp::socket& socket() { return Socket; } const char* last_error_message() const { return PQerrorMessage(underlying_handle()); } private: int status() const { return PQstatus(PGConnection); } Connection& connection() { return *this; } }; template class Transaction : public SocketOperations> { friend class SocketOperations>; private: Connection *Conn; bool Done; public: Transaction(Connection &conn) : Conn(&conn), Done(false) {} Transaction(const Transaction&) = delete; Transaction(Transaction&& rhs) noexcept : Conn(rhs.Conn), Done(rhs.Done) { rhs.Done = true; } Transaction& operator=(const Transaction&) = delete; Transaction& operator=(Transaction&& rhs) noexcept { Conn = rhs.Conn; Done = rhs.Done; rhs.Done = true; } /** * Destructor. * If neither \ref commit() nor \ref rollback() has been used, destructing * will do a sync rollback. */ ~Transaction() noexcept(false) { if (!Done) { const Result res{PQexec(connection().underlying_handle(), "ROLLBACK")}; if(Result::EnumStatus::COMMAND_OK != res.status()) throw std::runtime_error("PSQL: Ошибка завершения транзакции: " + res.error_message()); } } /// See \ref async_exec(query, handler, params) for more. template auto async_exec(const std::string &query, ResultCallableT&& handler) { return async_exec_2(query, std::forward(handler), nullptr, nullptr, nullptr, 0); } /// See \ref async_exec_prepared(statement_name, handler, params) for more. template auto async_exec_prepared(const std::string& statement_name, ResultCallableT&& handler) { return async_exec_prepared_2(statement_name, std::forward(handler), nullptr, nullptr, nullptr, 0); } /** * Execute a query asynchronously. * \p query must contain a single query. For multiple queries, see * \ref async_exec_all(query, handler, params). * \p handler will be called once with the result. * \p params parameters to pass in the same order to $1, $2, ... * * This function must not be called again before the handler is called. */ template auto async_exec(const std::string &query, ResultCallableT&& handler, Params&&... params) { using namespace utility; const auto value_holders = create_value_holders(params...); const auto value_arr = std::apply( [this](auto&&... args) { return value_array(args...); }, value_holders); const auto size_arr = size_array(params...); const auto type_arr = type_array(params...); return async_exec_2(query, std::forward(handler), value_arr.data(), size_arr.data(), type_arr.data(), sizeof...(params)); } /** * Execute a query asynchronously. * \p statement_name prepared statement name. * \ref async_exec_all(query, handler, params). * \p handler will be called once with the result. * \p params parameters to pass in the same order to $1, $2, ... * * This function must not be called again before the handler is called. */ template auto async_exec_prepared(const std::string& statement_name, ResultCallableT&& handler, Params&&... params) { using namespace utility; const auto value_holders = create_value_holders(params...); const auto value_arr = std::apply( [this](auto&&... args) { return value_array(args...); }, value_holders); const auto size_arr = size_array(params...); const auto type_arr = type_array(params...); return async_exec_prepared_2(statement_name, std::forward(handler), value_arr.data(), size_arr.data(),type_arr.data(), sizeof...(params)); } /** * Execute queries asynchronously. * Supports multiple queries in \p query, separated by ';' but does not * support parameter binding. * \p handler will be called once for each query and once more with an * empty result where \ref result.done() returns true. * * This function must not be called again before the handler is called * with a result where \ref result.done() returns true. */ template auto async_exec_all(const std::string &query, ResultCallableT&& handler) { if(!Done) throw std::runtime_error("Запрос уже выполняется"); const auto res = PQsendQuery(connection().underlying_handle(), query.c_str()); if (res != 1) { throw std::runtime_error{ "PSQL: Ошибка выполнения запроса: " + std::string{connection().last_error_message()}}; } return this->handle_exec_all(std::forward(handler)); } template auto commit(ResultCallableT&& handler) { const auto initiation = [this](auto&& handler) { async_exec("COMMIT", [this, handler = std::move(handler)](auto&& res) mutable { Done = true; handler(std::forward(res)); }); }; return boost::asio::async_initiate< ResultCallableT, void(Result)>( initiation, handler); } template auto rollback(ResultCallableT&& handler) { const auto initiation = [this](auto&& handler) { async_exec("ROLLBACK", [this, handler = std::move(handler)](auto&& res) mutable { Done = true; handler(std::forward(res)); }); }; return boost::asio::async_initiate< ResultCallableT, void(Result)>( initiation, handler); } protected: Connection& connection() { return *Conn; } private: template auto async_exec_2(const std::string &query, ResultCallableT&& handler, const char* const* value_arr, const int* size_arr, const int* type_arr, std::size_t num_values) { if(!Done) throw std::runtime_error("PSQL: Запрос уже выполняется"); const auto res = PQsendQueryParams(connection().underlying_handle(), query.c_str(), num_values, nullptr, value_arr, size_arr, type_arr, static_cast(EnumFieldType::BINARY)); if (res != 1) { throw std::runtime_error{ "PSQL: Ошибка выполнения запроса '" + query + "': " + std::string{connection().last_error_message()}}; } return this->handle_exec(std::forward(handler)); } template auto async_exec_prepared_2(const std::string& statement_name, ResultCallableT&& handler, const char* const* value_arr, const int* size_arr, const int* type_arr, std::size_t num_values) { if(!Done) throw std::runtime_error("PSQL: Запрос уже выполняется"); const auto res = PQsendQueryPrepared(connection().underlying_handle(), statement_name.c_str(), num_values, value_arr, size_arr, type_arr, 1); if (res != 1) { throw std::runtime_error("PSQL: Ошибка выполнения запроса '" + statement_name + "': " + std::string{connection().last_error_message()}); } return this->handle_exec(std::forward(handler)); } }; using work = Transaction; /** * Asynchronously executes a query. * This function must not be called again before the handler is called. */ template inline auto async_exec(Transaction& t, const std::string &query, ResultCallableT&& handler, Params&&... params) { return t.async_exec(query, std::forward(handler), std::forward(params)...); } /** * Starts a transaction, asynchronously executes a query and commits the transaction. * This function must not be called again before the handler is called. */ template inline auto async_exec(Connection& c, std::string query, ResultCallableT&& token, Params... params) { auto initiation = [](auto&& handler, Connection& c, std::string query, auto&&... params) mutable { c.template async_transaction<>([handler = std::move(handler), query = std::move(query), params...](auto txn) mutable { std::unique_ptr ptxn = std::make_unique(std::move(txn)); work &txn_ref = *ptxn; auto wrapped_handler = [handler = std::move(handler), ptxn = std::move(ptxn)](auto&& result) mutable { if(result.ok()) { work &txn_ref = *ptxn; txn_ref.commit([ptxn = std::move(ptxn), handler = std::move(handler), result = std::move(result)] (auto&& commit_result) mutable { ptxn = nullptr; if(commit_result.ok()) handler(std::exception_ptr(nullptr), std::move(result)); else handler(std::exception_ptr(std::make_exception_ptr(std::runtime_error(commit_result.error_message()))), std::move(commit_result)); }); } else { ptxn = nullptr; handler(std::exception_ptr(std::make_exception_ptr(std::runtime_error(result.error_message()))), std::move(result)); } }; async_exec(txn_ref, query, std::move(wrapped_handler), std::move(params)...); }); }; return boost::asio::async_initiate( initiation, token, std::ref(c), std::move(query), std::forward(params)...); } /** * Asynchronously executes a prepared query. * This function must not be called again before the handler is called. */ template inline auto async_exec_prepared(Transaction& t, const std::string &name, ResultCallableT &&handler, Params&&... params) { return t.async_exec_prepared(name, std::forward(handler), std::forward(params)...); } /** * Starts a transaction, asynchronously executes a prepared query and commits * the transaction. * This function must not be called again before the handler is called. */ template inline auto async_exec_prepared(Connection& c, std::string name, ResultCallableT&& handler, Params... params) { auto initiation = [](auto&& handler, Connection& c, std::string name, auto&&... params) mutable { c.template async_transaction<>([ handler = std::move(handler), name = std::move(name), params...](auto txn) mutable { auto ptxn = std::make_unique(std::move(txn)); auto& txn_ref = *ptxn; auto wrapped_handler = [handler = std::move(handler), ptxn = std::move(ptxn)](auto&& result) mutable { if (result.ok()) { auto& txn_ref = *ptxn; txn_ref.commit([ptxn = std::move(ptxn), handler = std::move(handler), result = std::move(result)] (auto&& commit_result) mutable { if (commit_result.ok()) { handler(std::move(result)); } else { handler(std::move(commit_result)); } }); } else { handler(std::move(result)); } }; async_exec_prepared(txn_ref, name, std::move(wrapped_handler), std::move(params)...); }); }; return boost::asio::async_initiate< ResultCallableT, void(Result)>( initiation, handler, std::ref(c), std::move(name), std::forward(params)...); } }