aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@ydb.tech>2022-10-28 14:10:55 +0300
committerwhcrc <whcrc@ydb.tech>2022-10-28 14:10:55 +0300
commit549e5a86afc4eb32da1f188983b548ebf850d4bf (patch)
treec03c9aa88df58246c028d32028f63419ee170215
parent7ac0d812a0a46f738031ac8e9e3dfec4681bbd73 (diff)
downloadydb-549e5a86afc4eb32da1f188983b548ebf850d4bf.tar.gz
DQ, limit concurrency
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp1
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp6
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_state.h24
5 files changed, 31 insertions, 3 deletions
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 57b1b828ef..76adaf38a1 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -52,6 +52,7 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, WatermarksMode);
REGISTER_SETTING(*this, WatermarksGranularityMs);
REGISTER_SETTING(*this, UseAggPhases);
+ REGISTER_SETTING(*this, ParallelOperationsLimit).Lower(1).Upper(128);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index 87a408b486..635d2dd649 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -28,6 +28,7 @@ struct TDqSettings {
static constexpr ui64 OutputChunkMaxSize = 4_MB;
static constexpr ui64 ChunkSizeLimit = 128_MB;
static constexpr bool EnableDqReplicate = false;
+ static constexpr ui64 ParallelOperationsLimit = 16;
};
using TPtr = std::shared_ptr<TDqSettings>;
@@ -75,6 +76,7 @@ struct TDqSettings {
NCommon::TConfSetting<TString, false> WatermarksMode;
NCommon::TConfSetting<ui64, false> WatermarksGranularityMs;
NCommon::TConfSetting<bool, false> UseAggPhases;
+ NCommon::TConfSetting<ui64, false> ParallelOperationsLimit;
NCommon::TConfSetting<TString, false> WorkerFilter;
diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.txt
index 736d33142e..79c518f259 100644
--- a/ydb/library/yql/providers/dq/provider/CMakeLists.txt
+++ b/ydb/library/yql/providers/dq/provider/CMakeLists.txt
@@ -17,6 +17,7 @@ target_link_libraries(providers-dq-provider PUBLIC
yutil
cpp-grpc-client
cpp-threading-task_scheduler
+ cpp-threading-future
public-lib-yson_value
cpp-client-ydb_driver
library-yql-core
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 2b14918525..f3c7bcade1 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -790,7 +790,7 @@ private:
}
} else {
graphParams["Evaluation"] = ToString(!ctx.Step.IsDone(TExprStep::ExprEval));
- future = State->DqGateway->ExecutePlan(
+ future = State->ExecutePlan(
State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams,
settings, progressWriter, ModulesMapping, fillSettings.Discard);
}
@@ -1150,7 +1150,7 @@ private:
IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds);
- auto future = State->DqGateway->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams,
+ auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams,
settings, progressWriter, ModulesMapping, fillSettings.Discard);
future.Subscribe([publicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) {
@@ -1596,7 +1596,7 @@ private:
IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds);
- auto future = State->DqGateway->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), {}, secureParams, graphParams,
+ auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), {}, secureParams, graphParams,
settings, progressWriter, ModulesMapping, false);
executionPlanner.Destroy();
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_state.h b/ydb/library/yql/providers/dq/provider/yql_dq_state.h
index da1ed10e80..4c4ef09351 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_state.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_state.h
@@ -7,6 +7,7 @@
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <library/cpp/threading/future/async_semaphore.h>
#include <util/generic/ptr.h>
namespace NYql {
@@ -36,6 +37,8 @@ struct TDqState: public TThrRefBase {
std::atomic<ui32> MetricId = 1;
std::function<void()> AbortHidden = [](){};
+ NThreading::TAsyncSemaphore::TPtr OperationSemaphore = nullptr; // pragmas are not yet parsed, so we initialize it later
+ TAdaptiveLock Mutex_;
TDqState(
const IDqGateway::TPtr& dqGateway,
@@ -71,6 +74,27 @@ struct TDqState: public TThrRefBase {
, ExternalUser(externalUser)
, AbortHidden(std::move(hiddenAborter))
{ }
+
+ NThreading::TFuture<IDqGateway::TResult>
+ ExecutePlan(const TString& sessionId, NDqs::TPlan&& plan, const TVector<TString>& columns,
+ const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
+ const TDqSettings::TPtr& settings,
+ const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
+ bool discard) {
+ with_lock(Mutex_) {
+ if (!OperationSemaphore) {
+ const auto parallelOperationsLimit = Settings->ParallelOperationsLimit.Get().GetOrElse(TDqSettings::TDefault::ParallelOperationsLimit);
+ OperationSemaphore = NThreading::TAsyncSemaphore::Make(parallelOperationsLimit);
+ }
+ }
+ return OperationSemaphore->AcquireAsync().Apply([this_=TIntrusivePtr<TDqState>(this), sessionId, plan=std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard](const auto& f) mutable {
+ auto lock = f.GetValue()->MakeAutoRelease();
+ return this_->DqGateway->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard).Apply([unlock = lock.DeferRelease()](const auto& f) {
+ unlock(NThreading::MakeFuture());
+ return f;
+ });
+ });
+ }
};
using TDqStatePtr = TIntrusivePtr<TDqState>;