aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Makunin <igor.makunin@gmail.com>2022-03-23 13:02:14 +0300
committerIgor Makunin <igor.makunin@gmail.com>2022-03-23 13:02:14 +0300
commit14fdfc60560cd79282616232a6cb00983374a97b (patch)
tree8e3e62a81a0c75854a3e713168329eeb03e58d0e
parent9aefb09c66f0e9d87da21835983e662ed2da04b1 (diff)
downloadydb-14fdfc60560cd79282616232a6cb00983374a97b.tar.gz
KIKIMR-14547: fix memory management
ref:1e0008921d09cf6792893ec16c4f259c8809efe7
-rw-r--r--ydb/core/kqp/executer/kqp_literal_executer.cpp10
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.h2
-rw-r--r--ydb/core/kqp/ut/kqp_limits_ut.cpp50
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.cpp4
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp13
5 files changed, 74 insertions, 5 deletions
diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp
index d6ebb61f1f..57f9c72af2 100644
--- a/ydb/core/kqp/executer/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp
@@ -96,7 +96,7 @@ private:
InternalError("Unexpected event");
}
}
- } catch (const TMemoryLimitExceededException& e) {
+ } catch (const TMemoryLimitExceededException&) {
LOG_W("TKqpLiteralExecuter, memory limit exceeded.");
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, "Memory limit exceeded"));
@@ -160,6 +160,14 @@ private:
: rmConfig.GetMkqlLightProgramMemoryLimit();
alloc.SetLimit(limit);
+ alloc.Ref().SetIncreaseMemoryLimitCallback([this, &alloc](ui64 limit, ui64 required) {
+ if (required < 100_MB) {
+ LOG_D("Increase memory limit from " << limit << " to " << required);
+ alloc.SetLimit(required);
+ }
+ });
+
+
// task runner settings
NMiniKQL::TKqpComputeContextBase computeCtx;
TDqTaskRunnerContext context = CreateTaskRunnerContext(&computeCtx, &alloc, &typeEnv);
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h
index 6ae8bf1db7..57c4763b25 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.h
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h
@@ -47,6 +47,8 @@ public:
// otherwise use particular memory limit
TGuard<NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = Nothing());
+ ui64 GetAllocatedMemory() const { return Alloc->GetAllocated(); }
+
const TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> GetTasksStats() const { return Stats; };
private:
diff --git a/ydb/core/kqp/ut/kqp_limits_ut.cpp b/ydb/core/kqp/ut/kqp_limits_ut.cpp
index 0e2478dad7..ab03391281 100644
--- a/ydb/core/kqp/ut/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_limits_ut.cpp
@@ -184,6 +184,56 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
UNIT_ASSERT(HasIssue(result.GetIssues(), NKikimrIssues::TIssuesIds::SHARD_PROGRAM_SIZE_EXCEEDED));
}
+ Y_UNIT_TEST_NEW_ENGINE(BigParameter) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ auto result = session.ExecuteSchemeQuery(R"(
+ CREATE TABLE `ManyColumns` (
+ Key Int32,
+ Str0 String, Str1 String, Str2 String, Str3 String, Str4 String,
+ Str5 String, Str6 String, Str7 String, Str8 String, Str9 String,
+ PRIMARY KEY (Key)
+ )
+ )").GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ auto query = Q1_(R"(
+ DECLARE $str0 AS String;
+ DECLARE $str1 AS String;
+ DECLARE $str2 AS String;
+ DECLARE $str3 AS String;
+ DECLARE $str4 AS String;
+ DECLARE $str5 AS String;
+ DECLARE $str6 AS String;
+ DECLARE $str7 AS String;
+ DECLARE $str8 AS String;
+ DECLARE $str9 AS String;
+
+ UPSERT INTO `/Root/ManyColumns` (Key, Str0, Str1, Str2, Str3, Str4, Str5, Str6, Str7, Str8, Str9) VALUES
+ (1, $str0, $str1, $str2, $str3, $str4, $str5, $str6, $str7, $str8, $str9)
+ )");
+
+ auto params = TParamsBuilder()
+ .AddParam("$str0").String(TString(5_MB, 'd')).Build()
+ .AddParam("$str1").String(TString(5_MB, 'o')).Build()
+ .AddParam("$str2").String(TString(5_MB, 'n')).Build()
+ .AddParam("$str3").String(TString(5_MB, 't')).Build()
+ .AddParam("$str4").String(TString(5_MB, 'g')).Build()
+ .AddParam("$str5").String(TString(5_MB, 'i')).Build()
+ .AddParam("$str6").String(TString(5_MB, 'v')).Build()
+ .AddParam("$str7").String(TString(5_MB, 'e')).Build()
+ .AddParam("$str8").String(TString(5_MB, 'u')).Build()
+ .AddParam("$str9").String(TString(1_MB, 'p')).Build()
+ .Build();
+
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), std::move(params)).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
Y_UNIT_TEST_NEW_ENGINE(AffectedShardsLimit) {
NKikimrConfig::TAppConfig appConfig;
auto& queryLimits = *appConfig.MutableTableServiceConfig()->MutableQueryLimits();
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index 7a0ae81275..b387addbba 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -153,9 +153,9 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
KqpSetTxLocksKeys(GetKqpTransaction().GetLocks(), self->SysLocksTable(), EngineBay);
EngineBay.MarkTxLoaded();
- auto& tasksRunner = GetKqpTasksRunner(); // prepare tasks runner, can throw TMemoryLimitExceededException
+ auto& tasksRunner = GetKqpTasksRunner(); // create tasks runner, can throw TMemoryLimitExceededException
- auto allocGuard = tasksRunner.BindAllocator(txc.GetMemoryLimit() - TxSize);
+ auto allocGuard = tasksRunner.BindAllocator(100_MB); // set big enough limit, decrease/correct later
auto execCtx = DefaultKqpExecutionContext();
tasksRunner.Prepare(DefaultKqpDataReqMemoryLimits(), *execCtx);
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index afe08f99a7..7f88594412 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -119,8 +119,19 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
return EExecutionStatus::Executed;
}
+
try {
auto& kqpTx = dataTx->GetKqpTransaction();
+ auto& tasksRunner = dataTx->GetKqpTasksRunner();
+
+ ui64 consumedMemory = dataTx->GetTxSize() + tasksRunner.GetAllocatedMemory();
+ if (MaybeRequestMoreTxMemory(consumedMemory, txc)) {
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << DataShard.TabletID()
+ << " requested " << txc.GetRequestedMemory() << " more memory");
+
+ DataShard.IncCounter(COUNTER_TX_WAIT_RESOURCE);
+ return EExecutionStatus::Restart;
+ }
if (!KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable())) {
KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable());
@@ -128,8 +139,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
return EExecutionStatus::Executed;
}
- auto& tasksRunner = dataTx->GetKqpTasksRunner();
-
auto allocGuard = tasksRunner.BindAllocator(txc.GetMemoryLimit() - dataTx->GetTxSize());
NKqp::NRm::TKqpResourcesRequest req;