about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author krock21 <krock21@yandex-team.com> 2023-10-13 17:21:56 +0300
committer krock21 <krock21@yandex-team.com> 2023-10-13 17:58:45 +0300
commit 9ad3ba816f192c0a5609a6570145b014c840f540 (patch)
tree d4796cb23e708e2141e5fcfdbffa147bba347307
parent 20d386885c1cda01f4f95f84d6118615c592c0f3 (diff)
download ydb-9ad3ba816f192c0a5609a6570145b014c840f540.tar.gz
KIKIMR-19583 Add source field to change record. Add read(op = r) operation and source.snapshot values to Debezium CDC format
-rw-r--r-- ydb/core/tx/datashard/cdc_stream_scan.cpp | 1
-rw-r--r-- ydb/core/tx/datashard/change_record.cpp | 39
-rw-r--r-- ydb/core/tx/datashard/change_record.h | 10
-rw-r--r-- ydb/core/tx/datashard/change_sender_common_ops.cpp | 2
-rw-r--r-- ydb/core/tx/datashard/datashard.cpp | 6
-rw-r--r-- ydb/core/tx/datashard/datashard_change_sending.cpp | 22
-rw-r--r-- ydb/core/tx/datashard/datashard_impl.h | 6
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 179
-rw-r--r-- ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema | 16
9 files changed, 221 insertions, 60 deletions
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp
index 087f6cab84..a58469e4cc 100644
--- a/ydb/core/tx/datashard/cdc_stream_scan.cpp
+++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp
@@ -291,6 +291,7 @@ public:
.WithTableId(tablePathId)
.WithSchemaVersion(table->GetTableSchemaVersion())
.WithBody(body.SerializeAsString())
+ .WithSource(TChangeRecord::ESource::InitialScan)
.Build();
ChangeRecords.push_back(IDataShardChangeCollector::TChange{
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index 31651a6608..73a1849799 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -372,20 +372,24 @@ void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::T
}
// payload.op. Mandatory
- switch (body.GetRowOperationCase()) {
- case NKikimrChangeExchange::TDataChange::kUpsert:
- case NKikimrChangeExchange::TDataChange::kReset:
- if (streamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) {
- valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // u = update, c = create
- } else {
- valueJson["payload"]["op"] = "u"; // u = update
- }
- break;
- case NKikimrChangeExchange::TDataChange::kErase:
- valueJson["payload"]["op"] = "d"; // d = delete
- break;
- default:
- Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
+ if (Source == ESource::InitialScan) {
+ valueJson["payload"]["op"] = "r"; // r = read
+ } else {
+ switch (body.GetRowOperationCase()) {
+ case NKikimrChangeExchange::TDataChange::kUpsert:
+ case NKikimrChangeExchange::TDataChange::kReset:
+ if (streamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) {
+ valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // u = update, c = create
+ } else {
+ valueJson["payload"]["op"] = "u"; // u = update
+ }
+ break;
+ case NKikimrChangeExchange::TDataChange::kErase:
+ valueJson["payload"]["op"] = "d"; // d = delete
+ break;
+ default:
+ Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
+ }
}
// payload.ts. Optional. "ts_ms" int64 in Debezium, "ts" array here
@@ -398,6 +402,7 @@ void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::T
{"version", "0.0.1"},
{"connector", "ydb_debezium_json"},
{"ts_ms", GetApproximateCreationDateTime().MilliSeconds()},
+ {"snapshot", Source == ESource::InitialScan},
{"txId", TxId},
});
}
@@ -490,6 +495,7 @@ void TChangeRecord::Out(IOutputStream& out) const {
<< " TxId: " << TxId
<< " PathId: " << PathId
<< " Kind: " << Kind
+ << " Source: " << Source
<< " Body: " << Body.size() << "b"
<< " TableId: " << TableId
<< " SchemaVersion: " << SchemaVersion
@@ -562,6 +568,11 @@ TChangeRecordBuilder& TChangeRecordBuilder::WithBody(TString&& body) {
return *this;
}
+TChangeRecordBuilder& TChangeRecordBuilder::WithSource(ESource source) {
+ Record.Source = source;
+ return *this;
+}
+
TChangeRecord&& TChangeRecordBuilder::Build() {
return std::move(Record);
}
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index 03107df8ba..b37388ec4f 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -22,6 +22,11 @@ class TChangeRecord {
friend class TChangeRecordBuilder;
public:
+ enum class ESource: ui8 {
+ Unspecified = 0,
+ InitialScan = 1,
+ };
+
enum class EKind: ui8 {
AsyncIndex,
CdcDataChange,
@@ -44,6 +49,7 @@ public:
const TPathId& GetPathId() const { return PathId; }
EKind GetKind() const { return Kind; }
const TString& GetBody() const { return Body; }
+ ESource GetSource() const { return Source; }
const TPathId& GetTableId() const { return TableId; }
ui64 GetSchemaVersion() const { return SchemaVersion; }
@@ -72,6 +78,7 @@ private:
TPathId PathId;
EKind Kind;
TString Body;
+ ESource Source = ESource::Unspecified;
ui64 SchemaVersion;
TPathId TableId;
@@ -84,6 +91,7 @@ private:
class TChangeRecordBuilder {
using EKind = TChangeRecord::EKind;
+ using ESource = TChangeRecord::ESource;
public:
explicit TChangeRecordBuilder(EKind kind);
@@ -104,6 +112,8 @@ public:
TChangeRecordBuilder& WithBody(const TString& body);
TChangeRecordBuilder& WithBody(TString&& body);
+ TChangeRecordBuilder& WithSource(ESource source);
+
TChangeRecord&& Build();
private:
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index 052f985977..5d197b32b7 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -548,6 +548,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
TABLEH() { html << "LockOffset"; }
TABLEH() { html << "PathId"; }
TABLEH() { html << "Kind"; }
+ TABLEH() { html << "Source"; }
TABLEH() { html << "TableId"; }
TABLEH() { html << "SchemaVersion"; }
}
@@ -565,6 +566,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
TABLED() { html << record.GetLockOffset(); }
TABLED() { PathLink(html, record.GetPathId()); }
TABLED() { html << record.GetKind(); }
+ TABLED() { html << record.GetSource(); }
TABLED() { PathLink(html, record.GetTableId()); }
TABLED() { html << record.GetSchemaVersion(); }
}
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 5197f9db13..c9564e262f 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -725,7 +725,8 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
NIceDb::TUpdate<Schema::ChangeRecords::TablePathId>(record.GetTableId().LocalPathId));
db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update(
NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
- NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()));
+ NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()),
+ NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource()));
} else {
auto& state = LockChangeRecords[lockId];
Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(),
@@ -773,7 +774,8 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
NIceDb::TUpdate<Schema::LockChangeRecords::TablePathId>(record.GetTableId().LocalPathId));
db.Table<Schema::LockChangeRecordDetails>().Key(record.GetLockId(), record.GetLockOffset()).Update(
NIceDb::TUpdate<Schema::LockChangeRecordDetails::Kind>(record.GetKind()),
- NIceDb::TUpdate<Schema::LockChangeRecordDetails::Body>(record.GetBody()));
+ NIceDb::TUpdate<Schema::LockChangeRecordDetails::Body>(record.GetBody()),
+ NIceDb::TUpdate<Schema::LockChangeRecordDetails::Source>(record.GetSource()));
}
}
diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp
index 3775e54656..a96a7d5a55 100644
--- a/ydb/core/tx/datashard/datashard_change_sending.cpp
+++ b/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -105,7 +105,7 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
}
}
- RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::LockChangeRecordDetails::Kind>())
+ auto changeRecordBuilder = TChangeRecordBuilder(details.GetValue<Schema::LockChangeRecordDetails::Kind>())
.WithOrder(it->Order)
.WithGroup(itCommit->second.Group)
.WithStep(itCommit->second.Step)
@@ -119,8 +119,13 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
.WithSchema(schema)
.WithBody(details.GetValue<Schema::LockChangeRecordDetails::Body>())
.WithLockId(itQueue->second.LockId)
- .WithLockOffset(itQueue->second.LockOffset)
- .Build());
+ .WithLockOffset(itQueue->second.LockOffset);
+
+ if (details.HaveValue<Schema::LockChangeRecordDetails::Source>()) {
+ changeRecordBuilder.WithSource(details.GetValue<Schema::LockChangeRecordDetails::Source>());
+ }
+
+ RecordsToSend[recipient].emplace_back(changeRecordBuilder.Build());
} else {
auto basic = db.Table<Schema::ChangeRecords>().Key(it->Order).Select();
auto details = db.Table<Schema::ChangeRecordDetails>().Key(it->Order).Select();
@@ -157,7 +162,7 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
}
}
- RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::ChangeRecordDetails::Kind>())
+ auto changeRecordBuilder = TChangeRecordBuilder(details.GetValue<Schema::ChangeRecordDetails::Kind>())
.WithOrder(it->Order)
.WithGroup(basic.GetValue<Schema::ChangeRecords::Group>())
.WithStep(basic.GetValue<Schema::ChangeRecords::PlanStep>())
@@ -169,8 +174,13 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
.WithTableId(tableId)
.WithSchemaVersion(schemaVersion)
.WithSchema(schema)
- .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>())
- .Build());
+ .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>());
+
+ if (details.HaveValue<Schema::ChangeRecordDetails::Source>()) {
+ changeRecordBuilder.WithSource(details.GetValue<Schema::ChangeRecordDetails::Source>());
+ }
+
+ RecordsToSend[recipient].emplace_back(changeRecordBuilder.Build());
}
MemUsage += it->BodySize;
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 5895e33597..9819649631 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -788,9 +788,10 @@ class TDataShard
struct Order : Column<1, NScheme::NTypeIds::Uint64> {};
struct Kind : Column<2, NScheme::NTypeIds::Uint8> { using Type = TChangeRecord::EKind; };
struct Body : Column<3, NScheme::NTypeIds::String> { using Type = TString; };
+ struct Source : Column<4, NScheme::NTypeIds::Uint8> { using Type = TChangeRecord::ESource; };
using TKey = TableKey<Order>;
- using TColumns = TableColumns<Order, Kind, Body>;
+ using TColumns = TableColumns<Order, Kind, Body, Source>;
};
struct ChangeSenders : Table<19> {
@@ -935,9 +936,10 @@ class TDataShard
struct LockOffset : Column<2, NScheme::NTypeIds::Uint64> {};
struct Kind : Column<3, NScheme::NTypeIds::Uint8> { using Type = TChangeRecord::EKind; };
struct Body : Column<4, NScheme::NTypeIds::String> { using Type = TString; };
+ struct Source : Column<5, NScheme::NTypeIds::Uint8> { using Type = TChangeRecord::ESource; };
using TKey = TableKey<LockId, LockOffset>;
- using TColumns = TableColumns<LockId, LockOffset, Kind, Body>;
+ using TColumns = TableColumns<LockId, LockOffset, Kind, Body, Source>;
};
// Maps [Order ... Order+N-1] change records in the shard order
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 0c40a8a820..3634ebbeea 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -720,7 +720,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
.SetGrpcPort(PortManager.GetPort(2135))
.SetEnableChangefeedDynamoDBStreamsFormat(true)
.SetEnableChangefeedDebeziumJsonFormat(true)
- .SetEnableTopicMessageMeta(true);
+ .SetEnableTopicMessageMeta(true)
+ .SetEnableChangefeedInitialScan(true);
Server = new TServer(settings);
if (useRealThreads) {
@@ -1309,10 +1310,10 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})")
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})")
});
}
@@ -1370,13 +1371,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
}, false);
}
@@ -1394,13 +1395,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
}, false);
}
@@ -1418,13 +1419,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", R"({"payload":{"key":1}})"),
}, false);
}
@@ -1455,13 +1456,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":1,"value":10},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":2,"value":20},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"after":{"key":3,"value":30},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":10},"after":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":2,"value":20},"after":{"key":2,"value":200},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":3,"value":30},"after":{"key":3,"value":300},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"),
- MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"},"before":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":1,"value":10},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":2,"value":20},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":3,"value":30},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200},"ts":"***"}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300},"ts":"***"}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"d","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":100},"ts":"***"}})", "__key", R"({"payload":{"key":1}})"),
}, false);
}
@@ -1647,7 +1648,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
UPSERT INTO `/Root/Table` (key, value) VALUES
("%s", 1);
)", key.c_str())}, {
- MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***"}}})", "__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false}}})", "__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())),
}, false);
}
@@ -2549,6 +2550,116 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ void WaitForContent(NYdb::NTopic::TTopicClient& client, const TString& consumerName, const TString& path, const TVector<std::pair<TString, TVector<std::pair<TString, TString>>>>& expected) {
+ // get records
+ auto reader = client.CreateReadSession(NYdb::NTopic::TReadSessionSettings()
+ .AppendTopics(path)
+ .ConsumerName(consumerName)
+ );
+
+ ui32 reads = 0;
+ while (reads < expected.size()) {
+ auto ev = reader->GetEvent(true);
+ UNIT_ASSERT(ev);
+
+ NYdb::NTopic::TPartitionSession::TPtr pStream;
+ if (auto* data = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*ev)) {
+ pStream = data->GetPartitionSession();
+ for (const auto& item : data->GetMessages()) {
+ const auto& record = expected.at(reads++);
+ UNIT_ASSERT_C(AreJsonsEqual(item.GetData(), record.first), TStringBuilder() << "Jsons are different: " << item.GetData() << " != " << record.first);
+ AssertMessageMetadataContains(item.GetMessageMeta()->Fields, record.second);
+ }
+ data->Commit();
+ } else if (auto* create = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev)) {
+ pStream = create->GetPartitionSession();
+ create->Confirm();
+ } else if (auto* destroy = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*ev)) {
+ pStream = destroy->GetPartitionSession();
+ destroy->Confirm();
+ } else if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*ev)) {
+ break;
+ }
+
+ if (pStream) {
+ UNIT_ASSERT_VALUES_EQUAL(pStream->GetTopicPath(), path);
+ }
+ }
+
+ UNIT_ASSERT_C(reads == expected.size(), "Read less events than expected");
+ }
+
+ Y_UNIT_TEST(InitialScanDebezium) {
+ auto table = SimpleTable();
+ auto unusedStream = KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson, "UnusedStream");
+ TTestTopicEnv env(table, unusedStream);
+
+ // Populate data
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )");
+
+ // add a stream with initial scan
+ WaitTxNotification(
+ env.GetServer(), env.GetEdgeActor(),
+ AsyncAlterAddStream(
+ env.GetServer(), "/Root", "Table",
+ WithInitialScan(NewAndOldImages(
+ NKikimrSchemeOp::ECdcStreamFormatDebeziumJson))));
+
+ auto &client = env.GetClient();
+
+ // add consumer
+ {
+ auto res = client
+ .AlterTopic("/Root/Table/Stream",
+ NYdb::NTopic::TAlterTopicSettings()
+ .BeginAddConsumer("user")
+ .EndAddConsumer())
+ .ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
+
+ // Wait for initial scan records
+ WaitForContent(client, "user", "/Root/Table/Stream", {
+ MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":1,"value":10}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":2,"value":20}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"r","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":true},"after":{"key":3,"value":30}}})", "__key", R"({"payload":{"key":3}})"),
+ });
+
+ // Perform update after initial scan
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 100),
+ (2, 200),
+ (3, 300),
+ (4, 400);
+ )");
+
+ // Wait for update records
+ WaitForContent(client, "user", "/Root/Table/Stream", {
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":1,"value":10},"after":{"key":1,"value":100}}})", "__key", R"({"payload":{"key":1}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":2,"value":20},"after":{"key":2,"value":200}}})", "__key", R"({"payload":{"key":2}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"u","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"before":{"key":3,"value":30},"after":{"key":3,"value":300}}})", "__key", R"({"payload":{"key":3}})"),
+ MessageWithOneMetadataItem(R"({"payload":{"op":"c","source":{"version":"***","connector":"ydb_debezium_json","ts_ms":"***","txId":"***","snapshot":false},"after":{"key":4,"value":400}}})", "__key", R"({"payload":{"key":4}})"),
+ });
+
+ // remove consumer
+ {
+ auto res =
+ client
+ .AlterTopic(
+ "/Root/Table/Stream",
+ NYdb::NTopic::TAlterTopicSettings().AppendDropConsumers(
+ "user"))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
+ }
+
Y_UNIT_TEST(InitialScanUpdatedRows) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
index 7bf964d741..40b18c561a 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
@@ -1074,6 +1074,11 @@
"ColumnId": 3,
"ColumnName": "Body",
"ColumnType": "String"
+ },
+ {
+ "ColumnId": 4,
+ "ColumnName": "Source",
+ "ColumnType": "Byte"
}
],
"ColumnsDropped": [],
@@ -1082,7 +1087,8 @@
"Columns": [
1,
2,
- 3
+ 3,
+ 4
],
"RoomID": 0,
"Codec": 0,
@@ -2120,6 +2126,11 @@
"ColumnId": 4,
"ColumnName": "Body",
"ColumnType": "String"
+ },
+ {
+ "ColumnId": 5,
+ "ColumnName": "Source",
+ "ColumnType": "Byte"
}
],
"ColumnsDropped": [],
@@ -2129,7 +2140,8 @@
1,
2,
3,
- 4
+ 4,
+ 5
],
"RoomID": 0,
"Codec": 0,