aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkrock21 <krock21@yandex-team.com>2023-10-05 13:23:19 +0300
committerkrock21 <krock21@yandex-team.com>2023-10-05 13:49:18 +0300
commita3c5a0543995dc268f536bcfe2c36b1501fcf9b6 (patch)
tree919a6e3322200ba5d9b65d6d4a43873fb0d94598
parent010b93742060ca2c2362568a97c2d080946e06f5 (diff)
downloadydb-a3c5a0543995dc268f536bcfe2c36b1501fcf9b6.tar.gz
Add DEBEZIUM_JSON CDC format
KIKIMR-15401 RFC: https://a.yandex-team.ru/arcadia/kikimr/docs/ru/overlay/rfc/debezium_support.md
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp2
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp19
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/flat_scheme_op.proto1
-rw-r--r--ydb/core/testlib/basics/feature_flags.h2
-rw-r--r--ydb/core/tx/datashard/change_record.cpp60
-rw-r--r--ydb/core/tx/datashard/change_record.h1
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp38
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp274
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp12
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/test_env.cpp1
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/test_env.h1
-rw-r--r--ydb/core/ydb_convert/table_description.cpp6
-rw-r--r--ydb/public/api/protos/ydb_table.proto2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table_enum.h1
17 files changed, 410 insertions, 18 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 2930eb8b1ca..00e731f862f 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1393,6 +1393,8 @@ public:
add_changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
} else if (to_lower(format) == "dynamodb_streams_json") {
add_changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
+ } else if (to_lower(format) == "debezium_json") {
+ add_changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
} else {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << "Unknown changefeed format: " << format));
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index a346fb75726..6c4115f3267 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -3173,7 +3173,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
void AddChangefeed(EChangefeedMode mode, EChangefeedFormat format) {
TKikimrRunner kikimr(TKikimrSettings()
.SetPQConfig(DefaultPQConfig())
- .SetEnableChangefeedDynamoDBStreamsFormat(true));
+ .SetEnableChangefeedDynamoDBStreamsFormat(true)
+ .SetEnableChangefeedDebeziumJsonFormat(true));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3238,6 +3239,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
case EChangefeedFormat::Unknown:
continue;
case EChangefeedFormat::DynamoDBStreamsJson:
+ case EChangefeedFormat::DebeziumJson:
if (mode == EChangefeedMode::Updates) {
continue;
}
@@ -3288,7 +3290,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
Y_UNIT_TEST(AddChangefeedNegative) {
TKikimrRunner kikimr(TKikimrSettings()
.SetPQConfig(DefaultPQConfig())
- .SetEnableChangefeedDynamoDBStreamsFormat(true));
+ .SetEnableChangefeedDynamoDBStreamsFormat(true)
+ .SetEnableChangefeedDebeziumJsonFormat(true));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3376,6 +3379,18 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
const auto result = session.ExecuteSchemeQuery(query, TExecSchemeQuerySettings().RequestType("_document_api_request")).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
}
+
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/table` ADD CHANGEFEED `feed` WITH (
+ MODE = 'UPDATES', FORMAT = 'DEBEZIUM_JSON'
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
}
Y_UNIT_TEST(ChangefeedAwsRegion) {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index ad2d137bfb3..318c31838be 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -842,6 +842,7 @@ message TFeatureFlags {
optional bool EnableTempTables = 102 [default = false];
optional bool SuppressCompatibilityCheck = 103 [default = false];
optional bool EnableUniqConstraint = 104 [default = false];
+ optional bool EnableChangefeedDebeziumJsonFormat = 105 [default = false];
}
message THttpProxyConfig {
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 631840c53a1..42d93ab4c1a 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -739,6 +739,7 @@ enum ECdcStreamFormat {
ECdcStreamFormatProto = 1;
ECdcStreamFormatJson = 2;
ECdcStreamFormatDynamoDBStreamsJson = 3;
+ ECdcStreamFormatDebeziumJson = 4;
}
message TCdcStreamDescription {
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 675ba7d3f16..173e51e6ea3 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -49,8 +49,10 @@ public:
FEATURE_FLAG_SETTER(EnableTopicSplitMerge)
FEATURE_FLAG_SETTER(EnableTempTables)
FEATURE_FLAG_SETTER(EnableChangefeedDynamoDBStreamsFormat)
+ FEATURE_FLAG_SETTER(EnableChangefeedDebeziumJsonFormat)
FEATURE_FLAG_SETTER(ForceColumnTablesCompositeMarks)
FEATURE_FLAG_SETTER(EnableUniqConstraint)
+ FEATURE_FLAG_SETTER(EnableTopicMessageMeta)
#undef FEATURE_FLAG_SETTER
};
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index c053694b59b..db9eb6df7b6 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -155,6 +155,14 @@ static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& valu
}
}
+static void MergeJsonMaps(NJson::TJsonValue& mergeTo, NJson::TJsonValue& mergeFrom) {
+ Y_VERIFY(mergeTo.GetType() == NJson::EJsonValueType::JSON_MAP);
+ Y_VERIFY(mergeFrom.GetType() == NJson::EJsonValueType::JSON_MAP);
+ for (const auto& entry : mergeFrom.GetMap()) {
+ mergeTo.InsertValue(entry.first, entry.second);
+ }
+}
+
static void SerializeVirtualTimestamp(NJson::TJsonValue& value, std::initializer_list<ui64> vt) {
for (auto v : vt) {
value.AppendValue(v);
@@ -342,6 +350,58 @@ void TChangeRecord::SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, cons
}
}
+void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, bool virtualTimestamps, TUserTable::TCdcStream::EMode streamMode) const {
+ Y_VERIFY(Kind == EKind::CdcDataChange);
+ Y_VERIFY(Schema);
+
+ const auto body = ParseBody(Body);
+ keyJson["payload"].SetType(NJson::JSON_MAP);
+ SerializeJsonValue(Schema, keyJson["payload"], body.GetKey()); // Debezium expects key in the same format as values
+
+ valueJson["payload"].SetType(NJson::JSON_MAP);
+ // payload.before. Optional
+ if (body.HasOldImage()) {
+ SerializeJsonValue(Schema, valueJson["payload"]["before"], body.GetOldImage());
+ MergeJsonMaps(valueJson["payload"]["before"], keyJson["payload"]); // Debezium expects key included in value
+ }
+
+ // payload.after. Optional
+ if (body.HasNewImage()) {
+ SerializeJsonValue(Schema, valueJson["payload"]["after"], body.GetNewImage());
+ MergeJsonMaps(valueJson["payload"]["after"], keyJson["payload"]); // Debezium expects key included in value
+ }
+
+ // payload.op. Mandatory
+ switch (body.GetRowOperationCase()) {
+ case NKikimrChangeExchange::TDataChange::kUpsert:
+ case NKikimrChangeExchange::TDataChange::kReset:
+ if (streamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) {
+ valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // u = update, c = create
+ } else {
+ valueJson["payload"]["op"] = "u"; // u = update
+ }
+ break;
+ case NKikimrChangeExchange::TDataChange::kErase:
+ valueJson["payload"]["op"] = "d"; // d = delete
+ break;
+ default:
+ Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
+ }
+
+ // payload.ts. Optional. "ts_ms" int64 in Debezium, "ts" array here
+ if (virtualTimestamps) {
+ SerializeVirtualTimestamp(valueJson["payload"]["ts"], {Step, TxId});
+ }
+
+ // payload.source. Mandatory.
+ valueJson["payload"]["source"] = NJson::TJsonMap({
+ {"version", "0.0.1"},
+ {"connector", "ydb_debezium_json"},
+ {"ts_ms", GetApproximateCreationDateTime().MilliSeconds()},
+ {"txId", TxId},
+ });
+}
+
TConstArrayRef<TCell> TChangeRecord::GetKey() const {
if (Key) {
return *Key;
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index c1e7920df3f..03107df8ba5 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -51,6 +51,7 @@ public:
void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const;
void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const;
void SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const;
+ void SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, bool virtualTimestamps, TUserTable::TCdcStream::EMode streamMode) const;
TConstArrayRef<TCell> GetKey() const;
i64 GetSeqNo() const;
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 2cbb316f66e..75056856056 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -78,6 +78,13 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
}
}
+ static NJson::TJsonWriterConfig DefaultJsonConfig() {
+ NJson::TJsonWriterConfig jsonConfig;
+ jsonConfig.ValidateUtf8 = false;
+ jsonConfig.WriteNanAsString = true;
+ return jsonConfig;
+ }
+
void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
NKikimrClient::TPersQueueRequest request;
@@ -121,10 +128,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
}
TStringStream str;
- NJson::TJsonWriterConfig jsonConfig;
- jsonConfig.ValidateUtf8 = false;
- jsonConfig.WriteNanAsString = true;
- WriteJson(&str, &json, jsonConfig);
+ WriteJson(&str, &json, DefaultJsonConfig());
data.SetData(str.Str());
if (record.GetKind() == TChangeRecord::EKind::CdcDataChange) {
@@ -142,6 +146,29 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
break;
}
+ case NKikimrSchemeOp::ECdcStreamFormatDebeziumJson: {
+ NJson::TJsonValue keyJson;
+ NJson::TJsonValue valueJson;
+ record.SerializeToDebeziumJson(keyJson, valueJson, Stream.VirtualTimestamps, Stream.Mode);
+
+ TStringStream keyStr;
+ WriteJson(&keyStr, &keyJson, DefaultJsonConfig());
+
+ TStringStream valueStr;
+ WriteJson(&valueStr, &valueJson, DefaultJsonConfig());
+
+ // Add key in the same way as Kafka integration does
+ auto messageMeta = data.AddMessageMeta();
+ messageMeta->set_key("__key"); // Kafka integration stores kafka key in "__key" metadata
+ messageMeta->set_value(keyStr.Str());
+
+ // Add value
+ data.SetData(valueStr.Str());
+ cmd.SetData(data.SerializeAsString());
+ cmd.SetPartitionKey(record.GetPartitionKey());
+ break;
+ }
+
default: {
LOG_E("Unknown format"
<< ": format# " << static_cast<int>(Stream.Format));
@@ -710,7 +737,8 @@ class TCdcChangeSenderMain
}
case NKikimrSchemeOp::ECdcStreamFormatJson:
- case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson: {
+ case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson:
+ case NKikimrSchemeOp::ECdcStreamFormatDebeziumJson: {
using namespace NKikimr::NDataStreams::V1;
const auto hashKey = HexBytesToDecimal(record.GetPartitionKey() /* MD5 */);
return ShardFromDecimal(hashKey, KeyDesc->Partitions.size());
diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp
index 7f5e3a636fb..f17f8bd6efa 100644
--- a/ydb/core/tx/datashard/datashard_user_table.cpp
+++ b/ydb/core/tx/datashard/datashard_user_table.cpp
@@ -115,6 +115,7 @@ static bool IsJsonCdcStream(TUserTable::TCdcStream::EFormat format) {
switch (format) {
case TUserTable::TCdcStream::EFormat::ECdcStreamFormatJson:
case TUserTable::TCdcStream::EFormat::ECdcStreamFormatDynamoDBStreamsJson:
+ case TUserTable::TCdcStream::EFormat::ECdcStreamFormatDebeziumJson:
return true;
default:
return false;
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 2082c049a26..0c40a8a8200 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -718,7 +718,9 @@ Y_UNIT_TEST_SUITE(Cdc) {
.SetUseRealThreads(useRealThreads)
.SetDomainName(root)
.SetGrpcPort(PortManager.GetPort(2135))
- .SetEnableChangefeedDynamoDBStreamsFormat(true);
+ .SetEnableChangefeedDynamoDBStreamsFormat(true)
+ .SetEnableChangefeedDebeziumJsonFormat(true)
+ .SetEnableTopicMessageMeta(true);
Server = new TServer(settings);
if (useRealThreads) {
@@ -821,6 +823,22 @@ Y_UNIT_TEST_SUITE(Cdc) {
};
}
+ TCdcStream OldImage(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") {
+ return TCdcStream{
+ .Name = name,
+ .Mode = NKikimrSchemeOp::ECdcStreamModeOldImage,
+ .Format = format,
+ };
+ }
+
+ TCdcStream NewImage(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") {
+ return TCdcStream{
+ .Name = name,
+ .Mode = NKikimrSchemeOp::ECdcStreamModeNewImage,
+ .Format = format,
+ };
+ }
+
TCdcStream WithVirtualTimestamps(TCdcStream streamDesc) {
streamDesc.VirtualTimestamps = true;
return streamDesc;
@@ -852,11 +870,22 @@ Y_UNIT_TEST_SUITE(Cdc) {
return MD5::Calc(root.at("key").GetStringRobust());
}
- static bool AreJsonsEqual(const TString& actual, const TString& expected) {
+ static bool AreJsonsEqual(const TString& actual, const TString& expected, bool assertOnParseError = true) {
+ bool parseResult;
NJson::TJsonValue actualJson;
- UNIT_ASSERT(NJson::ReadJsonTree(actual, &actualJson));
+ parseResult = NJson::ReadJsonTree(actual, &actualJson);
+ if (assertOnParseError) {
+ UNIT_ASSERT(parseResult);
+ } else if (!parseResult) {
+ return false;
+ }
NJson::TJsonValue expectedJson;
- UNIT_ASSERT(NJson::ReadJsonTree(expected, &expectedJson));
+ parseResult = NJson::ReadJsonTree(expected, &expectedJson);
+ if (assertOnParseError) {
+ UNIT_ASSERT(parseResult);
+ } else if (!parseResult) {
+ return false;
+ }
class TScanner: public NJson::IScanCallback {
NJson::TJsonValue& Actual;
@@ -868,16 +897,22 @@ Y_UNIT_TEST_SUITE(Cdc) {
{}
bool Do(const TString& path, NJson::TJsonValue*, NJson::TJsonValue& expectedValue) override {
+ // Skip if not "***"
if (expectedValue.GetStringRobust() != "***") {
return true;
}
+ // Discrepancy in path format here. GetValueByPath expects ".array.[0]" while Scanner provides with ".array[0]". Don't use "***" inside a non-root array
+ UNIT_ASSERT_C(!path.Contains("["), TStringBuilder() << "Please don't use \"***\" inside an array. Seems like " << path << " has array on the way");
NJson::TJsonValue actualValue;
+ // If "***", find a corresponding actual value
if (!Actual.GetValueByPath(path, actualValue)) {
+ // Couldn't find an actual value for "***"
Success = false;
return false;
}
+ // Replace "***" with actual value
expectedValue = actualValue;
return true;
}
@@ -890,10 +925,60 @@ Y_UNIT_TEST_SUITE(Cdc) {
TScanner scanner(actualJson);
expectedJson.Scan(scanner);
- UNIT_ASSERT(scanner.IsSuccess());
+ if (!scanner.IsSuccess()) {
+ return false; // actualJson is missing a path to ***
+ }
return actualJson == expectedJson;
}
+ // Unit test to verify that Json comparison with wildcard works
+ Y_UNIT_TEST(AreJsonsEqualReturnsTrueOnEqual) {
+ UNIT_ASSERT(AreJsonsEqual("{}", "{}"));
+ UNIT_ASSERT(AreJsonsEqual("[]", "[]"));
+ UNIT_ASSERT(AreJsonsEqual("1", "1"));
+ UNIT_ASSERT(AreJsonsEqual("null", "null"));
+ UNIT_ASSERT(AreJsonsEqual(R"({"a":"b","c":"d","e":[1,2,"3"]})", R"({"a":"b","c":"d","e":[1,2,"3"]})"));
+ UNIT_ASSERT(AreJsonsEqual(R"({"update":{},"key":[1]})", R"({"update":{},"key":[1]})"));
+ UNIT_ASSERT(AreJsonsEqual(R"({"update":{},"key":[1,2]})", R"({"update":{},"key":[1,2]})"));
+ // Root wildcard
+ UNIT_ASSERT(AreJsonsEqual("{}", R"("***")"));
+ UNIT_ASSERT(AreJsonsEqual("1", R"("***")"));
+ UNIT_ASSERT(AreJsonsEqual(R"({"a": "b"})", R"("***")"));
+ UNIT_ASSERT(AreJsonsEqual("[1,2,3]", R"("***")"));
+ // Deep wildcard
+ UNIT_ASSERT(AreJsonsEqual(R"({"a":"b","c":"d","e":[1,2,"3"]})", R"({"a":"b","c":"***","e":"***"})"));
+ UNIT_ASSERT(AreJsonsEqual(R"({"update":{},"key":[1]})", R"({"update":{},"key":"***"})"));
+ UNIT_ASSERT(AreJsonsEqual(R"({"update":{},"ts":[1,2]})", R"({"update":{},"ts":"***"})"));
+ };
+
+ Y_UNIT_TEST(AreJsonsEqualReturnsFalseOnDifferent) {
+ // Simple cases
+ UNIT_ASSERT(!AreJsonsEqual("{}", "[]"));
+ UNIT_ASSERT(!AreJsonsEqual("[]", "{}"));
+ UNIT_ASSERT(!AreJsonsEqual("1", "2"));
+ UNIT_ASSERT(!AreJsonsEqual("null", "[]"));
+ UNIT_ASSERT(!AreJsonsEqual("null", "{}"));
+ UNIT_ASSERT(!AreJsonsEqual("[]", "null"));
+ UNIT_ASSERT(!AreJsonsEqual("{}", "null"));
+ UNIT_ASSERT(!AreJsonsEqual(R"({"a":"b","c":"d","e":[1,2,"3"]})", R"({"a":"b","c":"d","e":[9,2,"3"]})"));
+ UNIT_ASSERT(!AreJsonsEqual(R"({"update":{},"key":[1]})", R"({"update":[],"key":[1]})"));
+ UNIT_ASSERT(!AreJsonsEqual(R"({"update":{},"ts":[1,2]})", R"({"update":{},"key":[9,2]})"));
+ // Wildcart in actual value shouldn't be treated as a wildcard
+ UNIT_ASSERT(!AreJsonsEqual(R"("***")", "{}"));
+ UNIT_ASSERT(!AreJsonsEqual(R"({"a":"***"})", R"({"a":"b"})"));
+ // Deep wildcard
+ UNIT_ASSERT(!AreJsonsEqual(R"({"a":"z","c":"d","e":[1,2,"3"]})", R"({"a":"b","c":"***","e":"***"})"));
+ UNIT_ASSERT(!AreJsonsEqual(R"({"update":{"a":"b"},"key":[1]})", R"({"update":{},"key":"***"})"));
+ UNIT_ASSERT(!AreJsonsEqual(R"({"update":{},"key":{},"ts":[1,2]})", R"({"update":{},"ts":"***"})"));
+ };
+
+ Y_UNIT_TEST(AreJsonsEqualFailsOnWildcardInArray) {
+ // Wildcard in a not-root array is not supported because of a bug in code
+ UNIT_ASSERT_TEST_FAILS(AreJsonsEqual(R"({"a":[1,{"a":"b"}]})", R"({"a":[1,{"a":"***"}]})"));
+ UNIT_ASSERT_TEST_FAILS(AreJsonsEqual(R"({"a":[1]})", R"({"a":["***"]})"));
+ UNIT_ASSERT_TEST_FAILS(AreJsonsEqual(R"([1])", R"(["***"])"));
+ }
+
struct PqRunner {
static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true)
@@ -930,7 +1015,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
pStream = data->GetPartitionStream();
for (const auto& item : data->GetMessages()) {
const auto& record = records.at(reads++);
- UNIT_ASSERT(AreJsonsEqual(item.GetData(), record));
+ UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record);
if (checkKey) {
UNIT_ASSERT_VALUES_EQUAL(item.GetPartitionKey(), CalcPartitionKey(record));
}
@@ -1034,7 +1119,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
for (ui32 i = 0; i < records.size(); ++i) {
const auto& actual = res.GetResult().records().at(i);
const auto& expected = records.at(i);
- UNIT_ASSERT(AreJsonsEqual(actual.data(), expected));
+ UNIT_ASSERT_C(AreJsonsEqual(actual.data(), expected), TStringBuilder() << "Jsons are different: " << actual.data() << " != " << expected);
if (checkKey) {
UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), CalcPartitionKey(expected));
}
@@ -1063,9 +1148,30 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
};
+ static TString MetadataToString(TVector<std::pair<TString, TString>> messageMetadata) {
+ std::stable_sort(messageMetadata.begin(), messageMetadata.end(), [](const auto& a, const auto& b){return a.first < b.first;});
+ TStringBuilder str;
+ str << "{";
+ for (const auto& entry : messageMetadata) {
+ str << entry.first << ": " << entry.second << ",";
+ }
+ str << "}";
+ return str;
+ }
+
+ static void AssertMessageMetadataContains(
+ const TVector<std::pair<TString, TString>>& actual,
+ const TVector<std::pair<TString, TString>>& expected,
+ std::function<bool(const TString&, const TString&)> areValuesEqual = [](const TString& a, const TString& b) {return AreJsonsEqual(a, b, false);}) {
+ for(const auto& item : expected) {
+ const auto& match = std::find_if(actual.begin(), actual.end(), [&item, &areValuesEqual](const auto& a){return a.first == item.first && areValuesEqual(a.second, item.second);});
+ UNIT_ASSERT_C(match != actual.end(), TStringBuilder() << "Message metadata "<< item.first << ": " << item.second << " was expected, but not found. Actual: " << MetadataToString(actual) << ". Expected: " << MetadataToString(expected));
+ }
+ }
+
struct TopicRunner {
static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
- const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true)
+ const TVector<TString>& queries, const TVector<std::pair<TString, TVector<std::pair<TString, TString>>>>& records, bool checkKey = true)
{
TTestTopicEnv env(tableDesc, streamDesc);
@@ -1098,7 +1204,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
pStream = data->GetPartitionSession();
for (const auto& item : data->GetMessages()) {
const auto& record = records.at(reads++);
- UNIT_ASSERT(AreJsonsEqual(item.GetData(), record));
+ UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record.first), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record.first);
+ AssertMessageMetadataContains(item.GetMessageMeta()->Fields, record.second);
Y_UNUSED(checkKey);
}
} else if (auto* create = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev)) {
@@ -1123,6 +1230,16 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
}
+ static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
+ const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true)
+ {
+ TVector<std::pair<TString, TVector<std::pair<TString, TString>>>> recordsWithMetadata = {};
+ for (const auto& record : records) {
+ recordsWithMetadata.emplace_back(record, TVector<std::pair<TString, TString>>());
+ }
+ Read(tableDesc, streamDesc, queries, recordsWithMetadata, checkKey);
+ }
+
static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) {
TTestPqEnv env(tableDesc, streamDesc);
@@ -1146,6 +1263,14 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
};
+ /**
+ Usage: MessageWithMetadata("body", "metadata_key", "metadata_value")
+ */
+ static std::pair<TString, TVector<std::pair<TString,TString>>> MessageWithOneMetadataItem(const TString& body, const TString& meta_key, const TString& meta_value) {
+ TVector<std::pair<TString,TString>> metas;
+ return std::make_pair(body, TVector<std::pair<TString,TString>>{std::make_pair(meta_key, meta_value)});
+ }
+
#define Y_UNIT_TEST_TRIPLET(N, VAR1, VAR2, VAR3) \
template<typename TRunner> void N(NUnitTest::TTestContext&); \
struct TTestRegistration##N { \
@@ -1175,6 +1300,22 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(KeysOnlyLogDebezium) {
+ TopicRunner::Read(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )", R"(
+ DELETE FROM `/Root/Table` WHERE key = 1;
+ )"}, {
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})")
+ });
+ }
+
Y_UNIT_TEST_TRIPLET(UpdatesLog, PqRunner, YdsRunner, TopicRunner) {
TRunner::Read(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), {R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
@@ -1215,6 +1356,78 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(NewAndOldImagesLogDebezium) { // Message-level meta is supported through topic api only at the time of writing
+ TopicRunner::Read(SimpleTable(), NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )", R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 100),
+ (2, 200),
+ (3, 300);
+ )", R"(
+ DELETE FROM `/Root/Table` WHERE key = 1;
+ )"}, {
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
+ }, false);
+ }
+
+ Y_UNIT_TEST(OldImageLogDebezium) {
+ TopicRunner::Read(SimpleTable(), OldImage(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )", R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 100),
+ (2, 200),
+ (3, 300);
+ )", R"(
+ DELETE FROM `/Root/Table` WHERE key = 1;
+ )"}, {
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
+ }, false);
+ }
+
+ Y_UNIT_TEST(NewImageLogDebezium) {
+ TopicRunner::Read(SimpleTable(), NewImage(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )", R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 100),
+ (2, 200),
+ (3, 300);
+ )", R"(
+ DELETE FROM `/Root/Table` WHERE key = 1;
+ )"}, {
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"),
+ }, false);
+ }
+
Y_UNIT_TEST_TRIPLET(VirtualTimestamps, PqRunner, YdsRunner, TopicRunner) {
TRunner::Read(SimpleTable(), WithVirtualTimestamps(KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson)), {R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
@@ -1228,6 +1441,30 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(VirtualTimestampsNewAndOldImagesLogDebezium) {
+ TopicRunner::Read(SimpleTable(), WithVirtualTimestamps(NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson)), {R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )", R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 100),
+ (2, 200),
+ (3, 300);
+ )", R"(
+ DELETE FROM `/Root/Table` WHERE key = 1;
+ )"}, {
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10},"after":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20},"after":{"key":2,"value":200},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30},"after":{"key":3,"value":300},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
+ }, false);
+ }
+
TShardedTableOptions DocApiTable() {
return TShardedTableOptions()
.Columns({
@@ -1398,6 +1635,22 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(DebeziumHugeKey) {
+ const auto key = TString(512_KB, 'A');
+ const auto table = TShardedTableOptions()
+ .Columns({
+ {"key", "Utf8", true, false},
+ {"value", "Uint32", false, false},
+ });
+
+ TopicRunner::Read(table, KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {Sprintf(R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ ("%s", 1);
+ )", key.c_str())}, {
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())),
+ }, false);
+ }
+
Y_UNIT_TEST_TRIPLET(Write, PqRunner, YdsRunner, TopicRunner) {
TRunner::Write(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson));
}
@@ -1587,8 +1840,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
if (records.size() == expected.size()) {
for (ui32 i = 0; i < expected.size(); ++i) {
- UNIT_ASSERT_C(AreJsonsEqual(records.at(i).second, expected.at(i)), TStringBuilder()
- << records.at(i).second << " != " << expected.at(i));
+ UNIT_ASSERT_C(AreJsonsEqual(records.at(i).second, expected.at(i)), TStringBuilder() << "Jsons are different, " << records.at(i).second << " != " << expected.at(i));
}
break;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 26fbd747629..55b6af0e5e9 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -184,6 +184,11 @@ public:
"DYNAMODB_STREAMS_JSON format incompatible with specified stream mode");
return result;
}
+ if (streamDesc.GetFormat() == NKikimrSchemeOp::ECdcStreamFormatDebeziumJson) {
+ result->SetError(NKikimrScheme::StatusInvalidParameter,
+ "DEBEZIUM_JSON format incompatible with specified stream mode");
+ return result;
+ }
break;
default:
result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder()
@@ -212,6 +217,13 @@ public:
return result;
}
break;
+ case NKikimrSchemeOp::ECdcStreamFormatDebeziumJson:
+ if (!AppData()->FeatureFlags.GetEnableChangefeedDebeziumJsonFormat()) {
+ result->SetError(NKikimrScheme::StatusPreconditionFailed,
+ "DEBEZIUM_JSON format is not supported yet");
+ return result;
+ }
+ break;
default:
result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder()
<< "Invalid stream format: " << static_cast<ui32>(streamDesc.GetFormat()));
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
index b6bd1124041..c655b0931f4 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
@@ -537,6 +537,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
app.SetEnablePQConfigTransactionsAtSchemeShard(opts.EnablePQConfigTransactionsAtSchemeShard_);
app.SetEnableTopicSplitMerge(opts.EnableTopicSplitMerge_);
app.SetEnableChangefeedDynamoDBStreamsFormat(opts.EnableChangefeedDynamoDBStreamsFormat_);
+ app.SetEnableChangefeedDebeziumJsonFormat(opts.EnableChangefeedDebeziumJsonFormat_);
app.ColumnShardConfig.SetDisabledOnSchemeShard(false);
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h
index 88d9914fad1..1410c7930a6 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h
@@ -55,6 +55,7 @@ namespace NSchemeShardUT_Private {
OPTION(std::optional<bool>, EnablePQConfigTransactionsAtSchemeShard, std::nullopt);
OPTION(std::optional<bool>, EnableTopicSplitMerge, std::nullopt);
OPTION(std::optional<bool>, EnableChangefeedDynamoDBStreamsFormat, std::nullopt);
+ OPTION(std::optional<bool>, EnableChangefeedDebeziumJsonFormat, std::nullopt);
#undef OPTION
};
diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp
index 758c2714271..3392e3f9265 100644
--- a/ydb/core/ydb_convert/table_description.cpp
+++ b/ydb/core/ydb_convert/table_description.cpp
@@ -524,6 +524,9 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
break;
+ case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson:
+ changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
+ break;
default:
break;
}
@@ -574,6 +577,9 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out,
case Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON:
out.SetFormat(NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson);
break;
+ case Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON:
+ out.SetFormat(NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson);
+ break;
default:
status = Ydb::StatusIds::BAD_REQUEST;
error = "Invalid changefeed format";
diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto
index 02cd2a10405..d2e19d4d811 100644
--- a/ydb/public/api/protos/ydb_table.proto
+++ b/ydb/public/api/protos/ydb_table.proto
@@ -141,6 +141,8 @@ message ChangefeedFormat {
FORMAT_JSON = 1;
// Change record in JSON format for document (DynamoDB-compatible) tables
FORMAT_DYNAMODB_STREAMS_JSON = 2;
+ // Debezium-like change record JSON format for common (row oriented) tables
+ FORMAT_DEBEZIUM_JSON = 3;
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 2ef0878da5e..54419be40c0 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -2416,6 +2416,9 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
case Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON:
format = EChangefeedFormat::DynamoDBStreamsJson;
break;
+ case Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON:
+ format = EChangefeedFormat::DebeziumJson;
+ break;
default:
format = EChangefeedFormat::Unknown;
break;
@@ -2490,6 +2493,9 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
case EChangefeedFormat::DynamoDBStreamsJson:
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
break;
+ case EChangefeedFormat::DebeziumJson:
+ proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
+ break;
case EChangefeedFormat::Unknown:
break;
}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h
index 31c538d6a6b..4102f4593b3 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h
@@ -44,6 +44,7 @@ enum class EChangefeedMode {
enum class EChangefeedFormat {
Json /* "JSON" */,
DynamoDBStreamsJson /* "DYNAMODB_STREAMS_JSON" */,
+ DebeziumJson /* "DEBEZIUM_JSON" */,
Unknown = std::numeric_limits<int>::max()
};