diff options
author | Igor Makunin <igor.makunin@gmail.com> | 2022-03-23 13:02:14 +0300 |
---|---|---|
committer | Igor Makunin <igor.makunin@gmail.com> | 2022-03-23 13:02:14 +0300 |
commit | 14fdfc60560cd79282616232a6cb00983374a97b (patch) | |
tree | 8e3e62a81a0c75854a3e713168329eeb03e58d0e | |
parent | 9aefb09c66f0e9d87da21835983e662ed2da04b1 (diff) | |
download | ydb-14fdfc60560cd79282616232a6cb00983374a97b.tar.gz |
KIKIMR-14547: fix memory management
ref:1e0008921d09cf6792893ec16c4f259c8809efe7
-rw-r--r-- | ydb/core/kqp/executer/kqp_literal_executer.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_tasks_runner.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_limits_ut.cpp | 50 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 13 |
5 files changed, 74 insertions, 5 deletions
diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp index d6ebb61f1f..57f9c72af2 100644 --- a/ydb/core/kqp/executer/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp @@ -96,7 +96,7 @@ private: InternalError("Unexpected event"); } } - } catch (const TMemoryLimitExceededException& e) { + } catch (const TMemoryLimitExceededException&) { LOG_W("TKqpLiteralExecuter, memory limit exceeded."); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, "Memory limit exceeded")); @@ -160,6 +160,14 @@ private: : rmConfig.GetMkqlLightProgramMemoryLimit(); alloc.SetLimit(limit); + alloc.Ref().SetIncreaseMemoryLimitCallback([this, &alloc](ui64 limit, ui64 required) { + if (required < 100_MB) { + LOG_D("Increase memory limit from " << limit << " to " << required); + alloc.SetLimit(required); + } + }); + + // task runner settings NMiniKQL::TKqpComputeContextBase computeCtx; TDqTaskRunnerContext context = CreateTaskRunnerContext(&computeCtx, &alloc, &typeEnv); diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h index 6ae8bf1db7..57c4763b25 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.h +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h @@ -47,6 +47,8 @@ public: // otherwise use particular memory limit TGuard<NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = Nothing()); + ui64 GetAllocatedMemory() const { return Alloc->GetAllocated(); } + const TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> GetTasksStats() const { return Stats; }; private: diff --git a/ydb/core/kqp/ut/kqp_limits_ut.cpp b/ydb/core/kqp/ut/kqp_limits_ut.cpp index 0e2478dad7..ab03391281 100644 --- a/ydb/core/kqp/ut/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/kqp_limits_ut.cpp @@ -184,6 +184,56 @@ Y_UNIT_TEST_SUITE(KqpLimits) { UNIT_ASSERT(HasIssue(result.GetIssues(), NKikimrIssues::TIssuesIds::SHARD_PROGRAM_SIZE_EXCEEDED)); } + Y_UNIT_TEST_NEW_ENGINE(BigParameter) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto result = session.ExecuteSchemeQuery(R"( + CREATE TABLE `ManyColumns` ( + Key Int32, + Str0 String, Str1 String, Str2 String, Str3 String, Str4 String, + Str5 String, Str6 String, Str7 String, Str8 String, Str9 String, + PRIMARY KEY (Key) + ) + )").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + auto query = Q1_(R"( + DECLARE $str0 AS String; + DECLARE $str1 AS String; + DECLARE $str2 AS String; + DECLARE $str3 AS String; + DECLARE $str4 AS String; + DECLARE $str5 AS String; + DECLARE $str6 AS String; + DECLARE $str7 AS String; + DECLARE $str8 AS String; + DECLARE $str9 AS String; + + UPSERT INTO `/Root/ManyColumns` (Key, Str0, Str1, Str2, Str3, Str4, Str5, Str6, Str7, Str8, Str9) VALUES + (1, $str0, $str1, $str2, $str3, $str4, $str5, $str6, $str7, $str8, $str9) + )"); + + auto params = TParamsBuilder() + .AddParam("$str0").String(TString(5_MB, 'd')).Build() + .AddParam("$str1").String(TString(5_MB, 'o')).Build() + .AddParam("$str2").String(TString(5_MB, 'n')).Build() + .AddParam("$str3").String(TString(5_MB, 't')).Build() + .AddParam("$str4").String(TString(5_MB, 'g')).Build() + .AddParam("$str5").String(TString(5_MB, 'i')).Build() + .AddParam("$str6").String(TString(5_MB, 'v')).Build() + .AddParam("$str7").String(TString(5_MB, 'e')).Build() + .AddParam("$str8").String(TString(5_MB, 'u')).Build() + .AddParam("$str9").String(TString(1_MB, 'p')).Build() + .Build(); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), std::move(params)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + Y_UNIT_TEST_NEW_ENGINE(AffectedShardsLimit) { NKikimrConfig::TAppConfig appConfig; auto& queryLimits = *appConfig.MutableTableServiceConfig()->MutableQueryLimits(); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 7a0ae81275..b387addbba 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -153,9 +153,9 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, KqpSetTxLocksKeys(GetKqpTransaction().GetLocks(), self->SysLocksTable(), EngineBay); EngineBay.MarkTxLoaded(); - auto& tasksRunner = GetKqpTasksRunner(); // prepare tasks runner, can throw TMemoryLimitExceededException + auto& tasksRunner = GetKqpTasksRunner(); // create tasks runner, can throw TMemoryLimitExceededException - auto allocGuard = tasksRunner.BindAllocator(txc.GetMemoryLimit() - TxSize); + auto allocGuard = tasksRunner.BindAllocator(100_MB); // set big enough limit, decrease/correct later auto execCtx = DefaultKqpExecutionContext(); tasksRunner.Prepare(DefaultKqpDataReqMemoryLimits(), *execCtx); diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index afe08f99a7..7f88594412 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -119,8 +119,19 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio return EExecutionStatus::Executed; } + try { auto& kqpTx = dataTx->GetKqpTransaction(); + auto& tasksRunner = dataTx->GetKqpTasksRunner(); + + ui64 consumedMemory = dataTx->GetTxSize() + tasksRunner.GetAllocatedMemory(); + if (MaybeRequestMoreTxMemory(consumedMemory, txc)) { + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << DataShard.TabletID() + << " requested " << txc.GetRequestedMemory() << " more memory"); + + DataShard.IncCounter(COUNTER_TX_WAIT_RESOURCE); + return EExecutionStatus::Restart; + } if (!KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable())) { KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable()); @@ -128,8 +139,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio return EExecutionStatus::Executed; } - auto& tasksRunner = dataTx->GetKqpTasksRunner(); - auto allocGuard = tasksRunner.BindAllocator(txc.GetMemoryLimit() - dataTx->GetTxSize()); NKqp::NRm::TKqpResourcesRequest req; |