diff options
author | gvit <gvit@ydb.tech> | 2022-09-19 21:30:48 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-09-19 21:30:48 +0300 |
commit | f8fba855293278a23b47bd219e7a4b14daf2d95e (patch) | |
tree | ca704861c42965f0ec72d5ecfba44101ef451cc3 | |
parent | 6dda03e1d7cb67ba59009d054119673552f3981e (diff) | |
download | ydb-f8fba855293278a23b47bd219e7a4b14daf2d95e.tar.gz |
remove scan buffer limit from resource estimation
23 files changed, 158 insertions, 122 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index dcce35616e..99cf2cc81f 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -127,7 +127,7 @@ public: } SysViewActorId = Register(scanActor.Release()); - Send(SysViewActorId, new TEvKqpCompute::TEvScanDataAck(MemoryLimits.ScanBufferSize)); + Send(SysViewActorId, new TEvKqpCompute::TEvScanDataAck(MemoryLimits.ChannelBufferSize)); } ContinueExecute(); @@ -159,7 +159,7 @@ public: protected: ui64 CalcMkqlMemoryLimit() override { - return TBase::CalcMkqlMemoryLimit() + ComputeCtx.GetTableScans().size() * MemoryLimits.ScanBufferSize; + return TBase::CalcMkqlMemoryLimit() + ComputeCtx.GetTableScans().size() * MemoryLimits.ChannelBufferSize; } public: @@ -221,7 +221,7 @@ private: Y_VERIFY_DEBUG(SysViewActorId == ActorIdFromProto(msg.GetScanActorId())); CA_LOG_D("Got sysview scan initial event, scan actor: " << SysViewActorId << ", scanId: 0"); - Send(ev->Sender, new TEvKqpCompute::TEvScanDataAck(GetMemoryLimits().ScanBufferSize)); + Send(ev->Sender, new TEvKqpCompute::TEvScanDataAck(GetMemoryLimits().ChannelBufferSize)); return; } @@ -272,8 +272,8 @@ private: } } - ui64 freeSpace = GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes() - ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes() + ui64 freeSpace = GetMemoryLimits().ChannelBufferSize > ScanData->GetStoredBytes() + ? GetMemoryLimits().ChannelBufferSize - ScanData->GetStoredBytes() : 0; if (freeSpace > 0) { diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index cfe4d7ed8f..ed720761fe 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -271,7 +271,7 @@ public: protected: ui64 CalcMkqlMemoryLimit() override { - return TBase::CalcMkqlMemoryLimit() + ComputeCtx.GetTableScans().size() * MemoryLimits.ScanBufferSize; + return TBase::CalcMkqlMemoryLimit() + ComputeCtx.GetTableScans().size() * MemoryLimits.ChannelBufferSize; } private: @@ -1029,9 +1029,9 @@ private: private: ui64 CalculateFreeSpace() const { - return GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes() - ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes() - : 0ul; + return GetMemoryLimits().ChannelBufferSize > ScanData->GetStoredBytes() + ? GetMemoryLimits().ChannelBufferSize - ScanData->GetStoredBytes() + : 0ul; } std::any GetSourcesState() override { diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index d6255e721a..d7659d13b6 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -1335,7 +1335,6 @@ private: settings.UseSpilling = false; TComputeMemoryLimits limits; - limits.ScanBufferSize = 50_MB; // for system views only limits.ChannelBufferSize = 50_MB; limits.MkqlLightProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(500_MB, Request.MkqlMemoryLimit) : 500_MB; limits.MkqlHeavyProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(2_GB, Request.MkqlMemoryLimit) : 2_GB; diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp index aa0434c4e2..836e4d4a8f 100644 --- a/ydb/core/kqp/executer/kqp_planner.cpp +++ b/ydb/core/kqp/executer/kqp_planner.cpp @@ -105,13 +105,13 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot ui64 i = 0; for (auto& task : Tasks) { - EstimateTaskResources(task, 0, 0, rmConfig, est[i]); + EstimateTaskResources(task, rmConfig, est[i]); localRunMemoryEst += est[i].TotalMemoryLimit; i++; } if (auto it = ScanTasks.find(SelfId().NodeId()); it != ScanTasks.end()) { for (auto& task : it->second) { - EstimateTaskResources(task, 0, 0, rmConfig, est[i]); + EstimateTaskResources(task, rmConfig, est[i]); localRunMemoryEst += est[i].TotalMemoryLimit; i++; } diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp index c125129d31..0ff2d5c3f4 100644 --- a/ydb/core/kqp/node/kqp_node.cpp +++ b/ydb/core/kqp/node/kqp_node.cpp @@ -141,23 +141,12 @@ private: return ReplyError(txId, request.Executer, msg, NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR); } - ui32 requestScans = 0; ui32 requestChannels = 0; - for (auto& dqTask : *msg.MutableTasks()) { - ui32 nScans = 0; - - if (isScan) { - NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta protoTaskMeta; - - YQL_ENSURE(msg.GetRuntimeSettings().GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::SCAN); - YQL_ENSURE(msg.GetRuntimeSettings().GetTasksOnNodeCount() == 0); // legacy - - dqTask.GetMeta().UnpackTo(&protoTaskMeta); - nScans = protoTaskMeta.GetReads().size(); - } - - auto estimation = EstimateTaskResources(dqTask, nScans, /* dsOnNodeCount */ 0, Config); + auto estimation = EstimateTaskResources(dqTask, Config); + LOG_D("Resource estimation complete" + << ", TxId: " << txId << ", task id: " << dqTask.GetId() << ", node id: " << SelfId().NodeId() + << ", estimated resources: " << estimation.ToString()); NKqpNode::TTaskContext& taskCtx = request.InFlyTasks[dqTask.GetId()]; YQL_ENSURE(taskCtx.TaskId == 0); @@ -168,12 +157,11 @@ private: LOG_D("TxId: " << txId << ", task: " << taskCtx.TaskId << ", requested memory: " << taskCtx.Memory); - requestScans += estimation.ScanBuffersCount; requestChannels += estimation.ChannelBuffersCount; request.TotalMemory += taskCtx.Memory; } - LOG_D("TxId: " << txId << ", requested scans: " << requestScans << ", channels: " << requestChannels + LOG_D("TxId: " << txId << ", channels: " << requestChannels << ", computeActors: " << msg.GetTasks().size() << ", memory: " << request.TotalMemory); ui64 txMemory = State.GetTxMemory(txId, NRm::EKqpMemoryPool::ScanQuery) + request.TotalMemory; @@ -237,7 +225,6 @@ private: NYql::NDq::TComputeMemoryLimits memoryLimits; memoryLimits.ChannelBufferSize = 0; - memoryLimits.ScanBufferSize = Config.GetScanBufferSize(); memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit(); memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit(); if (Config.GetEnableInstantMkqlMemoryAlloc()) { @@ -446,7 +433,6 @@ private: #define FORCE_VALUE(name) if (!Config.Has ## name ()) Config.Set ## name(Config.Get ## name()); FORCE_VALUE(ComputeActorsCount) FORCE_VALUE(ChannelBufferSize) - FORCE_VALUE(ScanBufferSize) FORCE_VALUE(MkqlLightProgramMemoryLimit) FORCE_VALUE(MkqlHeavyProgramMemoryLimit) FORCE_VALUE(QueryMemoryLimit) @@ -454,8 +440,6 @@ private: FORCE_VALUE(EnableInstantMkqlMemoryAlloc); FORCE_VALUE(MaxTotalChannelBuffersSize); FORCE_VALUE(MinChannelBufferSize); - FORCE_VALUE(MaxTotalScanBuffersSize); - FORCE_VALUE(MinScanBufferSize); #undef FORCE_VALUE LOG_I("Updated table service config: " << Config.DebugString()); diff --git a/ydb/core/kqp/node/kqp_node_ut.cpp b/ydb/core/kqp/node/kqp_node_ut.cpp index c131bd69f3..3d0cc7b10c 100644 --- a/ydb/core/kqp/node/kqp_node_ut.cpp +++ b/ydb/core/kqp/node/kqp_node_ut.cpp @@ -96,7 +96,6 @@ NKikimrConfig::TTableServiceConfig MakeKqpResourceManagerConfig() { config.MutableResourceManager()->SetComputeActorsCount(100); config.MutableResourceManager()->SetChannelBufferSize(10); config.MutableResourceManager()->SetMinChannelBufferSize(10); - config.MutableResourceManager()->SetScanBufferSize(50); config.MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000); config.MutableResourceManager()->SetMkqlHeavyProgramMemoryLimit(10'000); config.MutableResourceManager()->SetQueryMemoryLimit(30'000); @@ -293,7 +292,6 @@ void KqpNode::CommonCase() { auto& memoryLimits = CompFactory->Task2Actor.begin()->second.MemoryLimits; UNIT_ASSERT_VALUES_EQUAL(10, memoryLimits.ChannelBufferSize); - UNIT_ASSERT_VALUES_EQUAL(50, memoryLimits.ScanBufferSize); UNIT_ASSERT_VALUES_EQUAL(1'000, memoryLimits.MkqlLightProgramMemoryLimit); UNIT_ASSERT_VALUES_EQUAL(10'000, memoryLimits.MkqlHeavyProgramMemoryLimit); @@ -486,7 +484,6 @@ void KqpNode::NotEnoughMemory_Extra() { auto& memoryLimits = CompFactory->Task2Actor.begin()->second.MemoryLimits; UNIT_ASSERT_VALUES_EQUAL(10, memoryLimits.ChannelBufferSize); - UNIT_ASSERT_VALUES_EQUAL(50, memoryLimits.ScanBufferSize); UNIT_ASSERT_VALUES_EQUAL(1'000, memoryLimits.MkqlLightProgramMemoryLimit); UNIT_ASSERT_VALUES_EQUAL(10'000, memoryLimits.MkqlHeavyProgramMemoryLimit); diff --git a/ydb/core/kqp/rm/kqp_resource_estimation.cpp b/ydb/core/kqp/rm/kqp_resource_estimation.cpp index aeeef80177..2500c94020 100644 --- a/ydb/core/kqp/rm/kqp_resource_estimation.cpp +++ b/ydb/core/kqp/rm/kqp_resource_estimation.cpp @@ -5,29 +5,18 @@ namespace NKikimr::NKqp { using namespace NYql::NDqProto; using namespace NKikimrConfig; -TTaskResourceEstimation EstimateTaskResources(const TDqTask& task, int nScans, ui32 dsOnNodeCount, +TTaskResourceEstimation EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TResourceManager& config) { TTaskResourceEstimation ret; - EstimateTaskResources(task, nScans, dsOnNodeCount, config, ret); + EstimateTaskResources(task, config, ret); return ret; } -void EstimateTaskResources(const TDqTask& task, int nScans, ui32 dsOnNodeCount, - const TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& ret) +void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TResourceManager& config, + TTaskResourceEstimation& ret) { ret.TaskId = task.GetId(); - - if (nScans > 0) { - ret.ScanBuffersCount = nScans; - ret.ScanBufferMemoryLimit = config.GetScanBufferSize(); - - if (dsOnNodeCount && ret.ScanBufferMemoryLimit * dsOnNodeCount > config.GetMaxTotalScanBuffersSize()) { - ret.ScanBufferMemoryLimit = std::max(config.GetMinScanBufferSize(), - config.GetMaxTotalScanBuffersSize() / dsOnNodeCount); - } - } - for (const auto& input : task.GetInputs()) { ret.ChannelBuffersCount += input.ChannelsSize(); } @@ -50,18 +39,17 @@ void EstimateTaskResources(const TDqTask& task, int nScans, ui32 dsOnNodeCount, ret.MkqlProgramMemoryLimit = config.GetMkqlLightProgramMemoryLimit(); } - ret.TotalMemoryLimit = ret.ScanBuffersCount * ret.ScanBufferMemoryLimit - + ret.ChannelBuffersCount * ret.ChannelBufferMemoryLimit + ret.TotalMemoryLimit = ret.ChannelBuffersCount * ret.ChannelBufferMemoryLimit + ret.MkqlProgramMemoryLimit; } -TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks, int nScans, - ui32 dsOnNodeCount, const TTableServiceConfig::TResourceManager& config) +TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks, + const TTableServiceConfig::TResourceManager& config) { TVector<TTaskResourceEstimation> ret; ret.resize(tasks.size()); for (ui64 i = 0; i < tasks.size(); ++i) { - EstimateTaskResources(tasks[i], nScans, dsOnNodeCount, config, ret[i]); + EstimateTaskResources(tasks[i], config, ret[i]); } return ret; } diff --git a/ydb/core/kqp/rm/kqp_resource_estimation.h b/ydb/core/kqp/rm/kqp_resource_estimation.h index b69ac20351..2cb6751838 100644 --- a/ydb/core/kqp/rm/kqp_resource_estimation.h +++ b/ydb/core/kqp/rm/kqp_resource_estimation.h @@ -10,9 +10,7 @@ namespace NKikimr::NKqp { struct TTaskResourceEstimation { ui64 TaskId = 0; - ui32 ScanBuffersCount = 0; ui32 ChannelBuffersCount = 0; - ui64 ScanBufferMemoryLimit = 0; ui64 ChannelBufferMemoryLimit = 0; ui64 MkqlProgramMemoryLimit = 0; ui64 TotalMemoryLimit = 0; @@ -20,9 +18,7 @@ struct TTaskResourceEstimation { TString ToString() const { return TStringBuilder() << "TaskResourceEstimation{" << " TaskId: " << TaskId - << ", ScanBuffersCount: " << ScanBuffersCount << ", ChannelBuffersCount: " << ChannelBuffersCount - << ", ScanBufferMemoryLimit: " << ScanBufferMemoryLimit << ", ChannelBufferMemoryLimit: " << ChannelBufferMemoryLimit << ", MkqlProgramMemoryLimit: " << MkqlProgramMemoryLimit << ", TotalMemoryLimit: " << TotalMemoryLimit @@ -30,13 +26,13 @@ struct TTaskResourceEstimation { } }; -TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task, int nScans, ui32 dsOnNodeCount, +TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task, const NKikimrConfig::TTableServiceConfig::TResourceManager& config); -void EstimateTaskResources(const NYql::NDqProto::TDqTask& task, int nScans, ui32 dsOnNodeCount, +void EstimateTaskResources(const NYql::NDqProto::TDqTask& task, const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result); -TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks, int nScans, - ui32 dsOnNodeCount, const NKikimrConfig::TTableServiceConfig::TResourceManager& config); +TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks, + const NKikimrConfig::TTableServiceConfig::TResourceManager& config); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/rm/kqp_resource_estimation_ut.cpp b/ydb/core/kqp/rm/kqp_resource_estimation_ut.cpp index 689c0ed86b..df17fc4814 100644 --- a/ydb/core/kqp/rm/kqp_resource_estimation_ut.cpp +++ b/ydb/core/kqp/rm/kqp_resource_estimation_ut.cpp @@ -31,7 +31,7 @@ Y_UNIT_TEST(TestChannelSize) { auto* output = task.MutableOutputs()->Add(); output->MutableChannels()->Add(); - auto est = EstimateTaskResources(task, 0, 0, config); + auto est = EstimateTaskResources(task, config); UNIT_ASSERT_EQUAL(201, est.ChannelBuffersCount); UNIT_ASSERT_EQUAL(est.ChannelBufferMemoryLimit, config.GetChannelBufferSize()); @@ -41,45 +41,13 @@ Y_UNIT_TEST(TestChannelSize) { input->MutableChannels()->Add(); } - est = EstimateTaskResources(task, 0, 0, config); + est = EstimateTaskResources(task, config); UNIT_ASSERT_EQUAL(301, est.ChannelBuffersCount); UNIT_ASSERT(est.ChannelBufferMemoryLimit < config.GetChannelBufferSize()); UNIT_ASSERT(est.ChannelBufferMemoryLimit >= config.GetMinChannelBufferSize()); } -Y_UNIT_TEST(TestScanBufferSize) { - NKikimrConfig::TTableServiceConfig::TResourceManager config; - config.SetScanBufferSize(10_MB); - config.SetMaxTotalScanBuffersSize(50_MB); - config.SetMinScanBufferSize(2_MB); - config.SetChannelBufferSize(100); - config.SetMinChannelBufferSize(100); - config.SetMaxTotalChannelBuffersSize(200_GB); - config.SetMkqlLightProgramMemoryLimit(100); - - NYql::NDqProto::TDqTask task; - - for (int i = 1; i <= 5; ++i) { - auto est = EstimateTaskResources(task, i, i, config); - UNIT_ASSERT_VALUES_EQUAL(10_MB, est.ScanBufferMemoryLimit); - UNIT_ASSERT_VALUES_EQUAL(i, est.ScanBuffersCount); - } - - for (int i = 6; i <= 24; ++i) { - auto est = EstimateTaskResources(task, i, i, config); - UNIT_ASSERT(10_MB >= est.ScanBufferMemoryLimit); - UNIT_ASSERT(2_MB < est.ScanBufferMemoryLimit); - UNIT_ASSERT_VALUES_EQUAL(i, est.ScanBuffersCount); - } - - for (int i = 25; i <= 30; ++i) { - auto est = EstimateTaskResources(task, i, i, config); - UNIT_ASSERT_VALUES_EQUAL(2_MB, est.ScanBufferMemoryLimit); - UNIT_ASSERT_VALUES_EQUAL(i, est.ScanBuffersCount); - } -} - } // suite KqpResourceEstimation } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/rm/kqp_rm.cpp b/ydb/core/kqp/rm/kqp_rm.cpp index 81f950afdd..e3fa8998c9 100644 --- a/ydb/core/kqp/rm/kqp_rm.cpp +++ b/ydb/core/kqp/rm/kqp_rm.cpp @@ -712,7 +712,6 @@ private: #define FORCE_VALUE(name) if (!config.Has ## name ()) config.Set ## name(config.Get ## name()); FORCE_VALUE(ComputeActorsCount) FORCE_VALUE(ChannelBufferSize) - FORCE_VALUE(ScanBufferSize) FORCE_VALUE(MkqlLightProgramMemoryLimit) FORCE_VALUE(MkqlHeavyProgramMemoryLimit) FORCE_VALUE(QueryMemoryLimit) @@ -720,8 +719,6 @@ private: FORCE_VALUE(EnableInstantMkqlMemoryAlloc); FORCE_VALUE(MaxTotalChannelBuffersSize); FORCE_VALUE(MinChannelBufferSize); - FORCE_VALUE(MaxTotalScanBuffersSize); - FORCE_VALUE(MinScanBufferSize); #undef FORCE_VALUE LOG_I("Updated table service config: " << config.DebugString()); diff --git a/ydb/core/kqp/ut/CMakeLists.darwin.txt b/ydb/core/kqp/ut/CMakeLists.darwin.txt index 6f8de5cd9d..f5fffdab6f 100644 --- a/ydb/core/kqp/ut/CMakeLists.darwin.txt +++ b/ydb/core/kqp/ut/CMakeLists.darwin.txt @@ -8,6 +8,7 @@ add_subdirectory(common) add_subdirectory(fat) +add_subdirectory(spilling) add_executable(ydb-core-kqp-ut) target_compile_options(ydb-core-kqp-ut PRIVATE @@ -64,7 +65,6 @@ target_sources(ydb-core-kqp-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_params_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_pragma_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_query_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_scan_spilling_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_scan_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_scheme_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_scripting_ut.cpp diff --git a/ydb/core/kqp/ut/CMakeLists.linux.txt b/ydb/core/kqp/ut/CMakeLists.linux.txt index c66efa1e2a..af6b41955c 100644 --- a/ydb/core/kqp/ut/CMakeLists.linux.txt +++ b/ydb/core/kqp/ut/CMakeLists.linux.txt @@ -8,6 +8,7 @@ add_subdirectory(common) add_subdirectory(fat) +add_subdirectory(spilling) add_executable(ydb-core-kqp-ut) target_compile_options(ydb-core-kqp-ut PRIVATE @@ -68,7 +69,6 @@ target_sources(ydb-core-kqp-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_params_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_pragma_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_query_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_scan_spilling_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_scan_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_scheme_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_scripting_ut.cpp diff --git a/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp b/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp index 5c9d2a8707..46e107c240 100644 --- a/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp @@ -53,7 +53,6 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionAct NKikimrConfig::TAppConfig appCfg; appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetChannelBufferSize(limit); appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMinChannelBufferSize(limit); - appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetScanBufferSize(limit); appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlHeavyProgramMemoryLimit(200ul << 20); appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(20ul << 30); diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index 3e8e86876a..5b1ced8390 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -23,7 +23,6 @@ NKikimrConfig::TAppConfig AppCfg() { appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetChannelBufferSize(1_MB); appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMinChannelBufferSize(1_MB); appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMaxTotalChannelBuffersSize(100_GB); - appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetScanBufferSize(1_MB); appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1_MB); appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlHeavyProgramMemoryLimit(1_MB); return appCfg; diff --git a/ydb/core/kqp/ut/spilling/CMakeLists.darwin.txt b/ydb/core/kqp/ut/spilling/CMakeLists.darwin.txt new file mode 100644 index 0000000000..5f49554dc5 --- /dev/null +++ b/ydb/core/kqp/ut/spilling/CMakeLists.darwin.txt @@ -0,0 +1,51 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-kqp-ut-spilling) +target_compile_options(ydb-core-kqp-ut-spilling PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-kqp-ut-spilling PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp +) +target_link_libraries(ydb-core-kqp-ut-spilling PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-core-kqp + cpp-client-ydb_proto + core-kqp-counters + core-kqp-host + core-kqp-provider + kqp-ut-common +) +target_link_options(ydb-core-kqp-ut-spilling PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-kqp-ut-spilling PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp +) +add_test( + NAME + ydb-core-kqp-ut-spilling + COMMAND + ydb-core-kqp-ut-spilling + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-kqp-ut-spilling) diff --git a/ydb/core/kqp/ut/spilling/CMakeLists.linux.txt b/ydb/core/kqp/ut/spilling/CMakeLists.linux.txt new file mode 100644 index 0000000000..9821c1b040 --- /dev/null +++ b/ydb/core/kqp/ut/spilling/CMakeLists.linux.txt @@ -0,0 +1,55 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-kqp-ut-spilling) +target_compile_options(ydb-core-kqp-ut-spilling PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-kqp-ut-spilling PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp +) +target_link_libraries(ydb-core-kqp-ut-spilling PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-core-kqp + cpp-client-ydb_proto + core-kqp-counters + core-kqp-host + core-kqp-provider + kqp-ut-common +) +target_link_options(ydb-core-kqp-ut-spilling PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-kqp-ut-spilling PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp +) +add_test( + NAME + ydb-core-kqp-ut-spilling + COMMAND + ydb-core-kqp-ut-spilling + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-kqp-ut-spilling) diff --git a/ydb/core/kqp/ut/spilling/CMakeLists.txt b/ydb/core/kqp/ut/spilling/CMakeLists.txt new file mode 100644 index 0000000000..dbfe6fa2c4 --- /dev/null +++ b/ydb/core/kqp/ut/spilling/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/kqp/ut/kqp_scan_spilling_ut.cpp b/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp index 613724e640..104121d452 100644 --- a/ydb/core/kqp/ut/kqp_scan_spilling_ut.cpp +++ b/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp @@ -20,7 +20,6 @@ NKikimrConfig::TAppConfig AppCfg() { auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager(); rm->SetChannelBufferSize(100); rm->SetMinChannelBufferSize(100); - rm->SetScanBufferSize(1 << 20); rm->SetMkqlLightProgramMemoryLimit(100 << 20); rm->SetMkqlHeavyProgramMemoryLimit(100 << 20); @@ -85,9 +84,9 @@ Y_UNIT_TEST(SelfJoin) { ])", StreamResultToYson(it)); TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); - UNIT_ASSERT_VALUES_EQUAL(18, counters.SpillingWriteBlobs->Val()); - UNIT_ASSERT_VALUES_EQUAL(18, counters.SpillingReadBlobs->Val()); - UNIT_ASSERT(0 == counters.SpillingStoredBlobs->Val() || 18 == counters.SpillingStoredBlobs->Val()); + UNIT_ASSERT_VALUES_EQUAL(14, counters.SpillingWriteBlobs->Val()); + UNIT_ASSERT_VALUES_EQUAL(14, counters.SpillingReadBlobs->Val()); + UNIT_ASSERT(0 == counters.SpillingStoredBlobs->Val() || 14 == counters.SpillingStoredBlobs->Val()); } } // suite diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index c760e40b6e..5f360946ca 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1011,8 +1011,8 @@ message TTableServiceConfig { message TResourceManager { optional uint32 ComputeActorsCount = 1 [default = 1000]; - optional uint64 ChannelBufferSize = 2 [default = 4194304]; // 4 MB - optional uint64 ScanBufferSize = 3 [default = 8388608]; // 8 MB + optional uint64 ChannelBufferSize = 2 [default = 8388608]; // 8 MB + reserved 3; optional uint64 MkqlLightProgramMemoryLimit = 4 [default = 31457280]; // 30 MB optional uint64 MkqlHeavyProgramMemoryLimit = 5 [default = 31457280]; // 30 MB optional uint64 QueryMemoryLimit = 6 [default = 32212254720]; // 30 GB @@ -1024,8 +1024,8 @@ message TTableServiceConfig { optional uint64 MaxTotalChannelBuffersSize = 11 [default = 2147483648]; // 2 GB optional uint64 MinChannelBufferSize = 12 [default = 2097152]; // 2 MB - optional uint64 MaxTotalScanBuffersSize = 13 [default = 4294967296]; // 4 GB - optional uint64 MinScanBufferSize = 14 [default = 2097152]; // 2 MB + reserved 13; + reserved 14; optional TShardsScanningPolicy ShardsScanningPolicy = 16; } diff --git a/ydb/core/tx/datashard/datashard_ut_kqp.cpp b/ydb/core/tx/datashard/datashard_ut_kqp.cpp index 1e15046860..9fdf188627 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp.cpp @@ -57,9 +57,6 @@ public: void BeforeTest(const char* test) { Cerr << "-- Before test " << test << Endl; AppCfg.MutableTableServiceConfig()->MutableResourceManager()->SetChannelBufferSize(1); - if (test == "AbortOnDisconnect"sv) { - AppCfg.MutableTableServiceConfig()->MutableResourceManager()->SetScanBufferSize(1); - } TTestBase::BeforeTest(test); } diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp index ed516f2941..a2bf70f7d6 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp @@ -55,7 +55,6 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager(); rm->SetChannelBufferSize(100); rm->SetMinChannelBufferSize(100); - rm->SetScanBufferSize(100); TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); @@ -251,7 +250,6 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager(); rm->SetChannelBufferSize(100); rm->SetMinChannelBufferSize(100); - rm->SetScanBufferSize(100); TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); @@ -385,7 +383,6 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager(); rm->SetChannelBufferSize(100); rm->SetMinChannelBufferSize(100); - rm->SetScanBufferSize(100); TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); @@ -505,7 +502,6 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager(); rm->SetChannelBufferSize(100); rm->SetMinChannelBufferSize(100); - rm->SetScanBufferSize(100); TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index c3cbaa6b43..dd2152f9e7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -238,7 +238,6 @@ using TFreeMemoryCallback = std::function<void(const TTxId& txId, ui64 taskId, u struct TComputeMemoryLimits { ui64 ChannelBufferSize = 0; - ui64 ScanBufferSize = 0; // TODO: drop it ui64 MkqlLightProgramMemoryLimit = 0; ui64 MkqlHeavyProgramMemoryLimit = 0; diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index f9bd987e5f..f5742a492e 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -27,7 +27,6 @@ IActor* CreateComputeActor( { auto memoryLimits = NDq::TComputeMemoryLimits(); memoryLimits.ChannelBufferSize = 1000000; - memoryLimits.ScanBufferSize = 16_MB; // light == heavy since we allow extra allocation memoryLimits.MkqlLightProgramMemoryLimit = options.MkqlInitialMemoryLimit; memoryLimits.MkqlHeavyProgramMemoryLimit = options.MkqlInitialMemoryLimit; |