diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-03-06 22:00:38 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-03-06 22:00:38 +0300 |
commit | fccb745feb0ac047fb767a1d9829c7b48d8d8f48 (patch) | |
tree | 46234a839588a5cb5e4fecad730b71d951d0e8d9 | |
parent | 4332e68ad3f50d60ba750aa49348d9c0f757a4a1 (diff) | |
download | ydb-fccb745feb0ac047fb767a1d9829c7b48d8d8f48.tar.gz |
reverse queries itself, not just ranges of each query
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 29 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 153 |
2 files changed, 176 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 36e3d2c282..48fd4108e9 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -264,7 +264,7 @@ class TReader { std::vector<NScheme::TTypeInfo> ColumnTypes; - ui32 FirstUnprocessedQuery; + ui32 FirstUnprocessedQuery; // must be unsigned TString LastProcessedKey; ui64 RowsRead = 0; @@ -351,7 +351,7 @@ public: iterRange.MaxKey = keyTo; iterRange.MinInclusive = fromInclusive; iterRange.MaxInclusive = toInclusive; - bool reverse = State.Reverse; + const bool reverse = State.Reverse; EReadStatus result; if (!reverse) { @@ -432,7 +432,9 @@ public: // TODO: merge ReadRanges and ReadKeys to single template Read? bool ReadRanges(TTransactionContext& txc, const TActorContext& ctx) { - for (; FirstUnprocessedQuery < State.Request->Ranges.size(); ++FirstUnprocessedQuery) { + // note that FirstUnprocessedQuery is unsigned and if we do reverse iteration, + // then it will also become less than size() when finished + while (FirstUnprocessedQuery < State.Request->Ranges.size()) { if (ShouldStop()) return true; @@ -440,7 +442,7 @@ public: auto status = ReadRange(txc, ctx, range); switch (status) { case EReadStatus::Done: - continue; + break; case EReadStatus::StoppedByLimit: return true; case EReadStatus::NeedData: @@ -448,13 +450,20 @@ public: return true; return false; } + + if (!State.Reverse) + FirstUnprocessedQuery++; + else + FirstUnprocessedQuery--; } return true; } bool ReadKeys(TTransactionContext& txc, const TActorContext& ctx) { - for (; FirstUnprocessedQuery < State.Request->Keys.size(); ++FirstUnprocessedQuery) { + // note that FirstUnprocessedQuery is unsigned and if we do reverse iteration, + // then it will also become less than size() when finished + while (FirstUnprocessedQuery < State.Request->Keys.size()) { if (ShouldStop()) return true; @@ -462,7 +471,7 @@ public: auto status = ReadKey(txc, ctx, key, FirstUnprocessedQuery); switch (status) { case EReadStatus::Done: - continue; + break; case EReadStatus::StoppedByLimit: return true; case EReadStatus::NeedData: @@ -470,6 +479,11 @@ public: return true; return false; } + + if (!State.Reverse) + FirstUnprocessedQuery++; + else + FirstUnprocessedQuery--; } return true; @@ -1177,6 +1191,9 @@ public: } state.Reverse = record.GetReverse(); + if (state.Reverse) { + state.FirstUnprocessedQuery = Request->Keys.size() + Request->Ranges.size() - 1; + } if (state.PathId.OwnerId != Self->TabletID()) { // owner is schemeshard, read user table diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 79a1d6730f..4cd0e9209f 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -1072,6 +1072,23 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { }); } + Y_UNIT_TEST(ShouldReverseReadMultipleKeys) { + TTestHelper helper; + + auto request = helper.GetBaseReadRequest("table-1", 1); + AddKeyQuery(*request, {3, 3, 3}); + AddKeyQuery(*request, {1, 1, 1}); + AddKeyQuery(*request, {5, 5, 5}); + request->Record.SetReverse(true); + + auto readResult = helper.SendRead("table-1", request.release()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {5, 5, 5, 500}, + {1, 1, 1, 100}, + {3, 3, 3, 300}, + }); + } + Y_UNIT_TEST(ShouldReadMultipleKeysOneByOne) { TTestHelper helper; @@ -1130,6 +1147,65 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { // TODO: check continuation token } + Y_UNIT_TEST(ShouldReverseReadMultipleKeysOneByOne) { + TTestHelper helper; + + auto request1 = helper.GetBaseReadRequest("table-1", 1); + AddKeyQuery(*request1, {3, 3, 3}); + AddKeyQuery(*request1, {1, 1, 1}); + AddKeyQuery(*request1, {5, 5, 5}); + request1->Record.SetMaxRowsInResult(1); + request1->Record.SetReverse(true); + + ui32 continueCounter = 0; + helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) { + ++continueCounter; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto readResult1 = helper.SendRead("table-1", request1.release()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult1, { + {5, 5, 5, 500} + }); + + const auto& record1 = readResult1->Record; + UNIT_ASSERT(!record1.GetLimitReached()); + UNIT_ASSERT(record1.HasSeqNo()); + //UNIT_ASSERT(!record1.HasFinished()); + UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(record1.GetSeqNo(), 1UL); + // TODO: check continuation token + + auto readResult2 = helper.WaitReadResult(); + CheckResult(helper.Tables["table-1"].UserTable, *readResult2, { + {1, 1, 1, 100} + }); + + const auto& record2 = readResult2->Record; + UNIT_ASSERT(!record2.GetLimitReached()); + UNIT_ASSERT(!record2.HasFinished()); + UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL); + // TODO: check continuation token + + auto readResult3 = helper.WaitReadResult(); + CheckResult(helper.Tables["table-1"].UserTable, *readResult3, { + {3, 3, 3, 300} + }); + + UNIT_ASSERT_VALUES_EQUAL(continueCounter, 2); + + const auto& record3 = readResult3->Record; + UNIT_ASSERT(!record3.GetLimitReached()); + UNIT_ASSERT(record3.HasFinished()); + UNIT_ASSERT_VALUES_EQUAL(record3.GetReadId(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL); + // TODO: check continuation token + } + Y_UNIT_TEST(ShouldHandleReadAck) { TTestHelper helper; @@ -1240,6 +1316,83 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(continueCounter, 4); } + Y_UNIT_TEST(ShouldReverseReadMultipleRanges) { + TTestHelper helper; + + auto request = helper.GetBaseReadRequest("table-1", 1); + AddRangeQuery<ui32>( + *request, + {1, 0, 0}, + true, + {5, 5, 5}, + true + ); + AddRangeQuery<ui32>( + *request, + {8, 1, 1}, + true, + {11, 11, 11}, + true + ); + + request->Record.SetReverse(true); + + auto readResult = helper.SendRead("table-1", request.release()); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + {11, 11, 11, 1111}, + {8, 1, 1, 803}, + {5, 5, 5, 500}, + {3, 3, 3, 300}, + {1, 1, 1, 100}, + }); + } + + Y_UNIT_TEST(ShouldReverseReadMultipleRangesOneByOneWithAcks) { + TTestHelper helper; + + auto request = helper.GetBaseReadRequest("table-1", 1); + AddRangeQuery<ui32>( + *request, + {1, 0, 0}, + true, + {5, 5, 5}, + true + ); + AddRangeQuery<ui32>( + *request, + {8, 1, 1}, + true, + {11, 11, 11}, + true + ); + + request->Record.SetReverse(true); + request->Record.SetMaxRows(1); + + std::vector<std::vector<ui32>> gold = { + {11, 11, 11, 1111}, + {8, 1, 1, 803}, + {5, 5, 5, 500}, + {3, 3, 3, 300}, + {1, 1, 1, 100}, + }; + + auto readResult = helper.SendRead("table-1", request.release()); + UNIT_ASSERT(readResult); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + gold[0] + }); + + for (size_t i = 1; i < gold.size(); ++i) { + helper.SendReadAck("table-1", readResult->Record, 1, 10000); + readResult = helper.WaitReadResult(); + UNIT_ASSERT(readResult); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + gold[i] + }); + } + } + Y_UNIT_TEST(ShouldRangeReadReverseLeftInclusive) { TTestHelper helper; |