diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-10-20 13:36:54 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-10-20 14:25:45 +0300 |
commit | bca7336904e9b140c15c00049fecb7e8b55f13e1 (patch) | |
tree | 7753fd0d2000fa9c1bde73ccbe0a0b2761f6df73 | |
parent | 2d0d384d7d5e471f0981cc6ae8b1703e9d03807c (diff) | |
download | ydb-bca7336904e9b140c15c00049fecb7e8b55f13e1.tar.gz |
Additional checks & tests KIKIMR-15401
-rw-r--r-- | ydb/core/tx/datashard/change_record.cpp | 18 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 35 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp | 40 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp | 120 |
6 files changed, 126 insertions, 91 deletions
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index b886b3eeffb..6f2ed24d6f7 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -350,11 +350,12 @@ void TChangeRecord::SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, cons } } -void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, bool virtualTimestamps, TUserTable::TCdcStream::EMode streamMode) const { +void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, TUserTable::TCdcStream::EMode streamMode) const { Y_ABORT_UNLESS(Kind == EKind::CdcDataChange); Y_ABORT_UNLESS(Schema); const auto body = ParseBody(Body); + keyJson["payload"].SetType(NJson::JSON_MAP); SerializeJsonValue(Schema, keyJson["payload"], body.GetKey()); // Debezium expects key in the same format as values @@ -379,7 +380,7 @@ void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::T case NKikimrChangeExchange::TDataChange::kUpsert: case NKikimrChangeExchange::TDataChange::kReset: if (streamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) { - valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // u = update, c = create + valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // c = create } else { valueJson["payload"]["op"] = "u"; // u = update } @@ -392,18 +393,15 @@ void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::T } } - // payload.ts. Optional. "ts_ms" int64 in Debezium, "ts" array here - if (virtualTimestamps) { - SerializeVirtualTimestamp(valueJson["payload"]["ts"], {Step, TxId}); - } - // payload.source. Mandatory. valueJson["payload"]["source"] = NJson::TJsonMap({ - {"version", "0.0.1"}, - {"connector", "ydb_debezium_json"}, - {"ts_ms", GetApproximateCreationDateTime().MilliSeconds()}, + {"version", "1.0.0"}, + {"connector", "ydb"}, + {"ts_ms", GetApproximateCreationDateTime().MilliSeconds()}, // payload.ts_ms has no sense {"snapshot", Source == ESource::InitialScan}, + {"step", Step}, {"txId", TxId}, + // TODO: db & table }); } diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index b37388ec4f4..abe5cb6d757 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -57,7 +57,7 @@ public: void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const; void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const; void SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const; - void SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, bool virtualTimestamps, TUserTable::TCdcStream::EMode streamMode) const; + void SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, TUserTable::TCdcStream::EMode streamMode) const; TConstArrayRef<TCell> GetKey() const; i64 GetSeqNo() const; diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index f103e3680d0..358a1517bc8 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -157,7 +157,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti case NKikimrSchemeOp::ECdcStreamFormatDebeziumJson: { NJson::TJsonValue keyJson; NJson::TJsonValue valueJson; - record.SerializeToDebeziumJson(keyJson, valueJson, Stream.VirtualTimestamps, Stream.Mode); + record.SerializeToDebeziumJson(keyJson, valueJson, Stream.Mode); TStringStream keyStr; WriteJson(&keyStr, &keyJson, DefaultJsonConfig()); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 7d7a47f509a..42e1ede5d78 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1292,7 +1292,7 @@ Y_UNIT_TEST_SUITE(Cdc) { } }; - static TString DebeziumBody(const char* op, const char* before, const char* after, bool snapshot = false, bool timestamps = false) { + static TString DebeziumBody(const char* op, const char* before, const char* after, bool snapshot = false) { NJsonWriter::TBuf body; auto root = body.BeginObject(); auto payload = root.WriteKey("payload").BeginObject(); @@ -1301,8 +1301,9 @@ Y_UNIT_TEST_SUITE(Cdc) { .WriteKey("op").WriteString(op) .WriteKey("source") .BeginObject() - .WriteKey("connector").WriteString("ydb_debezium_json") - .WriteKey("version").WriteString("***") + .WriteKey("connector").WriteString("ydb") + .WriteKey("version").WriteString("1.0.0") + .WriteKey("step").WriteString("***") .WriteKey("txId").WriteString("***") .WriteKey("ts_ms").WriteString("***") .WriteKey("snapshot").WriteBool(snapshot) @@ -1316,10 +1317,6 @@ Y_UNIT_TEST_SUITE(Cdc) { payload.WriteKey("after").UnsafeWriteValue(after); } - if (timestamps) { - payload.WriteKey("ts").WriteString("***"); - } - payload.EndObject(); root.EndObject(); return body.Str(); @@ -1495,30 +1492,6 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } - Y_UNIT_TEST(VirtualTimestampsDebezium) { - TopicRunner::Read(SimpleTable(), WithVirtualTimestamps(NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson)), {R"( - UPSERT INTO `/Root/Table` (key, value) VALUES - (1, 10), - (2, 20), - (3, 30); - )", R"( - UPSERT INTO `/Root/Table` (key, value) VALUES - (1, 100), - (2, 200), - (3, 300); - )", R"( - DELETE FROM `/Root/Table` WHERE key = 1; - )"}, { - {DebeziumBody("c", nullptr, R"({"key":1,"value":10})", false, true), {{"__key", R"({"payload":{"key":1}})"}}}, - {DebeziumBody("c", nullptr, R"({"key":2,"value":20})", false, true), {{"__key", R"({"payload":{"key":2}})"}}}, - {DebeziumBody("c", nullptr, R"({"key":3,"value":30})", false, true), {{"__key", R"({"payload":{"key":3}})"}}}, - {DebeziumBody("u", R"({"key":1,"value":10})", R"({"key":1,"value":100})", false, true), {{"__key", R"({"payload":{"key":1}})"}}}, - {DebeziumBody("u", R"({"key":2,"value":20})", R"({"key":2,"value":200})", false, true), {{"__key", R"({"payload":{"key":2}})"}}}, - {DebeziumBody("u", R"({"key":3,"value":30})", R"({"key":3,"value":300})", false, true), {{"__key", R"({"payload":{"key":3}})"}}}, - {DebeziumBody("d", R"({"key":1,"value":100})", nullptr, false, true), {{"__key", R"({"payload":{"key":1}})"}}}, - }); - } - TShardedTableOptions DocApiTable() { return TShardedTableOptions() .Columns({ diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 96bf95347cd..83f6fd3c9b8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -198,11 +198,6 @@ public: switch (streamDesc.GetFormat()) { case NKikimrSchemeOp::ECdcStreamFormatProto: case NKikimrSchemeOp::ECdcStreamFormatJson: - if (!streamDesc.GetAwsRegion().empty()) { - result->SetError(NKikimrScheme::StatusInvalidParameter, - "AWS_REGION option incompatible with specified stream format"); - return result; - } break; case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson: if (!AppData()->FeatureFlags.GetEnableChangefeedDynamoDBStreamsFormat()) { @@ -229,6 +224,41 @@ public: return result; } + if (!streamDesc.GetAwsRegion().empty()) { + switch (streamDesc.GetFormat()) { + case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson: + break; + default: + result->SetError(NKikimrScheme::StatusInvalidParameter, + "AWS_REGION option incompatible with specified stream format"); + return result; + } + } + + if (streamDesc.GetVirtualTimestamps()) { + switch (streamDesc.GetFormat()) { + case NKikimrSchemeOp::ECdcStreamFormatProto: + case NKikimrSchemeOp::ECdcStreamFormatJson: + break; + default: + result->SetError(NKikimrScheme::StatusInvalidParameter, + "VIRTUAL_TIMESTAMPS incompatible with specified stream format"); + return result; + } + } + + if (streamDesc.GetResolvedTimestampsIntervalMs()) { + switch (streamDesc.GetFormat()) { + case NKikimrSchemeOp::ECdcStreamFormatProto: + case NKikimrSchemeOp::ECdcStreamFormatJson: + break; + default: + result->SetError(NKikimrScheme::StatusInvalidParameter, + "RESOLVED_TIMESTAMPS incompatible with specified stream format"); + return result; + } + } + TString errStr; if (!context.SS->CheckLocks(tablePath.Base()->PathId, Transaction, errStr)) { result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index 97385b36f82..db5bb53eeba 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -72,7 +72,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { Y_UNIT_TEST(VirtualTimestamps) { TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + TTestEnv env(runtime, TTestEnvOptions() + .EnableProtoSourceIdInfo(true) + .EnableChangefeedDynamoDBStreamsFormat(true) + .EnableChangefeedDebeziumJsonFormat(true)); ui64 txId = 100; TestCreateTable(runtime, ++txId, "/MyRoot", R"( @@ -83,29 +86,42 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); - TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - VirtualTimestamps: true - } - )"); - env.TestWaitNotification(runtime, txId); + for (const char* format : TVector<const char*>{"Proto", "Json"}) { + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamDescription { + Name: "Stream%s" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormat%s + VirtualTimestamps: true + } + )", format, format)); + env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { - NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), - NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), - NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), - NLs::StreamVirtualTimestamps(true), - }); + TestDescribeResult(DescribePrivatePath(runtime, Sprintf("/MyRoot/Table/Stream%s", format)), { + NLs::StreamVirtualTimestamps(true), + }); + } + + for (const char* format : TVector<const char*>{"DynamoDBStreamsJson", "DebeziumJson"}) { + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamDescription { + Name: "Stream%s" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormat%s + VirtualTimestamps: true + } + )", format, format), {NKikimrScheme::StatusInvalidParameter}); + } } Y_UNIT_TEST(ResolvedTimestamps) { TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + TTestEnv env(runtime, TTestEnvOptions() + .EnableProtoSourceIdInfo(true) + .EnableChangefeedDynamoDBStreamsFormat(true) + .EnableChangefeedDebeziumJsonFormat(true)); ui64 txId = 100; TestCreateTable(runtime, ++txId, "/MyRoot", R"( @@ -116,20 +132,34 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); - TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - ResolvedTimestampsIntervalMs: 1000 - } - )"); - env.TestWaitNotification(runtime, txId); + for (const char* format : TVector<const char*>{"Proto", "Json"}) { + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamDescription { + Name: "Stream%s" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormat%s + ResolvedTimestampsIntervalMs: 1000 + } + )", format, format)); + env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { - NLs::StreamResolvedTimestamps(TDuration::MilliSeconds(1000)), - }); + TestDescribeResult(DescribePrivatePath(runtime, Sprintf("/MyRoot/Table/Stream%s", format)), { + NLs::StreamResolvedTimestamps(TDuration::MilliSeconds(1000)), + }); + } + + for (const char* format : TVector<const char*>{"DynamoDBStreamsJson", "DebeziumJson"}) { + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamDescription { + Name: "Stream%s" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormat%s + ResolvedTimestampsIntervalMs: 1000 + } + )", format, format), {NKikimrScheme::StatusInvalidParameter}); + } } Y_UNIT_TEST(RetentionPeriod) { @@ -354,7 +384,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { Y_UNIT_TEST(DocApi) { TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedDynamoDBStreamsFormat(true)); + TTestEnv env(runtime, TTestEnvOptions() + .EnableChangefeedDynamoDBStreamsFormat(true) + .EnableChangefeedDebeziumJsonFormat(true)); ui64 txId = 100; TestCreateTable(runtime, ++txId, "/MyRoot", R"( @@ -393,16 +425,18 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } )", {NKikimrScheme::StatusInvalidParameter}); - // invalid aws region - TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "DocumentTable" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - AwsRegion: "foo" - } - )", {NKikimrScheme::StatusInvalidParameter}); + // invalid format + for (const char* format : TVector<const char*>{"Proto", "Json", "DebeziumJson"}) { + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "DocumentTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormat%s + AwsRegion: "foo" + } + )", format), {NKikimrScheme::StatusInvalidParameter}); + } // ok TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( |