author    snaury <snaury@ydb.tech>  2023-07-10 18:12:34 +0300
committer snaury <snaury@ydb.tech>  2023-07-10 18:12:34 +0300
commit    e4cd85048f6ff72ba71a0e8c29ec9f5d16da9c7f (patch)
tree      ac5293a7165fcb83773eac168dfd2c30c2965658
parent    08616ad06442dce72a7b83f685bea447e6008e55 (diff)
download  ydb-e4cd85048f6ff72ba71a0e8c29ec9f5d16da9c7f.tar.gz
Support total rows limit in read iterator KIKIMR-18672
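
The read protocol already supports paging via MaxRowsInResult, which only bounds the size of a single result message; this change adds a separate TotalRowsLimit that caps the number of rows the iterator returns across all result messages. The iterator state keeps a running TotalRows counter, each iteration loop checks the limit before processing the next range or key, and the remaining allowance is also used to clamp how many rows are precharged.

A minimal self-contained model of the new bookkeeping (member names mirror the diff below; the real code lives inside the datashard reader class and uses ydb's Max<ui64>() helper, replaced here with std::numeric_limits):

    #include <cstdint>
    #include <limits>

    struct TLimitModel {
        static constexpr uint64_t Unlimited = std::numeric_limits<uint64_t>::max();

        uint64_t TotalRowsLimit = Unlimited; // set from the request; Max<ui64>() means "no limit"
        uint64_t TotalRows = 0;              // rows already returned by previous results of this read
        uint64_t RowsRead = 0;               // rows accumulated in the result being built now

        bool ReachedTotalRowsLimit() const {
            if (TotalRowsLimit == Unlimited)
                return false;
            return TotalRows + RowsRead >= TotalRowsLimit;
        }

        uint64_t TotalRowsLeft() const {
            if (TotalRowsLimit == Unlimited)
                return Unlimited;
            if (TotalRows + RowsRead >= TotalRowsLimit)
                return 0;
            return TotalRowsLimit - TotalRows - RowsRead;
        }
    };

When a result is finalized, UpdateState adds RowsRead to TotalRows, so the next iteration of the same read continues counting from where the previous result stopped.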
-rw-r--r--  ydb/core/tx/datashard/datashard__read_iterator.cpp    41
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_read_iterator.cpp  84
-rw-r--r--  ydb/core/tx/datashard/read_iterator.h                   4
3 files changed, 119 insertions, 10 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 0fb608cd422..018100b19a8 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -480,6 +480,11 @@ public:
// note that FirstUnprocessedQuery is unsigned and if we do reverse iteration,
// then it will also become less than size() when finished
while (FirstUnprocessedQuery < State.Request->Ranges.size()) {
+ if (ReachedTotalRowsLimit()) {
+ FirstUnprocessedQuery = -1;
+ return true;
+ }
+
if (ShouldStop())
return true;
@@ -509,6 +514,11 @@ public:
// note that FirstUnprocessedQuery is unsigned and if we do reverse iteration,
// then it will also become less than size() when finished
while (FirstUnprocessedQuery < State.Request->Keys.size()) {
+ if (ReachedTotalRowsLimit()) {
+ FirstUnprocessedQuery = -1;
+ return true;
+ }
+
if (ShouldStop())
return true;
@@ -676,6 +686,7 @@ public:
}
void UpdateState(TReadIteratorState& state) {
+ state.TotalRows += RowsRead;
state.FirstUnprocessedQuery = FirstUnprocessedQuery;
state.LastProcessedKey = LastProcessedKey;
state.ConsumeSeqNo(RowsRead, BytesInResult);
@@ -710,6 +721,27 @@ private:
return RowsRead >= State.MaxRowsInResult;
}
+ bool ReachedTotalRowsLimit() const {
+ if (State.TotalRowsLimit == Max<ui64>()) {
+ return false;
+ }
+
+ return State.TotalRows + RowsRead >= State.TotalRowsLimit;
+ }
+
+ ui64 GetTotalRowsLeft() const {
+ if (State.TotalRowsLimit == Max<ui64>()) {
+ return Max<ui64>();
+ }
+
+ if (State.TotalRows + RowsRead >= State.TotalRowsLimit) {
+ return 0;
+ }
+
+ return State.TotalRowsLimit - State.TotalRows - RowsRead;
+ }
+
bool ShouldStop() {
if (!CanResume()) {
return false;
@@ -735,6 +767,8 @@ private:
bytesLeft = State.Quota.Bytes - BlockBuilder.Bytes();
}
+ rowsLeft = Min(rowsLeft, GetTotalRowsLeft());
+
auto direction = reverse ? NTable::EDirection::Reverse : NTable::EDirection::Forward;
return db.Precharge(TableInfo.LocalTid,
keyFrom,
@@ -773,6 +807,10 @@ private:
keyAccessSampler->AddSample(TableId, rowKey.Cells());
+ if (ReachedTotalRowsLimit()) {
+ break;
+ }
+
if (ShouldStop()) {
stoppedByLimit = true;
break;
@@ -1282,6 +1320,9 @@ public:
if (record.HasMaxRowsInResult())
state.MaxRowsInResult = record.GetMaxRowsInResult();
+ if (record.HasTotalRowsLimit())
+ state.TotalRowsLimit = record.GetTotalRowsLimit();
+
state.Reverse = record.GetReverse();
if (state.Reverse) {
state.FirstUnprocessedQuery = Request->Keys.size() + Request->Ranges.size() - 1;
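
A note on how the pieces above fit together: the clamp rowsLeft = Min(rowsLeft, GetTotalRowsLeft()) keeps Precharge from loading data pages the read will never return, while the ReachedTotalRowsLimit() checks in the range/key loops and in the per-row iteration stop the read as soon as the running total (TotalRows from earlier results plus RowsRead in the current one) reaches the limit; FirstUnprocessedQuery is then set to -1 so the remaining queries are not processed and the read completes early.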
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 84c5ae628f8..cbe15edc186 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -673,20 +673,32 @@ struct TTestHelper {
|| newLock->GetGeneration() != prevLock->GetGeneration());
}
- void TestChunkRead(ui32 chunkSize, ui32 rowCount) {
+ void TestChunkRead(ui32 chunkSize, ui32 rowCount, ui32 ranges = 1, ui32 limit = Max<ui32>()) {
UpsertMany(1, rowCount);
auto request = GetBaseReadRequest("table-1-many", 1, NKikimrTxDataShard::CELLVEC, TRowVersion::Max());
request->Record.ClearSnapshot();
- AddRangeQuery<ui32>(
- *request,
- {1, 1, 1},
- true,
- {rowCount + 1, 1, 1},
- true
- );
+
+ ui32 base = 1;
+ for (ui32 i = 0; i < ranges; ++i) {
+ ui32 count = rowCount / ranges;
+ if (i < (rowCount % ranges)) {
+ ++count;
+ }
+ AddRangeQuery<ui32>(
+ *request,
+ {base, 1, 1},
+ true,
+ {base + count - 1, Max<ui32>(), Max<ui32>()},
+ true
+ );
+ base += count;
+ }
request->Record.SetMaxRowsInResult(chunkSize);
+ if (limit != Max<ui32>()) {
+ request->Record.SetTotalRowsLimit(limit);
+ }
auto readResult = SendRead("table-1-many", request.release());
UNIT_ASSERT(readResult);
@@ -697,10 +709,12 @@ struct TTestHelper {
while (!readResult->Record.GetFinished()) {
readResult = WaitReadResult();
UNIT_ASSERT(readResult);
- rowsRead += readResult->GetRowsCount();
+ ui32 count = readResult->GetRowsCount();
+ UNIT_ASSERT_C(count > 0 || readResult->Record.GetFinished(), "Unexpected empty intermediate result");
+ rowsRead += count;
}
- UNIT_ASSERT_VALUES_EQUAL(rowsRead, rowCount);
+ UNIT_ASSERT_VALUES_EQUAL(rowsRead, Min(rowCount, limit));
}
struct THangedReturn {
@@ -1946,6 +1960,56 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
helper.TestChunkRead(99, 10000);
}
+ Y_UNIT_TEST(ShouldLimitReadRangeChunk1Limit100) {
+ TTestHelper helper;
+ helper.TestChunkRead(1, 1000, 1, 100);
+ }
+
+ Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit98) {
+ TTestHelper helper;
+ helper.TestChunkRead(99, 1000, 10, 98);
+ }
+
+ Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit99) {
+ TTestHelper helper;
+ helper.TestChunkRead(99, 1000, 10, 99);
+ }
+
+ Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit100) {
+ TTestHelper helper;
+ helper.TestChunkRead(99, 1000, 10, 100);
+ }
+
+ Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit101) {
+ TTestHelper helper;
+ helper.TestChunkRead(99, 1000, 10, 101);
+ }
+
+ Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit198) {
+ TTestHelper helper;
+ helper.TestChunkRead(99, 1000, 10, 198);
+ }
+
+ Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit900) {
+ TTestHelper helper;
+ helper.TestChunkRead(99, 1000, 10, 900);
+ }
+
+ Y_UNIT_TEST(ShouldLimitRead10RangesChunk100Limit900) {
+ TTestHelper helper;
+ helper.TestChunkRead(100, 1000, 10, 900);
+ }
+
+ Y_UNIT_TEST(ShouldLimitRead10RangesChunk100Limit1000) {
+ TTestHelper helper;
+ helper.TestChunkRead(100, 1000, 10, 1000);
+ }
+
+ Y_UNIT_TEST(ShouldLimitRead10RangesChunk100Limit1001) {
+ TTestHelper helper;
+ helper.TestChunkRead(100, 1000, 10, 1001);
+ }
+
Y_UNIT_TEST(ShouldReadKeyPrefix1) {
TTestHelper helper;
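
The new cases sweep the limit around a chunk boundary (98, 99, 100, 101, 198 with 99-row chunks) and around the table size (900, 1000, 1001 with 1000 rows and 100-row chunks), so the read stops mid-chunk, exactly at a chunk boundary, and past the end of the data; in every case TestChunkRead expects exactly Min(rowCount, limit) rows in total.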
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index f57dd1ac839..38d0b082ea0 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -189,6 +189,10 @@ public:
TQuota Quota;
+ // Number of rows processed so far
+ ui64 TotalRows = 0;
+ ui64 TotalRowsLimit = Max<ui64>();
+
// items are running total,
// first item corresponds to SeqNo = LastAckSeqNo + 1,
// i.e. [LastAckSeqNo + 1; SeqNo]
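
For reference, setting the limit from the caller side follows the same pattern as the updated test helper above; a sketch only (GetBaseReadRequest, AddRangeQuery and SendRead are the test-suite helpers shown in the diff, not a public client API):

    // Page in chunks of 99 rows, but never return more than 100 rows in total.
    auto request = GetBaseReadRequest("table-1-many", 1, NKikimrTxDataShard::CELLVEC, TRowVersion::Max());
    AddRangeQuery<ui32>(*request, {1, 1, 1}, true, {1000, Max<ui32>(), Max<ui32>()}, true);
    request->Record.SetMaxRowsInResult(99);   // per-result chunk size (existing behaviour)
    request->Record.SetTotalRowsLimit(100);   // new field: cap across all results
    auto readResult = SendRead("table-1-many", request.release());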