aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-01-11 14:22:20 +0300
committerhor911 <hor911@ydb.tech>2023-01-11 14:22:20 +0300
commit17773ad0b578e15ec7eb4bece9631e93fc98de4b (patch)
tree172a220b5364ff21eaf1f2935df19882e5e96025
parentdae1c49d39a044bdfdd67ecd3c7347103625516e (diff)
downloadydb-17773ad0b578e15ec7eb4bece9631e93fc98de4b.tar.gz
HTTP Back Pressure + MonCounters refactoring
-rw-r--r--ydb/core/yq/libs/config/protos/fq_config.proto1
-rw-r--r--ydb/core/yq/libs/config/protos/read_actors_factory.proto2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp15
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/utils.cpp51
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/utils.h1
-rw-r--r--ydb/core/yq/libs/init/init.cpp4
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp13
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h42
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h18
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp21
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp41
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h8
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp43
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp190
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp8
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h5
17 files changed, 303 insertions, 161 deletions
diff --git a/ydb/core/yq/libs/config/protos/fq_config.proto b/ydb/core/yq/libs/config/protos/fq_config.proto
index c7c7deb1024..ffa36918c2c 100644
--- a/ydb/core/yq/libs/config/protos/fq_config.proto
+++ b/ydb/core/yq/libs/config/protos/fq_config.proto
@@ -50,4 +50,5 @@ message TConfig {
THealthConfig Health = 20;
TQuotasManagerConfig QuotasManager = 21;
TRateLimiterConfig RateLimiter = 22;
+ bool EnableTaskCounters = 23;
}
diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto
index c5e6a9fef70..013fbaeb843 100644
--- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto
+++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto
@@ -12,7 +12,7 @@ message TS3ReadActorFactoryConfig {
NYql.NS3.TRetryConfig RetryConfig = 1;
uint64 RowsInBatch = 2; // Default = 1000
uint64 MaxInflight = 3; // Default = 20
- uint64 DataInflight = 4; // Default = 1 GB
+ uint64 DataInflight = 4; // Default = 200 MB
}
message TPqReadActorFactoryConfig {
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
index 2fd62cd16ba..bd1cac26551 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
@@ -177,8 +177,15 @@ TPingTaskParams ConstructHardPingTask(
}
if (request.statistics()) {
- *query.mutable_statistics()->mutable_json() = request.statistics();
- *job.mutable_statistics()->mutable_json() = request.statistics();
+ TString statistics;
+ try {
+ statistics = GetPrettyStatistics(request.statistics());
+ } catch (const std::exception&) {
+ CPS_LOG_E("Error on statistics prettification: " << CurrentExceptionMessage());
+ statistics = request.statistics();
+ }
+ *query.mutable_statistics()->mutable_json() = statistics;
+ *job.mutable_statistics()->mutable_json() = statistics;
}
if (!request.result_set_meta().empty()) {
@@ -380,8 +387,8 @@ TPingTaskParams ConstructHardPingTask(
if (IsTerminalStatus(request.status()) && request.statistics()) {
try {
meteringRecords = GetMeteringRecords(request.statistics(), request.query_id().value(), request.scope(), HostName());
- } catch (yexception &e) {
- CPS_LOG_E(e.what());
+ } catch (const std::exception&) {
+ CPS_LOG_E("Error on statistics meterification: " << CurrentExceptionMessage());
}
}
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
index ab11f98f9ee..8d2f537c010 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.cpp
@@ -1,5 +1,7 @@
#include "utils.h"
+#include <library/cpp/json/yson/json2yson.h>
+
#include <ydb/core/metering/bill_record.h>
#include <ydb/core/metering/metering.h>
@@ -165,4 +167,53 @@ std::vector<TString> GetMeteringRecords(const TString& statistics, const TString
return result;
}
+void RemapValue(NYson::TYsonWriter& writer, const NJson::TJsonValue& node, const TString& key) {
+ ui64 value = 0;
+ if (auto* keyNode = node.GetValueByPath(key)) {
+ value = keyNode->GetInteger();
+ }
+ writer.OnKeyedItem(key);
+ writer.OnInt64Scalar(value);
+}
+
+void RemapNode(NYson::TYsonWriter& writer, const NJson::TJsonValue& node, const TString& path, const TString& key) {
+ if (auto* subNode = node.GetValueByPath(path)) {
+ writer.OnKeyedItem(key);
+ writer.OnBeginMap();
+ RemapValue(writer, *subNode, "sum");
+ RemapValue(writer, *subNode, "count");
+ RemapValue(writer, *subNode, "avg");
+ RemapValue(writer, *subNode, "max");
+ RemapValue(writer, *subNode, "min");
+ writer.OnEndMap();
+ }
+}
+
+TString GetPrettyStatistics(const TString& statistics) {
+ TStringStream out;
+ NYson::TYsonWriter writer(&out);
+ writer.OnBeginMap();
+ NJson::TJsonReaderConfig jsonConfig;
+ NJson::TJsonValue stat;
+ if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) {
+ for (const auto& p : stat.GetMap()) {
+ if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) {
+ writer.OnKeyedItem(p.first);
+ writer.OnBeginMap();
+ RemapNode(writer, p.second, "StagesCount", "StagesCount");
+ RemapNode(writer, p.second, "TaskRunner.Stage=Total.TasksCount", "TasksCount");
+ RemapNode(writer, p.second, "TaskRunner.Stage=Total.BuildCpuTimeUs", "BuildCpuTimeUs");
+ RemapNode(writer, p.second, "TaskRunner.Stage=Total.ComputeCpuTimeUs", "ComputeCpuTimeUs");
+ RemapNode(writer, p.second, "TaskRunner.Stage=Total.CpuTimeUs", "CpuTimeUs");
+ RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressS3SourceBytes", "IngressObjectStorageBytes");
+ RemapNode(writer, p.second, "TaskRunner.Stage=Total.EgressS3SinkBytes", "EgressObjectStorageBytes");
+ RemapNode(writer, p.second, "TaskRunner.Source=0.Stage=Total.RowsIn", "IngressRows");
+ writer.OnEndMap();
+ }
+ }
+ }
+ writer.OnEndMap();
+ return NJson2Yson::ConvertYson2Json(out.Str());
+}
+
};
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.h b/ydb/core/yq/libs/control_plane_storage/internal/utils.h
index 98c645b5bb4..b5ff5f1f338 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/utils.h
+++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.h
@@ -32,5 +32,6 @@ NYql::TIssues ValidateNodesHealthCheck(
NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId, const TString& scope, const TString& tenant, const TString& owner);
std::vector<TString> GetMeteringRecords(const TString& statistics, const TString& queryId, const TString& scope, const TString& sourceId);
+TString GetPrettyStatistics(const TString& statistics);
};
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index e730c6edb48..87b63d46e65 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -177,7 +177,7 @@ void Init(
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
- yqCounters->GetSubgroup("subsystem", "S3ReadActor"), appData->Counters->GetSubgroup("counters", "dq_tasks"));
+ yqCounters->GetSubgroup("subsystem", "S3ReadActor"));
RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
httpGateway, s3HttpRetryPolicy);
RegisterClickHouseReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway);
@@ -200,7 +200,7 @@ void Init(
}
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Counters = workerManagerCounters;
- lwmOptions.DqTaskCounters = appData->Counters->GetSubgroup("counters", "dq_tasks");
+ lwmOptions.DqTaskCounters = protoConfig.GetEnableTaskCounters() ? appData->Counters->GetSubgroup("counters", "dq_tasks") : nullptr;
lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false);
lwmOptions.AsyncIoFactory = asyncIoFactory;
lwmOptions.FunctionRegistry = appData->FunctionRegistry;
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index b84e55a0e00..171d8723af0 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -97,8 +97,10 @@ public:
void InitExtraMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) {
if (taskCounters && UseCpuQuota()) {
- CpuTimeGetQuotaLatency = taskCounters->GetSubgroup("subsystem", "mkql")->GetHistogram("CpuTimeGetQuotaLatencyMs", NMonitoring::ExplicitHistogram({0, 1, 5, 10, 50, 100, 500, 1000, 5000, 10'000, 60'000, 600'000}));
- CpuTimeQuotaWaitDelay = taskCounters->GetSubgroup("subsystem", "mkql")->GetHistogram("CpuTimeQuotaWaitDelayMs", NMonitoring::ExplicitHistogram({0, 1, 5, 10, 50, 100, 500, 1000, 5000, 10'000, 60'000, 600'000}));
+ CpuTimeGetQuotaLatency = taskCounters->GetHistogram("CpuTimeGetQuotaLatencyMs", NMonitoring::ExplicitHistogram({0, 1, 5, 10, 50, 100, 500, 1000}));
+ CpuTimeQuotaWaitDelay = taskCounters->GetHistogram("CpuTimeQuotaWaitDelayMs", NMonitoring::ExplicitHistogram({0, 1, 5, 10, 50, 100, 500, 1000}));
+ CpuTime = taskCounters->GetCounter("CpuTimeMs", true);
+ CpuTime->Add(0);
}
}
@@ -352,6 +354,9 @@ private:
TaskRunnerActor->PassAway();
}
if (UseCpuQuota() && CpuTimeSpent.MilliSeconds()) {
+ if (CpuTime) {
+ CpuTime->Add(CpuTimeSpent.MilliSeconds());
+ }
// Send the rest of CPU time that we haven't taken into account
Send(QuoterServiceActorId,
new NKikimr::TEvQuota::TEvRequest(
@@ -802,6 +807,9 @@ private:
Y_VERIFY(!CpuTimeQuotaAsked);
if (CpuTimeSpent >= MIN_QUOTED_CPU_TIME) {
CA_LOG_T("Ask CPU quota: " << CpuTimeSpent.MilliSeconds() << "ms");
+ if (CpuTime) {
+ CpuTime->Add(CpuTimeSpent.MilliSeconds());
+ }
Send(QuoterServiceActorId,
new NKikimr::TEvQuota::TEvRequest(
NKikimr::TEvQuota::EResourceOperator::And,
@@ -903,6 +911,7 @@ private:
bool ContinueRunInflight = false;
NMonitoring::THistogramPtr CpuTimeGetQuotaLatency;
NMonitoring::THistogramPtr CpuTimeQuotaWaitDelay;
+ NMonitoring::TDynamicCounters::TCounterPtr CpuTime;
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 8f284db6dc8..8e45f4f40fa 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -169,6 +169,7 @@ public:
const NActors::TActorId& ComputeActorId;
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
+ ::NMonitoring::TDynamicCounterPtr TaskCounters;
};
struct TSinkArguments {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 97a689d09df..0c936298a5a 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -189,8 +189,8 @@ protected:
, FunctionRegistry(functionRegistry)
, CheckpointingMode(GetTaskCheckpointingMode(Task))
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
- , MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr)
, WatermarksTracker(this->SelfId(), TxId, Task.GetId())
+ , TaskCounters(taskCounters)
, DqComputeActorMetrics(taskCounters)
, ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
, Running(!Task.GetCreateSuspended())
@@ -199,8 +199,11 @@ protected:
if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
}
- InitializeTask();
InitMonCounters(taskCounters);
+ InitializeTask();
+ if (ownMemoryQuota) {
+ MemoryQuota = InitMemoryQuota();
+ }
InitializeWatermarks();
}
@@ -219,8 +222,8 @@ protected:
, AsyncIoFactory(std::move(asyncIoFactory))
, FunctionRegistry(functionRegistry)
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
- , MemoryQuota(InitMemoryQuota())
, WatermarksTracker(this->SelfId(), TxId, Task.GetId())
+ , TaskCounters(taskCounters)
, DqComputeActorMetrics(taskCounters)
, ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
, Running(!Task.GetCreateSuspended())
@@ -228,23 +231,16 @@ protected:
if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
}
- InitializeTask();
InitMonCounters(taskCounters);
+ InitializeTask();
+ MemoryQuota = InitMemoryQuota();
InitializeWatermarks();
}
void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) {
if (taskCounters) {
- MkqlMemoryUsage = taskCounters->GetSubgroup("subsystem", "mkql")->GetCounter("MemoryUsage");
- MkqlMemoryLimit = taskCounters->GetSubgroup("subsystem", "mkql")->GetCounter("MemoryLimit");
- MonCountersProvided = true;
- }
- }
-
- void UpdateMonCounters() {
- if (MonCountersProvided) {
- *MkqlMemoryUsage = GetProfileStats()->MkqlMaxUsedMemory;
- *MkqlMemoryLimit = GetMkqlMemoryLimit();
+ MkqlMemoryQuota = taskCounters->GetCounter("MkqlMemoryQuota");
+ OutputChannelSize = taskCounters->GetCounter("OutputChannelSize");
}
}
@@ -316,6 +312,7 @@ protected:
protected:
THolder<TDqMemoryQuota> InitMemoryQuota() {
return MakeHolder<TDqMemoryQuota>(
+ MkqlMemoryQuota,
CalcMkqlMemoryLimit(),
MemoryLimits,
TxId,
@@ -543,6 +540,10 @@ protected:
}
}
+ if (OutputChannelSize) {
+ OutputChannelSize->Sub(OutputChannelsMap.size() * MemoryLimits.ChannelBufferSize);
+ }
+
for (auto& [_, outputChannel] : OutputChannelsMap) {
if (outputChannel.Channel) {
outputChannel.Channel->Terminate();
@@ -1502,7 +1503,8 @@ protected:
.TaskParams = taskParams,
.ComputeActorId = this->SelfId(),
.TypeEnv = typeEnv,
- .HolderFactory = holderFactory
+ .HolderFactory = holderFactory,
+ .TaskCounters = TaskCounters
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what();
@@ -1808,6 +1810,10 @@ private:
}
}
}
+
+ if (OutputChannelSize) {
+ OutputChannelSize->Add(OutputChannelsMap.size() * MemoryLimits.ChannelBufferSize);
+ }
}
void InitializeWatermarks() {
@@ -2074,6 +2080,7 @@ protected:
THolder<TDqMemoryQuota> MemoryQuota;
TDqComputeActorWatermarks WatermarksTracker;
+ ::NMonitoring::TDynamicCounterPtr TaskCounters;
TDqComputeActorMetrics DqComputeActorMetrics;
NWilson::TSpan ComputeActorSpan;
private:
@@ -2081,9 +2088,8 @@ private:
TInstant LastSendStatsTime;
bool PassExceptions = false;
protected:
- bool MonCountersProvided = false;
- ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryUsage;
- ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit;
+ ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota;
+ ::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize;
THolder<NYql::TCounters> Stat;
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h
index f8c94daab63..8bff90633b0 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h
@@ -26,8 +26,9 @@ namespace NYql::NDq {
class TDqMemoryQuota {
public:
- TDqMemoryQuota(ui64 initialMkqlMemoryLimit, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NYql::NDq::TTxId txId, ui64 taskId, bool profileStats, bool canAllocateExtraMemory, NActors::TActorSystem* actorSystem)
- : InitialMkqlMemoryLimit(initialMkqlMemoryLimit)
+ TDqMemoryQuota(::NMonitoring::TDynamicCounters::TCounterPtr& mkqlMemoryQuota, ui64 initialMkqlMemoryLimit, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NYql::NDq::TTxId txId, ui64 taskId, bool profileStats, bool canAllocateExtraMemory, NActors::TActorSystem* actorSystem)
+ : MkqlMemoryQuota(mkqlMemoryQuota)
+ , InitialMkqlMemoryLimit(initialMkqlMemoryLimit)
, MkqlMemoryLimit(initialMkqlMemoryLimit)
, MemoryLimits(memoryLimits)
, TxId(txId)
@@ -35,6 +36,9 @@ namespace NYql::NDq {
, ProfileStats(profileStats ? MakeHolder<TProfileStats>() : nullptr)
, CanAllocateExtraMemory(canAllocateExtraMemory)
, ActorSystem(actorSystem) {
+ if (MkqlMemoryQuota) {
+ MkqlMemoryQuota->Add(MkqlMemoryLimit);
+ }
}
ui64 GetMkqlMemoryLimit() const {
@@ -59,6 +63,9 @@ namespace NYql::NDq {
MkqlMemoryLimit = newLimit;
alloc->SetLimit(newLimit);
MemoryLimits.FreeMemoryFn(TxId, TaskId, freedSize);
+ if (MkqlMemoryQuota) {
+ MkqlMemoryQuota->Sub(freedSize);
+ }
CAMQ_LOG_D("[Mem] memory shrinked, new limit: " << MkqlMemoryLimit);
}
}
@@ -89,6 +96,9 @@ namespace NYql::NDq {
void TryReleaseQuota() {
if (MkqlMemoryLimit && MemoryLimits.FreeMemoryFn) {
MemoryLimits.FreeMemoryFn(TxId, TaskId, MkqlMemoryLimit);
+ if (MkqlMemoryQuota) {
+ MkqlMemoryQuota->Sub(MkqlMemoryLimit);
+ }
MkqlMemoryLimit = 0;
}
}
@@ -111,6 +121,9 @@ namespace NYql::NDq {
if (MemoryLimits.AllocateMemoryFn(TxId, TaskId, memory)) {
MkqlMemoryLimit += memory;
+ if (MkqlMemoryQuota) {
+ MkqlMemoryQuota->Add(memory);
+ }
CAMQ_LOG_D("[Mem] memory " << memory << " granted, new limit: " << MkqlMemoryLimit);
alloc->SetLimit(MkqlMemoryLimit);
} else {
@@ -132,6 +145,7 @@ namespace NYql::NDq {
}
private:
+ ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota;
const ui64 InitialMkqlMemoryLimit;
ui64 MkqlMemoryLimit;
const TComputeMemoryLimits MemoryLimits;
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
index 95c1c0207a7..6e545bb3622 100644
--- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
@@ -69,17 +69,22 @@ public:
}
}
- TCancelHook Download(
- TString ,
- THeaders ,
- std::size_t ,
- std::size_t ,
- TOnDownloadStart ,
- TOnNewDataPart ,
- TOnDownloadFinish ) final {
+ TCancelHook Download(
+ TString,
+ THeaders,
+ std::size_t,
+ std::size_t,
+ TOnDownloadStart,
+ TOnNewDataPart,
+ TOnDownloadFinish,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr&) final {
return {};
}
+ ui64 GetBuffersSizePerStream() final {
+ return 0;
+ }
+
void AddDefaultResponse(TDataDefaultResponse response) {
DefaultResponse = response;
}
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 2e9bdc1940e..53a48398759 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -331,7 +331,7 @@ public:
using TWeakPtr = std::weak_ptr<TEasyCurlStream>;
TEasyCurlStream(
- const ::NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& counter,
const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes,
const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes,
TString url, IHTTPGateway::THeaders headers,
@@ -340,13 +340,18 @@ public:
IHTTPGateway::TOnDownloadStart onStart,
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter,
const TCurlInitConfig& config = TCurlInitConfig())
: TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, sizeLimit, 0ULL, std::move(config))
- , OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ , OnStart(std::move(onStart))
+ , OnNewData(std::move(onNewData))
+ , OnFinish(std::move(onFinish))
+ , Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ , InflightCounter(inflightCounter)
{}
static TPtr Make(
- const ::NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& counter,
const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes,
const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes,
TString url,
@@ -356,9 +361,10 @@ public:
IHTTPGateway::TOnDownloadStart onStart,
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter,
const TCurlInitConfig& config = TCurlInitConfig())
{
- return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), std::move(config));
+ return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, std::move(config));
}
enum class EAction : i8 {
@@ -378,7 +384,6 @@ public:
if (Cancelled) {
return EAction::Drop;
}
-
if (buffersSize && Paused != Counter->load() >= buffersSize) {
Paused = !Paused;
return Paused ? EAction::Stop : EAction::Work;
@@ -413,7 +418,7 @@ private:
const auto realsize = size * nmemb;
if (!Cancelled)
- OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter));
+ OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter));
return realsize;
}
@@ -425,6 +430,7 @@ private:
const IHTTPGateway::TOnDownloadFinish OnFinish;
const std::shared_ptr<std::atomic_size_t> Counter;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter;
bool Started = false;
bool Paused = false;
bool Cancelled = false;
@@ -755,9 +761,10 @@ private:
size_t sizeLimit,
TOnDownloadStart onStart,
TOnNewDataPart onNewData,
- TOnDownloadFinish onFinish) final
+ TOnDownloadFinish onFinish,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final
{
- auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish));
+ auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter);
const std::unique_lock lock(Sync);
const auto handle = stream->GetHandle();
TEasyCurlStream::TWeakPtr weak = stream;
@@ -770,6 +777,10 @@ private:
};
}
+ ui64 GetBuffersSizePerStream() final {
+ return BuffersSizePerStream;
+ }
+
void OnRetry(TEasyCurlBuffer::TPtr easy) {
const std::unique_lock lock(Sync);
const size_t sizeLimit = easy->GetSizeLimit();
@@ -875,19 +886,29 @@ IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode, con
, HttpResponseCode(httpResponseCode)
{}
-IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter)
- : TContentBase(std::move(data)), Counter(counter)
+IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter)
+ : TContentBase(std::move(data)), Counter(counter), InflightCounter(inflightCounter)
{
Counter->fetch_add(size());
+ if (InflightCounter) {
+ InflightCounter->Add(size());
+ }
}
IHTTPGateway::TCountedContent::~TCountedContent()
{
Counter->fetch_sub(size());
+ if (InflightCounter) {
+ InflightCounter->Sub(size());
+ }
}
TString IHTTPGateway::TCountedContent::Extract() {
Counter->fetch_sub(size());
+ if (InflightCounter) {
+ InflightCounter->Sub(size());
+ }
return TContentBase::Extract();
}
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index acd74b265a1..d9aec8fa32d 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -84,7 +84,7 @@ public:
class TCountedContent : public TContentBase {
public:
- TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter);
+ TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter);
~TCountedContent();
TCountedContent(TCountedContent&&) = default;
@@ -93,6 +93,7 @@ public:
TString Extract();
private:
const std::shared_ptr<std::atomic_size_t> Counter;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter;
};
using TOnDownloadStart = std::function<void(long)>; // http code.
@@ -107,7 +108,10 @@ public:
std::size_t sizeLimit,
TOnDownloadStart onStart,
TOnNewDataPart onNewData,
- TOnDownloadFinish onFinish) = 0;
+ TOnDownloadFinish onFinish,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) = 0;
+
+ virtual ui64 GetBuffersSizePerStream() = 0;
};
}
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index 6b1aab4dee8..2731af67b6d 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -211,21 +211,19 @@ private:
YQL_CLOG(DEBUG, ProviderDq) << "TLocalWorkerManager::TEvAllocateWorkersRequest " << resourceId;
TFailureInjector::Reach("allocate_workers_failure", [] { ::_exit(1); });
- auto& allocationInfo = AllocatedWorkers[resourceId];
auto traceId = ev->Get()->Record.GetTraceId();
- allocationInfo.TxId = traceId;
-
auto count = ev->Get()->Record.GetCount();
-
Y_VERIFY(count > 0);
bool canAllocate = MemoryQuoter->Allocate(traceId, 0, count * Options.MkqlInitialMemoryLimit);
-
if (!canAllocate) {
Send(ev->Sender, MakeHolder<TEvAllocateWorkersResponse>("Not enough memory to allocate tasks", NYql::NDqProto::StatusIds::OVERLOADED), 0, ev->Cookie);
return;
}
+ auto& allocationInfo = AllocatedWorkers[resourceId];
+ allocationInfo.TxId = traceId;
+
if (allocationInfo.WorkerActors.empty()) {
allocationInfo.WorkerActors.reserve(count);
allocationInfo.Sender = ev->Sender;
@@ -240,15 +238,23 @@ private:
Y_VERIFY(static_cast<int>(tasks.size()) == static_cast<int>(count));
}
auto resultId = ActorIdFromProto(ev->Get()->Record.GetResultActorId());
+ ::NMonitoring::TDynamicCounterPtr taskCounters;
+
+ if (createComputeActor && Options.DqTaskCounters) {
+ auto& info = TaskCountersMap[traceId];
+ if (!info.TaskCounters) {
+ info.TaskCounters = Options.DqTaskCounters->GetSubgroup("operation", traceId);
+ }
+ info.ReferenceCount += count;
+ taskCounters = info.TaskCounters;
+ }
for (ui32 i = 0; i < count; i++) {
THolder<NActors::IActor> actor;
if (createComputeActor) {
- auto id = tasks[i].GetId();
- auto stageId = tasks[i].GetStageId();
YQL_CLOG(DEBUG, ProviderDq) << "Create compute actor: " << computeActorType;
- auto taskCounters = Options.DqTaskCounters ? Options.DqTaskCounters->GetSubgroup("operation", traceId)->GetSubgroup("stage", ToString(stageId))->GetSubgroup("id", ToString(id)) : nullptr;
+
actor.Reset(NYql::CreateComputeActor(
Options,
Options.MkqlTotalMemoryLimit ? AllocateMemoryFn : nullptr,
@@ -306,6 +312,20 @@ private:
}
MemoryQuoter->Free(it->second.TxId, 0);
+
+ auto traceId = std::get<TString>(it->second.TxId);
+ auto itt = TaskCountersMap.find(traceId);
+ if (itt != TaskCountersMap.end()) {
+ if (itt->second.ReferenceCount <= it->second.WorkerActors.size()) {
+ if (Options.DqTaskCounters) {
+ Options.DqTaskCounters->RemoveSubgroup("operation", traceId);
+ }
+ TaskCountersMap.erase(itt);
+ } else {
+ itt->second.ReferenceCount -= it->second.WorkerActors.size();
+ }
+ }
+
Options.Counters.ActiveWorkers->Sub(it->second.WorkerActors.size());
AllocatedWorkers.erase(it);
}
@@ -341,6 +361,13 @@ private:
NDq::TAllocateMemoryCallback AllocateMemoryFn;
NDq::TFreeMemoryCallback FreeMemoryFn;
std::shared_ptr<NDq::TResourceQuoter> MemoryQuoter;
+
+ struct TCountersInfo {
+ ::NMonitoring::TDynamicCounterPtr TaskCounters;
+ ui64 ReferenceCount;
+ };
+
+ TMap<TString, TCountersInfo> TaskCountersMap;
};
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index a133b9f3386..601094bd6b3 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -174,14 +174,11 @@ struct TEvPrivate {
};
struct TEvFileFinished : public TEventLocal<TEvFileFinished, EvFileFinished> {
- TEvFileFinished(size_t pathIndex, ui64 ingressBytes, ui64 expectedDataSize, ui64 actualDataSize)
- : PathIndex(pathIndex), IngressBytes(ingressBytes),
- ExpectedDataSize(expectedDataSize), ActualDataSize(actualDataSize) {
+ TEvFileFinished(size_t pathIndex, ui64 ingressBytes)
+ : PathIndex(pathIndex), IngressBytes(ingressBytes) {
}
const size_t PathIndex;
ui64 IngressBytes;
- ui64 ExpectedDataSize;
- ui64 ActualDataSize;
};
struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> {
@@ -479,12 +476,13 @@ void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const T
actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(pathIndex, std::move(issues))));
}
-void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex) {
+void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) {
retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url,
retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit,
std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1),
std::bind(&OnNewData, actorSystem, self, parent, std::placeholders::_1),
- std::bind(&OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1));
+ std::bind(&OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1),
+ inflightCounter);
}
template <typename T>
@@ -592,6 +590,7 @@ ui64 GetSizeOfBatch(const arrow::RecordBatch& batch) {
}
class TS3ReadCoroImpl : public TActorCoroImpl {
+ friend class TS3StreamReadActor;
private:
class TReadBufferFromStream : public NDB::ReadBuffer {
public:
@@ -618,12 +617,22 @@ public:
TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
const TString& path, const TString& url, const std::size_t maxBlocksInFly,
- const TS3ReadActorFactoryConfig& readActorFactoryCfg, ui64 expectedDataSize)
+ const TS3ReadActorFactoryConfig& readActorFactoryCfg,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps)
: TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
- PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ExpectedDataSize(expectedDataSize)
+ PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly),
+ DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps)
{}
+ ~TS3ReadCoroImpl() override {
+ if (DeferredEvents.size() && DeferredQueueSize) {
+ DeferredQueueSize->Sub(DeferredEvents.size());
+ }
+ }
+
bool Next(TString& value) {
if (InputFinished)
return false;
@@ -648,6 +657,9 @@ public:
throw TS3ReadAbort();
default:
DeferredEvents.push(std::move(ev));
+ if (DeferredQueueSize) {
+ DeferredQueueSize->Inc();
+ }
break;
}
}
@@ -660,6 +672,9 @@ public:
THolder<IEventHandle> ev;
ev.Swap(DeferredEvents.front());
DeferredEvents.pop();
+ if (DeferredQueueSize) {
+ DeferredQueueSize->Dec();
+ }
switch (const auto etype = ev->GetTypeRewrite()) {
case TEvPrivate::TEvReadStarted::EventType:
@@ -681,7 +696,7 @@ public:
if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) {
LOG_CORO_D("TS3ReadCoroImpl", "Retry Download in " << RetryStuff->NextRetryDelay << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "], Issues: " << Issues.ToOneLineString());
- GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex))));
+ GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize))));
value.clear();
} else {
LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]");
@@ -693,6 +708,9 @@ public:
}
return true;
case TEvPrivate::TEvDataPart::EventType:
+ if (HttpDataRps) {
+ HttpDataRps->Inc();
+ }
if (200L == HttpResponseCode || 206L == HttpResponseCode) {
value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract();
IngressBytes += value.size();
@@ -800,11 +818,11 @@ private:
}
TArrowParquetBatchReader reader(std::move(fileReader), std::move(columnIndices), std::move(columnConverters));
- ActualDataSize += ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal);
+ ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal);
} else {
auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings));
TBlockReader reader(std::move(stream));
- ActualDataSize += ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal);
+ ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal);
}
} catch (const TS3ReadError&) {
// Finish reading. Add error from server to issues
@@ -838,7 +856,7 @@ private:
if (issues)
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
else
- Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, IngressBytes, ExpectedDataSize, ActualDataSize));
+ Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, IngressBytes));
} catch (const TS3ReadAbort&) {
LOG_CORO_D("TS3ReadCoroImpl", "S3 read abort. Path: " << Path);
} catch (const TDtorException&) {
@@ -848,36 +866,17 @@ private:
return;
}
- template <typename T>
- ui64 SizeOfBatch(const T&) {
- return 0;
- }
-
- template <>
- ui64 SizeOfBatch(const std::shared_ptr<arrow::RecordBatch>& batch) {
- return GetSizeOfBatch(*batch);
- }
-
- template <>
- ui64 SizeOfBatch(const NDB::Block& batch) {
- return batch.bytes();
- }
-
template <typename T, typename TEv>
- ui64 ProcessBatches(IBatchReader<T>& reader, bool isLocal) {
+ void ProcessBatches(IBatchReader<T>& reader, bool isLocal) {
auto actorSystem = GetActorSystem();
auto selfActorId = SelfActorId;
size_t cntBlocksInFly = 0;
- ui64 result = 0;
if (isLocal) {
for (;;) {
T batch;
if (!reader.Next(batch)) {
break;
}
-
- result += SizeOfBatch<T>(batch);
-
if (++cntBlocksInFly > MaxBlocksInFly) {
WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
--cntBlocksInFly;
@@ -895,11 +894,9 @@ private:
if (!reader.Next(batch)) {
break;
}
- result += SizeOfBatch<T>(batch);
Send(ParentActorId, new TEv(batch, PathIndex));
}
}
- return result;
}
void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final {
@@ -966,55 +963,35 @@ private:
TString LastData;
std::size_t MaxBlocksInFly = 2;
ui64 IngressBytes = 0;
- ui64 ExpectedDataSize;
- ui64 ActualDataSize = 0;
bool Paused = false;
std::queue<THolder<IEventHandle>> DeferredEvents;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
};
class TS3ReadCoroActor : public TActorCoro {
public:
- TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex)
+ TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize)
: TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
, RetryStuff(std::move(retryStuff))
, PathIndex(pathIndex)
+ , HttpInflightSize(httpInflightSize)
{}
private:
void Registered(TActorSystem* actorSystem, const TActorId& parent) override {
TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself.
if (RetryStuff->Url.substr(0, 6) != "file://") {
LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]");
- DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex);
+ DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex, HttpInflightSize);
}
}
const TRetryStuff::TPtr RetryStuff;
const size_t PathIndex;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
};
-double FormatRatio(const TString& formatName) {
- Y_UNUSED(formatName);
- return 1.0;
-}
-
-double CompressionRatio(const TString& compressionName) {
- if (compressionName == "gzip") {
- return 15.0;
- }
- if (compressionName == "lz4") {
- return 20.0;
- }
- if (compressionName) {
- // "brotli"
- // "zstd"
- // "bzip2"
- // "xz"
- return 10.0;
- }
- // no compression
- return 1.0;
-}
-
class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqComputeActorAsyncInput {
public:
TS3StreamReadActor(
@@ -1054,15 +1031,21 @@ public:
{
if (Counters) {
QueueDataSize = Counters->GetCounter("QueueDataSize");
+ QueueDataLimit = Counters->GetCounter("QueueDataLimit");
QueueBlockCount = Counters->GetCounter("QueueBlockCount");
- BufferDataSize = Counters->GetCounter("BufferDataSize");
DownloadPaused = Counters->GetCounter("DownloadPaused");
+ QueueDataLimit->Add(ReadActorFactoryCfg.DataInflight);
}
if (TaskCounters) {
- ExpectedDataSize = TaskCounters->GetCounter("ExpectedDataSize");
- ActualDataSize = TaskCounters->GetCounter("ActualDataSize");
+ TaskQueueDataSize = TaskCounters->GetCounter("QueueDataSize");
+ TaskQueueDataLimit = TaskCounters->GetCounter("QueueDataLimit");
+ TaskDownloadPaused = TaskCounters->GetCounter("DownloadPaused");
+ DeferredQueueSize = TaskCounters->GetCounter("DeferredQueueSize");
+ HttpInflightSize = TaskCounters->GetCounter("HttpInflightSize");
+ HttpInflightLimit = TaskCounters->GetCounter("HttpInflightLimit");
+ HttpDataRps = TaskCounters->GetCounter("HttpDataRps", true);
+ TaskQueueDataLimit->Add(ReadActorFactoryCfg.DataInflight);
}
- Ratio = FormatRatio(ReadSpec->Compression) * CompressionRatio(ReadSpec->Format);
}
void Bootstrap() {
@@ -1078,12 +1061,10 @@ public:
// no path is pending
return false;
}
- /*
- if (BufferTotalDataSize > static_cast<i64>(ReadActorFactoryCfg.DataInflight)) {
+ if (QueueTotalDataSize > ReadActorFactoryCfg.DataInflight) {
// too large data inflight
return false;
}
- */
if (DownloadInflight >= ReadActorFactoryCfg.MaxInflight) {
// too large download inflight
return false;
@@ -1097,16 +1078,15 @@ public:
const TPath& path = Paths[index];
const TString requestId = CreateGuidAsString();
ui64 fileSize = std::get<std::size_t>(path);
- ui64 expectedDataSize = (ui64) fileSize * Ratio;
auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), MakeHeaders(Token, requestId), fileSize, TxId, requestId, RetryPolicy);
auto pathIndex = index + StartPathIndex;
RetryStuffForFile.emplace(pathIndex, stuff);
- BufferTotalDataSize += expectedDataSize;
- if (Counters) {
- BufferDataSize->Add(expectedDataSize);
+ if (TaskCounters) {
+ HttpInflightLimit->Add(Gateway->GetBuffersSizePerStream());
}
- auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg, expectedDataSize);
- CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex).release()));
+ ::NMonitoring::TDynamicCounters::TCounterPtr inflightCounter;
+ auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg, DeferredQueueSize, HttpInflightSize, HttpDataRps);
+ CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex, impl->HttpInflightSize).release()));
}
static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR";
@@ -1204,13 +1184,14 @@ private:
total += s;
output.emplace_back(std::move(value));
Blocks.pop_front();
- BufferTotalDataSize -= s;
QueueTotalDataSize -= s;
if (Counters) {
- BufferDataSize->Sub(s);
QueueDataSize->Sub(s);
QueueBlockCount->Dec();
}
+ if (TaskCounters) {
+ TaskQueueDataSize->Sub(s);
+ }
TryRegisterCoro();
} while (!Blocks.empty() && free > 0LL && GetBlockSize(Blocks.front()) <= size_t(free));
@@ -1229,14 +1210,23 @@ private:
void PassAway() override { // Is called from Compute Actor
LOG_D("TS3StreamReadActor", "PassAway");
if (Counters) {
- BufferDataSize->Sub(BufferTotalDataSize);
QueueDataSize->Sub(QueueTotalDataSize);
QueueBlockCount->Sub(Blocks.size());
- if (Paused) {
+ QueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight);
+ }
+ if (TaskCounters) {
+ TaskQueueDataSize->Sub(QueueTotalDataSize);
+ TaskQueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight);
+ HttpInflightLimit->Sub(Gateway->GetBuffersSizePerStream() * CoroActors.size());
+ }
+ if (Paused) {
+ if (Counters) {
DownloadPaused->Dec();
}
+ if (TaskCounters) {
+ TaskDownloadPaused->Dec();
+ }
}
- BufferTotalDataSize = 0;
QueueTotalDataSize = 0;
for (const auto actorId : CoroActors) {
@@ -1264,11 +1254,15 @@ private:
if (Counters) {
DownloadPaused->Inc();
}
+ if (TaskCounters) {
+ TaskDownloadPaused->Inc();
+ }
}
}
void MaybeContinue() {
- if (Paused && QueueTotalDataSize < ReadActorFactoryCfg.DataInflight) {
+ // resume download on 3/4 == 75% to avoid oscillation (hysteresis)
+ if (Paused && QueueTotalDataSize * 4 < ReadActorFactoryCfg.DataInflight * 3) {
for (const auto actorId : CoroActors) {
Send(actorId, new TEvPrivate::TEvContinue());
}
@@ -1276,6 +1270,9 @@ private:
if (Counters) {
DownloadPaused->Dec();
}
+ if (TaskCounters) {
+ TaskDownloadPaused->Dec();
+ }
}
}
@@ -1299,6 +1296,9 @@ private:
QueueBlockCount->Inc();
QueueDataSize->Add(size);
}
+ if (TaskCounters) {
+ TaskQueueDataSize->Add(size);
+ }
MaybePause();
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
@@ -1314,6 +1314,9 @@ private:
QueueBlockCount->Inc();
QueueDataSize->Add(size);
}
+ if (TaskCounters) {
+ TaskQueueDataSize->Add(size);
+ }
MaybePause();
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
@@ -1327,16 +1330,8 @@ private:
Y_VERIFY(Count);
--Count;
- // final netto of expected vs actual
- BufferTotalDataSize += ev->Get()->ActualDataSize;
- BufferTotalDataSize -= ev->Get()->ExpectedDataSize;
- if (Counters) {
- BufferDataSize->Add(ev->Get()->ActualDataSize);
- BufferDataSize->Sub(ev->Get()->ExpectedDataSize);
- }
if (TaskCounters) {
- ExpectedDataSize->Add(ev->Get()->ExpectedDataSize);
- ActualDataSize->Add(ev->Get()->ActualDataSize);
+ HttpInflightLimit->Sub(Gateway->GetBuffersSizePerStream());
}
DownloadInflight--;
if (CurrentPathIndex < Paths.size()) {
@@ -1379,16 +1374,19 @@ private:
size_t CurrentPathIndex = 0;
mutable TInstant LastMemoryReport = TInstant::Now();
ui64 QueueTotalDataSize = 0;
- i64 BufferTotalDataSize = 0;
::NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataLimit;
::NMonitoring::TDynamicCounters::TCounterPtr QueueBlockCount;
- ::NMonitoring::TDynamicCounters::TCounterPtr ExpectedDataSize;
- ::NMonitoring::TDynamicCounters::TCounterPtr ActualDataSize;
- ::NMonitoring::TDynamicCounters::TCounterPtr BufferDataSize;
::NMonitoring::TDynamicCounters::TCounterPtr DownloadPaused;
+ ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr TaskDownloadPaused;
+ ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataLimit;
+ ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightLimit;
+ ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounterPtr TaskCounters;
- double Ratio = 0.0;
ui64 DownloadInflight = 0;
std::set<NActors::TActorId> CoroActors;
bool Paused = false;
@@ -1566,7 +1564,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (params.HasFormat() && params.HasRowType()) {
const auto pb = std::make_unique<TProgramBuilder>(typeEnv, functionRegistry);
- const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(params.GetRowType()), *pb, Cerr);
+ const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(params.GetRowType()), *pb, Cerr);
const auto structType = static_cast<TStructType*>(outputItemType);
const auto readSpec = std::make_shared<TReadSpec>();
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index 059e3179724..8339ab757a1 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -16,23 +16,21 @@ void RegisterS3ReadActorFactory(
IHTTPGateway::TPtr gateway,
const IRetryPolicy<long>::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
- ::NMonitoring::TDynamicCounterPtr counters,
- ::NMonitoring::TDynamicCounterPtr taskCounters) {
+ ::NMonitoring::TDynamicCounterPtr counters) {
#if defined(_linux_) || defined(_darwin_)
NDB::registerFormats();
factory.RegisterSource<NS3::TSource>("S3Source",
- [credentialsFactory, gateway, retryPolicy, cfg, counters, taskCounters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
+ [credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
std::move(settings), args.InputIndex, args.TxId, args.SecureParams,
args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
- counters, taskCounters ? taskCounters->GetSubgroup("operation", ToString(args.TxId)) : nullptr);
+ counters, args.TaskCounters);
});
#else
Y_UNUSED(factory);
Y_UNUSED(credentialsFactory);
Y_UNUSED(gateway);
Y_UNUSED(counters);
- Y_UNUSED(taskCounters);
#endif
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
index 6574ab4aacb..8a61b163961 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
@@ -15,7 +15,7 @@ namespace NYql::NDq {
struct TS3ReadActorFactoryConfig {
ui64 RowsInBatch = 1000;
ui64 MaxInflight = 20;
- ui64 DataInflight = 1_GB;
+ ui64 DataInflight = 200_MB;
};
void RegisterS3ReadActorFactory(
@@ -24,7 +24,6 @@ void RegisterS3ReadActorFactory(
IHTTPGateway::TPtr gateway = IHTTPGateway::Make(),
const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(),
const TS3ReadActorFactoryConfig& = {},
- ::NMonitoring::TDynamicCounterPtr counters = nullptr,
- ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr);
+ ::NMonitoring::TDynamicCounterPtr counters = nullptr);
}