diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-07-06 17:36:58 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-07-06 17:36:58 +0300 |
commit | e4c3ea21c0186d6c240208e46597dc0510ac8fc7 (patch) | |
tree | 2c3f812d035d8809718d50679b5284501127e54c | |
parent | 39cd7e9b3f85ae1deb440370b79fd49a33a10e48 (diff) | |
download | ydb-e4c3ea21c0186d6c240208e46597dc0510ac8fc7.tar.gz |
Fixed *Image modes KIKIMR-15277
ref:efc311ce588842d06acd4c94a80b74dbddaee616
-rw-r--r-- | ydb/core/tx/datashard/change_collector_cdc_stream.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_collector.cpp | 54 |
2 files changed, 55 insertions, 5 deletions
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp index 688b32b23e..a64d997775 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp @@ -193,7 +193,7 @@ TRowState TCdcStreamChangeCollector::PatchState(const TRowState& oldState, ERowO auto it = updates.find(tag); if (it != updates.end()) { newState.Set(pos, it->second.Op, it->second.AsCell()); - } else if (rop == ERowOp::Upsert) { + } else if (rop == ERowOp::Upsert && oldState.GetRowState() != ERowOp::Erase) { newState.Set(pos, oldState.GetCellOp(pos), oldState.Get(pos)); } else { newState.Set(pos, ECellOp::Null, TCell()); @@ -222,10 +222,6 @@ void TCdcStreamChangeCollector::Persist(const TTableId& tableId, const TPathId& TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, const TRowState* oldState, const TRowState* newState, TArrayRef<const TTag> valueTags) { - if (!oldState && !newState) { - return; - } - NKikimrChangeExchange::TChangeRecord::TDataChange body; Serialize(body, rop, key, keyTags, oldState, newState, valueTags); TBaseChangeCollector::Persist(tableId, pathId, TChangeRecord::EKind::CdcDataChange, body); diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp index 7d815dd8a1..d087a4a368 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp @@ -716,6 +716,22 @@ Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) { }; } + TCdcStream NewImage() { + return TCdcStream{ + .Name = "new_image", + .Mode = NKikimrSchemeOp::ECdcStreamModeNewImage, + .Format = NKikimrSchemeOp::ECdcStreamFormatProto, + }; + } + + TCdcStream OldImage() { + return TCdcStream{ + .Name = "old_image", + .Mode = NKikimrSchemeOp::ECdcStreamModeOldImage, + .Format = NKikimrSchemeOp::ECdcStreamFormatProto, + }; + } + TCdcStream NewAndOldImages() { return TCdcStream{ .Name = "new_and_old_images", @@ -844,6 +860,44 @@ Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) { }); } + Y_UNIT_TEST(NewImage) { + const auto schema = TShardedTableOptions() + .Columns({ + {"a", "Uint32", true, false}, + {"b", "Uint32", false, false}, + {"c", "Uint32", false, false}, + }); + + Run("/Root/path", schema, TVector<TCdcStream>{NewImage()}, TVector<TString>{ + "INSERT INTO `/Root/path` (a, b) values (1, 2)", + "DELETE FROM `/Root/path` WHERE a = 1;", + }, { + {"new_image", { + TStructRecord(NTable::ERowOp::Upsert, {{"a", 1}}, {}, {}, {{"b", 2}, {"c", Null}}), + TStructRecord(NTable::ERowOp::Erase, {{"a", 1}}, {}, {}, {}), + }}, + }); + } + + Y_UNIT_TEST(OldImage) { + const auto schema = TShardedTableOptions() + .Columns({ + {"a", "Uint32", true, false}, + {"b", "Uint32", false, false}, + {"c", "Uint32", false, false}, + }); + + Run("/Root/path", schema, TVector<TCdcStream>{OldImage()}, TVector<TString>{ + "INSERT INTO `/Root/path` (a, b) values (1, 2)", + "DELETE FROM `/Root/path` WHERE a = 1;", + }, { + {"old_image", { + TStructRecord(NTable::ERowOp::Upsert, {{"a", 1}}, {}, {}, {}), + TStructRecord(NTable::ERowOp::Erase, {{"a", 1}}, {}, {{"b", 2}, {"c", Null}}, {}), + }}, + }); + } + } // CdcStreamChangeCollector } // NKikimr |