diff options
author | hor911 <hor911@ydb.tech> | 2023-01-11 14:22:20 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-01-11 14:22:20 +0300 |
commit | 17773ad0b578e15ec7eb4bece9631e93fc98de4b (patch) | |
tree | 172a220b5364ff21eaf1f2935df19882e5e96025 | |
parent | dae1c49d39a044bdfdd67ecd3c7347103625516e (diff) | |
download | ydb-17773ad0b578e15ec7eb4bece9631e93fc98de4b.tar.gz |
HTTP Back Pressure + MonCounters refactoring
17 files changed, 303 insertions, 161 deletions
diff --git a/ydb/core/yq/libs/config/protos/fq_config.proto b/ydb/core/yq/libs/config/protos/fq_config.proto index c7c7deb1024..ffa36918c2c 100644 --- a/ydb/core/yq/libs/config/protos/fq_config.proto +++ b/ydb/core/yq/libs/config/protos/fq_config.proto @@ -50,4 +50,5 @@ message TConfig { THealthConfig Health = 20; TQuotasManagerConfig QuotasManager = 21; TRateLimiterConfig RateLimiter = 22; + bool EnableTaskCounters = 23; } diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto index c5e6a9fef70..013fbaeb843 100644 --- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto +++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto @@ -12,7 +12,7 @@ message TS3ReadActorFactoryConfig { NYql.NS3.TRetryConfig RetryConfig = 1; uint64 RowsInBatch = 2; // Default = 1000 uint64 MaxInflight = 3; // Default = 20 - uint64 DataInflight = 4; // Default = 1 GB + uint64 DataInflight = 4; // Default = 200 MB } message TPqReadActorFactoryConfig { diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp index 2fd62cd16ba..bd1cac26551 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp @@ -177,8 +177,15 @@ TPingTaskParams ConstructHardPingTask( } if (request.statistics()) { - *query.mutable_statistics()->mutable_json() = request.statistics(); - *job.mutable_statistics()->mutable_json() = request.statistics(); + TString statistics; + try { + statistics = GetPrettyStatistics(request.statistics()); + } catch (const std::exception&) { + CPS_LOG_E("Error on statistics prettification: " << CurrentExceptionMessage()); + statistics = request.statistics(); + } + *query.mutable_statistics()->mutable_json() = statistics; + *job.mutable_statistics()->mutable_json() = statistics; } if (!request.result_set_meta().empty()) { @@ -380,8 +387,8 @@ TPingTaskParams ConstructHardPingTask( if (IsTerminalStatus(request.status()) && request.statistics()) { try { meteringRecords = GetMeteringRecords(request.statistics(), request.query_id().value(), request.scope(), HostName()); - } catch (yexception &e) { - CPS_LOG_E(e.what()); + } catch (const std::exception&) { + CPS_LOG_E("Error on statistics meterification: " << CurrentExceptionMessage()); } } diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp index ab11f98f9ee..8d2f537c010 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp @@ -1,5 +1,7 @@ #include "utils.h" +#include <library/cpp/json/yson/json2yson.h> + #include <ydb/core/metering/bill_record.h> #include <ydb/core/metering/metering.h> @@ -165,4 +167,53 @@ std::vector<TString> GetMeteringRecords(const TString& statistics, const TString return result; } +void RemapValue(NYson::TYsonWriter& writer, const NJson::TJsonValue& node, const TString& key) { + ui64 value = 0; + if (auto* keyNode = node.GetValueByPath(key)) { + value = keyNode->GetInteger(); + } + writer.OnKeyedItem(key); + writer.OnInt64Scalar(value); +} + +void RemapNode(NYson::TYsonWriter& writer, const NJson::TJsonValue& node, const TString& path, const TString& key) { + if (auto* subNode = node.GetValueByPath(path)) { + writer.OnKeyedItem(key); + writer.OnBeginMap(); + RemapValue(writer, *subNode, "sum"); + RemapValue(writer, *subNode, "count"); + RemapValue(writer, *subNode, "avg"); + RemapValue(writer, *subNode, "max"); + RemapValue(writer, *subNode, "min"); + writer.OnEndMap(); + } +} + +TString GetPrettyStatistics(const TString& statistics) { + TStringStream out; + NYson::TYsonWriter writer(&out); + writer.OnBeginMap(); + NJson::TJsonReaderConfig jsonConfig; + NJson::TJsonValue stat; + if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) { + for (const auto& p : stat.GetMap()) { + if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) { + writer.OnKeyedItem(p.first); + writer.OnBeginMap(); + RemapNode(writer, p.second, "StagesCount", "StagesCount"); + RemapNode(writer, p.second, "TaskRunner.Stage=Total.TasksCount", "TasksCount"); + RemapNode(writer, p.second, "TaskRunner.Stage=Total.BuildCpuTimeUs", "BuildCpuTimeUs"); + RemapNode(writer, p.second, "TaskRunner.Stage=Total.ComputeCpuTimeUs", "ComputeCpuTimeUs"); + RemapNode(writer, p.second, "TaskRunner.Stage=Total.CpuTimeUs", "CpuTimeUs"); + RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressS3SourceBytes", "IngressObjectStorageBytes"); + RemapNode(writer, p.second, "TaskRunner.Stage=Total.EgressS3SinkBytes", "EgressObjectStorageBytes"); + RemapNode(writer, p.second, "TaskRunner.Source=0.Stage=Total.RowsIn", "IngressRows"); + writer.OnEndMap(); + } + } + } + writer.OnEndMap(); + return NJson2Yson::ConvertYson2Json(out.Str()); +} + }; diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.h b/ydb/core/yq/libs/control_plane_storage/internal/utils.h index 98c645b5bb4..b5ff5f1f338 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/utils.h +++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.h @@ -32,5 +32,6 @@ NYql::TIssues ValidateNodesHealthCheck( NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId, const TString& scope, const TString& tenant, const TString& owner); std::vector<TString> GetMeteringRecords(const TString& statistics, const TString& queryId, const TString& scope, const TString& sourceId); +TString GetPrettyStatistics(const TString& statistics); }; diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index e730c6edb48..87b63d46e65 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -177,7 +177,7 @@ void Init( RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg, - yqCounters->GetSubgroup("subsystem", "S3ReadActor"), appData->Counters->GetSubgroup("counters", "dq_tasks")); + yqCounters->GetSubgroup("subsystem", "S3ReadActor")); RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy); RegisterClickHouseReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway); @@ -200,7 +200,7 @@ void Init( } NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Counters = workerManagerCounters; - lwmOptions.DqTaskCounters = appData->Counters->GetSubgroup("counters", "dq_tasks"); + lwmOptions.DqTaskCounters = protoConfig.GetEnableTaskCounters() ? appData->Counters->GetSubgroup("counters", "dq_tasks") : nullptr; lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false); lwmOptions.AsyncIoFactory = asyncIoFactory; lwmOptions.FunctionRegistry = appData->FunctionRegistry; diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index b84e55a0e00..171d8723af0 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -97,8 +97,10 @@ public: void InitExtraMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { if (taskCounters && UseCpuQuota()) { - CpuTimeGetQuotaLatency = taskCounters->GetSubgroup("subsystem", "mkql")->GetHistogram("CpuTimeGetQuotaLatencyMs", NMonitoring::ExplicitHistogram({0, 1, 5, 10, 50, 100, 500, 1000, 5000, 10'000, 60'000, 600'000})); - CpuTimeQuotaWaitDelay = taskCounters->GetSubgroup("subsystem", "mkql")->GetHistogram("CpuTimeQuotaWaitDelayMs", NMonitoring::ExplicitHistogram({0, 1, 5, 10, 50, 100, 500, 1000, 5000, 10'000, 60'000, 600'000})); + CpuTimeGetQuotaLatency = taskCounters->GetHistogram("CpuTimeGetQuotaLatencyMs", NMonitoring::ExplicitHistogram({0, 1, 5, 10, 50, 100, 500, 1000})); + CpuTimeQuotaWaitDelay = taskCounters->GetHistogram("CpuTimeQuotaWaitDelayMs", NMonitoring::ExplicitHistogram({0, 1, 5, 10, 50, 100, 500, 1000})); + CpuTime = taskCounters->GetCounter("CpuTimeMs", true); + CpuTime->Add(0); } } @@ -352,6 +354,9 @@ private: TaskRunnerActor->PassAway(); } if (UseCpuQuota() && CpuTimeSpent.MilliSeconds()) { + if (CpuTime) { + CpuTime->Add(CpuTimeSpent.MilliSeconds()); + } // Send the rest of CPU time that we haven't taken into account Send(QuoterServiceActorId, new NKikimr::TEvQuota::TEvRequest( @@ -802,6 +807,9 @@ private: Y_VERIFY(!CpuTimeQuotaAsked); if (CpuTimeSpent >= MIN_QUOTED_CPU_TIME) { CA_LOG_T("Ask CPU quota: " << CpuTimeSpent.MilliSeconds() << "ms"); + if (CpuTime) { + CpuTime->Add(CpuTimeSpent.MilliSeconds()); + } Send(QuoterServiceActorId, new NKikimr::TEvQuota::TEvRequest( NKikimr::TEvQuota::EResourceOperator::And, @@ -903,6 +911,7 @@ private: bool ContinueRunInflight = false; NMonitoring::THistogramPtr CpuTimeGetQuotaLatency; NMonitoring::THistogramPtr CpuTimeQuotaWaitDelay; + NMonitoring::TDynamicCounters::TCounterPtr CpuTime; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 8f284db6dc8..8e45f4f40fa 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -169,6 +169,7 @@ public: const NActors::TActorId& ComputeActorId; const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; + ::NMonitoring::TDynamicCounterPtr TaskCounters; }; struct TSinkArguments { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 97a689d09df..0c936298a5a 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -189,8 +189,8 @@ protected: , FunctionRegistry(functionRegistry) , CheckpointingMode(GetTaskCheckpointingMode(Task)) , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) - , MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr) , WatermarksTracker(this->SelfId(), TxId, Task.GetId()) + , TaskCounters(taskCounters) , DqComputeActorMetrics(taskCounters) , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor") , Running(!Task.GetCreateSuspended()) @@ -199,8 +199,11 @@ protected: if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { BasicStats = std::make_unique<TBasicStats>(); } - InitializeTask(); InitMonCounters(taskCounters); + InitializeTask(); + if (ownMemoryQuota) { + MemoryQuota = InitMemoryQuota(); + } InitializeWatermarks(); } @@ -219,8 +222,8 @@ protected: , AsyncIoFactory(std::move(asyncIoFactory)) , FunctionRegistry(functionRegistry) , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) - , MemoryQuota(InitMemoryQuota()) , WatermarksTracker(this->SelfId(), TxId, Task.GetId()) + , TaskCounters(taskCounters) , DqComputeActorMetrics(taskCounters) , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor") , Running(!Task.GetCreateSuspended()) @@ -228,23 +231,16 @@ protected: if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { BasicStats = std::make_unique<TBasicStats>(); } - InitializeTask(); InitMonCounters(taskCounters); + InitializeTask(); + MemoryQuota = InitMemoryQuota(); InitializeWatermarks(); } void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { if (taskCounters) { - MkqlMemoryUsage = taskCounters->GetSubgroup("subsystem", "mkql")->GetCounter("MemoryUsage"); - MkqlMemoryLimit = taskCounters->GetSubgroup("subsystem", "mkql")->GetCounter("MemoryLimit"); - MonCountersProvided = true; - } - } - - void UpdateMonCounters() { - if (MonCountersProvided) { - *MkqlMemoryUsage = GetProfileStats()->MkqlMaxUsedMemory; - *MkqlMemoryLimit = GetMkqlMemoryLimit(); + MkqlMemoryQuota = taskCounters->GetCounter("MkqlMemoryQuota"); + OutputChannelSize = taskCounters->GetCounter("OutputChannelSize"); } } @@ -316,6 +312,7 @@ protected: protected: THolder<TDqMemoryQuota> InitMemoryQuota() { return MakeHolder<TDqMemoryQuota>( + MkqlMemoryQuota, CalcMkqlMemoryLimit(), MemoryLimits, TxId, @@ -543,6 +540,10 @@ protected: } } + if (OutputChannelSize) { + OutputChannelSize->Sub(OutputChannelsMap.size() * MemoryLimits.ChannelBufferSize); + } + for (auto& [_, outputChannel] : OutputChannelsMap) { if (outputChannel.Channel) { outputChannel.Channel->Terminate(); @@ -1502,7 +1503,8 @@ protected: .TaskParams = taskParams, .ComputeActorId = this->SelfId(), .TypeEnv = typeEnv, - .HolderFactory = holderFactory + .HolderFactory = holderFactory, + .TaskCounters = TaskCounters }); } catch (const std::exception& ex) { throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what(); @@ -1808,6 +1810,10 @@ private: } } } + + if (OutputChannelSize) { + OutputChannelSize->Add(OutputChannelsMap.size() * MemoryLimits.ChannelBufferSize); + } } void InitializeWatermarks() { @@ -2074,6 +2080,7 @@ protected: THolder<TDqMemoryQuota> MemoryQuota; TDqComputeActorWatermarks WatermarksTracker; + ::NMonitoring::TDynamicCounterPtr TaskCounters; TDqComputeActorMetrics DqComputeActorMetrics; NWilson::TSpan ComputeActorSpan; private: @@ -2081,9 +2088,8 @@ private: TInstant LastSendStatsTime; bool PassExceptions = false; protected: - bool MonCountersProvided = false; - ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryUsage; - ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit; + ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota; + ::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize; THolder<NYql::TCounters> Stat; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h index f8c94daab63..8bff90633b0 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h @@ -26,8 +26,9 @@ namespace NYql::NDq { class TDqMemoryQuota { public: - TDqMemoryQuota(ui64 initialMkqlMemoryLimit, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NYql::NDq::TTxId txId, ui64 taskId, bool profileStats, bool canAllocateExtraMemory, NActors::TActorSystem* actorSystem) - : InitialMkqlMemoryLimit(initialMkqlMemoryLimit) + TDqMemoryQuota(::NMonitoring::TDynamicCounters::TCounterPtr& mkqlMemoryQuota, ui64 initialMkqlMemoryLimit, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NYql::NDq::TTxId txId, ui64 taskId, bool profileStats, bool canAllocateExtraMemory, NActors::TActorSystem* actorSystem) + : MkqlMemoryQuota(mkqlMemoryQuota) + , InitialMkqlMemoryLimit(initialMkqlMemoryLimit) , MkqlMemoryLimit(initialMkqlMemoryLimit) , MemoryLimits(memoryLimits) , TxId(txId) @@ -35,6 +36,9 @@ namespace NYql::NDq { , ProfileStats(profileStats ? MakeHolder<TProfileStats>() : nullptr) , CanAllocateExtraMemory(canAllocateExtraMemory) , ActorSystem(actorSystem) { + if (MkqlMemoryQuota) { + MkqlMemoryQuota->Add(MkqlMemoryLimit); + } } ui64 GetMkqlMemoryLimit() const { @@ -59,6 +63,9 @@ namespace NYql::NDq { MkqlMemoryLimit = newLimit; alloc->SetLimit(newLimit); MemoryLimits.FreeMemoryFn(TxId, TaskId, freedSize); + if (MkqlMemoryQuota) { + MkqlMemoryQuota->Sub(freedSize); + } CAMQ_LOG_D("[Mem] memory shrinked, new limit: " << MkqlMemoryLimit); } } @@ -89,6 +96,9 @@ namespace NYql::NDq { void TryReleaseQuota() { if (MkqlMemoryLimit && MemoryLimits.FreeMemoryFn) { MemoryLimits.FreeMemoryFn(TxId, TaskId, MkqlMemoryLimit); + if (MkqlMemoryQuota) { + MkqlMemoryQuota->Sub(MkqlMemoryLimit); + } MkqlMemoryLimit = 0; } } @@ -111,6 +121,9 @@ namespace NYql::NDq { if (MemoryLimits.AllocateMemoryFn(TxId, TaskId, memory)) { MkqlMemoryLimit += memory; + if (MkqlMemoryQuota) { + MkqlMemoryQuota->Add(memory); + } CAMQ_LOG_D("[Mem] memory " << memory << " granted, new limit: " << MkqlMemoryLimit); alloc->SetLimit(MkqlMemoryLimit); } else { @@ -132,6 +145,7 @@ namespace NYql::NDq { } private: + ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota; const ui64 InitialMkqlMemoryLimit; ui64 MkqlMemoryLimit; const TComputeMemoryLimits MemoryLimits; diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp index 95c1c0207a7..6e545bb3622 100644 --- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp @@ -69,17 +69,22 @@ public: } } - TCancelHook Download( - TString , - THeaders , - std::size_t , - std::size_t , - TOnDownloadStart , - TOnNewDataPart , - TOnDownloadFinish ) final { + TCancelHook Download( + TString, + THeaders, + std::size_t, + std::size_t, + TOnDownloadStart, + TOnNewDataPart, + TOnDownloadFinish, + const ::NMonitoring::TDynamicCounters::TCounterPtr&) final { return {}; } + ui64 GetBuffersSizePerStream() final { + return 0; + } + void AddDefaultResponse(TDataDefaultResponse response) { DefaultResponse = response; } diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 2e9bdc1940e..53a48398759 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -331,7 +331,7 @@ public: using TWeakPtr = std::weak_ptr<TEasyCurlStream>; TEasyCurlStream( - const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, + const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, IHTTPGateway::THeaders headers, @@ -340,13 +340,18 @@ public: IHTTPGateway::TOnDownloadStart onStart, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, + const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, const TCurlInitConfig& config = TCurlInitConfig()) : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, sizeLimit, 0ULL, std::move(config)) - , OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) + , OnStart(std::move(onStart)) + , OnNewData(std::move(onNewData)) + , OnFinish(std::move(onFinish)) + , Counter(std::make_shared<std::atomic_size_t>(0ULL)) + , InflightCounter(inflightCounter) {} static TPtr Make( - const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, + const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, @@ -356,9 +361,10 @@ public: IHTTPGateway::TOnDownloadStart onStart, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, + const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, const TCurlInitConfig& config = TCurlInitConfig()) { - return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), std::move(config)); + return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, std::move(config)); } enum class EAction : i8 { @@ -378,7 +384,6 @@ public: if (Cancelled) { return EAction::Drop; } - if (buffersSize && Paused != Counter->load() >= buffersSize) { Paused = !Paused; return Paused ? EAction::Stop : EAction::Work; @@ -413,7 +418,7 @@ private: const auto realsize = size * nmemb; if (!Cancelled) - OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter)); + OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter)); return realsize; } @@ -425,6 +430,7 @@ private: const IHTTPGateway::TOnDownloadFinish OnFinish; const std::shared_ptr<std::atomic_size_t> Counter; + const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter; bool Started = false; bool Paused = false; bool Cancelled = false; @@ -755,9 +761,10 @@ private: size_t sizeLimit, TOnDownloadStart onStart, TOnNewDataPart onNewData, - TOnDownloadFinish onFinish) final + TOnDownloadFinish onFinish, + const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final { - auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish)); + auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter); const std::unique_lock lock(Sync); const auto handle = stream->GetHandle(); TEasyCurlStream::TWeakPtr weak = stream; @@ -770,6 +777,10 @@ private: }; } + ui64 GetBuffersSizePerStream() final { + return BuffersSizePerStream; + } + void OnRetry(TEasyCurlBuffer::TPtr easy) { const std::unique_lock lock(Sync); const size_t sizeLimit = easy->GetSizeLimit(); @@ -875,19 +886,29 @@ IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode, con , HttpResponseCode(httpResponseCode) {} -IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter) - : TContentBase(std::move(data)), Counter(counter) +IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter, + const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) + : TContentBase(std::move(data)), Counter(counter), InflightCounter(inflightCounter) { Counter->fetch_add(size()); + if (InflightCounter) { + InflightCounter->Add(size()); + } } IHTTPGateway::TCountedContent::~TCountedContent() { Counter->fetch_sub(size()); + if (InflightCounter) { + InflightCounter->Sub(size()); + } } TString IHTTPGateway::TCountedContent::Extract() { Counter->fetch_sub(size()); + if (InflightCounter) { + InflightCounter->Sub(size()); + } return TContentBase::Extract(); } diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index acd74b265a1..d9aec8fa32d 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -84,7 +84,7 @@ public: class TCountedContent : public TContentBase { public: - TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter); + TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter); ~TCountedContent(); TCountedContent(TCountedContent&&) = default; @@ -93,6 +93,7 @@ public: TString Extract(); private: const std::shared_ptr<std::atomic_size_t> Counter; + const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter; }; using TOnDownloadStart = std::function<void(long)>; // http code. @@ -107,7 +108,10 @@ public: std::size_t sizeLimit, TOnDownloadStart onStart, TOnNewDataPart onNewData, - TOnDownloadFinish onFinish) = 0; + TOnDownloadFinish onFinish, + const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) = 0; + + virtual ui64 GetBuffersSizePerStream() = 0; }; } diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index 6b1aab4dee8..2731af67b6d 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -211,21 +211,19 @@ private: YQL_CLOG(DEBUG, ProviderDq) << "TLocalWorkerManager::TEvAllocateWorkersRequest " << resourceId; TFailureInjector::Reach("allocate_workers_failure", [] { ::_exit(1); }); - auto& allocationInfo = AllocatedWorkers[resourceId]; auto traceId = ev->Get()->Record.GetTraceId(); - allocationInfo.TxId = traceId; - auto count = ev->Get()->Record.GetCount(); - Y_VERIFY(count > 0); bool canAllocate = MemoryQuoter->Allocate(traceId, 0, count * Options.MkqlInitialMemoryLimit); - if (!canAllocate) { Send(ev->Sender, MakeHolder<TEvAllocateWorkersResponse>("Not enough memory to allocate tasks", NYql::NDqProto::StatusIds::OVERLOADED), 0, ev->Cookie); return; } + auto& allocationInfo = AllocatedWorkers[resourceId]; + allocationInfo.TxId = traceId; + if (allocationInfo.WorkerActors.empty()) { allocationInfo.WorkerActors.reserve(count); allocationInfo.Sender = ev->Sender; @@ -240,15 +238,23 @@ private: Y_VERIFY(static_cast<int>(tasks.size()) == static_cast<int>(count)); } auto resultId = ActorIdFromProto(ev->Get()->Record.GetResultActorId()); + ::NMonitoring::TDynamicCounterPtr taskCounters; + + if (createComputeActor && Options.DqTaskCounters) { + auto& info = TaskCountersMap[traceId]; + if (!info.TaskCounters) { + info.TaskCounters = Options.DqTaskCounters->GetSubgroup("operation", traceId); + } + info.ReferenceCount += count; + taskCounters = info.TaskCounters; + } for (ui32 i = 0; i < count; i++) { THolder<NActors::IActor> actor; if (createComputeActor) { - auto id = tasks[i].GetId(); - auto stageId = tasks[i].GetStageId(); YQL_CLOG(DEBUG, ProviderDq) << "Create compute actor: " << computeActorType; - auto taskCounters = Options.DqTaskCounters ? Options.DqTaskCounters->GetSubgroup("operation", traceId)->GetSubgroup("stage", ToString(stageId))->GetSubgroup("id", ToString(id)) : nullptr; + actor.Reset(NYql::CreateComputeActor( Options, Options.MkqlTotalMemoryLimit ? AllocateMemoryFn : nullptr, @@ -306,6 +312,20 @@ private: } MemoryQuoter->Free(it->second.TxId, 0); + + auto traceId = std::get<TString>(it->second.TxId); + auto itt = TaskCountersMap.find(traceId); + if (itt != TaskCountersMap.end()) { + if (itt->second.ReferenceCount <= it->second.WorkerActors.size()) { + if (Options.DqTaskCounters) { + Options.DqTaskCounters->RemoveSubgroup("operation", traceId); + } + TaskCountersMap.erase(itt); + } else { + itt->second.ReferenceCount -= it->second.WorkerActors.size(); + } + } + Options.Counters.ActiveWorkers->Sub(it->second.WorkerActors.size()); AllocatedWorkers.erase(it); } @@ -341,6 +361,13 @@ private: NDq::TAllocateMemoryCallback AllocateMemoryFn; NDq::TFreeMemoryCallback FreeMemoryFn; std::shared_ptr<NDq::TResourceQuoter> MemoryQuoter; + + struct TCountersInfo { + ::NMonitoring::TDynamicCounterPtr TaskCounters; + ui64 ReferenceCount; + }; + + TMap<TString, TCountersInfo> TaskCountersMap; }; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index a133b9f3386..601094bd6b3 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -174,14 +174,11 @@ struct TEvPrivate { }; struct TEvFileFinished : public TEventLocal<TEvFileFinished, EvFileFinished> { - TEvFileFinished(size_t pathIndex, ui64 ingressBytes, ui64 expectedDataSize, ui64 actualDataSize) - : PathIndex(pathIndex), IngressBytes(ingressBytes), - ExpectedDataSize(expectedDataSize), ActualDataSize(actualDataSize) { + TEvFileFinished(size_t pathIndex, ui64 ingressBytes) + : PathIndex(pathIndex), IngressBytes(ingressBytes) { } const size_t PathIndex; ui64 IngressBytes; - ui64 ExpectedDataSize; - ui64 ActualDataSize; }; struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { @@ -479,12 +476,13 @@ void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const T actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(pathIndex, std::move(issues)))); } -void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex) { +void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) { retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url, retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit, std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1), std::bind(&OnNewData, actorSystem, self, parent, std::placeholders::_1), - std::bind(&OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1)); + std::bind(&OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1), + inflightCounter); } template <typename T> @@ -592,6 +590,7 @@ ui64 GetSizeOfBatch(const arrow::RecordBatch& batch) { } class TS3ReadCoroImpl : public TActorCoroImpl { + friend class TS3StreamReadActor; private: class TReadBufferFromStream : public NDB::ReadBuffer { public: @@ -618,12 +617,22 @@ public: TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path, const TString& url, const std::size_t maxBlocksInFly, - const TS3ReadActorFactoryConfig& readActorFactoryCfg, ui64 expectedDataSize) + const TS3ReadActorFactoryConfig& readActorFactoryCfg, + const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize, + const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize, + const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps) : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), - PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ExpectedDataSize(expectedDataSize) + PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), + DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps) {} + ~TS3ReadCoroImpl() override { + if (DeferredEvents.size() && DeferredQueueSize) { + DeferredQueueSize->Sub(DeferredEvents.size()); + } + } + bool Next(TString& value) { if (InputFinished) return false; @@ -648,6 +657,9 @@ public: throw TS3ReadAbort(); default: DeferredEvents.push(std::move(ev)); + if (DeferredQueueSize) { + DeferredQueueSize->Inc(); + } break; } } @@ -660,6 +672,9 @@ public: THolder<IEventHandle> ev; ev.Swap(DeferredEvents.front()); DeferredEvents.pop(); + if (DeferredQueueSize) { + DeferredQueueSize->Dec(); + } switch (const auto etype = ev->GetTypeRewrite()) { case TEvPrivate::TEvReadStarted::EventType: @@ -681,7 +696,7 @@ public: if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) { LOG_CORO_D("TS3ReadCoroImpl", "Retry Download in " << RetryStuff->NextRetryDelay << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "], Issues: " << Issues.ToOneLineString()); - GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex)))); + GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize)))); value.clear(); } else { LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]"); @@ -693,6 +708,9 @@ public: } return true; case TEvPrivate::TEvDataPart::EventType: + if (HttpDataRps) { + HttpDataRps->Inc(); + } if (200L == HttpResponseCode || 206L == HttpResponseCode) { value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract(); IngressBytes += value.size(); @@ -800,11 +818,11 @@ private: } TArrowParquetBatchReader reader(std::move(fileReader), std::move(columnIndices), std::move(columnConverters)); - ActualDataSize += ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal); + ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal); } else { auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings)); TBlockReader reader(std::move(stream)); - ActualDataSize += ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal); + ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal); } } catch (const TS3ReadError&) { // Finish reading. Add error from server to issues @@ -838,7 +856,7 @@ private: if (issues) Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode)); else - Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, IngressBytes, ExpectedDataSize, ActualDataSize)); + Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, IngressBytes)); } catch (const TS3ReadAbort&) { LOG_CORO_D("TS3ReadCoroImpl", "S3 read abort. Path: " << Path); } catch (const TDtorException&) { @@ -848,36 +866,17 @@ private: return; } - template <typename T> - ui64 SizeOfBatch(const T&) { - return 0; - } - - template <> - ui64 SizeOfBatch(const std::shared_ptr<arrow::RecordBatch>& batch) { - return GetSizeOfBatch(*batch); - } - - template <> - ui64 SizeOfBatch(const NDB::Block& batch) { - return batch.bytes(); - } - template <typename T, typename TEv> - ui64 ProcessBatches(IBatchReader<T>& reader, bool isLocal) { + void ProcessBatches(IBatchReader<T>& reader, bool isLocal) { auto actorSystem = GetActorSystem(); auto selfActorId = SelfActorId; size_t cntBlocksInFly = 0; - ui64 result = 0; if (isLocal) { for (;;) { T batch; if (!reader.Next(batch)) { break; } - - result += SizeOfBatch<T>(batch); - if (++cntBlocksInFly > MaxBlocksInFly) { WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); --cntBlocksInFly; @@ -895,11 +894,9 @@ private: if (!reader.Next(batch)) { break; } - result += SizeOfBatch<T>(batch); Send(ParentActorId, new TEv(batch, PathIndex)); } } - return result; } void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final { @@ -966,55 +963,35 @@ private: TString LastData; std::size_t MaxBlocksInFly = 2; ui64 IngressBytes = 0; - ui64 ExpectedDataSize; - ui64 ActualDataSize = 0; bool Paused = false; std::queue<THolder<IEventHandle>> DeferredEvents; + const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize; + const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; + const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps; }; class TS3ReadCoroActor : public TActorCoro { public: - TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex) + TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize) : TActorCoro(THolder<TActorCoroImpl>(impl.Release())) , RetryStuff(std::move(retryStuff)) , PathIndex(pathIndex) + , HttpInflightSize(httpInflightSize) {} private: void Registered(TActorSystem* actorSystem, const TActorId& parent) override { TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself. if (RetryStuff->Url.substr(0, 6) != "file://") { LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]"); - DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex); + DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex, HttpInflightSize); } } const TRetryStuff::TPtr RetryStuff; const size_t PathIndex; + const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; }; -double FormatRatio(const TString& formatName) { - Y_UNUSED(formatName); - return 1.0; -} - -double CompressionRatio(const TString& compressionName) { - if (compressionName == "gzip") { - return 15.0; - } - if (compressionName == "lz4") { - return 20.0; - } - if (compressionName) { - // "brotli" - // "zstd" - // "bzip2" - // "xz" - return 10.0; - } - // no compression - return 1.0; -} - class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqComputeActorAsyncInput { public: TS3StreamReadActor( @@ -1054,15 +1031,21 @@ public: { if (Counters) { QueueDataSize = Counters->GetCounter("QueueDataSize"); + QueueDataLimit = Counters->GetCounter("QueueDataLimit"); QueueBlockCount = Counters->GetCounter("QueueBlockCount"); - BufferDataSize = Counters->GetCounter("BufferDataSize"); DownloadPaused = Counters->GetCounter("DownloadPaused"); + QueueDataLimit->Add(ReadActorFactoryCfg.DataInflight); } if (TaskCounters) { - ExpectedDataSize = TaskCounters->GetCounter("ExpectedDataSize"); - ActualDataSize = TaskCounters->GetCounter("ActualDataSize"); + TaskQueueDataSize = TaskCounters->GetCounter("QueueDataSize"); + TaskQueueDataLimit = TaskCounters->GetCounter("QueueDataLimit"); + TaskDownloadPaused = TaskCounters->GetCounter("DownloadPaused"); + DeferredQueueSize = TaskCounters->GetCounter("DeferredQueueSize"); + HttpInflightSize = TaskCounters->GetCounter("HttpInflightSize"); + HttpInflightLimit = TaskCounters->GetCounter("HttpInflightLimit"); + HttpDataRps = TaskCounters->GetCounter("HttpDataRps", true); + TaskQueueDataLimit->Add(ReadActorFactoryCfg.DataInflight); } - Ratio = FormatRatio(ReadSpec->Compression) * CompressionRatio(ReadSpec->Format); } void Bootstrap() { @@ -1078,12 +1061,10 @@ public: // no path is pending return false; } - /* - if (BufferTotalDataSize > static_cast<i64>(ReadActorFactoryCfg.DataInflight)) { + if (QueueTotalDataSize > ReadActorFactoryCfg.DataInflight) { // too large data inflight return false; } - */ if (DownloadInflight >= ReadActorFactoryCfg.MaxInflight) { // too large download inflight return false; @@ -1097,16 +1078,15 @@ public: const TPath& path = Paths[index]; const TString requestId = CreateGuidAsString(); ui64 fileSize = std::get<std::size_t>(path); - ui64 expectedDataSize = (ui64) fileSize * Ratio; auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), MakeHeaders(Token, requestId), fileSize, TxId, requestId, RetryPolicy); auto pathIndex = index + StartPathIndex; RetryStuffForFile.emplace(pathIndex, stuff); - BufferTotalDataSize += expectedDataSize; - if (Counters) { - BufferDataSize->Add(expectedDataSize); + if (TaskCounters) { + HttpInflightLimit->Add(Gateway->GetBuffersSizePerStream()); } - auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg, expectedDataSize); - CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex).release())); + ::NMonitoring::TDynamicCounters::TCounterPtr inflightCounter; + auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg, DeferredQueueSize, HttpInflightSize, HttpDataRps); + CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex, impl->HttpInflightSize).release())); } static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR"; @@ -1204,13 +1184,14 @@ private: total += s; output.emplace_back(std::move(value)); Blocks.pop_front(); - BufferTotalDataSize -= s; QueueTotalDataSize -= s; if (Counters) { - BufferDataSize->Sub(s); QueueDataSize->Sub(s); QueueBlockCount->Dec(); } + if (TaskCounters) { + TaskQueueDataSize->Sub(s); + } TryRegisterCoro(); } while (!Blocks.empty() && free > 0LL && GetBlockSize(Blocks.front()) <= size_t(free)); @@ -1229,14 +1210,23 @@ private: void PassAway() override { // Is called from Compute Actor LOG_D("TS3StreamReadActor", "PassAway"); if (Counters) { - BufferDataSize->Sub(BufferTotalDataSize); QueueDataSize->Sub(QueueTotalDataSize); QueueBlockCount->Sub(Blocks.size()); - if (Paused) { + QueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight); + } + if (TaskCounters) { + TaskQueueDataSize->Sub(QueueTotalDataSize); + TaskQueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight); + HttpInflightLimit->Sub(Gateway->GetBuffersSizePerStream() * CoroActors.size()); + } + if (Paused) { + if (Counters) { DownloadPaused->Dec(); } + if (TaskCounters) { + TaskDownloadPaused->Dec(); + } } - BufferTotalDataSize = 0; QueueTotalDataSize = 0; for (const auto actorId : CoroActors) { @@ -1264,11 +1254,15 @@ private: if (Counters) { DownloadPaused->Inc(); } + if (TaskCounters) { + TaskDownloadPaused->Inc(); + } } } void MaybeContinue() { - if (Paused && QueueTotalDataSize < ReadActorFactoryCfg.DataInflight) { + // resume download on 3/4 == 75% to avoid oscillation (hysteresis) + if (Paused && QueueTotalDataSize * 4 < ReadActorFactoryCfg.DataInflight * 3) { for (const auto actorId : CoroActors) { Send(actorId, new TEvPrivate::TEvContinue()); } @@ -1276,6 +1270,9 @@ private: if (Counters) { DownloadPaused->Dec(); } + if (TaskCounters) { + TaskDownloadPaused->Dec(); + } } } @@ -1299,6 +1296,9 @@ private: QueueBlockCount->Inc(); QueueDataSize->Add(size); } + if (TaskCounters) { + TaskQueueDataSize->Add(size); + } MaybePause(); Blocks.emplace_back(next); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); @@ -1314,6 +1314,9 @@ private: QueueBlockCount->Inc(); QueueDataSize->Add(size); } + if (TaskCounters) { + TaskQueueDataSize->Add(size); + } MaybePause(); Blocks.emplace_back(next); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); @@ -1327,16 +1330,8 @@ private: Y_VERIFY(Count); --Count; - // final netto of expected vs actual - BufferTotalDataSize += ev->Get()->ActualDataSize; - BufferTotalDataSize -= ev->Get()->ExpectedDataSize; - if (Counters) { - BufferDataSize->Add(ev->Get()->ActualDataSize); - BufferDataSize->Sub(ev->Get()->ExpectedDataSize); - } if (TaskCounters) { - ExpectedDataSize->Add(ev->Get()->ExpectedDataSize); - ActualDataSize->Add(ev->Get()->ActualDataSize); + HttpInflightLimit->Sub(Gateway->GetBuffersSizePerStream()); } DownloadInflight--; if (CurrentPathIndex < Paths.size()) { @@ -1379,16 +1374,19 @@ private: size_t CurrentPathIndex = 0; mutable TInstant LastMemoryReport = TInstant::Now(); ui64 QueueTotalDataSize = 0; - i64 BufferTotalDataSize = 0; ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize; + ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataLimit; ::NMonitoring::TDynamicCounters::TCounterPtr QueueBlockCount; - ::NMonitoring::TDynamicCounters::TCounterPtr ExpectedDataSize; - ::NMonitoring::TDynamicCounters::TCounterPtr ActualDataSize; - ::NMonitoring::TDynamicCounters::TCounterPtr BufferDataSize; ::NMonitoring::TDynamicCounters::TCounterPtr DownloadPaused; + ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize; + ::NMonitoring::TDynamicCounters::TCounterPtr TaskDownloadPaused; + ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataSize; + ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataLimit; + ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; + ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightLimit; + ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps; ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounterPtr TaskCounters; - double Ratio = 0.0; ui64 DownloadInflight = 0; std::set<NActors::TActorId> CoroActors; bool Paused = false; @@ -1566,7 +1564,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( if (params.HasFormat() && params.HasRowType()) { const auto pb = std::make_unique<TProgramBuilder>(typeEnv, functionRegistry); - const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(params.GetRowType()), *pb, Cerr); + const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(params.GetRowType()), *pb, Cerr); const auto structType = static_cast<TStructType*>(outputItemType); const auto readSpec = std::make_shared<TReadSpec>(); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index 059e3179724..8339ab757a1 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -16,23 +16,21 @@ void RegisterS3ReadActorFactory( IHTTPGateway::TPtr gateway, const IRetryPolicy<long>::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, - ::NMonitoring::TDynamicCounterPtr counters, - ::NMonitoring::TDynamicCounterPtr taskCounters) { + ::NMonitoring::TDynamicCounterPtr counters) { #if defined(_linux_) || defined(_darwin_) NDB::registerFormats(); factory.RegisterSource<NS3::TSource>("S3Source", - [credentialsFactory, gateway, retryPolicy, cfg, counters, taskCounters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { + [credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg, - counters, taskCounters ? taskCounters->GetSubgroup("operation", ToString(args.TxId)) : nullptr); + counters, args.TaskCounters); }); #else Y_UNUSED(factory); Y_UNUSED(credentialsFactory); Y_UNUSED(gateway); Y_UNUSED(counters); - Y_UNUSED(taskCounters); #endif } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index 6574ab4aacb..8a61b163961 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -15,7 +15,7 @@ namespace NYql::NDq { struct TS3ReadActorFactoryConfig { ui64 RowsInBatch = 1000; ui64 MaxInflight = 20; - ui64 DataInflight = 1_GB; + ui64 DataInflight = 200_MB; }; void RegisterS3ReadActorFactory( @@ -24,7 +24,6 @@ void RegisterS3ReadActorFactory( IHTTPGateway::TPtr gateway = IHTTPGateway::Make(), const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(), const TS3ReadActorFactoryConfig& = {}, - ::NMonitoring::TDynamicCounterPtr counters = nullptr, - ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr); + ::NMonitoring::TDynamicCounterPtr counters = nullptr); } |