aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2022-11-01 19:24:05 +0300
committerspuchin <spuchin@ydb.tech>2022-11-01 19:24:05 +0300
commit89a1197513c1d767967e489061db3fc25c56f890 (patch)
tree65f815bfe1e75fb5a4f0dfbcb9b37ee5a9f7809d
parent31f51049ff883f69143f6618b930f26569d8c727 (diff)
downloadydb-89a1197513c1d767967e489061db3fc25c56f890.tar.gz
Remove engine selection code from kqp_worker. ()
-rw-r--r--ydb/core/kqp/common/kqp_transform.h19
-rw-r--r--ydb/core/kqp/common/kqp_tx_info.h7
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp15
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp131
-rw-r--r--ydb/core/kqp/kqp_impl.h1
-rw-r--r--ydb/core/kqp/kqp_worker_actor.cpp609
-rw-r--r--ydb/core/kqp/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/ut/fat/CMakeLists.darwin.txt52
-rw-r--r--ydb/core/kqp/ut/fat/CMakeLists.linux-aarch64.txt54
-rw-r--r--ydb/core/kqp/ut/fat/CMakeLists.linux.txt56
-rw-r--r--ydb/core/kqp/ut/fat/CMakeLists.txt15
-rw-r--r--ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp1042
14 files changed, 41 insertions, 1963 deletions
diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h
index 309b67951a8..856fd8ef760 100644
--- a/ydb/core/kqp/common/kqp_transform.h
+++ b/ydb/core/kqp/common/kqp_transform.h
@@ -221,27 +221,10 @@ public:
DeferredEffects.Clear();
ParamsState = MakeIntrusive<TParamsState>();
SnapshotHandle.Snapshot = IKqpGateway::TKqpSnapshot::InvalidSnapshot;
- ForceNewEngineSettings = {};
}
TKqpTransactionInfo GetInfo() const;
- void ForceOldEngine() {
- auto engine = DeferredEffects.GetEngine();
- YQL_ENSURE(!engine || engine == TKqpTransactionInfo::EEngine::OldEngine);
- YQL_ENSURE(!ForceNewEngineSettings.ForcedNewEngine || *ForceNewEngineSettings.ForcedNewEngine == false);
- ForceNewEngineSettings.ForcedNewEngine = false;
- }
-
- void ForceNewEngine(ui32 percent, ui32 level) {
- auto engine = DeferredEffects.GetEngine();
- YQL_ENSURE(!engine || engine == TKqpTransactionInfo::EEngine::NewEngine);
- YQL_ENSURE(!ForceNewEngineSettings.ForcedNewEngine.has_value());
- ForceNewEngineSettings.ForcedNewEngine = true;
- ForceNewEngineSettings.ForceNewEnginePercent = percent;
- ForceNewEngineSettings.ForceNewEngineLevel = level;
- }
-
void SetIsolationLevel(const Ydb::Table::TransactionSettings& settings) {
switch (settings.tx_mode_case()) {
case Ydb::Table::TransactionSettings::kSerializableReadWrite:
@@ -296,8 +279,6 @@ public:
TIntrusivePtr<TParamsState> ParamsState;
IKqpGateway::TKqpSnapshotHandle SnapshotHandle;
-
- TKqpForceNewEngineState ForceNewEngineSettings;
};
class TLogExprTransformer {
diff --git a/ydb/core/kqp/common/kqp_tx_info.h b/ydb/core/kqp/common/kqp_tx_info.h
index a3e57414ae6..0c26981bd3f 100644
--- a/ydb/core/kqp/common/kqp_tx_info.h
+++ b/ydb/core/kqp/common/kqp_tx_info.h
@@ -7,12 +7,6 @@
namespace NKikimr {
namespace NKqp {
-struct TKqpForceNewEngineState {
- ui32 ForceNewEnginePercent = 0;
- ui32 ForceNewEngineLevel = 0;
- std::optional<bool> ForcedNewEngine;
-};
-
struct TKqpTransactionInfo {
enum class EKind {
Pure,
@@ -39,7 +33,6 @@ public:
TDuration TotalDuration;
TDuration ServerDuration;
ui32 QueriesCount = 0;
- TKqpForceNewEngineState ForceNewEngineState;
};
} // namespace NKqp
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index ec37ab7f42c..349ee4d49f0 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1183,15 +1183,15 @@ public:
}
void ForceTxOldEngine(const TString& txId) override {
- if (auto tx = FindTransaction(txId)) {
- (*tx)->ForceOldEngine();
- }
- }
+ Y_UNUSED(txId);
+ Y_ENSURE(false, "Unsupported ForceTxOldEngine");
+ }
void ForceTxNewEngine(const TString& txId, ui32 percent, ui32 level) override {
- if (auto tx = FindTransaction(txId)) {
- (*tx)->ForceNewEngine(percent, level);
- }
+ Y_UNUSED(txId);
+ Y_UNUSED(percent);
+ Y_UNUSED(level);
+ Y_ENSURE(false, "Unsupported ForceTxNewEngine");
}
TMaybe<TKqpTransactionInfo> GetTransactionInfo(const TString& txId) override {
@@ -2292,7 +2292,6 @@ TKqpTransactionInfo TKqpTransactionContext::GetInfo() const {
txInfo.QueriesCount = QueriesCount;
txInfo.TxEngine = DeferredEffects.GetEngine();
- txInfo.ForceNewEngineState = ForceNewEngineSettings;
return txInfo;
}
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index fb41511d93f..c3a2196a66b 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -25,19 +25,6 @@ using namespace NThreading;
namespace {
-void FillAstAndPlan(NKikimrKqp::TPreparedKql& kql, const TExprNode::TPtr& queryExpr, TExprContext& ctx) {
- TStringStream astStream;
- auto ast = ConvertToAst(*queryExpr, ctx, TExprAnnotationFlags::None, true);
- ast.Root->PrettyPrintTo(astStream, TAstPrintFlags::ShortQuote | TAstPrintFlags::PerLine);
- kql.SetAst(astStream.Str());
-
- NJsonWriter::TBuf writer;
- writer.SetIndentSpaces(2);
-
- WriteKqlPlan(writer, queryExpr);
- kql.SetPlan(writer.Str());
-}
-
class TAsyncRunResult : public TKqpAsyncResultBase<IKikimrQueryExecutor::TQueryResult, false> {
public:
using TResult = IKikimrQueryExecutor::TQueryResult;
@@ -325,12 +312,6 @@ public:
YQL_ENSURE(cluster == Cluster);
YQL_ENSURE(phyQuery->GetType() == NKqpProto::TKqpPhyQuery::TYPE_DATA);
- if (!Config->HasAllowKqpNewEngine()) {
- ctx.AddError(TIssue(TPosition(), "NewEngine execution is not allowed on this cluster."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(
- ctx.IssueManager.GetIssues()));
- }
-
return ExecutePhysicalDataQuery(world, std::move(phyQuery), ctx, settings);
}
@@ -391,111 +372,7 @@ private:
}
bool sysColumnsEnabled = TransformCtx->Config->SystemColumnsEnabled();
-
- std::optional<TKqpTransactionInfo::EEngine> engine;
- if (settings.UseNewEngine.Defined()) {
- engine = *settings.UseNewEngine
- ? TKqpTransactionInfo::EEngine::NewEngine
- : TKqpTransactionInfo::EEngine::OldEngine;
- }
- if (!engine.has_value() && Config->UseNewEngine.Get().Defined()) {
- engine = Config->UseNewEngine.Get().Get()
- ? TKqpTransactionInfo::EEngine::NewEngine
- : TKqpTransactionInfo::EEngine::OldEngine;
- }
- if (!engine.has_value() && Config->HasKqpForceNewEngine()) {
- engine = TKqpTransactionInfo::EEngine::NewEngine;
- }
-
- if ((queryCtx->Type == EKikimrQueryType::Scan) ||
- (engine.has_value() && *engine == TKqpTransactionInfo::EEngine::NewEngine))
- {
- return PrepareQueryNewEngine(cluster, dataQuery, ctx, settings, sysColumnsEnabled);
- }
-
- YQL_ENSURE(false, "Unexpected query prepare in OldEngine mode.");
-
- // OldEngine only
- YQL_ENSURE(!engine.has_value() || *engine == TKqpTransactionInfo::EEngine::OldEngine);
-
- for (const auto& [name, table] : TransformCtx->Tables->GetTables()) {
- if (!table.Metadata->SysView.empty()) {
- ctx.AddError(TIssue(ctx.GetPosition(dataQuery.Pos()), TStringBuilder()
- << "Table " << table.Metadata->Name << " is a system view. "
- << "System views are not supported by data queries."
- ));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
- }
-
- auto program = BuildKiProgram(dataQuery, *TransformCtx->Tables, ctx, sysColumnsEnabled);
-
- KqlOptimizeTransformer->Rewind();
-
- TExprNode::TPtr optimizedProgram = program.Ptr();
- auto status = InstantTransform(*KqlOptimizeTransformer, optimizedProgram, ctx);
- if (status != IGraphTransformer::TStatus::Ok || !TMaybeNode<TKiProgram>(optimizedProgram)) {
- ctx.AddError(TIssue(ctx.GetPosition(dataQuery.Pos()), "Failed to optimize KQL query."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
-
- YQL_ENSURE(optimizedProgram->GetTypeAnn());
-
- queryCtx->QueryTraits = CollectQueryTraits(TKiProgram(optimizedProgram), ctx);
-
- KqlTypeAnnTransformer->Rewind();
-
- TExprNode::TPtr finalProgram;
- bool hasNonDeterministicFunctions;
- TPeepholeSettings peepholeSettings;
- peepholeSettings.WithNonDeterministicRules = false;
- status = PeepHoleOptimizeNode<false>(optimizedProgram, finalProgram, ctx, TypesCtx, KqlTypeAnnTransformer.Get(),
- hasNonDeterministicFunctions, peepholeSettings);
- if (status != IGraphTransformer::TStatus::Ok) {
- ctx.AddError(TIssue(ctx.GetPosition(dataQuery.Pos()), "Failed to peephole optimize KQL query."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
-
- status = ReplaceNonDetFunctionsWithParams(finalProgram, ctx);
- if (status != IGraphTransformer::TStatus::Ok) {
- ctx.AddError(TIssue(ctx.GetPosition(dataQuery.Pos()),
- "Failed to replace non deterministic functions with params for KQL query."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
-
- KqlPrepareTransformer->Rewind();
-
- NKikimrKqp::TKqlSettings kqlSettings;
- kqlSettings.SetCommitTx(settings.CommitTx);
- kqlSettings.SetRollbackTx(settings.RollbackTx);
-
- TransformCtx->Reset();
- TransformCtx->Settings = kqlSettings;
-
- if (!TxState->Tx().EffectiveIsolationLevel) {
- TxState->Tx().EffectiveIsolationLevel = kqlSettings.GetIsolationLevel();
- }
-
- TransformCtx->QueryCtx->PreparingQuery->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_V1);
- auto kql = TransformCtx->QueryCtx->PreparingQuery->AddKqls();
- kql->MutableSettings()->CopyFrom(TransformCtx->Settings);
-
- FillAstAndPlan(*kql, finalProgram, ctx);
-
- auto operations = TableOperationsToProto(dataQuery.Operations(), ctx);
- for (auto& op : operations) {
- const auto tableName = op.GetTable();
- auto operation = static_cast<TYdbOperation>(op.GetOperation());
-
- *kql->AddOperations() = std::move(op);
-
- const auto& desc = TransformCtx->Tables->GetTable(cluster, tableName);
- TableDescriptionToTableInfo(desc, operation, *kql->MutableTableInfo());
- }
-
- TransformCtx->PreparingKql = kql;
-
- return MakeIntrusive<TAsyncRunResult>(finalProgram, ctx, *KqlPrepareTransformer, *TransformCtx);
+ return PrepareQueryNewEngine(cluster, dataQuery, ctx, settings, sysColumnsEnabled);
}
TIntrusivePtr<TAsyncQueryResult> PrepareQueryNewEngine(const TString& cluster, const TKiDataQuery& dataQuery,
@@ -515,12 +392,6 @@ private:
YQL_ENSURE(false, "PrepareQueryNewEngine, unexpected query type: " << queryType);
}
- if (!Config->HasAllowKqpNewEngine() && queryType == EKikimrQueryType::Dml) {
- ctx.AddError(TIssue(ctx.GetPosition(dataQuery.Pos()),
- "NewEngine execution is not allowed on this cluster."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
-
auto kqlQuery = BuildKqlQuery(dataQuery, *TransformCtx->Tables, ctx, sysColumnsEnabled, OptimizeCtx);
if (!kqlQuery) {
return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h
index f9b3333bc29..06883ec1773 100644
--- a/ydb/core/kqp/kqp_impl.h
+++ b/ydb/core/kqp/kqp_impl.h
@@ -95,7 +95,6 @@ bool HasSchemeOrFatalIssues(const NYql::TIssues& issues);
// for tests only
void FailForcedNewEngineCompilationForTests(bool fail = true);
-void FailForcedNewEngineExecutionForTests(bool fail = true);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index a3fc2966b59..3b2a3dd8e3d 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -34,11 +34,6 @@ using namespace NYql;
using namespace NYql::NDq;
using namespace NRuCalc;
-static std::atomic<bool> FailForcedNewEngineExecution = false;
-void FailForcedNewEngineExecutionForTests(bool fail) {
- FailForcedNewEngineExecution = fail;
-}
-
namespace {
using TQueryResult = IKqpHost::TQueryResult;
@@ -63,13 +58,6 @@ struct TKqpQueryState {
NKqpProto::TKqpStatsCompile CompileStats;
ui32 ReplyFlags = 0;
bool KeepSession = false;
- bool InteractiveTx = true;
-
- bool OldEngineFallback = false;
- bool NewEngineCompatibleQuery = false;
-
- TKqpForceNewEngineState ForceNewEngineState;
- std::optional<TQueryTraits> QueryTraits;
TMaybe<NKikimrKqp::TRlPath> RlPath;
};
@@ -198,13 +186,6 @@ public:
Y_UNUSED(ctx);
}
- void HandleReady(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev);
-
- LOG_ERROR_S(ctx, NKikimrServices::KQP_WORKER, TKqpRequestInfo("", SessionId)
- << "Unexpected compile response while in Ready state.");
- }
-
void HandleReady(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) {
ui64 proxyRequestId = ev->Cookie;
auto& event = ev->Get()->Record;
@@ -295,17 +276,39 @@ public:
return;
}
+ // Most of the queries should be executed directly via session_actor
switch (queryRequest.GetAction()) {
- case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
- case NKikimrKqp::QUERY_ACTION_BEGIN_TX:
- case NKikimrKqp::QUERY_ACTION_COMMIT_TX:
- case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX:
+ case NKikimrKqp::QUERY_ACTION_EXECUTE:
+ case NKikimrKqp::QUERY_ACTION_EXPLAIN:
+ case NKikimrKqp::QUERY_ACTION_VALIDATE:
+ case NKikimrKqp::QUERY_ACTION_PARSE:
break;
+
default:
- if (!queryRequest.HasType()) {
- onBadRequest("Query type not specified");
+ onError(Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() <<
+ "Unexpected query action type in KQP worker: " << (ui32)queryRequest.GetAction());
+ return;
+ }
+
+ if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_EXECUTE) {
+ switch (queryRequest.GetType()) {
+ case NKikimrKqp::QUERY_TYPE_SQL_DDL:
+ case NKikimrKqp::QUERY_TYPE_AST_DML:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
+ case NKikimrKqp::QUERY_TYPE_AST_SCAN:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
+ break;
+
+ default:
+ onError(Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() <<
+ "Unexpected execute query type in KQP worker: " << (ui32)queryRequest.GetType());
return;
- }
+ }
+ }
+
+ if (!queryRequest.HasType()) {
+ onBadRequest("Query type not specified");
+ return;
}
switch (queryRequest.GetAction()) {
@@ -323,71 +326,11 @@ public:
break;
}
- HandleQueryRequest(timer, false, ctx);
+ HandleQueryRequest(timer, ctx);
}
- void HandleQueryRequest(NCpuTime::TCpuTimer& timer, bool fallbackToOldEngine, const TActorContext& ctx) {
- auto& queryRequest = QueryState->Request;
-
- if (fallbackToOldEngine) {
- QueryState->ForceNewEngineState.ForcedNewEngine = false;
- QueryState->OldEngineFallback = true;
- }
-
- auto replyError = [this, &ctx] (NYql::EYqlIssueCode status, const TString& info) {
- QueryState->AsyncQueryResult = MakeKikimrResultHolder(NCommon::ResultFromError<TQueryResult>(
- YqlIssue(TPosition(), status, info)));
-
- ContinueQueryProcess(ctx);
- Become(&TKqpWorkerActor::PerformQueryState);
- };
-
- if (queryRequest.HasTxControl()) {
- const auto& txControl = queryRequest.GetTxControl();
-
- switch (txControl.tx_selector_case()) {
- case Ydb::Table::TransactionControl::kTxId: {
- QueryState->TxId = txControl.tx_id();
-
- auto txInfo = KqpHost->GetTransactionInfo(QueryState->TxId);
- if (!txInfo) {
- replyError(TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder() << "Transaction not found: " << QueryState->TxId);
- return;
- }
-
- YQL_ENSURE(!QueryState->OldEngineFallback);
- QueryState->ForceNewEngineState = txInfo->ForceNewEngineState;
- break;
- }
-
- case Ydb::Table::TransactionControl::kBeginTx: {
- if (txControl.commit_tx()) {
- QueryState->InteractiveTx = false;
- }
- if (QueryState->OldEngineFallback) {
- YQL_ENSURE(!QueryState->InteractiveTx);
- }
- break;
- }
-
- case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: {
- replyError(TIssuesIds::KIKIMR_BAD_REQUEST, TStringBuilder() << "wrong TxControl: tx_selector must be set");
- return;
- }
- }
- } else {
- // some kind of internal query? or verify here?
- }
-
+ void HandleQueryRequest(NCpuTime::TCpuTimer& timer, const TActorContext& ctx) {
StopIdleTimer(ctx);
-
- if (CompileQuery(ctx)) {
- if (QueryState) {
- QueryState->CpuTime += timer.GetTime();
- }
- return;
- }
-
PerformQuery(ctx);
if (QueryState) {
QueryState->CpuTime += timer.GetTime();
@@ -422,105 +365,10 @@ public:
}
}
- void HandleCompileQuery(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) {
- auto compileResult = ev->Get()->CompileResult;
-
- Y_VERIFY(compileResult);
- Y_VERIFY(QueryState);
-
- if (compileResult->Status != Ydb::StatusIds::SUCCESS) {
- if (ReplyQueryCompileError(compileResult, ctx)) {
- StartIdleTimer(ctx);
- Become(&TKqpWorkerActor::ReadyState);
- } else {
- FinalCleanup(ctx);
- }
-
- return;
- }
-
- QueryState->QueryTraits = compileResult->QueryTraits;
-
- if (!QueryState->ForceNewEngineState.ForcedNewEngine.has_value()) {
- // first query in tx
- QueryState->ForceNewEngineState.ForceNewEnginePercent = ev->Get()->ForceNewEnginePercent;
- QueryState->ForceNewEngineState.ForceNewEngineLevel = ev->Get()->ForceNewEngineLevel;
- }
-
- auto& queryRequest = QueryState->Request;
-
- QueryState->CompileStats.Swap(&ev->Get()->Stats);
-
- if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) {
- if (ReplyPrepareResult(compileResult, ctx)) {
- StartIdleTimer(ctx);
- Become(&TKqpWorkerActor::ReadyState);
- } else {
- FinalCleanup(ctx);
- }
-
- return;
- }
-
- QueryState->QueryCompileResult = compileResult;
-
- switch (queryRequest.GetAction()) {
- case NKikimrKqp::QUERY_ACTION_EXECUTE:
- case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
- break;
-
- default:
- Y_VERIFY_S(false, "Unexpected action on successful compile result: "
- << NKikimrKqp::EQueryAction_Name(queryRequest.GetAction()));
- break;
- }
-
- NCpuTime::TCpuTimer timer;
- PerformQuery(ctx);
-
- // PerformQuery can reset QueryState
- if (QueryState) {
- QueryState->CpuTime += timer.GetTime();
- }
- }
-
- void HandleCompileQuery(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) {
- ReplyBusy(ev, ctx);
- }
-
- void HandleCompileQuery(TEvKqp::TEvCloseSessionRequest::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev);
-
- Y_VERIFY(QueryState);
- ReplyProcessError(QueryState->Sender, QueryState->ProxyRequestId,
- TKqpRequestInfo(QueryState->TraceId, SessionId), Ydb::StatusIds::BAD_SESSION,
- "Session is being closed", ctx);
-
- Counters->ReportWorkerClosedError(Settings.DbCounters);
- FinalCleanup(ctx);
- }
-
- void HandleCompileQuery(TEvKqp::TEvPingSessionRequest::TPtr &ev, const TActorContext &ctx) {
- ui64 proxyRequestId = ev->Cookie;
- ReplyPingStatus(ev->Sender, proxyRequestId, false, ctx);
- }
-
- void HandleCompileQuery(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev);
- Y_UNUSED(ctx);
- }
-
void HandlePerformQuery(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) {
ReplyBusy(ev, ctx);
}
- void HandlePerformQuery(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev);
-
- LOG_ERROR_S(ctx, NKikimrServices::KQP_WORKER, TKqpRequestInfo("", SessionId)
- << "Unexpected compile response while in PerformQuery state.");
- }
-
void HandlePerformQuery(TEvKqp::TEvCloseSessionRequest::TPtr &ev, const TActorContext &ctx) {
Y_UNUSED(ev);
Y_UNUSED(ctx);
@@ -543,46 +391,6 @@ public:
if (ev->Get()->Finished) {
QueryState->QueryResult = QueryState->AsyncQueryResult->GetResult();
QueryState->AsyncQueryResult.Reset();
-
- if (!QueryState->OldEngineFallback) {
- auto& x = QueryState->ForceNewEngineState;
- if (x.ForcedNewEngine && *x.ForcedNewEngine) {
- auto status = GetYdbStatus(QueryState->QueryResult);
- bool failForTests = FailForcedNewEngineExecution.load(std::memory_order_relaxed);
-
- if (status != Ydb::StatusIds::SUCCESS || failForTests) {
- if (x.ForceNewEngineLevel == 0 || x.ForceNewEngineLevel == 1) {
- bool shouldFallback = status != Ydb::StatusIds::CANCELLED
- && status != Ydb::StatusIds::ABORTED
- && status != Ydb::StatusIds::OVERLOADED
- && status != Ydb::StatusIds::PRECONDITION_FAILED
- && status != Ydb::StatusIds::UNAVAILABLE
- && status != Ydb::StatusIds::UNDETERMINED;
- if (shouldFallback) {
- QueryState->ForceNewEngineState = {};
- QueryState->NewEngineCompatibleQuery = false;
-
- GetServiceCounters(AppData()->Counters, "kqp")->GetCounter("Requests/OldEngineFallback", true)->Inc();
-
- LOG_ERROR_S(ctx, NKikimrServices::KQP_WORKER, "OldEngine fallback request"
- << ", satus: " << Ydb::StatusIds::StatusCode_Name(status)
- << ", issues: " << QueryState->QueryResult.Issues().ToString());
-
- NCpuTime::TCpuTimer timer;
- HandleQueryRequest(timer, true, ctx);
- return;
- }
- }
-
- if (failForTests) {
- QueryState->QueryResult.SetStatus(NYql::TIssuesIds::DEFAULT_ERROR);
- QueryState->QueryResult.AddIssue(YqlIssue(TPosition(), TIssuesIds::DEFAULT_ERROR, "Failed for test."));
- GetServiceCounters(AppData()->Counters, "kqp")->GetCounter("Requests/ForceNewEngineExecError", true)->Inc();
- }
- }
- }
- }
-
QueryCleanup(ctx);
} else {
NCpuTime::TCpuTimer timer(QueryState->CpuTime);
@@ -618,16 +426,6 @@ public:
}
}
- void HandlePerformCleanup(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev);
-
- Y_VERIFY(CleanupState);
- if (!CleanupState->Final) {
- LOG_ERROR_S(ctx, NKikimrServices::KQP_WORKER, TKqpRequestInfo("", SessionId)
- << "Unexpected compile response while in PerformCleanup state.");
- }
- }
-
void HandlePerformCleanup(TEvKqp::TEvCloseSessionRequest::TPtr &ev, const TActorContext &ctx) {
Y_UNUSED(ev);
Y_UNUSED(ctx);
@@ -684,7 +482,6 @@ public:
try {
switch (ev->GetTypeRewrite()) {
HFunc(TEvKqp::TEvQueryRequest, HandleReady);
- HFunc(TEvKqp::TEvCompileResponse, HandleReady);
HFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
HFunc(TEvKqp::TEvPingSessionRequest, HandleReady);
HFunc(TEvKqp::TEvContinueProcess, HandleReady);
@@ -699,29 +496,10 @@ public:
}
}
- STFUNC(CompileQueryState) {
- try {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvKqp::TEvQueryRequest, HandleCompileQuery);
- HFunc(TEvKqp::TEvCompileResponse, HandleCompileQuery);
- HFunc(TEvKqp::TEvCloseSessionRequest, HandleCompileQuery);
- HFunc(TEvKqp::TEvPingSessionRequest, HandleCompileQuery);
- HFunc(TEvKqp::TEvIdleTimeout, HandleCompileQuery);
- HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
- HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
- default:
- UnexpectedEvent("CompileQueryState", ev, ctx);
- }
- } catch (const yexception& ex) {
- InternalError(ex.what(), ctx);
- }
- }
-
STFUNC(PerformQueryState) {
try {
switch (ev->GetTypeRewrite()) {
HFunc(TEvKqp::TEvQueryRequest, HandlePerformQuery);
- HFunc(TEvKqp::TEvCompileResponse, HandlePerformQuery);
HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformQuery);
HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformQuery);
HFunc(TEvKqp::TEvContinueProcess, HandlePerformQuery);
@@ -740,7 +518,6 @@ public:
try {
switch (ev->GetTypeRewrite()) {
HFunc(TEvKqp::TEvQueryRequest, HandlePerformCleanup);
- HFunc(TEvKqp::TEvCompileResponse, HandlePerformCleanup);
HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformCleanup);
HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformCleanup);
HFunc(TEvKqp::TEvContinueProcess, HandlePerformCleanup);
@@ -822,64 +599,6 @@ private:
return KqpHost->BeginTransaction(isolation, readonly);
}
- bool CompileQuery(const TActorContext& ctx) {
- if (!Settings.LongSession) {
- return false;
- }
-
- Y_VERIFY(QueryState);
- auto& queryRequest = QueryState->Request;
-
- switch (queryRequest.GetType()) {
- case NKikimrKqp::QUERY_TYPE_SQL_DML:
- case NKikimrKqp::QUERY_TYPE_PREPARED_DML:
- break;
-
- default:
- return false;
- }
-
- TMaybe<TKqpQueryId> query;
- TMaybe<TString> uid;
-
- bool keepInCache = false;
- switch (queryRequest.GetAction()) {
- case NKikimrKqp::QUERY_ACTION_EXECUTE:
- query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), /*scan*/ false);
- keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache();
- break;
-
- case NKikimrKqp::QUERY_ACTION_PREPARE:
- query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), /*scan*/ false);
- keepInCache = true;
- break;
-
- case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
- uid = queryRequest.GetPreparedQuery();
- keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache();
- break;
-
- default:
- return false;
- }
-
- if (query) {
- query->Settings.DocumentApiRestricted = IsDocumentApiRestricted(QueryState->RequestType);
- }
-
- auto compileDeadline = QueryState->QueryDeadlines.TimeoutAt;
- if (QueryState->QueryDeadlines.CancelAt) {
- compileDeadline = Min(compileDeadline, QueryState->QueryDeadlines.CancelAt);
- }
-
- auto compileRequestActor = CreateKqpCompileRequestActor(ctx.SelfID, QueryState->UserToken, uid,
- std::move(query), keepInCache, compileDeadline, Settings.DbCounters);
- ctx.ExecutorThread.RegisterActor(compileRequestActor);
-
- Become(&TKqpWorkerActor::CompileQueryState);
- return true;
- }
-
void PerformQuery(const TActorContext& ctx) {
Y_VERIFY(QueryState);
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
@@ -944,10 +663,6 @@ private:
Counters->ReportTxCreated(Settings.DbCounters);
QueryState->TxId = beginTxResult.TxId;
- if (commit) {
- QueryState->InteractiveTx = false;
- }
-
break;
}
@@ -959,13 +674,6 @@ private:
auto action = queryRequest.GetAction();
auto queryType = queryRequest.GetType();
- if (QueryState->QueryCompileResult) {
- if (action == NKikimrKqp::QUERY_ACTION_EXECUTE) {
- Y_VERIFY(queryType == NKikimrKqp::QUERY_TYPE_SQL_DML);
- queryType = NKikimrKqp::QUERY_TYPE_PREPARED_DML;
- action = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED;
- }
- }
switch (action) {
case NKikimrKqp::QUERY_ACTION_EXECUTE: {
@@ -999,191 +707,8 @@ private:
break;
}
- case NKikimrKqp::QUERY_ACTION_PREPARE: {
- if (!PrepareQuery(ctx, queryRequest.GetQuery(), queryType, commit)) {
- onBadRequest(QueryState->Error);
- return;
- }
- break;
- }
-
- case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: {
- // NOTE: For compatibility with old clients, remove once not used.
- const TString& query = queryRequest.HasPreparedQuery()
- ? queryRequest.GetPreparedQuery()
- : queryRequest.GetQuery();
-
- TPreparedQueryConstPtr preparedQuery;
-
- if (QueryState->QueryCompileResult) {
- Y_VERIFY(queryType == NKikimrKqp::QUERY_TYPE_PREPARED_DML);
-
- bool newEngineCompatibleTx = !QueryState->OldEngineFallback
- && QueryState->ForceNewEngineState.ForceNewEnginePercent > 0;
-
- bool forcedOldEngine = false;
-
- if (newEngineCompatibleTx &&
- QueryState->ForceNewEngineState.ForcedNewEngine.has_value() &&
- QueryState->ForceNewEngineState.ForcedNewEngine.value() == false)
- {
- // newEngineCompatibleTx = false;
- forcedOldEngine = true;
- }
-
- QueryState->NewEngineCompatibleQuery = (bool) QueryState->QueryCompileResult->PreparedQueryNewEngine
- && newEngineCompatibleTx;
-
- // select engine according to deferred effects
- auto effectsEngine = KqpHost->GetTransactionInfo(QueryState->TxId)->TxEngine;
-
- if (newEngineCompatibleTx) {
- if (QueryState->ForceNewEngineState.ForceNewEngineLevel == 0) {
- if (QueryState->InteractiveTx || !QueryState->QueryTraits) {
- newEngineCompatibleTx = false;
- QueryState->NewEngineCompatibleQuery = false;
- } else {
- const auto& traits = QueryState->QueryTraits.value();
- if (!traits.ReadOnly || traits.WithJoin || traits.WithSqlIn || traits.WithIndex) {
- newEngineCompatibleTx = false;
- QueryState->NewEngineCompatibleQuery = false;
- }
- }
- } else if (QueryState->ForceNewEngineState.ForceNewEngineLevel == 1) {
- if (QueryState->InteractiveTx || !QueryState->QueryTraits) {
- newEngineCompatibleTx = false;
- QueryState->NewEngineCompatibleQuery = false;
- } else {
- const auto& traits = QueryState->QueryTraits.value();
- if (!traits.ReadOnly) {
- newEngineCompatibleTx = false;
- QueryState->NewEngineCompatibleQuery = false;
- }
- }
- } else if (QueryState->ForceNewEngineState.ForceNewEngineLevel == 2) {
- if (!QueryState->QueryTraits.has_value() || !QueryState->QueryTraits->ReadOnly) {
- QueryState->NewEngineCompatibleQuery = false;
- // but Tx is still NE Compatible, i.e. RO-queries can be executed with NewEngine
- }
-
- if (commit) {
- if (effectsEngine) {
- if (*effectsEngine == TKqpTransactionInfo::EEngine::NewEngine) {
- QueryState->NewEngineCompatibleQuery = true;
- } else {
- QueryState->NewEngineCompatibleQuery = false;
- }
- } else {
- // Y_VERIFY(false);
- }
- }
- } else if (QueryState->ForceNewEngineState.ForceNewEngineLevel == 3) {
- newEngineCompatibleTx = true;
- QueryState->NewEngineCompatibleQuery = true;
- } else {
- YQL_ENSURE(false);
- }
- }
-
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "-- NE Compatible: "
- << " tx: " << newEngineCompatibleTx
- << ", query: " << QueryState->NewEngineCompatibleQuery
- << ", interactive: " << QueryState->InteractiveTx
- << ", preparedNewEngine: " << (bool) QueryState->QueryCompileResult->PreparedQueryNewEngine
- << ", effects: " << (effectsEngine ? (int) *effectsEngine : -1)
- << ", forcedNewEngine: " << (QueryState->ForceNewEngineState.ForcedNewEngine
- ? ToString(*QueryState->ForceNewEngineState.ForcedNewEngine)
- : "<none>")
- << ", traits: " << (QueryState->QueryCompileResult->QueryTraits
- ? QueryState->QueryCompileResult->QueryTraits->ToString()
- : "<none>")
- << ", commit: " << commit
- << ", text: " << queryRequest.GetQuery());
-
- if (newEngineCompatibleTx && !forcedOldEngine) {
- if (QueryState->NewEngineCompatibleQuery) {
- if (QueryState->ForceNewEngineState.ForcedNewEngine && *QueryState->ForceNewEngineState.ForcedNewEngine) {
- preparedQuery = QueryState->QueryCompileResult->PreparedQueryNewEngine;
- LOG_INFO_S(ctx, NKikimrServices::KQP_WORKER, "Force NewEngine query execution (as part of tx)");
- } else if (!effectsEngine && QueryState->ForceNewEngineState.ForceNewEnginePercent >= RandomNumber((ui32) 100)) {
- preparedQuery = QueryState->QueryCompileResult->PreparedQueryNewEngine;
- QueryState->ForceNewEngineState.ForcedNewEngine = true;
-
- KqpHost->ForceTxNewEngine(
- QueryState->TxId,
- QueryState->ForceNewEngineState.ForceNewEnginePercent,
- QueryState->ForceNewEngineState.ForceNewEngineLevel
- );
- LOG_INFO_S(ctx, NKikimrServices::KQP_WORKER, "Force NewEngine query execution (new tx)");
- } else {
- QueryState->ForceNewEngineState.ForcedNewEngine = false;
- KqpHost->ForceTxOldEngine(QueryState->TxId);
- preparedQuery = QueryState->QueryCompileResult->PreparedQuery;
- LOG_INFO_S(ctx, NKikimrServices::KQP_WORKER, "Force OldEngine query execution (new tx)");
- }
- } else {
- if (!QueryState->ForceNewEngineState.ForcedNewEngine.has_value()) {
- QueryState->ForceNewEngineState.ForcedNewEngine = false;
- KqpHost->ForceTxOldEngine(QueryState->TxId);
- }
-
- preparedQuery = QueryState->QueryCompileResult->PreparedQuery;
- }
- } else {
- preparedQuery = QueryState->QueryCompileResult->PreparedQuery;
- }
- } else if (Settings.LongSession) {
- onError(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Prepared query not found: " << query);
- return;
- } else {
- NKikimrKqp::TPreparedQuery tmp;
- if (!tmp.ParseFromString(query)) {
- onBadRequest("Failed to parse prepared query.");
- return;
- }
- preparedQuery = std::make_shared<const NKikimrKqp::TPreparedQuery>(std::move(tmp));
- }
-
- YQL_ENSURE(preparedQuery);
-
- QueryState->Request.SetQuery(preparedQuery->GetText());
-
- if (!ExecutePreparedQuery(preparedQuery, queryType, std::move(*QueryState->Request.MutableParameters()),
- commit, GetStatsMode(queryRequest, EKikimrStatsMode::Basic)))
- {
- onBadRequest(QueryState->Error);
- return;
- }
- break;
- }
-
- case NKikimrKqp::QUERY_ACTION_BEGIN_TX: {
- TQueryResult result;
- result.SetSuccess();
- QueryState->AsyncQueryResult = MakeKikimrResultHolder(std::move(result));
- ContinueQueryProcess(ctx);
- Become(&TKqpWorkerActor::PerformQueryState);
- return;
- }
-
- case NKikimrKqp::QUERY_ACTION_COMMIT_TX: {
- if (!CommitTx(commit, GetStatsMode(queryRequest, EKikimrStatsMode::Basic))) {
- onBadRequest(QueryState->Error);
- return;
- }
- break;
- }
-
- case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX: {
- if (!RollbackTx(commit)) {
- onBadRequest(QueryState->Error);
- return;
- }
- break;
- }
-
default: {
- onBadRequest("Unknown query action");
+ onBadRequest(TStringBuilder() << "Unknown query action: " << (ui32)queryRequest.GetAction());
return;
}
}
@@ -1273,7 +798,6 @@ private:
auto statsMode = GetStatsMode(queryRequest, EKikimrStatsMode::Basic);
switch (type) {
- case NKikimrKqp::QUERY_TYPE_SQL_DML:
case NKikimrKqp::QUERY_TYPE_AST_DML: {
bool isSql = (type == NKikimrKqp::QUERY_TYPE_SQL_DML);
@@ -1283,7 +807,6 @@ private:
execSettings.Deadlines = QueryState->QueryDeadlines;
execSettings.Limits = GetQueryLimits(Settings);
execSettings.StrictDml = false;
- execSettings.UseNewEngine = UseNewEngine();
execSettings.DocumentApiRestricted = IsDocumentApiRestricted(QueryState->RequestType);
QueryState->AsyncQueryResult = KqpHost->ExecuteDataQuery(QueryState->TxId, query, isSql,
@@ -1313,7 +836,6 @@ private:
break;
}
- case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
case NKikimrKqp::QUERY_TYPE_AST_SCAN: {
bool isSql = (type == NKikimrKqp::QUERY_TYPE_SQL_SCAN);
@@ -1329,7 +851,7 @@ private:
}
default: {
- QueryState->Error = "Unexpected query type.";
+ QueryState->Error = TStringBuilder() << "Unexpected query type: " << (ui32)type;
return false;
}
}
@@ -1393,7 +915,6 @@ private:
switch (type) {
case NKikimrKqp::QUERY_TYPE_SQL_DML: {
IKqpHost::TPrepareSettings prepareSettings;
- // prepareSettings.UseNewEngine = use default settings
prepareSettings.DocumentApiRestricted = IsDocumentApiRestricted(QueryState->RequestType);
QueryState->AsyncQueryResult = KqpHost->PrepareDataQuery(query, prepareSettings);
break;
@@ -1426,41 +947,6 @@ private:
}
}
- bool CommitTx(bool commit, EKikimrStatsMode statsMode) {
- if (!commit) {
- QueryState->Error = "Commit should be true for commit transaction request.";
- return false;
- }
- if (QueryState->TxId.empty()) {
- QueryState->Error = "Empty tx_id for commit transaction request.";
- return false;
- }
-
- IKikimrQueryExecutor::TExecuteSettings execSettings;
- execSettings.CommitTx = true;
- execSettings.StatsMode = statsMode;
- execSettings.Deadlines = QueryState->QueryDeadlines;
- execSettings.Limits = GetQueryLimits(Settings);
- execSettings.UseNewEngine = UseNewEngine();
-
- QueryState->AsyncQueryResult = KqpHost->CommitTransaction(QueryState->TxId, execSettings);
- return true;
- }
-
- bool RollbackTx(bool commit) {
- if (commit) {
- QueryState->Error = "Commit should be false for rollback transaction request.";
- return false;
- }
- if (QueryState->TxId.empty()) {
- QueryState->Error = "Empty tx_id for rollback transaction request.";
- return false;
- }
-
- QueryState->AsyncQueryResult = KqpHost->RollbackTransaction(QueryState->TxId, CreateRollbackSettings());
- return true;
- }
-
void ContinueQueryProcess(const TActorContext &ctx) {
Y_VERIFY(QueryState);
@@ -1609,22 +1095,6 @@ private:
Counters->ReportQueryReadSets(Settings.DbCounters, queryResult.QueryStats.GetReadSetsCount());
Counters->ReportQueryMaxShardReplySize(Settings.DbCounters, queryResult.QueryStats.GetMaxShardReplySize());
Counters->ReportQueryMaxShardProgramSize(Settings.DbCounters, queryResult.QueryStats.GetMaxShardProgramSize());
-
- if (QueryState->QueryCompileResult && QueryState->QueryCompileResult->PreparedQueryNewEngine
- && QueryState->NewEngineCompatibleQuery)
- {
- ui64 computeCpuTimeUs = 0;
- for (auto& execution : queryResult.QueryStats.GetExecutions()) {
- computeCpuTimeUs += execution.GetCpuTimeUs();
- }
-
- // query can be executed with NewEngine
- if (QueryState->ForceNewEngineState.ForcedNewEngine && *QueryState->ForceNewEngineState.ForcedNewEngine) {
- Counters->ReportNewEngineForcedQueryStats(queryRequest.GetAction(), queryDuration, computeCpuTimeUs);
- } else {
- Counters->ReportNewEngineCompatibleQueryStats(queryRequest.GetAction(), queryDuration, computeCpuTimeUs);
- }
- }
}
if (queryResult.SqlVersion) {
@@ -2022,23 +1492,10 @@ private:
IKikimrQueryExecutor::TExecuteSettings settings;
settings.RollbackTx = true;
settings.Deadlines.TimeoutAt = TInstant::Now() + TDuration::Minutes(1);
- settings.UseNewEngine = UseNewEngine();
return settings;
}
- TMaybe<bool> UseNewEngine() const {
- YQL_ENSURE(QueryState);
-
- if (auto txInfo = KqpHost->GetTransactionInfo(QueryState->TxId)) {
- if (auto engine = txInfo->TxEngine; engine.has_value()) {
- return *engine == TKqpTransactionInfo::EEngine::NewEngine;
- }
- }
-
- return Nothing();
- }
-
static TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings) {
const auto& queryLimitsProto = settings.Service.GetQueryLimits();
const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits();
diff --git a/ydb/core/kqp/ut/CMakeLists.darwin.txt b/ydb/core/kqp/ut/CMakeLists.darwin.txt
index 38e9bd0e8f2..099ce198dd0 100644
--- a/ydb/core/kqp/ut/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/ut/CMakeLists.darwin.txt
@@ -7,7 +7,6 @@
add_subdirectory(common)
-add_subdirectory(fat)
add_subdirectory(pg)
add_subdirectory(spilling)
diff --git a/ydb/core/kqp/ut/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/CMakeLists.linux-aarch64.txt
index 904a04cfa33..4243348b290 100644
--- a/ydb/core/kqp/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/ut/CMakeLists.linux-aarch64.txt
@@ -7,7 +7,6 @@
add_subdirectory(common)
-add_subdirectory(fat)
add_subdirectory(pg)
add_subdirectory(spilling)
diff --git a/ydb/core/kqp/ut/CMakeLists.linux.txt b/ydb/core/kqp/ut/CMakeLists.linux.txt
index 6db7d4496db..9e2c302fcb6 100644
--- a/ydb/core/kqp/ut/CMakeLists.linux.txt
+++ b/ydb/core/kqp/ut/CMakeLists.linux.txt
@@ -7,7 +7,6 @@
add_subdirectory(common)
-add_subdirectory(fat)
add_subdirectory(pg)
add_subdirectory(spilling)
diff --git a/ydb/core/kqp/ut/fat/CMakeLists.darwin.txt b/ydb/core/kqp/ut/fat/CMakeLists.darwin.txt
deleted file mode 100644
index bfc9ff61e44..00000000000
--- a/ydb/core/kqp/ut/fat/CMakeLists.darwin.txt
+++ /dev/null
@@ -1,52 +0,0 @@
-
-# This file was gererated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_executable(ydb-core-kqp-ut-fat)
-target_compile_options(ydb-core-kqp-ut-fat PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
-target_include_directories(ydb-core-kqp-ut-fat PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp
-)
-target_link_libraries(ydb-core-kqp-ut-fat PUBLIC
- contrib-libs-cxxsupp
- yutil
- library-cpp-cpuid_check
- cpp-testing-unittest_main
- ydb-core-kqp
- cpp-client-ydb_proto
- core-kqp-counters
- core-kqp-host
- core-kqp-provider
- kqp-ut-common
- yql-sql-pg_dummy
-)
-target_link_options(ydb-core-kqp-ut-fat PRIVATE
- -Wl,-no_deduplicate
- -Wl,-sdk_version,10.15
- -fPIC
- -fPIC
- -framework
- CoreFoundation
-)
-target_sources(ydb-core-kqp-ut-fat PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp
-)
-add_test(
- NAME
- ydb-core-kqp-ut-fat
- COMMAND
- ydb-core-kqp-ut-fat
- --print-before-suite
- --print-before-test
- --fork-tests
- --print-times
- --show-fails
-)
-vcs_info(ydb-core-kqp-ut-fat)
diff --git a/ydb/core/kqp/ut/fat/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/fat/CMakeLists.linux-aarch64.txt
deleted file mode 100644
index 41a5a981fcf..00000000000
--- a/ydb/core/kqp/ut/fat/CMakeLists.linux-aarch64.txt
+++ /dev/null
@@ -1,54 +0,0 @@
-
-# This file was gererated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_executable(ydb-core-kqp-ut-fat)
-target_compile_options(ydb-core-kqp-ut-fat PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
-target_include_directories(ydb-core-kqp-ut-fat PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp
-)
-target_link_libraries(ydb-core-kqp-ut-fat PUBLIC
- contrib-libs-cxxsupp
- yutil
- library-cpp-lfalloc
- cpp-testing-unittest_main
- ydb-core-kqp
- cpp-client-ydb_proto
- core-kqp-counters
- core-kqp-host
- core-kqp-provider
- kqp-ut-common
- yql-sql-pg_dummy
-)
-target_link_options(ydb-core-kqp-ut-fat PRIVATE
- -ldl
- -lrt
- -Wl,--no-as-needed
- -fPIC
- -fPIC
- -lpthread
- -lrt
- -ldl
-)
-target_sources(ydb-core-kqp-ut-fat PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp
-)
-add_test(
- NAME
- ydb-core-kqp-ut-fat
- COMMAND
- ydb-core-kqp-ut-fat
- --print-before-suite
- --print-before-test
- --fork-tests
- --print-times
- --show-fails
-)
-vcs_info(ydb-core-kqp-ut-fat)
diff --git a/ydb/core/kqp/ut/fat/CMakeLists.linux.txt b/ydb/core/kqp/ut/fat/CMakeLists.linux.txt
deleted file mode 100644
index 48c75b49f72..00000000000
--- a/ydb/core/kqp/ut/fat/CMakeLists.linux.txt
+++ /dev/null
@@ -1,56 +0,0 @@
-
-# This file was gererated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_executable(ydb-core-kqp-ut-fat)
-target_compile_options(ydb-core-kqp-ut-fat PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
-target_include_directories(ydb-core-kqp-ut-fat PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp
-)
-target_link_libraries(ydb-core-kqp-ut-fat PUBLIC
- contrib-libs-cxxsupp
- yutil
- cpp-malloc-tcmalloc
- libs-tcmalloc-no_percpu_cache
- library-cpp-cpuid_check
- cpp-testing-unittest_main
- ydb-core-kqp
- cpp-client-ydb_proto
- core-kqp-counters
- core-kqp-host
- core-kqp-provider
- kqp-ut-common
- yql-sql-pg_dummy
-)
-target_link_options(ydb-core-kqp-ut-fat PRIVATE
- -ldl
- -lrt
- -Wl,--no-as-needed
- -fPIC
- -fPIC
- -lpthread
- -lrt
- -ldl
-)
-target_sources(ydb-core-kqp-ut-fat PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp
-)
-add_test(
- NAME
- ydb-core-kqp-ut-fat
- COMMAND
- ydb-core-kqp-ut-fat
- --print-before-suite
- --print-before-test
- --fork-tests
- --print-times
- --show-fails
-)
-vcs_info(ydb-core-kqp-ut-fat)
diff --git a/ydb/core/kqp/ut/fat/CMakeLists.txt b/ydb/core/kqp/ut/fat/CMakeLists.txt
deleted file mode 100644
index 3e0811fb22e..00000000000
--- a/ydb/core/kqp/ut/fat/CMakeLists.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-
-# This file was gererated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
- include(CMakeLists.linux-aarch64.txt)
-elseif (APPLE)
- include(CMakeLists.darwin.txt)
-elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
- include(CMakeLists.linux.txt)
-endif()
diff --git a/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp b/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp
deleted file mode 100644
index eb0479aa45c..00000000000
--- a/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp
+++ /dev/null
@@ -1,1042 +0,0 @@
-#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
-
-#include <ydb/core/base/counters.h>
-#include <ydb/core/kqp/counters/kqp_counters.h>
-#include <ydb/core/kqp/kqp_impl.h>
-
-#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
-
-#include <library/cpp/grpc/client/grpc_client_low.h>
-
-namespace NKikimr {
-namespace NKqp {
-
-using namespace NYdb;
-using namespace NYdb::NTable;
-
-class KqpForceNewEngine : public TTestBase {
-public:
- void SetUp() override {
- FailForcedNewEngineCompilationForTests(false);
- FailForcedNewEngineExecutionForTests(false);
-
- TVector<NKikimrKqp::TKqpSetting> settings;
- NKikimrKqp::TKqpSetting setting;
- setting.SetName("_KqpForceNewEngine");
- setting.SetValue("false");
- settings.push_back(setting);
-
- TKikimrRunner kikimr(settings);
-
- Kikimr.reset(new TKikimrRunner(settings));
- Counters = Kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters;
- KqpCounters.reset(new TKqpCounters(Counters));
- }
-
- void TearDown() override {
- FailForcedNewEngineCompilationForTests(false);
- FailForcedNewEngineExecutionForTests(false);
- }
-
- TSession Session() {
- return Kikimr->GetTableClient().CreateSession().GetValueSync().GetSession();
- }
-
- void ForceNewEngine(ui32 percent, ui32 level) {
- NGrpc::TGRpcClientLow grpcClient;
- auto grpcContext = grpcClient.CreateContext();
-
- NGrpc::TGRpcClientConfig grpcConfig(Kikimr->GetEndpoint());
- auto grpc = grpcClient.CreateGRpcServiceConnection<NKikimrClient::TGRpcServer>(grpcConfig);
-
- NKikimrClient::TConsoleRequest request;
- auto* action = request.MutableConfigureRequest()->MutableActions()->Add();
- auto* configItem = action->MutableAddConfigItem()->MutableConfigItem();
- configItem->SetKind(NKikimrConsole::TConfigItem::TableServiceConfigItem);
-
- configItem->MutableConfig()->MutableTableServiceConfig()->SetForceNewEnginePercent(percent);
- configItem->MutableConfig()->MutableTableServiceConfig()->SetForceNewEngineLevel(level);
-
- std::atomic<int> done = 0;
- grpc->DoRequest<NKikimrClient::TConsoleRequest, NKikimrClient::TConsoleResponse>(
- request,
- [&done](NGrpc::TGrpcStatus&& status, NKikimrClient::TConsoleResponse&& response) {
- if (status.Ok()) {
- if (response.GetStatus().code() != Ydb::StatusIds::SUCCESS) {
- done = 3;
- } else if (response.GetConfigureResponse().GetStatus().code() != Ydb::StatusIds::SUCCESS) {
- done = 4;
- } else {
- done = 1;
- }
- } else {
- Cerr << "status: " << status.Msg << ", " << status.InternalError << ", " << status.GRpcStatusCode << Endl;
- Cerr << response.DebugString() << Endl;
- done = 2;
- }
- },
- &NKikimrClient::TGRpcServer::Stub::AsyncConsoleRequest,
- {},
- grpcContext.get());
-
- while (done.load() == 0) {
- ::Sleep(TDuration::Seconds(1));
- }
- grpcContext.reset();
- grpcClient.Stop(true);
-
- UNIT_ASSERT_VALUES_EQUAL(done.load(), 1);
- }
-
- void TestNotInteractiveReadOnlyTx(ui32 level, bool withSqlIn = false) {
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto query = withSqlIn
- ? R"(
- --!syntax_v1
- DECLARE $values AS List<String>;
- SELECT * FROM `/Root/TwoShard` WHERE Value1 IN $values
- )"
- : R"(
- SELECT * FROM `/Root/TwoShard` WHERE Key = 1
- )";
- auto params = TParamsBuilder()
- .AddParam("$values").BeginList().AddListItem().String("One").EndList().Build().Build();
- auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), std::move(params)).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- CompareYson(R"([[[1u];["One"];[-1]]])", FormatResultSetYson(result.GetResultSet(0)));
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(50, level);
- test(20);
- bool diff = KqpCounters->NewEngineForcedQueryCount->Val() * KqpCounters->NewEngineCompatibleQueryCount->Val() != 0;
- if (level == 0 && withSqlIn) {
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- } else {
- UNIT_ASSERT_C(diff, "forced: " << KqpCounters->NewEngineForcedQueryCount->Val()
- << ", compatible: " << KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
- }
-
- {
- ForceNewEngine(100, level);
- test(2);
- if (level == 0 && withSqlIn) {
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- } else {
- UNIT_ASSERT_VALUES_EQUAL(2, KqpCounters->NewEngineForcedQueryCount->Val());
- }
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(0, level);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
- }
-
- void TestNotInteractiveWriteOnlyTx(ui32 level) {
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto result = session.ExecuteDataQuery(R"(
- REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne")
- )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(100, level);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(level == 3 ? 2 : 0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
- }
-
- void TestNotInteractiveReadOnlyTxFailedNECompilation(ui32 level) {
- auto session = Session();
-
- auto test = [&]() {
- KqpCounters->ForceNewEngineCompileErrors->Set(0);
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- auto result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/KeyValue` LIMIT 1
- )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- };
-
- {
- ForceNewEngine(100, level);
- FailForcedNewEngineCompilationForTests();
- test();
- UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
- }
-
- void TestNotInteractiveReadOnlyTxFallback(ui32 level) {
- auto session = Session();
-
- auto test = [&]() {
- KqpCounters->ForceNewEngineCompileErrors->Set(0);
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- auto result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/KeyValue` LIMIT 1
- )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- if (level == 0 || level == 1) {
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- } else {
- UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
- }
- };
-
- {
- ForceNewEngine(100, level);
- FailForcedNewEngineExecutionForTests();
- test();
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); // failed request is not counted
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(level == 0 || level == 1 ? 1 : 0, GetServiceCounters(Counters, "kqp")->GetCounter("Requests/OldEngineFallback", true)->Val());
- UNIT_ASSERT_VALUES_EQUAL(level == 0 || level == 1 ? 0 : 1, GetServiceCounters(Counters, "kqp")->GetCounter("Requests/ForceNewEngineExecError", true)->Val());
- }
- }
-
- void TestInteractiveReadWriteEx(ui32 level, bool addBeginTx, /* bool prependWrite, */ bool appendRead, bool addCommitTx) {
- UNIT_ASSERT(level == 2);
-
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto txControl = TTxControl::BeginTx();
-
- if (addBeginTx) {
- auto tx = session.BeginTransaction(TTxSettings::SerializableRW()).ExtractValueSync().GetTransaction();
- UNIT_ASSERT(tx.IsActive());
-
- txControl = TTxControl::Tx(tx);
- }
-
- // TODO: prependWrite
-
- auto result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/KeyValue` LIMIT 1
- )", txControl).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- txControl = TTxControl::Tx(*result.GetTransaction());
-
- if (appendRead || addCommitTx) {
- // do nothing
- } else {
- txControl = txControl.CommitTx();
- }
-
- result = session.ExecuteDataQuery(Sprintf(R"(
- UPDATE `/Root/TwoShard` SET Value2 = %d WHERE Key = 1
- )", i), txControl).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- if (appendRead) {
- txControl = TTxControl::Tx(*result.GetTransaction());
- if (!addCommitTx) {
- txControl = txControl.CommitTx();
- }
-
- result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/KeyValue` LIMIT 2
- )", txControl).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- if (addCommitTx) {
- auto txResult = result.GetTransaction()->Commit().ExtractValueSync();
- UNIT_ASSERT_C(txResult.IsSuccess(), txResult.GetIssues().ToString());
- }
- } else {
- if (addCommitTx) {
- auto txResult = result.GetTransaction()->Commit().ExtractValueSync();
- UNIT_ASSERT_C(txResult.IsSuccess(), txResult.GetIssues().ToString());
- }
- }
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(50, level);
- test(20);
- bool diff = KqpCounters->NewEngineForcedQueryCount->Val() * KqpCounters->NewEngineCompatibleQueryCount->Val() != 0;
- UNIT_ASSERT_C(diff, "forced: " << KqpCounters->NewEngineForcedQueryCount->Val()
- << ", compatible: " << KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(100, level);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(2 * (appendRead && addCommitTx ? 2 : 1), KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(0, level);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
- }
-
-
- UNIT_TEST_SUITE(KqpForceNewEngine);
- UNIT_TEST(Level0_NotInteractiveReadOnly);
- UNIT_TEST(Level0_NotInteractiveReadOnlySqlIn);
- UNIT_TEST(Level0_NotInteractiveWriteOnly);
- UNIT_TEST(Level0_NotInteractiveReadWrite);
- UNIT_TEST(Level0_InteractiveReadOnly);
- UNIT_TEST(Level0_CompilationFailure);
- UNIT_TEST(Level0_Fallback);
-
- UNIT_TEST(Level1_NotInteractiveReadOnly);
- UNIT_TEST(Level1_NotInteractiveReadOnlySqlIn);
- UNIT_TEST(Level1_NotInteractiveWriteOnly);
- UNIT_TEST(Level1_NotInteractiveReadWrite);
- UNIT_TEST(Level1_InteractiveReadOnly);
- UNIT_TEST(Level1_CompilationFailure);
- UNIT_TEST(Level1_Fallback);
-
- UNIT_TEST(Level2_NotInteractiveReadOnly);
- UNIT_TEST(Level2_NotInteractiveWriteOnly);
- UNIT_TEST(Level2_InteractiveReadOnly);
- UNIT_TEST(Level2_InteractiveReadWrite);
- UNIT_TEST(Level2_InteractiveBeginReadWrite);
- UNIT_TEST(Level2_InteractiveReadWriteCommit);
- UNIT_TEST(Level2_InteractiveBeginReadWriteCommit);
- UNIT_TEST(Level2_InteractiveReadWriteRead);
- UNIT_TEST(Level2_InteractiveBeginReadWriteRead);
- UNIT_TEST(Level2_InteractiveReadWriteReadCommit);
- UNIT_TEST(Level2_InteractiveBeginReadWriteReadCommit);
- UNIT_TEST(Level2_InteractiveWriteOnly);
- UNIT_TEST(Level2_CompilationFailure);
- UNIT_TEST(Level2_NoFallback);
- UNIT_TEST(Level2_ActiveRequestRead);
- UNIT_TEST(Level2_ActiveRequestWrite);
-
- UNIT_TEST(Level3_NotInteractiveReadOnly);
- UNIT_TEST(Level3_NotInteractiveWriteOnly);
- UNIT_TEST(Level3_InteractiveReadOnly);
- UNIT_TEST(Level3_InteractiveReadWrite);
- UNIT_TEST(Level3_CompilationFailure);
- UNIT_TEST(Level3_NoFallback);
- UNIT_TEST(Level3_ActiveRequestRead);
- UNIT_TEST(Level3_ActiveRequestWrite);
- UNIT_TEST_SUITE_END();
-
- void Level0_NotInteractiveReadOnly();
- void Level0_NotInteractiveReadOnlySqlIn();
- void Level0_NotInteractiveWriteOnly();
- void Level0_NotInteractiveReadWrite();
- void Level0_InteractiveReadOnly();
- void Level0_CompilationFailure();
- void Level0_Fallback();
-
- void Level1_NotInteractiveReadOnly();
- void Level1_NotInteractiveReadOnlySqlIn();
- void Level1_NotInteractiveWriteOnly();
- void Level1_NotInteractiveReadWrite();
- void Level1_InteractiveReadOnly();
- void Level1_CompilationFailure();
- void Level1_Fallback();
-
- void Level2_NotInteractiveReadOnly();
- void Level2_NotInteractiveWriteOnly();
- void Level2_InteractiveReadOnly();
- void Level2_InteractiveReadWrite();
- void Level2_InteractiveBeginReadWrite();
- void Level2_InteractiveReadWriteCommit();
- void Level2_InteractiveBeginReadWriteCommit();
- void Level2_InteractiveReadWriteRead();
- void Level2_InteractiveBeginReadWriteRead();
- void Level2_InteractiveReadWriteReadCommit();
- void Level2_InteractiveBeginReadWriteReadCommit();
- void Level2_InteractiveWriteOnly();
- void Level2_CompilationFailure();
- void Level2_NoFallback();
- void Level2_ActiveRequestRead();
- void Level2_ActiveRequestWrite();
-
- void Level3_NotInteractiveReadOnly();
- void Level3_NotInteractiveWriteOnly();
- void Level3_InteractiveReadOnly();
- void Level3_InteractiveReadWrite();
- void Level3_CompilationFailure();
- void Level3_NoFallback();
- void Level3_ActiveRequestRead();
- void Level3_ActiveRequestWrite();
-
-private:
- std::unique_ptr<TKikimrRunner> Kikimr;
- ::NMonitoring::TDynamicCounterPtr Counters;
- std::unique_ptr<TKqpCounters> KqpCounters;
-};
-UNIT_TEST_SUITE_REGISTRATION(KqpForceNewEngine);
-
-/////////// LEVEL 0 ////////////////////////////////////////////////////////////////////////////////////////////////////
-void KqpForceNewEngine::Level0_NotInteractiveReadOnly() {
- TestNotInteractiveReadOnlyTx(0);
-}
-
-void KqpForceNewEngine::Level0_NotInteractiveReadOnlySqlIn() {
- TestNotInteractiveReadOnlyTx(0, /* withSqlIn */ true);
-}
-
-void KqpForceNewEngine::Level0_NotInteractiveWriteOnly() {
- TestNotInteractiveWriteOnlyTx(0);
-}
-
-void KqpForceNewEngine::Level0_NotInteractiveReadWrite() {
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto result = session.ExecuteDataQuery(Sprintf(R"(
- UPDATE `/Root/TwoShard` SET Value2 = %d WHERE Key = 1
- )", i), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(100, 0);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-}
-
-void KqpForceNewEngine::Level0_InteractiveReadOnly() {
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` WHERE Key = 1
- )", TTxControl::BeginTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- CompareYson(R"([[[1u];["One"];[-1]]])", FormatResultSetYson(result.GetResultSet(0)));
-
- auto tx = *result.GetTransaction();
-
- result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` WHERE Key = 2
- )", TTxControl::Tx(tx).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- CompareYson(R"([[[2u];["Two"];[0]]])", FormatResultSetYson(result.GetResultSet(0)));
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(100, 0);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-}
-
-void KqpForceNewEngine::Level0_CompilationFailure() {
- TestNotInteractiveReadOnlyTxFailedNECompilation(0);
-}
-
-void KqpForceNewEngine::Level0_Fallback() {
- TestNotInteractiveReadOnlyTxFallback(0);
-}
-
-/////////// LEVEL 1 ////////////////////////////////////////////////////////////////////////////////////////////////////
-void KqpForceNewEngine::Level1_NotInteractiveReadOnly() {
- TestNotInteractiveReadOnlyTx(1);
-}
-
-void KqpForceNewEngine::Level1_NotInteractiveReadOnlySqlIn() {
- TestNotInteractiveReadOnlyTx(1, /* withSqlIn */ true);
-}
-
-void KqpForceNewEngine::Level1_NotInteractiveWriteOnly() {
- TestNotInteractiveWriteOnlyTx(1);
-}
-
-void KqpForceNewEngine::Level1_NotInteractiveReadWrite() {
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto result = session.ExecuteDataQuery(Sprintf(R"(
- UPDATE `/Root/TwoShard` SET Value2 = %d WHERE Key = 1
- )", i), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(100, 1);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-}
-
-void KqpForceNewEngine::Level1_InteractiveReadOnly() {
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` WHERE Key = 1
- )", TTxControl::BeginTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- CompareYson(R"([[[1u];["One"];[-1]]])", FormatResultSetYson(result.GetResultSet(0)));
-
- auto tx = *result.GetTransaction();
-
- result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` WHERE Key = 2
- )", TTxControl::Tx(tx).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- CompareYson(R"([[[2u];["Two"];[0]]])", FormatResultSetYson(result.GetResultSet(0)));
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(100, 1);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-}
-
-void KqpForceNewEngine::Level1_CompilationFailure() {
- TestNotInteractiveReadOnlyTxFailedNECompilation(1);
-}
-
-void KqpForceNewEngine::Level1_Fallback() {
- TestNotInteractiveReadOnlyTxFallback(1);
-}
-
-/////////// LEVEL 2 ////////////////////////////////////////////////////////////////////////////////////////////////////
-void KqpForceNewEngine::Level2_NotInteractiveReadOnly() {
- TestNotInteractiveReadOnlyTx(2);
-}
-
-void KqpForceNewEngine::Level2_NotInteractiveWriteOnly() {
- TestNotInteractiveWriteOnlyTx(2);
-}
-
-void KqpForceNewEngine::Level2_InteractiveReadOnly() {
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` WHERE Key = 1
- )", TTxControl::BeginTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- CompareYson(R"([[[1u];["One"];[-1]]])", FormatResultSetYson(result.GetResultSet(0)));
-
- auto tx = *result.GetTransaction();
-
- result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` WHERE Key = 2
- )", TTxControl::Tx(tx).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- CompareYson(R"([[[2u];["Two"];[0]]])", FormatResultSetYson(result.GetResultSet(0)));
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(50, 2);
- test(20);
- bool diff = KqpCounters->NewEngineForcedQueryCount->Val() * KqpCounters->NewEngineCompatibleQueryCount->Val() != 0;
- UNIT_ASSERT_C(diff, "forced: " << KqpCounters->NewEngineForcedQueryCount->Val()
- << ", compatible: " << KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(100, 2);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(4, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(0, 2);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-}
-
-void KqpForceNewEngine::Level2_InteractiveReadWrite() {
- TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ false, /* appendRead */ false, /* addCommitTx */ false);
-}
-
-void KqpForceNewEngine::Level2_InteractiveBeginReadWrite() {
- TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ true, /* appendRead */ false, /* addCommitTx */ false);
-}
-
-void KqpForceNewEngine::Level2_InteractiveReadWriteCommit() {
- TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ false, /* appendRead */ false, /* addCommitTx */ true);
-}
-
-void KqpForceNewEngine::Level2_InteractiveBeginReadWriteCommit() {
- TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ true, /* appendRead */ false, /* addCommitTx */ true);
-}
-
-void KqpForceNewEngine::Level2_InteractiveReadWriteRead() {
- TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ false, /* appendRead */ true, /* addCommitTx */ false);
-}
-
-void KqpForceNewEngine::Level2_InteractiveBeginReadWriteRead() {
- TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ true, /* appendRead */ true, /* addCommitTx */ false);
-}
-
-void KqpForceNewEngine::Level2_InteractiveReadWriteReadCommit() {
- TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ false, /* appendRead */ true, /* addCommitTx */ true);
-}
-
-void KqpForceNewEngine::Level2_InteractiveBeginReadWriteReadCommit() {
- TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ true, /* appendRead */ true, /* addCommitTx */ true);
-}
-
-void KqpForceNewEngine::Level2_InteractiveWriteOnly() {
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto result = session.ExecuteDataQuery(R"(
- REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne")
- )", TTxControl::BeginTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- result = session.ExecuteDataQuery(R"(
- REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (1, "OneOne")
- )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(100, 2);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-}
-
-void KqpForceNewEngine::Level2_CompilationFailure() {
- TestNotInteractiveReadOnlyTxFailedNECompilation(2);
-}
-
-void KqpForceNewEngine::Level2_NoFallback() {
- TestNotInteractiveReadOnlyTxFallback(2);
-}
-
-void KqpForceNewEngine::Level2_ActiveRequestRead() {
- auto session = Session();
-
- // start request with OldEngine
-
- auto result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` LIMIT 1
- )", TTxControl::BeginTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
-
- ForceNewEngine(100, 2);
-
- // can switch to NewEngine (RO-query, no deferred effects)
-
- result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` LIMIT 1
- )", TTxControl::Tx(*result.GetTransaction())).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
-
- result = session.ExecuteDataQuery(R"(
- SELECT 42
- )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(2, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
-}
-
-void KqpForceNewEngine::Level2_ActiveRequestWrite() {
- auto session = Session();
-
- // start query with OldEngine
-
- auto result = session.ExecuteDataQuery(R"(
- REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne")
- )", TTxControl::BeginTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
-
- ForceNewEngine(100, 2);
-
- // have deferred effects, so dont force new engine on active transactions
-
- result = session.ExecuteDataQuery(R"(
- SELECT 42
- )", TTxControl::Tx(*result.GetTransaction())).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val());
-
-
- result = session.ExecuteDataQuery(R"(
- SELECT 42
- )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val());
-}
-
-/////////// LEVEL 3 ////////////////////////////////////////////////////////////////////////////////////////////////////
-void KqpForceNewEngine::Level3_NotInteractiveReadOnly() {
- TestNotInteractiveReadOnlyTx(3);
-}
-
-void KqpForceNewEngine::Level3_NotInteractiveWriteOnly() {
- TestNotInteractiveWriteOnlyTx(3);
-}
-
-void KqpForceNewEngine::Level3_InteractiveReadOnly() {
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` WHERE Key = 1
- )", TTxControl::BeginTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- CompareYson(R"([[[1u];["One"];[-1]]])", FormatResultSetYson(result.GetResultSet(0)));
-
- auto tx = *result.GetTransaction();
-
- result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` WHERE Key = 2
- )", TTxControl::Tx(tx).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- CompareYson(R"([[[2u];["Two"];[0]]])", FormatResultSetYson(result.GetResultSet(0)));
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(50, 3);
- test(20);
- bool diff = KqpCounters->NewEngineForcedQueryCount->Val() * KqpCounters->NewEngineCompatibleQueryCount->Val() != 0;
- UNIT_ASSERT_C(diff, "forced: " << KqpCounters->NewEngineForcedQueryCount->Val()
- << ", compatible: " << KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(100, 3);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(4, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(0, 3);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-}
-
-void KqpForceNewEngine::Level3_InteractiveReadWrite() {
- auto session = Session();
-
- auto test = [&](ui32 count) {
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
-
- for (ui32 i = 0; i < count; ++i) {
- auto result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/KeyValue` LIMIT 1
- )", TTxControl::BeginTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- result = session.ExecuteDataQuery(Sprintf(R"(
- UPDATE `/Root/TwoShard` SET Value2 = %d WHERE Key = 1
- )", i), TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- };
-
- {
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(50, 3);
- test(20);
- bool diff = KqpCounters->NewEngineForcedQueryCount->Val() * KqpCounters->NewEngineCompatibleQueryCount->Val() != 0;
- UNIT_ASSERT_C(diff, "forced: " << KqpCounters->NewEngineForcedQueryCount->Val()
- << ", compatible: " << KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(100, 3);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(4, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-
- {
- ForceNewEngine(0, 3);
- test(2);
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
- }
-}
-
-void KqpForceNewEngine::Level3_CompilationFailure() {
- TestNotInteractiveReadOnlyTxFailedNECompilation(3);
-}
-
-void KqpForceNewEngine::Level3_NoFallback() {
- TestNotInteractiveReadOnlyTxFallback(3);
-}
-
-void KqpForceNewEngine::Level3_ActiveRequestRead() {
- auto session = Session();
-
- ForceNewEngine(1, 2);
-
- // start request with forced OldEngine
- TMaybe<TTransaction> tx;
- while (!tx) {
- auto result = session.ExecuteDataQuery(R"(
- SELECT * FROM `/Root/TwoShard` LIMIT 1
- )", TTxControl::BeginTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
-
- if (KqpCounters->NewEngineForcedQueryCount->Val() == 1) {
- // reroll :-)
-
- tx->Rollback().GetValueSync();
- session = Session();
-
- KqpCounters->NewEngineForcedQueryCount->Set(0);
- KqpCounters->NewEngineCompatibleQueryCount->Set(0);
- continue;
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val());
-
- tx = result.GetTransaction();
- }
-
- ForceNewEngine(100, 3);
-
- // dont change to NewEngine
-
- auto result = session.ExecuteDataQuery(R"(
- REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne")
- )", TTxControl::Tx(*tx)).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val());
-
- result = session.ExecuteDataQuery(R"(
- SELECT 42
- )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val());
-}
-
-void KqpForceNewEngine::Level3_ActiveRequestWrite() {
- auto session = Session();
-
- ForceNewEngine(100, 2);
-
- // start request with forced OldEngine
- auto result = session.ExecuteDataQuery(R"(
- REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne")
- )", TTxControl::BeginTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
-
- ForceNewEngine(100, 3);
-
- // dont switch to NewEngine (have deferred effects)
-
- result = session.ExecuteDataQuery(R"(
- SELECT 42
- )", TTxControl::Tx(*result.GetTransaction())).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- result = session.ExecuteDataQuery(R"(
- SELECT 42
- )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
-
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val());
- UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val());
-}
-
-} // namespace NKqp
-} // namespace NKikimr