From d6ee2185a7e735cee9d7be64ba8359700b43df39 Mon Sep 17 00:00:00 2001 From: DrSocalkwe3n Date: Mon, 6 Oct 2025 08:22:54 +0600 Subject: [PATCH] =?UTF-8?q?=D0=9C=D0=B8=D0=BD=D0=B8=20=D1=80=D0=B5=D0=BB?= =?UTF-8?q?=D0=B8=D0=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 + CMakeLists.txt | 55 ++++ Src/Asio.hpp | 212 ++++++++++++++ Src/main.cpp | 773 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 1044 insertions(+) create mode 100644 .gitignore create mode 100644 CMakeLists.txt create mode 100644 Src/Asio.hpp create mode 100644 Src/main.cpp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bbe6198 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/.cache +/.vscode +/build +/Work \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..3382630 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,55 @@ +cmake_minimum_required(VERSION 3.13) + +set(CMAKE_CXX_STANDARD 23) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS OFF) + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdata-sections -ffunction-sections") +set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--gc-sections") # -rdynamic + +# gprof +# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pg") +# set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -pg") + +project(btrfs-sync-backend VERSION 0.0 LANGUAGES CXX) +add_executable(${PROJECT_NAME}) +target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_23) + +# sanitizer +if(CMAKE_BUILD_TYPE STREQUAL "Debug") + target_compile_options(${PROJECT_NAME} PUBLIC -fsanitize=address,undefined -fno-omit-frame-pointer -fno-sanitize-recover=all) + target_link_options(${PROJECT_NAME} PUBLIC -fsanitize=address,undefined) + set(ENV{ASAN_OPTIONS} detect_leaks=0) +endif() + +if("${CMAKE_CXX_COMPILER_ID}" MATCHES "GNU") + target_compile_options(${PROJECT_NAME} PUBLIC -fcoroutines) +elseif("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang") + target_compile_options(${PROJECT_NAME} PUBLIC -fcoroutine) +endif() + +file(GLOB_RECURSE SOURCES RELATIVE ${PROJECT_SOURCE_DIR} "Src/*.cpp") +target_sources(${PROJECT_NAME} PRIVATE ${SOURCES}) +target_include_directories(${PROJECT_NAME} PUBLIC "${PROJECT_SOURCE_DIR}/Src") + + +include(FetchContent) + +# Boost +set(Boost_USE_STATIC_LIBS ON) +set(Boost_ENABLE_CMAKE ON) +set(Boost_INCLUDE_LIBRARIES ${Boost_INCLUDE_LIBRARIES} system asio json thread stacktrace timer serialization uuid process filesystem) +FetchContent_Declare( + Boost + GIT_REPOSITORY https://github.com/boostorg/boost.git + GIT_TAG boost-1.89.0 + GIT_SHALLOW true + GIT_PROGRESS true + USES_TERMINAL_DOWNLOAD true +) +FetchContent_MakeAvailable(Boost) +target_link_libraries(${PROJECT_NAME} PUBLIC Boost::asio Boost::process Boost::json) + +# ICU +find_package(ICU REQUIRED COMPONENTS uc i18n) +target_link_libraries(${PROJECT_NAME} PUBLIC ICU::uc ICU::i18n) \ No newline at end of file diff --git a/Src/Asio.hpp b/Src/Asio.hpp new file mode 100644 index 0000000..8ad160e --- /dev/null +++ b/Src/Asio.hpp @@ -0,0 +1,212 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + + +using namespace boost::asio::experimental::awaitable_operators; + + +inline std::string tabulate(const std::string& tb) { + return boost::algorithm::replace_all_copy(tb, "\n", "\n\t"); +} + +// #ifdef NDEBUG +#define MAKE_ERROR(text) { std::stringstream error; error << text << std::endl; throw std::runtime_error(error.str()); } +#define MAKE_WARNING(text) { std::stringstream error; error << text << std::endl; std::cerr << error.str(); } +// #else +// #define MAKE_ERROR(text) { std::stringstream error; error << boost::stacktrace::stacktrace(); std::string tb = tabulate(error.str()); error.str(""); error << text << "\n\t" << tb << std::endl; throw std::runtime_error(error.str()); } +// #define MAKE_WARNING(text) { std::stringstream error; error << boost::stacktrace::stacktrace(); std::string tb = tabulate(error.str()); error.str(""); error << text << "\n\t" << tb << std::endl; std::cerr << error.str(); } +// #endif + +#define MAKE_INFO(text) { std::stringstream info; info << text << std::endl; std::cout << info.str(); } + +namespace Asio { + +namespace asio = boost::asio; +template +using coro = asio::awaitable; + +/* + Асинхроный ожидатель завершения использования ресурса +*/ +class AsyncUseControl { +public: + class Lock { + AsyncUseControl *AUC; + + public: + Lock(AsyncUseControl *auc) + : AUC(auc) + {} + + Lock() + : AUC(nullptr) + {} + + ~Lock() { + if(AUC) + unlock(); + } + + Lock(const Lock&) = delete; + Lock(Lock&& obj) + : AUC(obj.AUC) + { + obj.AUC = nullptr; + } + + Lock& operator=(const Lock&) = delete; + Lock& operator=(Lock&& obj) { + if(&obj == this) + return *this; + + if(AUC) + unlock(); + + AUC = obj.AUC; + obj.AUC = nullptr; + + return *this; + } + + void unlock() { + assert(AUC); + + if(--AUC->Uses == 0 && AUC->OnNoUse) { + asio::post(AUC->IOC, std::move(AUC->OnNoUse)); + } + + AUC = nullptr; + } + }; + +private: + asio::io_context &IOC; + bool NoUse = false; + std::move_only_function OnNoUse; + std::atomic_int Uses = 0; + +public: + AsyncUseControl(asio::io_context &ioc) + : IOC(ioc) + {} + + template> + auto wait(Token&& token = asio::default_completion_token_t()) { + NoUse = true; + + auto initiation = [this](auto&& token) { + int value; + do { + value = Uses.exchange(-1); + } while(value == -1); + + OnNoUse = std::move(token); + + if(value == 0) { + asio::post(IOC, std::move(OnNoUse)); + } + + Uses.exchange(value); + }; + + return asio::async_initiate(initiation, token); + } + + Lock use() { + int value; + do { + value = Uses.exchange(-1); + } while(value == -1); + + if(NoUse) { + Uses.exchange(value); + throw boost::system::system_error(asio::error::operation_aborted, "OnNoUse"); + } + + Uses.exchange(++value); + return Lock(this); + } +}; + + +/* + Используется, чтобы вместо уничтожения объекта в умной ссылке, вызвать корутину с co_await asyncDestructor() +*/ +class IAsyncDestructible : public std::enable_shared_from_this { +protected: + asio::io_context &IOC; + AsyncUseControl AUC; + + virtual coro<> asyncDestructor() { co_await AUC.wait(); } + +public: + IAsyncDestructible(asio::io_context &ioc) + : IOC(ioc), AUC(ioc) + {} + + virtual ~IAsyncDestructible() {} + +protected: + template> + static std::shared_ptr CreateShared(asio::io_context &ioc, T *ptr) + { + return std::shared_ptr(ptr, [&ioc](T *ptr) { + boost::asio::co_spawn(ioc, + [ptr]() mutable -> coro<> { + try { + co_await dynamic_cast(ptr)->asyncDestructor(); + } catch(const std::exception& exc) { + std::string error = "IAsyncDestructible<"; + error += typeid(T).name(); + error += ">:\n"; + error += tabulate(exc.what()); + std::cerr << error << std::endl; + } + + try { + delete ptr; + } catch(const std::exception& exc) { + std::string error = "IAsyncDestructible~<"; + error += typeid(T).name(); + error += ">:\n"; + error += tabulate(exc.what()); + std::cerr << error << std::endl; + } + + }, + boost::asio::detached); + }); + } + + template> + static coro> CreateShared(T *ptr) + { + co_return std::shared_ptr(ptr, [ioc = asio::get_associated_executor(co_await asio::this_coro::executor)](T *ptr) { + boost::asio::co_spawn(ioc, + [ptr]() mutable -> coro<> { + try { + co_await dynamic_cast(ptr)->asyncDestructor(); + } catch(const std::exception& exc) { + std::string error = "IAsyncDestructible<"; + error += typeid(T).name(); + error += ">:\n"; + std::string subError = exc.what(); + boost::replace_all(subError, "\n", "\n\t"); + error += subError; + std::cerr << error << std::endl; + } + + asio::post(asio::get_associated_executor(co_await asio::this_coro::executor), [ptr](){ delete ptr; }); + }, + boost::asio::detached); + }); + } +}; + +} \ No newline at end of file diff --git a/Src/main.cpp b/Src/main.cpp new file mode 100644 index 0000000..322bcb2 --- /dev/null +++ b/Src/main.cpp @@ -0,0 +1,773 @@ + +#include +#include +#include +#include + +#include "Asio.hpp" +#include "boost/asio.hpp" +#include "boost/system.hpp" +#include "boost/process.hpp" +#include "boost/json.hpp" +#include "boost/date_time.hpp" + + +using namespace Asio; +namespace js = boost::json; +constexpr uint32_t IntervalLen = 30; + +coro> runProc(const std::string procName, const std::vector args = {}) { + auto exec = co_await asio::this_coro::executor; + asio::readable_pipe r(exec); + + boost::process::process proc(exec, boost::process::environment::find_executable(procName), + args, + boost::process::process_stdio{nullptr, r, STDOUT_FILENO} + ); + + std::string result; + std::vector tempBuff(1024*64); + + while(true) { + try { + size_t size = co_await r.async_read_some(asio::mutable_buffer(tempBuff.data(), tempBuff.size())); + result += std::string_view(tempBuff.data(), size); + } catch(const std::exception& exc) { + if(const auto* serr = dynamic_cast(&exc)) { + if(serr->code() == asio::error::eof) + break; + } + + throw; + } + } + + co_return std::tuple{co_await proc.async_wait(), std::move(result)}; +} + +namespace fs = std::filesystem; +extern asio::io_context IOC; + +constexpr const char *ConfigFile = "config.json"; +constexpr const char *ConfigFilePrev = "config.prev.json"; +constexpr const char *ConfigFileNext = "config.next.json"; + +using StorageId = uint32_t; +using PolicyId = uint32_t; +using ObjectId = uint32_t; + +struct Storage { + // Путь относительно SystemRoot + fs::path Path; + + js::object dump() const { + return { + {"Path", (std::string_view) Path.string()} + }; + } + + void load(const js::object& obj) { + Path = (std::string_view) obj.at("Path").get_string(); + } +}; + +struct Policy { + // Каждые 30 минут + // Где храним (-1 на той же системе) + // Сколько последних копий должно хранится + + // Множитель интервала, хранилище, сколько последних хранить + std::vector> SubIntervals; + + + js::object dump() const { + js::array res; + + for(const auto& arrays : SubIntervals) { + res.push_back({arrays[0], arrays[1], arrays[2]}); + } + + return { + {"SubIntervals", std::move(res)} + }; + } + + void load(const js::object& obj) { + for(const auto& arrays : obj.at("SubIntervals").as_array()) { + const auto& array = arrays.as_array(); + SubIntervals.push_back( + std::array{ + (uint16_t) array[0].to_number(), + (uint16_t) array[1].to_number(), + (uint16_t) array[2].to_number() + } + ); + } + } +}; + +struct Object { + fs::path Path; + PolicyId Id; + + js::object dump() const { + return { + {"Path", (std::string_view) Path.string()}, + {"Id", Id} + }; + } + + void load(const js::object& obj) { + Path = (std::string_view) obj.at("Path").get_string(); + Id = obj.at("Id").to_number(); + } +}; + +struct Configuration { + fs::path SystemRoot = "/"; + // Хранилища на которых могут хранится бэкапы + std::unordered_map Storages; + // Шаблон временных политик единичных объектов + Policy TemplatePolicy; + // Политики резервного копирования + std::unordered_map Policies; + // Объекты резервного копирования + std::unordered_map Objects; + + void save() { + std::string data; + + try { + js::object storages, policies, objects; + + for(const auto& [id, storage] : Storages) + storages.emplace(std::to_string(id), storage.dump()); + + for(const auto& [id, policy] : Policies) + policies.emplace(std::to_string(id), policy.dump()); + + for(const auto& [id, object] : Objects) + objects.emplace(std::to_string(id), object.dump()); + + js::object val { + {"SystemRoot", SystemRoot.string()}, + {"Storages", std::move(storages)}, + {"TemplatePolicy", TemplatePolicy.dump()}, + {"Policies", std::move(policies)}, + {"Objects", std::move(objects)}, + }; + + data = js::serialize(val); + } catch(const std::exception& exc) { + MAKE_ERROR("Ошибка во время формирования json объекта:\n\t" << tabulate(exc.what())); + } + + if(fs::exists(ConfigFile)) { + try { + fs::copy_file(ConfigFile, ConfigFilePrev, fs::copy_options::overwrite_existing); + } catch(const std::exception& exc) { + MAKE_ERROR("Ошибка во время создания бэкапа конфигурации:\n\t" << tabulate(exc.what())); + } + } + + try { + std::ofstream fd(ConfigFileNext); + fd.write(data.c_str(), data.size()); + fd.close(); + } catch(const std::exception& exc) { + MAKE_ERROR("Ошибка во время сохранения новой конфигурации:\n\t" << tabulate(exc.what())); + } + + try { + fs::rename(ConfigFileNext, ConfigFile); + } catch(const std::exception& exc) { + MAKE_ERROR("Ошибка во время применения сохранённой конфигурации:\n\t" << tabulate(exc.what())); + } + } + + void load() { + if(!fs::exists(ConfigFile)) { + + return; + } + + js::object val; + + for(int iter = 0; iter < 2; iter++) { + try { + js::parser parser; + std::ifstream fd(iter == 0 ? ConfigFile : ConfigFilePrev); + std::vector buff(1024*64); + while(size_t size = fd.readsome(buff.data(), buff.size())) { + size_t writed = 0; + while(writed != size) { + writed += parser.write(buff.data()+writed, size-writed); + } + } + + val = parser.release().as_object(); + break; + } catch(const std::exception& exc) { + if(iter == 0) + MAKE_WARNING("Ошибка во время загрузки файла конфигурации:\n\t" << tabulate(exc.what())) + else if(iter == 1) + MAKE_ERROR("Ошибка во время загрузки резервной копии файла конфигурации:\n\t" << tabulate(exc.what())); + } + } + + try { + SystemRoot = (std::string_view) val["SystemRoot"].get_string(); + + for(const auto& [key, storage] : val["Storages"].as_object()) { + Storages[std::stoul(key)].load(storage.as_object()); + } + + TemplatePolicy.load(val["TemplatePolicy"].as_object()); + + for(const auto& [key, policy] : val["Policies"].as_object()) { + Policies[std::stoul(key)].load(policy.as_object()); + } + + for(const auto& [key, object] : val["Objects"].as_object()) { + Objects[std::stoul(key)].load(object.as_object()); + } + } catch(const std::exception& exc) { + MAKE_ERROR("Ошибка во время парсинга файла конфигурации:\n\t" << tabulate(exc.what())); + } + + } +}; + +Configuration GlobalConf; + +// Проверка действительности конфигурации +// Проверка что все хранилища доступны и являются btrfs. +// Проверка связности OneObject политик. +void checkConfigurationConsistency() { + +} + +template +uint32_t getNextIdFor(const std::unordered_map& map) { + if(map.empty()) + return 0; + + std::vector keys; + keys.reserve(map.size()); + for(const auto& pair : map) { + keys.push_back(pair.first); + } + + std::sort(keys.begin(), keys.end()); + + uint32_t expected = 0; + for(uint32_t key : keys) { + if(key > expected) + return expected; + + if(expected == UINT32_MAX) + throw std::runtime_error("Нет свободных идентификаторов"); + + ++expected; + } + + return expected; +} + +coro getMountUUID(const fs::path path) { + std::vector args; + args.push_back("-no"); + args.push_back("UUID"); + args.push_back("--target"); + args.push_back(path.string()); + + auto [code, data] = co_await runProc("findmnt", + args + ); + + if(code != 0) + MAKE_ERROR("Путь не существует"); + + co_return data; +} + +boost::posix_time::ptime parseDateTime(const std::string time) { + std::istringstream ss(time); + boost::posix_time::ptime local_time; + + ss.imbue(std::locale(ss.getloc(), new boost::posix_time::time_input_facet("%Y-%m-%d %H:%M:%S"))); + ss >> local_time; + + return local_time; +} + +struct SubVolumeInfo { + boost::posix_time::ptime CreationTime{boost::local_time::not_a_date_time}; + std::string UUID, ParentUUID, ReceivedUUID; +}; + +coro getSubvolumeInfo(const fs::path path) { + std::vector args; + args.push_back("subvolume"); + args.push_back("show"); + args.push_back(path); + + auto [code, data] = co_await runProc("btrfs", + args + ); + + if(code != 0) + MAKE_ERROR("Не удалось получить информацию о подразделе btrfs:\n\t" << tabulate(data)); + + SubVolumeInfo info; + std::istringstream ss(data); + std::string line; + + while(std::getline(ss, line)) { + if(info.UUID.empty() && line.find("UUID:") != std::string::npos) { + info.UUID = line.substr(line.find(':') + 1); + info.UUID.erase(0, info.UUID.find_first_not_of(" \t")); + } else if(line.find("Parent UUID:") != std::string::npos) { + info.ParentUUID = line.substr(line.find(':') + 1); + info.ParentUUID.erase(0, info.ParentUUID.find_first_not_of(" \t")); + if(info.ParentUUID == "-") { + info.ParentUUID.clear(); + } + } else if(line.find("Received UUID:") != std::string::npos) { + info.ReceivedUUID = line.substr(line.find(':') + 1); + info.ReceivedUUID.erase(0, info.ReceivedUUID.find_first_not_of(" \t")); + if(info.ReceivedUUID == "-") { + info.ReceivedUUID.clear(); + } + } else if(line.find("Creation time:") != std::string::npos) { + std::string timePart = line.substr(line.find(':') + 1); + timePart.erase(0, timePart.find_first_not_of(" \t")); + info.CreationTime = parseDateTime(timePart); + } + } + + co_return info; +} + +std::unordered_map UUIDToStorage; + +struct CycleStorageInfo { + SubVolumeInfo SVInfo; + fs::path Path; + int Reference = 0; +}; + +fs::path remapPath(const fs::path& original_path, const fs::path& new_root) { + if(original_path.is_absolute()) { + fs::path relative_part; + bool first = true; + for(const auto& part : original_path) { + if(first) { + first = false; + continue; + } + relative_part /= part; + } + return new_root / relative_part; + } else { + return new_root / original_path; + } +} + +coro> getStorageInfo(StorageId id) { + std::vector result; + fs::directory_iterator begin(remapPath(GlobalConf.Storages.at(id).Path, GlobalConf.SystemRoot)), end; + + for(; begin != end; begin++) { + fs::path snapshot = begin->path(); + + try { + SubVolumeInfo info = co_await getSubvolumeInfo(snapshot); + result.emplace_back(std::move(info), snapshot, 0); + } catch(const std::exception& exc) { + MAKE_WARNING("Не удалось получить информацию о подразделе \"" << snapshot.string() + << "\" из хранилища:\n\t" << tabulate(exc.what())); + } + } + + co_return result; +} + +std::string timeToString(const boost::posix_time::ptime& time) { + std::ostringstream ss; + boost::posix_time::time_facet *facet = + new boost::posix_time::time_facet("%Y-%m-%d %H:%M"); + + ss.imbue(std::locale(ss.getloc(), facet)); + ss << time; + + return ss.str(); +} + +coro<> cycle() { + // Собираем информацию обо всех подразделах в хранилищах + std::unordered_map> storagesInfo; + for(const auto& [id, storage] : GlobalConf.Storages) { + storagesInfo[id] = co_await getStorageInfo(id); + } + + boost::int64_t half_hours; + boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time(); + { + boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1)); + boost::posix_time::time_duration diff = now - epoch; + boost::int64_t total_seconds = diff.total_seconds(); + half_hours = total_seconds / 1800; + } + + // Проходимся по объектам и смотрим кому нужно сделать снапшот (пока на том же хранилише) + for(const auto& [id, object] : GlobalConf.Objects) { + try { + const Policy& policy = GlobalConf.Policies.at(object.Id); + fs::path objectPath = remapPath(object.Path, GlobalConf.SystemRoot); + SubVolumeInfo objectInfo = co_await getSubvolumeInfo(objectPath); + std::string muuid = co_await getMountUUID(objectPath); + StorageId sid = UUIDToStorage.at(muuid); + + boost::posix_time::time_duration last = boost::posix_time::pos_infin; + const CycleStorageInfo* lastLocalSnapshot = nullptr; + const std::vector& storageInfo = storagesInfo.at(sid); + + for(const auto& snapshot : storageInfo) { + if(snapshot.SVInfo.ParentUUID != objectInfo.UUID) + continue; + + auto delta = now-snapshot.SVInfo.CreationTime; + if(delta < last) { + last = delta; + lastLocalSnapshot = &snapshot; + } + } + + fs::path dstStoragePath = GlobalConf.Storages.at(sid).Path; + + if(last >= boost::posix_time::time_duration(boost::posix_time::minutes(IntervalLen*policy.SubIntervals.front()[0]))) { + std::cout << "Создание локального снапшота для объекта " << object.Path.string() << " в хранилище " << dstStoragePath.string() << std::endl; + std::vector args; + args.push_back("subvolume"); + args.push_back("snapshot"); + args.push_back("-r"); + args.push_back(object.Path); + args.push_back(dstStoragePath / (objectInfo.UUID + " " + timeToString(now))); + + auto [code, output] = co_await runProc("btrfs", + args + ); + + if(code != 0) + MAKE_ERROR("Не удалось создать снапшот:\n\t" << tabulate(output)); + + std::cout << "Создан снапшот объекта " << object.Path.string() << std::endl; + + // Обновим информацию о хранилище + storagesInfo[sid] = co_await getStorageInfo(sid); + } + } catch(const std::exception& exc) { + MAKE_WARNING("Во время проверки необходимости сделать первый интервальный снапшот произошла ошибка, объект: \"" << object.Path.string() + << "\"\n\t" << tabulate(exc.what())); + } + } + + // Теперь нужно перекинуть снапшоты относительно родительских на нужные хранилища + for(const auto& [id, object] : GlobalConf.Objects) { + try { + const Policy& policy = GlobalConf.Policies.at(object.Id); + fs::path objectPath = remapPath(object.Path, GlobalConf.SystemRoot); + SubVolumeInfo objectInfo = co_await getSubvolumeInfo(objectPath); + std::string muuid = co_await getMountUUID(objectPath); + StorageId sid = UUIDToStorage.at(muuid); + + CycleStorageInfo* lastLocalSnapshot = nullptr; + std::vector& localStorageInfo = storagesInfo.at(sid); + { + boost::posix_time::time_duration last = boost::posix_time::pos_infin; + + for(auto& snapshot : localStorageInfo) { + if(snapshot.SVInfo.ParentUUID != objectInfo.UUID) + continue; + + auto delta = now-snapshot.SVInfo.CreationTime; + if(delta < last) { + last = delta; + lastLocalSnapshot = &snapshot; + } + } + + if(last >= boost::posix_time::time_duration(boost::posix_time::minutes(IntervalLen*policy.SubIntervals.front()[0]))) + lastLocalSnapshot = nullptr; + + if(lastLocalSnapshot) + lastLocalSnapshot->Reference += 1; + } + + if(!lastLocalSnapshot) { + MAKE_ERROR("Не найден свежий снапшот объекта"); + } + + uint32_t fullInterval = 1; + + // Проход по всем интервалам + for(auto [interval, storageId, count] : policy.SubIntervals) { + fullInterval *= interval; + + if(storageId == uint16_t(-1)) { + storageId = sid; + } + + boost::posix_time::time_duration last = boost::posix_time::pos_infin; + CycleStorageInfo* lastSnapshot = nullptr; + + std::vector& dstStorageInfo = storagesInfo.at(storageId); + const Storage& dstStorageConf = GlobalConf.Storages.at(storageId); + + // Поиск последнего снапшота в хранилище + for(auto& snapshot : dstStorageInfo) { + std::string name = snapshot.Path.filename(); + size_t pos = name.find(" "); + + if(pos == std::string::npos) { + std::cout << "Неверно названный объект в хранилище: " << snapshot.Path << std::endl; + continue; + } + + if(name.substr(0, pos) != objectInfo.UUID) + continue; + + auto delta = now-snapshot.SVInfo.CreationTime; + if(delta < last) { + last = delta; + lastSnapshot = &snapshot; + } + } + + fs::path parent; + if(lastSnapshot) { + // Найти предка в локальном хранилище + const CycleStorageInfo* parentSnapshot = nullptr; + for(const auto& snapshot : localStorageInfo) { + if(snapshot.SVInfo.UUID != lastSnapshot->SVInfo.ReceivedUUID) + continue; + + parentSnapshot = &snapshot; + break; + } + + if(!parentSnapshot) { + MAKE_WARNING("Не найден родительский подраздел для хранилища " + << dstStorageConf.Path << ", объект " << object.Path + ); + } else { + lastSnapshot->Reference += 1; + parent = parentSnapshot->Path; + } + } + + if(last >= boost::posix_time::time_duration(boost::posix_time::minutes(IntervalLen*fullInterval))) { + // Необходимо скопировать сюда снапшот + + asio::readable_pipe sender{IOC}; + asio::writable_pipe receiver(IOC); + + auto exe = boost::process::environment::find_executable("btrfs"); + std::vector args = {"send", "-q", "--proto", "0"}; + + if(!parent.empty()) { + args.push_back("-p"); + args.push_back(parent); + } + + args.push_back(lastLocalSnapshot->Path); + + boost::process::process senderProc(IOC, exe, + args, + boost::process::process_stdio{nullptr, sender, STDOUT_FILENO} + ); + + args = {"receive", dstStorageConf.Path}; + boost::process::process receiverProc(IOC, exe, + args, + boost::process::process_stdio{receiver, nullptr, STDOUT_FILENO} + ); + + std::vector tempBuff(1024*1024*32); + + size_t transfered = 0; + while(true) { + try { + size_t size = co_await sender.async_read_some(asio::mutable_buffer(tempBuff.data(), tempBuff.size())); + transfered += size; + co_await asio::async_write(receiver, asio::const_buffer(tempBuff.data(), size)); + } catch(const std::exception& exc) { + if(const auto* serr = dynamic_cast(&exc)) { + if(serr->code() == asio::error::eof) + break; + } + + throw; + } + } + + int senderCode = co_await senderProc.async_wait(); + receiver.close(); + int receiverCode = co_await receiverProc.async_wait(); + + if(senderCode != 0 || receiverCode != 0) { + MAKE_ERROR("В процессе передачи подраздела один из процессов вернул не ноль"); + } + + if(parent.empty()) + std::cout << "Передан снапшот объекта " << object.Path.string() << " в хранилище " << dstStorageConf.Path << " (передано " << transfered << ")" << std::endl; + else + std::cout << "Передан снапшот объекта " << object.Path.string() << " в хранилище " << dstStorageConf.Path << " (передано " << transfered << ") " << " относительно родителя" << std::endl; + + fs::path dstSubvolumePath = dstStorageConf.Path / lastLocalSnapshot->Path.filename(); + storagesInfo[storageId].emplace_back(co_await getSubvolumeInfo(dstSubvolumePath), dstSubvolumePath, 1); + } + } + } catch(const std::exception& exc) { + MAKE_WARNING("Во время проверки необходимости отправки последнего снапшота на внешние хранилища произошла ошибка: \"" << object.Path.string() + << "\"\n\t" << tabulate(exc.what())); + } + } + + // Отмечаем необходимые подразделы + for(const auto& [id, object] : GlobalConf.Objects) { + try { + const Policy& policy = GlobalConf.Policies.at(object.Id); + fs::path objectPath = remapPath(object.Path, GlobalConf.SystemRoot); + SubVolumeInfo objectInfo = co_await getSubvolumeInfo(objectPath); + std::string muuid = co_await getMountUUID(objectPath); + StorageId sid = UUIDToStorage.at(muuid); + + uint32_t fullInterval = 1; + + // По всем интервалам на хранилищах + // Проход по всем интервалам + for(auto [interval, storageId, count] : policy.SubIntervals) { + fullInterval *= interval; + + if(storageId == uint16_t(-1)) { + storageId = sid; + } + + std::vector& dstStorageInfo = storagesInfo.at(storageId); + + // Найти последнее в интервале и отметить + for(int iter = 0; iter < count; iter++) { + uint32_t countInterval = fullInterval*(iter+1); + CycleStorageInfo* lastSnapshot = nullptr; + boost::posix_time::time_duration last = boost::posix_time::neg_infin; + + for(auto& snapshot : dstStorageInfo) { + std::string name = snapshot.Path.filename(); + size_t pos = name.find(" "); + + if(pos == std::string::npos) { + std::cout << "Неверно названный объект в хранилище: " << snapshot.Path << std::endl; + continue; + } + + if(name.substr(0, pos) != objectInfo.UUID) + continue; + + auto delta = now-boost::posix_time::minutes(IntervalLen*countInterval)-snapshot.SVInfo.CreationTime; + if(delta.is_negative() && delta > last) { + last = delta; + lastSnapshot = &snapshot; + } + } + + if(lastSnapshot) + lastSnapshot->Reference += 1; + } + } + } catch(const std::exception& exc) { + MAKE_WARNING("Во время проверки зависимостей существующих подразделов произошла ошибка: \"" << object.Path.string() + << "\"\n\t" << tabulate(exc.what())); + } + } + + // Удалить ненужные снапшоты (на локальном хранилище должно остаться по одному экземпляру на каждый интервал политики) + for(auto& [id, storage] : storagesInfo) { + for(auto& [svinfo, path, refs] : storage) { + if(refs != 0) + continue; + + try { + std::vector args; + args.push_back("subvolume"); + args.push_back("delete"); + args.push_back(path); + auto [code, output] = co_await runProc("btrfs", args); + if(code != 0) { + MAKE_ERROR(output); + } + } catch(const std::exception& exc) { + MAKE_WARNING("Ошибка удаления подраздела \"" << path + << "\"\n\t" << tabulate(exc.what())); + continue; + } + + std::cout << "Удалено " << path << std::endl; + } + } + + co_return; +} + +bool Needshutdown = false; +asio::io_context IOC; +asio::steady_timer HourHalfTimer(IOC); +asio::signal_set SignalSet(IOC, SIGINT); + +coro<> async_main() { + try { + std::cout << "Поиск точек монтирования хранилищ..." << std::endl; + for(const auto& [id, storage] : GlobalConf.Storages) { + std::string uuid = co_await getMountUUID(storage.Path); + UUIDToStorage[uuid] = id; + } + + std::cout << "Запускаем цикл" << std::endl; + + while(!Needshutdown) { + HourHalfTimer.expires_after(asio::chrono::minutes(std::max(IntervalLen / 6, 1))); + co_await cycle(); + co_await HourHalfTimer.async_wait(); + } + } catch(const std::exception& exc) { + std::cout << "Корутина завершилась ошибкой:\n\t" << tabulate(exc.what()) << std::endl; + SignalSet.cancel(); + } + + // auto [code, data] = co_await runProc("btrfs", {"filesystem", "show"}); +} + +int main() { + std::cout << "Загружаем конфигурацию..." << std::endl; + GlobalConf.load(); + std::cout << "Готово" << std::endl; + + SignalSet.async_wait([](const boost::system::error_code& errc, int code) { + if(errc) + return; + + std::cout << "Получена команда на завершение работы..." << std::endl; + Needshutdown = true; + HourHalfTimer.cancel(); + }); + + asio::co_spawn(IOC, async_main(), asio::detached); + IOC.run(); + + std::cout << "Сохраняем конфигурацию..." << std::endl; + GlobalConf.save(); + + std::cout << "End of program" << std::endl; + return 0; +} \ No newline at end of file