diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-06-29 12:07:45 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-06-29 12:07:45 +0300 |
commit | de77c1ac2d47b8ba61b738300b347584bb382b0e (patch) | |
tree | 41ac4f01e4d5586328a7e15da7f998247014ae45 | |
parent | 91144846ad7dfd6e1431a63342b6fccb93c2b75f (diff) | |
download | ydb-de77c1ac2d47b8ba61b738300b347584bb382b0e.tar.gz |
Continue on page faults
-rw-r--r-- | ydb/core/tx/datashard/cdc_stream_scan.cpp | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index ffd8ad5e5f..8c39fc6849 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -219,6 +219,7 @@ public: LOG_D("Progress" << ": streamPathId# " << streamPathId); + ChangeRecords.clear(); if (Self->CheckChangesQueueOverflow()) { return true; } @@ -230,6 +231,8 @@ public: Y_VERIFY(it != table->CdcStreams.end()); NIceDb::TNiceDb db(txc.DB); + bool pageFault = false; + for (const auto& [k, v] : ev.Rows) { const auto key = MakeKey(k.GetCells(), table); const auto& keyTags = table->KeyColumnIds; @@ -238,10 +241,10 @@ public: TSelectStats stats; auto ready = txc.DB.Select(table->LocalTid, key, {}, row, stats, 0, readVersion); if (ready == EReady::Page) { - return false; + pageFault = true; } - if (ready == EReady::Gone || stats.InvisibleRowSkips) { + if (pageFault || ready == EReady::Gone || stats.InvisibleRowSkips) { continue; } @@ -293,6 +296,10 @@ public: Self->PersistChangeRecord(db, record); } + if (pageFault) { + return false; + } + if (ev.Rows) { const auto& [key, _] = ev.Rows.back(); |