diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp')
| -rw-r--r-- | contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp | 191 |
1 files changed, 191 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp b/contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp new file mode 100644 index 00000000000..2b46c3d14ad --- /dev/null +++ b/contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp @@ -0,0 +1,191 @@ +#include "MySQLGtid.h" +#include <boost/algorithm/string.hpp> +#include <IO/ReadHelpers.h> + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +void GTIDSet::tryMerge(size_t i) +{ + if ((i + 1) >= intervals.size()) + return; + + if (intervals[i].end != intervals[i + 1].start) + return; + intervals[i].end = intervals[i + 1].end; + intervals.erase(intervals.begin() + i + 1, intervals.begin() + i + 1 + 1); +} + +void GTIDSets::parse(String gtid_format) +{ + if (gtid_format.empty()) + { + return; + } + + std::vector<String> gtid_sets; + boost::split(gtid_sets, gtid_format, [](char c) { return c == ','; }); + + for (const auto & gs : gtid_sets) + { + auto gset = boost::trim_copy(gs); + + std::vector<String> server_ids; + boost::split(server_ids, gset, [](char c) { return c == ':'; }); + + GTIDSet set; + set.uuid = DB::parse<UUID>(server_ids[0]); + + for (size_t k = 1; k < server_ids.size(); ++k) + { + std::vector<String> inters; + boost::split(inters, server_ids[k], [](char c) { return c == '-'; }); + + GTIDSet::Interval val; + switch (inters.size()) + { + case 1: { + val.start = std::stol(inters[0]); + val.end = val.start + 1; + break; + } + case 2: { + val.start = std::stol(inters[0]); + val.end = std::stol(inters[1]) + 1; + break; + } + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "GTIDParse: Invalid GTID interval: {}", server_ids[k]); + } + set.intervals.emplace_back(val); + } + sets.emplace_back(set); + } +} + +void GTIDSets::update(const GTID & other) +{ + for (GTIDSet & set : sets) + { + if (set.uuid == other.uuid) + { + for (auto i = 0U; i < set.intervals.size(); ++i) + { + auto & current = set.intervals[i]; + + /// Already Contained. + if (other.seq_no >= current.start && other.seq_no < current.end) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "GTIDSets updates other: {} invalid successor to {}", + std::to_string(other.seq_no), std::to_string(current.end)); + } + + /// Try to shrink Sequence interval. + GTIDSet::tryShrink(set, i, current); + + /// Sequence, extend the interval. + if (other.seq_no == current.end) + { + set.intervals[i].end = other.seq_no + 1; + set.tryMerge(i); + return; + } + } + + /// Add new interval. + GTIDSet::Interval new_interval{other.seq_no, other.seq_no + 1}; + for (auto it = set.intervals.begin(); it != set.intervals.end(); ++it) + { + if (other.seq_no < (*it).start) + { + set.intervals.insert(it, new_interval); + return; + } + } + set.intervals.emplace_back(new_interval); + return; + } + } + + GTIDSet set; + GTIDSet::Interval interval{other.seq_no, other.seq_no + 1}; + set.uuid = other.uuid; + set.intervals.emplace_back(interval); + sets.emplace_back(set); +} + +void GTIDSet::tryShrink(GTIDSet & set, unsigned int i, GTIDSet::Interval & current) +{ + if (i != set.intervals.size() -1) + { + auto & next = set.intervals[i+1]; + if (current.end == next.start) + set.tryMerge(i); + } +} + +String GTIDSets::toString() const +{ + WriteBufferFromOwnString buffer; + + for (size_t i = 0; i < sets.size(); ++i) + { + GTIDSet set = sets[i]; + writeUUIDText(set.uuid, buffer); + + for (const auto & interval : set.intervals) + { + buffer.write(':'); + auto start = interval.start; + auto end = interval.end; + + if (end == (start + 1)) + { + writeString(std::to_string(start), buffer); + } + else + { + writeString(std::to_string(start), buffer); + buffer.write('-'); + writeString(std::to_string(end - 1), buffer); + } + } + + if (i < (sets.size() - 1)) + { + buffer.write(','); + } + } + return buffer.str(); +} + +String GTIDSets::toPayload() const +{ + WriteBufferFromOwnString buffer; + + UInt64 sets_size = sets.size(); + buffer.write(reinterpret_cast<const char *>(&sets_size), 8); + + for (const auto & set : sets) + { + // MySQL UUID is big-endian. + writeBinaryBigEndian(UUIDHelpers::getHighBytes(set.uuid), buffer); + writeBinaryBigEndian(UUIDHelpers::getLowBytes(set.uuid), buffer); + + UInt64 intervals_size = set.intervals.size(); + buffer.write(reinterpret_cast<const char *>(&intervals_size), 8); + for (const auto & interval : set.intervals) + { + buffer.write(reinterpret_cast<const char *>(&interval.start), 8); + buffer.write(reinterpret_cast<const char *>(&interval.end), 8); + } + } + return buffer.str(); +} + +} |
