diff options
| field | value | date |
|---|---|---|
| author | snaury <snaury@ydb.tech> | 2023-07-10 18:12:34 +0300 |
| committer | snaury <snaury@ydb.tech> | 2023-07-10 18:12:34 +0300 |
| commit | e4cd85048f6ff72ba71a0e8c29ec9f5d16da9c7f (patch) | |
| tree | ac5293a7165fcb83773eac168dfd2c30c2965658 | |
| parent | 08616ad06442dce72a7b83f685bea447e6008e55 (diff) | |
| download | ydb-e4cd85048f6ff72ba71a0e8c29ec9f5d16da9c7f.tar.gz | |
Support total rows limit in read iterator KIKIMR-18672
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 41 |
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 84 |
-rw-r--r-- | ydb/core/tx/datashard/read_iterator.h | 4 |
3 files changed, 119 insertions, 10 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 0fb608cd422..018100b19a8 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -480,6 +480,11 @@ public: // note that FirstUnprocessedQuery is unsigned and if we do reverse iteration, // then it will also become less than size() when finished while (FirstUnprocessedQuery < State.Request->Ranges.size()) { + if (ReachedTotalRowsLimit()) { + FirstUnprocessedQuery = -1; + return true; + } + if (ShouldStop()) return true; @@ -509,6 +514,11 @@ public: // note that FirstUnprocessedQuery is unsigned and if we do reverse iteration, // then it will also become less than size() when finished while (FirstUnprocessedQuery < State.Request->Keys.size()) { + if (ReachedTotalRowsLimit()) { + FirstUnprocessedQuery = -1; + return true; + } + if (ShouldStop()) return true; @@ -676,6 +686,7 @@ public: } void UpdateState(TReadIteratorState& state) { + state.TotalRows += RowsRead; state.FirstUnprocessedQuery = FirstUnprocessedQuery; state.LastProcessedKey = LastProcessedKey; state.ConsumeSeqNo(RowsRead, BytesInResult); @@ -710,6 +721,27 @@ private: return RowsRead >= State.MaxRowsInResult; } + bool ReachedTotalRowsLimit() const { + if (State.TotalRowsLimit == Max<ui64>()) { + return false; + } + + return State.TotalRows + RowsRead >= State.TotalRowsLimit; + } + + ui64 GetTotalRowsLeft() const { + if (State.TotalRowsLimit == Max<ui64>()) { + return Max<ui64>(); + } + + if (State.TotalRows + RowsRead >= State.TotalRowsLimit) { + return 0; + } + + + return State.TotalRowsLimit - State.TotalRows - RowsRead; + } + bool ShouldStop() { if (!CanResume()) { return false; @@ -735,6 +767,8 @@ private: bytesLeft = State.Quota.Bytes - BlockBuilder.Bytes(); } + rowsLeft = Min(rowsLeft, GetTotalRowsLeft()); + auto direction = reverse ? 
NTable::EDirection::Reverse : NTable::EDirection::Forward; return db.Precharge(TableInfo.LocalTid, keyFrom, @@ -773,6 +807,10 @@ private: keyAccessSampler->AddSample(TableId, rowKey.Cells()); + if (ReachedTotalRowsLimit()) { + break; + } + if (ShouldStop()) { stoppedByLimit = true; break; @@ -1282,6 +1320,9 @@ public: if (record.HasMaxRowsInResult()) state.MaxRowsInResult = record.GetMaxRowsInResult(); + if (record.HasTotalRowsLimit()) + state.TotalRowsLimit = record.GetTotalRowsLimit(); + state.Reverse = record.GetReverse(); if (state.Reverse) { state.FirstUnprocessedQuery = Request->Keys.size() + Request->Ranges.size() - 1; diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 84c5ae628f8..cbe15edc186 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -673,20 +673,32 @@ struct TTestHelper { || newLock->GetGeneration() != prevLock->GetGeneration()); } - void TestChunkRead(ui32 chunkSize, ui32 rowCount) { + void TestChunkRead(ui32 chunkSize, ui32 rowCount, ui32 ranges = 1, ui32 limit = Max<ui32>()) { UpsertMany(1, rowCount); auto request = GetBaseReadRequest("table-1-many", 1, NKikimrTxDataShard::CELLVEC, TRowVersion::Max()); request->Record.ClearSnapshot(); - AddRangeQuery<ui32>( - *request, - {1, 1, 1}, - true, - {rowCount + 1, 1, 1}, - true - ); + + ui32 base = 1; + for (ui32 i = 0; i < ranges; ++i) { + ui32 count = rowCount / ranges; + if (i < (rowCount % ranges)) { + ++count; + } + AddRangeQuery<ui32>( + *request, + {base, 1, 1}, + true, + {base + count - 1, Max<ui32>(), Max<ui32>()}, + true + ); + base += count; + } request->Record.SetMaxRowsInResult(chunkSize); + if (limit != Max<ui32>()) { + request->Record.SetTotalRowsLimit(limit); + } auto readResult = SendRead("table-1-many", request.release()); UNIT_ASSERT(readResult); @@ -697,10 +709,12 @@ struct TTestHelper { while (!readResult->Record.GetFinished()) { 
readResult = WaitReadResult(); UNIT_ASSERT(readResult); - rowsRead += readResult->GetRowsCount(); + ui32 count = readResult->GetRowsCount(); + UNIT_ASSERT_C(count > 0 || readResult->Record.GetFinished(), "Unexpected empty intermediate result"); + rowsRead += count; } - UNIT_ASSERT_VALUES_EQUAL(rowsRead, rowCount); + UNIT_ASSERT_VALUES_EQUAL(rowsRead, Min(rowCount, limit)); } struct THangedReturn { @@ -1946,6 +1960,56 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { helper.TestChunkRead(99, 10000); } + Y_UNIT_TEST(ShouldLimitReadRangeChunk1Limit100) { + TTestHelper helper; + helper.TestChunkRead(1, 1000, 1, 100); + } + + Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit98) { + TTestHelper helper; + helper.TestChunkRead(99, 1000, 10, 98); + } + + Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit99) { + TTestHelper helper; + helper.TestChunkRead(99, 1000, 10, 99); + } + + Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit100) { + TTestHelper helper; + helper.TestChunkRead(99, 1000, 10, 100); + } + + Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit101) { + TTestHelper helper; + helper.TestChunkRead(99, 1000, 10, 101); + } + + Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit198) { + TTestHelper helper; + helper.TestChunkRead(99, 1000, 10, 198); + } + + Y_UNIT_TEST(ShouldLimitRead10RangesChunk99Limit900) { + TTestHelper helper; + helper.TestChunkRead(99, 1000, 10, 900); + } + + Y_UNIT_TEST(ShouldLimitRead10RangesChunk100Limit900) { + TTestHelper helper; + helper.TestChunkRead(100, 1000, 10, 900); + } + + Y_UNIT_TEST(ShouldLimitRead10RangesChunk100Limit1000) { + TTestHelper helper; + helper.TestChunkRead(100, 1000, 10, 1000); + } + + Y_UNIT_TEST(ShouldLimitRead10RangesChunk100Limit1001) { + TTestHelper helper; + helper.TestChunkRead(100, 1000, 10, 1001); + } + Y_UNIT_TEST(ShouldReadKeyPrefix1) { TTestHelper helper; diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index f57dd1ac839..38d0b082ea0 100644 --- 
a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -189,6 +189,10 @@ public: TQuota Quota; + // Number of rows processed so far + ui64 TotalRows = 0; + ui64 TotalRowsLimit = Max<ui64>(); + // items are running total, // first item corresponds to SeqNo = LastAckSeqNo + 1, // i.e. [LastAckSeqNo + 1; SeqNo] |