aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-06-29 12:07:45 +0300
committerilnaz <ilnaz@ydb.tech>2023-06-29 12:07:45 +0300
commitde77c1ac2d47b8ba61b738300b347584bb382b0e (patch)
tree41ac4f01e4d5586328a7e15da7f998247014ae45
parent91144846ad7dfd6e1431a63342b6fccb93c2b75f (diff)
downloadydb-de77c1ac2d47b8ba61b738300b347584bb382b0e.tar.gz
Continue on page faults
-rw-r--r--ydb/core/tx/datashard/cdc_stream_scan.cpp11
1 files changed, 9 insertions, 2 deletions
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp
index ffd8ad5e5f..8c39fc6849 100644
--- a/ydb/core/tx/datashard/cdc_stream_scan.cpp
+++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp
@@ -219,6 +219,7 @@ public:
LOG_D("Progress"
<< ": streamPathId# " << streamPathId);
+ ChangeRecords.clear();
if (Self->CheckChangesQueueOverflow()) {
return true;
}
@@ -230,6 +231,8 @@ public:
Y_VERIFY(it != table->CdcStreams.end());
NIceDb::TNiceDb db(txc.DB);
+ bool pageFault = false;
+
for (const auto& [k, v] : ev.Rows) {
const auto key = MakeKey(k.GetCells(), table);
const auto& keyTags = table->KeyColumnIds;
@@ -238,10 +241,10 @@ public:
TSelectStats stats;
auto ready = txc.DB.Select(table->LocalTid, key, {}, row, stats, 0, readVersion);
if (ready == EReady::Page) {
- return false;
+ pageFault = true;
}
- if (ready == EReady::Gone || stats.InvisibleRowSkips) {
+ if (pageFault || ready == EReady::Gone || stats.InvisibleRowSkips) {
continue;
}
@@ -293,6 +296,10 @@ public:
Self->PersistChangeRecord(db, record);
}
+ if (pageFault) {
+ return false;
+ }
+
if (ev.Rows) {
const auto& [key, _] = ev.Rows.back();