diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-09-28 11:43:00 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-09-28 11:43:00 +0300 |
commit | f024fe34b8d6b7bbdb6f4361fadd0b5b59b98b9f (patch) | |
tree | a4c058081e822d69596924c942df92337dece159 | |
parent | 565cb58f95c7dfbc05d2a6d3a0c16222141026a1 (diff) | |
download | ydb-f024fe34b8d6b7bbdb6f4361fadd0b5b59b98b9f.tar.gz |
Fix read iterator bug that occurs when some rows have already been read and a page fault is then hit
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 23 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 158 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_testload.cpp | 18 |
3 files changed, 173 insertions, 26 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index c4297970d0c..95d8cf4c458 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -284,7 +284,6 @@ public: keyFromCells = range.From; } const auto keyFrom = ToRawTypeValue(keyFromCells, TableInfo, fromInclusive); - const TSerializedCellVec keyToCells(range.To); const auto keyTo = ToRawTypeValue(keyToCells, TableInfo, !range.ToInclusive); @@ -300,10 +299,10 @@ public: EReadStatus result; if (!reverse) { auto iter = txc.DB.IterateRange(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver()); - result = Iterate(iter.Get(), true, ctx); + result = IterateRange(iter.Get(), ctx); } else { auto iter = txc.DB.IterateRangeReverse(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver()); - result = Iterate(iter.Get(), true, ctx); + result = IterateRange(iter.Get(), ctx); } if (result == EReadStatus::NeedData) { @@ -385,6 +384,8 @@ public: case EReadStatus::StoppedByLimit: return true; case EReadStatus::NeedData: + if (RowsRead) + return true; return false; } } @@ -405,6 +406,8 @@ public: case EReadStatus::StoppedByLimit: return true; case EReadStatus::NeedData: + if (RowsRead) + return true; return false; } } @@ -555,14 +558,12 @@ private: } template <typename TIterator> - EReadStatus Iterate(TIterator* iter, bool isRange, const TActorContext& ctx) { + EReadStatus IterateRange(TIterator* iter, const TActorContext& ctx) { Y_UNUSED(ctx); while (iter->Next(NTable::ENext::Data) == NTable::EReady::Data) { TDbTupleRef rowKey = iter->GetKey(); - if (isRange) { - LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells()); - } + LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells()); TDbTupleRef rowValues = iter->GetValues(); @@ -584,12 +585,12 @@ private: // TODO: consider restart when Page 
and too few data read // (how much is too few, less than user's limit?) - if (iter->Last() == NTable::EReady::Page && RowsRead == 0) { + if (iter->Last() == NTable::EReady::Page) { return EReadStatus::NeedData; } // range fully read, no reason to keep LastProcessedKey - if (isRange && iter->Last() == NTable::EReady::Gone) + if (iter->Last() == NTable::EReady::Gone) LastProcessedKey.clear(); return EReadStatus::Done; @@ -1803,6 +1804,10 @@ public: } } + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " readContinue iterator# " << readId + << " sends rowCount# " << Reader->GetRowsRead() << ", hasUnreadQueries# " << Reader->HasUnreadQueries() + << ", firstUnprocessed# " << state.FirstUnprocessedQuery); + Reader->FillResult(*Result); Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index e95b4accb30..19de40a2b51 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -303,14 +303,14 @@ struct TTestHelper { auto &runtime = *Server->GetRuntime(); Sender = runtime.AllocateEdgeActor(); - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_INFO); InitRoot(Server, Sender); - auto& table1 = Tables["table-1"]; - table1.Name = "table-1"; { + auto& table1 = Tables["table-1"]; + table1.Name = "table-1"; CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount); ExecSQL(Server, Sender, R"( UPSERT INTO `/Root/table-1` @@ -336,9 +336,9 @@ struct TTestHelper { table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetTestPipeConfig()); } - auto& table2 = Tables["movies"]; - table2.Name = 
"movies"; { + auto& table2 = Tables["movies"]; + table2.Name = "movies"; CreateMoviesTable(Server, Sender, "/Root", "movies"); ExecSQL(Server, Sender, R"( UPSERT INTO `/Root/movies` @@ -358,6 +358,83 @@ struct TTestHelper { table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetTestPipeConfig()); } + + { + auto& table3 = Tables["table-1-many"]; + table3.Name = "table-1-many"; + CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount); + + auto shards = GetTableShards(Server, Sender, "/Root/table-1-many"); + table3.TabletId = shards.at(0); + + auto [tables, ownerId] = GetTables(Server, table3.TabletId); + table3.OwnerId = ownerId; + table3.UserTable = tables["table-1-many"]; + + table3.ClientId = runtime.ConnectToPipe(table3.TabletId, Sender, 0, GetTestPipeConfig()); + } + } + + void UpsertMany(ui32 startRow, ui32 rowCount) { + auto &runtime = *Server->GetRuntime(); + const auto& table = Tables["table-1-many"]; + auto endRow = startRow + rowCount; + + for (ui32 key = startRow; key < endRow;) { + auto request = std::make_unique<TEvDataShard::TEvUploadRowsRequest>(); + auto& record = request->Record; + record.SetTableId(table.UserTable.GetPathId()); + + auto& rowScheme = *record.MutableRowScheme(); + + const auto& description = table.UserTable.GetDescription(); + std::set<ui32> keyColumns( + description.GetKeyColumnIds().begin(), + description.GetKeyColumnIds().end()); + + for (const auto& column: description.GetColumns()) { + if (keyColumns.contains(column.GetId())) + continue; + rowScheme.AddValueColumnIds(column.GetId()); + } + + for (auto column: keyColumns) { + rowScheme.AddKeyColumnIds(column); + } + + for (size_t i = 0; i < 1000 && key < endRow; ++i) { + TVector<TCell> keys; + keys.reserve(keyColumns.size()); + for (size_t i = 0; i < keyColumns.size(); ++i) { + keys.emplace_back(TCell::Make(key)); + } + + TVector<TCell> values; + for (size_t i = 0; i < description.ColumnsSize() - keyColumns.size(); ++i) { + 
values.emplace_back(TCell::Make(key)); // key intentionally as value + } + + auto& row = *record.AddRows(); + row.SetKeyColumns(TSerializedCellVec::Serialize(keys)); + row.SetValueColumns(TSerializedCellVec::Serialize(values)); + + ++key; + } + + runtime.SendToPipe( + table.TabletId, + Sender, + request.release(), + 0, + GetTestPipeConfig(), + table.ClientId); + + TAutoPtr<IEventHandle> handle; + runtime.GrabEdgeEventRethrow<TEvDataShard::TEvUploadRowsResponse>(handle); + UNIT_ASSERT(handle); + auto event = handle->Release<TEvDataShard::TEvUploadRowsResponse>(); + UNIT_ASSERT(event->Record.GetStatus() == 0); + } } void SplitTable1() { @@ -383,10 +460,6 @@ struct TTestHelper { record.MutableTableId()->SetTableId(table.UserTable.GetPathId()); const auto& description = table.UserTable.GetDescription(); - std::vector<ui32> keyColumns( - description.GetKeyColumnIds().begin(), - description.GetKeyColumnIds().end()); - for (const auto& column: description.GetColumns()) { record.AddColumns(column.GetId()); } @@ -557,6 +630,36 @@ struct TTestHelper { || newLock->GetGeneration() != prevLock->GetGeneration()); } + void TestChunkRead(ui32 chunkSize, ui32 rowCount) { + UpsertMany(1, rowCount); + + auto request = GetBaseReadRequest("table-1-many", 1, NKikimrTxDataShard::CELLVEC, TRowVersion::Max()); + request->Record.ClearSnapshot(); + AddRangeQuery<ui32>( + *request, + {1, 1, 1}, + true, + {rowCount + 1, 1, 1}, + true + ); + + request->Record.SetMaxRowsInResult(chunkSize); + + auto readResult = SendRead("table-1-many", request.release()); + UNIT_ASSERT(readResult); + + ui32 rowsRead = readResult->GetRowsCount(); + UNIT_ASSERT(rowsRead > 0); + + while (!readResult->Record.GetFinished()) { + readResult = WaitReadResult(); + UNIT_ASSERT(readResult); + rowsRead += readResult->GetRowsCount(); + } + + UNIT_ASSERT_VALUES_EQUAL(rowsRead, rowCount); + } + struct THangedReturn { ui64 LastPlanStep = 0; TVector<THolder<IEventHandle>> ReadSets; @@ -1282,6 +1385,41 @@ 
Y_UNIT_TEST_SUITE(DataShardReadIterator) { // TODO: check no continuation token } + Y_UNIT_TEST(ShouldReadRangeChunk1_100) { + TTestHelper helper; + helper.TestChunkRead(1, 100); + } + + Y_UNIT_TEST(ShouldReadRangeChunk1) { + TTestHelper helper; + helper.TestChunkRead(1, 1000); + } + + Y_UNIT_TEST(ShouldReadRangeChunk2) { + TTestHelper helper; + helper.TestChunkRead(2, 1000); + } + + Y_UNIT_TEST(ShouldReadRangeChunk3) { + TTestHelper helper; + helper.TestChunkRead(3, 1000); + } + + Y_UNIT_TEST(ShouldReadRangeChunk5) { + TTestHelper helper; + helper.TestChunkRead(5, 1000); + } + + Y_UNIT_TEST(ShouldReadRangeChunk7) { + TTestHelper helper; + helper.TestChunkRead(7, 1000); + } + + Y_UNIT_TEST(ShouldReadRangeChunk100) { + TTestHelper helper; + helper.TestChunkRead(99, 10000); + } + Y_UNIT_TEST(ShouldReadKeyPrefix1) { TTestHelper helper; diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp index 27e95c72836..2881e0b30ff 100644 --- a/ydb/core/tx/datashard/datashard_ut_testload.cpp +++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp @@ -91,9 +91,8 @@ struct TTestHelper { auto &runtime = *Server->GetRuntime(); Sender = runtime.AllocateEdgeActor(); - runtime.SetLogPriority(NKikimrServices::DS_LOAD_TEST, NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::DS_LOAD_TEST, NLog::PRI_INFO); + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE); InitRoot(Server, Sender); @@ -284,21 +283,26 @@ Y_UNIT_TEST_SUITE(ReadLoad) { Y_UNIT_TEST(ShouldReadIterate) { TTestHelper helper; - const ui64 expectedRowCount = 20; + const ui64 expectedRowCount = 1000; std::unique_ptr<TEvDataShardLoad::TEvTestLoadRequest> request(new TEvDataShardLoad::TEvTestLoadRequest()); auto& record = request->Record; auto& command = *record.MutableReadIteratorStart(); + 
command.AddChunks(0); + command.AddChunks(1); + command.AddChunks(10); + + command.AddInflights(1); + command.SetRowCount(expectedRowCount); command.SetPath("/Root/usertable"); auto result = helper.RunTestLoad(std::move(request)); UNIT_ASSERT(result->Report); - // fullscans with different chunks: 5 - // read head with inflight 1 - UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 12); + UNIT_ASSERT_VALUES_EQUAL(result->Report->SubtestCount, 4); + UNIT_ASSERT_VALUES_EQUAL(result->Report->OperationsOK, (4 * expectedRowCount)); // sanity check that there was data in table helper.CheckKeysCount(expectedRowCount); |