summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <[email protected]>2023-08-08 01:32:01 +0300
committerhor911 <[email protected]>2023-08-08 02:01:29 +0300
commit96ca47afe1c5916b083e9ba5ea29224363b929a5 (patch)
tree77970e607abcb9d9a58927c1635be5baf802a4b1
parentf872f48f2bb4ba264b430f1e28a74f05dc5f444b (diff)
Alloc S3 Read Buffer with explicit quota
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h35
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h10
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp25
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp2
7 files changed, 59 insertions, 21 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index b7d5257fb77..f20e873ef11 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -315,7 +315,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox) {
? limits.MkqlHeavyProgramMemoryLimit
: limits.MkqlLightProgramMemoryLimit;
- limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit, limit);
+ limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit * 2, limit);
auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory,
AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index aef807741b3..cafb8f18be5 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -232,15 +232,6 @@ struct TComputeRuntimeSettings {
TMaybe<NDqProto::TRlPath> RlPath;
};
-struct IMemoryQuotaManager {
- using TPtr = std::shared_ptr<IMemoryQuotaManager>;
- using TWeakPtr = std::weak_ptr<IMemoryQuotaManager>;
- virtual ~IMemoryQuotaManager() = default;
- virtual bool AllocateQuota(ui64 memorySize) = 0;
- virtual void FreeQuota(ui64 memorySize) = 0;
- virtual ui64 GetCurrentQuota() const = 0;
-};
-
struct TGuaranteeQuotaManager : public IMemoryQuotaManager {
TGuaranteeQuotaManager(ui64 limit, ui64 guarantee, ui64 step = 1_MB, ui64 quota = 0)
@@ -290,10 +281,28 @@ struct TGuaranteeQuotaManager : public IMemoryQuotaManager {
virtual void FreeExtraQuota(ui64) {
}
- ui64 Limit;
- ui64 Guarantee;
- ui64 Step;
- ui64 Quota;
+ ui64 Limit; // current consumption (Quota + leftover from allocation chunk)
+ ui64 Guarantee; // do not free memory below this value even if Quota == 0
+ ui64 Step; // allocation chunk size
+ ui64 Quota; // current value
+};
+
+struct TChainedQuotaManager : public TGuaranteeQuotaManager {
+
+ TChainedQuotaManager(IMemoryQuotaManager::TPtr extraQuotaManager, ui64 limit, ui64 guarantee, ui64 step = 1_MB, ui64 quota = 0)
+ : TGuaranteeQuotaManager(limit, guarantee, step, quota)
+ , ExtraQuotaManager(extraQuotaManager) {
+ }
+
+ bool AllocateExtraQuota(ui64 memorySize) override {
+ return ExtraQuotaManager->AllocateQuota(memorySize);
+ }
+
+ void FreeExtraQuota(ui64 memorySize) override {
+ ExtraQuotaManager->FreeQuota(memorySize);
+ }
+
+ IMemoryQuotaManager::TPtr ExtraQuotaManager;
};
struct TComputeMemoryLimits {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 0c496218fb9..16b149576be 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -31,6 +31,15 @@ class TProgramBuilder;
namespace NYql::NDq {
+struct IMemoryQuotaManager {
+ using TPtr = std::shared_ptr<IMemoryQuotaManager>;
+ using TWeakPtr = std::weak_ptr<IMemoryQuotaManager>;
+ virtual ~IMemoryQuotaManager() = default;
+ virtual bool AllocateQuota(ui64 memorySize) = 0;
+ virtual void FreeQuota(ui64 memorySize) = 0;
+ virtual ui64 GetCurrentQuota() const = 0;
+};
+
// Source/transform.
// Must be IActor.
//
@@ -181,6 +190,7 @@ public:
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
::NMonitoring::TDynamicCounterPtr TaskCounters;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+ IMemoryQuotaManager::TPtr MemoryQuotaManager;
};
struct TSinkArguments {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 274e2c03647..3f02419dbe7 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1570,7 +1570,8 @@ protected:
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.TaskCounters = TaskCounters,
- .Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr
+ .Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr,
+ .MemoryQuotaManager = MemoryLimits.MemoryQuotaManager
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what();
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 951aee6dd0d..78979a0f754 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -2234,7 +2234,8 @@ public:
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
- ui64 fileSizeLimit
+ ui64 fileSizeLimit,
+ IMemoryQuotaManager::TPtr memoryQuotaManager
) : ReadActorFactoryCfg(readActorFactoryCfg)
, Gateway(std::move(gateway))
, HolderFactory(holderFactory)
@@ -2252,7 +2253,8 @@ public:
, ReadSpec(readSpec)
, Counters(std::move(counters))
, TaskCounters(std::move(taskCounters))
- , FileSizeLimit(fileSizeLimit) {
+ , FileSizeLimit(fileSizeLimit)
+ , MemoryQuotaManager(memoryQuotaManager) {
if (Counters) {
QueueDataSize = Counters->GetCounter("QueueDataSize");
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
@@ -2279,6 +2281,16 @@ public:
void Bootstrap() {
LOG_D("TS3StreamReadActor", "Bootstrap");
+
+ // Arrow blocks are currently not limited by mem quoter, so we use rough buffer quotation
+ // After exact mem control implementation, this allocation should be deleted
+ if (!MemoryQuotaManager->AllocateQuota(ReadActorFactoryCfg.DataInflight)) {
+ TIssues issues;
+ issues.AddIssue(TIssue{TStringBuilder() << "OutOfMemory - can't allocate read buffer"});
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::OVERLOADED));
+ return;
+ }
+
QueueBufferCounter = std::make_shared<TReadBufferCounter>(
ReadActorFactoryCfg.DataInflight,
TActivationContext::ActorSystem(),
@@ -2521,10 +2533,13 @@ private:
ContainerCache.Clear();
ArrowTupleContainerCache.Clear();
ArrowRowContainerCache.Clear();
+
+ MemoryQuotaManager->FreeQuota(ReadActorFactoryCfg.DataInflight);
} else {
LOG_W("TS3StreamReadActor", "PassAway w/o Bootstrap");
}
+ MemoryQuotaManager.reset();
TActorBootstrapped<TS3StreamReadActor>::PassAway();
}
@@ -2687,6 +2702,7 @@ private:
NActors::TActorId FileQueueActor;
const ui64 FileSizeLimit;
bool Bootstrapped = false;
+ IMemoryQuotaManager::TPtr MemoryQuotaManager;
};
using namespace NKikimr::NMiniKQL;
@@ -2841,7 +2857,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
::NMonitoring::TDynamicCounterPtr counters,
- ::NMonitoring::TDynamicCounterPtr taskCounters)
+ ::NMonitoring::TDynamicCounterPtr taskCounters,
+ IMemoryQuotaManager::TPtr memoryQuotaManager)
{
const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();
@@ -2988,7 +3005,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
#undef SUPPORTED_FLAGS
const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy,
- cfg, counters, taskCounters, fileSizeLimit);
+ cfg, counters, taskCounters, fileSizeLimit, memoryQuotaManager);
return {actor, actor};
} else {
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
index 2b3e64b12a6..690fb59385b 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
@@ -25,6 +25,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
::NMonitoring::TDynamicCounterPtr counters,
- ::NMonitoring::TDynamicCounterPtr taskCounters);
+ ::NMonitoring::TDynamicCounterPtr taskCounters,
+ IMemoryQuotaManager::TPtr memoryQuotaManager);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index b84c0335703..ffd571e4b4a 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -24,7 +24,7 @@ void RegisterS3ReadActorFactory(
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
std::move(settings), args.InputIndex, args.TxId, args.SecureParams,
args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
- counters, args.TaskCounters);
+ counters, args.TaskCounters, args.MemoryQuotaManager);
});
#else
Y_UNUSED(factory);