diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-10-17 19:33:25 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-10-17 20:27:06 +0300 |
commit | 68aad25a66091da7133f67052c5757e132418923 (patch) | |
tree | 1775fcec8a3d690d6f4dd775269016f3d49f47e4 | |
parent | 770d5a50f59aaeaa24f62898b530365a904e79bf (diff) | |
download | ydb-68aad25a66091da7133f67052c5757e132418923.tar.gz |
Reworked debezium tests KIKIMR-15401, KIKIMR-19583
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 357 |
1 files changed, 179 insertions, 178 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 3634ebbeea..fa8a1caad5 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -15,6 +15,7 @@ #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <util/generic/size_literals.h> +#include <util/string/join.h> #include <util/string/printf.h> #include <util/string/strip.h> @@ -880,6 +881,7 @@ Y_UNIT_TEST_SUITE(Cdc) { } else if (!parseResult) { return false; } + NJson::TJsonValue expectedJson; parseResult = NJson::ReadJsonTree(expected, &expectedJson); if (assertOnParseError) { @@ -902,8 +904,12 @@ Y_UNIT_TEST_SUITE(Cdc) { if (expectedValue.GetStringRobust() != "***") { return true; } - // Discrepancy in path format here. GetValueByPath expects ".array.[0]" while Scanner provides with ".array[0]". Don't use "***" inside a non-root array - UNIT_ASSERT_C(!path.Contains("["), TStringBuilder() << "Please don't use \"***\" inside an array. Seems like " << path << " has array on the way"); + + // Discrepancy in path format here. + // GetValueByPath expects ".array.[0]" while Scanner provides with ".array[0]". + // Don't use "***" inside a non-root array. + UNIT_ASSERT_C(!path.Contains("["), TStringBuilder() + << "Please don't use \"***\" inside an array. Seems like " << path << " has array on the way"); NJson::TJsonValue actualValue; // If "***", find a corresponding actual value @@ -929,10 +935,19 @@ Y_UNIT_TEST_SUITE(Cdc) { if (!scanner.IsSuccess()) { return false; // actualJson is missing a path to *** } + return actualJson == expectedJson; } - // Unit test to verify that Json comparison with wildcard works + static void AssertJsonsEqual(const TString& actual, const TString& expected) { + UNIT_ASSERT_C(AreJsonsEqual(actual, expected), TStringBuilder() + << "Jsons are different: " << actual << " != " << expected); + } + + static bool CheckJsonsEqual(const TString& actual, const TString& expected) { + return AreJsonsEqual(actual, expected, false); + } + Y_UNIT_TEST(AreJsonsEqualReturnsTrueOnEqual) { UNIT_ASSERT(AreJsonsEqual("{}", "{}")); UNIT_ASSERT(AreJsonsEqual("[]", "[]")); @@ -1016,7 +1031,7 @@ Y_UNIT_TEST_SUITE(Cdc) { pStream = data->GetPartitionStream(); for (const auto& item : data->GetMessages()) { const auto& record = records.at(reads++); - UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record); + AssertJsonsEqual(item.GetData(), record); if (checkKey) { UNIT_ASSERT_VALUES_EQUAL(item.GetPartitionKey(), CalcPartitionKey(record)); } @@ -1120,7 +1135,7 @@ Y_UNIT_TEST_SUITE(Cdc) { for (ui32 i = 0; i < records.size(); ++i) { const auto& actual = res.GetResult().records().at(i); const auto& expected = records.at(i); - UNIT_ASSERT_C(AreJsonsEqual(actual.data(), expected), TStringBuilder() << "Jsons are different: " << actual.data() << " != " << expected); + AssertJsonsEqual(actual.data(), expected); if (checkKey) { UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), CalcPartitionKey(expected)); } @@ -1149,52 +1164,28 @@ Y_UNIT_TEST_SUITE(Cdc) { } }; - static TString MetadataToString(TVector<std::pair<TString, TString>> messageMetadata) { - std::stable_sort(messageMetadata.begin(), messageMetadata.end(), [](const auto& a, const auto& b){return a.first < b.first;}); - TStringBuilder str; - str << "{"; - for (const auto& entry : messageMetadata) { - str << entry.first << ": " << entry.second << ","; - } - str << "}"; - return str; - } - - static void AssertMessageMetadataContains( - const TVector<std::pair<TString, TString>>& actual, - const TVector<std::pair<TString, TString>>& expected, - std::function<bool(const TString&, const TString&)> areValuesEqual = [](const TString& a, const TString& b) {return AreJsonsEqual(a, b, false);}) { - for(const auto& item : expected) { - const auto& match = std::find_if(actual.begin(), actual.end(), [&item, &areValuesEqual](const auto& a){return a.first == item.first && areValuesEqual(a.second, item.second);}); - UNIT_ASSERT_C(match != actual.end(), TStringBuilder() << "Message metadata "<< item.first << ": " << item.second << " was expected, but not found. Actual: " << MetadataToString(actual) << ". Expected: " << MetadataToString(expected)); - } - } - struct TopicRunner { - static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, - const TVector<TString>& queries, const TVector<std::pair<TString, TVector<std::pair<TString, TString>>>>& records, bool checkKey = true) - { - TTestTopicEnv env(tableDesc, streamDesc); - - for (const auto& query : queries) { - ExecSQL(env.GetServer(), env.GetEdgeActor(), query); - } + private: + using TMessageMeta = TVector<std::pair<TString, TString>>; - auto& client = env.GetClient(); + static TString DumpMessageMeta(TMessageMeta messageMeta) { + std::stable_sort(messageMeta.begin(), messageMeta.end()); + return JoinSeq(", ", messageMeta); + } - // add consumer - { - auto res = client.AlterTopic("/Root/Table/Stream", NYdb::NTopic::TAlterTopicSettings().BeginAddConsumer("user") - .EndAddConsumer()).ExtractValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + static void AssertMessageMetaContains(const TMessageMeta& actual, const TMessageMeta& expected) { + for (const auto& e : expected) { + auto it = std::find_if(actual.begin(), actual.end(), [&e](const auto& a) { + return a.first == e.first && CheckJsonsEqual(a.second, e.second); + }); + UNIT_ASSERT_C(it != actual.end(), TStringBuilder() << "Message meta '" << e << "' was expected" + << ": actual# " << DumpMessageMeta(actual) + << ", expected# " << DumpMessageMeta(expected)); } + } - // get records - auto reader = client.CreateReadSession(NYdb::NTopic::TReadSessionSettings() - .AppendTopics(TString("/Root/Table/Stream")) - .ConsumerName("user") - ); - + public: + static void WaitForContent(NYdb::NTopic::IReadSession* reader, const TVector<std::pair<TString, TMessageMeta>>& records) { ui32 reads = 0; while (reads < records.size()) { auto ev = reader->GetEvent(true); @@ -1204,10 +1195,9 @@ Y_UNIT_TEST_SUITE(Cdc) { if (auto* data = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*ev)) { pStream = data->GetPartitionSession(); for (const auto& item : data->GetMessages()) { - const auto& record = records.at(reads++); - UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record.first), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record.first); - AssertMessageMetadataContains(item.GetMessageMeta()->Fields, record.second); - Y_UNUSED(checkKey); + const auto& [body, meta] = records.at(reads++); + AssertJsonsEqual(item.GetData(), body); + AssertMessageMetaContains(item.GetMessageMeta()->Fields, meta); } } else if (auto* create = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev)) { pStream = create->GetPartitionSession(); @@ -1223,6 +1213,35 @@ Y_UNIT_TEST_SUITE(Cdc) { UNIT_ASSERT_VALUES_EQUAL(pStream->GetTopicPath(), "/Root/Table/Stream"); } } + } + + static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, + const TVector<TString>& queries, const TVector<std::pair<TString, TMessageMeta>>& records) + { + TTestTopicEnv env(tableDesc, streamDesc); + + for (const auto& query : queries) { + ExecSQL(env.GetServer(), env.GetEdgeActor(), query); + } + + auto& client = env.GetClient(); + + // add consumer + { + auto res = client.AlterTopic("/Root/Table/Stream", NYdb::NTopic::TAlterTopicSettings() + .BeginAddConsumer("user").EndAddConsumer()).ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + + // create reader + auto reader = client.CreateReadSession(NYdb::NTopic::TReadSessionSettings() + .AppendTopics(TString("/Root/Table/Stream")) + .ConsumerName("user") + ); + + // get records + WaitForContent(reader.get(), records); + // remove consumer { auto res = client.AlterTopic("/Root/Table/Stream", NYdb::NTopic::TAlterTopicSettings() @@ -1234,11 +1253,14 @@ Y_UNIT_TEST_SUITE(Cdc) { static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true) { - TVector<std::pair<TString, TVector<std::pair<TString, TString>>>> recordsWithMetadata = {}; + Y_UNUSED(checkKey); + + TVector<std::pair<TString, TMessageMeta>> recordsWithMetadata(Reserve(records.size())); for (const auto& record : records) { - recordsWithMetadata.emplace_back(record, TVector<std::pair<TString, TString>>()); + recordsWithMetadata.emplace_back(record, TMessageMeta()); } - Read(tableDesc, streamDesc, queries, recordsWithMetadata, checkKey); + + Read(tableDesc, streamDesc, queries, recordsWithMetadata); } static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) { @@ -1264,12 +1286,37 @@ Y_UNIT_TEST_SUITE(Cdc) { } }; - /** - Usage: MessageWithMetadata("body", "metadata_key", "metadata_value") - */ - static std::pair<TString, TVector<std::pair<TString,TString>>> MessageWithOneMetadataItem(const TString& body, const TString& meta_key, const TString& meta_value) { - TVector<std::pair<TString,TString>> metas; - return std::make_pair(body, TVector<std::pair<TString,TString>>{std::make_pair(meta_key, meta_value)}); + static TString DebeziumBody(const char* op, const char* before, const char* after, bool snapshot = false, bool timestamps = false) { + NJsonWriter::TBuf body; + auto root = body.BeginObject(); + auto payload = root.WriteKey("payload").BeginObject(); + + payload + .WriteKey("op").WriteString(op) + .WriteKey("source") + .BeginObject() + .WriteKey("connector").WriteString("ydb_debezium_json") + .WriteKey("version").WriteString("***") + .WriteKey("txId").WriteString("***") + .WriteKey("ts_ms").WriteString("***") + .WriteKey("snapshot").WriteBool(snapshot) + .EndObject(); + + if (before) { + payload.WriteKey("before").UnsafeWriteValue(before); + } + + if (after) { + payload.WriteKey("after").UnsafeWriteValue(after); + } + + if (timestamps) { + payload.WriteKey("ts").WriteString("***"); + } + + payload.EndObject(); + root.EndObject(); + return body.Str(); } #define Y_UNIT_TEST_TRIPLET(N, VAR1, VAR2, VAR3) \ @@ -1310,10 +1357,10 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})") + {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}}, + {DebeziumBody("d", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}}, }); } @@ -1357,7 +1404,7 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } - Y_UNIT_TEST(NewAndOldImagesLogDebezium) { // Message-level meta is supported through topic api only at the time of writing + Y_UNIT_TEST(NewAndOldImagesLogDebezium) { TopicRunner::Read(SimpleTable(), NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"( UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10), @@ -1371,14 +1418,14 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), - }, false); + {DebeziumBody("c", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("c", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("c", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}}, + {DebeziumBody("u", R"({"key":1,"value":10})", R"({"key":1,"value":100})"), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("u", R"({"key":2,"value":20})", R"({"key":2,"value":200})"), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("u", R"({"key":3,"value":30})", R"({"key":3,"value":300})"), {{"__key", R"({"payload":{"key":3}})"}}}, + {DebeziumBody("d", R"({"key":1,"value":100})", nullptr), {{"__key", R"({"payload":{"key":1}})"}}}, + }); } Y_UNIT_TEST(OldImageLogDebezium) { @@ -1395,14 +1442,14 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), - }, false); + {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}}, + {DebeziumBody("u", R"({"key":1,"value":10})", nullptr), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("u", R"({"key":2,"value":20})", nullptr), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("u", R"({"key":3,"value":30})", nullptr), {{"__key", R"({"payload":{"key":3}})"}}}, + {DebeziumBody("d", R"({"key":1,"value":100})", nullptr), {{"__key", R"({"payload":{"key":1}})"}}}, + }); } Y_UNIT_TEST(NewImageLogDebezium) { @@ -1419,14 +1466,14 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"), - }, false); + {DebeziumBody("u", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("u", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("u", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}}, + {DebeziumBody("u", nullptr, R"({"key":1,"value":100})"), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("u", nullptr, R"({"key":2,"value":200})"), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("u", nullptr, R"({"key":3,"value":300})"), {{"__key", R"({"payload":{"key":3}})"}}}, + {DebeziumBody("d", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}}, + }); } Y_UNIT_TEST_TRIPLET(VirtualTimestamps, PqRunner, YdsRunner, TopicRunner) { @@ -1442,7 +1489,7 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } - Y_UNIT_TEST(VirtualTimestampsNewAndOldImagesLogDebezium) { + Y_UNIT_TEST(VirtualTimestampsDebezium) { TopicRunner::Read(SimpleTable(), WithVirtualTimestamps(NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson)), {R"( UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10), @@ -1456,14 +1503,14 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), - }, false); + {DebeziumBody("c", nullptr, R"({"key":1,"value":10})", false, true), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("c", nullptr, R"({"key":2,"value":20})", false, true), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("c", nullptr, R"({"key":3,"value":30})", false, true), {{"__key", R"({"payload":{"key":3}})"}}}, + {DebeziumBody("u", R"({"key":1,"value":10})", R"({"key":1,"value":100})", false, true), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("u", R"({"key":2,"value":20})", R"({"key":2,"value":200})", false, true), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("u", R"({"key":3,"value":30})", R"({"key":3,"value":300})", false, true), {{"__key", R"({"payload":{"key":3}})"}}}, + {DebeziumBody("d", R"({"key":1,"value":100})", nullptr, false, true), {{"__key", R"({"payload":{"key":1}})"}}}, + }); } TShardedTableOptions DocApiTable() { @@ -1636,7 +1683,7 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } - Y_UNIT_TEST(DebeziumHugeKey) { + Y_UNIT_TEST(HugeKeyDebezium) { const auto key = TString(512_KB, 'A'); const auto table = TShardedTableOptions() .Columns({ @@ -1648,8 +1695,8 @@ Y_UNIT_TEST_SUITE(Cdc) { UPSERT INTO `/Root/Table` (key, value) VALUES ("%s", 1); )", key.c_str())}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())), - }, false); + {DebeziumBody("u", nullptr, nullptr), {{"__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())}}}, + }); } Y_UNIT_TEST_TRIPLET(Write, PqRunner, YdsRunner, TopicRunner) { @@ -1841,7 +1888,7 @@ Y_UNIT_TEST_SUITE(Cdc) { const auto records = GetRecords(*server->GetRuntime(), sender, path, 0); if (records.size() == expected.size()) { for (ui32 i = 0; i < expected.size(); ++i) { - UNIT_ASSERT_C(AreJsonsEqual(records.at(i).second, expected.at(i)), TStringBuilder() << "Jsons are different, " << records.at(i).second << " != " << expected.at(i)); + AssertJsonsEqual(records.at(i).second, expected.at(i)); } break; @@ -2550,49 +2597,9 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } - void WaitForContent(NYdb::NTopic::TTopicClient& client, const TString& consumerName, const TString& path, const TVector<std::pair<TString, TVector<std::pair<TString, TString>>>>& expected) { - // get records - auto reader = client.CreateReadSession(NYdb::NTopic::TReadSessionSettings() - .AppendTopics(path) - .ConsumerName(consumerName) - ); - - ui32 reads = 0; - while (reads < expected.size()) { - auto ev = reader->GetEvent(true); - UNIT_ASSERT(ev); - - NYdb::NTopic::TPartitionSession::TPtr pStream; - if (auto* data = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*ev)) { - pStream = data->GetPartitionSession(); - for (const auto& item : data->GetMessages()) { - const auto& record = expected.at(reads++); - UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record.first), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record.first); - AssertMessageMetadataContains(item.GetMessageMeta()->Fields, record.second); - } - data->Commit(); - } else if (auto* create = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev)) { - pStream = create->GetPartitionSession(); - create->Confirm(); - } else if (auto* destroy = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*ev)) { - pStream = destroy->GetPartitionSession(); - destroy->Confirm(); - } else if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*ev)) { - break; - } - - if (pStream) { - UNIT_ASSERT_VALUES_EQUAL(pStream->GetTopicPath(), path); - } - } - - UNIT_ASSERT_C(reads == expected.size(), "Read less events than expected"); - } - Y_UNIT_TEST(InitialScanDebezium) { - auto table = SimpleTable(); - auto unusedStream = KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson, "UnusedStream"); - TTestTopicEnv env(table, unusedStream); + TTestTopicEnv env(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson, "UnusedStream")); + auto& client = env.GetClient(); // Populate data ExecSQL(env.GetServer(), env.GetEdgeActor(), R"( @@ -2603,31 +2610,27 @@ Y_UNIT_TEST_SUITE(Cdc) { )"); // add a stream with initial scan - WaitTxNotification( - env.GetServer(), env.GetEdgeActor(), - AsyncAlterAddStream( - env.GetServer(), "/Root", "Table", - WithInitialScan(NewAndOldImages( - NKikimrSchemeOp::ECdcStreamFormatDebeziumJson)))); - - auto &client = env.GetClient(); + WaitTxNotification(env.GetServer(), env.GetEdgeActor(), AsyncAlterAddStream(env.GetServer(), "/Root", "Table", + WithInitialScan(NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson)))); // add consumer { - auto res = client - .AlterTopic("/Root/Table/Stream", - NYdb::NTopic::TAlterTopicSettings() - .BeginAddConsumer("user") - .EndAddConsumer()) - .ExtractValueSync(); + auto res = client.AlterTopic("/Root/Table/Stream", NYdb::NTopic::TAlterTopicSettings() + .BeginAddConsumer("user").EndAddConsumer()).ExtractValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } + // create reader + auto reader = client.CreateReadSession(NYdb::NTopic::TReadSessionSettings() + .AppendTopics(TString("/Root/Table/Stream")) + .ConsumerName("user") + ); + // Wait for initial scan records - WaitForContent(client, "user", "/Root/Table/Stream", { - MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), + TopicRunner::WaitForContent(reader.get(), { + {DebeziumBody("r", nullptr, R"({"key":1,"value":10})", true), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("r", nullptr, R"({"key":2,"value":20})", true), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("r", nullptr, R"({"key":3,"value":30})", true), {{"__key", R"({"payload":{"key":3}})"}}}, }); // Perform update after initial scan @@ -2640,24 +2643,12 @@ Y_UNIT_TEST_SUITE(Cdc) { )"); // Wait for update records - WaitForContent(client, "user", "/Root/Table/Stream", { - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":4,"value":400}}})", "__key", R"({"payload":{"key":4}})"), + TopicRunner::WaitForContent(reader.get(), { + {DebeziumBody("u", R"({"key":1,"value":10})", R"({"key":1,"value":100})"), {{"__key", R"({"payload":{"key":1}})"}}}, + {DebeziumBody("u", R"({"key":2,"value":20})", R"({"key":2,"value":200})"), {{"__key", R"({"payload":{"key":2}})"}}}, + {DebeziumBody("u", R"({"key":3,"value":30})", R"({"key":3,"value":300})"), {{"__key", R"({"payload":{"key":3}})"}}}, + {DebeziumBody("c", nullptr, R"({"key":4,"value":400})"), {{"__key", R"({"payload":{"key":4}})"}}}, }); - - // remove consumer - { - auto res = - client - .AlterTopic( - "/Root/Table/Stream", - NYdb::NTopic::TAlterTopicSettings().AppendDropConsumers( - "user")) - .ExtractValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); - } } Y_UNIT_TEST(InitialScanUpdatedRows) { @@ -3191,3 +3182,13 @@ Y_UNIT_TEST_SUITE(Cdc) { } // Cdc } // NKikimr + +template <> +void Out<std::pair<TString, TString>>(IOutputStream& output, const std::pair<TString, TString>& x) { + output << x.first << ":" << x.second; +} + +void AppendToString(TString& dst, const std::pair<TString, TString>& x) { + TStringOutput output(dst); + output << x; +} |