summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <[email protected]>2023-03-06 21:52:22 +0300
committerhor911 <[email protected]>2023-03-06 21:52:22 +0300
commit4332e68ad3f50d60ba750aa49348d9c0f757a4a1 (patch)
tree679cc40be902421c7cb7921ade81a51b47866992
parent56082bd86655e7d36ae18f8c4f19e468dafc651a (diff)
Measure, publish and limit Source CPU usage
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/utils.cpp1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h4
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h26
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_stats.proto1
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller_impl.h1
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp80
7 files changed, 100 insertions, 14 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
index 009572f792a..3d720aa995b 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
@@ -208,6 +208,7 @@ TString GetPrettyStatistics(const TString& statistics) {
RemapNode(writer, p.second, "TaskRunner.Stage=Total.BuildCpuTimeUs", "BuildCpuTimeUs");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.ComputeCpuTimeUs", "ComputeCpuTimeUs");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.CpuTimeUs", "CpuTimeUs");
+ RemapNode(writer, p.second, "TaskRunner.Stage=Total.SourceCpuTimeUs", "SourceCpuTimeUs");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressS3SourceBytes", "IngressObjectStorageBytes");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.EgressS3SinkBytes", "EgressObjectStorageBytes");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressPqSourceBytes", "IngressStreamBytes");
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index aa34d774700..b7886cefe0f 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -934,7 +934,6 @@ private:
// Cpu quota
TActorId QuoterServiceActorId;
TInstant CpuTimeQuotaAsked;
- TDuration CpuTimeSpent;
std::unique_ptr<NTaskRunnerActor::TEvContinueRun> ContinueRunEvent;
TInstant ContinueRunStartWaitTime;
bool ContinueRunInflight = false;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 6668be4e0aa..b147b070e67 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -87,6 +87,10 @@ struct IDqComputeActorAsyncInput {
return 0;
}
+ virtual TDuration GetCpuTime() {
+ return TDuration::Zero();
+ }
+
virtual TMaybe<google::protobuf::Any> ExtraData() { return {}; }
virtual void FillExtraStats(NDqProto::TDqTaskStats* /* stats */, bool /* finalized stats */) { }
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 169815a33b5..0a7f159b56f 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -241,6 +241,7 @@ protected:
if (taskCounters) {
MkqlMemoryQuota = taskCounters->GetCounter("MkqlMemoryQuota");
OutputChannelSize = taskCounters->GetCounter("OutputChannelSize");
+ SourceCpuTimeMs = taskCounters->GetCounter("SourceCpuTimeMs", true);
}
}
@@ -1695,6 +1696,11 @@ protected:
void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
Y_VERIFY(SourcesMap.FindPtr(ev->Get()->InputIndex) || InputTransformsMap.FindPtr(ev->Get()->InputIndex));
+ auto cpuTimeDelta = TakeSourceCpuTimeDelta();
+ if (SourceCpuTimeMs) {
+ SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds());
+ }
+ CpuTimeSpent += cpuTimeDelta;
ContinueExecute();
}
@@ -1880,6 +1886,22 @@ private:
}
public:
+
+ TDuration GetSourceCpuTime() const {
+ auto result = TDuration::Zero();
+ for (auto& [inputIndex, sourceInfo] : SourcesMap) {
+ result += sourceInfo.AsyncInput->GetCpuTime();
+ }
+ return result;
+ }
+
+ TDuration TakeSourceCpuTimeDelta() {
+ auto newSourceCpuTime = GetSourceCpuTime();
+ auto result = newSourceCpuTime - SourceCpuTime;
+ SourceCpuTime = newSourceCpuTime;
+ return result;
+ }
+
void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) {
if (!BasicStats) {
return;
@@ -1931,6 +1953,7 @@ public:
// More accurate cpu time counter:
if (TDerived::HasAsyncTaskRunner) {
protoTask->SetCpuTimeUs(BasicStats->CpuTime.MicroSeconds() + taskStats->ComputeCpuTime.MicroSeconds() + taskStats->BuildCpuTime.MicroSeconds());
+ protoTask->SetSourceCpuTimeUs(SourceCpuTime.MicroSeconds());
}
for (auto& [outputIndex, sinkInfo] : SinksMap) {
@@ -2113,6 +2136,7 @@ protected:
::NMonitoring::TDynamicCounterPtr TaskCounters;
TDqComputeActorMetrics DqComputeActorMetrics;
NWilson::TSpan ComputeActorSpan;
+ TDuration SourceCpuTime;
private:
bool Running = true;
TInstant LastSendStatsTime;
@@ -2120,7 +2144,9 @@ private:
protected:
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota;
::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr SourceCpuTimeMs;
THolder<NYql::TCounters> Stat;
+ TDuration CpuTimeSpent;
};
} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index a79b79ba704..7eeb8abc642 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -122,6 +122,7 @@ message TDqTaskStats {
uint64 TaskId = 1;
uint32 StageId = 2;
uint64 CpuTimeUs = 3; // total cpu time (build & compute)
+ uint64 SourceCpuTimeUs = 15; // time consumed in source
uint64 FirstRowTimeMs = 4; // first row time, timestamp in millis
uint64 FinishTimeMs = 5; // task finish time, timestamp in millis
uint64 InputRows = 6;
diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
index 2015e4c7186..a633293d66a 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h
+++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
@@ -347,6 +347,7 @@ private:
// basic stats
ADD_COUNTER(CpuTimeUs)
ADD_COUNTER(ComputeCpuTimeUs)
+ ADD_COUNTER(SourceCpuTimeUs)
ADD_COUNTER(PendingInputTimeUs)
ADD_COUNTER(PendingOutputTimeUs)
ADD_COUNTER(FinishTimeUs)
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 3a021f1ec98..fbc8f0143bd 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -66,6 +66,7 @@
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/util/datetime.h>
#include <util/generic/size_literals.h>
#include <util/stream/format.h>
@@ -210,11 +211,12 @@ struct TEvPrivate {
};
struct TEvFileFinished : public TEventLocal<TEvFileFinished, EvFileFinished> {
- TEvFileFinished(size_t pathIndex, ui64 ingressDelta)
- : PathIndex(pathIndex), IngressDelta(ingressDelta) {
+ TEvFileFinished(size_t pathIndex, ui64 ingressDelta, TDuration cpuTimeDelta)
+ : PathIndex(pathIndex), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) {
}
const size_t PathIndex;
- ui64 IngressDelta;
+ const ui64 IngressDelta;
+ const TDuration CpuTimeDelta;
};
struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> {
@@ -240,19 +242,26 @@ struct TEvPrivate {
};
struct TEvNextBlock : public NActors::TEventLocal<TEvNextBlock, EvNextBlock> {
- TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor, ui64 ingressDelta) : PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta) { Block.swap(block); }
+ TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor, ui64 ingressDelta, TDuration cpuTimeDelta)
+ : PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) {
+ Block.swap(block);
+ }
NDB::Block Block;
const size_t PathIndex;
std::function<void()> Functor;
- ui64 IngressDelta;
+ const ui64 IngressDelta;
+ const TDuration CpuTimeDelta;
};
struct TEvNextRecordBatch : public NActors::TEventLocal<TEvNextRecordBatch, EvNextRecordBatch> {
- TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor, ui64 ingressDelta) : Batch(batch), PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta) { }
+ TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor, ui64 ingressDelta, TDuration cpuTimeDelta)
+ : Batch(batch), PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) {
+ }
std::shared_ptr<arrow::RecordBatch> Batch;
const size_t PathIndex;
std::function<void()> Functor;
- ui64 IngressDelta;
+ const ui64 IngressDelta;
+ const TDuration CpuTimeDelta;
};
struct TEvBlockProcessed : public NActors::TEventLocal<TEvBlockProcessed, EvBlockProcessed> {
@@ -795,6 +804,10 @@ private:
return IngressBytes;
}
+ TDuration GetCpuTime() override {
+ return CpuTime;
+ }
+
STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvReadResult, Handle);
hFunc(TEvPrivate::TEvReadError, Handle);
@@ -977,6 +990,7 @@ private:
const ui64 StartPathIndex;
const ui64 SizeLimit;
ui64 IngressBytes = 0;
+ TDuration CpuTime;
std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks;
@@ -1247,7 +1261,9 @@ public:
private:
bool nextImpl() final {
while (!Coro->InputFinished || !Coro->DeferredDataParts.empty()) {
+ Coro->CpuTime += Coro->GetCpuTimeDelta();
Coro->ProcessOneEvent();
+ Coro->StartCycleCount = GetCycleCountFast();
if (Coro->InputBuffer) {
RawDataBuffer.swap(Coro->InputBuffer);
Coro->InputBuffer.clear();
@@ -1286,7 +1302,7 @@ public:
);
while (NDB::Block batch = stream->read()) {
- Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [](){}, TakeIngressDelta()));
+ Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta()));
}
LOG_CORO_D("RunClickHouseParserOverHttp - FINISHED");
@@ -1325,7 +1341,7 @@ public:
}
Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [actorSystem, selfId]() {
actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed()));
- }, TakeIngressDelta()));
+ }, TakeIngressDelta(), TakeCpuTimeDelta()));
}
while (cntBlocksInFly--) {
WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
@@ -1367,9 +1383,14 @@ public:
future.Subscribe([actorSystem, selfId](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) {
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved()));
});
+
+ CpuTime += GetCpuTimeDelta();
+
WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>();
auto result = future.GetValue();
+ StartCycleCount = GetCycleCountFast();
+
fileDesc.Cookie = result.Cookie;
std::shared_ptr<arrow::Schema> schema = result.Schema;
@@ -1384,15 +1405,21 @@ public:
future.Subscribe([actorSystem, selfId](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved()));
});
+
+ CpuTime += GetCpuTimeDelta();
+
WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>();
auto table = future.GetValue();
+
+ StartCycleCount = GetCycleCountFast();
+
auto reader = std::make_unique<arrow::TableBatchReader>(*table);
std::shared_ptr<arrow::RecordBatch> batch;
::arrow::Status status;
while (status = reader->ReadNext(&batch), status.ok() && batch) {
Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
- ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta()
+ ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta()
));
}
if (!status.ok()) {
@@ -1465,7 +1492,9 @@ public:
TEvPrivate::TReadRange range { .Offset = position, .Length = nbytes };
auto cache = GetOrCreate(range);
while (cache.Data.empty()) {
+ CpuTime += GetCpuTimeDelta();
auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>();
+ StartCycleCount = GetCycleCountFast();
if (ev->Get()->Failure) {
throw yexception() << ev->Get()->Issues.ToOneLineString();
}
@@ -1524,7 +1553,7 @@ public:
::arrow::Status status;
while (status = reader->ReadNext(&batch), status.ok() && batch) {
Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
- ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta()
+ ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta()
));
}
if (!status.ok()) {
@@ -1580,7 +1609,7 @@ public:
Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
ConvertArrowColumns(batch, columnConverters), PathIndex, [actorSystem, selfId]() {
actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed()));
- }, TakeIngressDelta()
+ }, TakeIngressDelta(), TakeCpuTimeDelta()
));
}
if (!status.ok()) {
@@ -1750,9 +1779,22 @@ private:
return currentIngressBytes;
}
+ TDuration TakeCpuTimeDelta() {
+ auto currentCpuTime = CpuTime;
+ CpuTime = TDuration::Zero();
+ return currentCpuTime;
+ }
+
+ TDuration GetCpuTimeDelta() {
+ return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount));
+ }
+
void Run() final {
NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
+
+ StartCycleCount = GetCycleCountFast();
+
try {
if (ReadSpec->Arrow) {
if (ReadSpec->Compression) {
@@ -1809,11 +1851,13 @@ private:
RetryStuff->Cancel();
}
+ CpuTime += GetCpuTimeDelta();
+
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues));
if (issues)
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
else
- Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, TakeIngressDelta()));
+ Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()));
}
void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final {
@@ -1875,6 +1919,8 @@ private:
std::size_t MaxBlocksInFly = 2;
IArrowReader::TPtr ArrowReader;
ui64 IngressBytes = 0;
+ TDuration CpuTime;
+ ui64 StartCycleCount = 0;
TString InputBuffer;
bool Paused = false;
std::queue<THolder<TEvPrivate::TEvDataPart>> DeferredDataParts;
@@ -2091,6 +2137,10 @@ private:
return IngressBytes;
}
+ TDuration GetCpuTime() override {
+ return CpuTime;
+ }
+
ui64 GetBlockSize(const TReadyBlock& block) const {
return ReadSpec->Arrow ? NUdf::GetSizeOfArrowBatchInBytes(*block.Batch) : block.Block.bytes();
}
@@ -2280,6 +2330,7 @@ private:
void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) {
YQL_ENSURE(!ReadSpec->Arrow);
IngressBytes += next->Get()->IngressDelta;
+ CpuTime += next->Get()->CpuTimeDelta;
auto size = next->Get()->Block.bytes();
QueueTotalDataSize += size;
if (Counters) {
@@ -2298,6 +2349,7 @@ private:
void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) {
YQL_ENSURE(ReadSpec->Arrow);
IngressBytes += next->Get()->IngressDelta;
+ CpuTime += next->Get()->CpuTimeDelta;
auto size = NUdf::GetSizeOfArrowBatchInBytes(*next->Get()->Batch);
QueueTotalDataSize += size;
if (Counters) {
@@ -2316,6 +2368,7 @@ private:
void HandleFileFinished(TEvPrivate::TEvFileFinished::TPtr& ev) {
CoroActors.erase(ev->Sender);
IngressBytes += ev->Get()->IngressDelta;
+ CpuTime += ev->Get()->CpuTimeDelta;
RetryStuffForFile.erase(ev->Get()->PathIndex);
if (TaskCounters) {
@@ -2371,6 +2424,7 @@ private:
const std::size_t MaxBlocksInFly;
IArrowReader::TPtr ArrowReader;
ui64 IngressBytes = 0;
+ TDuration CpuTime;
mutable TInstant LastMemoryReport = TInstant::Now();
ui64 QueueTotalDataSize = 0;
::NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize;