diff options
author | krock21 <krock21@yandex-team.com> | 2023-10-05 13:23:19 +0300 |
---|---|---|
committer | krock21 <krock21@yandex-team.com> | 2023-10-05 13:49:18 +0300 |
commit | a3c5a0543995dc268f536bcfe2c36b1501fcf9b6 (patch) | |
tree | 919a6e3322200ba5d9b65d6d4a43873fb0d94598 | |
parent | 010b93742060ca2c2362568a97c2d080946e06f5 (diff) | |
download | ydb-a3c5a0543995dc268f536bcfe2c36b1501fcf9b6.tar.gz |
Add DEBEZIUM_JSON CDC format
KIKIMR-15401
RFC: https://a.yandex-team.ru/arcadia/kikimr/docs/ru/overlay/rfc/debezium_support.md
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 19 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/flat_scheme_op.proto | 1 | ||||
-rw-r--r-- | ydb/core/testlib/basics/feature_flags.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.cpp | 60 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 38 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_user_table.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 274 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_helpers/test_env.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_helpers/test_env.h | 1 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.cpp | 6 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_table.proto | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.cpp | 6 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table_enum.h | 1 |
17 files changed, 410 insertions, 18 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 2930eb8b1ca..00e731f862f 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1393,6 +1393,8 @@ public: add_changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON); } else if (to_lower(format) == "dynamodb_streams_json") { add_changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON); + } else if (to_lower(format) == "debezium_json") { + add_changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON); } else { ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), TStringBuilder() << "Unknown changefeed format: " << format)); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index a346fb75726..6c4115f3267 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -3173,7 +3173,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) { void AddChangefeed(EChangefeedMode mode, EChangefeedFormat format) { TKikimrRunner kikimr(TKikimrSettings() .SetPQConfig(DefaultPQConfig()) - .SetEnableChangefeedDynamoDBStreamsFormat(true)); + .SetEnableChangefeedDynamoDBStreamsFormat(true) + .SetEnableChangefeedDebeziumJsonFormat(true)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -3238,6 +3239,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { case EChangefeedFormat::Unknown: continue; case EChangefeedFormat::DynamoDBStreamsJson: + case EChangefeedFormat::DebeziumJson: if (mode == EChangefeedMode::Updates) { continue; } @@ -3288,7 +3290,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) { Y_UNIT_TEST(AddChangefeedNegative) { TKikimrRunner kikimr(TKikimrSettings() .SetPQConfig(DefaultPQConfig()) - .SetEnableChangefeedDynamoDBStreamsFormat(true)); + .SetEnableChangefeedDynamoDBStreamsFormat(true) + .SetEnableChangefeedDebeziumJsonFormat(true)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -3376,6 +3379,18 @@ Y_UNIT_TEST_SUITE(KqpScheme) { const auto result = session.ExecuteSchemeQuery(query, TExecSchemeQuerySettings().RequestType("_document_api_request")).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); } + + { + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/table` ADD CHANGEFEED `feed` WITH ( + MODE = 'UPDATES', FORMAT = 'DEBEZIUM_JSON' + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + } } Y_UNIT_TEST(ChangefeedAwsRegion) { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index ad2d137bfb3..318c31838be 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -842,6 +842,7 @@ message TFeatureFlags { optional bool EnableTempTables = 102 [default = false]; optional bool SuppressCompatibilityCheck = 103 [default = false]; optional bool EnableUniqConstraint = 104 [default = false]; + optional bool EnableChangefeedDebeziumJsonFormat = 105 [default = false]; } message THttpProxyConfig { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 631840c53a1..42d93ab4c1a 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -739,6 +739,7 @@ enum ECdcStreamFormat { ECdcStreamFormatProto = 1; ECdcStreamFormatJson = 2; ECdcStreamFormatDynamoDBStreamsJson = 3; + ECdcStreamFormatDebeziumJson = 4; } message TCdcStreamDescription { diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 675ba7d3f16..173e51e6ea3 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -49,8 +49,10 @@ public: FEATURE_FLAG_SETTER(EnableTopicSplitMerge) FEATURE_FLAG_SETTER(EnableTempTables) FEATURE_FLAG_SETTER(EnableChangefeedDynamoDBStreamsFormat) + FEATURE_FLAG_SETTER(EnableChangefeedDebeziumJsonFormat) FEATURE_FLAG_SETTER(ForceColumnTablesCompositeMarks) FEATURE_FLAG_SETTER(EnableUniqConstraint) + FEATURE_FLAG_SETTER(EnableTopicMessageMeta) #undef FEATURE_FLAG_SETTER }; diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index c053694b59b..db9eb6df7b6 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -155,6 +155,14 @@ static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& valu } } +static void MergeJsonMaps(NJson::TJsonValue& mergeTo, NJson::TJsonValue& mergeFrom) { + Y_VERIFY(mergeTo.GetType() == NJson::EJsonValueType::JSON_MAP); + Y_VERIFY(mergeFrom.GetType() == NJson::EJsonValueType::JSON_MAP); + for (const auto& entry : mergeFrom.GetMap()) { + mergeTo.InsertValue(entry.first, entry.second); + } +} + static void SerializeVirtualTimestamp(NJson::TJsonValue& value, std::initializer_list<ui64> vt) { for (auto v : vt) { value.AppendValue(v); @@ -342,6 +350,58 @@ void TChangeRecord::SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, cons } } +void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, bool virtualTimestamps, TUserTable::TCdcStream::EMode streamMode) const { + Y_VERIFY(Kind == EKind::CdcDataChange); + Y_VERIFY(Schema); + + const auto body = ParseBody(Body); + keyJson["payload"].SetType(NJson::JSON_MAP); + SerializeJsonValue(Schema, keyJson["payload"], body.GetKey()); // Debezium expects key in the same format as values + + valueJson["payload"].SetType(NJson::JSON_MAP); + // payload.before. Optional + if (body.HasOldImage()) { + SerializeJsonValue(Schema, valueJson["payload"]["before"], body.GetOldImage()); + MergeJsonMaps(valueJson["payload"]["before"], keyJson["payload"]); // Debezium expects key included in value + } + + // payload.after. Optional + if (body.HasNewImage()) { + SerializeJsonValue(Schema, valueJson["payload"]["after"], body.GetNewImage()); + MergeJsonMaps(valueJson["payload"]["after"], keyJson["payload"]); // Debezium expects key included in value + } + + // payload.op. Mandatory + switch (body.GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: + case NKikimrChangeExchange::TDataChange::kReset: + if (streamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) { + valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // u = update, c = create + } else { + valueJson["payload"]["op"] = "u"; // u = update + } + break; + case NKikimrChangeExchange::TDataChange::kErase: + valueJson["payload"]["op"] = "d"; // d = delete + break; + default: + Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); + } + + // payload.ts. Optional. "ts_ms" int64 in Debezium, "ts" array here + if (virtualTimestamps) { + SerializeVirtualTimestamp(valueJson["payload"]["ts"], {Step, TxId}); + } + + // payload.source. Mandatory. + valueJson["payload"]["source"] = NJson::TJsonMap({ + {"version", "0.0.1"}, + {"connector", "ydb_debezium_json"}, + {"ts_ms", GetApproximateCreationDateTime().MilliSeconds()}, + {"txId", TxId}, + }); +} + TConstArrayRef<TCell> TChangeRecord::GetKey() const { if (Key) { return *Key; diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index c1e7920df3f..03107df8ba5 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -51,6 +51,7 @@ public: void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const; void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const; void SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const; + void SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, bool virtualTimestamps, TUserTable::TCdcStream::EMode streamMode) const; TConstArrayRef<TCell> GetKey() const; i64 GetSeqNo() const; diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 2cbb316f66e..75056856056 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -78,6 +78,13 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti } } + static NJson::TJsonWriterConfig DefaultJsonConfig() { + NJson::TJsonWriterConfig jsonConfig; + jsonConfig.ValidateUtf8 = false; + jsonConfig.WriteNanAsString = true; + return jsonConfig; + } + void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); NKikimrClient::TPersQueueRequest request; @@ -121,10 +128,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti } TStringStream str; - NJson::TJsonWriterConfig jsonConfig; - jsonConfig.ValidateUtf8 = false; - jsonConfig.WriteNanAsString = true; - WriteJson(&str, &json, jsonConfig); + WriteJson(&str, &json, DefaultJsonConfig()); data.SetData(str.Str()); if (record.GetKind() == TChangeRecord::EKind::CdcDataChange) { @@ -142,6 +146,29 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti break; } + case NKikimrSchemeOp::ECdcStreamFormatDebeziumJson: { + NJson::TJsonValue keyJson; + NJson::TJsonValue valueJson; + record.SerializeToDebeziumJson(keyJson, valueJson, Stream.VirtualTimestamps, Stream.Mode); + + TStringStream keyStr; + WriteJson(&keyStr, &keyJson, DefaultJsonConfig()); + + TStringStream valueStr; + WriteJson(&valueStr, &valueJson, DefaultJsonConfig()); + + // Add key in the same way as Kafka integration does + auto messageMeta = data.AddMessageMeta(); + messageMeta->set_key("__key"); // Kafka integration stores kafka key in "__key" metadata + messageMeta->set_value(keyStr.Str()); + + // Add value + data.SetData(valueStr.Str()); + cmd.SetData(data.SerializeAsString()); + cmd.SetPartitionKey(record.GetPartitionKey()); + break; + } + default: { LOG_E("Unknown format" << ": format# " << static_cast<int>(Stream.Format)); @@ -710,7 +737,8 @@ class TCdcChangeSenderMain } case NKikimrSchemeOp::ECdcStreamFormatJson: - case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson: { + case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson: + case NKikimrSchemeOp::ECdcStreamFormatDebeziumJson: { using namespace NKikimr::NDataStreams::V1; const auto hashKey = HexBytesToDecimal(record.GetPartitionKey() /* MD5 */); return ShardFromDecimal(hashKey, KeyDesc->Partitions.size()); diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp index 7f5e3a636fb..f17f8bd6efa 100644 --- a/ydb/core/tx/datashard/datashard_user_table.cpp +++ b/ydb/core/tx/datashard/datashard_user_table.cpp @@ -115,6 +115,7 @@ static bool IsJsonCdcStream(TUserTable::TCdcStream::EFormat format) { switch (format) { case TUserTable::TCdcStream::EFormat::ECdcStreamFormatJson: case TUserTable::TCdcStream::EFormat::ECdcStreamFormatDynamoDBStreamsJson: + case TUserTable::TCdcStream::EFormat::ECdcStreamFormatDebeziumJson: return true; default: return false; diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 2082c049a26..0c40a8a8200 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -718,7 +718,9 @@ Y_UNIT_TEST_SUITE(Cdc) { .SetUseRealThreads(useRealThreads) .SetDomainName(root) .SetGrpcPort(PortManager.GetPort(2135)) - .SetEnableChangefeedDynamoDBStreamsFormat(true); + .SetEnableChangefeedDynamoDBStreamsFormat(true) + .SetEnableChangefeedDebeziumJsonFormat(true) + .SetEnableTopicMessageMeta(true); Server = new TServer(settings); if (useRealThreads) { @@ -821,6 +823,22 @@ Y_UNIT_TEST_SUITE(Cdc) { }; } + TCdcStream OldImage(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") { + return TCdcStream{ + .Name = name, + .Mode = NKikimrSchemeOp::ECdcStreamModeOldImage, + .Format = format, + }; + } + + TCdcStream NewImage(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") { + return TCdcStream{ + .Name = name, + .Mode = NKikimrSchemeOp::ECdcStreamModeNewImage, + .Format = format, + }; + } + TCdcStream WithVirtualTimestamps(TCdcStream streamDesc) { streamDesc.VirtualTimestamps = true; return streamDesc; @@ -852,11 +870,22 @@ Y_UNIT_TEST_SUITE(Cdc) { return MD5::Calc(root.at("key").GetStringRobust()); } - static bool AreJsonsEqual(const TString& actual, const TString& expected) { + static bool AreJsonsEqual(const TString& actual, const TString& expected, bool assertOnParseError = true) { + bool parseResult; NJson::TJsonValue actualJson; - UNIT_ASSERT(NJson::ReadJsonTree(actual, &actualJson)); + parseResult = NJson::ReadJsonTree(actual, &actualJson); + if (assertOnParseError) { + UNIT_ASSERT(parseResult); + } else if (!parseResult) { + return false; + } NJson::TJsonValue expectedJson; - UNIT_ASSERT(NJson::ReadJsonTree(expected, &expectedJson)); + parseResult = NJson::ReadJsonTree(expected, &expectedJson); + if (assertOnParseError) { + UNIT_ASSERT(parseResult); + } else if (!parseResult) { + return false; + } class TScanner: public NJson::IScanCallback { NJson::TJsonValue& Actual; @@ -868,16 +897,22 @@ Y_UNIT_TEST_SUITE(Cdc) { {} bool Do(const TString& path, NJson::TJsonValue*, NJson::TJsonValue& expectedValue) override { + // Skip if not "***" if (expectedValue.GetStringRobust() != "***") { return true; } + // Discrepancy in path format here. GetValueByPath expects ".array.[0]" while Scanner provides with ".array[0]". Don't use "***" inside a non-root array + UNIT_ASSERT_C(!path.Contains("["), TStringBuilder() << "Please don't use \"***\" inside an array. Seems like " << path << " has array on the way"); NJson::TJsonValue actualValue; + // If "***", find a corresponding actual value if (!Actual.GetValueByPath(path, actualValue)) { + // Couldn't find an actual value for "***" Success = false; return false; } + // Replace "***" with actual value expectedValue = actualValue; return true; } @@ -890,10 +925,60 @@ Y_UNIT_TEST_SUITE(Cdc) { TScanner scanner(actualJson); expectedJson.Scan(scanner); - UNIT_ASSERT(scanner.IsSuccess()); + if (!scanner.IsSuccess()) { + return false; // actualJson is missing a path to *** + } return actualJson == expectedJson; } + // Unit test to verify that Json comparison with wildcard works + Y_UNIT_TEST(AreJsonsEqualReturnsTrueOnEqual) { + UNIT_ASSERT(AreJsonsEqual("{}", "{}")); + UNIT_ASSERT(AreJsonsEqual("[]", "[]")); + UNIT_ASSERT(AreJsonsEqual("1", "1")); + UNIT_ASSERT(AreJsonsEqual("null", "null")); + UNIT_ASSERT(AreJsonsEqual(R"({"a":"b","c":"d","e":[1,2,"3"]})", R"({"a":"b","c":"d","e":[1,2,"3"]})")); + UNIT_ASSERT(AreJsonsEqual(R"({"update":{},"key":[1]})", R"({"update":{},"key":[1]})")); + UNIT_ASSERT(AreJsonsEqual(R"({"update":{},"key":[1,2]})", R"({"update":{},"key":[1,2]})")); + // Root wildcard + UNIT_ASSERT(AreJsonsEqual("{}", R"("***")")); + UNIT_ASSERT(AreJsonsEqual("1", R"("***")")); + UNIT_ASSERT(AreJsonsEqual(R"({"a": "b"})", R"("***")")); + UNIT_ASSERT(AreJsonsEqual("[1,2,3]", R"("***")")); + // Deep wildcard + UNIT_ASSERT(AreJsonsEqual(R"({"a":"b","c":"d","e":[1,2,"3"]})", R"({"a":"b","c":"***","e":"***"})")); + UNIT_ASSERT(AreJsonsEqual(R"({"update":{},"key":[1]})", R"({"update":{},"key":"***"})")); + UNIT_ASSERT(AreJsonsEqual(R"({"update":{},"ts":[1,2]})", R"({"update":{},"ts":"***"})")); + }; + + Y_UNIT_TEST(AreJsonsEqualReturnsFalseOnDifferent) { + // Simple cases + UNIT_ASSERT(!AreJsonsEqual("{}", "[]")); + UNIT_ASSERT(!AreJsonsEqual("[]", "{}")); + UNIT_ASSERT(!AreJsonsEqual("1", "2")); + UNIT_ASSERT(!AreJsonsEqual("null", "[]")); + UNIT_ASSERT(!AreJsonsEqual("null", "{}")); + UNIT_ASSERT(!AreJsonsEqual("[]", "null")); + UNIT_ASSERT(!AreJsonsEqual("{}", "null")); + UNIT_ASSERT(!AreJsonsEqual(R"({"a":"b","c":"d","e":[1,2,"3"]})", R"({"a":"b","c":"d","e":[9,2,"3"]})")); + UNIT_ASSERT(!AreJsonsEqual(R"({"update":{},"key":[1]})", R"({"update":[],"key":[1]})")); + UNIT_ASSERT(!AreJsonsEqual(R"({"update":{},"ts":[1,2]})", R"({"update":{},"key":[9,2]})")); + // Wildcart in actual value shouldn't be treated as a wildcard + UNIT_ASSERT(!AreJsonsEqual(R"("***")", "{}")); + UNIT_ASSERT(!AreJsonsEqual(R"({"a":"***"})", R"({"a":"b"})")); + // Deep wildcard + UNIT_ASSERT(!AreJsonsEqual(R"({"a":"z","c":"d","e":[1,2,"3"]})", R"({"a":"b","c":"***","e":"***"})")); + UNIT_ASSERT(!AreJsonsEqual(R"({"update":{"a":"b"},"key":[1]})", R"({"update":{},"key":"***"})")); + UNIT_ASSERT(!AreJsonsEqual(R"({"update":{},"key":{},"ts":[1,2]})", R"({"update":{},"ts":"***"})")); + }; + + Y_UNIT_TEST(AreJsonsEqualFailsOnWildcardInArray) { + // Wildcard in a not-root array is not supported because of a bug in code + UNIT_ASSERT_TEST_FAILS(AreJsonsEqual(R"({"a":[1,{"a":"b"}]})", R"({"a":[1,{"a":"***"}]})")); + UNIT_ASSERT_TEST_FAILS(AreJsonsEqual(R"({"a":[1]})", R"({"a":["***"]})")); + UNIT_ASSERT_TEST_FAILS(AreJsonsEqual(R"([1])", R"(["***"])")); + } + struct PqRunner { static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true) @@ -930,7 +1015,7 @@ Y_UNIT_TEST_SUITE(Cdc) { pStream = data->GetPartitionStream(); for (const auto& item : data->GetMessages()) { const auto& record = records.at(reads++); - UNIT_ASSERT(AreJsonsEqual(item.GetData(), record)); + UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record); if (checkKey) { UNIT_ASSERT_VALUES_EQUAL(item.GetPartitionKey(), CalcPartitionKey(record)); } @@ -1034,7 +1119,7 @@ Y_UNIT_TEST_SUITE(Cdc) { for (ui32 i = 0; i < records.size(); ++i) { const auto& actual = res.GetResult().records().at(i); const auto& expected = records.at(i); - UNIT_ASSERT(AreJsonsEqual(actual.data(), expected)); + UNIT_ASSERT_C(AreJsonsEqual(actual.data(), expected), TStringBuilder() << "Jsons are different: " << actual.data() << " != " << expected); if (checkKey) { UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), CalcPartitionKey(expected)); } @@ -1063,9 +1148,30 @@ Y_UNIT_TEST_SUITE(Cdc) { } }; + static TString MetadataToString(TVector<std::pair<TString, TString>> messageMetadata) { + std::stable_sort(messageMetadata.begin(), messageMetadata.end(), [](const auto& a, const auto& b){return a.first < b.first;}); + TStringBuilder str; + str << "{"; + for (const auto& entry : messageMetadata) { + str << entry.first << ": " << entry.second << ","; + } + str << "}"; + return str; + } + + static void AssertMessageMetadataContains( + const TVector<std::pair<TString, TString>>& actual, + const TVector<std::pair<TString, TString>>& expected, + std::function<bool(const TString&, const TString&)> areValuesEqual = [](const TString& a, const TString& b) {return AreJsonsEqual(a, b, false);}) { + for(const auto& item : expected) { + const auto& match = std::find_if(actual.begin(), actual.end(), [&item, &areValuesEqual](const auto& a){return a.first == item.first && areValuesEqual(a.second, item.second);}); + UNIT_ASSERT_C(match != actual.end(), TStringBuilder() << "Message metadata "<< item.first << ": " << item.second << " was expected, but not found. Actual: " << MetadataToString(actual) << ". Expected: " << MetadataToString(expected)); + } + } + struct TopicRunner { static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, - const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true) + const TVector<TString>& queries, const TVector<std::pair<TString, TVector<std::pair<TString, TString>>>>& records, bool checkKey = true) { TTestTopicEnv env(tableDesc, streamDesc); @@ -1098,7 +1204,8 @@ Y_UNIT_TEST_SUITE(Cdc) { pStream = data->GetPartitionSession(); for (const auto& item : data->GetMessages()) { const auto& record = records.at(reads++); - UNIT_ASSERT(AreJsonsEqual(item.GetData(), record)); + UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record.first), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record.first); + AssertMessageMetadataContains(item.GetMessageMeta()->Fields, record.second); Y_UNUSED(checkKey); } } else if (auto* create = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev)) { @@ -1123,6 +1230,16 @@ Y_UNIT_TEST_SUITE(Cdc) { } } + static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, + const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true) + { + TVector<std::pair<TString, TVector<std::pair<TString, TString>>>> recordsWithMetadata = {}; + for (const auto& record : records) { + recordsWithMetadata.emplace_back(record, TVector<std::pair<TString, TString>>()); + } + Read(tableDesc, streamDesc, queries, recordsWithMetadata, checkKey); + } + static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) { TTestPqEnv env(tableDesc, streamDesc); @@ -1146,6 +1263,14 @@ Y_UNIT_TEST_SUITE(Cdc) { } }; + /** + Usage: MessageWithMetadata("body", "metadata_key", "metadata_value") + */ + static std::pair<TString, TVector<std::pair<TString,TString>>> MessageWithOneMetadataItem(const TString& body, const TString& meta_key, const TString& meta_value) { + TVector<std::pair<TString,TString>> metas; + return std::make_pair(body, TVector<std::pair<TString,TString>>{std::make_pair(meta_key, meta_value)}); + } + #define Y_UNIT_TEST_TRIPLET(N, VAR1, VAR2, VAR3) \ template<typename TRunner> void N(NUnitTest::TTestContext&); \ struct TTestRegistration##N { \ @@ -1175,6 +1300,22 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(KeysOnlyLogDebezium) { + TopicRunner::Read(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )", R"( + DELETE FROM `/Root/Table` WHERE key = 1; + )"}, { + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})") + }); + } + Y_UNIT_TEST_TRIPLET(UpdatesLog, PqRunner, YdsRunner, TopicRunner) { TRunner::Read(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), {R"( UPSERT INTO `/Root/Table` (key, value) VALUES @@ -1215,6 +1356,78 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(NewAndOldImagesLogDebezium) { // Message-level meta is supported through topic api only at the time of writing + TopicRunner::Read(SimpleTable(), NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )", R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100), + (2, 200), + (3, 300); + )", R"( + DELETE FROM `/Root/Table` WHERE key = 1; + )"}, { + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), + }, false); + } + + Y_UNIT_TEST(OldImageLogDebezium) { + TopicRunner::Read(SimpleTable(), OldImage(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )", R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100), + (2, 200), + (3, 300); + )", R"( + DELETE FROM `/Root/Table` WHERE key = 1; + )"}, { + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), + }, false); + } + + Y_UNIT_TEST(NewImageLogDebezium) { + TopicRunner::Read(SimpleTable(), NewImage(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )", R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100), + (2, 200), + (3, 300); + )", R"( + DELETE FROM `/Root/Table` WHERE key = 1; + )"}, { + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"), + }, false); + } + Y_UNIT_TEST_TRIPLET(VirtualTimestamps, PqRunner, YdsRunner, TopicRunner) { TRunner::Read(SimpleTable(), WithVirtualTimestamps(KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson)), {R"( UPSERT INTO `/Root/Table` (key, value) VALUES @@ -1228,6 +1441,30 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(VirtualTimestampsNewAndOldImagesLogDebezium) { + TopicRunner::Read(SimpleTable(), WithVirtualTimestamps(NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson)), {R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )", R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100), + (2, 200), + (3, 300); + )", R"( + DELETE FROM `/Root/Table` WHERE key = 1; + )"}, { + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10},"after":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20},"after":{"key":2,"value":200},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30},"after":{"key":3,"value":300},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), + }, false); + } + TShardedTableOptions DocApiTable() { return TShardedTableOptions() .Columns({ @@ -1398,6 +1635,22 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(DebeziumHugeKey) { + const auto key = TString(512_KB, 'A'); + const auto table = TShardedTableOptions() + .Columns({ + {"key", "Utf8", true, false}, + {"value", "Uint32", false, false}, + }); + + TopicRunner::Read(table, KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {Sprintf(R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + ("%s", 1); + )", key.c_str())}, { + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())), + }, false); + } + Y_UNIT_TEST_TRIPLET(Write, PqRunner, YdsRunner, TopicRunner) { TRunner::Write(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson)); } @@ -1587,8 +1840,7 @@ Y_UNIT_TEST_SUITE(Cdc) { const auto records = GetRecords(*server->GetRuntime(), sender, path, 0); if (records.size() == expected.size()) { for (ui32 i = 0; i < expected.size(); ++i) { - UNIT_ASSERT_C(AreJsonsEqual(records.at(i).second, expected.at(i)), TStringBuilder() - << records.at(i).second << " != " << expected.at(i)); + UNIT_ASSERT_C(AreJsonsEqual(records.at(i).second, expected.at(i)), TStringBuilder() << "Jsons are different, " << records.at(i).second << " != " << expected.at(i)); } break; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 26fbd747629..55b6af0e5e9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -184,6 +184,11 @@ public: "DYNAMODB_STREAMS_JSON format incompatible with specified stream mode"); return result; } + if (streamDesc.GetFormat() == NKikimrSchemeOp::ECdcStreamFormatDebeziumJson) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "DEBEZIUM_JSON format incompatible with specified stream mode"); + return result; + } break; default: result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder() @@ -212,6 +217,13 @@ public: return result; } break; + case NKikimrSchemeOp::ECdcStreamFormatDebeziumJson: + if (!AppData()->FeatureFlags.GetEnableChangefeedDebeziumJsonFormat()) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, + "DEBEZIUM_JSON format is not supported yet"); + return result; + } + break; default: result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder() << "Invalid stream format: " << static_cast<ui32>(streamDesc.GetFormat())); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index b6bd1124041..c655b0931f4 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -537,6 +537,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe app.SetEnablePQConfigTransactionsAtSchemeShard(opts.EnablePQConfigTransactionsAtSchemeShard_); app.SetEnableTopicSplitMerge(opts.EnableTopicSplitMerge_); app.SetEnableChangefeedDynamoDBStreamsFormat(opts.EnableChangefeedDynamoDBStreamsFormat_); + app.SetEnableChangefeedDebeziumJsonFormat(opts.EnableChangefeedDebeziumJsonFormat_); app.ColumnShardConfig.SetDisabledOnSchemeShard(false); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h index 88d9914fad1..1410c7930a6 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h @@ -55,6 +55,7 @@ namespace NSchemeShardUT_Private { OPTION(std::optional<bool>, EnablePQConfigTransactionsAtSchemeShard, std::nullopt); OPTION(std::optional<bool>, EnableTopicSplitMerge, std::nullopt); OPTION(std::optional<bool>, EnableChangefeedDynamoDBStreamsFormat, std::nullopt); + OPTION(std::optional<bool>, EnableChangefeedDebeziumJsonFormat, std::nullopt); #undef OPTION }; diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 758c2714271..3392e3f9265 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -524,6 +524,9 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson: changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON); break; + case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson: + changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON); + break; default: break; } @@ -574,6 +577,9 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, case Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON: out.SetFormat(NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson); break; + case Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON: + out.SetFormat(NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson); + break; default: status = Ydb::StatusIds::BAD_REQUEST; error = "Invalid changefeed format"; diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index 02cd2a10405..d2e19d4d811 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -141,6 +141,8 @@ message ChangefeedFormat { FORMAT_JSON = 1; // Change record in JSON format for document (DynamoDB-compatible) tables FORMAT_DYNAMODB_STREAMS_JSON = 2; + // Debezium-like change record JSON format for common (row oriented) tables + FORMAT_DEBEZIUM_JSON = 3; } } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 2ef0878da5e..54419be40c0 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -2416,6 +2416,9 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { case Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON: format = EChangefeedFormat::DynamoDBStreamsJson; break; + case Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON: + format = EChangefeedFormat::DebeziumJson; + break; default: format = EChangefeedFormat::Unknown; break; @@ -2490,6 +2493,9 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const { case EChangefeedFormat::DynamoDBStreamsJson: proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON); break; + case EChangefeedFormat::DebeziumJson: + proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON); + break; case EChangefeedFormat::Unknown: break; } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h index 31c538d6a6b..4102f4593b3 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h @@ -44,6 +44,7 @@ enum class EChangefeedMode { enum class EChangefeedFormat { Json /* "JSON" */, DynamoDBStreamsJson /* "DYNAMODB_STREAMS_JSON" */, + DebeziumJson /* "DEBEZIUM_JSON" */, Unknown = std::numeric_limits<int>::max() }; |