aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-10-20 13:36:54 +0300
committerilnaz <ilnaz@ydb.tech>2023-10-20 14:25:45 +0300
commitbca7336904e9b140c15c00049fecb7e8b55f13e1 (patch)
tree7753fd0d2000fa9c1bde73ccbe0a0b2761f6df73
parent2d0d384d7d5e471f0981cc6ae8b1703e9d03807c (diff)
downloadydb-bca7336904e9b140c15c00049fecb7e8b55f13e1.tar.gz
Additional checks & tests KIKIMR-15401
-rw-r--r--ydb/core/tx/datashard/change_record.cpp18
-rw-r--r--ydb/core/tx/datashard/change_record.h2
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp35
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp40
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp120
6 files changed, 126 insertions, 91 deletions
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index b886b3eeffb..6f2ed24d6f7 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -350,11 +350,12 @@ void TChangeRecord::SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, cons
}
}
-void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, bool virtualTimestamps, TUserTable::TCdcStream::EMode streamMode) const {
+void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, TUserTable::TCdcStream::EMode streamMode) const {
Y_ABORT_UNLESS(Kind == EKind::CdcDataChange);
Y_ABORT_UNLESS(Schema);
const auto body = ParseBody(Body);
+
keyJson["payload"].SetType(NJson::JSON_MAP);
SerializeJsonValue(Schema, keyJson["payload"], body.GetKey()); // Debezium expects key in the same format as values
@@ -379,7 +380,7 @@ void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::T
case NKikimrChangeExchange::TDataChange::kUpsert:
case NKikimrChangeExchange::TDataChange::kReset:
if (streamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) {
- valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // u = update, c = create
+ valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // c = create
} else {
valueJson["payload"]["op"] = "u"; // u = update
}
@@ -392,18 +393,15 @@ void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::T
}
}
- // payload.ts. Optional. "ts_ms" int64 in Debezium, "ts" array here
- if (virtualTimestamps) {
- SerializeVirtualTimestamp(valueJson["payload"]["ts"], {Step, TxId});
- }
-
// payload.source. Mandatory.
valueJson["payload"]["source"] = NJson::TJsonMap({
- {"version", "0.0.1"},
- {"connector", "ydb_debezium_json"},
- {"ts_ms", GetApproximateCreationDateTime().MilliSeconds()},
+ {"version", "1.0.0"},
+ {"connector", "ydb"},
+ {"ts_ms", GetApproximateCreationDateTime().MilliSeconds()}, // payload.ts_ms has no sense
{"snapshot", Source == ESource::InitialScan},
+ {"step", Step},
{"txId", TxId},
+ // TODO: db & table
});
}
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index b37388ec4f4..abe5cb6d757 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -57,7 +57,7 @@ public:
void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const;
void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const;
void SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const;
- void SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, bool virtualTimestamps, TUserTable::TCdcStream::EMode streamMode) const;
+ void SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, TUserTable::TCdcStream::EMode streamMode) const;
TConstArrayRef<TCell> GetKey() const;
i64 GetSeqNo() const;
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index f103e3680d0..358a1517bc8 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -157,7 +157,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
case NKikimrSchemeOp::ECdcStreamFormatDebeziumJson: {
NJson::TJsonValue keyJson;
NJson::TJsonValue valueJson;
- record.SerializeToDebeziumJson(keyJson, valueJson, Stream.VirtualTimestamps, Stream.Mode);
+ record.SerializeToDebeziumJson(keyJson, valueJson, Stream.Mode);
TStringStream keyStr;
WriteJson(&keyStr, &keyJson, DefaultJsonConfig());
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 7d7a47f509a..42e1ede5d78 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1292,7 +1292,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
};
- static TString DebeziumBody(const char* op, const char* before, const char* after, bool snapshot = false, bool timestamps = false) {
+ static TString DebeziumBody(const char* op, const char* before, const char* after, bool snapshot = false) {
NJsonWriter::TBuf body;
auto root = body.BeginObject();
auto payload = root.WriteKey("payload").BeginObject();
@@ -1301,8 +1301,9 @@ Y_UNIT_TEST_SUITE(Cdc) {
.WriteKey("op").WriteString(op)
.WriteKey("source")
.BeginObject()
- .WriteKey("connector").WriteString("ydb_debezium_json")
- .WriteKey("version").WriteString("***")
+ .WriteKey("connector").WriteString("ydb")
+ .WriteKey("version").WriteString("1.0.0")
+ .WriteKey("step").WriteString("***")
.WriteKey("txId").WriteString("***")
.WriteKey("ts_ms").WriteString("***")
.WriteKey("snapshot").WriteBool(snapshot)
@@ -1316,10 +1317,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
payload.WriteKey("after").UnsafeWriteValue(after);
}
- if (timestamps) {
- payload.WriteKey("ts").WriteString("***");
- }
-
payload.EndObject();
root.EndObject();
return body.Str();
@@ -1495,30 +1492,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
- Y_UNIT_TEST(VirtualTimestampsDebezium) {
- TopicRunner::Read(SimpleTable(), WithVirtualTimestamps(NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson)), {R"(
- UPSERT INTO `/Root/Table` (key, value) VALUES
- (1, 10),
- (2, 20),
- (3, 30);
- )", R"(
- UPSERT INTO `/Root/Table` (key, value) VALUES
- (1, 100),
- (2, 200),
- (3, 300);
- )", R"(
- DELETE FROM `/Root/Table` WHERE key = 1;
- )"}, {
- {DebeziumBody("c", nullptr, R"({"key":1,"value":10})", false, true), {{"__key", R"({"payload":{"key":1}})"}}},
- {DebeziumBody("c", nullptr, R"({"key":2,"value":20})", false, true), {{"__key", R"({"payload":{"key":2}})"}}},
- {DebeziumBody("c", nullptr, R"({"key":3,"value":30})", false, true), {{"__key", R"({"payload":{"key":3}})"}}},
- {DebeziumBody("u", R"({"key":1,"value":10})", R"({"key":1,"value":100})", false, true), {{"__key", R"({"payload":{"key":1}})"}}},
- {DebeziumBody("u", R"({"key":2,"value":20})", R"({"key":2,"value":200})", false, true), {{"__key", R"({"payload":{"key":2}})"}}},
- {DebeziumBody("u", R"({"key":3,"value":30})", R"({"key":3,"value":300})", false, true), {{"__key", R"({"payload":{"key":3}})"}}},
- {DebeziumBody("d", R"({"key":1,"value":100})", nullptr, false, true), {{"__key", R"({"payload":{"key":1}})"}}},
- });
- }
-
TShardedTableOptions DocApiTable() {
return TShardedTableOptions()
.Columns({
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 96bf95347cd..83f6fd3c9b8 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -198,11 +198,6 @@ public:
switch (streamDesc.GetFormat()) {
case NKikimrSchemeOp::ECdcStreamFormatProto:
case NKikimrSchemeOp::ECdcStreamFormatJson:
- if (!streamDesc.GetAwsRegion().empty()) {
- result->SetError(NKikimrScheme::StatusInvalidParameter,
- "AWS_REGION option incompatible with specified stream format");
- return result;
- }
break;
case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson:
if (!AppData()->FeatureFlags.GetEnableChangefeedDynamoDBStreamsFormat()) {
@@ -229,6 +224,41 @@ public:
return result;
}
+ if (!streamDesc.GetAwsRegion().empty()) {
+ switch (streamDesc.GetFormat()) {
+ case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson:
+ break;
+ default:
+ result->SetError(NKikimrScheme::StatusInvalidParameter,
+ "AWS_REGION option incompatible with specified stream format");
+ return result;
+ }
+ }
+
+ if (streamDesc.GetVirtualTimestamps()) {
+ switch (streamDesc.GetFormat()) {
+ case NKikimrSchemeOp::ECdcStreamFormatProto:
+ case NKikimrSchemeOp::ECdcStreamFormatJson:
+ break;
+ default:
+ result->SetError(NKikimrScheme::StatusInvalidParameter,
+ "VIRTUAL_TIMESTAMPS incompatible with specified stream format");
+ return result;
+ }
+ }
+
+ if (streamDesc.GetResolvedTimestampsIntervalMs()) {
+ switch (streamDesc.GetFormat()) {
+ case NKikimrSchemeOp::ECdcStreamFormatProto:
+ case NKikimrSchemeOp::ECdcStreamFormatJson:
+ break;
+ default:
+ result->SetError(NKikimrScheme::StatusInvalidParameter,
+ "RESOLVED_TIMESTAMPS incompatible with specified stream format");
+ return result;
+ }
+ }
+
TString errStr;
if (!context.SS->CheckLocks(tablePath.Base()->PathId, Transaction, errStr)) {
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
index 97385b36f82..db5bb53eeba 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
@@ -72,7 +72,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
Y_UNIT_TEST(VirtualTimestamps) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ TTestEnv env(runtime, TTestEnvOptions()
+ .EnableProtoSourceIdInfo(true)
+ .EnableChangefeedDynamoDBStreamsFormat(true)
+ .EnableChangefeedDebeziumJsonFormat(true));
ui64 txId = 100;
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
@@ -83,29 +86,42 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
)");
env.TestWaitNotification(runtime, txId);
- TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
- TableName: "Table"
- StreamDescription {
- Name: "Stream"
- Mode: ECdcStreamModeKeysOnly
- Format: ECdcStreamFormatProto
- VirtualTimestamps: true
- }
- )");
- env.TestWaitNotification(runtime, txId);
+ for (const char* format : TVector<const char*>{"Proto", "Json"}) {
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream%s"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormat%s
+ VirtualTimestamps: true
+ }
+ )", format, format));
+ env.TestWaitNotification(runtime, txId);
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
- NLs::PathExist,
- NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
- NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
- NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady),
- NLs::StreamVirtualTimestamps(true),
- });
+ TestDescribeResult(DescribePrivatePath(runtime, Sprintf("/MyRoot/Table/Stream%s", format)), {
+ NLs::StreamVirtualTimestamps(true),
+ });
+ }
+
+ for (const char* format : TVector<const char*>{"DynamoDBStreamsJson", "DebeziumJson"}) {
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream%s"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormat%s
+ VirtualTimestamps: true
+ }
+ )", format, format), {NKikimrScheme::StatusInvalidParameter});
+ }
}
Y_UNIT_TEST(ResolvedTimestamps) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ TTestEnv env(runtime, TTestEnvOptions()
+ .EnableProtoSourceIdInfo(true)
+ .EnableChangefeedDynamoDBStreamsFormat(true)
+ .EnableChangefeedDebeziumJsonFormat(true));
ui64 txId = 100;
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
@@ -116,20 +132,34 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
)");
env.TestWaitNotification(runtime, txId);
- TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
- TableName: "Table"
- StreamDescription {
- Name: "Stream"
- Mode: ECdcStreamModeKeysOnly
- Format: ECdcStreamFormatProto
- ResolvedTimestampsIntervalMs: 1000
- }
- )");
- env.TestWaitNotification(runtime, txId);
+ for (const char* format : TVector<const char*>{"Proto", "Json"}) {
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream%s"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormat%s
+ ResolvedTimestampsIntervalMs: 1000
+ }
+ )", format, format));
+ env.TestWaitNotification(runtime, txId);
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
- NLs::StreamResolvedTimestamps(TDuration::MilliSeconds(1000)),
- });
+ TestDescribeResult(DescribePrivatePath(runtime, Sprintf("/MyRoot/Table/Stream%s", format)), {
+ NLs::StreamResolvedTimestamps(TDuration::MilliSeconds(1000)),
+ });
+ }
+
+ for (const char* format : TVector<const char*>{"DynamoDBStreamsJson", "DebeziumJson"}) {
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream%s"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormat%s
+ ResolvedTimestampsIntervalMs: 1000
+ }
+ )", format, format), {NKikimrScheme::StatusInvalidParameter});
+ }
}
Y_UNIT_TEST(RetentionPeriod) {
@@ -354,7 +384,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
Y_UNIT_TEST(DocApi) {
TTestBasicRuntime runtime;
- TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedDynamoDBStreamsFormat(true));
+ TTestEnv env(runtime, TTestEnvOptions()
+ .EnableChangefeedDynamoDBStreamsFormat(true)
+ .EnableChangefeedDebeziumJsonFormat(true));
ui64 txId = 100;
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
@@ -393,16 +425,18 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
)", {NKikimrScheme::StatusInvalidParameter});
- // invalid aws region
- TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
- TableName: "DocumentTable"
- StreamDescription {
- Name: "Stream"
- Mode: ECdcStreamModeKeysOnly
- Format: ECdcStreamFormatProto
- AwsRegion: "foo"
- }
- )", {NKikimrScheme::StatusInvalidParameter});
+ // invalid format
+ for (const char* format : TVector<const char*>{"Proto", "Json", "DebeziumJson"}) {
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "DocumentTable"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormat%s
+ AwsRegion: "foo"
+ }
+ )", format), {NKikimrScheme::StatusInvalidParameter});
+ }
// ok
TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(