diff options
author | Igor Makunin <igor.makunin@gmail.com> | 2022-03-11 00:29:04 +0300 |
---|---|---|
committer | Igor Makunin <igor.makunin@gmail.com> | 2022-03-11 00:29:04 +0300 |
commit | 7f8663c8cb1ae82ff4e2f97f1b6cb188899ceb97 (patch) | |
tree | 05102ae7ef7ccd64a81a79f0a483182bb7d2d551 | |
parent | 67a4e49589842edecb859b25a7aab43f11204888 (diff) | |
download | ydb-7f8663c8cb1ae82ff4e2f97f1b6cb188899ceb97.tar.gz |
KIKIMR-14480: fix crash, that can occurs while swithing KqpForceNewEngine from level 1 to level 2
ref:43af128c63dc5eb3a73b7c7de1e0452d7dda66d8
-rw-r--r-- | ydb/core/kqp/common/kqp_transform.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_actor.cpp | 32 | ||||
-rw-r--r-- | ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp | 38 |
4 files changed, 57 insertions, 23 deletions
diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h index e09ed61c85..8d68686b47 100644 --- a/ydb/core/kqp/common/kqp_transform.h +++ b/ydb/core/kqp/common/kqp_transform.h @@ -221,13 +221,15 @@ public: TKqpTransactionInfo GetInfo() const; void ForceOldEngine() { - YQL_ENSURE(DeferredEffects.Empty()); - YQL_ENSURE(!ForceNewEngineSettings.ForcedNewEngine.has_value()); + auto engine = DeferredEffects.GetEngine(); + YQL_ENSURE(!engine || engine == TKqpTransactionInfo::EEngine::OldEngine); + YQL_ENSURE(!ForceNewEngineSettings.ForcedNewEngine || *ForceNewEngineSettings.ForcedNewEngine == false); ForceNewEngineSettings.ForcedNewEngine = false; } void ForceNewEngine(ui32 percent, ui32 level) { - YQL_ENSURE(DeferredEffects.Empty()); + auto engine = DeferredEffects.GetEngine(); + YQL_ENSURE(!engine || engine == TKqpTransactionInfo::EEngine::NewEngine); YQL_ENSURE(!ForceNewEngineSettings.ForcedNewEngine.has_value()); ForceNewEngineSettings.ForcedNewEngine = true; ForceNewEngineSettings.ForceNewEnginePercent = percent; diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp index 27cf296a03..25c6723cf6 100644 --- a/ydb/core/kqp/kqp_compile_actor.cpp +++ b/ydb/core/kqp/kqp_compile_actor.cpp @@ -279,7 +279,7 @@ private: if (RecompileWithNewEngine && KqpCompileResult->PreparedQuery->GetVersion() == NKikimrKqp::TPreparedQuery::VERSION_V1) { - LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "About to recompile with NewEngine" + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "About to recompile with NewEngine" << ", self: " << ctx.SelfID); RecompileStartTime = now; diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index c3562dfd7d..43afc213c5 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -1036,6 +1036,9 @@ private: QueryState->NewEngineCompatibleQuery = (bool) QueryState->QueryCompileResult->PreparedQueryNewEngine && newEngineCompatibleTx; + // select engine according to deferred effects + auto effectsEngine = KqpHost->GetTransactionInfo(QueryState->TxId)->TxEngine; + if (newEngineCompatibleTx) { if (QueryState->ForceNewEngineState.ForceNewEngineLevel == 0) { if (QueryState->InteractiveTx || !QueryState->QueryTraits) { @@ -1064,11 +1067,10 @@ private: QueryState->NewEngineCompatibleQuery = false; // but Tx is still NE Compatible, i.e. RO-queries can be executed with NewEngine } + if (commit) { - // select engine according to deferred effects - auto engine = KqpHost->GetTransactionInfo(QueryState->TxId)->TxEngine; - if (engine) { - if (*engine == TKqpTransactionInfo::EEngine::NewEngine) { + if (effectsEngine) { + if (*effectsEngine == TKqpTransactionInfo::EEngine::NewEngine) { QueryState->NewEngineCompatibleQuery = true; } else { QueryState->NewEngineCompatibleQuery = false; @@ -1090,19 +1092,22 @@ private: << ", query: " << QueryState->NewEngineCompatibleQuery << ", interactive: " << QueryState->InteractiveTx << ", preparedNewEngine: " << (bool) QueryState->QueryCompileResult->PreparedQueryNewEngine + << ", effects: " << (effectsEngine ? (int) *effectsEngine : -1) << ", forcedNewEngine: " << (QueryState->ForceNewEngineState.ForcedNewEngine ? ToString(*QueryState->ForceNewEngineState.ForcedNewEngine) : "<none>") << ", traits: " << (QueryState->QueryCompileResult->QueryTraits ? QueryState->QueryCompileResult->QueryTraits->ToString() - : "<none>")); + : "<none>") + << ", commit: " << commit + << ", text: " << queryRequest.GetQuery()); if (newEngineCompatibleTx) { if (QueryState->NewEngineCompatibleQuery) { if (QueryState->ForceNewEngineState.ForcedNewEngine && *QueryState->ForceNewEngineState.ForcedNewEngine) { preparedQuery = QueryState->QueryCompileResult->PreparedQueryNewEngine; LOG_INFO_S(ctx, NKikimrServices::KQP_WORKER, "Force NewEngine query execution (as part of tx)"); - } else if (QueryState->ForceNewEngineState.ForceNewEnginePercent >= RandomNumber((ui32) 100)) { + } else if (!effectsEngine && QueryState->ForceNewEngineState.ForceNewEnginePercent >= RandomNumber((ui32) 100)) { preparedQuery = QueryState->QueryCompileResult->PreparedQueryNewEngine; QueryState->ForceNewEngineState.ForcedNewEngine = true; @@ -1120,19 +1125,8 @@ private: } } else { if (!QueryState->ForceNewEngineState.ForcedNewEngine.has_value()) { - if (QueryState->ForceNewEngineState.ForceNewEnginePercent >= RandomNumber((ui32) 100)) { - QueryState->ForceNewEngineState.ForcedNewEngine = true; - - KqpHost->ForceTxNewEngine( - QueryState->TxId, - QueryState->ForceNewEngineState.ForceNewEnginePercent, - QueryState->ForceNewEngineState.ForceNewEngineLevel - ); - LOG_INFO_S(ctx, NKikimrServices::KQP_WORKER, "Force NewEngine query execution (new tx)"); - } else { - QueryState->ForceNewEngineState.ForcedNewEngine = false; - KqpHost->ForceTxOldEngine(QueryState->TxId); - } + QueryState->ForceNewEngineState.ForcedNewEngine = false; + KqpHost->ForceTxOldEngine(QueryState->TxId); } preparedQuery = QueryState->QueryCompileResult->PreparedQuery; diff --git a/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp b/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp index 09fbe8f295..83663828d1 100644 --- a/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp +++ b/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp @@ -357,6 +357,7 @@ public: UNIT_TEST(Level2_InteractiveWriteOnly); UNIT_TEST(Level2_CompilationFailure); UNIT_TEST(Level2_NoFallback); + UNIT_TEST(Level2_ActiveRequest); UNIT_TEST(Level3_NotInteractiveReadOnly); UNIT_TEST(Level3_NotInteractiveWriteOnly); @@ -396,6 +397,7 @@ public: void Level2_InteractiveWriteOnly(); void Level2_CompilationFailure(); void Level2_NoFallback(); + void Level2_ActiveRequest(); void Level3_NotInteractiveReadOnly(); void Level3_NotInteractiveWriteOnly(); @@ -735,6 +737,42 @@ void KqpForceNewEngine::Level2_NoFallback() { TestNotInteractiveReadOnlyTxFallback(2); } +void KqpForceNewEngine::Level2_ActiveRequest() { + auto session = Session(); + + auto result = session.ExecuteDataQuery(R"( + REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne") + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); + UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); + UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); + + ForceNewEngine(100, 2); + + // dont force new engine on active transactions + + result = session.ExecuteDataQuery(R"( + SELECT 42 + )", TTxControl::Tx(*result.GetTransaction())).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); + UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); + UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val()); + + + result = session.ExecuteDataQuery(R"( + SELECT 42 + )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); + UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); + UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val()); +} + /////////// LEVEL 3 //////////////////////////////////////////////////////////////////////////////////////////////////// void KqpForceNewEngine::Level3_NotInteractiveReadOnly() { TestNotInteractiveReadOnlyTx(3); |