diff options
author | whcrc <whcrc@ydb.tech> | 2022-10-28 14:10:55 +0300 |
---|---|---|
committer | whcrc <whcrc@ydb.tech> | 2022-10-28 14:10:55 +0300 |
commit | 549e5a86afc4eb32da1f188983b548ebf850d4bf (patch) | |
tree | c03c9aa88df58246c028d32028f63419ee170215 | |
parent | 7ac0d812a0a46f738031ac8e9e3dfec4681bbd73 (diff) | |
download | ydb-549e5a86afc4eb32da1f188983b548ebf850d4bf.tar.gz |
DQ, limit concurrency
5 files changed, 31 insertions, 3 deletions
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index 57b1b828ef..76adaf38a1 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -52,6 +52,7 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, WatermarksMode); REGISTER_SETTING(*this, WatermarksGranularityMs); REGISTER_SETTING(*this, UseAggPhases); + REGISTER_SETTING(*this, ParallelOperationsLimit).Lower(1).Upper(128); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index 87a408b486..635d2dd649 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -28,6 +28,7 @@ struct TDqSettings { static constexpr ui64 OutputChunkMaxSize = 4_MB; static constexpr ui64 ChunkSizeLimit = 128_MB; static constexpr bool EnableDqReplicate = false; + static constexpr ui64 ParallelOperationsLimit = 16; }; using TPtr = std::shared_ptr<TDqSettings>; @@ -75,6 +76,7 @@ struct TDqSettings { NCommon::TConfSetting<TString, false> WatermarksMode; NCommon::TConfSetting<ui64, false> WatermarksGranularityMs; NCommon::TConfSetting<bool, false> UseAggPhases; + NCommon::TConfSetting<ui64, false> ParallelOperationsLimit; NCommon::TConfSetting<TString, false> WorkerFilter; diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.txt index 736d33142e..79c518f259 100644 --- a/ydb/library/yql/providers/dq/provider/CMakeLists.txt +++ b/ydb/library/yql/providers/dq/provider/CMakeLists.txt @@ -17,6 +17,7 @@ target_link_libraries(providers-dq-provider PUBLIC yutil cpp-grpc-client cpp-threading-task_scheduler + cpp-threading-future public-lib-yson_value cpp-client-ydb_driver library-yql-core diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 2b14918525..f3c7bcade1 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -790,7 +790,7 @@ private: } } else { graphParams["Evaluation"] = ToString(!ctx.Step.IsDone(TExprStep::ExprEval)); - future = State->DqGateway->ExecutePlan( + future = State->ExecutePlan( State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams, settings, progressWriter, ModulesMapping, fillSettings.Discard); } @@ -1150,7 +1150,7 @@ private: IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds); - auto future = State->DqGateway->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams, + auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams, settings, progressWriter, ModulesMapping, fillSettings.Discard); future.Subscribe([publicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) { @@ -1596,7 +1596,7 @@ private: IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds); - auto future = State->DqGateway->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), {}, secureParams, graphParams, + auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), {}, secureParams, graphParams, settings, progressWriter, ModulesMapping, false); executionPlanner.Destroy(); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_state.h b/ydb/library/yql/providers/dq/provider/yql_dq_state.h index da1ed10e80..4c4ef09351 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_state.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_state.h @@ -7,6 +7,7 @@ #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <library/cpp/threading/future/async_semaphore.h> #include <util/generic/ptr.h> namespace NYql { @@ -36,6 +37,8 @@ struct TDqState: public TThrRefBase { std::atomic<ui32> MetricId = 1; std::function<void()> AbortHidden = [](){}; + NThreading::TAsyncSemaphore::TPtr OperationSemaphore = nullptr; // pragmas are not yet parsed, so we initialize it later + TAdaptiveLock Mutex_; TDqState( const IDqGateway::TPtr& dqGateway, @@ -71,6 +74,27 @@ struct TDqState: public TThrRefBase { , ExternalUser(externalUser) , AbortHidden(std::move(hiddenAborter)) { } + + NThreading::TFuture<IDqGateway::TResult> + ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns, + const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams, + const TDqSettings::TPtr& settings, + const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping, + bool discard) { + with_lock(Mutex_) { + if (!OperationSemaphore) { + const auto parallelOperationsLimit = Settings->ParallelOperationsLimit.Get().GetOrElse(TDqSettings::TDefault::ParallelOperationsLimit); + OperationSemaphore = NThreading::TAsyncSemaphore::Make(parallelOperationsLimit); + } + } + return OperationSemaphore->AcquireAsync().Apply([this_=TIntrusivePtr<TDqState>(this), sessionId, plan=std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard](const auto& f) mutable { + auto lock = f.GetValue()->MakeAutoRelease(); + return this_->DqGateway->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard).Apply([unlock = lock.DeferRelease()](const auto& f) { + unlock(NThreading::MakeFuture()); + return f; + }); + }); + } }; using TDqStatePtr = TIntrusivePtr<TDqState>; |