diff options
author | Sergei Puchin <s.puchin@gmail.com> | 2022-05-13 11:16:12 +0300 |
---|---|---|
committer | Sergei Puchin <s.puchin@gmail.com> | 2022-05-13 11:16:12 +0300 |
commit | 647fde571299faf9f74c7682c660a9c4795f3318 (patch) | |
tree | 20b42833bd5123fb0523f1d55da48d7d1606d1a5 | |
parent | 369009a919fa9d7814cc20e4b9690133bf631a7d (diff) | |
download | ydb-647fde571299faf9f74c7682c660a9c4795f3318.tar.gz |
Check _KqpForceNewEngine on explicit tx commit/rollback. (KIKIMR-14897)
ref:bd2f734bf6894bf6697bf007f77ef8350f544124
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 31 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_run_prepared.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_ut.cpp | 56 |
3 files changed, 83 insertions, 7 deletions
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 2a5d5957c5..5d0c9883a1 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1647,12 +1647,18 @@ private: auto query = std::make_unique<NKikimrKqp::TPreparedQuery>(); auto engine = maybeTx->Get()->DeferredEffects.GetEngine(); + + if (engine.has_value()) { + bool newEngine = *engine == TKqpTransactionInfo::EEngine::NewEngine; + YQL_ENSURE(!settings.UseNewEngine.Defined() || *settings.UseNewEngine == newEngine); + } else if (SessionCtx->Config().HasKqpForceNewEngine()) { + engine = TKqpTransactionInfo::EEngine::NewEngine; + } + if (engine.has_value() && *engine == TKqpTransactionInfo::EEngine::NewEngine) { - YQL_ENSURE(settings.UseNewEngine.Defined() && *settings.UseNewEngine); query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1); query->MutablePhysicalQuery()->SetType(NKqpProto::TKqpPhyQuery::TYPE_DATA); } else { - YQL_ENSURE(!settings.UseNewEngine.Defined() || !*settings.UseNewEngine); query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_V1); query->AddKqls(); } @@ -1670,15 +1676,20 @@ private: auto query = std::make_unique<NKikimrKqp::TPreparedQuery>(); auto settings1 = settings; - auto engine = tx->DeferredEffects.GetEngine(); + + if (engine.has_value()) { + bool newEngine = *engine == TKqpTransactionInfo::EEngine::NewEngine; + YQL_ENSURE(!settings.UseNewEngine.Defined() || *settings.UseNewEngine == newEngine); + } else if (SessionCtx->Config().HasKqpForceNewEngine()) { + engine = TKqpTransactionInfo::EEngine::NewEngine; + } + if (engine.has_value() && *engine == TKqpTransactionInfo::EEngine::NewEngine) { - YQL_ENSURE(!settings.UseNewEngine.Defined() || *settings.UseNewEngine == true); settings1.UseNewEngine = true; query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1); query->MutablePhysicalQuery()->SetType(NKqpProto::TKqpPhyQuery::TYPE_DATA); } else { - YQL_ENSURE(!settings.UseNewEngine.Defined() || *settings.UseNewEngine == false); settings1.UseNewEngine = false; query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_V1); query->AddKqls(); @@ -1702,12 +1713,18 @@ private: auto query = std::make_unique<NKikimrKqp::TPreparedQuery>(); auto engine = maybeTx->Get()->DeferredEffects.GetEngine(); + + if (engine.has_value()) { + bool newEngine = *engine == TKqpTransactionInfo::EEngine::NewEngine; + YQL_ENSURE(!settings.UseNewEngine.Defined() || *settings.UseNewEngine == newEngine); + } else if (SessionCtx->Config().HasKqpForceNewEngine()) { + engine = TKqpTransactionInfo::EEngine::NewEngine; + } + if (engine.has_value() && *engine == TKqpTransactionInfo::EEngine::NewEngine) { - YQL_ENSURE(settings.UseNewEngine.Defined() && *settings.UseNewEngine); query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1); query->MutablePhysicalQuery()->SetType(NKqpProto::TKqpPhyQuery::TYPE_DATA); } else { - YQL_ENSURE(!settings.UseNewEngine.Defined() || !*settings.UseNewEngine); query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_V1); query->AddKqls(); } diff --git a/ydb/core/kqp/host/kqp_run_prepared.cpp b/ydb/core/kqp/host/kqp_run_prepared.cpp index e71a3bacee..4f65bb63aa 100644 --- a/ydb/core/kqp/host/kqp_run_prepared.cpp +++ b/ydb/core/kqp/host/kqp_run_prepared.cpp @@ -29,6 +29,9 @@ public: const auto& kql = TransformCtx->GetPreparedKql(); + // TODO: Enable after switch to NewEngine + // YQL_ENSURE(!TransformCtx->Config->HasKqpForceNewEngine()); + if (CurrentMkqlIndex >= kql.MkqlsSize()) { return Finish(ctx); } diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index 3fec24c11f..2798459c81 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -3166,6 +3166,62 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { CompareYson(R"([[["Value1"]]])", FormatResultSetYson(result.GetResultSet(0))); } + + Y_UNIT_TEST(InteractiveForceNE) { + TVector<NKikimrKqp::TKqpSetting> settings; + NKikimrKqp::TKqpSetting setting; + setting.SetName("_KqpForceNewEngine"); + setting.SetValue("true"); + settings.push_back(setting); + + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + SELECT * FROM TwoShard; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx = result.GetTransaction(); + + result = session.ExecuteDataQuery(R"( + --!syntax_v1 + SELECT 1; + )", TTxControl::Tx(*tx).CommitTx()).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + SELECT * FROM TwoShard; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx = result.GetTransaction(); + + auto commitResult = tx->Commit().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::SUCCESS, + commitResult.GetIssues().ToString()); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + SELECT * FROM TwoShard; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx = result.GetTransaction(); + + auto rollbackResult = tx->Rollback().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(rollbackResult.GetStatus(), EStatus::SUCCESS, + rollbackResult.GetIssues().ToString()); + } + } } } // namespace NKikimr::NKqp |