aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-10-17 19:33:25 +0300
committerilnaz <ilnaz@ydb.tech>2023-10-17 20:27:06 +0300
commit68aad25a66091da7133f67052c5757e132418923 (patch)
tree1775fcec8a3d690d6f4dd775269016f3d49f47e4
parent770d5a50f59aaeaa24f62898b530365a904e79bf (diff)
downloadydb-68aad25a66091da7133f67052c5757e132418923.tar.gz
Reworked debezium tests KIKIMR-15401, KIKIMR-19583
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp357
1 files changed, 179 insertions, 178 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 3634ebbeea..fa8a1caad5 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -15,6 +15,7 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <util/generic/size_literals.h>
+#include <util/string/join.h>
#include <util/string/printf.h>
#include <util/string/strip.h>
@@ -880,6 +881,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
} else if (!parseResult) {
return false;
}
+
NJson::TJsonValue expectedJson;
parseResult = NJson::ReadJsonTree(expected, &expectedJson);
if (assertOnParseError) {
@@ -902,8 +904,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
if (expectedValue.GetStringRobust() != "***") {
return true;
}
- // Discrepancy in path format here. GetValueByPath expects ".array.[0]" while Scanner provides with ".array[0]". Don't use "***" inside a non-root array
- UNIT_ASSERT_C(!path.Contains("["), TStringBuilder() << "Please don't use \"***\" inside an array. Seems like " << path << " has array on the way");
+
+ // Discrepancy in path format here.
+ // GetValueByPath expects ".array.[0]" while Scanner provides with ".array[0]".
+ // Don't use "***" inside a non-root array.
+ UNIT_ASSERT_C(!path.Contains("["), TStringBuilder()
+ << "Please don't use \"***\" inside an array. Seems like " << path << " has array on the way");
NJson::TJsonValue actualValue;
// If "***", find a corresponding actual value
@@ -929,10 +935,19 @@ Y_UNIT_TEST_SUITE(Cdc) {
if (!scanner.IsSuccess()) {
return false; // actualJson is missing a path to ***
}
+
return actualJson == expectedJson;
}
- // Unit test to verify that Json comparison with wildcard works
+ static void AssertJsonsEqual(const TString& actual, const TString& expected) {
+ UNIT_ASSERT_C(AreJsonsEqual(actual, expected), TStringBuilder()
+ << "Jsons are different: " << actual << " != " << expected);
+ }
+
+ static bool CheckJsonsEqual(const TString& actual, const TString& expected) {
+ return AreJsonsEqual(actual, expected, false);
+ }
+
Y_UNIT_TEST(AreJsonsEqualReturnsTrueOnEqual) {
UNIT_ASSERT(AreJsonsEqual("{}", "{}"));
UNIT_ASSERT(AreJsonsEqual("[]", "[]"));
@@ -1016,7 +1031,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
pStream = data->GetPartitionStream();
for (const auto& item : data->GetMessages()) {
const auto& record = records.at(reads++);
- UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record);
+ AssertJsonsEqual(item.GetData(), record);
if (checkKey) {
UNIT_ASSERT_VALUES_EQUAL(item.GetPartitionKey(), CalcPartitionKey(record));
}
@@ -1120,7 +1135,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
for (ui32 i = 0; i < records.size(); ++i) {
const auto& actual = res.GetResult().records().at(i);
const auto& expected = records.at(i);
- UNIT_ASSERT_C(AreJsonsEqual(actual.data(), expected), TStringBuilder() << "Jsons are different: " << actual.data() << " != " << expected);
+ AssertJsonsEqual(actual.data(), expected);
if (checkKey) {
UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), CalcPartitionKey(expected));
}
@@ -1149,52 +1164,28 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
};
- static TString MetadataToString(TVector<std::pair<TString, TString>> messageMetadata) {
- std::stable_sort(messageMetadata.begin(), messageMetadata.end(), [](const auto& a, const auto& b){return a.first < b.first;});
- TStringBuilder str;
- str << "{";
- for (const auto& entry : messageMetadata) {
- str << entry.first << ": " << entry.second << ",";
- }
- str << "}";
- return str;
- }
-
- static void AssertMessageMetadataContains(
- const TVector<std::pair<TString, TString>>& actual,
- const TVector<std::pair<TString, TString>>& expected,
- std::function<bool(const TString&, const TString&)> areValuesEqual = [](const TString& a, const TString& b) {return AreJsonsEqual(a, b, false);}) {
- for(const auto& item : expected) {
- const auto& match = std::find_if(actual.begin(), actual.end(), [&item, &areValuesEqual](const auto& a){return a.first == item.first && areValuesEqual(a.second, item.second);});
- UNIT_ASSERT_C(match != actual.end(), TStringBuilder() << "Message metadata "<< item.first << ": " << item.second << " was expected, but not found. Actual: " << MetadataToString(actual) << ". Expected: " << MetadataToString(expected));
- }
- }
-
struct TopicRunner {
- static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
- const TVector<TString>& queries, const TVector<std::pair<TString, TVector<std::pair<TString, TString>>>>& records, bool checkKey = true)
- {
- TTestTopicEnv env(tableDesc, streamDesc);
-
- for (const auto& query : queries) {
- ExecSQL(env.GetServer(), env.GetEdgeActor(), query);
- }
+ private:
+ using TMessageMeta = TVector<std::pair<TString, TString>>;
- auto& client = env.GetClient();
+ static TString DumpMessageMeta(TMessageMeta messageMeta) {
+ std::stable_sort(messageMeta.begin(), messageMeta.end());
+ return JoinSeq(", ", messageMeta);
+ }
- // add consumer
- {
- auto res = client.AlterTopic("/Root/Table/Stream", NYdb::NTopic::TAlterTopicSettings().BeginAddConsumer("user")
- .EndAddConsumer()).ExtractValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ static void AssertMessageMetaContains(const TMessageMeta& actual, const TMessageMeta& expected) {
+ for (const auto& e : expected) {
+ auto it = std::find_if(actual.begin(), actual.end(), [&e](const auto& a) {
+ return a.first == e.first && CheckJsonsEqual(a.second, e.second);
+ });
+ UNIT_ASSERT_C(it != actual.end(), TStringBuilder() << "Message meta '" << e << "' was expected"
+ << ": actual# " << DumpMessageMeta(actual)
+ << ", expected# " << DumpMessageMeta(expected));
}
+ }
- // get records
- auto reader = client.CreateReadSession(NYdb::NTopic::TReadSessionSettings()
- .AppendTopics(TString("/Root/Table/Stream"))
- .ConsumerName("user")
- );
-
+ public:
+ static void WaitForContent(NYdb::NTopic::IReadSession* reader, const TVector<std::pair<TString, TMessageMeta>>& records) {
ui32 reads = 0;
while (reads < records.size()) {
auto ev = reader->GetEvent(true);
@@ -1204,10 +1195,9 @@ Y_UNIT_TEST_SUITE(Cdc) {
if (auto* data = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*ev)) {
pStream = data->GetPartitionSession();
for (const auto& item : data->GetMessages()) {
- const auto& record = records.at(reads++);
- UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record.first), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record.first);
- AssertMessageMetadataContains(item.GetMessageMeta()->Fields, record.second);
- Y_UNUSED(checkKey);
+ const auto& [body, meta] = records.at(reads++);
+ AssertJsonsEqual(item.GetData(), body);
+ AssertMessageMetaContains(item.GetMessageMeta()->Fields, meta);
}
} else if (auto* create = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev)) {
pStream = create->GetPartitionSession();
@@ -1223,6 +1213,35 @@ Y_UNIT_TEST_SUITE(Cdc) {
UNIT_ASSERT_VALUES_EQUAL(pStream->GetTopicPath(), "/Root/Table/Stream");
}
}
+ }
+
+ static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
+ const TVector<TString>& queries, const TVector<std::pair<TString, TMessageMeta>>& records)
+ {
+ TTestTopicEnv env(tableDesc, streamDesc);
+
+ for (const auto& query : queries) {
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), query);
+ }
+
+ auto& client = env.GetClient();
+
+ // add consumer
+ {
+ auto res = client.AlterTopic("/Root/Table/Stream", NYdb::NTopic::TAlterTopicSettings()
+ .BeginAddConsumer("user").EndAddConsumer()).ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
+
+ // create reader
+ auto reader = client.CreateReadSession(NYdb::NTopic::TReadSessionSettings()
+ .AppendTopics(TString("/Root/Table/Stream"))
+ .ConsumerName("user")
+ );
+
+ // get records
+ WaitForContent(reader.get(), records);
+
// remove consumer
{
auto res = client.AlterTopic("/Root/Table/Stream", NYdb::NTopic::TAlterTopicSettings()
@@ -1234,11 +1253,14 @@ Y_UNIT_TEST_SUITE(Cdc) {
static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true)
{
- TVector<std::pair<TString, TVector<std::pair<TString, TString>>>> recordsWithMetadata = {};
+ Y_UNUSED(checkKey);
+
+ TVector<std::pair<TString, TMessageMeta>> recordsWithMetadata(Reserve(records.size()));
for (const auto& record : records) {
- recordsWithMetadata.emplace_back(record, TVector<std::pair<TString, TString>>());
+ recordsWithMetadata.emplace_back(record, TMessageMeta());
}
- Read(tableDesc, streamDesc, queries, recordsWithMetadata, checkKey);
+
+ Read(tableDesc, streamDesc, queries, recordsWithMetadata);
}
static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) {
@@ -1264,12 +1286,37 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
};
- /**
- Usage: MessageWithMetadata("body", "metadata_key", "metadata_value")
- */
- static std::pair<TString, TVector<std::pair<TString,TString>>> MessageWithOneMetadataItem(const TString& body, const TString& meta_key, const TString& meta_value) {
- TVector<std::pair<TString,TString>> metas;
- return std::make_pair(body, TVector<std::pair<TString,TString>>{std::make_pair(meta_key, meta_value)});
+ static TString DebeziumBody(const char* op, const char* before, const char* after, bool snapshot = false, bool timestamps = false) {
+ NJsonWriter::TBuf body;
+ auto root = body.BeginObject();
+ auto payload = root.WriteKey("payload").BeginObject();
+
+ payload
+ .WriteKey("op").WriteString(op)
+ .WriteKey("source")
+ .BeginObject()
+ .WriteKey("connector").WriteString("ydb_debezium_json")
+ .WriteKey("version").WriteString("***")
+ .WriteKey("txId").WriteString("***")
+ .WriteKey("ts_ms").WriteString("***")
+ .WriteKey("snapshot").WriteBool(snapshot)
+ .EndObject();
+
+ if (before) {
+ payload.WriteKey("before").UnsafeWriteValue(before);
+ }
+
+ if (after) {
+ payload.WriteKey("after").UnsafeWriteValue(after);
+ }
+
+ if (timestamps) {
+ payload.WriteKey("ts").WriteString("***");
+ }
+
+ payload.EndObject();
+ root.EndObject();
+ return body.Str();
}
#define Y_UNIT_TEST_TRIPLET(N, VAR1, VAR2, VAR3) \
@@ -1310,10 +1357,10 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})")
+ {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
+ {DebeziumBody("d", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
});
}
@@ -1357,7 +1404,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
- Y_UNIT_TEST(NewAndOldImagesLogDebezium) { // Message-level meta is supported through topic api only at the time of writing
+ Y_UNIT_TEST(NewAndOldImagesLogDebezium) {
TopicRunner::Read(SimpleTable(), NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(1, 10),
@@ -1371,14 +1418,14 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
- }, false);
+ {DebeziumBody("c", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("c", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("c", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
+ {DebeziumBody("u", R"({"key":1,"value":10})", R"({"key":1,"value":100})"), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("u", R"({"key":2,"value":20})", R"({"key":2,"value":200})"), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("u", R"({"key":3,"value":30})", R"({"key":3,"value":300})"), {{"__key", R"({"payload":{"key":3}})"}}},
+ {DebeziumBody("d", R"({"key":1,"value":100})", nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
+ });
}
Y_UNIT_TEST(OldImageLogDebezium) {
@@ -1395,14 +1442,14 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
- }, false);
+ {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
+ {DebeziumBody("u", R"({"key":1,"value":10})", nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("u", R"({"key":2,"value":20})", nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("u", R"({"key":3,"value":30})", nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
+ {DebeziumBody("d", R"({"key":1,"value":100})", nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
+ });
}
Y_UNIT_TEST(NewImageLogDebezium) {
@@ -1419,14 +1466,14 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"),
- }, false);
+ {DebeziumBody("u", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("u", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("u", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
+ {DebeziumBody("u", nullptr, R"({"key":1,"value":100})"), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("u", nullptr, R"({"key":2,"value":200})"), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("u", nullptr, R"({"key":3,"value":300})"), {{"__key", R"({"payload":{"key":3}})"}}},
+ {DebeziumBody("d", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
+ });
}
Y_UNIT_TEST_TRIPLET(VirtualTimestamps, PqRunner, YdsRunner, TopicRunner) {
@@ -1442,7 +1489,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
- Y_UNIT_TEST(VirtualTimestampsNewAndOldImagesLogDebezium) {
+ Y_UNIT_TEST(VirtualTimestampsDebezium) {
TopicRunner::Read(SimpleTable(), WithVirtualTimestamps(NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson)), {R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(1, 10),
@@ -1456,14 +1503,14 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
- }, false);
+ {DebeziumBody("c", nullptr, R"({"key":1,"value":10})", false, true), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("c", nullptr, R"({"key":2,"value":20})", false, true), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("c", nullptr, R"({"key":3,"value":30})", false, true), {{"__key", R"({"payload":{"key":3}})"}}},
+ {DebeziumBody("u", R"({"key":1,"value":10})", R"({"key":1,"value":100})", false, true), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("u", R"({"key":2,"value":20})", R"({"key":2,"value":200})", false, true), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("u", R"({"key":3,"value":30})", R"({"key":3,"value":300})", false, true), {{"__key", R"({"payload":{"key":3}})"}}},
+ {DebeziumBody("d", R"({"key":1,"value":100})", nullptr, false, true), {{"__key", R"({"payload":{"key":1}})"}}},
+ });
}
TShardedTableOptions DocApiTable() {
@@ -1636,7 +1683,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
- Y_UNIT_TEST(DebeziumHugeKey) {
+ Y_UNIT_TEST(HugeKeyDebezium) {
const auto key = TString(512_KB, 'A');
const auto table = TShardedTableOptions()
.Columns({
@@ -1648,8 +1695,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
UPSERT INTO `/Root/Table` (key, value) VALUES
("%s", 1);
)", key.c_str())}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())),
- }, false);
+ {DebeziumBody("u", nullptr, nullptr), {{"__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())}}},
+ });
}
Y_UNIT_TEST_TRIPLET(Write, PqRunner, YdsRunner, TopicRunner) {
@@ -1841,7 +1888,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
if (records.size() == expected.size()) {
for (ui32 i = 0; i < expected.size(); ++i) {
- UNIT_ASSERT_C(AreJsonsEqual(records.at(i).second, expected.at(i)), TStringBuilder() << "Jsons are different, " << records.at(i).second << " != " << expected.at(i));
+ AssertJsonsEqual(records.at(i).second, expected.at(i));
}
break;
@@ -2550,49 +2597,9 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
- void WaitForContent(NYdb::NTopic::TTopicClient& client, const TString& consumerName, const TString& path, const TVector<std::pair<TString, TVector<std::pair<TString, TString>>>>& expected) {
- // get records
- auto reader = client.CreateReadSession(NYdb::NTopic::TReadSessionSettings()
- .AppendTopics(path)
- .ConsumerName(consumerName)
- );
-
- ui32 reads = 0;
- while (reads < expected.size()) {
- auto ev = reader->GetEvent(true);
- UNIT_ASSERT(ev);
-
- NYdb::NTopic::TPartitionSession::TPtr pStream;
- if (auto* data = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*ev)) {
- pStream = data->GetPartitionSession();
- for (const auto& item : data->GetMessages()) {
- const auto& record = expected.at(reads++);
- UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record.first), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record.first);
- AssertMessageMetadataContains(item.GetMessageMeta()->Fields, record.second);
- }
- data->Commit();
- } else if (auto* create = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev)) {
- pStream = create->GetPartitionSession();
- create->Confirm();
- } else if (auto* destroy = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*ev)) {
- pStream = destroy->GetPartitionSession();
- destroy->Confirm();
- } else if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*ev)) {
- break;
- }
-
- if (pStream) {
- UNIT_ASSERT_VALUES_EQUAL(pStream->GetTopicPath(), path);
- }
- }
-
- UNIT_ASSERT_C(reads == expected.size(), "Read less events than expected");
- }
-
Y_UNIT_TEST(InitialScanDebezium) {
- auto table = SimpleTable();
- auto unusedStream = KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson, "UnusedStream");
- TTestTopicEnv env(table, unusedStream);
+ TTestTopicEnv env(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson, "UnusedStream"));
+ auto& client = env.GetClient();
// Populate data
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
@@ -2603,31 +2610,27 @@ Y_UNIT_TEST_SUITE(Cdc) {
)");
// add a stream with initial scan
- WaitTxNotification(
- env.GetServer(), env.GetEdgeActor(),
- AsyncAlterAddStream(
- env.GetServer(), "/Root", "Table",
- WithInitialScan(NewAndOldImages(
- NKikimrSchemeOp::ECdcStreamFormatDebeziumJson))));
-
- auto &client = env.GetClient();
+ WaitTxNotification(env.GetServer(), env.GetEdgeActor(), AsyncAlterAddStream(env.GetServer(), "/Root", "Table",
+ WithInitialScan(NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson))));
// add consumer
{
- auto res = client
- .AlterTopic("/Root/Table/Stream",
- NYdb::NTopic::TAlterTopicSettings()
- .BeginAddConsumer("user")
- .EndAddConsumer())
- .ExtractValueSync();
+ auto res = client.AlterTopic("/Root/Table/Stream", NYdb::NTopic::TAlterTopicSettings()
+ .BeginAddConsumer("user").EndAddConsumer()).ExtractValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
+ // create reader
+ auto reader = client.CreateReadSession(NYdb::NTopic::TReadSessionSettings()
+ .AppendTopics(TString("/Root/Table/Stream"))
+ .ConsumerName("user")
+ );
+
// Wait for initial scan records
- WaitForContent(client, "user", "/Root/Table/Stream", {
- MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
+ TopicRunner::WaitForContent(reader.get(), {
+ {DebeziumBody("r", nullptr, R"({"key":1,"value":10})", true), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("r", nullptr, R"({"key":2,"value":20})", true), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("r", nullptr, R"({"key":3,"value":30})", true), {{"__key", R"({"payload":{"key":3}})"}}},
});
// Perform update after initial scan
@@ -2640,24 +2643,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
)");
// Wait for update records
- WaitForContent(client, "user", "/Root/Table/Stream", {
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":4,"value":400}}})", "__key", R"({"payload":{"key":4}})"),
+ TopicRunner::WaitForContent(reader.get(), {
+ {DebeziumBody("u", R"({"key":1,"value":10})", R"({"key":1,"value":100})"), {{"__key", R"({"payload":{"key":1}})"}}},
+ {DebeziumBody("u", R"({"key":2,"value":20})", R"({"key":2,"value":200})"), {{"__key", R"({"payload":{"key":2}})"}}},
+ {DebeziumBody("u", R"({"key":3,"value":30})", R"({"key":3,"value":300})"), {{"__key", R"({"payload":{"key":3}})"}}},
+ {DebeziumBody("c", nullptr, R"({"key":4,"value":400})"), {{"__key", R"({"payload":{"key":4}})"}}},
});
-
- // remove consumer
- {
- auto res =
- client
- .AlterTopic(
- "/Root/Table/Stream",
- NYdb::NTopic::TAlterTopicSettings().AppendDropConsumers(
- "user"))
- .ExtractValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
- }
}
Y_UNIT_TEST(InitialScanUpdatedRows) {
@@ -3191,3 +3182,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
} // Cdc
} // NKikimr
+
+template <>
+void Out<std::pair<TString, TString>>(IOutputStream& output, const std::pair<TString, TString>& x) {
+ output << x.first << ":" << x.second;
+}
+
+void AppendToString(TString& dst, const std::pair<TString, TString>& x) {
+ TStringOutput output(dst);
+ output << x;
+}