diff options
| author | Aleksei Borzenkov <[email protected]> | 2024-09-02 16:08:49 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-09-02 16:08:49 +0300 |
| commit | 37c91d73971af00b2d673427307934cfd3d9117d (patch) | |
| tree | dda306e3eadb7d1f6034de311b10ddedebfe83a2 | |
| parent | b9b486d4cf1f2fcb336100094e3113520bf8809e (diff) | |
Fix external reference precharging in datashard read iterators (#8592)
| -rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 184 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp | 169 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/read_iterator.h | 2 |
3 files changed, 226 insertions, 129 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index b0dadde686c..b056c481688 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -220,15 +220,15 @@ std::pair<std::unique_ptr<IBlockBuilder>, TString> CreateBlockBuilder( } std::vector<TRawTypeValue> ToRawTypeValue( - const TSerializedCellVec& keyCells, + TArrayRef<const TCell> keyCells, const TShortTableInfo& tableInfo, bool addNulls) { std::vector<TRawTypeValue> result; - result.reserve(keyCells.GetCells().size()); + result.reserve(keyCells.size()); - for (ui32 i = 0; i < keyCells.GetCells().size(); ++i) { - result.push_back(TRawTypeValue(keyCells.GetCells()[i].AsRef(), tableInfo.KeyColumnTypes[i])); + for (ui32 i = 0; i < keyCells.size(); ++i) { + result.push_back(TRawTypeValue(keyCells[i].AsRef(), tableInfo.KeyColumnTypes[i])); } // note that currently without nulls it is [prefix, +inf, +inf], @@ -275,7 +275,7 @@ class TReader { ui32 FirstUnprocessedQuery; // must be unsigned TString LastProcessedKey; - bool LastProcessedKeyErasedOrMissing = false; + bool LastProcessedKeyErased = false; ui64 RowsRead = 0; ui64 RowsProcessed = 0; @@ -298,9 +298,9 @@ class TReader { bool VolatileWaitForCommit = false; enum class EReadStatus { - Done = 0, + Done, NeedData, - StoppedByLimit, + NeedContinue, }; public: @@ -317,7 +317,7 @@ public: , TableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion) , FirstUnprocessedQuery(State.FirstUnprocessedQuery) , LastProcessedKey(State.LastProcessedKey) - , LastProcessedKeyErasedOrMissing(State.LastProcessedKeyErasedOrMissing) + , LastProcessedKeyErased(State.LastProcessedKeyErased) { GetTimeFast(&StartTime); EndTime = StartTime; @@ -325,7 +325,6 @@ public: EReadStatus ReadRange( TTransactionContext& txc, - const TActorContext& ctx, const TSerializedTableRange& range) { bool fromInclusive; @@ -335,7 +334,7 @@ public: if (LastProcessedKey) { if (!State.Reverse) { keyFromCells = TSerializedCellVec(LastProcessedKey); - fromInclusive = LastProcessedKeyErasedOrMissing; + fromInclusive = LastProcessedKeyErased; keyToCells = range.To; toInclusive = range.ToInclusive; @@ -345,7 +344,7 @@ public: fromInclusive = range.FromInclusive; keyToCells = TSerializedCellVec(LastProcessedKey); - toInclusive = LastProcessedKeyErasedOrMissing; + toInclusive = LastProcessedKeyErased; } } else { keyFromCells = range.From; @@ -355,8 +354,8 @@ public: toInclusive = range.ToInclusive; } - const auto keyFrom = ToRawTypeValue(keyFromCells, TableInfo, fromInclusive); - const auto keyTo = ToRawTypeValue(keyToCells, TableInfo, !toInclusive); + const auto keyFrom = ToRawTypeValue(keyFromCells.GetCells(), TableInfo, fromInclusive); + const auto keyTo = ToRawTypeValue(keyToCells.GetCells(), TableInfo, !toInclusive); // TODO: split range into parts like in read_columns @@ -367,7 +366,7 @@ public: iterRange.MaxInclusive = toInclusive; const bool reverse = State.Reverse; - if (TArrayRef<const TCell> cells = keyFromCells.GetCells()) { + if (TArrayRef<const TCell> cells = (reverse ? keyToCells.GetCells() : keyFromCells.GetCells())) { if (!fromInclusive || cells.size() >= TableInfo.KeyColumnTypes.size()) { Self->GetKeyAccessSampler()->AddSample(TableId, cells); } else { @@ -383,31 +382,19 @@ public: if (!reverse) { auto iter = txc.DB.IterateRange(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver()); - result = IterateRange(iter.Get(), ctx, txc.Env); + result = IterateRange(iter.Get(), iterRange, txc); } else { auto iter = txc.DB.IterateRangeReverse(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver()); - result = IterateRange(iter.Get(), ctx, txc.Env); + result = IterateRange(iter.Get(), iterRange, txc); } txc.Env.DisableReadMissingReferences(); - if (result == EReadStatus::NeedData && !(RowsProcessed && CanResume())) { - if (LastProcessedKey) { - keyFromCells = TSerializedCellVec(LastProcessedKey); - const auto keyFrom = ToRawTypeValue(keyFromCells, TableInfo, false); - Precharge(txc.DB, keyFrom, iterRange.MaxKey, reverse); - } else { - Precharge(txc.DB, iterRange.MinKey, iterRange.MaxKey, reverse); - } - return EReadStatus::NeedData; - } - return result; } EReadStatus ReadKey( TTransactionContext& txc, - const TActorContext& ctx, const TSerializedCellVec& keyCells, size_t keyIndex) { @@ -418,7 +405,7 @@ public: range.To = keyCells; range.ToInclusive = true; range.FromInclusive = true; - return ReadRange(txc, ctx, range); + return ReadRange(txc, range); } if (ColumnTypes.empty()) { @@ -429,13 +416,16 @@ public: } } - const auto key = ToRawTypeValue(keyCells, TableInfo, true); + const auto key = ToRawTypeValue(keyCells.GetCells(), TableInfo, true); NTable::TRowState rowState; rowState.Init(State.Columns.size()); NTable::TSelectStats stats; auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion, GetReadTxMap(), GetReadTxObserver()); if (ready == NTable::EReady::Page) { + if (RowsProcessed && CanResume()) { + return EReadStatus::NeedContinue; + } return EReadStatus::NeedData; } @@ -467,11 +457,11 @@ public: { if (keyCells.GetCells().size() != TableInfo.KeyColumnCount) { // key prefix, treat it as range [prefix, null, null] - [prefix, +inf, +inf] - auto minKey = ToRawTypeValue(keyCells, TableInfo, true); - auto maxKey = ToRawTypeValue(keyCells, TableInfo, false); + auto minKey = ToRawTypeValue(keyCells.GetCells(), TableInfo, true); + auto maxKey = ToRawTypeValue(keyCells.GetCells(), TableInfo, false); return Precharge(txc.DB, minKey, maxKey, State.Reverse); } else { - auto key = ToRawTypeValue(keyCells, TableInfo, true); + auto key = ToRawTypeValue(keyCells.GetCells(), TableInfo, true); return Precharge(txc.DB, key, key, State.Reverse); } } @@ -502,7 +492,7 @@ public: // TODO: merge ReadRanges and ReadKeys to single template Read? - bool ReadRanges(TTransactionContext& txc, const TActorContext& ctx) { + bool ReadRanges(TTransactionContext& txc) { // note that FirstUnprocessedQuery is unsigned and if we do reverse iteration, // then it will also become less than size() when finished while (FirstUnprocessedQuery < State.Request->Ranges.size()) { @@ -516,19 +506,16 @@ public: return true; const auto& range = State.Request->Ranges[FirstUnprocessedQuery]; - auto status = ReadRange(txc, ctx, range); + auto status = ReadRange(txc, range); switch (status) { case EReadStatus::Done: break; - case EReadStatus::StoppedByLimit: - return true; case EReadStatus::NeedData: - if (RowsProcessed && CanResume()) - return true; - // Note: ReadRange has already precharged current range and // we don't precharge multiple ranges as opposed to keys return false; + case EReadStatus::NeedContinue: + return true; } if (!State.Reverse) @@ -541,7 +528,7 @@ public: return true; } - bool ReadKeys(TTransactionContext& txc, const TActorContext& ctx) { + bool ReadKeys(TTransactionContext& txc) { // note that FirstUnprocessedQuery is unsigned and if we do reverse iteration, // then it will also become less than size() when finished while (FirstUnprocessedQuery < State.Request->Keys.size()) { @@ -555,18 +542,15 @@ public: return true; const auto& key = State.Request->Keys[FirstUnprocessedQuery]; - auto status = ReadKey(txc, ctx, key, FirstUnprocessedQuery); + auto status = ReadKey(txc, key, FirstUnprocessedQuery); switch (status) { case EReadStatus::Done: break; - case EReadStatus::StoppedByLimit: - return true; case EReadStatus::NeedData: - if (RowsProcessed && CanResume()) - return true; - PrechargeKeysAfter(txc, FirstUnprocessedQuery); return false; + case EReadStatus::NeedContinue: + return true; } if (!State.Reverse) @@ -580,16 +564,16 @@ public: } // return semantics the same as in the Execute() - bool Read(TTransactionContext& txc, const TActorContext& ctx) { + bool Read(TTransactionContext& txc) { // TODO: consider trying to precharge multiple records at once in case // when first precharge fails? if (!State.Request->Keys.empty()) { - return ReadKeys(txc, ctx); + return ReadKeys(txc); } // since no keys, then we must have ranges (has been checked initially) - return ReadRanges(txc, ctx); + return ReadRanges(txc); } bool HasUnreadQueries() const { @@ -764,7 +748,7 @@ public: state.TotalRows += RowsRead; state.FirstUnprocessedQuery = FirstUnprocessedQuery; state.LastProcessedKey = LastProcessedKey; - state.LastProcessedKeyErasedOrMissing = LastProcessedKeyErasedOrMissing; + state.LastProcessedKeyErased = LastProcessedKeyErased; if (sentResult) { state.ConsumeSeqNo(RowsRead, BytesInResult); } @@ -852,8 +836,8 @@ private: bool Precharge( NTable::TDatabase& db, - NTable::TRawVals keyFrom, - NTable::TRawVals keyTo, + NTable::TRawVals minKey, + NTable::TRawVals maxKey, bool reverse) { ui64 rowsLeft = GetRowsLeft(); @@ -861,8 +845,8 @@ private: auto direction = reverse ? NTable::EDirection::Reverse : NTable::EDirection::Forward; return db.Precharge(TableInfo.LocalTid, - keyFrom, - keyTo, + minKey, + maxKey, State.Columns, 0, rowsLeft, @@ -872,9 +856,7 @@ private: } template <typename TIterator> - EReadStatus IterateRange(TIterator* iter, const TActorContext& ctx, IExecuting& env) { - Y_UNUSED(ctx); - + EReadStatus IterateRange(TIterator* iter, NTable::TKeyRange& iterRange, TTransactionContext& txc) { auto keyAccessSampler = Self->GetKeyAccessSampler(); bool advanced = false; @@ -887,38 +869,21 @@ private: TDbTupleRef rowKey = iter->GetKey(); TDbTupleRef rowValues = iter->GetValues(); - if (!precharging && env.MissingReferencesSize()) { + if (!precharging && txc.Env.MissingReferencesSize()) { + // Note: the current key must be returned to reader, but the + // previous key is lost, and we cannot safely resume. We can + // only restart query from the beginning, and don't want to + // keep track of any stats. precharging = true; - - ui64 deletedRowSkips = iter->Stats.DeletedRowSkips; - ui64 invisibleRowSkips = iter->Stats.InvisibleRowSkips; - - const ui64 processedRecords = ResetRowSkips(iter->Stats); - - if ((RowsProcessed + processedRecords) > 0) { - // There were some rows (regular, erased or invisible), so - // this transaction won't be restarting and there is no point in precharging missing references. - RowsSinceLastCheck += processedRecords; - RowsProcessed += processedRecords; - - DeletedRowSkips += deletedRowSkips; - InvisibleRowSkips += invisibleRowSkips; - - // We will be continuing from the current key (inclusive). - LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells()); - LastProcessedKeyErasedOrMissing = true; - break; - } } if (precharging) { - // Precharge only if we didn't meet any rows prior to a row with a missing reference. - // Meanwhile, RowsProcessed, RowsSinceLastCheck and LastProcessed key are not updated, + // Note: RowsProcessed, RowsSinceLastCheck and LastProcessed key are not updated, // so we will restart the transaction from the exact same key we started iterating from. prechargedCount++; prechargedRowsSize += EstimateSize(rowValues.Cells()); - if (ReachedTotalRowsLimit(prechargedCount) || ShouldStop(prechargedCount, prechargedRowsSize + env.MissingReferencesSize())) { + if (ReachedTotalRowsLimit(prechargedCount) || ShouldStop(prechargedCount, prechargedRowsSize + txc.Env.MissingReferencesSize())) { break; } @@ -947,8 +912,8 @@ private: if (ShouldStop()) { LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells()); - LastProcessedKeyErasedOrMissing = false; - return EReadStatus::StoppedByLimit; + LastProcessedKeyErased = false; + return EReadStatus::NeedContinue; } } @@ -960,20 +925,38 @@ private: // row). When there are not enough rows we would prefer restarting in // the same transaction, instead of starting a new one, in which case // we will not update stats and will not update RowsProcessed. - if (!precharging) { - auto lastKey = iter->GetKey().Cells(); - - if (lastKey && (advanced || iter->Stats.DeletedRowSkips >= 4) && iter->Last() == NTable::EReady::Page) { - LastProcessedKey = TSerializedCellVec::Serialize(lastKey); - LastProcessedKeyErasedOrMissing = iter->GetKeyState() == NTable::ERowOp::Erase; - advanced = true; + auto lastKey = iter->GetKey().Cells(); + + auto prechargeFromLastKey = [&]() { + if (lastKey) { + const auto key = ToRawTypeValue(lastKey, TableInfo, false); + if (!State.Reverse) { + Precharge(txc.DB, key, iterRange.MaxKey, State.Reverse); + } else { + Precharge(txc.DB, iterRange.MinKey, key, State.Reverse); + } } else { - LastProcessedKey.clear(); + Precharge(txc.DB, iterRange.MinKey, iterRange.MaxKey, State.Reverse); + } + }; + + if (precharging) { + if (iter->Last() == NTable::EReady::Page) { + prechargeFromLastKey(); } + return EReadStatus::NeedData; + } + + if (lastKey && (advanced || iter->Stats.DeletedRowSkips >= 4) && iter->Last() == NTable::EReady::Page) { + LastProcessedKey = TSerializedCellVec::Serialize(lastKey); + LastProcessedKeyErased = iter->GetKeyState() == NTable::ERowOp::Erase; + advanced = true; + } else { + LastProcessedKey.clear(); } // last iteration to Page or Gone might also have deleted or invisible rows - if (advanced || (iter->Last() != NTable::EReady::Page && !precharging)) { + if (advanced || iter->Last() != NTable::EReady::Page) { DeletedRowSkips += iter->Stats.DeletedRowSkips; InvisibleRowSkips += iter->Stats.InvisibleRowSkips; const ui64 processedRecords = ResetRowSkips(iter->Stats); @@ -981,9 +964,14 @@ private: RowsProcessed += processedRecords; } - // TODO: consider restart when Page and too few data read - // (how much is too few, less than user's limit?) - if (iter->Last() == NTable::EReady::Page || precharging) { + if (iter->Last() == NTable::EReady::Page) { + // TODO: consider restart when Page and too few data read + // (how much is too few, less than user's limit?) + if (RowsProcessed && CanResume()) { + return EReadStatus::NeedContinue; + } + + prechargeFromLastKey(); return EReadStatus::NeedData; } @@ -1824,7 +1812,7 @@ private: AppData()->MonotonicTimeProvider->Now(), Self)); - return Reader->Read(txc, ctx); + return Reader->Read(txc); } void PrepareValidationInfo(const TActorContext&, const TReadIteratorState& state) { @@ -2482,7 +2470,7 @@ public: LWTRACK(ReadExecute, state.Orbit); - if (Reader->Read(txc, ctx)) { + if (Reader->Read(txc)) { // Retry later when dependencies are resolved if (!Reader->GetVolatileReadDependencies().empty()) { state.ReadContinuePending = true; diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp index 9b48fa579e5..8b845b0f4b6 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp @@ -24,15 +24,22 @@ using namespace Tests; Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { - struct ReadIteratorCounter { + struct TReadIteratorCounter { int Reads = 0; int Continues = 0; int EvGets = 0; int BlobsRequested = 0; + + bool operator==(const TReadIteratorCounter&) const = default; + + friend inline IOutputStream& operator<<(IOutputStream& out, const TReadIteratorCounter& c) { + out << "{ " << c.Reads << ", " << c.Continues << ", " << c.EvGets << ", " << c.BlobsRequested << " }"; + return out; + } }; - std::unique_ptr<ReadIteratorCounter> SetupReadIteratorObserver(TTestActorRuntime& runtime) { - std::unique_ptr<ReadIteratorCounter> iteratorCounter = std::make_unique<ReadIteratorCounter>(); + std::unique_ptr<TReadIteratorCounter> SetupReadIteratorObserver(TTestActorRuntime& runtime) { + std::unique_ptr<TReadIteratorCounter> iteratorCounter = std::make_unique<TReadIteratorCounter>(); auto captureEvents = [&](TAutoPtr<IEventHandle> &event) -> auto { switch (event->GetTypeRewrite()) { @@ -59,7 +66,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { return iteratorCounter; } - struct Node { + struct TNode { TPortManager Pm; TServerSettings ServerSettings; TServer::TPtr Server; @@ -68,7 +75,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { TActorId Sender; TTestActorRuntime* Runtime; - Node(bool useExternalBlobs, int externalBlobColumns = 1) : ServerSettings(Pm.GetPort(2134)) { + TNode(bool useExternalBlobs, int externalBlobColumns = 1) : ServerSettings(Pm.GetPort(2134)) { ServerSettings.SetDomainName("Root") .SetUseRealThreads(false) .AddStoragePool("ssd") @@ -113,7 +120,12 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { } }; - void ValidateReadResult(TTestActorRuntime& runtime, NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> readFuture, int rowsCount, int firstBlobChunkNum = 0, int extBlobColumnCount = 1) { + void ValidateReadResult(TTestActorRuntime& runtime, + NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> readFuture, + int rowsCount, + int firstBlobChunkNum = 0, + int extBlobColumnCount = 1) + { Ydb::Table::ExecuteDataQueryResponse res = AwaitResponse(runtime, std::move(readFuture)); auto& operation = res.Getoperation(); UNIT_ASSERT_VALUES_EQUAL(operation.status(), Ydb::StatusIds::SUCCESS); @@ -132,7 +144,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { UNIT_ASSERT(chunkNumValue.has_int32_value()); UNIT_ASSERT_EQUAL(chunkNumValue.Getint32_value(), firstBlobChunkNum + i); - + for (int j = 0; j < extBlobColumnCount; j++) { auto& dataValue = row.get_idx_items(2 + j); UNIT_ASSERT(dataValue.has_bytes_value()); @@ -141,8 +153,10 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { } } - template <ui8 resultSize> - void ValidateReadResult(TTestActorRuntime& runtime, NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> readFuture, std::array<i32, resultSize> expectedResult) { + void ValidateReadResult(TTestActorRuntime& runtime, + NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> readFuture, + const std::vector<i32>& expectedResult) + { Ydb::Table::ExecuteDataQueryResponse res = AwaitResponse(runtime, std::move(readFuture)); auto& operation = res.Getoperation(); UNIT_ASSERT_VALUES_EQUAL(operation.status(), Ydb::StatusIds::SUCCESS); @@ -150,7 +164,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { operation.result().UnpackTo(&result); UNIT_ASSERT_EQUAL(result.result_sets().size(), 1); auto& resultSet = result.result_sets()[0]; - UNIT_ASSERT_EQUAL(resultSet.rows_size(), resultSize); + UNIT_ASSERT_EQUAL(size_t(resultSet.rows_size()), expectedResult.size()); for (int i = 0; i < resultSet.rows_size(); i++) { auto& row = resultSet.get_idx_rows(i); @@ -161,7 +175,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { UNIT_ASSERT(chunkNumValue.has_int32_value()); UNIT_ASSERT_EQUAL(chunkNumValue.Getint32_value(), expectedResult[i]); - + auto& dataValue = row.get_idx_items(2); UNIT_ASSERT(dataValue.has_bytes_value()); UNIT_ASSERT_EQUAL(dataValue.bytes_value().size(), 1_MB); @@ -169,7 +183,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { } Y_UNIT_TEST(ExtBlobs) { - Node node(true); + TNode node(true); auto server = node.Server; auto& runtime = *node.Runtime; @@ -225,7 +239,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { } Y_UNIT_TEST(ExtBlobsWithDeletesInTheBeginning) { - Node node(true); + TNode node(true); auto server = node.Server; auto& runtime = *node.Runtime; @@ -268,13 +282,13 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { ValidateReadResult(runtime, std::move(readFuture), 3, 7); UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Reads, 1); - UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Continues, 1); + UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Continues, 0); UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->EvGets, 1); UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->BlobsRequested, 3); } Y_UNIT_TEST(ExtBlobsWithDeletesInTheEnd) { - Node node(true); + TNode node(true); auto server = node.Server; auto& runtime = *node.Runtime; @@ -323,7 +337,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { } Y_UNIT_TEST(ExtBlobsWithDeletesInTheMiddle) { - Node node(true); + TNode node(true); auto server = node.Server; auto& runtime = *node.Runtime; @@ -370,16 +384,113 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { ORDER BY blob_id, chunk_num ASC LIMIT 100;)"); - ValidateReadResult<6>(runtime, std::move(readFuture), {1, 5, 6, 7, 8, 9}); + ValidateReadResult(runtime, std::move(readFuture), {1, 5, 6, 7, 8, 9}); UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Reads, 1); - UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Continues, 2); + UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Continues, 1); UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->EvGets, 2); UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->BlobsRequested, 6); } + void DoExtBlobsWithFirstRowPreloaded(bool withReboot) { + TNode node(true); + + auto server = node.Server; + auto& runtime = *node.Runtime; + auto& sender = node.Sender; + auto shard1 = node.Shard; + auto tableId1 = node.TableId; + + TString largeValue(1_MB, 'L'); + + for (int i = 0; i < 10; i++) { + TString chunkNum = ToString(i); + TString query = R"___( + UPSERT INTO `/Root/table-1` (blob_id, chunk_num, data0) VALUES + (Uuid("65df1ec1-a97d-47b2-ae56-3c023da6ee8c"), )___" + chunkNum + ", \"" + largeValue + "\");"; + ExecSQL(server, sender, query); + } + + CompactTable(runtime, shard1, tableId1, false); + + runtime.SimulateSleep(TDuration::Seconds(1)); + + auto preloadFuture = KqpSimpleSend(runtime, R"( + SELECT blob_id, chunk_num, data0 + FROM `/Root/table-1` + WHERE + blob_id = Uuid("65df1ec1-a97d-47b2-ae56-3c023da6ee8c") AND + chunk_num = 0; + )"); + + ValidateReadResult(runtime, std::move(preloadFuture), 1); + + size_t passedRows = 0; + bool finished = false; + std::vector<TEvDataShard::TEvReadResult::TPtr> blockedResults; + std::optional<std::pair<TActorId, ui64>> dropReadId; + auto blockResults = runtime.AddObserver<TEvDataShard::TEvReadResult>( + [&](TEvDataShard::TEvReadResult::TPtr& ev) { + auto* msg = ev->Get(); + if (dropReadId) { + if (*dropReadId == std::make_pair(ev->GetRecipientRewrite(), msg->Record.GetReadId())) { + ev.Reset(); + } + return; + } + if (passedRows > 0) { + blockedResults.push_back(std::move(ev)); + return; + } + passedRows += msg->GetRowsCount(); + if (msg->Record.GetFinished()) { + finished = true; + } + }); + + auto readFuture = KqpSimpleSend(runtime, R"( + SELECT blob_id, chunk_num, data0 + FROM `/Root/table-1` + WHERE + blob_id = Uuid("65df1ec1-a97d-47b2-ae56-3c023da6ee8c") AND + chunk_num >= 0 + ORDER BY blob_id, chunk_num ASC + LIMIT 5; + )"); + + runtime.WaitFor("blocked results", [&]{ return blockedResults.size() > 0 || finished; }); + + if (!finished) { + UNIT_ASSERT_VALUES_EQUAL(passedRows, 1u); + + if (withReboot) { + dropReadId.emplace( + blockedResults[0]->GetRecipientRewrite(), + blockedResults[0]->Get()->Record.GetReadId()); + + RebootTablet(runtime, shard1, sender); + } else { + blockResults.Remove(); + for (auto& ev : blockedResults) { + runtime.Send(ev.Release(), 0, true); + } + blockedResults.clear(); + } + } + + ValidateReadResult(runtime, std::move(readFuture), 5); + } + + Y_UNIT_TEST(ExtBlobsWithFirstRowPreloaded) { + DoExtBlobsWithFirstRowPreloaded(false); + } + + Y_UNIT_TEST(ExtBlobsWithFirstRowPreloadedWithReboot) { + DoExtBlobsWithFirstRowPreloaded(true); + } + Y_UNIT_TEST(ExtBlobsMultipleColumns) { - Node node(true, 2); + TNode node(true, 2); auto server = node.Server; auto& runtime = *node.Runtime; @@ -436,11 +547,11 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { } Y_UNIT_TEST(ExtBlobsWithCompactingMiddleRows) { - std::unordered_map<int, ReadIteratorCounter> expectedResults; - expectedResults[1] = {1, 7, 7, 18}; - expectedResults[2] = {1, 8, 7, 16}; - expectedResults[3] = {1, 7, 6, 14}; - expectedResults[4] = {1, 8, 6, 12}; + std::unordered_map<int, TReadIteratorCounter> expectedResults; + expectedResults[1] = {1, 4, 4, 18}; + expectedResults[2] = {1, 4, 4, 16}; + expectedResults[3] = {1, 4, 4, 14}; + expectedResults[4] = {1, 4, 4, 12}; expectedResults[5] = {1, 4, 2, 10}; // We write 20 rows, some of them are compacted, then we write some more rows "before" and "after" and read all of them @@ -449,7 +560,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { for (int test = 1; test < 6; test++) { int compactedPart = 20 - (test * 2); - Node node(true); + TNode node(true); auto server = node.Server; auto& runtime = *node.Runtime; @@ -520,14 +631,12 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { auto& expectedResult = expectedResults[test]; - UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Reads, expectedResult.Reads); - UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->EvGets, expectedResult.EvGets); - UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->BlobsRequested, expectedResult.BlobsRequested); + UNIT_ASSERT_VALUES_EQUAL_C(*iteratorCounter, expectedResult, "test " << test); } } Y_UNIT_TEST(ExtBlobsEmptyTable) { - Node node(true); + TNode node(true); auto& runtime = *node.Runtime; @@ -543,7 +652,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { } Y_UNIT_TEST(NotExtBlobs) { - Node node(false); + TNode node(false); auto server = node.Server; auto& runtime = *node.Runtime; diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index 96cea578d50..6648388a613 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -212,7 +212,7 @@ public: ui64 LastAckSeqNo = 0; ui32 FirstUnprocessedQuery = 0; TString LastProcessedKey; - bool LastProcessedKeyErasedOrMissing = false; + bool LastProcessedKeyErased = false; // Orbit used for tracking progress NLWTrace::TOrbit Orbit; |
