aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-09-19 11:57:22 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-09-19 11:57:22 +0300
commit94de3c44fbfb54ffa1ea7a8baa73d174ee6f40b1 (patch)
tree0574750aa5d6795c4adb679840278d4a2540463c
parentc261c7112a1857f7c093dd87cfd07a93cd219822 (diff)
downloadydb-94de3c44fbfb54ffa1ea7a8baa73d174ee6f40b1.tar.gz
limit usage for stop finally and additional statistics calculation
incapsulate costs state into common manager switch on parallel scan for olap requests
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_state.h19
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp85
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp40
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h39
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp8
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h4
-rw-r--r--ydb/core/testlib/test_client.h2
7 files changed, 135 insertions, 62 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.h b/ydb/core/kqp/compute_actor/kqp_compute_state.h
index 3c87df2e37f..382d0709d9a 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_state.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_state.h
@@ -30,15 +30,32 @@ public:
};
class TShardCostsState: public TCommonRetriesState {
+public:
+ using TReadData = NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta;
private:
const ui32 ScanId;
- const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* ReadData;
+ const TReadData* ReadData;
public:
using TPtr = std::shared_ptr<TShardCostsState>;
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& GetReadData() const {
return *ReadData;
}
+ static TVector<TSerializedTableRange> BuildSerializedTableRanges(const TReadData& readData) {
+ TVector<TSerializedTableRange> resultLocal;
+ resultLocal.reserve(readData.GetKeyRanges().size());
+ for (const auto& range : readData.GetKeyRanges()) {
+ auto& sr = resultLocal.emplace_back(TSerializedTableRange(range));
+ if (!range.HasTo()) {
+ sr.To = sr.From;
+ sr.FromInclusive = sr.ToInclusive = true;
+ }
+ }
+ Y_VERIFY_DEBUG(!resultLocal.empty());
+ return resultLocal;
+ }
+
+
ui32 GetScanId() const {
return ScanId;
}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 69c1ada9b9c..cfe4d7ed8f9 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -121,7 +121,9 @@ public:
, ComputeCtx(settings.StatsMode)
, Snapshot(snapshot)
, ShardsScanningPolicy(shardsScanningPolicy)
- , Counters(counters) {
+ , Counters(counters)
+ , InFlightShards(ShardsScanningPolicy)
+ {
YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString());
YQL_ENSURE(!Meta.GetReads().empty());
YQL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView);
@@ -170,16 +172,17 @@ public:
ScanData->TaskId = GetTask().GetId();
ScanData->TableReader = CreateKqpTableReader(*ScanData);
ShardsScanningPolicy.FillRequestScanFeatures(Meta, MaxInFlight, IsAggregationRequest);
- if (!Meta.HasOlapProgram() || !ShardsScanningPolicy.IsParallelScanningAvailable()) {
+ if (!Meta.HasOlapProgram() || !ShardsScanningPolicy.IsParallelScanningAvailable() || ShardsScanningPolicy.GetShardSplitFactor() == 0) {
for (const auto& read : Meta.GetReads()) {
auto& state = PendingShards.emplace_back(TShardState(read.GetShardId(), ++ScansCounter));
- state.Ranges = BuildSerializedTableRanges(read);
+ state.Ranges = TShardCostsState::BuildSerializedTableRanges(read);
}
StartTableScan();
ContinueExecute();
} else {
+ CA_LOG_D("EVLOGKQP: Costs usage");
for (const auto& read : Meta.GetReads()) {
- StartCostsRequest(read);
+ StartCostsRequest(InFlightShards.PrepareCostRequest(read));
}
}
Become(&TKqpScanComputeActor::StateFunc);
@@ -272,20 +275,6 @@ protected:
}
private:
- TVector<TSerializedTableRange> BuildSerializedTableRanges(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& readData) {
- TVector<TSerializedTableRange> resultLocal;
- resultLocal.reserve(readData.GetKeyRanges().size());
- for (const auto& range : readData.GetKeyRanges()) {
- auto& sr = resultLocal.emplace_back(TSerializedTableRange(range));
- if (!range.HasTo()) {
- sr.To = sr.From;
- sr.FromInclusive = sr.ToInclusive = true;
- }
- }
- Y_VERIFY_DEBUG(!resultLocal.empty());
- return resultLocal;
- }
-
THolder<TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const {
auto ev = MakeHolder<TEvDataShard::TEvKqpScan>();
ev->Record.SetLocalPathId(ScanData->TableId.PathId.LocalPathId);
@@ -364,44 +353,17 @@ private:
}
void StartCostsRequest(TShardCostsState::TPtr state) {
- TSmallVec<TSerializedTableRange> serializedTableRanges = BuildSerializedTableRanges(state->GetReadData());
+ TSmallVec<TSerializedTableRange> serializedTableRanges = TShardCostsState::BuildSerializedTableRanges(state->GetReadData());
THolder<TEvDataShard::TEvKqpScan> ev = BuildEvKqpScan(state->GetScanId(), 0, serializedTableRanges);
ev->Record.SetCostDataOnly(true);
THolder<TEvPipeCache::TEvForward> evForward = MakeHolder<TEvPipeCache::TEvForward>(ev.Release(), state->GetShardId());
Send(MakePipePeNodeCacheID(false), evForward.Release(), IEventHandle::FlagTrackDelivery);
}
- void StartCostsRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read) {
- const ui32 scanId = CostRequestsByScanId.size() + 1;
- auto costsState = std::make_shared<TShardCostsState>(scanId, &read);
- Y_VERIFY(CostRequestsByScanId.emplace(scanId, costsState).second);
- Y_VERIFY(CostRequestsByShardId.emplace(costsState->GetShardId(), costsState).second);
- StartCostsRequest(costsState);
- }
-
- bool ReceiveCostRequest(TEvKqpCompute::TEvCostData::TPtr& ev, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta*& readData,
- TSmallVec<TSerializedTableRange>& result, const bool splitFactor) {
- auto it = CostRequestsByScanId.find(ev->Get()->GetScanId());
- Y_VERIFY(it != CostRequestsByScanId.end(), "incorrect generation from cost data event: %u", ev->Get()->GetScanId());
- readData = &it->second->GetReadData();
- if (!ev->Get()->GetTableRanges().ColumnsCount()) {
- result = BuildSerializedTableRanges(*readData);
- } else {
- result = ev->Get()->GetSerializedTableRanges(splitFactor);
- }
- if (Meta.GetReverse()) {
- std::reverse(result.begin(), result.end());
- }
- CostRequestsByShardId.erase(it->second->GetShardId());
- CostRequestsByScanId.erase(it);
- return true;
- }
-
void HandleExecute(TEvKqpCompute::TEvCostData::TPtr& ev) {
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* read = nullptr;
TSmallVec<TSerializedTableRange> ranges;
- const double shardSplitFactor = ShardsScanningPolicy.GetShardSplitFactor();
- ReceiveCostRequest(ev, read, ranges, shardSplitFactor);
+ Y_VERIFY(InFlightShards.ProcessCostReply(ev, read, ranges));
for (auto&& i : ranges) {
auto& state = PendingShards.emplace_back(TShardState(read->GetShardId(), ++ScansCounter));
state.Ranges.emplace_back(i);
@@ -428,7 +390,6 @@ private:
state->State = EShardState::Running;
state->ActorId = scanActorId;
state->ResetRetry();
- AffectedShards.insert(state->TabletId);
InFlightShards.NeedAck(state);
SendScanDataAck(state);
} else {
@@ -463,6 +424,7 @@ private:
}
void StopFinally() {
+ CA_LOG_D("EVLOGKQP: Stop finally");
std::vector<TShardState::TPtr> currentScans;
for (auto&& i : InFlightShards) {
for (auto&& s : i.second) {
@@ -472,6 +434,8 @@ private:
for (auto&& i : currentScans) {
StopReadChunk(*i);
}
+ InFlightShards.ClearAll();
+ InFlightShards.Stop();
PendingShards.clear();
}
@@ -522,7 +486,8 @@ private:
<< ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter"
<< ", tabletId: " << state->TabletId);
bool stopFinally = false;
- if (!IsAggregationRequest && Meta.HasItemsLimit() && Meta.GetItemsLimit() && Stats.TotalReadRows >= Meta.GetItemsLimit()) {
+ CA_LOG_T("EVLOGKQP:" << IsAggregationRequest << "/" << Meta.GetItemsLimit() << "/" << InFlightShards.GetTotalRowsCount() << "/" << rowsCount);
+ if (!IsAggregationRequest && Meta.HasItemsLimit() && Meta.GetItemsLimit() && InFlightShards.GetTotalRowsCount() >= Meta.GetItemsLimit()) {
StopFinally();
stopFinally = true;
} else if (!msg.Finished) {
@@ -539,12 +504,13 @@ private:
}
if (stopFinally) {
ScanData->Finish();
+ InFlightShards.Stop();
CA_LOG_D("EVLOGKQP(scans_count:" << ScansCounter << ";max_in_flight:" << MaxInFlight << ")"
<< Endl << InFlightShards.GetDurationStats()
<< Endl << InFlightShards.StatisticsToString()
);
if (ScanData->BasicStats) {
- ScanData->BasicStats->AffectedShards = AffectedShards.size();
+ ScanData->BasicStats->AffectedShards = InFlightShards.GetAffectedShards().size();
}
}
@@ -623,9 +589,8 @@ private:
YQL_ENSURE(ScanData);
auto& msg = *ev->Get();
- auto it = CostRequestsByShardId.find(msg.TabletId);
- if (it != CostRequestsByShardId.end()) {
- RetryCostsRequest(it->second);
+ if (auto costsState = InFlightShards.GetCostsState(msg.TabletId)) {
+ RetryCostsRequest(costsState);
return;
}
@@ -646,12 +611,12 @@ private:
void HandleExecute(TEvPrivate::TEvRetryShard::TPtr& ev) {
if (ev->Get()->IsCostsRequest) {
- auto it = CostRequestsByShardId.find(ev->Get()->TabletId);
- if (it == CostRequestsByShardId.end()) {
+ auto costsState = InFlightShards.GetCostsState(ev->Get()->TabletId);
+ if (!costsState) {
CA_LOG_E("Received retry shard costs for unexpected tablet " << ev->Get()->TabletId);
return;
}
- StartCostsRequest(it->second);
+ StartCostsRequest(costsState);
} else {
const ui32 scannerIdx = InFlightShards.GetIndexByGeneration(ev->Get()->Generation);
auto state = InFlightShards.GetStateByIndex(scannerIdx);
@@ -873,7 +838,7 @@ private:
<< "average read rows: " << Stats.AverageReadRows() << ", "
<< "average read bytes: " << Stats.AverageReadBytes() << ", ");
- return CostRequestsByScanId.size() + InFlightShards.GetScansCount() + PendingShards.size() + PendingResolveShards.size() > 0;
+ return InFlightShards.GetCostRequestsCount() + InFlightShards.GetScansCount() + PendingShards.size() + PendingResolveShards.size() > 0;
}
void StartReadShard(TShardState::TPtr state) {
@@ -1148,6 +1113,9 @@ private:
template<class TMessage>
TShardState::TPtr GetShardState(const TMessage& msg, const TActorId& scanActorId) {
+ if (!InFlightShards.IsActive()) {
+ return nullptr;
+ }
ui32 generation;
if constexpr (std::is_same_v<TMessage, NKikimrKqp::TEvScanError>) {
generation = msg.GetGeneration();
@@ -1202,10 +1170,7 @@ private:
TInFlightShards InFlightShards;
ui32 ScansCounter = 0;
- std::set<ui64> AffectedShards;
std::set<ui32> TrackingNodes;
- std::map<ui32, TShardCostsState::TPtr> CostRequestsByScanId;
- std::map<ui64, TShardCostsState::TPtr> CostRequestsByShardId;
ui32 MaxInFlight = 1024;
bool IsAggregationRequest = false;
};
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
index e027b329f93..02065e56d5d 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
@@ -86,4 +86,44 @@ ui32 TInFlightShards::GetScansCount() const {
return result;
}
+void TInFlightShards::ClearAll() {
+ CostRequestsByScanId.clear();
+ CostRequestsByShardId.clear();
+ Shards.clear();
+ AllocatedGenerations.clear();
+ StatesByIndex.clear();
+ NeedAckStates.clear();
+}
+
+bool TInFlightShards::ProcessCostReply(TEvKqpCompute::TEvCostData::TPtr ev, const TShardCostsState::TReadData*& readData, TSmallVec<TSerializedTableRange>& result) {
+ auto it = CostRequestsByScanId.find(ev->Get()->GetScanId());
+ Y_VERIFY(it != CostRequestsByScanId.end(), "incorrect generation from cost data event: %u", ev->Get()->GetScanId());
+ readData = &it->second->GetReadData();
+ if (!ev->Get()->GetTableRanges().ColumnsCount()) {
+ result = TShardCostsState::BuildSerializedTableRanges(*readData);
+ } else {
+ result = ev->Get()->GetSerializedTableRanges(ScanningPolicy.GetShardSplitFactor());
+ }
+ CostRequestsByShardId.erase(it->second->GetShardId());
+ CostRequestsByScanId.erase(it);
+ return true;
+}
+
+TShardCostsState::TPtr TInFlightShards::PrepareCostRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read) {
+ const ui32 scanId = CostRequestsByScanId.size() + 1;
+ auto costsState = std::make_shared<TShardCostsState>(scanId, &read);
+ Y_VERIFY(CostRequestsByScanId.emplace(scanId, costsState).second);
+ Y_VERIFY(CostRequestsByShardId.emplace(costsState->GetShardId(), costsState).second);
+ return costsState;
+}
+
+TShardCostsState::TPtr TInFlightShards::GetCostsState(const ui64 shardId) const {
+ auto it = CostRequestsByShardId.find(shardId);
+ if (it == CostRequestsByShardId.end()) {
+ return nullptr;
+ } else {
+ return it->second;
+ }
+}
+
} \ No newline at end of file
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
index cf808a5d178..c1741fd0ecf 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
@@ -1,4 +1,5 @@
#pragma once
+#include "kqp_compute_actor.h"
#include "kqp_compute_state.h"
#include "kqp_scan_compute_stat.h"
@@ -14,8 +15,40 @@ private:
std::set<ui32> ActualScannerIds;
ui32 LastGeneration = 0;
std::map<ui32, TShardState::TPtr> NeedAckStates;
-
+ std::set<ui64> AffectedShards;
+ std::map<ui32, TShardCostsState::TPtr> CostRequestsByScanId;
+ std::map<ui64, TShardCostsState::TPtr> CostRequestsByShardId;
+ const TShardsScanningPolicy& ScanningPolicy;
+ bool IsActiveFlag = true;
public:
+ TInFlightShards(const TShardsScanningPolicy& scanningPolicy)
+ : ScanningPolicy(scanningPolicy)
+ {
+
+ }
+ ui32 GetAvailableTasks() const {
+ return GetScansCount() + GetCostRequestsCount();
+ }
+ bool IsActive() const {
+ return IsActiveFlag;
+ }
+ void Stop() {
+ Y_VERIFY(GetAvailableTasks() == 0);
+ IsActiveFlag = false;
+ }
+ void ClearAll();
+ const std::set<ui64>& GetAffectedShards() const {
+ return AffectedShards;
+ }
+
+ TShardCostsState::TPtr GetCostsState(const ui64 shardId) const;
+
+
+ bool ProcessCostReply(TEvKqpCompute::TEvCostData::TPtr ev, const TShardCostsState::TReadData*& readData,
+ TSmallVec<TSerializedTableRange>& result);
+
+ TShardCostsState::TPtr PrepareCostRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read);
+
TString TraceToString() const;
const std::map<ui32, TShardState::TPtr>& GetNeedAck() const {
@@ -30,9 +63,13 @@ public:
void NeedAck(TShardState::TPtr state) {
Y_VERIFY(StatesByIndex.contains(state->ScannerIdx));
NeedAckStates.emplace(state->ScannerIdx, state);
+ AffectedShards.emplace(state->TabletId);
}
ui32 AllocateGeneration(TShardState::TPtr state);
+ ui32 GetCostRequestsCount() const {
+ return CostRequestsByScanId.size();
+ }
ui32 GetScansCount() const;
ui32 GetShardsCount() const {
return Shards.size();
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp
index 1ee73017aed..f0d31a4c381 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp
@@ -108,4 +108,12 @@ TString TScanShardsStatistics::GetDurationStats() const {
return sb;
}
+ui32 TScanShardsStatistics::GetTotalRowsCount() const {
+ ui32 result = 0;
+ for (auto&& i : Statistics) {
+ result += i.second.GetTotalRowsCount();
+ }
+ return result;
+}
+
} \ No newline at end of file
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h
index 4ccb51c3f55..3f6a09ebe98 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h
@@ -37,6 +37,9 @@ public:
TChunkStatistics& MutableStatistics(const ui32 scannerIdx) {
return Statistics[scannerIdx];
}
+ ui32 GetTotalRowsCount() const {
+ return TotalRowsCount;
+ }
TString StatisticsToString() const;
};
@@ -56,6 +59,7 @@ public:
TShardStatistics& MutableStatistics(const ui64 shardId) {
return Statistics[shardId];
}
+ ui32 GetTotalRowsCount() const;
TString StatisticsToString() const;
TString GetDurationStats() const;
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index b71ea58a00c..fe86c76f279 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -198,6 +198,8 @@ namespace Tests {
, PQConfig(pqConfig)
{
AddStoragePool("test", "/" + DomainName + ":test");
+ AppConfig.MutableTableServiceConfig()->MutableResourceManager()->MutableShardsScanningPolicy()->SetParallelScanningAvailable(true);
+ AppConfig.MutableTableServiceConfig()->MutableResourceManager()->MutableShardsScanningPolicy()->SetShardSplitFactor(16);
}
TServerSettings(const TServerSettings& settings) = default;