aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-07-06 17:36:58 +0300
committerIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-07-06 17:36:58 +0300
commite4c3ea21c0186d6c240208e46597dc0510ac8fc7 (patch)
tree2c3f812d035d8809718d50679b5284501127e54c
parent39cd7e9b3f85ae1deb440370b79fd49a33a10e48 (diff)
downloadydb-e4c3ea21c0186d6c240208e46597dc0510ac8fc7.tar.gz
Fixed *Image modes KIKIMR-15277
ref:efc311ce588842d06acd4c94a80b74dbddaee616
-rw-r--r--ydb/core/tx/datashard/change_collector_cdc_stream.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_collector.cpp54
2 files changed, 55 insertions, 5 deletions
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
index 688b32b23e..a64d997775 100644
--- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
@@ -193,7 +193,7 @@ TRowState TCdcStreamChangeCollector::PatchState(const TRowState& oldState, ERowO
auto it = updates.find(tag);
if (it != updates.end()) {
newState.Set(pos, it->second.Op, it->second.AsCell());
- } else if (rop == ERowOp::Upsert) {
+ } else if (rop == ERowOp::Upsert && oldState.GetRowState() != ERowOp::Erase) {
newState.Set(pos, oldState.GetCellOp(pos), oldState.Get(pos));
} else {
newState.Set(pos, ECellOp::Null, TCell());
@@ -222,10 +222,6 @@ void TCdcStreamChangeCollector::Persist(const TTableId& tableId, const TPathId&
TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags,
const TRowState* oldState, const TRowState* newState, TArrayRef<const TTag> valueTags)
{
- if (!oldState && !newState) {
- return;
- }
-
NKikimrChangeExchange::TChangeRecord::TDataChange body;
Serialize(body, rop, key, keyTags, oldState, newState, valueTags);
TBaseChangeCollector::Persist(tableId, pathId, TChangeRecord::EKind::CdcDataChange, body);
diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
index 7d815dd8a1..d087a4a368 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
@@ -716,6 +716,22 @@ Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) {
};
}
+ TCdcStream NewImage() {
+ return TCdcStream{
+ .Name = "new_image",
+ .Mode = NKikimrSchemeOp::ECdcStreamModeNewImage,
+ .Format = NKikimrSchemeOp::ECdcStreamFormatProto,
+ };
+ }
+
+ TCdcStream OldImage() {
+ return TCdcStream{
+ .Name = "old_image",
+ .Mode = NKikimrSchemeOp::ECdcStreamModeOldImage,
+ .Format = NKikimrSchemeOp::ECdcStreamFormatProto,
+ };
+ }
+
TCdcStream NewAndOldImages() {
return TCdcStream{
.Name = "new_and_old_images",
@@ -844,6 +860,44 @@ Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) {
});
}
+ Y_UNIT_TEST(NewImage) {
+ const auto schema = TShardedTableOptions()
+ .Columns({
+ {"a", "Uint32", true, false},
+ {"b", "Uint32", false, false},
+ {"c", "Uint32", false, false},
+ });
+
+ Run("/Root/path", schema, TVector<TCdcStream>{NewImage()}, TVector<TString>{
+ "INSERT INTO `/Root/path` (a, b) values (1, 2)",
+ "DELETE FROM `/Root/path` WHERE a = 1;",
+ }, {
+ {"new_image", {
+ TStructRecord(NTable::ERowOp::Upsert, {{"a", 1}}, {}, {}, {{"b", 2}, {"c", Null}}),
+ TStructRecord(NTable::ERowOp::Erase, {{"a", 1}}, {}, {}, {}),
+ }},
+ });
+ }
+
+ Y_UNIT_TEST(OldImage) {
+ const auto schema = TShardedTableOptions()
+ .Columns({
+ {"a", "Uint32", true, false},
+ {"b", "Uint32", false, false},
+ {"c", "Uint32", false, false},
+ });
+
+ Run("/Root/path", schema, TVector<TCdcStream>{OldImage()}, TVector<TString>{
+ "INSERT INTO `/Root/path` (a, b) values (1, 2)",
+ "DELETE FROM `/Root/path` WHERE a = 1;",
+ }, {
+ {"old_image", {
+ TStructRecord(NTable::ERowOp::Upsert, {{"a", 1}}, {}, {}, {}),
+ TStructRecord(NTable::ERowOp::Erase, {{"a", 1}}, {}, {{"b", 2}, {"c", Null}}, {}),
+ }},
+ });
+ }
+
} // CdcStreamChangeCollector
} // NKikimr