From 94de3c44fbfb54ffa1ea7a8baa73d174ee6f40b1 Mon Sep 17 00:00:00 2001 From: ivanmorozov Date: Mon, 19 Sep 2022 11:57:22 +0300 Subject: limit usage for stop finally and additional statistics calculation incapsulate costs state into common manager switch on parallel scan for olap requests --- ydb/core/kqp/compute_actor/kqp_compute_state.h | 19 ++++- .../kqp/compute_actor/kqp_scan_compute_actor.cpp | 85 +++++++--------------- .../kqp/compute_actor/kqp_scan_compute_manager.cpp | 40 ++++++++++ .../kqp/compute_actor/kqp_scan_compute_manager.h | 39 +++++++++- .../kqp/compute_actor/kqp_scan_compute_stat.cpp | 8 ++ ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h | 4 + ydb/core/testlib/test_client.h | 2 + 7 files changed, 135 insertions(+), 62 deletions(-) diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.h b/ydb/core/kqp/compute_actor/kqp_compute_state.h index 3c87df2e37f..382d0709d9a 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_state.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_state.h @@ -30,15 +30,32 @@ public: }; class TShardCostsState: public TCommonRetriesState { +public: + using TReadData = NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta; private: const ui32 ScanId; - const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* ReadData; + const TReadData* ReadData; public: using TPtr = std::shared_ptr; const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& GetReadData() const { return *ReadData; } + static TVector BuildSerializedTableRanges(const TReadData& readData) { + TVector resultLocal; + resultLocal.reserve(readData.GetKeyRanges().size()); + for (const auto& range : readData.GetKeyRanges()) { + auto& sr = resultLocal.emplace_back(TSerializedTableRange(range)); + if (!range.HasTo()) { + sr.To = sr.From; + sr.FromInclusive = sr.ToInclusive = true; + } + } + Y_VERIFY_DEBUG(!resultLocal.empty()); + return resultLocal; + } + + ui32 GetScanId() const { return ScanId; } diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 69c1ada9b9c..cfe4d7ed8f9 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -121,7 +121,9 @@ public: , ComputeCtx(settings.StatsMode) , Snapshot(snapshot) , ShardsScanningPolicy(shardsScanningPolicy) - , Counters(counters) { + , Counters(counters) + , InFlightShards(ShardsScanningPolicy) + { YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString()); YQL_ENSURE(!Meta.GetReads().empty()); YQL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView); @@ -170,16 +172,17 @@ public: ScanData->TaskId = GetTask().GetId(); ScanData->TableReader = CreateKqpTableReader(*ScanData); ShardsScanningPolicy.FillRequestScanFeatures(Meta, MaxInFlight, IsAggregationRequest); - if (!Meta.HasOlapProgram() || !ShardsScanningPolicy.IsParallelScanningAvailable()) { + if (!Meta.HasOlapProgram() || !ShardsScanningPolicy.IsParallelScanningAvailable() || ShardsScanningPolicy.GetShardSplitFactor() == 0) { for (const auto& read : Meta.GetReads()) { auto& state = PendingShards.emplace_back(TShardState(read.GetShardId(), ++ScansCounter)); - state.Ranges = BuildSerializedTableRanges(read); + state.Ranges = TShardCostsState::BuildSerializedTableRanges(read); } StartTableScan(); ContinueExecute(); } else { + CA_LOG_D("EVLOGKQP: Costs usage"); for (const auto& read : Meta.GetReads()) { - StartCostsRequest(read); + StartCostsRequest(InFlightShards.PrepareCostRequest(read)); } } Become(&TKqpScanComputeActor::StateFunc); @@ -272,20 +275,6 @@ protected: } private: - TVector BuildSerializedTableRanges(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& readData) { - TVector resultLocal; - resultLocal.reserve(readData.GetKeyRanges().size()); - for (const auto& range : readData.GetKeyRanges()) { - auto& sr = resultLocal.emplace_back(TSerializedTableRange(range)); - if (!range.HasTo()) { - sr.To = sr.From; - sr.FromInclusive = sr.ToInclusive = true; - } - } - Y_VERIFY_DEBUG(!resultLocal.empty()); - return resultLocal; - } - THolder BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec& ranges) const { auto ev = MakeHolder(); ev->Record.SetLocalPathId(ScanData->TableId.PathId.LocalPathId); @@ -364,44 +353,17 @@ private: } void StartCostsRequest(TShardCostsState::TPtr state) { - TSmallVec serializedTableRanges = BuildSerializedTableRanges(state->GetReadData()); + TSmallVec serializedTableRanges = TShardCostsState::BuildSerializedTableRanges(state->GetReadData()); THolder ev = BuildEvKqpScan(state->GetScanId(), 0, serializedTableRanges); ev->Record.SetCostDataOnly(true); THolder evForward = MakeHolder(ev.Release(), state->GetShardId()); Send(MakePipePeNodeCacheID(false), evForward.Release(), IEventHandle::FlagTrackDelivery); } - void StartCostsRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read) { - const ui32 scanId = CostRequestsByScanId.size() + 1; - auto costsState = std::make_shared(scanId, &read); - Y_VERIFY(CostRequestsByScanId.emplace(scanId, costsState).second); - Y_VERIFY(CostRequestsByShardId.emplace(costsState->GetShardId(), costsState).second); - StartCostsRequest(costsState); - } - - bool ReceiveCostRequest(TEvKqpCompute::TEvCostData::TPtr& ev, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta*& readData, - TSmallVec& result, const bool splitFactor) { - auto it = CostRequestsByScanId.find(ev->Get()->GetScanId()); - Y_VERIFY(it != CostRequestsByScanId.end(), "incorrect generation from cost data event: %u", ev->Get()->GetScanId()); - readData = &it->second->GetReadData(); - if (!ev->Get()->GetTableRanges().ColumnsCount()) { - result = BuildSerializedTableRanges(*readData); - } else { - result = ev->Get()->GetSerializedTableRanges(splitFactor); - } - if (Meta.GetReverse()) { - std::reverse(result.begin(), result.end()); - } - CostRequestsByShardId.erase(it->second->GetShardId()); - CostRequestsByScanId.erase(it); - return true; - } - void HandleExecute(TEvKqpCompute::TEvCostData::TPtr& ev) { const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* read = nullptr; TSmallVec ranges; - const double shardSplitFactor = ShardsScanningPolicy.GetShardSplitFactor(); - ReceiveCostRequest(ev, read, ranges, shardSplitFactor); + Y_VERIFY(InFlightShards.ProcessCostReply(ev, read, ranges)); for (auto&& i : ranges) { auto& state = PendingShards.emplace_back(TShardState(read->GetShardId(), ++ScansCounter)); state.Ranges.emplace_back(i); @@ -428,7 +390,6 @@ private: state->State = EShardState::Running; state->ActorId = scanActorId; state->ResetRetry(); - AffectedShards.insert(state->TabletId); InFlightShards.NeedAck(state); SendScanDataAck(state); } else { @@ -463,6 +424,7 @@ private: } void StopFinally() { + CA_LOG_D("EVLOGKQP: Stop finally"); std::vector currentScans; for (auto&& i : InFlightShards) { for (auto&& s : i.second) { @@ -472,6 +434,8 @@ private: for (auto&& i : currentScans) { StopReadChunk(*i); } + InFlightShards.ClearAll(); + InFlightShards.Stop(); PendingShards.clear(); } @@ -522,7 +486,8 @@ private: << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter" << ", tabletId: " << state->TabletId); bool stopFinally = false; - if (!IsAggregationRequest && Meta.HasItemsLimit() && Meta.GetItemsLimit() && Stats.TotalReadRows >= Meta.GetItemsLimit()) { + CA_LOG_T("EVLOGKQP:" << IsAggregationRequest << "/" << Meta.GetItemsLimit() << "/" << InFlightShards.GetTotalRowsCount() << "/" << rowsCount); + if (!IsAggregationRequest && Meta.HasItemsLimit() && Meta.GetItemsLimit() && InFlightShards.GetTotalRowsCount() >= Meta.GetItemsLimit()) { StopFinally(); stopFinally = true; } else if (!msg.Finished) { @@ -539,12 +504,13 @@ private: } if (stopFinally) { ScanData->Finish(); + InFlightShards.Stop(); CA_LOG_D("EVLOGKQP(scans_count:" << ScansCounter << ";max_in_flight:" << MaxInFlight << ")" << Endl << InFlightShards.GetDurationStats() << Endl << InFlightShards.StatisticsToString() ); if (ScanData->BasicStats) { - ScanData->BasicStats->AffectedShards = AffectedShards.size(); + ScanData->BasicStats->AffectedShards = InFlightShards.GetAffectedShards().size(); } } @@ -623,9 +589,8 @@ private: YQL_ENSURE(ScanData); auto& msg = *ev->Get(); - auto it = CostRequestsByShardId.find(msg.TabletId); - if (it != CostRequestsByShardId.end()) { - RetryCostsRequest(it->second); + if (auto costsState = InFlightShards.GetCostsState(msg.TabletId)) { + RetryCostsRequest(costsState); return; } @@ -646,12 +611,12 @@ private: void HandleExecute(TEvPrivate::TEvRetryShard::TPtr& ev) { if (ev->Get()->IsCostsRequest) { - auto it = CostRequestsByShardId.find(ev->Get()->TabletId); - if (it == CostRequestsByShardId.end()) { + auto costsState = InFlightShards.GetCostsState(ev->Get()->TabletId); + if (!costsState) { CA_LOG_E("Received retry shard costs for unexpected tablet " << ev->Get()->TabletId); return; } - StartCostsRequest(it->second); + StartCostsRequest(costsState); } else { const ui32 scannerIdx = InFlightShards.GetIndexByGeneration(ev->Get()->Generation); auto state = InFlightShards.GetStateByIndex(scannerIdx); @@ -873,7 +838,7 @@ private: << "average read rows: " << Stats.AverageReadRows() << ", " << "average read bytes: " << Stats.AverageReadBytes() << ", "); - return CostRequestsByScanId.size() + InFlightShards.GetScansCount() + PendingShards.size() + PendingResolveShards.size() > 0; + return InFlightShards.GetCostRequestsCount() + InFlightShards.GetScansCount() + PendingShards.size() + PendingResolveShards.size() > 0; } void StartReadShard(TShardState::TPtr state) { @@ -1148,6 +1113,9 @@ private: template TShardState::TPtr GetShardState(const TMessage& msg, const TActorId& scanActorId) { + if (!InFlightShards.IsActive()) { + return nullptr; + } ui32 generation; if constexpr (std::is_same_v) { generation = msg.GetGeneration(); @@ -1202,10 +1170,7 @@ private: TInFlightShards InFlightShards; ui32 ScansCounter = 0; - std::set AffectedShards; std::set TrackingNodes; - std::map CostRequestsByScanId; - std::map CostRequestsByShardId; ui32 MaxInFlight = 1024; bool IsAggregationRequest = false; }; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp index e027b329f93..02065e56d5d 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp @@ -86,4 +86,44 @@ ui32 TInFlightShards::GetScansCount() const { return result; } +void TInFlightShards::ClearAll() { + CostRequestsByScanId.clear(); + CostRequestsByShardId.clear(); + Shards.clear(); + AllocatedGenerations.clear(); + StatesByIndex.clear(); + NeedAckStates.clear(); +} + +bool TInFlightShards::ProcessCostReply(TEvKqpCompute::TEvCostData::TPtr ev, const TShardCostsState::TReadData*& readData, TSmallVec& result) { + auto it = CostRequestsByScanId.find(ev->Get()->GetScanId()); + Y_VERIFY(it != CostRequestsByScanId.end(), "incorrect generation from cost data event: %u", ev->Get()->GetScanId()); + readData = &it->second->GetReadData(); + if (!ev->Get()->GetTableRanges().ColumnsCount()) { + result = TShardCostsState::BuildSerializedTableRanges(*readData); + } else { + result = ev->Get()->GetSerializedTableRanges(ScanningPolicy.GetShardSplitFactor()); + } + CostRequestsByShardId.erase(it->second->GetShardId()); + CostRequestsByScanId.erase(it); + return true; +} + +TShardCostsState::TPtr TInFlightShards::PrepareCostRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read) { + const ui32 scanId = CostRequestsByScanId.size() + 1; + auto costsState = std::make_shared(scanId, &read); + Y_VERIFY(CostRequestsByScanId.emplace(scanId, costsState).second); + Y_VERIFY(CostRequestsByShardId.emplace(costsState->GetShardId(), costsState).second); + return costsState; +} + +TShardCostsState::TPtr TInFlightShards::GetCostsState(const ui64 shardId) const { + auto it = CostRequestsByShardId.find(shardId); + if (it == CostRequestsByShardId.end()) { + return nullptr; + } else { + return it->second; + } +} + } \ No newline at end of file diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h index cf808a5d178..c1741fd0ecf 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -1,4 +1,5 @@ #pragma once +#include "kqp_compute_actor.h" #include "kqp_compute_state.h" #include "kqp_scan_compute_stat.h" @@ -14,8 +15,40 @@ private: std::set ActualScannerIds; ui32 LastGeneration = 0; std::map NeedAckStates; - + std::set AffectedShards; + std::map CostRequestsByScanId; + std::map CostRequestsByShardId; + const TShardsScanningPolicy& ScanningPolicy; + bool IsActiveFlag = true; public: + TInFlightShards(const TShardsScanningPolicy& scanningPolicy) + : ScanningPolicy(scanningPolicy) + { + + } + ui32 GetAvailableTasks() const { + return GetScansCount() + GetCostRequestsCount(); + } + bool IsActive() const { + return IsActiveFlag; + } + void Stop() { + Y_VERIFY(GetAvailableTasks() == 0); + IsActiveFlag = false; + } + void ClearAll(); + const std::set& GetAffectedShards() const { + return AffectedShards; + } + + TShardCostsState::TPtr GetCostsState(const ui64 shardId) const; + + + bool ProcessCostReply(TEvKqpCompute::TEvCostData::TPtr ev, const TShardCostsState::TReadData*& readData, + TSmallVec& result); + + TShardCostsState::TPtr PrepareCostRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read); + TString TraceToString() const; const std::map& GetNeedAck() const { @@ -30,9 +63,13 @@ public: void NeedAck(TShardState::TPtr state) { Y_VERIFY(StatesByIndex.contains(state->ScannerIdx)); NeedAckStates.emplace(state->ScannerIdx, state); + AffectedShards.emplace(state->TabletId); } ui32 AllocateGeneration(TShardState::TPtr state); + ui32 GetCostRequestsCount() const { + return CostRequestsByScanId.size(); + } ui32 GetScansCount() const; ui32 GetShardsCount() const { return Shards.size(); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp index 1ee73017aed..f0d31a4c381 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp @@ -108,4 +108,12 @@ TString TScanShardsStatistics::GetDurationStats() const { return sb; } +ui32 TScanShardsStatistics::GetTotalRowsCount() const { + ui32 result = 0; + for (auto&& i : Statistics) { + result += i.second.GetTotalRowsCount(); + } + return result; +} + } \ No newline at end of file diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h index 4ccb51c3f55..3f6a09ebe98 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h @@ -37,6 +37,9 @@ public: TChunkStatistics& MutableStatistics(const ui32 scannerIdx) { return Statistics[scannerIdx]; } + ui32 GetTotalRowsCount() const { + return TotalRowsCount; + } TString StatisticsToString() const; }; @@ -56,6 +59,7 @@ public: TShardStatistics& MutableStatistics(const ui64 shardId) { return Statistics[shardId]; } + ui32 GetTotalRowsCount() const; TString StatisticsToString() const; TString GetDurationStats() const; diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index b71ea58a00c..fe86c76f279 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -198,6 +198,8 @@ namespace Tests { , PQConfig(pqConfig) { AddStoragePool("test", "/" + DomainName + ":test"); + AppConfig.MutableTableServiceConfig()->MutableResourceManager()->MutableShardsScanningPolicy()->SetParallelScanningAvailable(true); + AppConfig.MutableTableServiceConfig()->MutableResourceManager()->MutableShardsScanningPolicy()->SetShardSplitFactor(16); } TServerSettings(const TServerSettings& settings) = default; -- cgit v1.3