diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2025-05-28 14:22:29 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-28 14:22:29 +0300 |
commit | 869ee2f10ff433aada3bd8caf738e1184deb95cd (patch) | |
tree | ff21c184adc860babe6ef81a0c129b2a2896fdea | |
parent | 1316d5c944a9a03065a7c98f3c83dd02219c105a (diff) | |
download | ydb-869ee2f10ff433aada3bd8caf738e1184deb95cd.tar.gz |
fix stream lookup square and add simple overload checker (#18922)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 134 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 22 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.h | 4 |
3 files changed, 105 insertions, 55 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 26100fdb317..7bce73dc3e7 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -102,14 +102,15 @@ public: } } + auto affectedShards = Reads.AffectedShards(); // TODO: use evread statistics after KIKIMR-16924 tableStats->SetReadRows(tableStats->GetReadRows() + rowsReadEstimate); tableStats->SetReadBytes(tableStats->GetReadBytes() + bytesReadEstimate); - tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size()); + tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + affectedShards.size()); NKqpProto::TKqpTableExtraStats tableExtraStats; auto readActorTableAggrExtraStats = tableExtraStats.MutableReadActorTableAggrExtraStats(); - for (const auto& [shardId, _] : ReadsPerShard) { + for (const auto& shardId : affectedShards) { readActorTableAggrExtraStats->AddAffectedShards(shardId); } @@ -140,10 +141,6 @@ private: , ShardId(shardId) , State(EReadState::Initial) {} - void SetFinished() { - State = EReadState::Finished; - } - bool Finished() const { return (State == EReadState::Finished); } @@ -162,7 +159,68 @@ private: struct TShardState { ui64 RetryAttempts = 0; - std::vector<TReadState*> Reads; + std::unordered_set<ui64> Reads; + }; + + struct TReads { + std::unordered_map<ui64, TReadState> Reads; + std::unordered_map<ui64, TShardState> ReadsPerShard; + + std::unordered_map<ui64, TReadState>::iterator begin() { return Reads.begin(); } + + std::unordered_map<ui64, TReadState>::iterator end() { return Reads.end(); } + + std::unordered_map<ui64, TReadState>::iterator find(ui64 readId) { + return Reads.find(readId); + } + + void insert(TReadState&& read) { + const auto [readIt, succeeded] = Reads.insert({read.Id, std::move(read)}); + YQL_ENSURE(succeeded); + ReadsPerShard[readIt->second.ShardId].Reads.emplace(readIt->second.Id); + } + + size_t InFlightReads() const { + return Reads.size(); + } + + std::vector<ui64> AffectedShards() const { + std::vector<ui64> result; + result.reserve(ReadsPerShard.size()); + for(const auto& [shard, _]: ReadsPerShard) { + result.push_back(shard); + } + return result; + } + + bool CheckShardRetriesExeeded(TReadState& failedRead) { + const auto& shardState = ReadsPerShard[failedRead.ShardId]; + return shardState.RetryAttempts + 1 > MaxShardRetries(); + } + + TDuration CalcDelayForShard(TReadState& failedRead, bool allowInstantRetry) { + auto& shardState = ReadsPerShard[failedRead.ShardId]; + ++shardState.RetryAttempts; + return CalcDelay(shardState.RetryAttempts, allowInstantRetry); + } + + void erase(TReadState& read) { + ReadsPerShard[read.ShardId].Reads.erase(read.Id); + Reads.erase(read.Id); + } + + std::vector<TReadState*> GetShardReads(ui64 shardId) { + auto it = ReadsPerShard.find(shardId); + YQL_ENSURE(it != ReadsPerShard.end()); + std::vector<TReadState*> result; + for(ui64 readId: it->second.Reads) { + auto it = Reads.find(readId); + YQL_ENSURE(it != Reads.end()); + result.push_back(&it->second); + } + + return result; + } }; struct TEvPrivate { @@ -227,13 +285,15 @@ private: ReadRowsCount += replyResultStats.ReadRowsCount; ReadBytesCount += replyResultStats.ReadBytesCount; - auto status = FetchInputRows(); + if (!StreamLookupWorker->IsOverloaded()) { + FetchInputRows(); + } if (Partitioning) { ProcessInputRows(); } - const bool inputRowsFinished = status == NUdf::EFetchStatus::Finish; + const bool inputRowsFinished = LastFetchStatus == NUdf::EFetchStatus::Finish; const bool allReadsFinished = AllReadsFinished(); const bool allRowsProcessed = StreamLookupWorker->AllRowsProcessed(); @@ -308,7 +368,6 @@ private: void Handle(TEvDataShard::TEvReadResult::TPtr& ev) { const auto& record = ev->Get()->Record; - auto readIt = Reads.find(record.GetReadId()); if (readIt == Reads.end() || readIt->second.State != EReadState::Running) { CA_LOG_D("Drop read with readId: " << record.GetReadId() << ", because it's already completed or blocked"); @@ -316,6 +375,7 @@ private: } auto& read = readIt->second; + ui64 shardId = read.ShardId; CA_LOG_D("Recv TEvReadResult (stream lookup) from ShardID=" << read.ShardId << ", Table = " << StreamLookupWorker->GetTablePath() @@ -380,13 +440,13 @@ private: case Ydb::StatusIds::NOT_FOUND: { StreamLookupWorker->ResetRowsProcessing(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey); - read.SetFinished(); CA_LOG_D("NOT_FOUND was received from tablet: " << read.ShardId << ". " << getIssues().ToOneLineString()); + Reads.erase(read); return ResolveTableShards(); } case Ydb::StatusIds::OVERLOADED: { - if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) { + if (CheckTotalRetriesExeeded() || Reads.CheckShardRetriesExeeded(read)) { return replyError( TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.", NYql::NDqProto::StatusIds::OVERLOADED); @@ -397,7 +457,7 @@ private: return RetryTableRead(read, /*allowInstantRetry = */false); } case Ydb::StatusIds::INTERNAL_ERROR: { - if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) { + if (CheckTotalRetriesExeeded() || Reads.CheckShardRetriesExeeded(read)) { return replyError( TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.", NYql::NDqProto::StatusIds::INTERNAL_ERROR); @@ -416,7 +476,7 @@ private: read.LastSeqNo = record.GetSeqNo(); if (record.GetFinished()) { - read.SetFinished(); + Reads.erase(read); } else { YQL_ENSURE(record.HasContinuationToken(), "Successful TEvReadResult should contain continuation token"); NKikimrTxDataShard::TReadContinuationToken continuationToken; @@ -454,7 +514,7 @@ private: auto guard = BindAllocator(); StreamLookupWorker->AddResult(TKqpStreamLookupWorker::TShardReadResult{ - read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()) + shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()) }); Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } @@ -463,11 +523,9 @@ private: CA_LOG_D("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId); const auto& tabletId = ev->Get()->TabletId; - auto shardIt = ReadsPerShard.find(tabletId); - YQL_ENSURE(shardIt != ReadsPerShard.end()); TVector<TReadState*> toRetry; - for (auto* read : shardIt->second.Reads) { + for (auto* read : Reads.GetShardReads(tabletId)) { if (read->State == EReadState::Running) { Counters->IteratorDeliveryProblems->Inc(); toRetry.push_back(read); @@ -500,27 +558,24 @@ private: if ((read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) || read.State == EReadState::Blocked) { if (ev->Get()->InstantStart) { - read.SetFinished(); auto requests = StreamLookupWorker->RebuildRequest(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey, ReadId); for (auto& request : requests) { StartTableRead(read.ShardId, std::move(request)); } + Reads.erase(read); } else { RetryTableRead(read); } } } - NUdf::EFetchStatus FetchInputRows() { + void FetchInputRows() { auto guard = BindAllocator(); - NUdf::EFetchStatus status; NUdf::TUnboxedValue row; - while ((status = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) { + while ((LastFetchStatus = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) { StreamLookupWorker->AddInputRow(std::move(row)); } - - return status; } void ProcessInputRows() { @@ -580,9 +635,7 @@ private: auto readId = read.Id; auto lastSeqNo = read.LastSeqNo; - const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)}); - YQL_ENSURE(succeeded); - ReadsPerShard[shardId].Reads.push_back(&readIt->second); + Reads.insert(std::move(read)); if (auto delay = ShardTimeout()) { TlsActivationContext->Schedule( @@ -596,11 +649,6 @@ private: return limit && TotalRetryAttempts + 1 > *limit; } - bool CheckShardRetriesExeeded(TReadState& failedRead) { - const auto& shardState = ReadsPerShard[failedRead.ShardId]; - return shardState.RetryAttempts + 1 > MaxShardRetries(); - } - void RetryTableRead(TReadState& failedRead, bool allowInstantRetry = true) { CA_LOG_D("Retry reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << failedRead.Id << ", shardId: " << failedRead.ShardId); @@ -611,21 +659,19 @@ private: } ++TotalRetryAttempts; - if (CheckShardRetriesExeeded(failedRead)) { + if (Reads.CheckShardRetriesExeeded(failedRead)) { StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey); - failedRead.SetFinished(); + Reads.erase(failedRead); return ResolveTableShards(); } - auto& shardState = ReadsPerShard[failedRead.ShardId]; - ++shardState.RetryAttempts; - auto delay = CalcDelay(shardState.RetryAttempts, allowInstantRetry); + auto delay = Reads.CalcDelayForShard(failedRead, allowInstantRetry); if (delay == TDuration::Zero()) { - failedRead.SetFinished(); auto requests = StreamLookupWorker->RebuildRequest(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey, ReadId); for (auto& request : requests) { StartTableRead(failedRead.ShardId, std::move(request)); } + Reads.erase(failedRead); } else { CA_LOG_D("Schedule retry atempt for readId: " << failedRead.Id << " after " << delay); TlsActivationContext->Schedule( @@ -671,13 +717,7 @@ private: } bool AllReadsFinished() const { - for (const auto& [_, read] : Reads) { - if (!read.Finished()) { - return false; - } - } - - return true; + return Reads.InFlightReads() == 0; } TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() { @@ -715,8 +755,8 @@ private: const TMaybe<ui64> LockTxId; const TMaybe<ui32> NodeLockId; const TMaybe<NKikimrDataEvents::ELockMode> LockMode; - std::unordered_map<ui64, TReadState> Reads; - std::unordered_map<ui64, TShardState> ReadsPerShard; + TReads Reads; + NUdf::EFetchStatus LastFetchStatus = NUdf::EFetchStatus::Yield; std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning; const TDuration SchemeCacheRequestTimeout; NActors::TActorId SchemeCacheRequestTimeoutTimer; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 32eea7ec74e..39595b74be2 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -13,6 +13,8 @@ namespace NKikimr { namespace NKqp { +constexpr ui64 MAX_IN_FLIGHT_LIMIT = 500; + namespace { std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpStreamLookupWorker::TPartitionInfo& partitionInfo, const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TOwnedTableRange& range) { @@ -286,12 +288,15 @@ public: ReadResults.emplace_back(std::move(result)); } + bool IsOverloaded() final { + return false; + } + TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final { TReadResultStats resultStats; - bool sizeLimitExceeded = false; batch.clear(); - while (!ReadResults.empty() && !sizeLimitExceeded) { + while (!ReadResults.empty() && !resultStats.SizeLimitExceeded) { auto& result = ReadResults.front(); for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); @@ -317,10 +322,10 @@ public: } if (rowSize + (i64)resultStats.ResultBytesCount > freeSpace) { - sizeLimitExceeded = true; + resultStats.SizeLimitExceeded = true; } - if (resultStats.ResultRowsCount && sizeLimitExceeded) { + if (resultStats.ResultRowsCount && resultStats.SizeLimitExceeded) { row.DeleteUnreferenced(); break; } @@ -456,6 +461,10 @@ public: UnprocessedRows.emplace_back(std::make_pair(TOwnedCellVec(joinKeyCells), std::move(inputRow.GetElement(1)))); } + bool IsOverloaded() final { + return UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT || PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT || ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT; + } + std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) final { @@ -730,7 +739,6 @@ public: TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final { TReadResultStats resultStats; - bool sizeLimitExceeded = false; batch.clear(); // we should process left rows that haven't matches on the right @@ -759,7 +767,7 @@ public: return ResultRowsBySeqNo.find(CurrentResultSeqNo); }; - while (!sizeLimitExceeded) { + while (!resultStats.SizeLimitExceeded) { auto resultIt = getNextResult(); if (resultIt == ResultRowsBySeqNo.end()) { break; @@ -770,7 +778,7 @@ public: auto& row = result.Rows[result.FirstUnprocessedRow]; if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) { - sizeLimitExceeded = true; + resultStats.SizeLimitExceeded = true; break; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h index 1a80c8bbee1..26dfbf5462c 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -27,6 +27,7 @@ public: ui64 ReadBytesCount = 0; ui64 ResultRowsCount = 0; ui64 ResultBytesCount = 0; + bool SizeLimitExceeded = false; void Add(const TReadResultStats& other) { ReadRowsCount += other.ReadRowsCount; @@ -57,13 +58,14 @@ public: } virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0; - virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery, + virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) = 0; virtual TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) = 0; virtual void AddResult(TShardReadResult result) = 0; virtual TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) = 0; virtual bool AllRowsProcessed() = 0; virtual void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) = 0; + virtual bool IsOverloaded() = 0; protected: const NKikimrKqp::TKqpStreamLookupSettings Settings; |