summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-11-14 09:58:56 +0300
committervitalyisaev <[email protected]>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp')
-rw-r--r--contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp191
1 files changed, 191 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp b/contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp
new file mode 100644
index 00000000000..2b46c3d14ad
--- /dev/null
+++ b/contrib/clickhouse/src/Core/MySQL/MySQLGtid.cpp
@@ -0,0 +1,191 @@
+#include "MySQLGtid.h"
+#include <boost/algorithm/string.hpp>
+#include <IO/ReadHelpers.h>
+
+
+namespace DB
+{
+namespace ErrorCodes
+{
+ extern const int LOGICAL_ERROR;
+}
+
+void GTIDSet::tryMerge(size_t i)
+{
+ if ((i + 1) >= intervals.size())
+ return;
+
+ if (intervals[i].end != intervals[i + 1].start)
+ return;
+ intervals[i].end = intervals[i + 1].end;
+ intervals.erase(intervals.begin() + i + 1, intervals.begin() + i + 1 + 1);
+}
+
+void GTIDSets::parse(String gtid_format)
+{
+ if (gtid_format.empty())
+ {
+ return;
+ }
+
+ std::vector<String> gtid_sets;
+ boost::split(gtid_sets, gtid_format, [](char c) { return c == ','; });
+
+ for (const auto & gs : gtid_sets)
+ {
+ auto gset = boost::trim_copy(gs);
+
+ std::vector<String> server_ids;
+ boost::split(server_ids, gset, [](char c) { return c == ':'; });
+
+ GTIDSet set;
+ set.uuid = DB::parse<UUID>(server_ids[0]);
+
+ for (size_t k = 1; k < server_ids.size(); ++k)
+ {
+ std::vector<String> inters;
+ boost::split(inters, server_ids[k], [](char c) { return c == '-'; });
+
+ GTIDSet::Interval val;
+ switch (inters.size())
+ {
+ case 1: {
+ val.start = std::stol(inters[0]);
+ val.end = val.start + 1;
+ break;
+ }
+ case 2: {
+ val.start = std::stol(inters[0]);
+ val.end = std::stol(inters[1]) + 1;
+ break;
+ }
+ default:
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "GTIDParse: Invalid GTID interval: {}", server_ids[k]);
+ }
+ set.intervals.emplace_back(val);
+ }
+ sets.emplace_back(set);
+ }
+}
+
+void GTIDSets::update(const GTID & other)
+{
+ for (GTIDSet & set : sets)
+ {
+ if (set.uuid == other.uuid)
+ {
+ for (auto i = 0U; i < set.intervals.size(); ++i)
+ {
+ auto & current = set.intervals[i];
+
+ /// Already Contained.
+ if (other.seq_no >= current.start && other.seq_no < current.end)
+ {
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "GTIDSets updates other: {} invalid successor to {}",
+ std::to_string(other.seq_no), std::to_string(current.end));
+ }
+
+ /// Try to shrink Sequence interval.
+ GTIDSet::tryShrink(set, i, current);
+
+ /// Sequence, extend the interval.
+ if (other.seq_no == current.end)
+ {
+ set.intervals[i].end = other.seq_no + 1;
+ set.tryMerge(i);
+ return;
+ }
+ }
+
+ /// Add new interval.
+ GTIDSet::Interval new_interval{other.seq_no, other.seq_no + 1};
+ for (auto it = set.intervals.begin(); it != set.intervals.end(); ++it)
+ {
+ if (other.seq_no < (*it).start)
+ {
+ set.intervals.insert(it, new_interval);
+ return;
+ }
+ }
+ set.intervals.emplace_back(new_interval);
+ return;
+ }
+ }
+
+ GTIDSet set;
+ GTIDSet::Interval interval{other.seq_no, other.seq_no + 1};
+ set.uuid = other.uuid;
+ set.intervals.emplace_back(interval);
+ sets.emplace_back(set);
+}
+
+void GTIDSet::tryShrink(GTIDSet & set, unsigned int i, GTIDSet::Interval & current)
+{
+ if (i != set.intervals.size() -1)
+ {
+ auto & next = set.intervals[i+1];
+ if (current.end == next.start)
+ set.tryMerge(i);
+ }
+}
+
+String GTIDSets::toString() const
+{
+ WriteBufferFromOwnString buffer;
+
+ for (size_t i = 0; i < sets.size(); ++i)
+ {
+ GTIDSet set = sets[i];
+ writeUUIDText(set.uuid, buffer);
+
+ for (const auto & interval : set.intervals)
+ {
+ buffer.write(':');
+ auto start = interval.start;
+ auto end = interval.end;
+
+ if (end == (start + 1))
+ {
+ writeString(std::to_string(start), buffer);
+ }
+ else
+ {
+ writeString(std::to_string(start), buffer);
+ buffer.write('-');
+ writeString(std::to_string(end - 1), buffer);
+ }
+ }
+
+ if (i < (sets.size() - 1))
+ {
+ buffer.write(',');
+ }
+ }
+ return buffer.str();
+}
+
+String GTIDSets::toPayload() const
+{
+ WriteBufferFromOwnString buffer;
+
+ UInt64 sets_size = sets.size();
+ buffer.write(reinterpret_cast<const char *>(&sets_size), 8);
+
+ for (const auto & set : sets)
+ {
+ // MySQL UUID is big-endian.
+ writeBinaryBigEndian(UUIDHelpers::getHighBytes(set.uuid), buffer);
+ writeBinaryBigEndian(UUIDHelpers::getLowBytes(set.uuid), buffer);
+
+ UInt64 intervals_size = set.intervals.size();
+ buffer.write(reinterpret_cast<const char *>(&intervals_size), 8);
+ for (const auto & interval : set.intervals)
+ {
+ buffer.write(reinterpret_cast<const char *>(&interval.start), 8);
+ buffer.write(reinterpret_cast<const char *>(&interval.end), 8);
+ }
+ }
+ return buffer.str();
+}
+
+}