diff options
author | krock21 <krock21@yandex-team.com> | 2023-10-13 17:21:56 +0300 |
---|---|---|
committer | krock21 <krock21@yandex-team.com> | 2023-10-13 17:58:45 +0300 |
commit | 9ad3ba816f192c0a5609a6570145b014c840f540 (patch) | |
tree | d4796cb23e708e2141e5fcfdbffa147bba347307 | |
parent | 20d386885c1cda01f4f95f84d6118615c592c0f3 (diff) | |
download | ydb-9ad3ba816f192c0a5609a6570145b014c840f540.tar.gz |
KIKIMR-19583 Add source field to change record. Add read(op = r) operation and source.snapshot values to Debezium CDC format
9 files changed, 221 insertions, 60 deletions
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index 087f6cab84..a58469e4cc 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -291,6 +291,7 @@ public: .WithTableId(tablePathId) .WithSchemaVersion(table->GetTableSchemaVersion()) .WithBody(body.SerializeAsString()) + .WithSource(TChangeRecord::ESource::InitialScan) .Build(); ChangeRecords.push_back(IDataShardChangeCollector::TChange{ diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index 31651a6608..73a1849799 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -372,20 +372,24 @@ void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::T } // payload.op. Mandatory - switch (body.GetRowOperationCase()) { - case NKikimrChangeExchange::TDataChange::kUpsert: - case NKikimrChangeExchange::TDataChange::kReset: - if (streamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) { - valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // u = update, c = create - } else { - valueJson["payload"]["op"] = "u"; // u = update - } - break; - case NKikimrChangeExchange::TDataChange::kErase: - valueJson["payload"]["op"] = "d"; // d = delete - break; - default: - Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); + if (Source == ESource::InitialScan) { + valueJson["payload"]["op"] = "r"; // r = read + } else { + switch (body.GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: + case NKikimrChangeExchange::TDataChange::kReset: + if (streamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) { + valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // u = update, c = create + } else { + valueJson["payload"]["op"] = "u"; // u = update + } + break; + case NKikimrChangeExchange::TDataChange::kErase: + valueJson["payload"]["op"] = "d"; // d = delete + break; + default: + Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); + } } // payload.ts. Optional. "ts_ms" int64 in Debezium, "ts" array here @@ -398,6 +402,7 @@ void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::T {"version", "0.0.1"}, {"connector", "ydb_debezium_json"}, {"ts_ms", GetApproximateCreationDateTime().MilliSeconds()}, + {"snapshot", Source == ESource::InitialScan}, {"txId", TxId}, }); } @@ -490,6 +495,7 @@ void TChangeRecord::Out(IOutputStream& out) const { << " TxId: " << TxId << " PathId: " << PathId << " Kind: " << Kind + << " Source: " << Source << " Body: " << Body.size() << "b" << " TableId: " << TableId << " SchemaVersion: " << SchemaVersion @@ -562,6 +568,11 @@ TChangeRecordBuilder& TChangeRecordBuilder::WithBody(TString&& body) { return *this; } +TChangeRecordBuilder& TChangeRecordBuilder::WithSource(ESource source) { + Record.Source = source; + return *this; +} + TChangeRecord&& TChangeRecordBuilder::Build() { return std::move(Record); } diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index 03107df8ba..b37388ec4f 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -22,6 +22,11 @@ class TChangeRecord { friend class TChangeRecordBuilder; public: + enum class ESource: ui8 { + Unspecified = 0, + InitialScan = 1, + }; + enum class EKind: ui8 { AsyncIndex, CdcDataChange, @@ -44,6 +49,7 @@ public: const TPathId& GetPathId() const { return PathId; } EKind GetKind() const { return Kind; } const TString& GetBody() const { return Body; } + ESource GetSource() const { return Source; } const TPathId& GetTableId() const { return TableId; } ui64 GetSchemaVersion() const { return SchemaVersion; } @@ -72,6 +78,7 @@ private: TPathId PathId; EKind Kind; TString Body; + ESource Source = ESource::Unspecified; ui64 SchemaVersion; TPathId TableId; @@ -84,6 +91,7 @@ private: class TChangeRecordBuilder { using EKind = TChangeRecord::EKind; + using ESource = TChangeRecord::ESource; public: explicit TChangeRecordBuilder(EKind kind); @@ -104,6 +112,8 @@ public: TChangeRecordBuilder& WithBody(const TString& body); TChangeRecordBuilder& WithBody(TString&& body); + TChangeRecordBuilder& WithSource(ESource source); + TChangeRecord&& Build(); private: diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index 052f985977..5d197b32b7 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -548,6 +548,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon TABLEH() { html << "LockOffset"; } TABLEH() { html << "PathId"; } TABLEH() { html << "Kind"; } + TABLEH() { html << "Source"; } TABLEH() { html << "TableId"; } TABLEH() { html << "SchemaVersion"; } } @@ -565,6 +566,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon TABLED() { html << record.GetLockOffset(); } TABLED() { PathLink(html, record.GetPathId()); } TABLED() { html << record.GetKind(); } + TABLED() { html << record.GetSource(); } TABLED() { PathLink(html, record.GetTableId()); } TABLED() { html << record.GetSchemaVersion(); } } diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 5197f9db13..c9564e262f 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -725,7 +725,8 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r NIceDb::TUpdate<Schema::ChangeRecords::TablePathId>(record.GetTableId().LocalPathId)); db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update( NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()), - NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody())); + NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()), + NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource())); } else { auto& state = LockChangeRecords[lockId]; Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(), @@ -773,7 +774,8 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r NIceDb::TUpdate<Schema::LockChangeRecords::TablePathId>(record.GetTableId().LocalPathId)); db.Table<Schema::LockChangeRecordDetails>().Key(record.GetLockId(), record.GetLockOffset()).Update( NIceDb::TUpdate<Schema::LockChangeRecordDetails::Kind>(record.GetKind()), - NIceDb::TUpdate<Schema::LockChangeRecordDetails::Body>(record.GetBody())); + NIceDb::TUpdate<Schema::LockChangeRecordDetails::Body>(record.GetBody()), + NIceDb::TUpdate<Schema::LockChangeRecordDetails::Source>(record.GetSource())); } } diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index 3775e54656..a96a7d5a55 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -105,7 +105,7 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { } } - RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::LockChangeRecordDetails::Kind>()) + auto changeRecordBuilder = TChangeRecordBuilder(details.GetValue<Schema::LockChangeRecordDetails::Kind>()) .WithOrder(it->Order) .WithGroup(itCommit->second.Group) .WithStep(itCommit->second.Step) @@ -119,8 +119,13 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { .WithSchema(schema) .WithBody(details.GetValue<Schema::LockChangeRecordDetails::Body>()) .WithLockId(itQueue->second.LockId) - .WithLockOffset(itQueue->second.LockOffset) - .Build()); + .WithLockOffset(itQueue->second.LockOffset); + + if (details.HaveValue<Schema::LockChangeRecordDetails::Source>()) { + changeRecordBuilder.WithSource(details.GetValue<Schema::LockChangeRecordDetails::Source>()); + } + + RecordsToSend[recipient].emplace_back(changeRecordBuilder.Build()); } else { auto basic = db.Table<Schema::ChangeRecords>().Key(it->Order).Select(); auto details = db.Table<Schema::ChangeRecordDetails>().Key(it->Order).Select(); @@ -157,7 +162,7 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { } } - RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::ChangeRecordDetails::Kind>()) + auto changeRecordBuilder = TChangeRecordBuilder(details.GetValue<Schema::ChangeRecordDetails::Kind>()) .WithOrder(it->Order) .WithGroup(basic.GetValue<Schema::ChangeRecords::Group>()) .WithStep(basic.GetValue<Schema::ChangeRecords::PlanStep>()) @@ -169,8 +174,13 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { .WithTableId(tableId) .WithSchemaVersion(schemaVersion) .WithSchema(schema) - .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>()) - .Build()); + .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>()); + + if (details.HaveValue<Schema::ChangeRecordDetails::Source>()) { + changeRecordBuilder.WithSource(details.GetValue<Schema::ChangeRecordDetails::Source>()); + } + + RecordsToSend[recipient].emplace_back(changeRecordBuilder.Build()); } MemUsage += it->BodySize; diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 5895e33597..9819649631 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -788,9 +788,10 @@ class TDataShard struct Order : Column<1, NScheme::NTypeIds::Uint64> {}; struct Kind : Column<2, NScheme::NTypeIds::Uint8> { using Type = TChangeRecord::EKind; }; struct Body : Column<3, NScheme::NTypeIds::String> { using Type = TString; }; + struct Source : Column<4, NScheme::NTypeIds::Uint8> { using Type = TChangeRecord::ESource; }; using TKey = TableKey<Order>; - using TColumns = TableColumns<Order, Kind, Body>; + using TColumns = TableColumns<Order, Kind, Body, Source>; }; struct ChangeSenders : Table<19> { @@ -935,9 +936,10 @@ class TDataShard struct LockOffset : Column<2, NScheme::NTypeIds::Uint64> {}; struct Kind : Column<3, NScheme::NTypeIds::Uint8> { using Type = TChangeRecord::EKind; }; struct Body : Column<4, NScheme::NTypeIds::String> { using Type = TString; }; + struct Source : Column<5, NScheme::NTypeIds::Uint8> { using Type = TChangeRecord::ESource; }; using TKey = TableKey<LockId, LockOffset>; - using TColumns = TableColumns<LockId, LockOffset, Kind, Body>; + using TColumns = TableColumns<LockId, LockOffset, Kind, Body, Source>; }; // Maps [Order ... Order+N-1] change records in the shard order diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 0c40a8a820..3634ebbeea 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -720,7 +720,8 @@ Y_UNIT_TEST_SUITE(Cdc) { .SetGrpcPort(PortManager.GetPort(2135)) .SetEnableChangefeedDynamoDBStreamsFormat(true) .SetEnableChangefeedDebeziumJsonFormat(true) - .SetEnableTopicMessageMeta(true); + .SetEnableTopicMessageMeta(true) + .SetEnableChangefeedInitialScan(true); Server = new TServer(settings); if (useRealThreads) { @@ -1309,10 +1310,10 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})") + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})") }); } @@ -1370,13 +1371,13 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), }, false); } @@ -1394,13 +1395,13 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), }, false); } @@ -1418,13 +1419,13 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"), }, false); } @@ -1455,13 +1456,13 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10},"after":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20},"after":{"key":2,"value":200},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30},"after":{"key":3,"value":300},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"), - MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"), }, false); } @@ -1647,7 +1648,7 @@ Y_UNIT_TEST_SUITE(Cdc) { UPSERT INTO `/Root/Table` (key, value) VALUES ("%s", 1); )", key.c_str())}, { - MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())), }, false); } @@ -2549,6 +2550,116 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + void WaitForContent(NYdb::NTopic::TTopicClient& client, const TString& consumerName, const TString& path, const TVector<std::pair<TString, TVector<std::pair<TString, TString>>>>& expected) { + // get records + auto reader = client.CreateReadSession(NYdb::NTopic::TReadSessionSettings() + .AppendTopics(path) + .ConsumerName(consumerName) + ); + + ui32 reads = 0; + while (reads < expected.size()) { + auto ev = reader->GetEvent(true); + UNIT_ASSERT(ev); + + NYdb::NTopic::TPartitionSession::TPtr pStream; + if (auto* data = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*ev)) { + pStream = data->GetPartitionSession(); + for (const auto& item : data->GetMessages()) { + const auto& record = expected.at(reads++); + UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record.first), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record.first); + AssertMessageMetadataContains(item.GetMessageMeta()->Fields, record.second); + } + data->Commit(); + } else if (auto* create = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev)) { + pStream = create->GetPartitionSession(); + create->Confirm(); + } else if (auto* destroy = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*ev)) { + pStream = destroy->GetPartitionSession(); + destroy->Confirm(); + } else if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*ev)) { + break; + } + + if (pStream) { + UNIT_ASSERT_VALUES_EQUAL(pStream->GetTopicPath(), path); + } + } + + UNIT_ASSERT_C(reads == expected.size(), "Read less events than expected"); + } + + Y_UNIT_TEST(InitialScanDebezium) { + auto table = SimpleTable(); + auto unusedStream = KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson, "UnusedStream"); + TTestTopicEnv env(table, unusedStream); + + // Populate data + ExecSQL(env.GetServer(), env.GetEdgeActor(), R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + // add a stream with initial scan + WaitTxNotification( + env.GetServer(), env.GetEdgeActor(), + AsyncAlterAddStream( + env.GetServer(), "/Root", "Table", + WithInitialScan(NewAndOldImages( + NKikimrSchemeOp::ECdcStreamFormatDebeziumJson)))); + + auto &client = env.GetClient(); + + // add consumer + { + auto res = client + .AlterTopic("/Root/Table/Stream", + NYdb::NTopic::TAlterTopicSettings() + .BeginAddConsumer("user") + .EndAddConsumer()) + .ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + + // Wait for initial scan records + WaitForContent(client, "user", "/Root/Table/Stream", { + MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"), + }); + + // Perform update after initial scan + ExecSQL(env.GetServer(), env.GetEdgeActor(), R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100), + (2, 200), + (3, 300), + (4, 400); + )"); + + // Wait for update records + WaitForContent(client, "user", "/Root/Table/Stream", { + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"), + MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":4,"value":400}}})", "__key", R"({"payload":{"key":4}})"), + }); + + // remove consumer + { + auto res = + client + .AlterTopic( + "/Root/Table/Stream", + NYdb::NTopic::TAlterTopicSettings().AppendDropConsumers( + "user")) + .ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + } + Y_UNIT_TEST(InitialScanUpdatedRows) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index 7bf964d741..40b18c561a 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -1074,6 +1074,11 @@ "ColumnId": 3, "ColumnName": "Body", "ColumnType": "String" + }, + { + "ColumnId": 4, + "ColumnName": "Source", + "ColumnType": "Byte" } ], "ColumnsDropped": [], @@ -1082,7 +1087,8 @@ "Columns": [ 1, 2, - 3 + 3, + 4 ], "RoomID": 0, "Codec": 0, @@ -2120,6 +2126,11 @@ "ColumnId": 4, "ColumnName": "Body", "ColumnType": "String" + }, + { + "ColumnId": 5, + "ColumnName": "Source", + "ColumnType": "Byte" } ], "ColumnsDropped": [], @@ -2129,7 +2140,8 @@ 1, 2, 3, - 4 + 4, + 5 ], "RoomID": 0, "Codec": 0, |