diff options
| author | hor911 <[email protected]> | 2023-03-06 21:52:22 +0300 |
|---|---|---|
| committer | hor911 <[email protected]> | 2023-03-06 21:52:22 +0300 |
| commit | 4332e68ad3f50d60ba750aa49348d9c0f757a4a1 (patch) | |
| tree | 679cc40be902421c7cb7921ade81a51b47866992 | |
| parent | 56082bd86655e7d36ae18f8c4f19e468dafc651a (diff) | |
Measure, publish and limit Source CPU usage
7 files changed, 100 insertions, 14 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp index 009572f792a..3d720aa995b 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp @@ -208,6 +208,7 @@ TString GetPrettyStatistics(const TString& statistics) { RemapNode(writer, p.second, "TaskRunner.Stage=Total.BuildCpuTimeUs", "BuildCpuTimeUs"); RemapNode(writer, p.second, "TaskRunner.Stage=Total.ComputeCpuTimeUs", "ComputeCpuTimeUs"); RemapNode(writer, p.second, "TaskRunner.Stage=Total.CpuTimeUs", "CpuTimeUs"); + RemapNode(writer, p.second, "TaskRunner.Stage=Total.SourceCpuTimeUs", "SourceCpuTimeUs"); RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressS3SourceBytes", "IngressObjectStorageBytes"); RemapNode(writer, p.second, "TaskRunner.Stage=Total.EgressS3SinkBytes", "EgressObjectStorageBytes"); RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressPqSourceBytes", "IngressStreamBytes"); diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index aa34d774700..b7886cefe0f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -934,7 +934,6 @@ private: // Cpu quota TActorId QuoterServiceActorId; TInstant CpuTimeQuotaAsked; - TDuration CpuTimeSpent; std::unique_ptr<NTaskRunnerActor::TEvContinueRun> ContinueRunEvent; TInstant ContinueRunStartWaitTime; bool ContinueRunInflight = false; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 6668be4e0aa..b147b070e67 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -87,6 +87,10 @@ struct IDqComputeActorAsyncInput { return 0; } + virtual TDuration GetCpuTime() { + return TDuration::Zero(); + } + virtual TMaybe<google::protobuf::Any> ExtraData() { return {}; } virtual void FillExtraStats(NDqProto::TDqTaskStats* /* stats */, bool /* finalized stats */) { } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 169815a33b5..0a7f159b56f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -241,6 +241,7 @@ protected: if (taskCounters) { MkqlMemoryQuota = taskCounters->GetCounter("MkqlMemoryQuota"); OutputChannelSize = taskCounters->GetCounter("OutputChannelSize"); + SourceCpuTimeMs = taskCounters->GetCounter("SourceCpuTimeMs", true); } } @@ -1695,6 +1696,11 @@ protected: void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) { Y_VERIFY(SourcesMap.FindPtr(ev->Get()->InputIndex) || InputTransformsMap.FindPtr(ev->Get()->InputIndex)); + auto cpuTimeDelta = TakeSourceCpuTimeDelta(); + if (SourceCpuTimeMs) { + SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds()); + } + CpuTimeSpent += cpuTimeDelta; ContinueExecute(); } @@ -1880,6 +1886,22 @@ private: } public: + + TDuration GetSourceCpuTime() const { + auto result = TDuration::Zero(); + for (auto& [inputIndex, sourceInfo] : SourcesMap) { + result += sourceInfo.AsyncInput->GetCpuTime(); + } + return result; + } + + TDuration TakeSourceCpuTimeDelta() { + auto newSourceCpuTime = GetSourceCpuTime(); + auto result = newSourceCpuTime - SourceCpuTime; + SourceCpuTime = newSourceCpuTime; + return result; + } + void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) { if (!BasicStats) { return; @@ -1931,6 +1953,7 @@ public: // More accurate cpu time counter: if (TDerived::HasAsyncTaskRunner) { protoTask->SetCpuTimeUs(BasicStats->CpuTime.MicroSeconds() + taskStats->ComputeCpuTime.MicroSeconds() + taskStats->BuildCpuTime.MicroSeconds()); + protoTask->SetSourceCpuTimeUs(SourceCpuTime.MicroSeconds()); } for (auto& [outputIndex, sinkInfo] : SinksMap) { @@ -2113,6 +2136,7 @@ protected: ::NMonitoring::TDynamicCounterPtr TaskCounters; TDqComputeActorMetrics DqComputeActorMetrics; NWilson::TSpan ComputeActorSpan; + TDuration SourceCpuTime; private: bool Running = true; TInstant LastSendStatsTime; @@ -2120,7 +2144,9 @@ private: protected: ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota; ::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize; + ::NMonitoring::TDynamicCounters::TCounterPtr SourceCpuTimeMs; THolder<NYql::TCounters> Stat; + TDuration CpuTimeSpent; }; } // namespace NYql diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index a79b79ba704..7eeb8abc642 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -122,6 +122,7 @@ message TDqTaskStats { uint64 TaskId = 1; uint32 StageId = 2; uint64 CpuTimeUs = 3; // total cpu time (build & compute) + uint64 SourceCpuTimeUs = 15; // time consumed in source uint64 FirstRowTimeMs = 4; // first row time, timestamp in millis uint64 FinishTimeMs = 5; // task finish time, timestamp in millis uint64 InputRows = 6; diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h index 2015e4c7186..a633293d66a 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h +++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h @@ -347,6 +347,7 @@ private: // basic stats ADD_COUNTER(CpuTimeUs) ADD_COUNTER(ComputeCpuTimeUs) + ADD_COUNTER(SourceCpuTimeUs) ADD_COUNTER(PendingInputTimeUs) ADD_COUNTER(PendingOutputTimeUs) ADD_COUNTER(FinishTimeUs) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 3a021f1ec98..fbc8f0143bd 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -66,6 +66,7 @@ #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/util/datetime.h> #include <util/generic/size_literals.h> #include <util/stream/format.h> @@ -210,11 +211,12 @@ struct TEvPrivate { }; struct TEvFileFinished : public TEventLocal<TEvFileFinished, EvFileFinished> { - TEvFileFinished(size_t pathIndex, ui64 ingressDelta) - : PathIndex(pathIndex), IngressDelta(ingressDelta) { + TEvFileFinished(size_t pathIndex, ui64 ingressDelta, TDuration cpuTimeDelta) + : PathIndex(pathIndex), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) { } const size_t PathIndex; - ui64 IngressDelta; + const ui64 IngressDelta; + const TDuration CpuTimeDelta; }; struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { @@ -240,19 +242,26 @@ struct TEvPrivate { }; struct TEvNextBlock : public NActors::TEventLocal<TEvNextBlock, EvNextBlock> { - TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor, ui64 ingressDelta) : PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta) { Block.swap(block); } + TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor, ui64 ingressDelta, TDuration cpuTimeDelta) + : PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) { + Block.swap(block); + } NDB::Block Block; const size_t PathIndex; std::function<void()> Functor; - ui64 IngressDelta; + const ui64 IngressDelta; + const TDuration CpuTimeDelta; }; struct TEvNextRecordBatch : public NActors::TEventLocal<TEvNextRecordBatch, EvNextRecordBatch> { - TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor, ui64 ingressDelta) : Batch(batch), PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta) { } + TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor, ui64 ingressDelta, TDuration cpuTimeDelta) + : Batch(batch), PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) { + } std::shared_ptr<arrow::RecordBatch> Batch; const size_t PathIndex; std::function<void()> Functor; - ui64 IngressDelta; + const ui64 IngressDelta; + const TDuration CpuTimeDelta; }; struct TEvBlockProcessed : public NActors::TEventLocal<TEvBlockProcessed, EvBlockProcessed> { @@ -795,6 +804,10 @@ private: return IngressBytes; } + TDuration GetCpuTime() override { + return CpuTime; + } + STRICT_STFUNC(StateFunc, hFunc(TEvPrivate::TEvReadResult, Handle); hFunc(TEvPrivate::TEvReadError, Handle); @@ -977,6 +990,7 @@ private: const ui64 StartPathIndex; const ui64 SizeLimit; ui64 IngressBytes = 0; + TDuration CpuTime; std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks; @@ -1247,7 +1261,9 @@ public: private: bool nextImpl() final { while (!Coro->InputFinished || !Coro->DeferredDataParts.empty()) { + Coro->CpuTime += Coro->GetCpuTimeDelta(); Coro->ProcessOneEvent(); + Coro->StartCycleCount = GetCycleCountFast(); if (Coro->InputBuffer) { RawDataBuffer.swap(Coro->InputBuffer); Coro->InputBuffer.clear(); @@ -1286,7 +1302,7 @@ public: ); while (NDB::Block batch = stream->read()) { - Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [](){}, TakeIngressDelta())); + Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta())); } LOG_CORO_D("RunClickHouseParserOverHttp - FINISHED"); @@ -1325,7 +1341,7 @@ public: } Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [actorSystem, selfId]() { actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed())); - }, TakeIngressDelta())); + }, TakeIngressDelta(), TakeCpuTimeDelta())); } while (cntBlocksInFly--) { WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); @@ -1367,9 +1383,14 @@ public: future.Subscribe([actorSystem, selfId](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) { actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved())); }); + + CpuTime += GetCpuTimeDelta(); + WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(); auto result = future.GetValue(); + StartCycleCount = GetCycleCountFast(); + fileDesc.Cookie = result.Cookie; std::shared_ptr<arrow::Schema> schema = result.Schema; @@ -1384,15 +1405,21 @@ public: future.Subscribe([actorSystem, selfId](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){ actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved())); }); + + CpuTime += GetCpuTimeDelta(); + WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(); auto table = future.GetValue(); + + StartCycleCount = GetCycleCountFast(); + auto reader = std::make_unique<arrow::TableBatchReader>(*table); std::shared_ptr<arrow::RecordBatch> batch; ::arrow::Status status; while (status = reader->ReadNext(&batch), status.ok() && batch) { Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( - ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta() + ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta() )); } if (!status.ok()) { @@ -1465,7 +1492,9 @@ public: TEvPrivate::TReadRange range { .Offset = position, .Length = nbytes }; auto cache = GetOrCreate(range); while (cache.Data.empty()) { + CpuTime += GetCpuTimeDelta(); auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>(); + StartCycleCount = GetCycleCountFast(); if (ev->Get()->Failure) { throw yexception() << ev->Get()->Issues.ToOneLineString(); } @@ -1524,7 +1553,7 @@ public: ::arrow::Status status; while (status = reader->ReadNext(&batch), status.ok() && batch) { Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( - ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta() + ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta() )); } if (!status.ok()) { @@ -1580,7 +1609,7 @@ public: Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( ConvertArrowColumns(batch, columnConverters), PathIndex, [actorSystem, selfId]() { actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed())); - }, TakeIngressDelta() + }, TakeIngressDelta(), TakeCpuTimeDelta() )); } if (!status.ok()) { @@ -1750,9 +1779,22 @@ private: return currentIngressBytes; } + TDuration TakeCpuTimeDelta() { + auto currentCpuTime = CpuTime; + CpuTime = TDuration::Zero(); + return currentCpuTime; + } + + TDuration GetCpuTimeDelta() { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount)); + } + void Run() final { NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; + + StartCycleCount = GetCycleCountFast(); + try { if (ReadSpec->Arrow) { if (ReadSpec->Compression) { @@ -1809,11 +1851,13 @@ private: RetryStuff->Cancel(); } + CpuTime += GetCpuTimeDelta(); + auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues)); if (issues) Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode)); else - Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, TakeIngressDelta())); + Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, TakeIngressDelta(), TakeCpuTimeDelta())); } void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final { @@ -1875,6 +1919,8 @@ private: std::size_t MaxBlocksInFly = 2; IArrowReader::TPtr ArrowReader; ui64 IngressBytes = 0; + TDuration CpuTime; + ui64 StartCycleCount = 0; TString InputBuffer; bool Paused = false; std::queue<THolder<TEvPrivate::TEvDataPart>> DeferredDataParts; @@ -2091,6 +2137,10 @@ private: return IngressBytes; } + TDuration GetCpuTime() override { + return CpuTime; + } + ui64 GetBlockSize(const TReadyBlock& block) const { return ReadSpec->Arrow ? NUdf::GetSizeOfArrowBatchInBytes(*block.Batch) : block.Block.bytes(); } @@ -2280,6 +2330,7 @@ private: void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) { YQL_ENSURE(!ReadSpec->Arrow); IngressBytes += next->Get()->IngressDelta; + CpuTime += next->Get()->CpuTimeDelta; auto size = next->Get()->Block.bytes(); QueueTotalDataSize += size; if (Counters) { @@ -2298,6 +2349,7 @@ private: void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) { YQL_ENSURE(ReadSpec->Arrow); IngressBytes += next->Get()->IngressDelta; + CpuTime += next->Get()->CpuTimeDelta; auto size = NUdf::GetSizeOfArrowBatchInBytes(*next->Get()->Batch); QueueTotalDataSize += size; if (Counters) { @@ -2316,6 +2368,7 @@ private: void HandleFileFinished(TEvPrivate::TEvFileFinished::TPtr& ev) { CoroActors.erase(ev->Sender); IngressBytes += ev->Get()->IngressDelta; + CpuTime += ev->Get()->CpuTimeDelta; RetryStuffForFile.erase(ev->Get()->PathIndex); if (TaskCounters) { @@ -2371,6 +2424,7 @@ private: const std::size_t MaxBlocksInFly; IArrowReader::TPtr ArrowReader; ui64 IngressBytes = 0; + TDuration CpuTime; mutable TInstant LastMemoryReport = TInstant::Now(); ui64 QueueTotalDataSize = 0; ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize; |
