Files
btrfs-sync-backend/Src/main.cpp
2025-10-06 08:22:54 +06:00

773 lines
29 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <iostream>
#include <filesystem>
#include <unistd.h>
#include <unordered_map>
#include "Asio.hpp"
#include "boost/asio.hpp"
#include "boost/system.hpp"
#include "boost/process.hpp"
#include "boost/json.hpp"
#include "boost/date_time.hpp"
using namespace Asio;
namespace js = boost::json;
constexpr uint32_t IntervalLen = 30;
coro<std::tuple<int, std::string>> runProc(const std::string procName, const std::vector<std::string> args = {}) {
auto exec = co_await asio::this_coro::executor;
asio::readable_pipe r(exec);
boost::process::process proc(exec, boost::process::environment::find_executable(procName),
args,
boost::process::process_stdio{nullptr, r, STDOUT_FILENO}
);
std::string result;
std::vector<char> tempBuff(1024*64);
while(true) {
try {
size_t size = co_await r.async_read_some(asio::mutable_buffer(tempBuff.data(), tempBuff.size()));
result += std::string_view(tempBuff.data(), size);
} catch(const std::exception& exc) {
if(const auto* serr = dynamic_cast<const boost::system::system_error*>(&exc)) {
if(serr->code() == asio::error::eof)
break;
}
throw;
}
}
co_return std::tuple<int, std::string>{co_await proc.async_wait(), std::move(result)};
}
namespace fs = std::filesystem;
extern asio::io_context IOC;
constexpr const char *ConfigFile = "config.json";
constexpr const char *ConfigFilePrev = "config.prev.json";
constexpr const char *ConfigFileNext = "config.next.json";
using StorageId = uint32_t;
using PolicyId = uint32_t;
using ObjectId = uint32_t;
struct Storage {
// Путь относительно SystemRoot
fs::path Path;
js::object dump() const {
return {
{"Path", (std::string_view) Path.string()}
};
}
void load(const js::object& obj) {
Path = (std::string_view) obj.at("Path").get_string();
}
};
struct Policy {
// Каждые 30 минут
// Где храним (-1 на той же системе)
// Сколько последних копий должно хранится
// Множитель интервала, хранилище, сколько последних хранить
std::vector<std::array<uint16_t, 3>> SubIntervals;
js::object dump() const {
js::array res;
for(const auto& arrays : SubIntervals) {
res.push_back({arrays[0], arrays[1], arrays[2]});
}
return {
{"SubIntervals", std::move(res)}
};
}
void load(const js::object& obj) {
for(const auto& arrays : obj.at("SubIntervals").as_array()) {
const auto& array = arrays.as_array();
SubIntervals.push_back(
std::array<uint16_t, 3>{
(uint16_t) array[0].to_number<uint64_t>(),
(uint16_t) array[1].to_number<uint64_t>(),
(uint16_t) array[2].to_number<uint64_t>()
}
);
}
}
};
struct Object {
fs::path Path;
PolicyId Id;
js::object dump() const {
return {
{"Path", (std::string_view) Path.string()},
{"Id", Id}
};
}
void load(const js::object& obj) {
Path = (std::string_view) obj.at("Path").get_string();
Id = obj.at("Id").to_number<uint64_t>();
}
};
struct Configuration {
fs::path SystemRoot = "/";
// Хранилища на которых могут хранится бэкапы
std::unordered_map<StorageId, Storage> Storages;
// Шаблон временных политик единичных объектов
Policy TemplatePolicy;
// Политики резервного копирования
std::unordered_map<PolicyId, Policy> Policies;
// Объекты резервного копирования
std::unordered_map<ObjectId, Object> Objects;
void save() {
std::string data;
try {
js::object storages, policies, objects;
for(const auto& [id, storage] : Storages)
storages.emplace(std::to_string(id), storage.dump());
for(const auto& [id, policy] : Policies)
policies.emplace(std::to_string(id), policy.dump());
for(const auto& [id, object] : Objects)
objects.emplace(std::to_string(id), object.dump());
js::object val {
{"SystemRoot", SystemRoot.string()},
{"Storages", std::move(storages)},
{"TemplatePolicy", TemplatePolicy.dump()},
{"Policies", std::move(policies)},
{"Objects", std::move(objects)},
};
data = js::serialize(val);
} catch(const std::exception& exc) {
MAKE_ERROR("Ошибка во время формирования json объекта:\n\t" << tabulate(exc.what()));
}
if(fs::exists(ConfigFile)) {
try {
fs::copy_file(ConfigFile, ConfigFilePrev, fs::copy_options::overwrite_existing);
} catch(const std::exception& exc) {
MAKE_ERROR("Ошибка во время создания бэкапа конфигурации:\n\t" << tabulate(exc.what()));
}
}
try {
std::ofstream fd(ConfigFileNext);
fd.write(data.c_str(), data.size());
fd.close();
} catch(const std::exception& exc) {
MAKE_ERROR("Ошибка во время сохранения новой конфигурации:\n\t" << tabulate(exc.what()));
}
try {
fs::rename(ConfigFileNext, ConfigFile);
} catch(const std::exception& exc) {
MAKE_ERROR("Ошибка во время применения сохранённой конфигурации:\n\t" << tabulate(exc.what()));
}
}
void load() {
if(!fs::exists(ConfigFile)) {
return;
}
js::object val;
for(int iter = 0; iter < 2; iter++) {
try {
js::parser parser;
std::ifstream fd(iter == 0 ? ConfigFile : ConfigFilePrev);
std::vector<char> buff(1024*64);
while(size_t size = fd.readsome(buff.data(), buff.size())) {
size_t writed = 0;
while(writed != size) {
writed += parser.write(buff.data()+writed, size-writed);
}
}
val = parser.release().as_object();
break;
} catch(const std::exception& exc) {
if(iter == 0)
MAKE_WARNING("Ошибка во время загрузки файла конфигурации:\n\t" << tabulate(exc.what()))
else if(iter == 1)
MAKE_ERROR("Ошибка во время загрузки резервной копии файла конфигурации:\n\t" << tabulate(exc.what()));
}
}
try {
SystemRoot = (std::string_view) val["SystemRoot"].get_string();
for(const auto& [key, storage] : val["Storages"].as_object()) {
Storages[std::stoul(key)].load(storage.as_object());
}
TemplatePolicy.load(val["TemplatePolicy"].as_object());
for(const auto& [key, policy] : val["Policies"].as_object()) {
Policies[std::stoul(key)].load(policy.as_object());
}
for(const auto& [key, object] : val["Objects"].as_object()) {
Objects[std::stoul(key)].load(object.as_object());
}
} catch(const std::exception& exc) {
MAKE_ERROR("Ошибка во время парсинга файла конфигурации:\n\t" << tabulate(exc.what()));
}
}
};
Configuration GlobalConf;
// Проверка действительности конфигурации
// Проверка что все хранилища доступны и являются btrfs.
// Проверка связности OneObject политик.
void checkConfigurationConsistency() {
}
template<typename Tp>
uint32_t getNextIdFor(const std::unordered_map<uint32_t, Tp>& map) {
if(map.empty())
return 0;
std::vector<uint32_t> keys;
keys.reserve(map.size());
for(const auto& pair : map) {
keys.push_back(pair.first);
}
std::sort(keys.begin(), keys.end());
uint32_t expected = 0;
for(uint32_t key : keys) {
if(key > expected)
return expected;
if(expected == UINT32_MAX)
throw std::runtime_error("Нет свободных идентификаторов");
++expected;
}
return expected;
}
coro<std::string> getMountUUID(const fs::path path) {
std::vector<std::string> args;
args.push_back("-no");
args.push_back("UUID");
args.push_back("--target");
args.push_back(path.string());
auto [code, data] = co_await runProc("findmnt",
args
);
if(code != 0)
MAKE_ERROR("Путь не существует");
co_return data;
}
boost::posix_time::ptime parseDateTime(const std::string time) {
std::istringstream ss(time);
boost::posix_time::ptime local_time;
ss.imbue(std::locale(ss.getloc(), new boost::posix_time::time_input_facet("%Y-%m-%d %H:%M:%S")));
ss >> local_time;
return local_time;
}
struct SubVolumeInfo {
boost::posix_time::ptime CreationTime{boost::local_time::not_a_date_time};
std::string UUID, ParentUUID, ReceivedUUID;
};
coro<SubVolumeInfo> getSubvolumeInfo(const fs::path path) {
std::vector<std::string> args;
args.push_back("subvolume");
args.push_back("show");
args.push_back(path);
auto [code, data] = co_await runProc("btrfs",
args
);
if(code != 0)
MAKE_ERROR("Не удалось получить информацию о подразделе btrfs:\n\t" << tabulate(data));
SubVolumeInfo info;
std::istringstream ss(data);
std::string line;
while(std::getline(ss, line)) {
if(info.UUID.empty() && line.find("UUID:") != std::string::npos) {
info.UUID = line.substr(line.find(':') + 1);
info.UUID.erase(0, info.UUID.find_first_not_of(" \t"));
} else if(line.find("Parent UUID:") != std::string::npos) {
info.ParentUUID = line.substr(line.find(':') + 1);
info.ParentUUID.erase(0, info.ParentUUID.find_first_not_of(" \t"));
if(info.ParentUUID == "-") {
info.ParentUUID.clear();
}
} else if(line.find("Received UUID:") != std::string::npos) {
info.ReceivedUUID = line.substr(line.find(':') + 1);
info.ReceivedUUID.erase(0, info.ReceivedUUID.find_first_not_of(" \t"));
if(info.ReceivedUUID == "-") {
info.ReceivedUUID.clear();
}
} else if(line.find("Creation time:") != std::string::npos) {
std::string timePart = line.substr(line.find(':') + 1);
timePart.erase(0, timePart.find_first_not_of(" \t"));
info.CreationTime = parseDateTime(timePart);
}
}
co_return info;
}
std::unordered_map<std::string, StorageId> UUIDToStorage;
struct CycleStorageInfo {
SubVolumeInfo SVInfo;
fs::path Path;
int Reference = 0;
};
fs::path remapPath(const fs::path& original_path, const fs::path& new_root) {
if(original_path.is_absolute()) {
fs::path relative_part;
bool first = true;
for(const auto& part : original_path) {
if(first) {
first = false;
continue;
}
relative_part /= part;
}
return new_root / relative_part;
} else {
return new_root / original_path;
}
}
coro<std::vector<CycleStorageInfo>> getStorageInfo(StorageId id) {
std::vector<CycleStorageInfo> result;
fs::directory_iterator begin(remapPath(GlobalConf.Storages.at(id).Path, GlobalConf.SystemRoot)), end;
for(; begin != end; begin++) {
fs::path snapshot = begin->path();
try {
SubVolumeInfo info = co_await getSubvolumeInfo(snapshot);
result.emplace_back(std::move(info), snapshot, 0);
} catch(const std::exception& exc) {
MAKE_WARNING("Не удалось получить информацию о подразделе \"" << snapshot.string()
<< "\" из хранилища:\n\t" << tabulate(exc.what()));
}
}
co_return result;
}
std::string timeToString(const boost::posix_time::ptime& time) {
std::ostringstream ss;
boost::posix_time::time_facet *facet =
new boost::posix_time::time_facet("%Y-%m-%d %H:%M");
ss.imbue(std::locale(ss.getloc(), facet));
ss << time;
return ss.str();
}
coro<> cycle() {
// Собираем информацию обо всех подразделах в хранилищах
std::unordered_map<StorageId, std::vector<CycleStorageInfo>> storagesInfo;
for(const auto& [id, storage] : GlobalConf.Storages) {
storagesInfo[id] = co_await getStorageInfo(id);
}
boost::int64_t half_hours;
boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
{
boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1));
boost::posix_time::time_duration diff = now - epoch;
boost::int64_t total_seconds = diff.total_seconds();
half_hours = total_seconds / 1800;
}
// Проходимся по объектам и смотрим кому нужно сделать снапшот (пока на том же хранилише)
for(const auto& [id, object] : GlobalConf.Objects) {
try {
const Policy& policy = GlobalConf.Policies.at(object.Id);
fs::path objectPath = remapPath(object.Path, GlobalConf.SystemRoot);
SubVolumeInfo objectInfo = co_await getSubvolumeInfo(objectPath);
std::string muuid = co_await getMountUUID(objectPath);
StorageId sid = UUIDToStorage.at(muuid);
boost::posix_time::time_duration last = boost::posix_time::pos_infin;
const CycleStorageInfo* lastLocalSnapshot = nullptr;
const std::vector<CycleStorageInfo>& storageInfo = storagesInfo.at(sid);
for(const auto& snapshot : storageInfo) {
if(snapshot.SVInfo.ParentUUID != objectInfo.UUID)
continue;
auto delta = now-snapshot.SVInfo.CreationTime;
if(delta < last) {
last = delta;
lastLocalSnapshot = &snapshot;
}
}
fs::path dstStoragePath = GlobalConf.Storages.at(sid).Path;
if(last >= boost::posix_time::time_duration(boost::posix_time::minutes(IntervalLen*policy.SubIntervals.front()[0]))) {
std::cout << "Создание локального снапшота для объекта " << object.Path.string() << " в хранилище " << dstStoragePath.string() << std::endl;
std::vector<std::string> args;
args.push_back("subvolume");
args.push_back("snapshot");
args.push_back("-r");
args.push_back(object.Path);
args.push_back(dstStoragePath / (objectInfo.UUID + " " + timeToString(now)));
auto [code, output] = co_await runProc("btrfs",
args
);
if(code != 0)
MAKE_ERROR("Не удалось создать снапшот:\n\t" << tabulate(output));
std::cout << "Создан снапшот объекта " << object.Path.string() << std::endl;
// Обновим информацию о хранилище
storagesInfo[sid] = co_await getStorageInfo(sid);
}
} catch(const std::exception& exc) {
MAKE_WARNING("Во время проверки необходимости сделать первый интервальный снапшот произошла ошибка, объект: \"" << object.Path.string()
<< "\"\n\t" << tabulate(exc.what()));
}
}
// Теперь нужно перекинуть снапшоты относительно родительских на нужные хранилища
for(const auto& [id, object] : GlobalConf.Objects) {
try {
const Policy& policy = GlobalConf.Policies.at(object.Id);
fs::path objectPath = remapPath(object.Path, GlobalConf.SystemRoot);
SubVolumeInfo objectInfo = co_await getSubvolumeInfo(objectPath);
std::string muuid = co_await getMountUUID(objectPath);
StorageId sid = UUIDToStorage.at(muuid);
CycleStorageInfo* lastLocalSnapshot = nullptr;
std::vector<CycleStorageInfo>& localStorageInfo = storagesInfo.at(sid);
{
boost::posix_time::time_duration last = boost::posix_time::pos_infin;
for(auto& snapshot : localStorageInfo) {
if(snapshot.SVInfo.ParentUUID != objectInfo.UUID)
continue;
auto delta = now-snapshot.SVInfo.CreationTime;
if(delta < last) {
last = delta;
lastLocalSnapshot = &snapshot;
}
}
if(last >= boost::posix_time::time_duration(boost::posix_time::minutes(IntervalLen*policy.SubIntervals.front()[0])))
lastLocalSnapshot = nullptr;
if(lastLocalSnapshot)
lastLocalSnapshot->Reference += 1;
}
if(!lastLocalSnapshot) {
MAKE_ERROR("Не найден свежий снапшот объекта");
}
uint32_t fullInterval = 1;
// Проход по всем интервалам
for(auto [interval, storageId, count] : policy.SubIntervals) {
fullInterval *= interval;
if(storageId == uint16_t(-1)) {
storageId = sid;
}
boost::posix_time::time_duration last = boost::posix_time::pos_infin;
CycleStorageInfo* lastSnapshot = nullptr;
std::vector<CycleStorageInfo>& dstStorageInfo = storagesInfo.at(storageId);
const Storage& dstStorageConf = GlobalConf.Storages.at(storageId);
// Поиск последнего снапшота в хранилище
for(auto& snapshot : dstStorageInfo) {
std::string name = snapshot.Path.filename();
size_t pos = name.find(" ");
if(pos == std::string::npos) {
std::cout << "Неверно названный объект в хранилище: " << snapshot.Path << std::endl;
continue;
}
if(name.substr(0, pos) != objectInfo.UUID)
continue;
auto delta = now-snapshot.SVInfo.CreationTime;
if(delta < last) {
last = delta;
lastSnapshot = &snapshot;
}
}
fs::path parent;
if(lastSnapshot) {
// Найти предка в локальном хранилище
const CycleStorageInfo* parentSnapshot = nullptr;
for(const auto& snapshot : localStorageInfo) {
if(snapshot.SVInfo.UUID != lastSnapshot->SVInfo.ReceivedUUID)
continue;
parentSnapshot = &snapshot;
break;
}
if(!parentSnapshot) {
MAKE_WARNING("Не найден родительский подраздел для хранилища "
<< dstStorageConf.Path << ", объект " << object.Path
);
} else {
lastSnapshot->Reference += 1;
parent = parentSnapshot->Path;
}
}
if(last >= boost::posix_time::time_duration(boost::posix_time::minutes(IntervalLen*fullInterval))) {
// Необходимо скопировать сюда снапшот
asio::readable_pipe sender{IOC};
asio::writable_pipe receiver(IOC);
auto exe = boost::process::environment::find_executable("btrfs");
std::vector<std::string> args = {"send", "-q", "--proto", "0"};
if(!parent.empty()) {
args.push_back("-p");
args.push_back(parent);
}
args.push_back(lastLocalSnapshot->Path);
boost::process::process senderProc(IOC, exe,
args,
boost::process::process_stdio{nullptr, sender, STDOUT_FILENO}
);
args = {"receive", dstStorageConf.Path};
boost::process::process receiverProc(IOC, exe,
args,
boost::process::process_stdio{receiver, nullptr, STDOUT_FILENO}
);
std::vector<char> tempBuff(1024*1024*32);
size_t transfered = 0;
while(true) {
try {
size_t size = co_await sender.async_read_some(asio::mutable_buffer(tempBuff.data(), tempBuff.size()));
transfered += size;
co_await asio::async_write(receiver, asio::const_buffer(tempBuff.data(), size));
} catch(const std::exception& exc) {
if(const auto* serr = dynamic_cast<const boost::system::system_error*>(&exc)) {
if(serr->code() == asio::error::eof)
break;
}
throw;
}
}
int senderCode = co_await senderProc.async_wait();
receiver.close();
int receiverCode = co_await receiverProc.async_wait();
if(senderCode != 0 || receiverCode != 0) {
MAKE_ERROR("В процессе передачи подраздела один из процессов вернул не ноль");
}
if(parent.empty())
std::cout << "Передан снапшот объекта " << object.Path.string() << " в хранилище " << dstStorageConf.Path << " (передано " << transfered << ")" << std::endl;
else
std::cout << "Передан снапшот объекта " << object.Path.string() << " в хранилище " << dstStorageConf.Path << " (передано " << transfered << ") " << " относительно родителя" << std::endl;
fs::path dstSubvolumePath = dstStorageConf.Path / lastLocalSnapshot->Path.filename();
storagesInfo[storageId].emplace_back(co_await getSubvolumeInfo(dstSubvolumePath), dstSubvolumePath, 1);
}
}
} catch(const std::exception& exc) {
MAKE_WARNING("Во время проверки необходимости отправки последнего снапшота на внешние хранилища произошла ошибка: \"" << object.Path.string()
<< "\"\n\t" << tabulate(exc.what()));
}
}
// Отмечаем необходимые подразделы
for(const auto& [id, object] : GlobalConf.Objects) {
try {
const Policy& policy = GlobalConf.Policies.at(object.Id);
fs::path objectPath = remapPath(object.Path, GlobalConf.SystemRoot);
SubVolumeInfo objectInfo = co_await getSubvolumeInfo(objectPath);
std::string muuid = co_await getMountUUID(objectPath);
StorageId sid = UUIDToStorage.at(muuid);
uint32_t fullInterval = 1;
// По всем интервалам на хранилищах
// Проход по всем интервалам
for(auto [interval, storageId, count] : policy.SubIntervals) {
fullInterval *= interval;
if(storageId == uint16_t(-1)) {
storageId = sid;
}
std::vector<CycleStorageInfo>& dstStorageInfo = storagesInfo.at(storageId);
// Найти последнее в интервале и отметить
for(int iter = 0; iter < count; iter++) {
uint32_t countInterval = fullInterval*(iter+1);
CycleStorageInfo* lastSnapshot = nullptr;
boost::posix_time::time_duration last = boost::posix_time::neg_infin;
for(auto& snapshot : dstStorageInfo) {
std::string name = snapshot.Path.filename();
size_t pos = name.find(" ");
if(pos == std::string::npos) {
std::cout << "Неверно названный объект в хранилище: " << snapshot.Path << std::endl;
continue;
}
if(name.substr(0, pos) != objectInfo.UUID)
continue;
auto delta = now-boost::posix_time::minutes(IntervalLen*countInterval)-snapshot.SVInfo.CreationTime;
if(delta.is_negative() && delta > last) {
last = delta;
lastSnapshot = &snapshot;
}
}
if(lastSnapshot)
lastSnapshot->Reference += 1;
}
}
} catch(const std::exception& exc) {
MAKE_WARNING("Во время проверки зависимостей существующих подразделов произошла ошибка: \"" << object.Path.string()
<< "\"\n\t" << tabulate(exc.what()));
}
}
// Удалить ненужные снапшоты (на локальном хранилище должно остаться по одному экземпляру на каждый интервал политики)
for(auto& [id, storage] : storagesInfo) {
for(auto& [svinfo, path, refs] : storage) {
if(refs != 0)
continue;
try {
std::vector<std::string> args;
args.push_back("subvolume");
args.push_back("delete");
args.push_back(path);
auto [code, output] = co_await runProc("btrfs", args);
if(code != 0) {
MAKE_ERROR(output);
}
} catch(const std::exception& exc) {
MAKE_WARNING("Ошибка удаления подраздела \"" << path
<< "\"\n\t" << tabulate(exc.what()));
continue;
}
std::cout << "Удалено " << path << std::endl;
}
}
co_return;
}
bool Needshutdown = false;
asio::io_context IOC;
asio::steady_timer HourHalfTimer(IOC);
asio::signal_set SignalSet(IOC, SIGINT);
coro<> async_main() {
try {
std::cout << "Поиск точек монтирования хранилищ..." << std::endl;
for(const auto& [id, storage] : GlobalConf.Storages) {
std::string uuid = co_await getMountUUID(storage.Path);
UUIDToStorage[uuid] = id;
}
std::cout << "Запускаем цикл" << std::endl;
while(!Needshutdown) {
HourHalfTimer.expires_after(asio::chrono::minutes(std::max<uint16_t>(IntervalLen / 6, 1)));
co_await cycle();
co_await HourHalfTimer.async_wait();
}
} catch(const std::exception& exc) {
std::cout << "Корутина завершилась ошибкой:\n\t" << tabulate(exc.what()) << std::endl;
SignalSet.cancel();
}
// auto [code, data] = co_await runProc("btrfs", {"filesystem", "show"});
}
int main() {
std::cout << "Загружаем конфигурацию..." << std::endl;
GlobalConf.load();
std::cout << "Готово" << std::endl;
SignalSet.async_wait([](const boost::system::error_code& errc, int code) {
if(errc)
return;
std::cout << "Получена команда на завершение работы..." << std::endl;
Needshutdown = true;
HourHalfTimer.cancel();
});
asio::co_spawn(IOC, async_main(), asio::detached);
IOC.run();
std::cout << "Сохраняем конфигурацию..." << std::endl;
GlobalConf.save();
std::cout << "End of program" << std::endl;
return 0;
}