diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-29 17:33:20 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-29 17:33:20 +0300 |
commit | e3ec0b86507a1915e9ac47235efab317582fea67 (patch) | |
tree | 6c6fad645828a3f1fad2437d9e6ae39e7c598031 | |
parent | 82e2b3d036ee20d7fccdfc48c7e65da523d93a70 (diff) | |
download | ydb-e3ec0b86507a1915e9ac47235efab317582fea67.tar.gz |
speed up scanners requesting in case global memory limits usage
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_events.h | 13 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_state.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h | 19 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp | 32 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h | 7 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/services.proto | 4 |
8 files changed, 61 insertions, 20 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h index e0cd067901c..0d7101c4144 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_events.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h @@ -30,6 +30,7 @@ struct TEvKqpCompute { , Generation(generation) , Finished(false) {} + std::optional<ui32> AvailablePacks; ui32 ScanId; ui32 Generation; TVector<TOwnedCellVec> Rows; @@ -52,11 +53,7 @@ struct TEvKqpCompute { } bool IsEmpty() const { - if (ArrowBatch) { - return ArrowBatch->num_rows() == 0; - } else { - return Rows.size() == 0; - } + return GetRowsCount() == 0; } bool IsSerializable() const override { @@ -93,6 +90,9 @@ struct TEvKqpCompute { ev->Finished = pbEv->Record.GetFinished(); ev->RequestedBytesLimitReached = pbEv->Record.GetRequestedBytesLimitReached(); ev->LastKey = TOwnedCellVec(TSerializedCellVec(pbEv->Record.GetLastKey()).GetCells()); + if (pbEv->Record.HasAvailablePacks()) { + ev->AvailablePacks = pbEv->Record.GetAvailablePacks(); + } auto rows = pbEv->Record.GetRows(); ev->Rows.reserve(rows.size()); @@ -123,6 +123,9 @@ struct TEvKqpCompute { Remote->Record.SetPageFaults(PageFaults); Remote->Record.SetPageFault(PageFault); Remote->Record.SetLastKey(TSerializedCellVec::Serialize(LastKey)); + if (AvailablePacks) { + Remote->Record.SetAvailablePacks(*AvailablePacks); + } switch (GetDataFormat()) { case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.h b/ydb/core/kqp/compute_actor/kqp_compute_state.h index 8edbc0ffd75..674ced5176b 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_state.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_state.h @@ -39,6 +39,7 @@ struct TShardState: public TCommonRetriesState { bool SubscribedOnTablet = false; TActorId ActorId; TOwnedCellVec LastKey; + std::optional<ui32> AvailablePacks; TString PrintLastKey(TConstArrayRef<NScheme::TTypeInfo> keyTypes) const; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h index 79b2c068745..f904353d686 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h @@ -26,6 +26,10 @@ private: return TBase::CalcMkqlMemoryLimit() + ComputeCtx.GetTableScans().size() * MemoryLimits.ChannelBufferSize; } public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR; + } + TKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h index 8e183b6d09a..eee64d41e81 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -18,6 +18,10 @@ public: : ShardState(shardState) { } + + bool operator<(const TWaitingShard& item) const { + return ShardState->AvailablePacks.value_or(0) < item.ShardState->AvailablePacks.value_or(0); + } }; class TFreeComputeActor { @@ -42,7 +46,7 @@ private: std::map<ui64, TFreeComputeActor> ComputeActorsWaitData; std::set<ui64> ShardsWaitingIds; - std::deque<TWaitingShard> ShardsWaiting; + std::vector<TWaitingShard> ShardsWaiting; void DetachComputeActorFromShard(const ui64 tabletId) { ShardsWaitingIds.erase(tabletId); @@ -97,8 +101,9 @@ public: return false; } else { Y_VERIFY(ShardsWaitingIds.erase(ShardsWaiting.front().GetShardState()->TabletId)); - result = std::move(ShardsWaiting.front()); - ShardsWaiting.pop_front(); + std::pop_heap(ShardsWaiting.begin(), ShardsWaiting.end()); + result = std::move(ShardsWaiting.back()); + ShardsWaiting.pop_back(); return true; } } @@ -126,10 +131,18 @@ public: } } + bool ReturnShardInPool(TShardState::TPtr state) { + Y_VERIFY(ShardsWaitingIds.emplace(state->TabletId).second); + ShardsWaiting.emplace_back(TWaitingShard(state)); + std::push_heap(ShardsWaiting.begin(), ShardsWaiting.end()); + return true; + } + bool PrepareShardAck(TShardState::TPtr state, ui64& freeSpace) { if (ComputeActorsWaitShard.empty()) { Y_VERIFY(ShardsWaitingIds.emplace(state->TabletId).second); ShardsWaiting.emplace_back(TWaitingShard(state)); + std::push_heap(ShardsWaiting.begin(), ShardsWaiting.end()); return false; } std::pop_heap(ComputeActorsWaitShard.begin(), ComputeActorsWaitShard.end()); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index 876116fe92e..62a91c1d161 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -22,7 +22,7 @@ static constexpr ui64 MAX_SHARD_RESOLVES = 3; } // anonymous namespace -NScanPrivate::TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, +TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TComputeRuntimeSettings& settings, std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) @@ -86,6 +86,7 @@ void TKqpScanFetcherActor::Bootstrap() { } void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev) { + Y_VERIFY(ev->Get()->GetFreeSpace()); ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "EvAckData (" << SelfId() << "): " << ev->Sender; if (!InFlightComputes.OnComputeAck(ev->Sender, ev->Get()->GetFreeSpace())) { ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "EvAckData (" << SelfId() << "): " << ev->Sender << " IGNORED"; @@ -95,7 +96,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev) } void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvTerminateFromCompute::TPtr& ev) { - ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "TEvTerminateFromCompute: " << ev->Sender; + AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "TEvTerminateFromCompute")("sender", ev->Sender); for (auto&& itTablet : InFlightShards) { for (auto&& it : itTablet.second) { auto state = it.second; @@ -115,6 +116,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvTerminateFromComput } } } + PassAway(); } void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanInitActor::TPtr& ev) { @@ -429,6 +431,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvInterconnect::TEvNodeDisconnected::T if (state->ActorId && state->ActorId.NodeId() == nodeId) { SendGlobalFail(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Connection with node " << nodeId << " lost."); + break; } } } @@ -448,12 +451,13 @@ bool TKqpScanFetcherActor::SendGlobalFail(const NDqProto::EComputeState state, N return true; } -bool TKqpScanFetcherActor::ProvideDataToCompute(TEvKqpCompute::TEvScanData& msg, TShardState::TPtr state) { +bool TKqpScanFetcherActor::ProvideDataToCompute(const NActors::TActorId& scannerId, TEvKqpCompute::TEvScanData& msg, TShardState::TPtr state) noexcept { if (msg.IsEmpty()) { InFlightComputes.OnEmptyDataReceived(state->TabletId, msg.RequestedBytesLimitReached || msg.Finished); } else { - ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "PROVIDING (FROM " << SelfId() << "): used free compute " << InFlightComputes.DebugString(); auto computeActorInfo = InFlightComputes.OnDataReceived(state->TabletId, msg.RequestedBytesLimitReached || msg.Finished); + ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << SelfId() << " PROVIDING (FROM " << scannerId << " to " << computeActorInfo.GetActorId() << + "): used free compute " << InFlightComputes.DebugString(); Send(computeActorInfo.GetActorId(), new TEvScanExchange::TEvSendData(msg, state->TabletId)); } return true; @@ -518,7 +522,7 @@ THolder<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEvKqpScan( return ev; } -void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) { +void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) noexcept { auto& msg = *ev->Get(); auto state = GetShardState(msg, ev->Sender); @@ -542,17 +546,22 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData << ";in flight shards=" << InFlightShards.GetShardsCount() << ";delayed_for=" << latency.SecondsFloat() << " seconds by ratelimiter" << ";tablet_id=" << state->TabletId); - ProvideDataToCompute(msg, state); + ProvideDataToCompute(ev->Sender, msg, state); + state->AvailablePacks = msg.AvailablePacks; InFlightShards.MutableStatistics(state->TabletId).AddPack(rowsCount, 0); - Stats.AddReadStat(state->ScannerIdx, rowsCount, 0); bool stopFinally = false; - CA_LOG_T("EVLOGKQP:" << IsAggregationRequest << "/" << Meta.GetItemsLimit() << "/" << InFlightShards.GetTotalRowsCount() << "/" << rowsCount); + CA_LOG_D("EVLOGKQP:" << IsAggregationRequest << "/" << Meta.GetItemsLimit() << "/" << InFlightShards.GetTotalRowsCount() << "/" << rowsCount); if (!msg.Finished) { if (msg.RequestedBytesLimitReached) { InFlightShards.NeedAck(state); - SendScanDataAck(state); + if (!state->AvailablePacks || *state->AvailablePacks == 0) { + ReturnShardInPool(state); + DoAckAvailableWaiting(); + } else { + SendScanDataAck(state); + } } } else { CA_LOG_D("Chunk " << state->TabletId << "/" << state->ScannerIdx << " scan finished"); @@ -627,6 +636,10 @@ void TKqpScanFetcherActor::StartReadShard(TShardState::TPtr state) { SendStartScanRequest(state, state->Generation); } +bool TKqpScanFetcherActor::ReturnShardInPool(TShardState::TPtr state) { + return InFlightComputes.ReturnShardInPool(state); +} + bool TKqpScanFetcherActor::SendScanDataAck(TShardState::TPtr state) { ui64 freeSpace; if (!InFlightComputes.PrepareShardAck(state, freeSpace)) { @@ -678,6 +691,7 @@ void TKqpScanFetcherActor::RetryDeliveryProblem(TShardState::TPtr state) { // so after several consecutive delivery problem responses retry logic should // resolve shard details again. if (state->RetryAttempt >= MAX_SHARD_RETRIES) { + Send(state->ActorId, new NActors::TEvents::TEvPoisonPill()); return ResolveShard(*state); } diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h index a87790969f5..04b58eae0bb 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h @@ -50,9 +50,10 @@ private: const NMiniKQL::TScanDataMetaFull ScanDataMeta; const NYql::NDq::TComputeRuntimeSettings RuntimeSettings; const NYql::NDq::TTxId TxId; + bool ReturnShardInPool(TShardState::TPtr state); public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR; + return NKikimrServices::TActivity::KQP_SCAN_FETCH_ACTOR; } TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const NYql::NDq::TComputeRuntimeSettings& settings, @@ -102,7 +103,7 @@ private: bool SendGlobalFail(const NYql::NDqProto::EComputeState state, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& issues) const; - bool ProvideDataToCompute(TEvKqpCompute::TEvScanData& msg, TShardState::TPtr state); + bool ProvideDataToCompute(const NActors::TActorId& scannerId, TEvKqpCompute::TEvScanData& msg, TShardState::TPtr state) noexcept; bool SendScanFinished(); @@ -112,7 +113,7 @@ private: void HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev); - void ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt); + void ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) noexcept; void ProcessScanData(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index f52e14fae1d..5f5235409d1 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -585,6 +585,7 @@ message TEvRemoteScanData { optional TArrowBatch ArrowBatch = 10; optional bool RequestedBytesLimitReached = 11 [default = false]; + optional uint32 AvailablePacks = 12; } message TEvRemoteScanDataAck { diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index f5ef8c1a9a2..121ee7f9bcc 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -371,6 +371,8 @@ enum EServiceKikimr { ARROW_HELPER = 2100; KAFKA_PROXY = 2200; + + OBJECTS_MONITORING = 2300; }; message TActivity { @@ -991,5 +993,7 @@ message TActivity { FEDERATION_DISCOVERY = 611; GRPC_REQ_SHARD_WRITER = 612; CMS_API_ADAPTER = 613; + KQP_SCAN_FETCH_ACTOR = 614; + COLUMNSHARD_CONVEYOR = 615; }; }; |