about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author eivanov89 <eivanov89@ydb.tech> 2022-09-28 11:43:00 +0300
committer eivanov89 <eivanov89@ydb.tech> 2022-09-28 11:43:00 +0300
commit f024fe34b8d6b7bbdb6f4361fadd0b5b59b98b9f (patch)
tree a4c058081e822d69596924c942df92337dece159
parent 565cb58f95c7dfbc05d2a6d3a0c16222141026a1 (diff)
download ydb-f024fe34b8d6b7bbdb6f4361fadd0b5b59b98b9f.tar.gz
Fix read iterator bug that occurs when some rows have already been read and a page fault happens
-rw-r--r--  ydb/core/tx/datashard/datashard__read_iterator.cpp  23
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_read_iterator.cpp  158
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_testload.cpp  18
3 files changed, 173 insertions, 26 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index c4297970d0c..95d8cf4c458 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -284,7 +284,6 @@ public:
keyFromCells = range.From;
}
const auto keyFrom = ToRawTypeValue(keyFromCells, TableInfo, fromInclusive);
-
const TSerializedCellVec keyToCells(range.To);
const auto keyTo = ToRawTypeValue(keyToCells, TableInfo, !range.ToInclusive);
@@ -300,10 +299,10 @@ public:
EReadStatus result;
if (!reverse) {
auto iter = txc.DB.IterateRange(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver());
- result = Iterate(iter.Get(), true, ctx);
+ result = IterateRange(iter.Get(), ctx);
} else {
auto iter = txc.DB.IterateRangeReverse(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver());
- result = Iterate(iter.Get(), true, ctx);
+ result = IterateRange(iter.Get(), ctx);
}
if (result == EReadStatus::NeedData) {
@@ -385,6 +384,8 @@ public:
case EReadStatus::StoppedByLimit:
return true;
case EReadStatus::NeedData:
+ if (RowsRead)
+ return true;
return false;
}
}
@@ -405,6 +406,8 @@ public:
case EReadStatus::StoppedByLimit:
return true;
case EReadStatus::NeedData:
+ if (RowsRead)
+ return true;
return false;
}
}
@@ -555,14 +558,12 @@ private:
}
template <typename TIterator>
- EReadStatus Iterate(TIterator* iter, bool isRange, const TActorContext& ctx) {
+ EReadStatus IterateRange(TIterator* iter, const TActorContext& ctx) {
Y_UNUSED(ctx);
while (iter->Next(NTable::ENext::Data) == NTable::EReady::Data) {
TDbTupleRef rowKey = iter->GetKey();
- if (isRange) {
- LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells());
- }
+ LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells());
TDbTupleRef rowValues = iter->GetValues();
@@ -584,12 +585,12 @@ private:
// TODO: consider restart when Page and too few data read
// (how much is too few, less than user's limit?)
- if (iter->Last() == NTable::EReady::Page && RowsRead == 0) {
+ if (iter->Last() == NTable::EReady::Page) {
return EReadStatus::NeedData;
}
// range fully read, no reason to keep LastProcessedKey
- if (isRange && iter->Last() == NTable::EReady::Gone)
+ if (iter->Last() == NTable::EReady::Gone)
LastProcessedKey.clear();
return EReadStatus::Done;
@@ -1803,6 +1804,10 @@ public:
}
}
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " readContinue iterator# " << readId
+ << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries()
+ << ", firstUnprocessed# " << state.FirstUnprocessedQuery);
+
Reader->FillResult(*Result);
Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index e95b4accb30..19de40a2b51 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -303,14 +303,14 @@ struct TTestHelper {
auto &runtime = *Server->GetRuntime();
Sender = runtime.AllocateEdgeActor();
- runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
- runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_INFO);
InitRoot(Server, Sender);
- auto& table1 = Tables["table-1"];
- table1.Name = "table-1";
{
+ auto& table1 = Tables["table-1"];
+ table1.Name = "table-1";
CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount);
ExecSQL(Server, Sender, R"(
UPSERT INTO `/Root/table-1`
@@ -336,9 +336,9 @@ struct TTestHelper {
table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetTestPipeConfig());
}
- auto& table2 = Tables["movies"];
- table2.Name = "movies";
{
+ auto& table2 = Tables["movies"];
+ table2.Name = "movies";
CreateMoviesTable(Server, Sender, "/Root", "movies");
ExecSQL(Server, Sender, R"(
UPSERT INTO `/Root/movies`
@@ -358,6 +358,83 @@ struct TTestHelper {
table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetTestPipeConfig());
}
+
+ {
+ auto& table3 = Tables["table-1-many"];
+ table3.Name = "table-1-many";
+ CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount);
+
+ auto shards = GetTableShards(Server, Sender, "/Root/table-1-many");
+ table3.TabletId = shards.at(0);
+
+ auto [tables, ownerId] = GetTables(Server, table3.TabletId);
+ table3.OwnerId = ownerId;
+ table3.UserTable = tables["table-1-many"];
+
+ table3.ClientId = runtime.ConnectToPipe(table3.TabletId, Sender, 0, GetTestPipeConfig());
+ }
+ }
+
+ void UpsertMany(ui32 startRow, ui32 rowCount) {
+ auto &runtime = *Server->GetRuntime();
+ const auto& table = Tables["table-1-many"];
+ auto endRow = startRow + rowCount;
+
+ for (ui32 key = startRow; key < endRow;) {
+ auto request = std::make_unique<TEvDataShard::TEvUploadRowsRequest>();
+ auto& record = request->Record;
+ record.SetTableId(table.UserTable.GetPathId());
+
+ auto& rowScheme = *record.MutableRowScheme();
+
+ const auto& description = table.UserTable.GetDescription();
+ std::set<ui32> keyColumns(
+ description.GetKeyColumnIds().begin(),
+ description.GetKeyColumnIds().end());
+
+ for (const auto& column: description.GetColumns()) {
+ if (keyColumns.contains(column.GetId()))
+ continue;
+ rowScheme.AddValueColumnIds(column.GetId());
+ }
+
+ for (auto column: keyColumns) {
+ rowScheme.AddKeyColumnIds(column);
+ }
+
+ for (size_t i = 0; i < 1000 && key < endRow; ++i) {
+ TVector<TCell> keys;
+ keys.reserve(keyColumns.size());
+ for (size_t i = 0; i < keyColumns.size(); ++i) {
+ keys.emplace_back(TCell::Make(key));
+ }
+
+ TVector<TCell> values;
+ for (size_t i = 0; i < description.ColumnsSize() - keyColumns.size(); ++i) {
+ values.emplace_back(TCell::Make(key)); // key intentionally as value
+ }
+
+ auto& row = *record.AddRows();
+ row.SetKeyColumns(TSerializedCellVec::Serialize(keys));
+ row.SetValueColumns(TSerializedCellVec::Serialize(values));
+
+ ++key;
+ }
+
+ runtime.SendToPipe(
+ table.TabletId,
+ Sender,
+ request.release(),
+ 0,
+ GetTestPipeConfig(),
+ table.ClientId);
+
+ TAutoPtr<IEventHandle> handle;
+ runtime.GrabEdgeEventRethrow<TEvDataShard::TEvUploadRowsResponse>(handle);
+ UNIT_ASSERT(handle);
+ auto event = handle->Release<TEvDataShard::TEvUploadRowsResponse>();
+ UNIT_ASSERT(event->Record.GetStatus() == 0);
+ }
}
void SplitTable1() {
@@ -383,10 +460,6 @@ struct TTestHelper {
record.MutableTableId()->SetTableId(table.UserTable.GetPathId());
const auto& description = table.UserTable.GetDescription();
- std::vector<ui32> keyColumns(
- description.GetKeyColumnIds().begin(),
- description.GetKeyColumnIds().end());
-
for (const auto& column: description.GetColumns()) {
record.AddColumns(column.GetId());
}
@@ -557,6 +630,36 @@ struct TTestHelper {
|| newLock->GetGeneration() != prevLock->GetGeneration());
}
+ void TestChunkRead(ui32 chunkSize, ui32 rowCount) {
+ UpsertMany(1, rowCount);
+
+ auto request = GetBaseReadRequest("table-1-many", 1, NKikimrTxDataShard::CELLVEC, TRowVersion::Max());
+ request->Record.ClearSnapshot();
+ AddRangeQuery<ui32>(
+ *request,
+ {1, 1, 1},
+ true,
+ {rowCount + 1, 1, 1},
+ true
+ );
+
+ request->Record.SetMaxRowsInResult(chunkSize);
+
+ auto readResult = SendRead("table-1-many", request.release());
+ UNIT_ASSERT(readResult);
+
+ ui32 rowsRead = readResult->GetRowsCount();
+ UNIT_ASSERT(rowsRead > 0);
+
+ while (!readResult->Record.GetFinished()) {
+ readResult = WaitReadResult();
+ UNIT_ASSERT(readResult);
+ rowsRead += readResult->GetRowsCount();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(rowsRead, rowCount);
+ }
+
struct THangedReturn {
ui64 LastPlanStep = 0;
TVector<THolder<IEventHandle>> ReadSets;
@@ -1282,6 +1385,41 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
// TODO: check no continuation token
}
+ Y_UNIT_TEST(ShouldReadRangeChunk1_100) {
+ TTestHelper helper;
+ helper.TestChunkRead(1, 100);
+ }
+
+ Y_UNIT_TEST(ShouldReadRangeChunk1) {
+ TTestHelper helper;
+ helper.TestChunkRead(1, 1000);
+ }
+
+ Y_UNIT_TEST(ShouldReadRangeChunk2) {
+ TTestHelper helper;
+ helper.TestChunkRead(2, 1000);
+ }
+
+ Y_UNIT_TEST(ShouldReadRangeChunk3) {
+ TTestHelper helper;
+ helper.TestChunkRead(3, 1000);
+ }
+
+ Y_UNIT_TEST(ShouldReadRangeChunk5) {
+ TTestHelper helper;
+ helper.TestChunkRead(5, 1000);
+ }
+
+ Y_UNIT_TEST(ShouldReadRangeChunk7) {
+ TTestHelper helper;
+ helper.TestChunkRead(7, 1000);
+ }
+
+ Y_UNIT_TEST(ShouldReadRangeChunk100) {
+ TTestHelper helper;
+ helper.TestChunkRead(99, 10000);
+ }
+
Y_UNIT_TEST(ShouldReadKeyPrefix1) {
TTestHelper helper;
diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp
index 27e95c72836..2881e0b30ff 100644
--- a/ydb/core/tx/datashard/datashard_ut_testload.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp
@@ -91,9 +91,8 @@ struct TTestHelper {
auto &runtime = *Server->GetRuntime();
Sender = runtime.AllocateEdgeActor();
- runtime.SetLogPriority(NKikimrServices::DS_LOAD_TEST, NLog::PRI_TRACE);
- runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
- runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::DS_LOAD_TEST, NLog::PRI_INFO);
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
InitRoot(Server, Sender);
@@ -284,21 +283,26 @@ Y_UNIT_TEST_SUITE(ReadLoad) {
Y_UNIT_TEST(ShouldReadIterate) {
TTestHelper helper;
- const ui64 expectedRowCount = 20;
+ const ui64 expectedRowCount = 1000;
std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest());
auto& record = request->Record;
auto& command = *record.MutableReadIteratorStart();
+ command.AddChunks(0);
+ command.AddChunks(1);
+ command.AddChunks(10);
+
+ command.AddInflights(1);
+
command.SetRowCount(expectedRowCount);
command.SetPath("/Root/usertable");
auto result = helper.RunTestLoad(std::move(request));
UNIT_ASSERT(result->Report);
- // fullscans with different chunks: 5
- // read head with inflight 1
- UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 12);
+ UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 4);
+ UNIT_ASSERT_VALUES_EQUAL(result->Report->OperationsOK, (4 * expectedRowCount));
// sanity check that there was data in table
helper.CheckKeysCount(expectedRowCount);