aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-03-06 22:00:38 +0300
committereivanov89 <eivanov89@ydb.tech>2023-03-06 22:00:38 +0300
commitfccb745feb0ac047fb767a1d9829c7b48d8d8f48 (patch)
tree46234a839588a5cb5e4fecad730b71d951d0e8d9
parent4332e68ad3f50d60ba750aa49348d9c0f757a4a1 (diff)
downloadydb-fccb745feb0ac047fb767a1d9829c7b48d8d8f48.tar.gz
reverse queries itself, not just ranges of each query
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp29
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp153
2 files changed, 176 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 36e3d2c282..48fd4108e9 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -264,7 +264,7 @@ class TReader {
std::vector<NScheme::TTypeInfo> ColumnTypes;
- ui32 FirstUnprocessedQuery;
+ ui32 FirstUnprocessedQuery; // must be unsigned
TString LastProcessedKey;
ui64 RowsRead = 0;
@@ -351,7 +351,7 @@ public:
iterRange.MaxKey = keyTo;
iterRange.MinInclusive = fromInclusive;
iterRange.MaxInclusive = toInclusive;
- bool reverse = State.Reverse;
+ const bool reverse = State.Reverse;
EReadStatus result;
if (!reverse) {
@@ -432,7 +432,9 @@ public:
// TODO: merge ReadRanges and ReadKeys to single template Read?
bool ReadRanges(TTransactionContext& txc, const TActorContext& ctx) {
- for (; FirstUnprocessedQuery < State.Request->Ranges.size(); ++FirstUnprocessedQuery) {
+ // note that FirstUnprocessedQuery is unsigned and if we do reverse iteration,
+ // then it will also become less than size() when finished
+ while (FirstUnprocessedQuery < State.Request->Ranges.size()) {
if (ShouldStop())
return true;
@@ -440,7 +442,7 @@ public:
auto status = ReadRange(txc, ctx, range);
switch (status) {
case EReadStatus::Done:
- continue;
+ break;
case EReadStatus::StoppedByLimit:
return true;
case EReadStatus::NeedData:
@@ -448,13 +450,20 @@ public:
return true;
return false;
}
+
+ if (!State.Reverse)
+ FirstUnprocessedQuery++;
+ else
+ FirstUnprocessedQuery--;
}
return true;
}
bool ReadKeys(TTransactionContext& txc, const TActorContext& ctx) {
- for (; FirstUnprocessedQuery < State.Request->Keys.size(); ++FirstUnprocessedQuery) {
+ // note that FirstUnprocessedQuery is unsigned and if we do reverse iteration,
+ // then it will also become less than size() when finished
+ while (FirstUnprocessedQuery < State.Request->Keys.size()) {
if (ShouldStop())
return true;
@@ -462,7 +471,7 @@ public:
auto status = ReadKey(txc, ctx, key, FirstUnprocessedQuery);
switch (status) {
case EReadStatus::Done:
- continue;
+ break;
case EReadStatus::StoppedByLimit:
return true;
case EReadStatus::NeedData:
@@ -470,6 +479,11 @@ public:
return true;
return false;
}
+
+ if (!State.Reverse)
+ FirstUnprocessedQuery++;
+ else
+ FirstUnprocessedQuery--;
}
return true;
@@ -1177,6 +1191,9 @@ public:
}
state.Reverse = record.GetReverse();
+ if (state.Reverse) {
+ state.FirstUnprocessedQuery = Request->Keys.size() + Request->Ranges.size() - 1;
+ }
if (state.PathId.OwnerId != Self->TabletID()) {
// owner is schemeshard, read user table
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 79a1d6730f..4cd0e9209f 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -1072,6 +1072,23 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
});
}
+ Y_UNIT_TEST(ShouldReverseReadMultipleKeys) {
+ TTestHelper helper;
+
+ auto request = helper.GetBaseReadRequest("table-1", 1);
+ AddKeyQuery(*request, {3, 3, 3});
+ AddKeyQuery(*request, {1, 1, 1});
+ AddKeyQuery(*request, {5, 5, 5});
+ request->Record.SetReverse(true);
+
+ auto readResult = helper.SendRead("table-1", request.release());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {5, 5, 5, 500},
+ {1, 1, 1, 100},
+ {3, 3, 3, 300},
+ });
+ }
+
Y_UNIT_TEST(ShouldReadMultipleKeysOneByOne) {
TTestHelper helper;
@@ -1130,6 +1147,65 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
// TODO: check continuation token
}
+ Y_UNIT_TEST(ShouldReverseReadMultipleKeysOneByOne) {
+ TTestHelper helper;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ AddKeyQuery(*request1, {3, 3, 3});
+ AddKeyQuery(*request1, {1, 1, 1});
+ AddKeyQuery(*request1, {5, 5, 5});
+ request1->Record.SetMaxRowsInResult(1);
+ request1->Record.SetReverse(true);
+
+ ui32 continueCounter = 0;
+ helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) {
+ ++continueCounter;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult1, {
+ {5, 5, 5, 500}
+ });
+
+ const auto& record1 = readResult1->Record;
+ UNIT_ASSERT(!record1.GetLimitReached());
+ UNIT_ASSERT(record1.HasSeqNo());
+ //UNIT_ASSERT(!record1.HasFinished());
+ UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(record1.GetSeqNo(), 1UL);
+ // TODO: check continuation token
+
+ auto readResult2 = helper.WaitReadResult();
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult2, {
+ {1, 1, 1, 100}
+ });
+
+ const auto& record2 = readResult2->Record;
+ UNIT_ASSERT(!record2.GetLimitReached());
+ UNIT_ASSERT(!record2.HasFinished());
+ UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL);
+ // TODO: check continuation token
+
+ auto readResult3 = helper.WaitReadResult();
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult3, {
+ {3, 3, 3, 300}
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 2);
+
+ const auto& record3 = readResult3->Record;
+ UNIT_ASSERT(!record3.GetLimitReached());
+ UNIT_ASSERT(record3.HasFinished());
+ UNIT_ASSERT_VALUES_EQUAL(record3.GetReadId(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL);
+ // TODO: check continuation token
+ }
+
Y_UNIT_TEST(ShouldHandleReadAck) {
TTestHelper helper;
@@ -1240,6 +1316,83 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(continueCounter, 4);
}
+ Y_UNIT_TEST(ShouldReverseReadMultipleRanges) {
+ TTestHelper helper;
+
+ auto request = helper.GetBaseReadRequest("table-1", 1);
+ AddRangeQuery<ui32>(
+ *request,
+ {1, 0, 0},
+ true,
+ {5, 5, 5},
+ true
+ );
+ AddRangeQuery<ui32>(
+ *request,
+ {8, 1, 1},
+ true,
+ {11, 11, 11},
+ true
+ );
+
+ request->Record.SetReverse(true);
+
+ auto readResult = helper.SendRead("table-1", request.release());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ {11, 11, 11, 1111},
+ {8, 1, 1, 803},
+ {5, 5, 5, 500},
+ {3, 3, 3, 300},
+ {1, 1, 1, 100},
+ });
+ }
+
+ Y_UNIT_TEST(ShouldReverseReadMultipleRangesOneByOneWithAcks) {
+ TTestHelper helper;
+
+ auto request = helper.GetBaseReadRequest("table-1", 1);
+ AddRangeQuery<ui32>(
+ *request,
+ {1, 0, 0},
+ true,
+ {5, 5, 5},
+ true
+ );
+ AddRangeQuery<ui32>(
+ *request,
+ {8, 1, 1},
+ true,
+ {11, 11, 11},
+ true
+ );
+
+ request->Record.SetReverse(true);
+ request->Record.SetMaxRows(1);
+
+ std::vector<std::vector<ui32>> gold = {
+ {11, 11, 11, 1111},
+ {8, 1, 1, 803},
+ {5, 5, 5, 500},
+ {3, 3, 3, 300},
+ {1, 1, 1, 100},
+ };
+
+ auto readResult = helper.SendRead("table-1", request.release());
+ UNIT_ASSERT(readResult);
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ gold[0]
+ });
+
+ for (size_t i = 1; i < gold.size(); ++i) {
+ helper.SendReadAck("table-1", readResult->Record, 1, 10000);
+ readResult = helper.WaitReadResult();
+ UNIT_ASSERT(readResult);
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ gold[i]
+ });
+ }
+ }
+
Y_UNIT_TEST(ShouldRangeReadReverseLeftInclusive) {
TTestHelper helper;