aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2025-05-28 14:22:29 +0300
committerGitHub <noreply@github.com>2025-05-28 14:22:29 +0300
commit869ee2f10ff433aada3bd8caf738e1184deb95cd (patch)
treeff21c184adc860babe6ef81a0c129b2a2896fdea
parent1316d5c944a9a03065a7c98f3c83dd02219c105a (diff)
downloadydb-869ee2f10ff433aada3bd8caf738e1184deb95cd.tar.gz
fix stream lookup square and add simple overload checker (#18922)
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp134
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp22
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.h4
3 files changed, 105 insertions, 55 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 26100fdb317..7bce73dc3e7 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -102,14 +102,15 @@ public:
}
}
+ auto affectedShards = Reads.AffectedShards();
// TODO: use evread statistics after KIKIMR-16924
tableStats->SetReadRows(tableStats->GetReadRows() + rowsReadEstimate);
tableStats->SetReadBytes(tableStats->GetReadBytes() + bytesReadEstimate);
- tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());
+ tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + affectedShards.size());
NKqpProto::TKqpTableExtraStats tableExtraStats;
auto readActorTableAggrExtraStats = tableExtraStats.MutableReadActorTableAggrExtraStats();
- for (const auto& [shardId, _] : ReadsPerShard) {
+ for (const auto& shardId : affectedShards) {
readActorTableAggrExtraStats->AddAffectedShards(shardId);
}
@@ -140,10 +141,6 @@ private:
, ShardId(shardId)
, State(EReadState::Initial) {}
- void SetFinished() {
- State = EReadState::Finished;
- }
-
bool Finished() const {
return (State == EReadState::Finished);
}
@@ -162,7 +159,68 @@ private:
struct TShardState {
ui64 RetryAttempts = 0;
- std::vector<TReadState*> Reads;
+ std::unordered_set<ui64> Reads;
+ };
+
+ struct TReads {
+ std::unordered_map<ui64, TReadState> Reads;
+ std::unordered_map<ui64, TShardState> ReadsPerShard;
+
+ std::unordered_map<ui64, TReadState>::iterator begin() { return Reads.begin(); }
+
+ std::unordered_map<ui64, TReadState>::iterator end() { return Reads.end(); }
+
+ std::unordered_map<ui64, TReadState>::iterator find(ui64 readId) {
+ return Reads.find(readId);
+ }
+
+ void insert(TReadState&& read) {
+ const auto [readIt, succeeded] = Reads.insert({read.Id, std::move(read)});
+ YQL_ENSURE(succeeded);
+ ReadsPerShard[readIt->second.ShardId].Reads.emplace(readIt->second.Id);
+ }
+
+ size_t InFlightReads() const {
+ return Reads.size();
+ }
+
+ std::vector<ui64> AffectedShards() const {
+ std::vector<ui64> result;
+ result.reserve(ReadsPerShard.size());
+ for(const auto& [shard, _]: ReadsPerShard) {
+ result.push_back(shard);
+ }
+ return result;
+ }
+
+ bool CheckShardRetriesExeeded(TReadState& failedRead) {
+ const auto& shardState = ReadsPerShard[failedRead.ShardId];
+ return shardState.RetryAttempts + 1 > MaxShardRetries();
+ }
+
+ TDuration CalcDelayForShard(TReadState& failedRead, bool allowInstantRetry) {
+ auto& shardState = ReadsPerShard[failedRead.ShardId];
+ ++shardState.RetryAttempts;
+ return CalcDelay(shardState.RetryAttempts, allowInstantRetry);
+ }
+
+ void erase(TReadState& read) {
+ ReadsPerShard[read.ShardId].Reads.erase(read.Id);
+ Reads.erase(read.Id);
+ }
+
+ std::vector<TReadState*> GetShardReads(ui64 shardId) {
+ auto it = ReadsPerShard.find(shardId);
+ YQL_ENSURE(it != ReadsPerShard.end());
+ std::vector<TReadState*> result;
+ for(ui64 readId: it->second.Reads) {
+ auto it = Reads.find(readId);
+ YQL_ENSURE(it != Reads.end());
+ result.push_back(&it->second);
+ }
+
+ return result;
+ }
};
struct TEvPrivate {
@@ -227,13 +285,15 @@ private:
ReadRowsCount += replyResultStats.ReadRowsCount;
ReadBytesCount += replyResultStats.ReadBytesCount;
- auto status = FetchInputRows();
+ if (!StreamLookupWorker->IsOverloaded()) {
+ FetchInputRows();
+ }
if (Partitioning) {
ProcessInputRows();
}
- const bool inputRowsFinished = status == NUdf::EFetchStatus::Finish;
+ const bool inputRowsFinished = LastFetchStatus == NUdf::EFetchStatus::Finish;
const bool allReadsFinished = AllReadsFinished();
const bool allRowsProcessed = StreamLookupWorker->AllRowsProcessed();
@@ -308,7 +368,6 @@ private:
void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
const auto& record = ev->Get()->Record;
-
auto readIt = Reads.find(record.GetReadId());
if (readIt == Reads.end() || readIt->second.State != EReadState::Running) {
CA_LOG_D("Drop read with readId: " << record.GetReadId() << ", because it's already completed or blocked");
@@ -316,6 +375,7 @@ private:
}
auto& read = readIt->second;
+ ui64 shardId = read.ShardId;
CA_LOG_D("Recv TEvReadResult (stream lookup) from ShardID=" << read.ShardId
<< ", Table = " << StreamLookupWorker->GetTablePath()
@@ -380,13 +440,13 @@ private:
case Ydb::StatusIds::NOT_FOUND:
{
StreamLookupWorker->ResetRowsProcessing(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey);
- read.SetFinished();
CA_LOG_D("NOT_FOUND was received from tablet: " << read.ShardId << ". "
<< getIssues().ToOneLineString());
+ Reads.erase(read);
return ResolveTableShards();
}
case Ydb::StatusIds::OVERLOADED: {
- if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
+ if (CheckTotalRetriesExeeded() || Reads.CheckShardRetriesExeeded(read)) {
return replyError(
TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
NYql::NDqProto::StatusIds::OVERLOADED);
@@ -397,7 +457,7 @@ private:
return RetryTableRead(read, /*allowInstantRetry = */false);
}
case Ydb::StatusIds::INTERNAL_ERROR: {
- if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
+ if (CheckTotalRetriesExeeded() || Reads.CheckShardRetriesExeeded(read)) {
return replyError(
TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
@@ -416,7 +476,7 @@ private:
read.LastSeqNo = record.GetSeqNo();
if (record.GetFinished()) {
- read.SetFinished();
+ Reads.erase(read);
} else {
YQL_ENSURE(record.HasContinuationToken(), "Successful TEvReadResult should contain continuation token");
NKikimrTxDataShard::TReadContinuationToken continuationToken;
@@ -454,7 +514,7 @@ private:
auto guard = BindAllocator();
StreamLookupWorker->AddResult(TKqpStreamLookupWorker::TShardReadResult{
- read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())
+ shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())
});
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
@@ -463,11 +523,9 @@ private:
CA_LOG_D("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId);
const auto& tabletId = ev->Get()->TabletId;
- auto shardIt = ReadsPerShard.find(tabletId);
- YQL_ENSURE(shardIt != ReadsPerShard.end());
TVector<TReadState*> toRetry;
- for (auto* read : shardIt->second.Reads) {
+ for (auto* read : Reads.GetShardReads(tabletId)) {
if (read->State == EReadState::Running) {
Counters->IteratorDeliveryProblems->Inc();
toRetry.push_back(read);
@@ -500,27 +558,24 @@ private:
if ((read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) || read.State == EReadState::Blocked) {
if (ev->Get()->InstantStart) {
- read.SetFinished();
auto requests = StreamLookupWorker->RebuildRequest(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey, ReadId);
for (auto& request : requests) {
StartTableRead(read.ShardId, std::move(request));
}
+ Reads.erase(read);
} else {
RetryTableRead(read);
}
}
}
- NUdf::EFetchStatus FetchInputRows() {
+ void FetchInputRows() {
auto guard = BindAllocator();
- NUdf::EFetchStatus status;
NUdf::TUnboxedValue row;
- while ((status = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) {
+ while ((LastFetchStatus = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) {
StreamLookupWorker->AddInputRow(std::move(row));
}
-
- return status;
}
void ProcessInputRows() {
@@ -580,9 +635,7 @@ private:
auto readId = read.Id;
auto lastSeqNo = read.LastSeqNo;
- const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)});
- YQL_ENSURE(succeeded);
- ReadsPerShard[shardId].Reads.push_back(&readIt->second);
+ Reads.insert(std::move(read));
if (auto delay = ShardTimeout()) {
TlsActivationContext->Schedule(
@@ -596,11 +649,6 @@ private:
return limit && TotalRetryAttempts + 1 > *limit;
}
- bool CheckShardRetriesExeeded(TReadState& failedRead) {
- const auto& shardState = ReadsPerShard[failedRead.ShardId];
- return shardState.RetryAttempts + 1 > MaxShardRetries();
- }
-
void RetryTableRead(TReadState& failedRead, bool allowInstantRetry = true) {
CA_LOG_D("Retry reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << failedRead.Id
<< ", shardId: " << failedRead.ShardId);
@@ -611,21 +659,19 @@ private:
}
++TotalRetryAttempts;
- if (CheckShardRetriesExeeded(failedRead)) {
+ if (Reads.CheckShardRetriesExeeded(failedRead)) {
StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey);
- failedRead.SetFinished();
+ Reads.erase(failedRead);
return ResolveTableShards();
}
- auto& shardState = ReadsPerShard[failedRead.ShardId];
- ++shardState.RetryAttempts;
- auto delay = CalcDelay(shardState.RetryAttempts, allowInstantRetry);
+ auto delay = Reads.CalcDelayForShard(failedRead, allowInstantRetry);
if (delay == TDuration::Zero()) {
- failedRead.SetFinished();
auto requests = StreamLookupWorker->RebuildRequest(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey, ReadId);
for (auto& request : requests) {
StartTableRead(failedRead.ShardId, std::move(request));
}
+ Reads.erase(failedRead);
} else {
CA_LOG_D("Schedule retry atempt for readId: " << failedRead.Id << " after " << delay);
TlsActivationContext->Schedule(
@@ -671,13 +717,7 @@ private:
}
bool AllReadsFinished() const {
- for (const auto& [_, read] : Reads) {
- if (!read.Finished()) {
- return false;
- }
- }
-
- return true;
+ return Reads.InFlightReads() == 0;
}
TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() {
@@ -715,8 +755,8 @@ private:
const TMaybe<ui64> LockTxId;
const TMaybe<ui32> NodeLockId;
const TMaybe<NKikimrDataEvents::ELockMode> LockMode;
- std::unordered_map<ui64, TReadState> Reads;
- std::unordered_map<ui64, TShardState> ReadsPerShard;
+ TReads Reads;
+ NUdf::EFetchStatus LastFetchStatus = NUdf::EFetchStatus::Yield;
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
const TDuration SchemeCacheRequestTimeout;
NActors::TActorId SchemeCacheRequestTimeoutTimer;
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index 32eea7ec74e..39595b74be2 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -13,6 +13,8 @@
namespace NKikimr {
namespace NKqp {
+constexpr ui64 MAX_IN_FLIGHT_LIMIT = 500;
+
namespace {
std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpStreamLookupWorker::TPartitionInfo& partitionInfo,
const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TOwnedTableRange& range) {
@@ -286,12 +288,15 @@ public:
ReadResults.emplace_back(std::move(result));
}
+ bool IsOverloaded() final {
+ return false;
+ }
+
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
TReadResultStats resultStats;
- bool sizeLimitExceeded = false;
batch.clear();
- while (!ReadResults.empty() && !sizeLimitExceeded) {
+ while (!ReadResults.empty() && !resultStats.SizeLimitExceeded) {
auto& result = ReadResults.front();
for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
@@ -317,10 +322,10 @@ public:
}
if (rowSize + (i64)resultStats.ResultBytesCount > freeSpace) {
- sizeLimitExceeded = true;
+ resultStats.SizeLimitExceeded = true;
}
- if (resultStats.ResultRowsCount && sizeLimitExceeded) {
+ if (resultStats.ResultRowsCount && resultStats.SizeLimitExceeded) {
row.DeleteUnreferenced();
break;
}
@@ -456,6 +461,10 @@ public:
UnprocessedRows.emplace_back(std::make_pair(TOwnedCellVec(joinKeyCells), std::move(inputRow.GetElement(1))));
}
+ bool IsOverloaded() final {
+ return UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT || PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT || ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT;
+ }
+
std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) final {
@@ -730,7 +739,6 @@ public:
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
TReadResultStats resultStats;
- bool sizeLimitExceeded = false;
batch.clear();
// we should process left rows that haven't matches on the right
@@ -759,7 +767,7 @@ public:
return ResultRowsBySeqNo.find(CurrentResultSeqNo);
};
- while (!sizeLimitExceeded) {
+ while (!resultStats.SizeLimitExceeded) {
auto resultIt = getNextResult();
if (resultIt == ResultRowsBySeqNo.end()) {
break;
@@ -770,7 +778,7 @@ public:
auto& row = result.Rows[result.FirstUnprocessedRow];
if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
- sizeLimitExceeded = true;
+ resultStats.SizeLimitExceeded = true;
break;
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
index 1a80c8bbee1..26dfbf5462c 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
@@ -27,6 +27,7 @@ public:
ui64 ReadBytesCount = 0;
ui64 ResultRowsCount = 0;
ui64 ResultBytesCount = 0;
+ bool SizeLimitExceeded = false;
void Add(const TReadResultStats& other) {
ReadRowsCount += other.ReadRowsCount;
@@ -57,13 +58,14 @@ public:
}
virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0;
- virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
+ virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) = 0;
virtual TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) = 0;
virtual void AddResult(TShardReadResult result) = 0;
virtual TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) = 0;
virtual bool AllRowsProcessed() = 0;
virtual void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) = 0;
+ virtual bool IsOverloaded() = 0;
protected:
const NKikimrKqp::TKqpStreamLookupSettings Settings;