aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-02-28 21:43:27 +0300
committereivanov89 <eivanov89@ydb.tech>2023-02-28 21:43:27 +0300
commitf090faf2c306236557f75a72164d0e7803f166e1 (patch)
treebd952b3131d92dfd17b024437732db59eb2101fd
parent636f4902fbb658f49836b5ac894884d0856d475e (diff)
downloadydb-f090faf2c306236557f75a72164d0e7803f166e1.tar.gz
properly set from and to borders when continue reading reverse range
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp29
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp197
2 files changed, 220 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 807528e453..36e3d2c282 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -315,17 +315,34 @@ public:
const TSerializedTableRange& range)
{
bool fromInclusive;
+ bool toInclusive;
TSerializedCellVec keyFromCells;
+ TSerializedCellVec keyToCells;
if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) {
- fromInclusive = false;
- keyFromCells = TSerializedCellVec(State.LastProcessedKey);
+ if (!State.Reverse) {
+ keyFromCells = TSerializedCellVec(State.LastProcessedKey);
+ fromInclusive = false;
+
+ keyToCells = range.To;
+ toInclusive = range.ToInclusive;
+ } else {
+ // reverse
+ keyFromCells = range.From;
+ fromInclusive = true;
+
+ keyToCells = TSerializedCellVec(State.LastProcessedKey);
+ toInclusive = false;
+ }
} else {
- fromInclusive = range.FromInclusive;
keyFromCells = range.From;
+ fromInclusive = range.FromInclusive;
+
+ keyToCells = range.To;
+ toInclusive = range.ToInclusive;
}
+
const auto keyFrom = ToRawTypeValue(keyFromCells, TableInfo, fromInclusive);
- const TSerializedCellVec keyToCells(range.To);
- const auto keyTo = ToRawTypeValue(keyToCells, TableInfo, !range.ToInclusive);
+ const auto keyTo = ToRawTypeValue(keyToCells, TableInfo, !toInclusive);
// TODO: split range into parts like in read_columns
@@ -333,7 +350,7 @@ public:
iterRange.MinKey = keyFrom;
iterRange.MaxKey = keyTo;
iterRange.MinInclusive = fromInclusive;
- iterRange.MaxInclusive = range.ToInclusive;
+ iterRange.MaxInclusive = toInclusive;
bool reverse = State.Reverse;
EReadStatus result;
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 208e339307..79a1d6730f 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -1240,6 +1240,203 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(continueCounter, 4);
}
+ Y_UNIT_TEST(ShouldRangeReadReverseLeftInclusive) {
+ TTestHelper helper;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ request1->Record.SetReverse(true);
+ AddRangeQuery<ui32>(
+ *request1,
+ {8, 0, 0},
+ true,
+ {11, 11, 11},
+ true
+ );
+
+ // limit quota (enough to read all rows)
+ request1->Record.SetMaxRows(8);
+
+ ui32 continueCounter = 0;
+ helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) {
+ ++continueCounter;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ UNIT_ASSERT(readResult1);
+ UNIT_ASSERT_VALUES_EQUAL(readResult1->GetRowsCount(), 5);
+ UNIT_ASSERT(readResult1->Record.GetFinished());
+
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult1, {
+ {11, 11, 11, 1111},
+ {8, 1, 1, 803},
+ {8, 1, 0, 802},
+ {8, 0, 1, 801},
+ {8, 0, 0, 800}
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0);
+ }
+
+ Y_UNIT_TEST(ShouldRangeReadReverseLeftNonInclusive) {
+ // Regression test for KIKIMR-17253
+ // Version with no ACK: only reverse and left not inclusive like in ReadContinue
+
+ TTestHelper helper;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ request1->Record.SetReverse(true);
+ AddRangeQuery<ui32>(
+ *request1,
+ {8, 0, 0},
+ false,
+ {11, 11, 11},
+ true
+ );
+
+ // limit quota (enough to read all rows)
+ request1->Record.SetMaxRows(8);
+
+ ui32 continueCounter = 0;
+ helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) {
+ ++continueCounter;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ UNIT_ASSERT(readResult1);
+ UNIT_ASSERT_VALUES_EQUAL(readResult1->GetRowsCount(), 4);
+ UNIT_ASSERT(readResult1->Record.GetFinished());
+
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult1, {
+ {11, 11, 11, 1111},
+ {8, 1, 1, 803},
+ {8, 1, 0, 802},
+ {8, 0, 1, 801},
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0);
+ }
+
+ Y_UNIT_TEST(ShouldHandleReadAckWhenExhaustedRangeRead) {
+ // Regression test for KIKIMR-17253
+
+ TTestHelper helper;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ AddRangeQuery<ui32>(
+ *request1,
+ {1, 1, 1},
+ true,
+ {11, 11, 11},
+ true
+ );
+
+ // limit quota
+ request1->Record.SetMaxRows(5);
+
+ ui32 continueCounter = 0;
+ helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) {
+ ++continueCounter;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ UNIT_ASSERT(readResult1);
+ UNIT_ASSERT_VALUES_EQUAL(readResult1->GetRowsCount(), 5);
+ UNIT_ASSERT(!readResult1->Record.GetFinished());
+
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult1, {
+ {1, 1, 1, 100},
+ {3, 3, 3, 300},
+ {5, 5, 5, 500},
+ {8, 0, 0, 800},
+ {8, 0, 1, 801},
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0);
+
+ helper.SendReadAck("table-1", readResult1->Record, 8, 10000);
+
+ auto readResult2 = helper.WaitReadResult();
+ UNIT_ASSERT(readResult2);
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->GetRowsCount(), 3);
+ UNIT_ASSERT(readResult2->Record.GetFinished());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult2, {
+ {8, 1, 0, 802},
+ {8, 1, 1, 803},
+ {11, 11, 11, 1111}
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 1);
+ }
+
+ Y_UNIT_TEST(ShouldHandleReadAckWhenExhaustedRangeReadReverse) {
+ // Regression test for KIKIMR-17253
+
+ TTestHelper helper;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ request1->Record.SetReverse(true);
+ AddRangeQuery<ui32>(
+ *request1,
+ {1, 1, 1},
+ true,
+ {11, 11, 11},
+ true
+ );
+
+ // limit quota
+ request1->Record.SetMaxRows(5);
+
+ ui32 continueCounter = 0;
+ helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) {
+ ++continueCounter;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ UNIT_ASSERT(readResult1);
+ UNIT_ASSERT_VALUES_EQUAL(readResult1->GetRowsCount(), 5);
+ UNIT_ASSERT(!readResult1->Record.GetFinished());
+
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult1, {
+ {11, 11, 11, 1111},
+ {8, 1, 1, 803},
+ {8, 1, 0, 802},
+ {8, 0, 1, 801},
+ {8, 0, 0, 800}
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0);
+
+ helper.SendReadAck("table-1", readResult1->Record, 8, 10000);
+
+ auto readResult2 = helper.WaitReadResult();
+ UNIT_ASSERT(readResult2);
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->GetRowsCount(), 3);
+ UNIT_ASSERT(readResult2->Record.GetFinished());
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult2, {
+ {5, 5, 5, 500},
+ {3, 3, 3, 300},
+ {1, 1, 1, 100}
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 1);
+ }
+
Y_UNIT_TEST(ShouldNotReadAfterCancel) {
TTestHelper helper;