aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergei Puchin <s.puchin@gmail.com>2022-05-13 11:16:12 +0300
committerSergei Puchin <s.puchin@gmail.com>2022-05-13 11:16:12 +0300
commit647fde571299faf9f74c7682c660a9c4795f3318 (patch)
tree20b42833bd5123fb0523f1d55da48d7d1606d1a5
parent369009a919fa9d7814cc20e4b9690133bf631a7d (diff)
downloadydb-647fde571299faf9f74c7682c660a9c4795f3318.tar.gz
Check _KqpForceNewEngine on explicit tx commit/rollback. (KIKIMR-14897)
ref:bd2f734bf6894bf6697bf007f77ef8350f544124
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp31
-rw-r--r--ydb/core/kqp/host/kqp_run_prepared.cpp3
-rw-r--r--ydb/core/kqp/ut/kqp_ne_ut.cpp56
3 files changed, 83 insertions, 7 deletions
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 2a5d5957c5..5d0c9883a1 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1647,12 +1647,18 @@ private:
auto query = std::make_unique<NKikimrKqp::TPreparedQuery>();
auto engine = maybeTx->Get()->DeferredEffects.GetEngine();
+
+ if (engine.has_value()) {
+ bool newEngine = *engine == TKqpTransactionInfo::EEngine::NewEngine;
+ YQL_ENSURE(!settings.UseNewEngine.Defined() || *settings.UseNewEngine == newEngine);
+ } else if (SessionCtx->Config().HasKqpForceNewEngine()) {
+ engine = TKqpTransactionInfo::EEngine::NewEngine;
+ }
+
if (engine.has_value() && *engine == TKqpTransactionInfo::EEngine::NewEngine) {
- YQL_ENSURE(settings.UseNewEngine.Defined() && *settings.UseNewEngine);
query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1);
query->MutablePhysicalQuery()->SetType(NKqpProto::TKqpPhyQuery::TYPE_DATA);
} else {
- YQL_ENSURE(!settings.UseNewEngine.Defined() || !*settings.UseNewEngine);
query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_V1);
query->AddKqls();
}
@@ -1670,15 +1676,20 @@ private:
auto query = std::make_unique<NKikimrKqp::TPreparedQuery>();
auto settings1 = settings;
-
auto engine = tx->DeferredEffects.GetEngine();
+
+ if (engine.has_value()) {
+ bool newEngine = *engine == TKqpTransactionInfo::EEngine::NewEngine;
+ YQL_ENSURE(!settings.UseNewEngine.Defined() || *settings.UseNewEngine == newEngine);
+ } else if (SessionCtx->Config().HasKqpForceNewEngine()) {
+ engine = TKqpTransactionInfo::EEngine::NewEngine;
+ }
+
if (engine.has_value() && *engine == TKqpTransactionInfo::EEngine::NewEngine) {
- YQL_ENSURE(!settings.UseNewEngine.Defined() || *settings.UseNewEngine == true);
settings1.UseNewEngine = true;
query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1);
query->MutablePhysicalQuery()->SetType(NKqpProto::TKqpPhyQuery::TYPE_DATA);
} else {
- YQL_ENSURE(!settings.UseNewEngine.Defined() || *settings.UseNewEngine == false);
settings1.UseNewEngine = false;
query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_V1);
query->AddKqls();
@@ -1702,12 +1713,18 @@ private:
auto query = std::make_unique<NKikimrKqp::TPreparedQuery>();
auto engine = maybeTx->Get()->DeferredEffects.GetEngine();
+
+ if (engine.has_value()) {
+ bool newEngine = *engine == TKqpTransactionInfo::EEngine::NewEngine;
+ YQL_ENSURE(!settings.UseNewEngine.Defined() || *settings.UseNewEngine == newEngine);
+ } else if (SessionCtx->Config().HasKqpForceNewEngine()) {
+ engine = TKqpTransactionInfo::EEngine::NewEngine;
+ }
+
if (engine.has_value() && *engine == TKqpTransactionInfo::EEngine::NewEngine) {
- YQL_ENSURE(settings.UseNewEngine.Defined() && *settings.UseNewEngine);
query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1);
query->MutablePhysicalQuery()->SetType(NKqpProto::TKqpPhyQuery::TYPE_DATA);
} else {
- YQL_ENSURE(!settings.UseNewEngine.Defined() || !*settings.UseNewEngine);
query->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_V1);
query->AddKqls();
}
diff --git a/ydb/core/kqp/host/kqp_run_prepared.cpp b/ydb/core/kqp/host/kqp_run_prepared.cpp
index e71a3bacee..4f65bb63aa 100644
--- a/ydb/core/kqp/host/kqp_run_prepared.cpp
+++ b/ydb/core/kqp/host/kqp_run_prepared.cpp
@@ -29,6 +29,9 @@ public:
const auto& kql = TransformCtx->GetPreparedKql();
+ // TODO: Enable after switch to NewEngine
+ // YQL_ENSURE(!TransformCtx->Config->HasKqpForceNewEngine());
+
if (CurrentMkqlIndex >= kql.MkqlsSize()) {
return Finish(ctx);
}
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp
index 3fec24c11f..2798459c81 100644
--- a/ydb/core/kqp/ut/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp
@@ -3166,6 +3166,62 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
CompareYson(R"([[["Value1"]]])", FormatResultSetYson(result.GetResultSet(0)));
}
+
+ Y_UNIT_TEST(InteractiveForceNE) {
+ TVector<NKikimrKqp::TKqpSetting> settings;
+ NKikimrKqp::TKqpSetting setting;
+ setting.SetName("_KqpForceNewEngine");
+ setting.SetValue("true");
+ settings.push_back(setting);
+
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ SELECT * FROM TwoShard;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto tx = result.GetTransaction();
+
+ result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ SELECT 1;
+ )", TTxControl::Tx(*tx).CommitTx()).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ SELECT * FROM TwoShard;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto tx = result.GetTransaction();
+
+ auto commitResult = tx->Commit().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::SUCCESS,
+ commitResult.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ SELECT * FROM TwoShard;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto tx = result.GetTransaction();
+
+ auto rollbackResult = tx->Rollback().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(rollbackResult.GetStatus(), EStatus::SUCCESS,
+ rollbackResult.GetIssues().ToString());
+ }
+ }
}
} // namespace NKikimr::NKqp