aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Makunin <igor.makunin@gmail.com>2022-03-11 00:29:04 +0300
committerIgor Makunin <igor.makunin@gmail.com>2022-03-11 00:29:04 +0300
commit7f8663c8cb1ae82ff4e2f97f1b6cb188899ceb97 (patch)
tree05102ae7ef7ccd64a81a79f0a483182bb7d2d551
parent67a4e49589842edecb859b25a7aab43f11204888 (diff)
downloadydb-7f8663c8cb1ae82ff4e2f97f1b6cb188899ceb97.tar.gz
KIKIMR-14480: fix crash, that can occurs while swithing KqpForceNewEngine from level 1 to level 2
ref:43af128c63dc5eb3a73b7c7de1e0452d7dda66d8
-rw-r--r--ydb/core/kqp/common/kqp_transform.h8
-rw-r--r--ydb/core/kqp/kqp_compile_actor.cpp2
-rw-r--r--ydb/core/kqp/kqp_worker_actor.cpp32
-rw-r--r--ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp38
4 files changed, 57 insertions, 23 deletions
diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h
index e09ed61c85..8d68686b47 100644
--- a/ydb/core/kqp/common/kqp_transform.h
+++ b/ydb/core/kqp/common/kqp_transform.h
@@ -221,13 +221,15 @@ public:
TKqpTransactionInfo GetInfo() const;
void ForceOldEngine() {
- YQL_ENSURE(DeferredEffects.Empty());
- YQL_ENSURE(!ForceNewEngineSettings.ForcedNewEngine.has_value());
+ auto engine = DeferredEffects.GetEngine();
+ YQL_ENSURE(!engine || engine == TKqpTransactionInfo::EEngine::OldEngine);
+ YQL_ENSURE(!ForceNewEngineSettings.ForcedNewEngine || *ForceNewEngineSettings.ForcedNewEngine == false);
ForceNewEngineSettings.ForcedNewEngine = false;
}
void ForceNewEngine(ui32 percent, ui32 level) {
- YQL_ENSURE(DeferredEffects.Empty());
+ auto engine = DeferredEffects.GetEngine();
+ YQL_ENSURE(!engine || engine == TKqpTransactionInfo::EEngine::NewEngine);
YQL_ENSURE(!ForceNewEngineSettings.ForcedNewEngine.has_value());
ForceNewEngineSettings.ForcedNewEngine = true;
ForceNewEngineSettings.ForceNewEnginePercent = percent;
diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp
index 27cf296a03..25c6723cf6 100644
--- a/ydb/core/kqp/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/kqp_compile_actor.cpp
@@ -279,7 +279,7 @@ private:
if (RecompileWithNewEngine &&
KqpCompileResult->PreparedQuery->GetVersion() == NKikimrKqp::TPreparedQuery::VERSION_V1)
{
- LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "About to recompile with NewEngine"
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "About to recompile with NewEngine"
<< ", self: " << ctx.SelfID);
RecompileStartTime = now;
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index c3562dfd7d..43afc213c5 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -1036,6 +1036,9 @@ private:
QueryState->NewEngineCompatibleQuery = (bool) QueryState->QueryCompileResult->PreparedQueryNewEngine
&& newEngineCompatibleTx;
+ // select engine according to deferred effects
+ auto effectsEngine = KqpHost->GetTransactionInfo(QueryState->TxId)->TxEngine;
+
if (newEngineCompatibleTx) {
if (QueryState->ForceNewEngineState.ForceNewEngineLevel == 0) {
if (QueryState->InteractiveTx || !QueryState->QueryTraits) {
@@ -1064,11 +1067,10 @@ private:
QueryState->NewEngineCompatibleQuery = false;
// but Tx is still NE Compatible, i.e. RO-queries can be executed with NewEngine
}
+
if (commit) {
- // select engine according to deferred effects
- auto engine = KqpHost->GetTransactionInfo(QueryState->TxId)->TxEngine;
- if (engine) {
- if (*engine == TKqpTransactionInfo::EEngine::NewEngine) {
+ if (effectsEngine) {
+ if (*effectsEngine == TKqpTransactionInfo::EEngine::NewEngine) {
QueryState->NewEngineCompatibleQuery = true;
} else {
QueryState->NewEngineCompatibleQuery = false;
@@ -1090,19 +1092,22 @@ private:
<< ", query: " << QueryState->NewEngineCompatibleQuery
<< ", interactive: " << QueryState->InteractiveTx
<< ", preparedNewEngine: " << (bool) QueryState->QueryCompileResult->PreparedQueryNewEngine
+ << ", effects: " << (effectsEngine ? (int) *effectsEngine : -1)
<< ", forcedNewEngine: " << (QueryState->ForceNewEngineState.ForcedNewEngine
? ToString(*QueryState->ForceNewEngineState.ForcedNewEngine)
: "<none>")
<< ", traits: " << (QueryState->QueryCompileResult->QueryTraits
? QueryState->QueryCompileResult->QueryTraits->ToString()
- : "<none>"));
+ : "<none>")
+ << ", commit: " << commit
+ << ", text: " << queryRequest.GetQuery());
if (newEngineCompatibleTx) {
if (QueryState->NewEngineCompatibleQuery) {
if (QueryState->ForceNewEngineState.ForcedNewEngine && *QueryState->ForceNewEngineState.ForcedNewEngine) {
preparedQuery = QueryState->QueryCompileResult->PreparedQueryNewEngine;
LOG_INFO_S(ctx, NKikimrServices::KQP_WORKER, "Force NewEngine query execution (as part of tx)");
- } else if (QueryState->ForceNewEngineState.ForceNewEnginePercent >= RandomNumber((ui32) 100)) {
+ } else if (!effectsEngine && QueryState->ForceNewEngineState.ForceNewEnginePercent >= RandomNumber((ui32) 100)) {
preparedQuery = QueryState->QueryCompileResult->PreparedQueryNewEngine;
QueryState->ForceNewEngineState.ForcedNewEngine = true;
@@ -1120,19 +1125,8 @@ private:
}
} else {
if (!QueryState->ForceNewEngineState.ForcedNewEngine.has_value()) {
- if (QueryState->ForceNewEngineState.ForceNewEnginePercent >= RandomNumber((ui32) 100)) {
- QueryState->ForceNewEngineState.ForcedNewEngine = true;
-
- KqpHost->ForceTxNewEngine(
- QueryState->TxId,
- QueryState->ForceNewEngineState.ForceNewEnginePercent,
- QueryState->ForceNewEngineState.ForceNewEngineLevel
- );
- LOG_INFO_S(ctx, NKikimrServices::KQP_WORKER, "Force NewEngine query execution (new tx)");
- } else {
- QueryState->ForceNewEngineState.ForcedNewEngine = false;
- KqpHost->ForceTxOldEngine(QueryState->TxId);
- }
+ QueryState->ForceNewEngineState.ForcedNewEngine = false;
+ KqpHost->ForceTxOldEngine(QueryState->TxId);
}
preparedQuery = QueryState->QueryCompileResult->PreparedQuery;
diff --git a/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp b/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp
index 09fbe8f295..83663828d1 100644
--- a/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp
+++ b/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp
@@ -357,6 +357,7 @@ public:
UNIT_TEST(Level2_InteractiveWriteOnly);
UNIT_TEST(Level2_CompilationFailure);
UNIT_TEST(Level2_NoFallback);
+ UNIT_TEST(Level2_ActiveRequest);
UNIT_TEST(Level3_NotInteractiveReadOnly);
UNIT_TEST(Level3_NotInteractiveWriteOnly);
@@ -396,6 +397,7 @@ public:
void Level2_InteractiveWriteOnly();
void Level2_CompilationFailure();
void Level2_NoFallback();
+ void Level2_ActiveRequest();
void Level3_NotInteractiveReadOnly();
void Level3_NotInteractiveWriteOnly();
@@ -735,6 +737,42 @@ void KqpForceNewEngine::Level2_NoFallback() {
TestNotInteractiveReadOnlyTxFallback(2);
}
+void KqpForceNewEngine::Level2_ActiveRequest() {
+ auto session = Session();
+
+ auto result = session.ExecuteDataQuery(R"(
+ REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne")
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+
+ UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
+ UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
+ UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
+
+ ForceNewEngine(100, 2);
+
+ // dont force new engine on active transactions
+
+ result = session.ExecuteDataQuery(R"(
+ SELECT 42
+ )", TTxControl::Tx(*result.GetTransaction())).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+
+ UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
+ UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
+ UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val());
+
+
+ result = session.ExecuteDataQuery(R"(
+ SELECT 42
+ )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+
+ UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
+ UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
+ UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val());
+}
+
/////////// LEVEL 3 ////////////////////////////////////////////////////////////////////////////////////////////////////
void KqpForceNewEngine::Level3_NotInteractiveReadOnly() {
TestNotInteractiveReadOnlyTx(3);