diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-19 11:57:22 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-19 11:57:22 +0300 |
commit | 94de3c44fbfb54ffa1ea7a8baa73d174ee6f40b1 (patch) | |
tree | 0574750aa5d6795c4adb679840278d4a2540463c | |
parent | c261c7112a1857f7c093dd87cfd07a93cd219822 (diff) | |
download | ydb-94de3c44fbfb54ffa1ea7a8baa73d174ee6f40b1.tar.gz |
limit usage for stop finally and additional statistics calculation
incapsulate costs state into common manager
switch on parallel scan for olap requests
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_state.h | 19 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 85 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp | 40 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h | 39 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h | 4 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.h | 2 |
7 files changed, 135 insertions, 62 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.h b/ydb/core/kqp/compute_actor/kqp_compute_state.h index 3c87df2e37f..382d0709d9a 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_state.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_state.h @@ -30,15 +30,32 @@ public: }; class TShardCostsState: public TCommonRetriesState { +public: + using TReadData = NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta; private: const ui32 ScanId; - const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* ReadData; + const TReadData* ReadData; public: using TPtr = std::shared_ptr<TShardCostsState>; const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& GetReadData() const { return *ReadData; } + static TVector<TSerializedTableRange> BuildSerializedTableRanges(const TReadData& readData) { + TVector<TSerializedTableRange> resultLocal; + resultLocal.reserve(readData.GetKeyRanges().size()); + for (const auto& range : readData.GetKeyRanges()) { + auto& sr = resultLocal.emplace_back(TSerializedTableRange(range)); + if (!range.HasTo()) { + sr.To = sr.From; + sr.FromInclusive = sr.ToInclusive = true; + } + } + Y_VERIFY_DEBUG(!resultLocal.empty()); + return resultLocal; + } + + ui32 GetScanId() const { return ScanId; } diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 69c1ada9b9c..cfe4d7ed8f9 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -121,7 +121,9 @@ public: , ComputeCtx(settings.StatsMode) , Snapshot(snapshot) , ShardsScanningPolicy(shardsScanningPolicy) - , Counters(counters) { + , Counters(counters) + , InFlightShards(ShardsScanningPolicy) + { YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString()); YQL_ENSURE(!Meta.GetReads().empty()); YQL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView); @@ -170,16 +172,17 @@ public: ScanData->TaskId = GetTask().GetId(); ScanData->TableReader = CreateKqpTableReader(*ScanData); ShardsScanningPolicy.FillRequestScanFeatures(Meta, MaxInFlight, IsAggregationRequest); - if (!Meta.HasOlapProgram() || !ShardsScanningPolicy.IsParallelScanningAvailable()) { + if (!Meta.HasOlapProgram() || !ShardsScanningPolicy.IsParallelScanningAvailable() || ShardsScanningPolicy.GetShardSplitFactor() == 0) { for (const auto& read : Meta.GetReads()) { auto& state = PendingShards.emplace_back(TShardState(read.GetShardId(), ++ScansCounter)); - state.Ranges = BuildSerializedTableRanges(read); + state.Ranges = TShardCostsState::BuildSerializedTableRanges(read); } StartTableScan(); ContinueExecute(); } else { + CA_LOG_D("EVLOGKQP: Costs usage"); for (const auto& read : Meta.GetReads()) { - StartCostsRequest(read); + StartCostsRequest(InFlightShards.PrepareCostRequest(read)); } } Become(&TKqpScanComputeActor::StateFunc); @@ -272,20 +275,6 @@ protected: } private: - TVector<TSerializedTableRange> BuildSerializedTableRanges(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& readData) { - TVector<TSerializedTableRange> resultLocal; - resultLocal.reserve(readData.GetKeyRanges().size()); - for (const auto& range : readData.GetKeyRanges()) { - auto& sr = resultLocal.emplace_back(TSerializedTableRange(range)); - if (!range.HasTo()) { - sr.To = sr.From; - sr.FromInclusive = sr.ToInclusive = true; - } - } - Y_VERIFY_DEBUG(!resultLocal.empty()); - return resultLocal; - } - THolder<TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const { auto ev = MakeHolder<TEvDataShard::TEvKqpScan>(); ev->Record.SetLocalPathId(ScanData->TableId.PathId.LocalPathId); @@ -364,44 +353,17 @@ private: } void StartCostsRequest(TShardCostsState::TPtr state) { - TSmallVec<TSerializedTableRange> serializedTableRanges = BuildSerializedTableRanges(state->GetReadData()); + TSmallVec<TSerializedTableRange> serializedTableRanges = TShardCostsState::BuildSerializedTableRanges(state->GetReadData()); THolder<TEvDataShard::TEvKqpScan> ev = BuildEvKqpScan(state->GetScanId(), 0, serializedTableRanges); ev->Record.SetCostDataOnly(true); THolder<TEvPipeCache::TEvForward> evForward = MakeHolder<TEvPipeCache::TEvForward>(ev.Release(), state->GetShardId()); Send(MakePipePeNodeCacheID(false), evForward.Release(), IEventHandle::FlagTrackDelivery); } - void StartCostsRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read) { - const ui32 scanId = CostRequestsByScanId.size() + 1; - auto costsState = std::make_shared<TShardCostsState>(scanId, &read); - Y_VERIFY(CostRequestsByScanId.emplace(scanId, costsState).second); - Y_VERIFY(CostRequestsByShardId.emplace(costsState->GetShardId(), costsState).second); - StartCostsRequest(costsState); - } - - bool ReceiveCostRequest(TEvKqpCompute::TEvCostData::TPtr& ev, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta*& readData, - TSmallVec<TSerializedTableRange>& result, const bool splitFactor) { - auto it = CostRequestsByScanId.find(ev->Get()->GetScanId()); - Y_VERIFY(it != CostRequestsByScanId.end(), "incorrect generation from cost data event: %u", ev->Get()->GetScanId()); - readData = &it->second->GetReadData(); - if (!ev->Get()->GetTableRanges().ColumnsCount()) { - result = BuildSerializedTableRanges(*readData); - } else { - result = ev->Get()->GetSerializedTableRanges(splitFactor); - } - if (Meta.GetReverse()) { - std::reverse(result.begin(), result.end()); - } - CostRequestsByShardId.erase(it->second->GetShardId()); - CostRequestsByScanId.erase(it); - return true; - } - void HandleExecute(TEvKqpCompute::TEvCostData::TPtr& ev) { const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* read = nullptr; TSmallVec<TSerializedTableRange> ranges; - const double shardSplitFactor = ShardsScanningPolicy.GetShardSplitFactor(); - ReceiveCostRequest(ev, read, ranges, shardSplitFactor); + Y_VERIFY(InFlightShards.ProcessCostReply(ev, read, ranges)); for (auto&& i : ranges) { auto& state = PendingShards.emplace_back(TShardState(read->GetShardId(), ++ScansCounter)); state.Ranges.emplace_back(i); @@ -428,7 +390,6 @@ private: state->State = EShardState::Running; state->ActorId = scanActorId; state->ResetRetry(); - AffectedShards.insert(state->TabletId); InFlightShards.NeedAck(state); SendScanDataAck(state); } else { @@ -463,6 +424,7 @@ private: } void StopFinally() { + CA_LOG_D("EVLOGKQP: Stop finally"); std::vector<TShardState::TPtr> currentScans; for (auto&& i : InFlightShards) { for (auto&& s : i.second) { @@ -472,6 +434,8 @@ private: for (auto&& i : currentScans) { StopReadChunk(*i); } + InFlightShards.ClearAll(); + InFlightShards.Stop(); PendingShards.clear(); } @@ -522,7 +486,8 @@ private: << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter" << ", tabletId: " << state->TabletId); bool stopFinally = false; - if (!IsAggregationRequest && Meta.HasItemsLimit() && Meta.GetItemsLimit() && Stats.TotalReadRows >= Meta.GetItemsLimit()) { + CA_LOG_T("EVLOGKQP:" << IsAggregationRequest << "/" << Meta.GetItemsLimit() << "/" << InFlightShards.GetTotalRowsCount() << "/" << rowsCount); + if (!IsAggregationRequest && Meta.HasItemsLimit() && Meta.GetItemsLimit() && InFlightShards.GetTotalRowsCount() >= Meta.GetItemsLimit()) { StopFinally(); stopFinally = true; } else if (!msg.Finished) { @@ -539,12 +504,13 @@ private: } if (stopFinally) { ScanData->Finish(); + InFlightShards.Stop(); CA_LOG_D("EVLOGKQP(scans_count:" << ScansCounter << ";max_in_flight:" << MaxInFlight << ")" << Endl << InFlightShards.GetDurationStats() << Endl << InFlightShards.StatisticsToString() ); if (ScanData->BasicStats) { - ScanData->BasicStats->AffectedShards = AffectedShards.size(); + ScanData->BasicStats->AffectedShards = InFlightShards.GetAffectedShards().size(); } } @@ -623,9 +589,8 @@ private: YQL_ENSURE(ScanData); auto& msg = *ev->Get(); - auto it = CostRequestsByShardId.find(msg.TabletId); - if (it != CostRequestsByShardId.end()) { - RetryCostsRequest(it->second); + if (auto costsState = InFlightShards.GetCostsState(msg.TabletId)) { + RetryCostsRequest(costsState); return; } @@ -646,12 +611,12 @@ private: void HandleExecute(TEvPrivate::TEvRetryShard::TPtr& ev) { if (ev->Get()->IsCostsRequest) { - auto it = CostRequestsByShardId.find(ev->Get()->TabletId); - if (it == CostRequestsByShardId.end()) { + auto costsState = InFlightShards.GetCostsState(ev->Get()->TabletId); + if (!costsState) { CA_LOG_E("Received retry shard costs for unexpected tablet " << ev->Get()->TabletId); return; } - StartCostsRequest(it->second); + StartCostsRequest(costsState); } else { const ui32 scannerIdx = InFlightShards.GetIndexByGeneration(ev->Get()->Generation); auto state = InFlightShards.GetStateByIndex(scannerIdx); @@ -873,7 +838,7 @@ private: << "average read rows: " << Stats.AverageReadRows() << ", " << "average read bytes: " << Stats.AverageReadBytes() << ", "); - return CostRequestsByScanId.size() + InFlightShards.GetScansCount() + PendingShards.size() + PendingResolveShards.size() > 0; + return InFlightShards.GetCostRequestsCount() + InFlightShards.GetScansCount() + PendingShards.size() + PendingResolveShards.size() > 0; } void StartReadShard(TShardState::TPtr state) { @@ -1148,6 +1113,9 @@ private: template<class TMessage> TShardState::TPtr GetShardState(const TMessage& msg, const TActorId& scanActorId) { + if (!InFlightShards.IsActive()) { + return nullptr; + } ui32 generation; if constexpr (std::is_same_v<TMessage, NKikimrKqp::TEvScanError>) { generation = msg.GetGeneration(); @@ -1202,10 +1170,7 @@ private: TInFlightShards InFlightShards; ui32 ScansCounter = 0; - std::set<ui64> AffectedShards; std::set<ui32> TrackingNodes; - std::map<ui32, TShardCostsState::TPtr> CostRequestsByScanId; - std::map<ui64, TShardCostsState::TPtr> CostRequestsByShardId; ui32 MaxInFlight = 1024; bool IsAggregationRequest = false; }; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp index e027b329f93..02065e56d5d 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp @@ -86,4 +86,44 @@ ui32 TInFlightShards::GetScansCount() const { return result;
}
+void TInFlightShards::ClearAll() {
+ CostRequestsByScanId.clear();
+ CostRequestsByShardId.clear();
+ Shards.clear();
+ AllocatedGenerations.clear();
+ StatesByIndex.clear();
+ NeedAckStates.clear();
+}
+
+bool TInFlightShards::ProcessCostReply(TEvKqpCompute::TEvCostData::TPtr ev, const TShardCostsState::TReadData*& readData, TSmallVec<TSerializedTableRange>& result) {
+ auto it = CostRequestsByScanId.find(ev->Get()->GetScanId());
+ Y_VERIFY(it != CostRequestsByScanId.end(), "incorrect generation from cost data event: %u", ev->Get()->GetScanId());
+ readData = &it->second->GetReadData();
+ if (!ev->Get()->GetTableRanges().ColumnsCount()) {
+ result = TShardCostsState::BuildSerializedTableRanges(*readData);
+ } else {
+ result = ev->Get()->GetSerializedTableRanges(ScanningPolicy.GetShardSplitFactor());
+ }
+ CostRequestsByShardId.erase(it->second->GetShardId());
+ CostRequestsByScanId.erase(it);
+ return true;
+}
+
+TShardCostsState::TPtr TInFlightShards::PrepareCostRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read) {
+ const ui32 scanId = CostRequestsByScanId.size() + 1;
+ auto costsState = std::make_shared<TShardCostsState>(scanId, &read);
+ Y_VERIFY(CostRequestsByScanId.emplace(scanId, costsState).second);
+ Y_VERIFY(CostRequestsByShardId.emplace(costsState->GetShardId(), costsState).second);
+ return costsState;
+}
+
+TShardCostsState::TPtr TInFlightShards::GetCostsState(const ui64 shardId) const {
+ auto it = CostRequestsByShardId.find(shardId);
+ if (it == CostRequestsByShardId.end()) {
+ return nullptr;
+ } else {
+ return it->second;
+ }
+}
+
}
\ No newline at end of file diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h index cf808a5d178..c1741fd0ecf 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -1,4 +1,5 @@ #pragma once
+#include "kqp_compute_actor.h"
#include "kqp_compute_state.h"
#include "kqp_scan_compute_stat.h"
@@ -14,8 +15,40 @@ private: std::set<ui32> ActualScannerIds;
ui32 LastGeneration = 0;
std::map<ui32, TShardState::TPtr> NeedAckStates;
-
+ std::set<ui64> AffectedShards;
+ std::map<ui32, TShardCostsState::TPtr> CostRequestsByScanId;
+ std::map<ui64, TShardCostsState::TPtr> CostRequestsByShardId;
+ const TShardsScanningPolicy& ScanningPolicy;
+ bool IsActiveFlag = true;
public:
+ TInFlightShards(const TShardsScanningPolicy& scanningPolicy)
+ : ScanningPolicy(scanningPolicy)
+ {
+
+ }
+ ui32 GetAvailableTasks() const {
+ return GetScansCount() + GetCostRequestsCount();
+ }
+ bool IsActive() const {
+ return IsActiveFlag;
+ }
+ void Stop() {
+ Y_VERIFY(GetAvailableTasks() == 0);
+ IsActiveFlag = false;
+ }
+ void ClearAll();
+ const std::set<ui64>& GetAffectedShards() const {
+ return AffectedShards;
+ }
+
+ TShardCostsState::TPtr GetCostsState(const ui64 shardId) const;
+
+
+ bool ProcessCostReply(TEvKqpCompute::TEvCostData::TPtr ev, const TShardCostsState::TReadData*& readData,
+ TSmallVec<TSerializedTableRange>& result);
+
+ TShardCostsState::TPtr PrepareCostRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read);
+
TString TraceToString() const;
const std::map<ui32, TShardState::TPtr>& GetNeedAck() const {
@@ -30,9 +63,13 @@ public: void NeedAck(TShardState::TPtr state) {
Y_VERIFY(StatesByIndex.contains(state->ScannerIdx));
NeedAckStates.emplace(state->ScannerIdx, state);
+ AffectedShards.emplace(state->TabletId);
}
ui32 AllocateGeneration(TShardState::TPtr state);
+ ui32 GetCostRequestsCount() const {
+ return CostRequestsByScanId.size();
+ }
ui32 GetScansCount() const;
ui32 GetShardsCount() const {
return Shards.size();
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp index 1ee73017aed..f0d31a4c381 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp @@ -108,4 +108,12 @@ TString TScanShardsStatistics::GetDurationStats() const { return sb;
}
+ui32 TScanShardsStatistics::GetTotalRowsCount() const {
+ ui32 result = 0;
+ for (auto&& i : Statistics) {
+ result += i.second.GetTotalRowsCount();
+ }
+ return result;
+}
+
}
\ No newline at end of file diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h index 4ccb51c3f55..3f6a09ebe98 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h @@ -37,6 +37,9 @@ public: TChunkStatistics& MutableStatistics(const ui32 scannerIdx) {
return Statistics[scannerIdx];
}
+ ui32 GetTotalRowsCount() const {
+ return TotalRowsCount;
+ }
TString StatisticsToString() const;
};
@@ -56,6 +59,7 @@ public: TShardStatistics& MutableStatistics(const ui64 shardId) {
return Statistics[shardId];
}
+ ui32 GetTotalRowsCount() const;
TString StatisticsToString() const;
TString GetDurationStats() const;
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index b71ea58a00c..fe86c76f279 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -198,6 +198,8 @@ namespace Tests { , PQConfig(pqConfig) { AddStoragePool("test", "/" + DomainName + ":test"); + AppConfig.MutableTableServiceConfig()->MutableResourceManager()->MutableShardsScanningPolicy()->SetParallelScanningAvailable(true); + AppConfig.MutableTableServiceConfig()->MutableResourceManager()->MutableShardsScanningPolicy()->SetShardSplitFactor(16); } TServerSettings(const TServerSettings& settings) = default; |