diff options
| author | hor911 <[email protected]> | 2023-08-08 01:32:01 +0300 |
|---|---|---|
| committer | hor911 <[email protected]> | 2023-08-08 02:01:29 +0300 |
| commit | 96ca47afe1c5916b083e9ba5ea29224363b929a5 (patch) | |
| tree | 77970e607abcb9d9a58927c1635be5baf802a4b1 | |
| parent | f872f48f2bb4ba264b430f1e28a74f05dc5f444b (diff) | |
Alloc S3 Read Buffer with explicit quota
7 files changed, 59 insertions, 21 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index b7d5257fb77..f20e873ef11 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -315,7 +315,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox) { ? limits.MkqlHeavyProgramMemoryLimit : limits.MkqlLightProgramMemoryLimit; - limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit, limit); + limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit * 2, limit); auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory, AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr()); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index aef807741b3..cafb8f18be5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -232,15 +232,6 @@ struct TComputeRuntimeSettings { TMaybe<NDqProto::TRlPath> RlPath; }; -struct IMemoryQuotaManager { - using TPtr = std::shared_ptr<IMemoryQuotaManager>; - using TWeakPtr = std::weak_ptr<IMemoryQuotaManager>; - virtual ~IMemoryQuotaManager() = default; - virtual bool AllocateQuota(ui64 memorySize) = 0; - virtual void FreeQuota(ui64 memorySize) = 0; - virtual ui64 GetCurrentQuota() const = 0; -}; - struct TGuaranteeQuotaManager : public IMemoryQuotaManager { TGuaranteeQuotaManager(ui64 limit, ui64 guarantee, ui64 step = 1_MB, ui64 quota = 0) @@ -290,10 +281,28 @@ struct TGuaranteeQuotaManager : public IMemoryQuotaManager { virtual void FreeExtraQuota(ui64) { } - ui64 Limit; - ui64 Guarantee; - ui64 Step; - ui64 Quota; + ui64 Limit; // current consumption (Quota + leftover from allocation chunk) + ui64 Guarantee; // do not free memory below this value even if Quota == 0 + ui64 Step; // allocation chunk size + ui64 Quota; // current value +}; + +struct TChainedQuotaManager : public TGuaranteeQuotaManager { + + TChainedQuotaManager(IMemoryQuotaManager::TPtr extraQuotaManager, ui64 limit, ui64 guarantee, ui64 step = 1_MB, ui64 quota = 0) + : TGuaranteeQuotaManager(limit, guarantee, step, quota) + , ExtraQuotaManager(extraQuotaManager) { + } + + bool AllocateExtraQuota(ui64 memorySize) override { + return ExtraQuotaManager->AllocateQuota(memorySize); + } + + void FreeExtraQuota(ui64 memorySize) override { + ExtraQuotaManager->FreeQuota(memorySize); + } + + IMemoryQuotaManager::TPtr ExtraQuotaManager; }; struct TComputeMemoryLimits { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 0c496218fb9..16b149576be 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -31,6 +31,15 @@ class TProgramBuilder; namespace NYql::NDq { +struct IMemoryQuotaManager { + using TPtr = std::shared_ptr<IMemoryQuotaManager>; + using TWeakPtr = std::weak_ptr<IMemoryQuotaManager>; + virtual ~IMemoryQuotaManager() = default; + virtual bool AllocateQuota(ui64 memorySize) = 0; + virtual void FreeQuota(ui64 memorySize) = 0; + virtual ui64 GetCurrentQuota() const = 0; +}; + // Source/transform. // Must be IActor. // @@ -181,6 +190,7 @@ public: const NKikimr::NMiniKQL::THolderFactory& HolderFactory; ::NMonitoring::TDynamicCounterPtr TaskCounters; std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; + IMemoryQuotaManager::TPtr MemoryQuotaManager; }; struct TSinkArguments { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 274e2c03647..3f02419dbe7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1570,7 +1570,8 @@ protected: .TypeEnv = typeEnv, .HolderFactory = holderFactory, .TaskCounters = TaskCounters, - .Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr + .Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr, + .MemoryQuotaManager = MemoryLimits.MemoryQuotaManager }); } catch (const std::exception& ex) { throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what(); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 951aee6dd0d..78979a0f754 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -2234,7 +2234,8 @@ public: const TS3ReadActorFactoryConfig& readActorFactoryCfg, ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters, - ui64 fileSizeLimit + ui64 fileSizeLimit, + IMemoryQuotaManager::TPtr memoryQuotaManager ) : ReadActorFactoryCfg(readActorFactoryCfg) , Gateway(std::move(gateway)) , HolderFactory(holderFactory) @@ -2252,7 +2253,8 @@ public: , ReadSpec(readSpec) , Counters(std::move(counters)) , TaskCounters(std::move(taskCounters)) - , FileSizeLimit(fileSizeLimit) { + , FileSizeLimit(fileSizeLimit) + , MemoryQuotaManager(memoryQuotaManager) { if (Counters) { QueueDataSize = Counters->GetCounter("QueueDataSize"); QueueDataLimit = Counters->GetCounter("QueueDataLimit"); @@ -2279,6 +2281,16 @@ public: void Bootstrap() { LOG_D("TS3StreamReadActor", "Bootstrap"); + + // Arrow blocks are currently not limited by mem quoter, so we use rough buffer quotation + // After exact mem control implementation, this allocation should be deleted + if (!MemoryQuotaManager->AllocateQuota(ReadActorFactoryCfg.DataInflight)) { + TIssues issues; + issues.AddIssue(TIssue{TStringBuilder() << "OutOfMemory - can't allocate read buffer"}); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::OVERLOADED)); + return; + } + QueueBufferCounter = std::make_shared<TReadBufferCounter>( ReadActorFactoryCfg.DataInflight, TActivationContext::ActorSystem(), @@ -2521,10 +2533,13 @@ private: ContainerCache.Clear(); ArrowTupleContainerCache.Clear(); ArrowRowContainerCache.Clear(); + + MemoryQuotaManager->FreeQuota(ReadActorFactoryCfg.DataInflight); } else { LOG_W("TS3StreamReadActor", "PassAway w/o Bootstrap"); } + MemoryQuotaManager.reset(); TActorBootstrapped<TS3StreamReadActor>::PassAway(); } @@ -2687,6 +2702,7 @@ private: NActors::TActorId FileQueueActor; const ui64 FileSizeLimit; bool Bootstrapped = false; + IMemoryQuotaManager::TPtr MemoryQuotaManager; }; using namespace NKikimr::NMiniKQL; @@ -2841,7 +2857,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, ::NMonitoring::TDynamicCounterPtr counters, - ::NMonitoring::TDynamicCounterPtr taskCounters) + ::NMonitoring::TDynamicCounterPtr taskCounters, + IMemoryQuotaManager::TPtr memoryQuotaManager) { const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry(); @@ -2988,7 +3005,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( #undef SUPPORTED_FLAGS const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, - cfg, counters, taskCounters, fileSizeLimit); + cfg, counters, taskCounters, fileSizeLimit, memoryQuotaManager); return {actor, actor}; } else { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 2b3e64b12a6..690fb59385b 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -25,6 +25,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, ::NMonitoring::TDynamicCounterPtr counters, - ::NMonitoring::TDynamicCounterPtr taskCounters); + ::NMonitoring::TDynamicCounterPtr taskCounters, + IMemoryQuotaManager::TPtr memoryQuotaManager); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index b84c0335703..ffd571e4b4a 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -24,7 +24,7 @@ void RegisterS3ReadActorFactory( return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg, - counters, args.TaskCounters); + counters, args.TaskCounters, args.MemoryQuotaManager); }); #else Y_UNUSED(factory); |
